2 Commits b3e3825e3b ... 67564e2c03

Auteur SHA1 Message Date
  qingfeng 67564e2c03 Merge branch 'dev' of http://git.fast-fun.cn:92/lmstack/data_analyze_platform into dev il y a 2 ans
  qingfeng 32dc3d41d3 update il y a 2 ans

BIN
USER/SPF/alibaba/01BatSoh/CBMSBatSoh.py


+ 203 - 0
USER/SPF/alibaba/01BatSoh/deploy_soh.py

@@ -0,0 +1,203 @@
+
+import os
+import time
+
+import pandas as pd
+from sqlalchemy import create_engine
+import logging.handlers
+import traceback
+import re
+import CBMSBatSoh
+import datetime
+import dateutil.relativedelta
+from urllib import parse
+from apscheduler.schedulers.blocking import BlockingScheduler
+import warnings
+from multiprocessing import Process
+
+
+def fun(host, port, db, user, password, runenv, logger, period_second):
+    logger.info("pid is {}".format(os.getpid()))
+    try:
+        db_engine = create_engine(
+            "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                user, parse.quote_plus(password), host, port, db
+            ))
+    except:
+        logger.error(u"数据库连接错误", exc_info=True)
+        logger.error(traceback.format_exc)
+        return
+
+    # 获取配置信息
+    df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
+    # db_engine.dispose()
+
+
+
+
+    now_time = datetime.datetime.now()
+    pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second-6*3600)  #两周前
+    end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S")
+    start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S")
+    logger.info("time range from {} to {} ".format(start_time, end_time))
+    for i in range(len(df_confs)):
+        try:
+            factory = df_confs.loc[i, 'factory']
+            sn = df_confs.loc[i, 'device_name']
+            if '康普盾' in factory:
+                celltype = 1
+                cell_volt_count = 120
+                cell_temp_count = 40
+            elif '华霆' in factory:
+                celltype = 2
+                cell_volt_count = 204
+                cell_temp_count = 40
+            elif '力神' in factory:
+                celltype = 99
+                cell_volt_count = 72
+                cell_temp_count = 32
+
+            logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
+            sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
+            logger.info(sql)
+            df_bms = pd.read_sql(sql, db_engine)
+            if df_bms.empty:
+                continue
+            df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt',
+                              'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp',
+                              'CellVolt', 'id']
+            # 单体数据拆分
+            cell_temp = df_bms['CellTemp']
+            cell_volt = df_bms['CellVolt']
+            cell_temps = []
+            [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp]
+            cell_volts = []
+            [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt]
+            del_indexes = []
+            for i, cell_volt in enumerate(cell_volts):
+                if len(cell_volt) != cell_volt_count:
+                    del_indexes.append(i)
+            for i, cell_temp in enumerate(cell_temps):
+                if len(cell_temp) != cell_temp_count:
+                    del_indexes.append(i)
+            del_indexes = list(set(del_indexes))
+            for i, del_index in enumerate(del_indexes):
+                del_index = del_index - i
+                cell_volts.pop(del_index)
+                cell_temps.pop(del_index)
+            df_bms = df_bms.drop(index=del_indexes, axis=1)
+            cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)]
+            celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)]
+            df_bms[cellvolt_name] = cell_volts
+            df_bms[celltemp_name] = cell_temps
+            df_bms = df_bms.drop(columns='CellVolt', axis=0)
+            df_bms = df_bms.drop(columns='CellTemp', axis=0)
+            df_bms = df_bms.reset_index(drop=True)
+            logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))
+
+            # 算法执行
+            df_soh = pd.read_sql(
+                "select time_st,time_sp,sn,method,soh,cellsoh,packsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn),
+                db_engine);
+            BatSoh = CBMSBatSoh.BatSoh(sn, celltype, df_bms, df_soh)
+            df_res = BatSoh.batsoh()
+
+            # 如果以开发环境运行,则写入测试结果!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+            if (runenv == 'dev'):
+                logger.info("pid-{} FACTORY:{} - SN: {} 测试!".format(os.getpid(), factory, sn))
+                df_res = pd.DataFrame({'time_st': ["2021-01-01 00:00:00"], 'time_sp': ["2021-01-02 00:00:00"],
+                                       'sn': [sn], 'method': [1], 'bmssoh': [99], 'packsoh': [100], 'soh': [99],"cellsohmin":[10],"cellsohmax":[20],"sohmin_num":[30], 'sohmax_num':[10],
+                                       'cellsoh_diff': [1], 'cellsoh': ["98,97"]})
+            if not df_res.empty:
+                df_res.columns = ['time_st', 'time_sp', 'sn', 'method', 'bmssoh', 'packsoh', 'soh','cellsohmin', 'cellsohmax', 'sohmin_num', 'sohmax_num', 'cellsoh_diff',
+                                  'cellsoh']
+                df_res['factory'] = factory
+                df_res['add_time'] = datetime.datetime.now()
+                df_res.to_sql("soh_result", con=db_engine, if_exists="append", index=False)
+                logger.info("pid-{} FACTORY:{} - SN: {} 结果入库成功!".format(os.getpid(), factory, sn))
+
+            logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
+        except:
+            logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)
+            logger.error(traceback.format_exc)
+    db_engine.dispose()
+    logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
+
+
+def heart_beat(host, port, db, user, password, log):
+    while True:
+        try:
+            db_engine = create_engine(
+                "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                    user, parse.quote_plus(password), host, port, db
+                ))
+            while True:
+                now = datetime.datetime.now()
+                db_engine.execute("update status set algo_soh='{}'".format(now))
+                log.info("soh心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
+                time.sleep(5)
+        except:
+            log.error(u"soh心跳错误", exc_info=True)
+            log.error(traceback.format_exc)
+            time.sleep(5)
+        finally:
+            db_engine.dispose()
+
+if __name__ == '__main__':
+    warnings.filterwarnings("ignore")
+    env_dist = os.environ
+    # 配置信息信息
+    # host = env_dist.get("ALI_HOST", '120.25.223.1')
+    # port = env_dist.get("ALI_PORT", '4901')
+    # db = env_dist.get("ALI_DB", 'ali')
+    # user = env_dist.get("ALI_ROOT", 'root')
+    # password = parse.quote_plus(env_dist.get("ALI_PASSWORD", '123456'))
+    host = env_dist.get("ALI_HOST", '192.168.31.141')
+    port = env_dist.get("ALI_PORT", '3306')
+    db = env_dist.get("ALI_DB", 'ali')
+    user = env_dist.get("ALI_ROOT", 'root')
+    password = env_dist.get("ALI_PASSWORD", 'Ali@123456')
+    runenv = env_dist.get("ALI_RUNENV", 'dev')
+    period_second = int(env_dist.get("ALI_PERIOD_SECOND", '604800'))
+
+
+    algo_name = 'soh'
+    # 日志配置
+    log_path = './log' + '/' + algo_name
+    if not os.path.exists(log_path):
+        os.makedirs(log_path)
+    logger = logging.getLogger()
+    logger.setLevel(logging.DEBUG)
+
+    fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
+                                              backupCount=5, encoding="utf-8", mode="a")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.suffix = "%Y-%m-%d_%H-%M.log"
+    fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.DEBUG)
+    logger.addHandler(fh)
+
+    fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
+                                              backupCount=5, encoding="utf-8", mode="a")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.ERROR)
+    logger.addHandler(fh)
+
+    logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
+    # 开启定时任务
+    p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
+    p.start()
+    scheduler = BlockingScheduler()
+    # heart_beat(host, port, db, user, password, logger)
+    # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True)
+    fun(host, port, db, user, password, runenv, logger, period_second)
+    scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second
+                      ,max_instances=1, coalesce=True)
+
+    try:
+        scheduler.start()
+    except Exception as e:
+        scheduler.shutdown()
+        logger.error(str(e))

BIN
USER/SPF/alibaba/02BatUniform/CBMSBatUniform.py


BIN
USER/SPF/alibaba/02BatUniform/main.py


BIN
USER/SPF/alibaba/03BatInterShort/CBMSBatInterShort.py


BIN
USER/SPF/alibaba/03BatInterShort/main.py


BIN
USER/SPF/alibaba/04BatSoc/CBMSBatSoc.py


+ 236 - 0
USER/SPF/alibaba/04BatSoc/deploy_soc.py

@@ -0,0 +1,236 @@
+
+import os
+import time
+
+import pandas as pd
+from sqlalchemy import create_engine
+import logging
+import logging.handlers
+import traceback
+import re
+import CBMSBatSoc
+import datetime
+import dateutil.relativedelta
+from urllib import parse
+from apscheduler.schedulers.blocking import BlockingScheduler
+import warnings
+from multiprocessing import Process
+
+
+
+def fun(host, port, db, user, password, runenv, logger, period_second):
+    global df_ram
+    try:
+        db_engine = create_engine(
+            "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                user, parse.quote_plus(password), host, port, db
+            ))
+    except:
+        logger.error(u"数据库连接错误", exc_info=True)
+        logger.error(traceback.format_exc)
+        return
+
+
+# 获取配置信息
+    df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
+    # db_engine.dispose()
+
+
+    logger.info("pid is {}".format(os.getpid()))
+
+    now_time = datetime.datetime.now()
+    pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second-1)  #
+    end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S")
+    start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S")
+    logger.info("time range from {} to {} ".format(start_time, end_time))
+    for i in range(len(df_confs)):
+        try:
+            factory = df_confs.loc[i, 'factory']
+            sn = df_confs.loc[i, 'device_name']
+            if '康普盾' in factory:
+                celltype = 1
+                cell_volt_count = 120
+                cell_temp_count = 40
+            elif '华霆' in factory:
+                celltype = 2
+                cell_volt_count = 204
+                cell_temp_count = 40
+            elif '力神' in factory:
+                celltype = 99
+                cell_volt_count = 72
+                cell_temp_count = 32
+
+            logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
+            sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
+            logger.info(sql)
+            df_bms = pd.read_sql(sql, db_engine)
+            if df_bms.empty:
+                continue
+            df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt',
+                              'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp',
+                              'CellVolt', 'id']
+            # 单体数据拆分
+            cell_temp = df_bms['CellTemp']
+            cell_volt = df_bms['CellVolt']
+            cell_temps = []
+            [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp]
+            cell_volts = []
+            [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt]
+            del_indexes = []
+            for i, cell_volt in enumerate(cell_volts):
+                if len(cell_volt) != cell_volt_count:
+                    del_indexes.append(i)
+            for i, cell_temp in enumerate(cell_temps):
+                if len(cell_temp) != cell_temp_count:
+                    del_indexes.append(i)
+            del_indexes = list(set(del_indexes))
+            for i, del_index in enumerate(del_indexes):
+                del_index = del_index - i
+                cell_volts.pop(del_index)
+                cell_temps.pop(del_index)
+            df_bms = df_bms.drop(index=del_indexes, axis=1)
+            cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)]
+            celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)]
+            df_bms[cellvolt_name] = cell_volts
+            df_bms[celltemp_name] = cell_temps
+            df_bms = df_bms.drop(columns='CellVolt', axis=0)
+            df_bms = df_bms.drop(columns='CellTemp', axis=0)
+            df_bms = df_bms.reset_index(drop=True)
+            logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))
+
+            # 算法执行
+            df_soh = pd.read_sql(
+                "select time_st,time_sp,sn,method,soh,cellsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn),
+                db_engine);
+
+            if celltype>50:
+                df_socdiff = pd.read_sql(
+                    "select time,sn,cellsoc_diff from uniform_result where factory ='{}' and sn = '{}' order by time desc limit 1".format(factory, sn),
+                    db_engine);
+            else:
+                df_socdiff = pd.DataFrame()
+
+            df_ram_sn=df_ram[df_ram['sn']==sn]
+            BatSoc= CBMSBatSoc.BatSoc(sn, celltype, df_bms, df_soh, df_ram_sn, df_socdiff)
+            df_res, df_ram_sn=BatSoc.batsoc()
+            if not df_ram_sn.empty:
+                sn_index=df_ram.loc[df_ram['sn']==sn].index
+                df_ram=df_ram.drop(index=sn_index)
+                df_ram=df_ram.append(df_ram_sn)
+                df_ram.reset_index(inplace=True,drop=True)
+
+            # 如果以开发环境运行,则写入测试结果!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+            if (runenv == 'dev'):
+                logger.info("pid-{} FACTORY:{} - SN: {} 测试!".format(os.getpid(), factory, sn))
+                df_res = pd.DataFrame({'time': ["2021-01-01 00:00:00"],
+                                       'sn': [sn], 'bms_soc': [10],'packsoc': [1], 'socdsp':[100], 'cellsocmin': [99], 'cellsocmax': [100], 'socmin_num':[10], 'socmax_num':[20],
+                                       'cellsoc_diff':['12,22'], 'cellsoc':[10,20], 'ocvweight':[1], 'socstep':[2]})
+            if not df_res.empty:
+                df_res.columns = ['time', 'sn', 'bms_soc', 'packsoc', 'socdsp', 'cellsocmin', 'cellsocmax', 'socmin_num', 'socmax_num', 'cellsoc_diff', 'cellsoc', 'ocvweight', 'socstep']
+                df_res['factory'] = factory
+                df_res['add_time'] = datetime.datetime.now()
+
+                df_res.to_sql("soc_result", con=db_engine, if_exists="append", index=False)
+                logger.info("pid-{} FACTORY:{} - SN: {} 结果入库成功!".format(os.getpid(), factory, sn))
+
+            logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
+        except:
+            logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)
+            logger.error(traceback.format_exc)
+    db_engine.dispose()
+    logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
+
+
+# 多进程运行
+# def mainprocess(host, port, db, user, password, runenv, logger, period_second, process):
+#
+#     process = int(process)
+#     pool = multiprocessing.Pool(processes = process)
+#
+#     for i in range(process):
+#         sn_list = SNnums[i]
+#         pool.apply_async(fun, (host, port, db, user, password, runenv, logger, period_second, ))
+#
+#     pool.close()
+#     pool.join()
+
+def heart_beat(host, port, db, user, password, log):
+    while True:
+        try:
+            db_engine = create_engine(
+                "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                    user, parse.quote_plus(password), host, port, db
+                ))
+            while True:
+                now = datetime.datetime.now()
+                db_engine.execute("update status set algo_soc='{}'".format(now))
+                log.info("soc心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
+                time.sleep(5)
+        except:
+            log.error(u"soc心跳错误", exc_info=True)
+            log.error(traceback.format_exc)
+            time.sleep(5)
+        finally:
+            db_engine.dispose()
+
+
+
+
+
+# 开启定时任务
+if __name__ == '__main__':
+    warnings.filterwarnings("ignore")
+    env_dist = os.environ
+    # 配置信息信息
+    host = env_dist.get("ALI_HOST", '120.25.223.1')
+    port = env_dist.get("ALI_PORT", '4901')
+    db = env_dist.get("ALI_DB", 'ali')
+    user = env_dist.get("ALI_ROOT", 'root')
+    password = env_dist.get("ALI_PASSWORD", '123456')
+    runenv = env_dist.get("ALI_RUNENV", 'pro')
+    process = env_dist.get("ALI_PROCESS", '10')
+    period_second = int(env_dist.get("ALI_PERIOD_SECOND", '10'))
+
+    algo_name = 'soc'
+    # 日志配置
+    log_path = './log' + '/' + algo_name
+    if not os.path.exists(log_path):
+        os.makedirs(log_path)
+    logger = logging.getLogger()
+    logger.setLevel(logging.DEBUG)
+
+    fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
+                                              backupCount=5, encoding="utf-8", mode="a")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.suffix = "%Y-%m-%d_%H-%M.log"
+    fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.DEBUG)
+    logger.addHandler(fh)
+
+    fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
+                                              backupCount=5, encoding="utf-8", mode="a")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.ERROR)
+    logger.addHandler(fh)
+
+    logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
+    # 算法参数初始化
+    column_name=['time', 'sn', 'bms_soc', 'soc','cellsoc','standingtime','rampackcrnt','ramcellvolt','kocellvoltmin','kocellvoltmax','ocvweight','as_accum','socstep']
+    df_ram=pd.DataFrame(columns=column_name)
+
+    p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
+    p.start()
+    scheduler = BlockingScheduler()
+    # heart_beat(host, port, db, user, password, logger)
+    # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True)
+    fun(host, port, db, user, password, runenv, logger, period_second)
+    scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second
+                      ,max_instances=1, coalesce=True)
+
+    try:
+        scheduler.start()
+    except Exception as e:
+        scheduler.shutdown()
+        logger.error(str(e))

BIN
USER/SPF/alibaba/05BatDiag/CBMSBatDiag.py


+ 246 - 0
USER/SPF/alibaba/05BatDiag/deploy_diag.py

@@ -0,0 +1,246 @@
+
+import os
+import time
+
+import pandas as pd
+from sqlalchemy import create_engine
+import logging
+import logging.handlers
+import traceback
+import re
+import CBMSBatDiag
+import datetime
+import dateutil.relativedelta
+from urllib import parse
+from apscheduler.schedulers.blocking import BlockingScheduler
+import warnings
+import pymysql
+from urllib import parse
+from multiprocessing import Process
+
+
+def diag_cal(host, port, db, user, password, runenv, logger, period_second):
+    logger.info("pid is {}".format(os.getpid()))
+    try:
+        db_engine = create_engine(
+            "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                user, parse.quote_plus(password), host, port, db
+            ))
+        conn = pymysql.connect(host=host, port=int(port), user=user, password=password, database=db)
+        cursor = conn.cursor()
+    except:
+        logger.error(u"数据库连接错误", exc_info=True)
+        logger.error(traceback.format_exc)
+        return
+
+    # 获取历史报警数据
+    df_diag_ram = pd.read_sql("select time_st,time_sp,sn,faultcode,faultlv,faultinfo,faultadvice from fault_result where time_sp='0000-00-00 00:00:00'",db_engine);
+    # 获取配置信息
+    df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
+    # db_engine.dispose()
+
+
+
+
+    now_time = datetime.datetime.now()
+    pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second-5)  #
+    end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S")
+    start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S")
+    logger.info("time range from {} to {} ".format(start_time, end_time))
+    for i in range(len(df_confs)):
+        try:
+            factory = df_confs.loc[i, 'factory']
+            sn = df_confs.loc[i, 'device_name']
+            if '康普盾' in factory:
+                celltype = 1
+                cell_volt_count = 120
+                cell_temp_count = 40
+            elif '华霆' in factory:
+                celltype = 2
+                cell_volt_count = 68
+                cell_temp_count = 40
+            elif '力神' in factory:
+                celltype = 99
+                cell_volt_count = 72
+                cell_temp_count = 32
+
+            logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
+            sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
+            logger.info(sql)
+            df_bms = pd.read_sql(sql, db_engine)
+            if df_bms.empty:
+                continue
+            df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt',
+                              'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp',
+                              'CellVolt', 'id']
+            # 单体数据拆分
+            cell_temp = df_bms['CellTemp']
+            cell_volt = df_bms['CellVolt']
+            cell_temps = []
+            [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp]
+            cell_volts = []
+            [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt]
+            del_indexes = []
+            for i, cell_volt in enumerate(cell_volts):
+                if len(cell_volt) != cell_volt_count:
+                    del_indexes.append(i)
+            for i, cell_temp in enumerate(cell_temps):
+                if len(cell_temp) != cell_temp_count:
+                    del_indexes.append(i)
+
+            for i, del_index in enumerate(del_indexes):
+                del_index = del_index - i
+                cell_volts.pop(del_index)
+                cell_temps.pop(del_index)
+            df_bms = df_bms.drop(index=del_indexes, axis=1)
+            cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)]
+            celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)]
+            df_bms[cellvolt_name] = cell_volts
+            df_bms[celltemp_name] = cell_temps
+            df_bms = df_bms.drop(columns='CellVolt', axis=0)
+            df_bms = df_bms.drop(columns='CellTemp', axis=0)
+            df_bms = df_bms.reset_index(drop=True)
+            logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))
+
+            # 算法执行
+            df_soh = pd.read_sql(
+                "select time_st,sn,soh,cellsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn),
+                db_engine);
+            df_uniform = pd.read_sql(
+                "select time,sn,cellsoc_diff,cellmin_num,cellmax_num from uniform_result where factory ='{}' and sn = '{}' order by time desc limit 1".format(factory, sn),
+                db_engine);
+            df_soc = pd.read_sql(
+                "select time,sn,packsoc from soc_result where factory ='{}' and sn = '{}' order by add_time desc limit 1".format(factory, sn),
+                db_engine);
+            #电池诊断................................................................................................................................................................
+            if not df_bms.empty:
+                df_diag_ram_sn=df_diag_ram[df_diag_ram['sn']==sn]
+                df_diag_ram_sn.reset_index(inplace=True,drop=True)
+
+                batdiag=CBMSBatDiag.BatDiag(sn,celltype,df_bms,df_soh,df_soc,df_uniform,df_diag_ram_sn)
+                df_diag_res, df_health_res=batdiag.diag()   #获取电池故障结果和电池评分结果
+
+                #电池评分写入数据库
+                if not df_health_res.empty:   #变为历史故障更改数据库
+                    df_health_res['add_time'] = datetime.datetime.now()
+                    df_health_res['factory'] = factory
+                    df_health_res.to_sql("health_result",con=db_engine, if_exists="append",index=False)
+                    logger.info(u"{} health_result 写入成功!!!\n".format(sn), exc_info=True)
+                #历史故障筛选并更改数据库故障结束时间.........................................................
+                if not df_diag_res.empty:
+                    df_diag_now=df_diag_res[df_diag_res['time_sp'] == '0000-00-00 00:00:00']   #去除历史故障
+                    df_diag_new = pd.concat([df_diag_res,df_diag_ram_sn,df_diag_ram_sn]).drop_duplicates(subset=['time_st','faultcode'],keep=False)#此次判断中新增故障
+                    df_diag_end=pd.concat([df_diag_res,df_diag_new,df_diag_new]).drop_duplicates(subset=['time_st','faultcode'],keep=False)#此次判断中新增故障
+                    df_diag_end=df_diag_end[df_diag_end['time_sp'] != '0000-00-00 00:00:00']
+                    df_diag_end.reset_index(inplace=True,drop=True)  #重置索引
+                    if not df_diag_end.empty:   #变为历史故障更改数据库
+                        try:
+                            for i in range(0,len(df_diag_end)):
+                                sql = '''update fault_result set update_time='{}', time_sp='{}' where sn='{}' and faultcode={} and time_st='{}'
+                                            '''.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), df_diag_end.loc[i,'time_sp'], sn, df_diag_end.loc[i,'faultcode'],
+                                                       df_diag_end.loc[i,'time_st'])
+                                cursor.execute(sql)
+                                conn.commit()
+                            logger.info(u"{} fault_result更新成功\n".format(sn), exc_info=True)
+                        except:
+                            logger.error(traceback.format_exc)
+                            logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
+                    #新增故障筛选并存入数据库.....................................................................
+                    if not df_diag_new.empty:
+                        df_diag_new['add_time'] = datetime.datetime.now()
+                        df_diag_new['factory'] = factory
+                        df_diag_new.to_sql("fault_result",con=db_engine, if_exists="append",index=False)
+                        logger.info(u"{} fault_result 写入成功!!!\n".format(sn), exc_info=True)
+                    #更新diag的Ram数据
+                    df_diag_ram=df_diag_ram.drop(df_diag_ram[df_diag_ram.sn==sn].index)
+                    df_diag_ram=df_diag_ram.append(df_diag_now, ignore_index=True)
+                    df_diag_ram.reset_index(inplace=True,drop=True)  #重置索引
+
+            logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
+        except:
+            logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)
+            logger.error(traceback.format_exc)
+    db_engine.dispose()
+    cursor.close()
+    conn.close()
+    logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
+
+
+def heart_beat(host, port, db, user, password, log):
+    while True:
+        try:
+            db_engine = create_engine(
+                "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                    user, parse.quote_plus(password), host, port, db
+                ))
+            while True:
+                now = datetime.datetime.now()
+                db_engine.execute("update status set algo_diag='{}'".format(now))
+                log.info("diag心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
+                time.sleep(5)
+        except:
+            log.error(u"diag心跳错误", exc_info=True)
+            log.error(traceback.format_exc)
+            time.sleep(5)
+        finally:
+            db_engine.dispose()
+
+# 开启定时任务
+if __name__ == '__main__':
+    warnings.filterwarnings("ignore")
+    env_dist = os.environ
+    # 配置信息信息
+    # host = env_dist.get("ALI_HOST", '120.25.223.1')
+    # port = env_dist.get("ALI_PORT", '4901')
+    # db = env_dist.get("ALI_DB", 'ali')
+    # user = env_dist.get("ALI_ROOT", 'root')
+    # password =
+    host = env_dist.get("ALI_HOST", '192.168.31.141')
+    port = env_dist.get("ALI_PORT", '3306')
+    db = env_dist.get("ALI_DB", 'ali')
+    user = env_dist.get("ALI_ROOT", 'root')
+    password = env_dist.get("ALI_PASSWORD", 'Ali@123456')
+    runenv = env_dist.get("ALI_RUNENV", 'pro')
+    period_second = int(env_dist.get("ALI_PERIOD_SECOND", '60'))
+
+
+    algo_name = 'diag'
+    # 日志配置
+    log_path = './log' + '/' + algo_name
+    if not os.path.exists(log_path):
+        os.makedirs(log_path)
+    logger = logging.getLogger()
+    logger.setLevel(logging.DEBUG)
+
+    fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
+                                              backupCount=5, encoding="utf-8", mode="a")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.suffix = "%Y-%m-%d_%H-%M.log"
+    fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.DEBUG)
+    logger.addHandler(fh)
+
+    fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
+                                              backupCount=5, encoding="utf-8", mode="a")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.ERROR)
+    logger.addHandler(fh)
+
+    logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
+    # 调度
+    p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
+    p.start()
+    scheduler = BlockingScheduler()
+    # heart_beat(host, port, db, user, password, logger)
+    # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True)
+    diag_cal(host, port, db, user, password, runenv, logger, period_second)
+    scheduler.add_job(func=diag_cal, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second
+                      ,max_instances=1, coalesce=True)
+
+    try:
+        scheduler.start()
+    except Exception as e:
+        scheduler.shutdown()
+        logger.error(str(e))

+ 196 - 0
USER/SPF/alibaba/06BatSafetyAlarm/BatParam.py

@@ -0,0 +1,196 @@
+
+#定义电池参数
+
+class BatParam:
+    def __init__(self,celltype):
+
+        #公用参数................................................................................................................................................
+        #热失控参数
+        self.TrwTempHigh=60
+        self.TrwTempRate=3
+        self.TrwTempDiff=15
+        self.TrwCellVoltDiff=2.5
+        self.TrwCellVoltFall=0.5
+        self.TrwCellVoltLow=1.5
+        self.TrwPackVoltFall=1.5
+        
+        #故障诊断参数
+        self.temp_time=10
+        self.volt_time=10
+
+        self.SocJump=10
+        self.SocClamp=0.1
+        self.SocLow=3
+        self.SocDiff=20
+
+        self.SohLow=65
+        self.SohDiff=15
+
+        #mana-kendall趋势检验参数
+        self.mk_p=0.001
+        self.mk_z=-5
+        self.mk_Tau=-0.7
+        self.mk_slope=-0.1
+        self.mk_s=-260
+        self.mk_svar=1600
+
+        self.OcvWeight_Temp=[-30,-20,-10,0,10,20,30,40,50]
+        self.OcvWeight_StandingTime=[0,500,600,1200,1800,3600,7200,10800]
+        self.OcvWeight            =[[0,0,  0,  0,    0,   0.1,0.3, 1],
+                                    [0,0,  0,  0,    0,   0.1,0.3, 1],
+                                    [0,0,  0,  0,    0,   0.2,0.5, 1],
+                                    [0,0,  0,  0,    0.2, 0.4,0.8, 1],
+                                    [0,0,  0,  0.1,  0.3, 0.6,1,   1],
+                                    [0,0,  0.1,0.2,  0.5, 0.8,1,   1],
+                                    [0,0,  0.1,0.3,  0.6, 1,  1,   1],
+                                    [0,0,  0.1,0.3,  0.7, 1,  1,   1],
+                                    [0,0,  0.2,0.3,  0.8, 1,  1,   1]]
+
+
+        if celltype==1: #永联+康普顿21700
+            self.Capacity = 4*2
+            self.PackFullChrgVolt=69.99
+            self.CellFullChrgVolt=4.2
+            self.CellFullChrgCrnt=-4
+            self.CellVoltNums=120
+            self.CellTempNums=40
+            self.FullChrgSoc=90
+            self.PackCrntDec=1
+            self.BalCurrent=0.015
+            self.LookTab_SOC = [0,	     5,	    10,	    15,	    20,	    25,	    30,	    35,	    40,	    45,	    50,	    55,	    60,	    65,	    70,	    75,	    80,	    85,	    90,	    95,	    100]
+            self.LookTab_OCV = [2.8497,	3.0924,	3.2868,	3.4117,	3.4461,	3.5032,	3.5602,	3.6049,	3.6368,	3.6709,	3.7121,	3.7593,	3.8237,	3.8755,	3.9093,	3.9518,	4.0119,	4.0615,	4.077,	4.0969,	4.1821]
+
+            self.CellOvLv1=4.2
+            self.CellOvLv2=4.25
+            self.CellUvLv1=2.8
+            self.CellUvLv2=2.5
+            self.CellVoltDiffLv1=0.3
+            self.CellVoltDiffLv2=0.5
+            self.PackVoltOvLv1=self.CellOvLv1*self.CellVoltNums
+            self.PackVoltOvLv2=self.CellOvLv2*self.CellVoltNums
+            self.PackVoltUvLv1=self.CellUvLv1*self.CellVoltNums
+            self.PackVoltUvLv2=self.CellUvLv2*self.CellVoltNums
+
+            self.CellTempUpLmt=119
+            self.CellTempLwLmt=-39
+            self.CellTempHighLv1=45
+            self.CellTempHighLv2=50
+            self.CellTempLowLv1=0
+            self.CellTempLowLv2=-5
+            self.CellTempDiffLv1=10
+            self.CellTempDiffLv2=15
+            self.CellTempRate=5
+
+            self.PackChgOc=-40
+            self.PackDisOc=200
+
+            self.LeakCurrentLv1=-10
+            self.LeakCurrentLv2=-15
+            self.LeakCurrentLv3=-50
+
+            self.TrwVoltRate=-1
+
+        elif celltype==2: #中恒+华庭18650
+            self.Capacity = 2*9
+            self.CellVoltNums= 68
+            self.CellTempNums= 40
+            self.CellFullChrgVolt=4.2
+            self.CellFullChrgCrnt=-self.Capacity/2
+            self.FullChrgSoc=100
+            self.PackCrntDec=1
+            self.BalCurrent=0.015
+            self.LookTab_SOC = [0,	     5,	    10,	    15,	    20,	    25,	    30,	    35,	    40,	    45,	    50,	    55,	    60,	    65,	    70,	    75,	    80,	    85,	    90,	    95,	    100]
+            self.LookTab_OCV = [2.929,	3.2902,	3.3888,	3.425,	3.4846,	3.5382,	3.577,	3.6083,	3.638,	3.6786,	3.7127,	3.7596,	3.7853,	3.8371,	3.9081,	3.9127,	4.0097,	4.0314,	4.0463,	4.0969,	4.174]
+
+            self.CellOvLv1=4.04
+            self.CellOvLv2=4.05
+            self.CellUvLv1=2.99
+            self.CellUvLv2=2.950
+            self.CellVoltDiffLv1=0.35
+            self.CellVoltDiffLv2=0.4
+            self.PackVoltOvLv1=self.CellOvLv1*self.CellVoltNums
+            self.PackVoltOvLv2=self.CellOvLv2*self.CellVoltNums
+            self.PackVoltUvLv1=self.CellUvLv1*self.CellVoltNums
+            self.PackVoltUvLv2=self.CellUvLv2*self.CellVoltNums
+
+            self.CellTempUpLmt=119
+            self.CellTempLwLmt=-39
+            self.CellTempHighLv1=45
+            self.CellTempHighLv2=50
+            self.CellTempLowLv1=0
+            self.CellTempLowLv2=-5
+            self.CellTempDiffLv1=10
+            self.CellTempDiffLv2=15
+            self.CellTempRate=5
+
+            self.PackChgOc=-45
+            self.PackDisOc=200
+
+            self.LeakCurrentLv1=-10
+            self.LeakCurrentLv2=-15
+            self.LeakCurrentLv3=-50
+
+            self.TrwVoltRate=-1
+
+        elif celltype==99:   #永联+力神
+            self.Capacity = 40*2
+            self.CellVoltNums=72
+            self.CellTempNums=32
+            self.CellFullChrgVolt=3.5
+            self.CellFullChrgCrnt=-self.Capacity/2
+
+            self.OcvInflexionBelow=3.285
+            self.OcvInflexion2=3.303
+            self.OcvInflexion3=3.343
+            self.OcvInflexionAbove=3.36
+            self.SocInflexion1=30
+            self.SocInflexion2=60
+            self.SocInflexion3=70
+
+            self.FullChrgSoc=97
+            self.PeakSoc=62.5
+            self.PeakVoltLowLmt=3.37
+            self.PeakVoltUpLmt=3.42
+            self.PeakCellVolt=[3.384,3.385,3.386,3.387,3.388]
+            self.PackCrntDec=1
+            self.BalCurrent=0.015
+
+            self.LookTab_SOC = [0,	5,	10,	15,	20,	25,	30,	35,	40,	45,	50,	55,	60,	61,	62,	63,	64,	65,	66,	67,	68,	69,	70,	75,	80,	85,	90,	95,	96,	97,	98,	99,	100]
+            self.LookTab_OCV = [2.6748,	3.2020,	3.2170,	3.2430,	3.2600,	3.2770,	3.2880,	3.2890,	3.2900,	3.2910,	3.2911,	3.2970,	3.3030,	3.3080,	3.3200,	3.3270,	3.3290,	3.3300,	3.3301,	3.3310,	3.3311,	3.3312,	3.3313,	3.3314,	3.3315,	3.3316,	3.3320,	3.3330,	3.3340,	3.3350,	3.3360,	3.3580,	3.3640]
+            self.LookTab_OCVChg=[2.6460, 3.1570, 3.2220, 3.2450,3.2760,	3.2970,	3.3060,	3.3070,	3.3080,	3.3090,	3.3110,	3.3140,	3.3220,	3.3270,	3.3330,	3.3380,	3.3420,	3.3421,	3.3422,	3.3423,	3.3424,	3.3431,	3.3432,	3.3433,	3.3434,	3.3441,	3.3442,	3.3443,	3.3444,	3.3445,	3.3446,	3.3447,	3.3640]
+
+            self.CellOvLv1=3.65
+            self.CellOvLv2=3.7
+            self.CellUvLv1=2.7
+            self.CellUvLv2=2.6
+            self.CellVoltDiffLv1=0.6
+            self.CellVoltDiffLv2=1.1
+            self.PackVoltOvLv1=self.CellOvLv1*self.CellVoltNums
+            self.PackVoltOvLv2=self.CellOvLv2*self.CellVoltNums
+            self.PackVoltUvLv1=self.CellUvLv1*self.CellVoltNums
+            self.PackVoltUvLv2=self.CellUvLv2*self.CellVoltNums
+
+            self.CellTempUpLmt=119
+            self.CellTempLwLmt=-39
+            self.CellTempHighLv1=45
+            self.CellTempHighLv2=50
+            self.CellTempLowLv1=0
+            self.CellTempLowLv2=-5
+            self.CellTempDiffLv1=15
+            self.CellTempDiffLv2=20
+            self.CellTempRate=5
+
+            self.PackChgOc=-160
+            self.PackDisOc=180
+
+            self.LeakCurrentLv1=-20
+            self.LeakCurrentLv2=-30
+            self.LeakCurrentLv3=-100
+
+            self.TrwVoltRate=-8  
+            self.mk_slope=-0.8
+        
+
+        else:
+            print('未找到对应电池编号!!!')
+            # sys.exit()

BIN
USER/SPF/alibaba/06BatSafetyAlarm/CBMSSafetyAlarm.py


+ 236 - 0
USER/SPF/alibaba/06BatSafetyAlarm/deploy_safetyalarm.py

@@ -0,0 +1,236 @@
+
+import os
+import time
+
+import pandas as pd
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+import logging
+import logging.handlers
+import traceback
+import re
+import CBMSSafetyAlarm
+import datetime
+import dateutil.relativedelta
+from urllib import parse
+from apscheduler.schedulers.blocking import BlockingScheduler
+import warnings
+import pymysql
+from multiprocessing import Process
+
+
+
+
+def fun(host, port, db, user, password, runenv, logger, period_second):
+    logger.info("pid is {}".format(os.getpid()))
+    global df_bms_ram, df_alarm_ram
+    try:
+        db_engine = create_engine(
+            "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                user, parse.quote_plus(password), host, port, db
+            ))
+
+        conn = pymysql.connect(host=host, port=int(port), user=user, password=password, database=db)
+        cursor = conn.cursor()
+        # 获取配置信息
+        df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
+        # db_engine.dispose()
+    except:
+        logger.error(u"数据库连接错误", exc_info=True)
+        logger.error(traceback.format_exc)
+        return
+
+    # 获取历史报警数据
+
+    df_diag_ram = pd.read_sql("select time_st,time_sp,sn,faultcode,faultlv,faultinfo,faultadvice from fault_result where faultcode = 119 and time_sp='0000-00-00 00:00:00'",db_engine);
+
+
+
+    now_time = datetime.datetime.now()
+    pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second)  #1min
+    end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S")
+    start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S")
+
+    logger.info("time range from {} to {} ".format(start_time, end_time))
+    for i in range(len(df_confs)):
+        try:
+            factory = df_confs.loc[i, 'factory']
+            sn = df_confs.loc[i, 'device_name']
+            if '康普盾' in factory:
+                celltype = 1
+                cell_volt_count = 120
+                cell_temp_count = 40
+            elif '华霆' in factory:
+                celltype = 2
+                cell_volt_count = 68
+                cell_temp_count = 40
+            elif '力神' in factory:
+                celltype = 99
+                cell_volt_count = 72
+                cell_temp_count = 32
+
+            logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
+
+            sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
+            logger.info(sql)
+            df_bms = pd.read_sql(sql, db_engine)
+            if df_bms.empty:
+                continue
+            df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt',
+                              'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp',
+                              'CellVolt', 'id']
+            # 单体数据拆分
+            cell_temp = df_bms['CellTemp']
+            cell_volt = df_bms['CellVolt']
+            cell_temps = []
+            [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp]
+            cell_volts = []
+            [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt]
+            del_indexes = []
+            for i, cell_volt in enumerate(cell_volts):
+                if len(cell_volt) != cell_volt_count:
+                    del_indexes.append(i)
+            for i, cell_temp in enumerate(cell_temps):
+                if len(cell_temp) != cell_temp_count:
+                    del_indexes.append(i)
+
+            del_indexes = list(set(del_indexes))
+            for i, del_index in enumerate(del_indexes):
+                del_index = del_index - i
+                cell_volts.pop(del_index)
+                cell_temps.pop(del_index)
+            df_bms = df_bms.drop(index=del_indexes, axis=1)
+            cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)]
+            celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)]
+            df_bms[cellvolt_name] = cell_volts
+            df_bms[celltemp_name] = cell_temps
+            df_bms = df_bms.drop(columns='CellVolt', axis=0)
+            df_bms = df_bms.drop(columns='CellTemp', axis=0)
+            df_bms = df_bms.reset_index(drop=True)
+            logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))
+
+            # 算法执行
+            #电池诊断................................................................................................................................................................
+            df_diag_ram_sn=df_diag_ram[df_diag_ram['sn']==sn]
+            df_bms_ram_sn=df_bms_ram[df_bms_ram['sn']==sn]
+            df_alarm_ram_sn=df_alarm_ram[df_alarm_ram['sn']==sn]
+            if df_diag_ram_sn.empty:
+
+                SafetyAlarm=CBMSSafetyAlarm.SafetyAlarm(sn,celltype,df_bms, df_bms_ram_sn, df_alarm_ram_sn)
+                df_diag_res, df_bms_res, df_ram_res=SafetyAlarm.safety_alarm_diag()
+
+                #更新bms的ram数据
+                sn_index=df_bms_ram.loc[df_bms_ram['sn']==sn].index
+                df_bms_ram=df_bms_ram.drop(index=sn_index)
+                df_bms_ram=df_bms_ram.append(df_bms_res)
+
+                sn_index=df_alarm_ram.loc[df_alarm_ram['sn']==sn].index
+                df_alarm_ram=df_alarm_ram.drop(index=sn_index)
+                df_alarm_ram=df_alarm_ram.append(df_ram_res)
+
+                #当前热失控故障写入数据库
+                if not df_diag_res.empty:
+                    df_diag_res['add_time'] = datetime.datetime.now()
+                    df_diag_res['factory'] = factory
+                    df_diag_res.to_sql("fault_result",con=db_engine, if_exists="append",index=False)
+                    logger.info(u"{} 写入成功!!!\n".format(sn), exc_info=True)
+
+            #当前热失控已超过一天变为历史故障并更改数据库
+            else:
+                fault_time=df_diag_ram_sn.iloc[-1]['time_st']
+                if (now_time-fault_time).total_seconds()>24 * 3600:
+                    df_diag_ram_sn['time_sp']=end_time
+                    try:
+
+                        cursor.execute('''
+                                    update fault_result set update_time='{}',time_sp='{}' where sn='{}' and time_sp='0000-00-00 00:00:00' and faultcode={}
+                                    '''.format(datetime.datetime.now(), end_time,sn, 119))
+                        conn.commit()
+                        logger.info(u"{} 更新成功\n".format(sn), exc_info=True)
+
+                    except:
+                        logger.error(traceback.format_exc)
+                        logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
+
+            logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
+        except:
+            logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)
+            logger.error(traceback.format_exc)
+    db_engine.dispose()
+    logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
+
+def heart_beat(host, port, db, user, password, log):
+    while True:
+        try:
+            db_engine = create_engine(
+                "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                    user, parse.quote_plus(password), host, port, db
+                ))
+            while True:
+                now = datetime.datetime.now()
+                db_engine.execute("update status set algo_safetyalarm='{}'".format(now))
+                log.info("safetyalarm心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
+                time.sleep(5)
+        except:
+            log.error(u"safetyalarm心跳错误", exc_info=True)
+            log.error(traceback.format_exc)
+            time.sleep(5)
+        finally:
+            db_engine.dispose()
+
+if __name__ == '__main__':
+    warnings.filterwarnings("ignore")
+    env_dist = os.environ
+    # 配置信息信息
+    host = env_dist.get("ALI_HOST", '120.25.223.1')
+    port = env_dist.get("ALI_PORT", '4901')
+    db = env_dist.get("ALI_DB", 'ali')
+    user = env_dist.get("ALI_ROOT", 'root')
+    password = env_dist.get("ALI_PASSWORD", '123456')
+    runenv = env_dist.get("ALI_RUNENV", 'dev')
+    period_second = int(env_dist.get("ALI_PERIOD_SECOND", '60'))
+
+
+    algo_name = 'safetyalarm'
+    # 日志配置
+    log_path = './log' + '/' + algo_name
+    if not os.path.exists(log_path):
+        os.makedirs(log_path)
+    logger = logging.getLogger()
+    logger.setLevel(logging.DEBUG)
+
+    fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.suffix = "%Y-%m-%d_%H-%M.log"
+    fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.DEBUG)
+    logger.addHandler(fh)
+
+    fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.ERROR)
+    logger.addHandler(fh)
+
+    #参数初始化
+    df_bms_ram=pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp'])
+    df_alarm_ram=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2'])
+
+    logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
+    # 开启定时任务
+    p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
+    p.start()
+    scheduler = BlockingScheduler()
+    # heart_beat(host, port, db, user, password, logger)
+    # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True)
+    fun(host, port, db, user, password, runenv, logger, period_second)
+    scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second
+                      ,max_instances=1, coalesce=True)
+
+
+    try:
+        scheduler.start()
+    except Exception as e:
+        scheduler.shutdown()
+        logger.error(str(e))

BIN
USER/SPF/alibaba/07BatSafetyWarning/CBMSBatInterShort.py


BIN
USER/SPF/alibaba/07BatSafetyWarning/CBMSBatUniform.py


BIN
USER/SPF/alibaba/07BatSafetyWarning/CBMSSafetyWarning.py


BIN
USER/SPF/alibaba/07BatSafetyWarning/VoltStray.py


+ 332 - 0
USER/SPF/alibaba/07BatSafetyWarning/deploy_safetywarning.py

@@ -0,0 +1,332 @@
+
+import os
+import pandas as pd
+from sqlalchemy import create_engine
+import logging.handlers
+import traceback
+import re
+import CBMSBatInterShort
+import CBMSBatUniform
+import VoltStray
+import CBMSSafetyWarning
+import datetime
+import dateutil.relativedelta
+from urllib import parse
+from apscheduler.schedulers.blocking import BlockingScheduler
+import warnings
+import pymysql
+from multiprocessing import Process
+import time
+
+
+#电池热安全预警核心算法函数
+def fun(host, port, db, user, password, runenv, logger, period_second):
+    logger.info("pid is {}".format(os.getpid()))
+    global df_warning_ram
+    global df_warning_ram1
+    global df_warning_ram2
+    global df_warning_ram3
+    global df_lfp_ram
+    global df_lfp_ram1
+    try:
+        db_engine = create_engine(
+            "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                user, parse.quote_plus(password), host, port, db
+            ))
+
+        conn = pymysql.connect(host=host, port=int(port), user=user, password=password, database=db)
+        cursor = conn.cursor()
+    except:
+        logger.error(u"数据库连接错误", exc_info=True)
+        logger.error(traceback.format_exc)
+        return
+
+    now_time=datetime.datetime.now()
+    start_time=now_time-datetime.timedelta(seconds=6*3600) #6*3600
+    start_time1=now_time-datetime.timedelta(seconds=7*24*3600) #7*24*3600
+    start_time2=now_time-datetime.timedelta(seconds=3*24*3600) #3*24*3600
+    start_time3=now_time-datetime.timedelta(seconds=1*24*3600) #1*24*3600
+    start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
+    start_time1=start_time1.strftime('%Y-%m-%d %H:%M:%S')
+    start_time2=start_time2.strftime('%Y-%m-%d %H:%M:%S')
+    start_time3=start_time3.strftime('%Y-%m-%d %H:%M:%S')
+    end_time=now_time.strftime('%Y-%m-%d %H:%M:%S')
+
+
+    # now_time = datetime.datetime.now()
+    # pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second)  #1min
+    # end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S")
+    # start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S")
+    # 获取配置信息
+    df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
+    # db_engine.dispose()
+
+    # 获取历史报警数据
+    df_fault_ram = pd.read_sql("select time_st,time_sp,sn,faultcode,faultlv,faultinfo,faultadvice from fault_result where faultcode = 110 and time_sp='0000-00-00 00:00:00'",db_engine);
+
+
+
+
+    logger.info("time range from {} to {} ".format(start_time, end_time))
+    for i in range(len(df_confs)):
+        try:
+            factory = df_confs.loc[i, 'factory']
+            sn = df_confs.loc[i, 'device_name']
+            if '康普盾' in factory:
+                celltype = 1
+                cell_volt_count = 120
+                cell_temp_count = 40
+            elif '华霆' in factory:
+                celltype = 2
+                cell_volt_count = 68
+                cell_temp_count = 40
+            elif '力神' in factory:
+                celltype = 99
+                cell_volt_count = 72
+                cell_temp_count = 32
+
+            logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
+            sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
+            logger.info(sql)
+            df_bms = pd.read_sql(sql, db_engine)
+            if df_bms.empty:
+                continue
+            df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt',
+                              'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp',
+                              'CellVolt', 'id']
+            # 单体数据拆分
+            cell_temp = df_bms['CellTemp']
+            cell_volt = df_bms['CellVolt']
+            cell_temps = []
+            [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp]
+            cell_volts = []
+            [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt]
+            del_indexes = []
+            for i, cell_volt in enumerate(cell_volts):
+                if len(cell_volt) != cell_volt_count:
+                    del_indexes.append(i)
+            for i, cell_temp in enumerate(cell_temps):
+                if len(cell_temp) != cell_temp_count:
+                    del_indexes.append(i)
+
+            del_indexes = list(set(del_indexes))
+            for i, del_index in enumerate(del_indexes):
+                del_index = del_index - i
+                cell_volts.pop(del_index)
+                cell_temps.pop(del_index)
+            df_bms = df_bms.drop(index=del_indexes, axis=1)
+            cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)]
+            celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)]
+            df_bms[cellvolt_name] = cell_volts
+            df_bms[celltemp_name] = cell_temps
+            df_bms = df_bms.drop(columns='CellVolt', axis=0)
+            df_bms = df_bms.drop(columns='CellTemp', axis=0)
+            df_bms = df_bms.reset_index(drop=True)
+            logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))
+
+            # 算法执行
+            df_soh = pd.read_sql(
+                "select time_st,sn,soh,cellsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn),
+                db_engine);
+            df_uniform = pd.read_sql(
+                "select time,sn,cellsoc_diff,cellmin_num,cellmax_num,cellvolt_rank from uniform_result where factory ='{}' and sn = '{}' order by time desc limit 1".format(factory, sn),
+                db_engine);
+
+            if not df_bms.empty:
+                #ram处理...............................................................................................................
+                df_warning_ram_sn=df_warning_ram[df_warning_ram['sn']==sn]
+                df_warning_ram_sn1=df_warning_ram1[df_warning_ram1['sn']==sn]
+                df_warning_ram_sn2=df_warning_ram2[df_warning_ram2['sn']==sn]
+                df_warning_ram_sn3=df_warning_ram3[df_warning_ram3['sn']==sn]
+                df_warning_ram_sn.reset_index(inplace=True,drop=True)     #重置索引
+                df_warning_ram_sn1.reset_index(inplace=True,drop=True)     #重置索引
+                df_warning_ram_sn2.reset_index(inplace=True,drop=True)     #重置索引
+                df_warning_ram_sn3.reset_index(inplace=True,drop=True)     #重置索引
+                if celltype>50 and (not df_lfp_ram.empty):
+                    df_lfp_ram_sn=df_lfp_ram[df_lfp_ram['sn']==sn]
+                    df_lfp_ram_sn.reset_index(inplace=True,drop=True)     #重置索引
+                else:
+                    df_lfp_ram_sn=pd.DataFrame()
+                    df_lfp_ram=pd.DataFrame(columns=df_bms.columns.tolist())
+                if celltype>50 and (not df_lfp_ram1.empty):
+                    df_lfp_ram_sn1=df_lfp_ram1[df_lfp_ram1['sn']==sn]
+                    df_lfp_ram_sn1.reset_index(inplace=True,drop=True)     #重置索引
+                else:
+                    df_lfp_ram_sn1=pd.DataFrame()
+                    df_lfp_ram1=pd.DataFrame(columns=df_bms.columns.tolist())
+
+                #内短路计算..................................................................................................................................................
+                BatShort=CBMSBatInterShort.BatInterShort(sn,celltype,df_bms,df_soh,df_warning_ram_sn,df_warning_ram_sn1,df_warning_ram_sn2,df_warning_ram_sn3,df_lfp_ram_sn)
+                df_short_res, df_ram_res, df_ram_res1, df_ram_res2, df_ram_res3, df_ram_res4=BatShort.intershort()
+                if not df_short_res.empty:
+                    df_short_res['add_time'] = datetime.datetime.now()
+                    df_short_res['factory'] = factory
+                    df_short_res.to_sql("intershort_result",con=db_engine, if_exists="append",index=False)
+                    logger.info(u"{} intershort_result写入成功!!!\n".format(sn), exc_info=True)
+
+                #!!!!!!!!!!!!!!往下还未进行部署修改
+
+                #静置电压排名..................................................................................................................................................
+                BatUniform=CBMSBatUniform.BatUniform(sn,celltype,df_bms,df_uniform,df_ram_res3,df_lfp_ram_sn1)
+                df_rank_res, df_ram_res3, df_ram_res5=BatUniform.batuniform()
+                if not df_rank_res.empty:
+                    df_rank_res['add_time'] = datetime.datetime.now()
+                    df_rank_res['factory'] = factory
+                    df_rank_res.to_sql("uniform_result",con=db_engine, if_exists="append",index=False)
+                    logger.info(u"{} uniform_result写入成功!!!\n".format(sn), exc_info=True)
+
+                #电压离群.....................................................................................................................................................
+                df_voltsigma=VoltStray.main(sn,df_bms,celltype)
+                if not df_voltsigma.empty:
+                    df_voltsigma['add_time'] = datetime.datetime.now()
+                    df_voltsigma['factory'] = factory
+                    df_voltsigma.to_sql("outlier_voltchangeratio_result",con=db_engine, if_exists="append",index=False)
+                    logger.info(u"{} outlier_voltchangeratio_result写入成功!!!\n".format(sn), exc_info=True)
+
+                #ram处理................................................................................................................
+                df_warning_ram=df_warning_ram.drop(df_warning_ram[df_warning_ram.sn==sn].index)
+                df_warning_ram1=df_warning_ram1.drop(df_warning_ram1[df_warning_ram1.sn==sn].index)
+                df_warning_ram2=df_warning_ram2.drop(df_warning_ram2[df_warning_ram2.sn==sn].index)
+                df_warning_ram3=df_warning_ram3.drop(df_warning_ram3[df_warning_ram3.sn==sn].index)
+
+                df_warning_ram=pd.concat([df_warning_ram,df_ram_res],ignore_index=True)
+                df_warning_ram1=pd.concat([df_warning_ram1,df_ram_res1],ignore_index=True)
+                df_warning_ram2=pd.concat([df_warning_ram2,df_ram_res2],ignore_index=True)
+                df_warning_ram3=pd.concat([df_warning_ram3,df_ram_res3],ignore_index=True)
+
+                if celltype>50:
+                    df_lfp_ram=df_lfp_ram.drop(df_lfp_ram[df_lfp_ram.sn==sn].index)
+                    df_lfp_ram=pd.concat([df_lfp_ram,df_ram_res4],ignore_index=True)
+                    df_lfp_ram1=df_lfp_ram1.drop(df_lfp_ram1[df_lfp_ram1.sn==sn].index)
+                    df_lfp_ram1=pd.concat([df_lfp_ram1,df_ram_res5],ignore_index=True)
+
+
+            #电池热安全预警..............................................................................................................................................................
+            #读取内短路、析锂和一致性结果数据库数据
+            df_short = pd.read_sql("select time_sp,sn,short_current from intershort_result where sn = '{}' and time_sp between '{}' and '{}'".format(sn,start_time1,end_time), db_engine)
+            # df_liplated = pd.read_sql("select time,sn,liplated,liplated_amount from mechanism_liplated where sn = '{}' and time between '{}' and '{}'".format(sn,start_time2,end_time), db_qxcas_engine)
+            df_uniform = pd.read_sql("select time,sn,cellsoc_diff,cellvolt_diff,cellmin_num,cellmax_num,cellvolt_rank from uniform_result where sn = '{}' and time between '{}' and '{}'".format(sn,start_time2,end_time), db_engine)
+            df_voltsigma = pd.read_sql("select time,sn,VolOl_Uni,VolChng_Uni from outlier_voltchangeratio_result where sn = '{}' and time between '{}' and '{}'".format(sn,start_time3,end_time), db_engine)
+            df_uniform=df_uniform.dropna(axis=0,how='any')
+            #获取sn的故障RAM
+            df_fault_ram_sn=df_fault_ram[df_fault_ram['sn']==sn]
+
+            #热安全预警
+            if df_fault_ram_sn.empty:
+                BatWarning=CBMSSafetyWarning.SafetyWarning(sn,celltype,df_short,df_uniform,df_voltsigma,df_soh)
+                df_warning_res=BatWarning.diag()
+                #当前热失控故障写入数据库
+                if not df_warning_res.empty:
+                    df_warning_res['add_time'] = datetime.datetime.now()
+                    df_warning_res['factory'] = factory
+                    df_warning_res.to_sql("fault_result",con=db_engine, if_exists="append",index=False)
+                    logger.info(u"{} fault_result写入成功!!!\n".format(sn), exc_info=True)
+
+            else:
+                fault_time=df_fault_ram_sn.iloc[-1]['time_st']
+                if (now_time-fault_time).total_seconds()>7*24*3600:   #df_warning_end历史故障筛选并更改数据库故障结束时间
+                    df_fault_ram_sn['time_sp']=end_time
+                    try:
+                        cursor.execute('''
+                                update fault_result set update_time='{}',time_sp='{}' where sn='{}' and faultcode={} and time_sp='0000-00-00 00:00:00'
+                                '''.format(datetime.datetime.now(), end_time, sn, 110))
+                        conn.commit()
+                    except:
+                        logger.error(traceback.format_exc)
+                        logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
+
+            logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
+        except:
+            logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)
+            logger.error(traceback.format_exc)
+    db_engine.dispose()
+    logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
+
+
+def heart_beat(host, port, db, user, password, log):
+    while True:
+        try:
+            db_engine = create_engine(
+                "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                    user, parse.quote_plus(password), host, port, db
+                ))
+            while True:
+                now = datetime.datetime.now()
+                db_engine.execute("update status set algo_safetywarning='{}'".format(now))
+                log.info("safetywarning心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
+                time.sleep(5)
+        except:
+            log.error(u"safetywarning心跳错误", exc_info=True)
+            log.error(traceback.format_exc)
+            time.sleep(5)
+        finally:
+            db_engine.dispose()
+
+if __name__ == '__main__':
+    warnings.filterwarnings("ignore")
+    env_dist = os.environ
+    # 配置信息信息
+    host = env_dist.get("ALI_HOST", '120.25.223.1')
+    port = env_dist.get("ALI_PORT", '4901')
+    db = env_dist.get("ALI_DB", 'ali')
+    user = env_dist.get("ALI_ROOT", 'root')
+    password = env_dist.get("ALI_PASSWORD", '123456')
+    # host = env_dist.get("ALI_HOST", '192.168.31.141')
+    # port = env_dist.get("ALI_PORT", '3306')
+    # db = env_dist.get("ALI_DB", 'ali')
+    # user = env_dist.get("ALI_ROOT", 'root')
+    # password = env_dist.get("ALI_PASSWORD", 'Ali@123456')
+    runenv = env_dist.get("ALI_RUNENV", 'dev')
+    period_second = env_dist.get("ALI_PERIOD_SECOND", '5,5,5,5').split(',')
+    period_second = env_dist.get("ALI_PERIOD_SECOND", '10')
+
+
+
+    algo_name = 'safetywarning'
+    # 日志配置
+    log_path = './log' + '/' + algo_name
+    if not os.path.exists(log_path):
+        os.makedirs(log_path)
+    logger = logging.getLogger()
+    logger.setLevel(logging.DEBUG)
+
+    fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.suffix = "%Y-%m-%d_%H-%M.log"
+    fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.DEBUG)
+    logger.addHandler(fh)
+
+    fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.ERROR)
+    logger.addHandler(fh)
+
+    #参数初始化
+    #............................模块运行前,先读取数据库中所有结束时间为0的数据,需要从数据库中读取...................................
+    df_warning_ram=pd.DataFrame(columns=['sn','time','deltsoc','cellsoc'])
+    df_warning_ram1=pd.DataFrame(columns=['sn','time1','deltsoc1'])
+    df_warning_ram2=pd.DataFrame(columns=['sn','time2','deltAs2'])
+    df_warning_ram3=pd.DataFrame(columns=['sn','time3','standingtime','standingtime1','standingtime2'])
+    df_lfp_ram=pd.DataFrame()
+    df_lfp_ram1=pd.DataFrame()
+
+    # 开启定时任务
+    logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
+    p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
+    p.start()
+    scheduler = BlockingScheduler()
+    # heart_beat(host, port, db, user, password, logger)
+    # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5, max_instances=1, coalesce=True)
+    fun(host, port, db, user, password, runenv, logger, period_second)
+    scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=int(period_second),
+                      max_instances=1, coalesce=True)
+
+
+    try:
+        scheduler.start()
+    except Exception as e:
+        scheduler.shutdown()
+        logger.error(str(e))

BIN
USER/SPF/alibaba/Common/BatParam.py