"""Battery thermal-safety early-warning job.

Running this file as a script starts a heartbeat process and then runs
``fun`` once immediately and afterwards periodically through APScheduler.
``fun`` walks every enabled row of the ``conf`` table, loads the recent
``bms_data`` of that device, runs the internal-short, uniformity and
voltage-outlier algorithms (project modules), stores their results and
finally evaluates / closes thermal-safety faults in ``fault_result``.
"""
import datetime
import logging.handlers
import os
import re
import time
import traceback
import warnings
from multiprocessing import Process
from urllib import parse

import dateutil.relativedelta
import pandas as pd
import pymysql
from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import create_engine

import CBMSBatInterShort
import CBMSBatUniform
import CBMSSafetyWarning
import VoltStray

_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
_ENGINE_URL = "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8"

# Fault code written/closed by this job and the age after which an open
# fault of that code is closed.
_FAULT_CODE = 110
_FAULT_TIMEOUT_SECONDS = 7 * 24 * 3600

# (factory keyword, celltype, cell voltage count, cell temperature count).
# Checked in order; the first keyword contained in the factory name wins.
_CELL_LAYOUTS = (
    ('康普盾', 1, 120, 40),
    ('华霆', 2, 68, 40),
    ('力神', 99, 72, 32),
)

# Column names forced onto the rows read from ``bms_data``.
_BMS_COLUMNS = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat',
                'PackVolt', 'PackCrnt', 'PackSOC', 'PackSOH', 'alarm1',
                'alarm2', 'alarm3', 'CellTemp', 'CellVolt', 'id']


def _cell_layout(factory):
    """Return ``(celltype, volt_count, temp_count)`` for *factory*, or None."""
    for keyword, celltype, volt_count, temp_count in _CELL_LAYOUTS:
        if keyword in factory:
            return celltype, volt_count, temp_count
    return None


def _expand_cell_columns(df_bms, volt_count, temp_count):
    """Split the comma-separated cell strings into one float column per cell.

    Rows whose ``CellVolt`` / ``CellTemp`` string does not contain exactly
    *volt_count* / *temp_count* values are discarded.

    Returns:
        ``(frame, dropped)`` where *frame* has ``CellVolt``/``CellTemp``
        replaced by ``CellVolt1..N`` and ``CellTemp1..M`` (index reset) and
        *dropped* is the ascending list of discarded row positions.
    """
    volts = [list(map(float, raw.split(','))) for raw in df_bms['CellVolt']]
    temps = [list(map(float, raw.split(','))) for raw in df_bms['CellTemp']]
    keep = [len(v) == volt_count and len(t) == temp_count
            for v, t in zip(volts, temps)]
    kept = [pos for pos, ok in enumerate(keep) if ok]
    dropped = [pos for pos, ok in enumerate(keep) if not ok]

    volt_names = ['CellVolt' + str(n) for n in range(1, volt_count + 1)]
    temp_names = ['CellTemp' + str(n) for n in range(1, temp_count + 1)]
    base = (df_bms.iloc[kept]
            .drop(columns=['CellVolt', 'CellTemp'])
            .reset_index(drop=True))
    volt_frame = pd.DataFrame([volts[pos] for pos in kept], columns=volt_names)
    temp_frame = pd.DataFrame([temps[pos] for pos in kept], columns=temp_names)
    return pd.concat([base, volt_frame, temp_frame], axis=1), dropped


def _rows_for_sn(df_ram, sn):
    """Return the rows of *df_ram* belonging to *sn* with a fresh index."""
    return df_ram[df_ram['sn'] == sn].reset_index(drop=True)


def _replace_sn_rows(df_ram, sn, df_new):
    """Return *df_ram* with the rows of *sn* replaced by *df_new*."""
    return pd.concat([df_ram[df_ram['sn'] != sn], df_new], ignore_index=True)


def _store_result(df_res, table, factory, sn, db_engine, logger):
    """Append a non-empty algorithm result to *table* and log the write."""
    if df_res.empty:
        return
    df_res['add_time'] = datetime.datetime.now()
    df_res['factory'] = factory
    df_res.to_sql(table, con=db_engine, if_exists="append", index=False)
    logger.info(u"{} {}写入成功!!!\n".format(sn, table))


# Core function of the battery thermal-safety warning algorithm.
def fun(host, port, db, user, password, runenv, logger, period_second):
    """Run one evaluation pass over every enabled device.

    Args:
        host, port, db, user, password: MySQL connection settings.
        runenv: accepted for interface compatibility; not used here.
        logger: logger that receives progress and error messages.
        period_second: accepted for interface compatibility; not used here.

    The per-device state between runs is kept in the module-level frames
    ``df_warning_ram*`` and ``df_lfp_ram*``. A failure on one device is
    logged and does not stop the remaining devices. Returns None.
    """
    global df_warning_ram
    global df_warning_ram1
    global df_warning_ram2
    global df_warning_ram3
    global df_lfp_ram
    global df_lfp_ram1

    logger.info("pid is {}".format(os.getpid()))

    db_engine = None
    conn = None
    try:
        db_engine = create_engine(_ENGINE_URL.format(
            user, parse.quote_plus(password), host, port, db))
        conn = pymysql.connect(host=host, port=int(port), user=user,
                               password=password, database=db)
        cursor = conn.cursor()
    except Exception:
        logger.error(u"数据库连接错误", exc_info=True)
        # Release whatever was opened before the failure.
        if conn is not None:
            conn.close()
        if db_engine is not None:
            db_engine.dispose()
        return

    try:
        now_time = datetime.datetime.now()
        end_time = now_time.strftime(_TIME_FORMAT)
        # Look-back windows: 6 h of BMS data, 7 d of internal-short results,
        # 3 d of uniformity results, 1 d of voltage-outlier results.
        start_time = (now_time - datetime.timedelta(seconds=6 * 3600)).strftime(_TIME_FORMAT)
        start_time1 = (now_time - datetime.timedelta(seconds=7 * 24 * 3600)).strftime(_TIME_FORMAT)
        start_time2 = (now_time - datetime.timedelta(seconds=3 * 24 * 3600)).strftime(_TIME_FORMAT)
        start_time3 = (now_time - datetime.timedelta(seconds=1 * 24 * 3600)).strftime(_TIME_FORMAT)

        # Load the device configuration.
        df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
        # Load the faults of this job's code that are still open.
        df_fault_ram = pd.read_sql("select time_st,time_sp,sn,faultcode,faultlv,faultinfo,faultadvice from fault_result where faultcode = 110 and time_sp='0000-00-00 00:00:00'", db_engine)
        logger.info("time range from {} to {} ".format(start_time, end_time))

        for factory, sn in zip(df_confs['factory'], df_confs['device_name']):
            try:
                layout = _cell_layout(factory)
                if layout is None:
                    # Without a layout the cell type/counts would be unbound
                    # or silently inherited from the previous device.
                    logger.warning("pid-{} FACTORY:{} - SN: {} unknown factory, skipped".format(os.getpid(), factory, sn))
                    continue
                celltype, cell_volt_count, cell_temp_count = layout

                logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
                sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
                logger.info(sql)
                df_bms = pd.read_sql(sql, db_engine)
                if df_bms.empty:
                    continue
                df_bms.columns = _BMS_COLUMNS

                # Split the per-cell strings into columns, dropping bad rows.
                df_bms, del_indexes = _expand_cell_columns(df_bms, cell_volt_count, cell_temp_count)
                logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))

                # Inputs for the algorithms.
                df_soh = pd.read_sql(
                    "select time_st,sn,soh,cellsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn),
                    db_engine)
                df_uniform = pd.read_sql(
                    "select time,sn,cellsoc_diff,cellmin_num,cellmax_num,cellvolt_rank from uniform_result where factory ='{}' and sn = '{}' order by time desc limit 1".format(factory, sn),
                    db_engine)

                if not df_bms.empty:
                    # Per-device slices of the state kept between runs.
                    df_warning_ram_sn = _rows_for_sn(df_warning_ram, sn)
                    df_warning_ram_sn1 = _rows_for_sn(df_warning_ram1, sn)
                    df_warning_ram_sn2 = _rows_for_sn(df_warning_ram2, sn)
                    df_warning_ram_sn3 = _rows_for_sn(df_warning_ram3, sn)

                    # NOTE(review): the source had lost its indentation; the
                    # two resets below are assumed to belong to the ``else``
                    # branches. As written they also clear the LFP state when
                    # a device with celltype <= 50 is processed -- confirm
                    # that this is intended.
                    if celltype > 50 and (not df_lfp_ram.empty):
                        df_lfp_ram_sn = _rows_for_sn(df_lfp_ram, sn)
                    else:
                        df_lfp_ram_sn = pd.DataFrame()
                        df_lfp_ram = pd.DataFrame(columns=df_bms.columns.tolist())
                    if celltype > 50 and (not df_lfp_ram1.empty):
                        df_lfp_ram_sn1 = _rows_for_sn(df_lfp_ram1, sn)
                    else:
                        df_lfp_ram_sn1 = pd.DataFrame()
                        df_lfp_ram1 = pd.DataFrame(columns=df_bms.columns.tolist())

                    # Internal short-circuit calculation.
                    BatShort = CBMSBatInterShort.BatInterShort(sn, celltype, df_bms, df_soh, df_warning_ram_sn, df_warning_ram_sn1, df_warning_ram_sn2, df_warning_ram_sn3, df_lfp_ram_sn)
                    df_short_res, df_ram_res, df_ram_res1, df_ram_res2, df_ram_res3, df_ram_res4 = BatShort.intershort()
                    _store_result(df_short_res, "intershort_result", factory, sn, db_engine, logger)

                    # Original author's note: everything below had not yet
                    # been adapted for deployment.
                    # Resting-voltage ranking.
                    BatUniform = CBMSBatUniform.BatUniform(sn, celltype, df_bms, df_uniform, df_ram_res3, df_lfp_ram_sn1)
                    df_rank_res, df_ram_res3, df_ram_res5 = BatUniform.batuniform()
                    _store_result(df_rank_res, "uniform_result", factory, sn, db_engine, logger)

                    # Voltage outlier detection.
                    df_voltsigma = VoltStray.main(sn, df_bms, celltype)
                    _store_result(df_voltsigma, "outlier_voltchangeratio_result", factory, sn, db_engine, logger)

                    # Write the updated per-device state back.
                    df_warning_ram = _replace_sn_rows(df_warning_ram, sn, df_ram_res)
                    df_warning_ram1 = _replace_sn_rows(df_warning_ram1, sn, df_ram_res1)
                    df_warning_ram2 = _replace_sn_rows(df_warning_ram2, sn, df_ram_res2)
                    df_warning_ram3 = _replace_sn_rows(df_warning_ram3, sn, df_ram_res3)
                    if celltype > 50:
                        df_lfp_ram = _replace_sn_rows(df_lfp_ram, sn, df_ram_res4)
                        df_lfp_ram1 = _replace_sn_rows(df_lfp_ram1, sn, df_ram_res5)

                    # Thermal-safety warning.
                    # NOTE(review): assumed to sit inside ``if not
                    # df_bms.empty`` (indentation was lost) -- confirm.
                    # Read back stored internal-short, uniformity and
                    # voltage-outlier results; the lithium-plating input
                    # (table mechanism_liplated) is currently disabled.
                    df_short = pd.read_sql("select time_sp,sn,short_current from intershort_result where sn = '{}' and time_sp between '{}' and '{}'".format(sn, start_time1, end_time), db_engine)
                    df_uniform = pd.read_sql("select time,sn,cellsoc_diff,cellvolt_diff,cellmin_num,cellmax_num,cellvolt_rank from uniform_result where sn = '{}' and time between '{}' and '{}'".format(sn, start_time2, end_time), db_engine)
                    df_voltsigma = pd.read_sql("select time,sn,VolOl_Uni,VolChng_Uni from outlier_voltchangeratio_result where sn = '{}' and time between '{}' and '{}'".format(sn, start_time3, end_time), db_engine)
                    df_uniform = df_uniform.dropna(axis=0, how='any')

                    # Open faults of this device.
                    df_fault_ram_sn = df_fault_ram[df_fault_ram['sn'] == sn]
                    if df_fault_ram_sn.empty:
                        # No open fault: diagnose and store a new one if any.
                        BatWarning = CBMSSafetyWarning.SafetyWarning(sn, celltype, df_short, df_uniform, df_voltsigma, df_soh)
                        df_warning_res = BatWarning.diag()
                        _store_result(df_warning_res, "fault_result", factory, sn, db_engine, logger)
                    else:
                        fault_time = df_fault_ram_sn.iloc[-1]['time_st']
                        if (now_time - fault_time).total_seconds() > _FAULT_TIMEOUT_SECONDS:
                            # Close the open fault once it is older than the
                            # timeout by setting its end time in the database.
                            try:
                                cursor.execute(
                                    "update fault_result set update_time=%s,time_sp=%s "
                                    "where sn=%s and faultcode=%s and time_sp='0000-00-00 00:00:00'",
                                    (datetime.datetime.now(), end_time, sn, _FAULT_CODE))
                                conn.commit()
                            except Exception:
                                logger.error(u"{} :{},{} 任务运行错误\n".format(sn, start_time, end_time), exc_info=True)

                logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
            except Exception:
                logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)

        logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
    finally:
        # A new connection is opened on every run, so always release it.
        cursor.close()
        conn.close()
        db_engine.dispose()


def heart_beat(host, port, db, user, password, log):
    """Write a heartbeat timestamp to ``status.algo_safetywarning`` every 5 s.

    Never returns. On any error the error is logged, the engine is disposed
    and, after a 5 s pause, a new engine is created.
    """
    while True:
        db_engine = None
        try:
            db_engine = create_engine(_ENGINE_URL.format(
                user, parse.quote_plus(password), host, port, db))
            while True:
                now = datetime.datetime.now()
                db_engine.execute("update status set algo_safetywarning='{}'".format(now))
                log.info("safetywarning心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
                time.sleep(5)
        except Exception:
            log.error(u"safetywarning心跳错误", exc_info=True)
            time.sleep(5)
        finally:
            # create_engine itself may have failed, leaving nothing to dispose.
            if db_engine is not None:
                db_engine.dispose()


if __name__ == '__main__':
    warnings.filterwarnings("ignore")
    env_dist = os.environ

    # Connection settings, overridable through environment variables.
    host = env_dist.get("ALI_HOST", '120.25.223.1')
    port = env_dist.get("ALI_PORT", '4901')
    db = env_dist.get("ALI_DB", 'ali')
    user = env_dist.get("ALI_ROOT", 'root')
    password = env_dist.get("ALI_PASSWORD", '123456')
    runenv = env_dist.get("ALI_RUNENV", 'dev')
    period_second = env_dist.get("ALI_PERIOD_SECOND", '10')
    algo_name = 'safetywarning'

    # Logging: everything to info.log, errors additionally to error.log.
    log_path = './log' + '/' + algo_name
    if not os.path.exists(log_path):
        os.makedirs(log_path)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)

    fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
                                              backupCount=5, encoding="utf-8", mode="a")
    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
    # NOTE(review): suffix/extMatch are TimedRotatingFileHandler attributes;
    # on a RotatingFileHandler they presumably have no effect -- confirm.
    fh.suffix = "%Y-%m-%d_%H-%M.log"
    fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
    fh.setFormatter(formatter)
    fh.setLevel(logging.DEBUG)
    logger.addHandler(fh)

    fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
                                              backupCount=5, encoding="utf-8", mode="a")
    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
    fh.setFormatter(formatter)
    fh.setLevel(logging.ERROR)
    logger.addHandler(fh)

    # State shared between runs of ``fun``; it starts empty.
    # Original author's note: before the module runs, the rows whose end
    # time is 0 should be loaded from the database.
    df_warning_ram = pd.DataFrame(columns=['sn', 'time', 'deltsoc', 'cellsoc'])
    df_warning_ram1 = pd.DataFrame(columns=['sn', 'time1', 'deltsoc1'])
    df_warning_ram2 = pd.DataFrame(columns=['sn', 'time2', 'deltAs2'])
    df_warning_ram3 = pd.DataFrame(columns=['sn', 'time3', 'standingtime', 'standingtime1', 'standingtime2'])
    df_lfp_ram = pd.DataFrame()
    df_lfp_ram1 = pd.DataFrame()

    # Start the heartbeat process, run once, then run periodically.
    logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
    p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
    p.start()

    scheduler = BlockingScheduler()
    fun(host, port, db, user, password, runenv, logger, period_second)
    scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second),
                      trigger='interval', seconds=int(period_second), max_instances=1, coalesce=True)
    try:
        scheduler.start()
    except Exception as e:
        scheduler.shutdown()
        logger.error(str(e))