deploy_safetyalarm.py 10 KB


  1. import os
  2. import time
  3. import pandas as pd
  4. from sqlalchemy import create_engine
  5. from sqlalchemy.orm import sessionmaker
  6. import logging
  7. import logging.handlers
  8. import traceback
  9. import re
  10. import CBMSSafetyAlarm
  11. import datetime
  12. import dateutil.relativedelta
  13. from urllib import parse
  14. from apscheduler.schedulers.blocking import BlockingScheduler
  15. import warnings
  16. import pymysql
  17. from multiprocessing import Process
  18. def fun(host, port, db, user, password, runenv, logger, period_second):
  19. logger.info("pid is {}".format(os.getpid()))
  20. global df_bms_ram, df_alarm_ram
  21. try:
  22. db_engine = create_engine(
  23. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  24. user, parse.quote_plus(password), host, port, db
  25. ))
  26. conn = pymysql.connect(host=host, port=int(port), user=user, password=password, database=db)
  27. cursor = conn.cursor()
  28. # 获取配置信息
  29. df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
  30. # db_engine.dispose()
  31. except:
  32. logger.error(u"数据库连接错误", exc_info=True)
  33. logger.error(traceback.format_exc)
  34. return
  35. # 获取历史报警数据
  36. df_diag_ram = pd.read_sql("select time_st,time_sp,sn,faultcode,faultlv,faultinfo,faultadvice from fault_result where faultcode = 119 and time_sp='0000-00-00 00:00:00'",db_engine);
  37. now_time = datetime.datetime.now()
  38. pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second) #1min
  39. end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S")
  40. start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S")
  41. logger.info("time range from {} to {} ".format(start_time, end_time))
  42. for i in range(len(df_confs)):
  43. try:
  44. factory = df_confs.loc[i, 'factory']
  45. sn = df_confs.loc[i, 'device_name']
  46. if '康普盾' in factory:
  47. celltype = 1
  48. cell_volt_count = 120
  49. cell_temp_count = 40
  50. elif '华霆' in factory:
  51. celltype = 2
  52. cell_volt_count = 68
  53. cell_temp_count = 40
  54. elif '力神' in factory:
  55. celltype = 99
  56. cell_volt_count = 72
  57. cell_temp_count = 32
  58. logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
  59. sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
  60. logger.info(sql)
  61. df_bms = pd.read_sql(sql, db_engine)
  62. if df_bms.empty:
  63. continue
  64. df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt',
  65. 'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp',
  66. 'CellVolt', 'id']
  67. # 单体数据拆分
  68. cell_temp = df_bms['CellTemp']
  69. cell_volt = df_bms['CellVolt']
  70. cell_temps = []
  71. [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp]
  72. cell_volts = []
  73. [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt]
  74. del_indexes = []
  75. for i, cell_volt in enumerate(cell_volts):
  76. if len(cell_volt) != cell_volt_count:
  77. del_indexes.append(i)
  78. for i, cell_temp in enumerate(cell_temps):
  79. if len(cell_temp) != cell_temp_count:
  80. del_indexes.append(i)
  81. del_indexes = list(set(del_indexes))
  82. for i, del_index in enumerate(del_indexes):
  83. del_index = del_index - i
  84. cell_volts.pop(del_index)
  85. cell_temps.pop(del_index)
  86. df_bms = df_bms.drop(index=del_indexes, axis=1)
  87. cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)]
  88. celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)]
  89. df_bms[cellvolt_name] = cell_volts
  90. df_bms[celltemp_name] = cell_temps
  91. df_bms = df_bms.drop(columns='CellVolt', axis=0)
  92. df_bms = df_bms.drop(columns='CellTemp', axis=0)
  93. df_bms = df_bms.reset_index(drop=True)
  94. logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))
  95. # 算法执行
  96. #电池诊断................................................................................................................................................................
  97. df_diag_ram_sn=df_diag_ram[df_diag_ram['sn']==sn]
  98. df_bms_ram_sn=df_bms_ram[df_bms_ram['sn']==sn]
  99. df_alarm_ram_sn=df_alarm_ram[df_alarm_ram['sn']==sn]
  100. if df_diag_ram_sn.empty:
  101. SafetyAlarm=CBMSSafetyAlarm.SafetyAlarm(sn, celltype, df_bms, df_diag_ram_sn, df_bms_ram_sn, df_alarm_ram_sn)
  102. df_diag_res, df_bms_res, df_ram_res=SafetyAlarm.safety_alarm_diag()
  103. #更新bms的ram数据
  104. sn_index=df_bms_ram.loc[df_bms_ram['sn']==sn].index
  105. df_bms_ram=df_bms_ram.drop(index=sn_index)
  106. df_bms_ram=df_bms_ram.append(df_bms_res)
  107. sn_index=df_alarm_ram.loc[df_alarm_ram['sn']==sn].index
  108. df_alarm_ram=df_alarm_ram.drop(index=sn_index)
  109. df_alarm_ram=df_alarm_ram.append(df_ram_res)
  110. #当前热失控故障写入数据库
  111. if not df_diag_res.empty:
  112. df_diag_res['add_time'] = datetime.datetime.now()
  113. df_diag_res['factory'] = factory
  114. df_diag_res.to_sql("fault_result",con=db_engine, if_exists="append",index=False)
  115. logger.info(u"{} 写入成功!!!\n".format(sn), exc_info=True)
  116. #当前热失控已超过一天变为历史故障并更改数据库
  117. else:
  118. fault_time=df_diag_ram_sn.iloc[-1]['time_st']
  119. if (now_time-fault_time).total_seconds()>24 * 3600:
  120. df_diag_ram_sn['time_sp']=end_time
  121. try:
  122. cursor.execute('''
  123. update fault_result set update_time='{}',time_sp='{}' where sn='{}' and time_sp='0000-00-00 00:00:00' and faultcode={}
  124. '''.format(datetime.datetime.now(), end_time,sn, 119))
  125. conn.commit()
  126. logger.info(u"{} 更新成功\n".format(sn), exc_info=True)
  127. except:
  128. logger.error(traceback.format_exc)
  129. logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
  130. logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
  131. except:
  132. logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)
  133. logger.error(traceback.format_exc)
  134. db_engine.dispose()
  135. logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
  136. def heart_beat(host, port, db, user, password, log):
  137. while True:
  138. try:
  139. db_engine = create_engine(
  140. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  141. user, parse.quote_plus(password), host, port, db
  142. ))
  143. while True:
  144. now = datetime.datetime.now()
  145. db_engine.execute("update status set algo_safetyalarm='{}'".format(now))
  146. log.info("safetyalarm心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
  147. time.sleep(5)
  148. except:
  149. log.error(u"safetyalarm心跳错误", exc_info=True)
  150. log.error(traceback.format_exc)
  151. time.sleep(5)
  152. finally:
  153. db_engine.dispose()
  154. if __name__ == '__main__':
  155. warnings.filterwarnings("ignore")
  156. env_dist = os.environ
  157. # 配置信息信息
  158. host = env_dist.get("ALI_HOST", '120.25.223.1')
  159. port = env_dist.get("ALI_PORT", '4901')
  160. db = env_dist.get("ALI_DB", 'ali')
  161. user = env_dist.get("ALI_ROOT", 'root')
  162. password = env_dist.get("ALI_PASSWORD", '123456')
  163. runenv = env_dist.get("ALI_RUNENV", 'dev')
  164. period_second = int(env_dist.get("ALI_PERIOD_SECOND", '60'))
  165. algo_name = 'safetyalarm'
  166. # 日志配置
  167. log_path = './log' + '/' + algo_name
  168. if not os.path.exists(log_path):
  169. os.makedirs(log_path)
  170. logger = logging.getLogger()
  171. logger.setLevel(logging.DEBUG)
  172. fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a")
  173. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  174. fh.suffix = "%Y-%m-%d_%H-%M.log"
  175. fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
  176. fh.setFormatter(formatter)
  177. fh.setLevel(logging.DEBUG)
  178. logger.addHandler(fh)
  179. fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a")
  180. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  181. fh.setFormatter(formatter)
  182. fh.setLevel(logging.ERROR)
  183. logger.addHandler(fh)
  184. #参数初始化
  185. df_bms_ram=pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp', 'packsoc'])
  186. df_alarm_ram=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2'])
  187. logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
  188. # 开启定时任务
  189. p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
  190. p.start()
  191. scheduler = BlockingScheduler()
  192. # heart_beat(host, port, db, user, password, logger)
  193. # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True)
  194. fun(host, port, db, user, password, runenv, logger, period_second)
  195. scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second
  196. ,max_instances=1, coalesce=True)
  197. try:
  198. scheduler.start()
  199. except Exception as e:
  200. scheduler.shutdown()
  201. logger.error(str(e))