# main_deploy.py
import datetime
import gc
import json
import logging
import logging.handlers
import os
import re
import time
import traceback
import warnings
from multiprocessing import Pool

import pandas as pd
from sqlalchemy import text, delete, and_, or_, update

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *
from FaultWarning.V_1_0_0 import CBMSBatDiag
from FaultWarning.V_1_0_0 import CBMSBatDiag_TempV3
  20. def get_cell_info(db_engine,rc):
  21. #获取SOH数据
  22. data = rc.get("algo_param_from_mysql:df_soh")
  23. if pd.isnull(data):
  24. df_soh = pd.read_sql("select * from `algo_soh`GROUP BY sn DESC", db_engine)
  25. else:
  26. df_soh = pd.DataFrame(json.loads(data))
  27. if len(df_soh) > 0:
  28. df_soh['time_st'] = pd.to_datetime(df_soh['time_st'], unit='ms')
  29. df_soh['time_sp'] = pd.to_datetime(df_soh['time_sp'], unit='ms')
  30. #获取SOR数据
  31. data = rc.get("algo_param_from_mysql:sor_result")
  32. if pd.isnull(data):
  33. df_sor = pd.read_sql("select * from `algo_mid_sorout`GROUP BY sn DESC", db_engine)
  34. else:
  35. df_sor = pd.DataFrame(json.loads(data))
  36. if len(df_sor) > 0:
  37. df_sor['time'] = pd.to_datetime(df_sor['time'], unit='ms')
  38. #获取一致性数据
  39. data = rc.get("Algo:FaultDiag:SafetyWarning:uniform_result:{}")
  40. if pd.isnull(data):
  41. df_uniform=pd.read_sql("select * from `algo_mid_uniform_result`GROUP BY sn DESC", db_engine)
  42. else:
  43. df_uniform = pd.read_json(data)
  44. if len(df_uniform) > 0:
  45. df_uniform['time'] = pd.to_datetime(df_uniform['time'],unit='ms')
  46. return df_sor,df_uniform #df_soh,
  47. def get_fault_info(db_engine):
  48. df_diag_ram = pd.read_sql("select * from algo_all_fault_info_ing", db_engine)
  49. df_diag_ram['end_flag'] = 0
  50. return df_diag_ram
  51. def fault_warning(logger, mysql_algo_conn, mysql_algo_Session, start_time, df_data, df_table, cellvolt_name, celltemp_name,pack_code, cell_type, df_diag_ram, df_algo_adjustable_param, df_algo_pack_param, df_algo_list):
  52. #电芯电压温度故障诊断
  53. time_start1 = time.time()
  54. FaultDiagVolt = CBMSBatDiag.BatDiagVolt(df_data, df_table, cellvolt_name, celltemp_name,cell_type)
  55. df_res_new_volt,df_res_update_volt, df_res_end_volt = FaultDiagVolt.diag(df_diag_ram, df_algo_adjustable_param, df_algo_pack_param, df_algo_list)#df_soh,
  56. logger.info(f"{datetime.datetime.now()},电压故障运行耗时{time.time()-time_start1} ")
  57. time_start1 = time.time()
  58. FaultDiagTemp = CBMSBatDiag_TempV3.BatDiag(df_data, df_table, celltemp_name, cellvolt_name)
  59. df_res_new_temp,df_res_update_temp, df_res_end_Temp = FaultDiagTemp.diag(df_diag_ram, df_algo_adjustable_param, df_algo_pack_param, df_algo_list)
  60. logger.info(f"{datetime.datetime.now()},温度故障运行耗时{time.time()-time_start1} ")
  61. df_res_new = pd.concat([df_res_new_volt, df_res_new_temp]) #, res1
  62. df_res_update=pd.concat([df_res_update_volt, df_res_update_temp]) #, res1
  63. df_res_end = pd.concat([df_res_end_volt, df_res_end_Temp]) #, res2
  64. df_res_new.reset_index(drop=True, inplace=True)
  65. df_res_update.reset_index(drop=True, inplace=True)
  66. df_res_end.reset_index(drop=True, inplace=True)
  67. if not df_res_end.empty:
  68. df_res_end=df_res_end.where(pd.notnull(df_res_end),None)
  69. df_res_end=df_res_end.fillna(0)
  70. for index in df_res_end.index:
  71. df_t = df_res_end.loc[index:index]
  72. try:
  73. # 删除数据
  74. with mysql_algo_Session() as session:
  75. session.execute(delete(AlgoAllFaultInfoIng).where(and_(
  76. (AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
  77. (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
  78. (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))))
  79. session.flush()
  80. # 插入数据
  81. session.add(AlgoAllFaultInfoDone(
  82. date_info=datetime.datetime.now(),
  83. start_time=df_t['start_time'].values[0],
  84. end_time=df_t['end_time'].values[0],
  85. sn=df_t['sn'].values[0],
  86. imei=df_t['imei'].values[0]),
  87. model=pack_code,
  88. fault_level=df_t['fault_level'].values[0],
  89. fault_code=df_t['fault_code'].values[0],
  90. fault_reason=df_t['fault_reason'].values[0],
  91. fault_advice=df_t['fault_advice'].values[0],
  92. fault_location=df_t['fault_location'].values[0],
  93. device_status=df_t['device_status'].values[0],
  94. odo=df_t['odo'].values[0],
  95. create_time=datetime.datetime.now(),
  96. create_by='algo',
  97. update_time=datetime.datetime.now(),
  98. update_by=None,
  99. is_delete=0,
  100. comment=df_t['comment'].values[0])
  101. session.flush()
  102. session.commit()
  103. except Exception as e:
  104. logger.error('{}运行出错'.format(pack_code))
  105. logging.error(str(e))
  106. logging.error(traceback.format_exc())
  107. finally:
  108. session.close()
  109. logger.info('更新入库完成')
  110. else:
  111. logger.info('无结束故障')
  112. #新增故障入库
  113. if not df_res_new.empty:
  114. df_res_new=df_res_new.where(pd.notnull(df_res_new),None)
  115. df_res_new=df_res_new.fillna(0)
  116. Scrap_Vin = pd.read_sql("select sn from t_scrap_device where create_time>='{}'".format(start_time), mysql_algo_conn)
  117. df_res_new=df_res_new[~df_res_new['sn'].isin(Scrap_Vin['sn'])]
  118. if not df_res_new.empty:
  119. # 如果该故障未结束且不存在于ing,则写入ing
  120. try:
  121. df_res_new['date_info'] = df_res_new['start_time']
  122. df_res_new['create_time'] = datetime.datetime.now()
  123. df_res_new['create_by'] = 'algo'
  124. df_res_new['update_by'] = None
  125. df_res_new['is_delete'] = 0
  126. #df_res_new['imei'] = imei
  127. df_res_new.to_sql("algo_all_fault_info_ing", con=mysql_algo_conn,
  128. if_exists="append", index=False)
  129. logger.info('新增故障{} 入库{}完成'.format(df_res_new, 'algo_all_fault_info_ing'))
  130. except Exception as e:
  131. logger.error('{}运行出错'.format(pack_code))
  132. logger.error(str(e))
  133. logger.error(traceback.format_exc())
  134. else:
  135. logger.info('新增故障为报废电池')
  136. else:
  137. logger.info('无新增故障')
  138. #更新故障入库
  139. if not df_res_update.empty:
  140. df_res_update=df_res_update.where(pd.notnull(df_res_update),None)
  141. df_res_update=df_res_update.fillna(0)
  142. for index in df_res_update.index:
  143. df_t = df_res_update.loc[index:index]
  144. try:
  145. # 更新数据
  146. with mysql_algo_Session() as session:
  147. session.execute(update(AlgoAllFaultInfoIng).where(
  148. and_((AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
  149. (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
  150. (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))).
  151. values(fault_level=df_t['fault_level'].values[0],
  152. comment=df_t['comment'].values[0]))
  153. session.commit()
  154. except Exception as e:
  155. logger.error('{}运行出错'.format(pack_code))
  156. logger.error(str(e))
  157. logger.error(traceback.format_exc())
  158. finally:
  159. session.close()
  160. logger.info('更新入库完成')
  161. else:
  162. logger.info('无更新故障')
  163. def main(process_num):
  164. # 程序不能停止
  165. while(True):
  166. warnings.filterwarnings("ignore")
  167. try:
  168. # 调用算法前的准备工作
  169. kafka_topic_key = 'topic_task_min_10'
  170. kafka_groupid_key = 'group_id_task_min_10'
  171. algo_list = ['FaultWarning', 'other'] # 本调度所包含的算法名列表。
  172. loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num) # 为每个算法分配一个logger
  173. logger_main.info(f"process-{process_num}: 配置中间件")
  174. # mysql
  175. mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
  176. mysqlUtils = MysqlUtils()
  177. mysql_algo_engine, mysql_algo_Session= mysqlUtils.get_mysql_engine(mysql_algo_params)
  178. mysql_algo_conn = mysql_algo_engine.connect()
  179. # redis
  180. redis_params = sysUtils.get_cf_param('redis')
  181. redisUtils = RedisUtils()
  182. redis_conn = redisUtils.get_redis_conncect(redis_params)
  183. # hbase
  184. hbase_params = sysUtils.get_cf_param('hbase')
  185. iotp_service = IotpAlgoService(hbase_params=hbase_params)
  186. # kafka
  187. kafka_params = sysUtils.get_cf_param('kafka')
  188. kafkaUtils = KafkaUtils()
  189. kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
  190. logger_main.info(f"process-{process_num}: 获取算法参数及电池参数")
  191. df_sor,df_uniform = get_cell_info(mysql_algo_conn, redis_conn)
  192. df_diag_ram = get_fault_info(mysql_algo_conn)
  193. except Exception as e:
  194. logger_main.error(str(e))
  195. logger_main.error(traceback.format_exc())
  196. # 开始准备调度
  197. try:
  198. logger_main.info(f"process-{process_num}: 监听topic {kafka_params[kafka_topic_key]}等待kafka 调度")
  199. param_update_timer = time.time()
  200. for message in kafka_consumer:
  201. try:
  202. logger_main.info(f'收到调度 {message.value}')
  203. if mysql_algo_conn.close:
  204. mysql_algo_conn = mysql_algo_engine.connect() # 从连接池中获取一个myslq连接
  205. if time.time()-param_update_timer > 1200: # 超过20分钟,更新参数
  206. df_sor,df_uniform=get_cell_info(mysql_algo_conn, redis_conn)#df_soh,
  207. param_update_timer = time.time()
  208. schedule_params = json.loads(message.value)
  209. if (schedule_params is None) or (schedule_params ==''):
  210. logger_main.info('{} kafka数据异常,跳过本次运算'.format(str(message.value)))
  211. continue
  212. # kafka 调度参数解析
  213. df_snlist = pd.DataFrame(schedule_params['snlist'])
  214. df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'],d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param','param_ai'])
  215. df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
  216. df_algo_list = pd.DataFrame(schedule_params['algo_list'])
  217. start_time = schedule_params['start_time']
  218. end_time = schedule_params['end_time']
  219. pack_code = schedule_params['pack_code']
  220. cell_type = schedule_params['cell_type']
  221. sn_list=df_snlist['sn'].tolist()
  222. # 取数
  223. logger_main.info(f"process-{process_num}: 开始取数")
  224. columns = [DataField.error_level, DataField.error_code, DataField.pack_crnt, DataField.pack_volt,
  225. DataField.bms_sta, DataField.cell_voltage_count, DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
  226. DataField.pack_soc, DataField.other_temp_value, DataField.bal_cell,
  227. DataField.pack_soh, DataField.charge_sta]
  228. df_data = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
  229. logger_main.info(f"process-{process_num}: {str(sn_list)}获取到{str(len(df_data))}条数据")
  230. except Exception as e:
  231. logging.error('{}运行出错'.format(pack_code))
  232. logging.error(str(e))
  233. logging.error(traceback.format_exc())
  234. try:
  235. # 数据清洗
  236. if len(df_data) == 0:
  237. logger_main.info(f"process-{process_num}: 无数据跳过本次运算")
  238. continue
  239. df_data,df_table,cellvolt_name,celltemp_name=iotp_service.data_clean(df_data,df_algo_pack_param)#进行数据清洗
  240. if len(df_data) == 0:
  241. logger_main.info(f"process-{process_num}: 数据清洗完成, 无有效数据,跳过本次运算")
  242. continue
  243. else:
  244. logger_main.info(f"process-{process_num}: {pack_code}, time_type:{df_data.loc[0, 'time']} ~ {df_data.iloc[-1]['time']}, 数据清洗完成")
  245. except Exception as e:
  246. logger_main.error(f"process-{process_num}:{pack_code}数据清洗出错")
  247. logger_main.error(f"process-{process_num}:{e}")
  248. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  249. # 算法调用
  250. try:
  251. fault_warning(loggers['FaultWarning'], mysql_algo_conn, mysql_algo_Session, start_time, df_data, df_table, cellvolt_name,
  252. celltemp_name,pack_code, cell_type, df_diag_ram, df_algo_adjustable_param, df_algo_pack_param, df_algo_list)
  253. except Exception as e:
  254. loggers['FaultWarning'].error('{}运行出错'.format(pack_code))
  255. loggers['FaultWarning'].error(str(e))
  256. loggers['FaultWarning'].error(traceback.format_exc())
  257. # 第二个算法调用
  258. try:
  259. pass
  260. except Exception as e:
  261. pass
  262. except Exception as e:
  263. logging.error('{}运行出错'.format(pack_code))
  264. logging.error(str(e))
  265. logging.error(traceback.format_exc())
  266. finally:
  267. iotp_service.close()
  268. if __name__ == '__main__':
  269. while(True):
  270. try:
  271. # 配置量
  272. cur_env = 'dev' # 设置运行环境
  273. app_path = "." # 设置app绝对路径
  274. log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
  275. app_name = "task_min_10" # 应用名
  276. sysUtils = SysUtils(cur_env, app_path)
  277. logger_main = sysUtils.get_logger(app_name, log_base_path)
  278. logger_main.info(f"本次主进程号: {os.getpid()}")
  279. # 读取配置文件 (该部分请不要修改)
  280. processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '2')) # 默认为1个进程
  281. pool = Pool(processes = int(processes))
  282. logger_main.info("开始分配子进程")
  283. for i in range(int(processes)):
  284. pool.apply_async(main, (i, ))
  285. pool.close()
  286. logger_main.info("进程分配结束,堵塞主进程")
  287. pool.join()
  288. except Exception as e:
  289. logger_main.error(str(e))
  290. logger_main.error(traceback.format_exc())
  291. finally:
  292. handlers = logger_main.handlers.copy()
  293. for h in handlers:
  294. logger_main.removeHandler(h)
  295. pool.terminate()