deploy_safetywarning.py 18 KB


  1. import os
  2. import pandas as pd
  3. from sqlalchemy import create_engine
  4. import logging.handlers
  5. import traceback
  6. import re
  7. import CBMSBatInterShort
  8. import CBMSBatUniform
  9. import VoltStray
  10. import CBMSSafetyWarning
  11. import datetime
  12. import dateutil.relativedelta
  13. from urllib import parse
  14. from apscheduler.schedulers.blocking import BlockingScheduler
  15. import warnings
  16. import pymysql
  17. from multiprocessing import Process
  18. import time
  19. #电池热安全预警核心算法函数
  20. def fun(host, port, db, user, password, runenv, logger, period_second):
  21. logger.info("pid is {}".format(os.getpid()))
  22. global df_warning_ram
  23. global df_warning_ram1
  24. global df_warning_ram2
  25. global df_warning_ram3
  26. global df_lfp_ram
  27. global df_lfp_ram1
  28. try:
  29. db_engine = create_engine(
  30. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  31. user, parse.quote_plus(password), host, port, db
  32. ))
  33. conn = pymysql.connect(host=host, port=int(port), user=user, password=password, database=db)
  34. cursor = conn.cursor()
  35. except:
  36. logger.error(u"数据库连接错误", exc_info=True)
  37. logger.error(traceback.format_exc)
  38. return
  39. now_time=datetime.datetime.now()
  40. start_time=now_time-datetime.timedelta(seconds=6*3600) #6*3600
  41. start_time1=now_time-datetime.timedelta(seconds=7*24*3600) #7*24*3600
  42. start_time2=now_time-datetime.timedelta(seconds=3*24*3600) #3*24*3600
  43. start_time3=now_time-datetime.timedelta(seconds=1*24*3600) #1*24*3600
  44. start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
  45. start_time1=start_time1.strftime('%Y-%m-%d %H:%M:%S')
  46. start_time2=start_time2.strftime('%Y-%m-%d %H:%M:%S')
  47. start_time3=start_time3.strftime('%Y-%m-%d %H:%M:%S')
  48. end_time=now_time.strftime('%Y-%m-%d %H:%M:%S')
  49. # now_time = datetime.datetime.now()
  50. # pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second) #1min
  51. # end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S")
  52. # start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S")
  53. # 获取配置信息
  54. df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
  55. # db_engine.dispose()
  56. # 获取历史报警数据
  57. df_fault_ram = pd.read_sql("select time_st,time_sp,sn,faultcode,faultlv,faultinfo,faultadvice from fault_result where faultcode = 110 and time_sp='0000-00-00 00:00:00'",db_engine);
  58. logger.info("time range from {} to {} ".format(start_time, end_time))
  59. for i in range(len(df_confs)):
  60. try:
  61. factory = df_confs.loc[i, 'factory']
  62. sn = df_confs.loc[i, 'device_name']
  63. if '康普盾' in factory:
  64. celltype = 1
  65. cell_volt_count = 120
  66. cell_temp_count = 40
  67. elif '华霆' in factory:
  68. celltype = 2
  69. cell_volt_count = 68
  70. cell_temp_count = 40
  71. elif '力神' in factory:
  72. celltype = 99
  73. cell_volt_count = 72
  74. cell_temp_count = 32
  75. logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
  76. sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
  77. logger.info(sql)
  78. df_bms = pd.read_sql(sql, db_engine)
  79. if df_bms.empty:
  80. continue
  81. df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt',
  82. 'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp',
  83. 'CellVolt', 'id']
  84. # 单体数据拆分
  85. cell_temp = df_bms['CellTemp']
  86. cell_volt = df_bms['CellVolt']
  87. cell_temps = []
  88. [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp]
  89. cell_volts = []
  90. [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt]
  91. del_indexes = []
  92. for i, cell_volt in enumerate(cell_volts):
  93. if len(cell_volt) != cell_volt_count:
  94. del_indexes.append(i)
  95. for i, cell_temp in enumerate(cell_temps):
  96. if len(cell_temp) != cell_temp_count:
  97. del_indexes.append(i)
  98. del_indexes = list(set(del_indexes))
  99. for i, del_index in enumerate(del_indexes):
  100. del_index = del_index - i
  101. cell_volts.pop(del_index)
  102. cell_temps.pop(del_index)
  103. df_bms = df_bms.drop(index=del_indexes, axis=1)
  104. cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)]
  105. celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)]
  106. df_bms[cellvolt_name] = cell_volts
  107. df_bms[celltemp_name] = cell_temps
  108. df_bms = df_bms.drop(columns='CellVolt', axis=0)
  109. df_bms = df_bms.drop(columns='CellTemp', axis=0)
  110. df_bms = df_bms.reset_index(drop=True)
  111. logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))
  112. # 算法执行
  113. df_soh = pd.read_sql(
  114. "select time_st,sn,soh,cellsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn),
  115. db_engine);
  116. df_uniform = pd.read_sql(
  117. "select time,sn,cellsoc_diff,cellmin_num,cellmax_num,cellvolt_rank from uniform_result where factory ='{}' and sn = '{}' order by time desc limit 1".format(factory, sn),
  118. db_engine);
  119. if not df_bms.empty:
  120. #ram处理...............................................................................................................
  121. df_warning_ram_sn=df_warning_ram[df_warning_ram['sn']==sn]
  122. df_warning_ram_sn1=df_warning_ram1[df_warning_ram1['sn']==sn]
  123. df_warning_ram_sn2=df_warning_ram2[df_warning_ram2['sn']==sn]
  124. df_warning_ram_sn3=df_warning_ram3[df_warning_ram3['sn']==sn]
  125. df_warning_ram_sn.reset_index(inplace=True,drop=True) #重置索引
  126. df_warning_ram_sn1.reset_index(inplace=True,drop=True) #重置索引
  127. df_warning_ram_sn2.reset_index(inplace=True,drop=True) #重置索引
  128. df_warning_ram_sn3.reset_index(inplace=True,drop=True) #重置索引
  129. if celltype>50 and (not df_lfp_ram.empty):
  130. df_lfp_ram_sn=df_lfp_ram[df_lfp_ram['sn']==sn]
  131. df_lfp_ram_sn.reset_index(inplace=True,drop=True) #重置索引
  132. else:
  133. df_lfp_ram_sn=pd.DataFrame()
  134. df_lfp_ram=pd.DataFrame(columns=df_bms.columns.tolist())
  135. if celltype>50 and (not df_lfp_ram1.empty):
  136. df_lfp_ram_sn1=df_lfp_ram1[df_lfp_ram1['sn']==sn]
  137. df_lfp_ram_sn1.reset_index(inplace=True,drop=True) #重置索引
  138. else:
  139. df_lfp_ram_sn1=pd.DataFrame()
  140. df_lfp_ram1=pd.DataFrame(columns=df_bms.columns.tolist())
  141. #内短路计算..................................................................................................................................................
  142. BatShort=CBMSBatInterShort.BatInterShort(sn,celltype,df_bms,df_soh,df_warning_ram_sn,df_warning_ram_sn1,df_warning_ram_sn2,df_warning_ram_sn3,df_lfp_ram_sn)
  143. df_short_res, df_ram_res, df_ram_res1, df_ram_res2, df_ram_res3, df_ram_res4=BatShort.intershort()
  144. if not df_short_res.empty:
  145. df_short_res['add_time'] = datetime.datetime.now()
  146. df_short_res['factory'] = factory
  147. df_short_res.to_sql("intershort_result",con=db_engine, if_exists="append",index=False)
  148. logger.info(u"{} intershort_result写入成功!!!\n".format(sn), exc_info=True)
  149. #!!!!!!!!!!!!!!往下还未进行部署修改
  150. #静置电压排名..................................................................................................................................................
  151. BatUniform=CBMSBatUniform.BatUniform(sn,celltype,df_bms,df_uniform,df_ram_res3,df_lfp_ram_sn1)
  152. df_rank_res, df_ram_res3, df_ram_res5=BatUniform.batuniform()
  153. if not df_rank_res.empty:
  154. df_rank_res['add_time'] = datetime.datetime.now()
  155. df_rank_res['factory'] = factory
  156. df_rank_res.to_sql("uniform_result",con=db_engine, if_exists="append",index=False)
  157. logger.info(u"{} uniform_result写入成功!!!\n".format(sn), exc_info=True)
  158. #电压离群.....................................................................................................................................................
  159. df_voltsigma=VoltStray.main(sn,df_bms,celltype)
  160. if not df_voltsigma.empty:
  161. df_voltsigma['add_time'] = datetime.datetime.now()
  162. df_voltsigma['factory'] = factory
  163. df_voltsigma.to_sql("outlier_voltchangeratio_result",con=db_engine, if_exists="append",index=False)
  164. logger.info(u"{} outlier_voltchangeratio_result写入成功!!!\n".format(sn), exc_info=True)
  165. #ram处理................................................................................................................
  166. df_warning_ram=df_warning_ram.drop(df_warning_ram[df_warning_ram.sn==sn].index)
  167. df_warning_ram1=df_warning_ram1.drop(df_warning_ram1[df_warning_ram1.sn==sn].index)
  168. df_warning_ram2=df_warning_ram2.drop(df_warning_ram2[df_warning_ram2.sn==sn].index)
  169. df_warning_ram3=df_warning_ram3.drop(df_warning_ram3[df_warning_ram3.sn==sn].index)
  170. df_warning_ram=pd.concat([df_warning_ram,df_ram_res],ignore_index=True)
  171. df_warning_ram1=pd.concat([df_warning_ram1,df_ram_res1],ignore_index=True)
  172. df_warning_ram2=pd.concat([df_warning_ram2,df_ram_res2],ignore_index=True)
  173. df_warning_ram3=pd.concat([df_warning_ram3,df_ram_res3],ignore_index=True)
  174. if celltype>50:
  175. df_lfp_ram=df_lfp_ram.drop(df_lfp_ram[df_lfp_ram.sn==sn].index)
  176. df_lfp_ram=pd.concat([df_lfp_ram,df_ram_res4],ignore_index=True)
  177. df_lfp_ram1=df_lfp_ram1.drop(df_lfp_ram1[df_lfp_ram1.sn==sn].index)
  178. df_lfp_ram1=pd.concat([df_lfp_ram1,df_ram_res5],ignore_index=True)
  179. #电池热安全预警..............................................................................................................................................................
  180. #读取内短路、析锂和一致性结果数据库数据
  181. df_short = pd.read_sql("select time_sp,sn,short_current from intershort_result where sn = '{}' and time_sp between '{}' and '{}'".format(sn,start_time1,end_time), db_engine)
  182. # df_liplated = pd.read_sql("select time,sn,liplated,liplated_amount from mechanism_liplated where sn = '{}' and time between '{}' and '{}'".format(sn,start_time2,end_time), db_qxcas_engine)
  183. df_uniform = pd.read_sql("select time,sn,cellsoc_diff,cellvolt_diff,cellmin_num,cellmax_num,cellvolt_rank from uniform_result where sn = '{}' and time between '{}' and '{}'".format(sn,start_time2,end_time), db_engine)
  184. df_voltsigma = pd.read_sql("select time,sn,VolOl_Uni,VolChng_Uni from outlier_voltchangeratio_result where sn = '{}' and time between '{}' and '{}'".format(sn,start_time3,end_time), db_engine)
  185. df_uniform=df_uniform.dropna(axis=0,how='any')
  186. #获取sn的故障RAM
  187. df_fault_ram_sn=df_fault_ram[df_fault_ram['sn']==sn]
  188. #热安全预警
  189. if df_fault_ram_sn.empty:
  190. BatWarning=CBMSSafetyWarning.SafetyWarning(sn,celltype,df_short,df_uniform,df_voltsigma,df_soh)
  191. df_warning_res=BatWarning.diag()
  192. #当前热失控故障写入数据库
  193. if not df_warning_res.empty:
  194. df_warning_res['add_time'] = datetime.datetime.now()
  195. df_warning_res['factory'] = factory
  196. df_warning_res.to_sql("fault_result",con=db_engine, if_exists="append",index=False)
  197. logger.info(u"{} fault_result写入成功!!!\n".format(sn), exc_info=True)
  198. else:
  199. fault_time=df_fault_ram_sn.iloc[-1]['time_st']
  200. if (now_time-fault_time).total_seconds()>7*24*3600: #df_warning_end历史故障筛选并更改数据库故障结束时间
  201. df_fault_ram_sn['time_sp']=end_time
  202. try:
  203. cursor.execute('''
  204. update fault_result set update_time='{}',time_sp='{}' where sn='{}' and faultcode={} and time_sp='0000-00-00 00:00:00'
  205. '''.format(datetime.datetime.now(), end_time, sn, 110))
  206. conn.commit()
  207. except:
  208. logger.error(traceback.format_exc)
  209. logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
  210. logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
  211. except:
  212. logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)
  213. logger.error(traceback.format_exc)
  214. db_engine.dispose()
  215. logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
  216. def heart_beat(host, port, db, user, password, log):
  217. while True:
  218. try:
  219. db_engine = create_engine(
  220. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  221. user, parse.quote_plus(password), host, port, db
  222. ))
  223. while True:
  224. now = datetime.datetime.now()
  225. db_engine.execute("update status set algo_safetywarning='{}'".format(now))
  226. log.info("safetywarning心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
  227. time.sleep(5)
  228. except:
  229. log.error(u"safetywarning心跳错误", exc_info=True)
  230. log.error(traceback.format_exc)
  231. time.sleep(5)
  232. finally:
  233. db_engine.dispose()
  234. if __name__ == '__main__':
  235. warnings.filterwarnings("ignore")
  236. env_dist = os.environ
  237. # 配置信息信息
  238. host = env_dist.get("ALI_HOST", '120.25.223.1')
  239. port = env_dist.get("ALI_PORT", '4901')
  240. db = env_dist.get("ALI_DB", 'ali')
  241. user = env_dist.get("ALI_ROOT", 'root')
  242. password = env_dist.get("ALI_PASSWORD", '123456')
  243. # host = env_dist.get("ALI_HOST", '192.168.31.141')
  244. # port = env_dist.get("ALI_PORT", '3306')
  245. # db = env_dist.get("ALI_DB", 'ali')
  246. # user = env_dist.get("ALI_ROOT", 'root')
  247. # password = env_dist.get("ALI_PASSWORD", 'Ali@123456')
  248. runenv = env_dist.get("ALI_RUNENV", 'dev')
  249. period_second = env_dist.get("ALI_PERIOD_SECOND", '5,5,5,5').split(',')
  250. period_second = env_dist.get("ALI_PERIOD_SECOND", '10')
  251. algo_name = 'safetywarning'
  252. # 日志配置
  253. log_path = './log' + '/' + algo_name
  254. if not os.path.exists(log_path):
  255. os.makedirs(log_path)
  256. logger = logging.getLogger()
  257. logger.setLevel(logging.DEBUG)
  258. fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a")
  259. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  260. fh.suffix = "%Y-%m-%d_%H-%M.log"
  261. fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
  262. fh.setFormatter(formatter)
  263. fh.setLevel(logging.DEBUG)
  264. logger.addHandler(fh)
  265. fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a")
  266. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  267. fh.setFormatter(formatter)
  268. fh.setLevel(logging.ERROR)
  269. logger.addHandler(fh)
  270. #参数初始化
  271. #............................模块运行前,先读取数据库中所有结束时间为0的数据,需要从数据库中读取...................................
  272. df_warning_ram=pd.DataFrame(columns=['sn','time','deltsoc','cellsoc'])
  273. df_warning_ram1=pd.DataFrame(columns=['sn','time1','deltsoc1'])
  274. df_warning_ram2=pd.DataFrame(columns=['sn','time2','deltAs2'])
  275. df_warning_ram3=pd.DataFrame(columns=['sn','time3','standingtime','standingtime1','standingtime2'])
  276. df_lfp_ram=pd.DataFrame()
  277. df_lfp_ram1=pd.DataFrame()
  278. # 开启定时任务
  279. logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
  280. p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
  281. p.start()
  282. scheduler = BlockingScheduler()
  283. # heart_beat(host, port, db, user, password, logger)
  284. # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5, max_instances=1, coalesce=True)
  285. fun(host, port, db, user, password, runenv, logger, period_second)
  286. scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=int(period_second),
  287. max_instances=1, coalesce=True)
  288. try:
  289. scheduler.start()
  290. except Exception as e:
  291. scheduler.shutdown()
  292. logger.error(str(e))