deploy_diag.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. import os
  2. import time
  3. import pandas as pd
  4. from sqlalchemy import create_engine
  5. import logging
  6. import logging.handlers
  7. import traceback
  8. import re
  9. import CBMSBatDiag
  10. import datetime
  11. import dateutil.relativedelta
  12. from urllib import parse
  13. from apscheduler.schedulers.blocking import BlockingScheduler
  14. import warnings
  15. import pymysql
  16. from urllib import parse
  17. from multiprocessing import Process
  18. def diag_cal(host, port, db, user, password, runenv, logger, period_second):
  19. logger.info("pid is {}".format(os.getpid()))
  20. try:
  21. db_engine = create_engine(
  22. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  23. user, parse.quote_plus(password), host, port, db
  24. ))
  25. conn = pymysql.connect(host=host, port=int(port), user=user, password=password, database=db)
  26. cursor = conn.cursor()
  27. except:
  28. logger.error(u"数据库连接错误", exc_info=True)
  29. logger.error(traceback.format_exc)
  30. return
  31. # 获取历史报警数据
  32. df_diag_ram = pd.read_sql("select time_st,time_sp,sn,faultcode,faultlv,faultinfo,faultadvice from fault_result where time_sp='0000-00-00 00:00:00'",db_engine);
  33. # 获取配置信息
  34. df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
  35. # db_engine.dispose()
  36. now_time = datetime.datetime.now()
  37. pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second-5) #
  38. end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S")
  39. start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S")
  40. logger.info("time range from {} to {} ".format(start_time, end_time))
  41. for i in range(len(df_confs)):
  42. try:
  43. factory = df_confs.loc[i, 'factory']
  44. sn = df_confs.loc[i, 'device_name']
  45. if '康普盾' in factory:
  46. celltype = 1
  47. cell_volt_count = 120
  48. cell_temp_count = 40
  49. elif '华霆' in factory:
  50. celltype = 2
  51. cell_volt_count = 68
  52. cell_temp_count = 40
  53. elif '力神' in factory:
  54. celltype = 99
  55. cell_volt_count = 72
  56. cell_temp_count = 32
  57. logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
  58. sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
  59. logger.info(sql)
  60. df_bms = pd.read_sql(sql, db_engine)
  61. if df_bms.empty:
  62. continue
  63. df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt',
  64. 'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp',
  65. 'CellVolt', 'id']
  66. # 单体数据拆分
  67. cell_temp = df_bms['CellTemp']
  68. cell_volt = df_bms['CellVolt']
  69. cell_temps = []
  70. [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp]
  71. cell_volts = []
  72. [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt]
  73. del_indexes = []
  74. for i, cell_volt in enumerate(cell_volts):
  75. if len(cell_volt) != cell_volt_count:
  76. del_indexes.append(i)
  77. for i, cell_temp in enumerate(cell_temps):
  78. if len(cell_temp) != cell_temp_count:
  79. del_indexes.append(i)
  80. for i, del_index in enumerate(del_indexes):
  81. del_index = del_index - i
  82. cell_volts.pop(del_index)
  83. cell_temps.pop(del_index)
  84. df_bms = df_bms.drop(index=del_indexes, axis=1)
  85. cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)]
  86. celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)]
  87. df_bms[cellvolt_name] = cell_volts
  88. df_bms[celltemp_name] = cell_temps
  89. df_bms = df_bms.drop(columns='CellVolt', axis=0)
  90. df_bms = df_bms.drop(columns='CellTemp', axis=0)
  91. df_bms = df_bms.reset_index(drop=True)
  92. logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))
  93. # 算法执行
  94. df_soh = pd.read_sql(
  95. "select time_st,sn,soh,cellsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn),
  96. db_engine);
  97. df_uniform = pd.read_sql(
  98. "select time,sn,cellsoc_diff,cellmin_num,cellmax_num from uniform_result where factory ='{}' and sn = '{}' order by time desc limit 1".format(factory, sn),
  99. db_engine);
  100. df_soc = pd.read_sql(
  101. "select time,sn,packsoc from soc_result where factory ='{}' and sn = '{}' order by add_time desc limit 1".format(factory, sn),
  102. db_engine);
  103. #电池诊断................................................................................................................................................................
  104. if not df_bms.empty:
  105. df_diag_ram_sn=df_diag_ram[df_diag_ram['sn']==sn]
  106. df_diag_ram_sn.reset_index(inplace=True,drop=True)
  107. batdiag=CBMSBatDiag.BatDiag(sn,celltype,df_bms,df_soh,df_soc,df_uniform,df_diag_ram_sn)
  108. df_diag_res, df_health_res=batdiag.diag() #获取电池故障结果和电池评分结果
  109. #电池评分写入数据库
  110. if not df_health_res.empty: #变为历史故障更改数据库
  111. df_health_res['add_time'] = datetime.datetime.now()
  112. df_health_res['factory'] = factory
  113. df_health_res.to_sql("health_result",con=db_engine, if_exists="append",index=False)
  114. logger.info(u"{} health_result 写入成功!!!\n".format(sn), exc_info=True)
  115. #历史故障筛选并更改数据库故障结束时间.........................................................
  116. if not df_diag_res.empty:
  117. df_diag_now=df_diag_res[df_diag_res['time_sp'] == '0000-00-00 00:00:00'] #去除历史故障
  118. df_diag_new = pd.concat([df_diag_res,df_diag_ram_sn,df_diag_ram_sn]).drop_duplicates(subset=['time_st','faultcode'],keep=False)#此次判断中新增故障
  119. df_diag_end=pd.concat([df_diag_res,df_diag_new,df_diag_new]).drop_duplicates(subset=['time_st','faultcode'],keep=False)#此次判断中新增故障
  120. df_diag_end=df_diag_end[df_diag_end['time_sp'] != '0000-00-00 00:00:00']
  121. df_diag_end.reset_index(inplace=True,drop=True) #重置索引
  122. if not df_diag_end.empty: #变为历史故障更改数据库
  123. try:
  124. for i in range(0,len(df_diag_end)):
  125. sql = '''update fault_result set update_time='{}', time_sp='{}' where sn='{}' and faultcode={} and time_st='{}'
  126. '''.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), df_diag_end.loc[i,'time_sp'], sn, df_diag_end.loc[i,'faultcode'],
  127. df_diag_end.loc[i,'time_st'])
  128. cursor.execute(sql)
  129. conn.commit()
  130. logger.info(u"{} fault_result更新成功\n".format(sn), exc_info=True)
  131. except:
  132. logger.error(traceback.format_exc)
  133. logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
  134. #新增故障筛选并存入数据库.....................................................................
  135. if not df_diag_new.empty:
  136. df_diag_new['add_time'] = datetime.datetime.now()
  137. df_diag_new['factory'] = factory
  138. df_diag_new.to_sql("fault_result",con=db_engine, if_exists="append",index=False)
  139. logger.info(u"{} fault_result 写入成功!!!\n".format(sn), exc_info=True)
  140. #更新diag的Ram数据
  141. df_diag_ram=df_diag_ram.drop(df_diag_ram[df_diag_ram.sn==sn].index)
  142. df_diag_ram=df_diag_ram.append(df_diag_now, ignore_index=True)
  143. df_diag_ram.reset_index(inplace=True,drop=True) #重置索引
  144. logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
  145. except:
  146. logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)
  147. logger.error(traceback.format_exc)
  148. db_engine.dispose()
  149. cursor.close()
  150. conn.close()
  151. logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
  152. def heart_beat(host, port, db, user, password, log):
  153. while True:
  154. try:
  155. db_engine = create_engine(
  156. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  157. user, parse.quote_plus(password), host, port, db
  158. ))
  159. while True:
  160. now = datetime.datetime.now()
  161. db_engine.execute("update status set algo_diag='{}'".format(now))
  162. log.info("diag心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
  163. time.sleep(5)
  164. except:
  165. log.error(u"diag心跳错误", exc_info=True)
  166. log.error(traceback.format_exc)
  167. time.sleep(5)
  168. finally:
  169. db_engine.dispose()
  170. # 开启定时任务
  171. if __name__ == '__main__':
  172. warnings.filterwarnings("ignore")
  173. env_dist = os.environ
  174. # 配置信息信息
  175. # host = env_dist.get("ALI_HOST", '120.25.223.1')
  176. # port = env_dist.get("ALI_PORT", '4901')
  177. # db = env_dist.get("ALI_DB", 'ali')
  178. # user = env_dist.get("ALI_ROOT", 'root')
  179. # password =
  180. host = env_dist.get("ALI_HOST", '192.168.31.141')
  181. port = env_dist.get("ALI_PORT", '3306')
  182. db = env_dist.get("ALI_DB", 'ali')
  183. user = env_dist.get("ALI_ROOT", 'root')
  184. password = env_dist.get("ALI_PASSWORD", 'Ali@123456')
  185. runenv = env_dist.get("ALI_RUNENV", 'pro')
  186. period_second = int(env_dist.get("ALI_PERIOD_SECOND", '60'))
  187. algo_name = 'diag'
  188. # 日志配置
  189. log_path = './log' + '/' + algo_name
  190. if not os.path.exists(log_path):
  191. os.makedirs(log_path)
  192. logger = logging.getLogger()
  193. logger.setLevel(logging.DEBUG)
  194. fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
  195. backupCount=5, encoding="utf-8", mode="a")
  196. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  197. fh.suffix = "%Y-%m-%d_%H-%M.log"
  198. fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
  199. fh.setFormatter(formatter)
  200. fh.setLevel(logging.DEBUG)
  201. logger.addHandler(fh)
  202. fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
  203. backupCount=5, encoding="utf-8", mode="a")
  204. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  205. fh.setFormatter(formatter)
  206. fh.setLevel(logging.ERROR)
  207. logger.addHandler(fh)
  208. logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
  209. # 调度
  210. p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
  211. p.start()
  212. scheduler = BlockingScheduler()
  213. # heart_beat(host, port, db, user, password, logger)
  214. # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True)
  215. diag_cal(host, port, db, user, password, runenv, logger, period_second)
  216. scheduler.add_job(func=diag_cal, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second
  217. ,max_instances=1, coalesce=True)
  218. try:
  219. scheduler.start()
  220. except Exception as e:
  221. scheduler.shutdown()
  222. logger.error(str(e))