# main.py — periodic scheduler: safety-alarm and charge-remainder algorithms.
  1. from datetime import datetime
  2. from multiprocessing import Pool
  3. import json
  4. import os
  5. import time
  6. import traceback
  7. import warnings
  8. from sqlalchemy import text, delete, and_, or_, update
  9. import pandas as pd
  10. from ZlwlAlgosCommon.utils.ProUtils import *
  11. from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
  12. from ZlwlAlgosCommon.service.iotp.Beans import DataField
  13. from ZlwlAlgosCommon.orm.models import *
  14. from safetyalarm.V_1_0_0.CBMSSafetyAlarm import SafetyAlarm
  15. from ChargeRemainder.V_1_0_0.AlgoChargeRemainder import ChargeRemainder
  16. def main(process_num):
  17. # 程序不能停止
  18. while(True):
  19. try:
  20. warnings.filterwarnings("ignore")
  21. try:
  22. # 调用算法前的准备工作
  23. cleanUtils = CleanUtils()
  24. mysql_algo_conn = None
  25. mysql_algo_engine = None
  26. mysql_iotp_conn = None
  27. mysql_iotp_engine= None
  28. kafka_consumer = None
  29. rc= None
  30. kafka_topic_key = 'topic_test_sxq'
  31. kafka_groupid_key = 'group_id_test_sxq'
  32. algo_list = ['safety_alarm', 'charge_remainder'] # 本调度所包含的算法名列表。
  33. loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num) # 为每个算法分配一个logger
  34. logger_main.info(f"process-{process_num}: 配置中间件")
  35. # mysql
  36. mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
  37. mysqlUtils = MysqlUtils()
  38. mysql_algo_engine, mysql_algo_Session= mysqlUtils.get_mysql_engine(mysql_algo_params)
  39. mysql_algo_conn = mysql_algo_engine.connect()
  40. # kafka
  41. kafka_params = sysUtils.get_cf_param('kafka')
  42. kafkaUtils = KafkaUtils()
  43. kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
  44. #Hbase
  45. hbase_params = sysUtils.get_cf_param('hbase')
  46. iotp_service = IotpAlgoService(hbase_params=hbase_params)
  47. #redis
  48. redis_params = sysUtils.get_cf_param('redis')
  49. reidsUtils = RedisUtils()
  50. rc = reidsUtils.get_redis_conncect(redis_params)
  51. except Exception as e:
  52. logger_main.error(str(e))
  53. logger_main.error(traceback.format_exc())
  54. cleanUtils.clean(mysql_algo_conn, mysql_algo_engine, mysql_iotp_conn, mysql_iotp_engine, kafka_consumer, rc)
  55. # 开始准备调度
  56. logger_main.info(f"process-{process_num}: 监听topic {kafka_params[kafka_topic_key]}等待kafka 调度")
  57. for message in kafka_consumer:
  58. try:
  59. logger_main.info(f'收到调度')
  60. if not mysql_algo_conn.closed:
  61. mysql_algo_conn.close()
  62. mysql_algo_conn = mysql_algo_engine.connect() # 从连接池中获取一个myslq连接
  63. schedule_params = json.loads(message.value)
  64. if (schedule_params is None) or (schedule_params ==''):
  65. logger_main.info('{} kafka数据异常,跳过本次运算'.format(str(message.value)))
  66. continue
  67. # kafka 调度参数解析
  68. df_snlist = pd.DataFrame(schedule_params['snlist'])
  69. sn_list=df_snlist['sn'].tolist()
  70. pack_code = schedule_params['pack_code']
  71. # df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'],d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param','param_ai'])
  72. df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
  73. df_algo_list = pd.DataFrame(schedule_params['algo_list'])
  74. start_time = schedule_params['start_time']
  75. end_time = schedule_params['end_time']
  76. # cell_type = schedule_params['cell_type']
  77. # 取数
  78. time_st = time.time()
  79. logger_main.info(f"process-{process_num}: 开始取数,{start_time} ~ {end_time}\n{str(sn_list)}")
  80. columns = [ DataField.time, DataField.sn, DataField.pack_crnt, DataField.pack_volt, DataField.pack_soc,
  81. DataField.cell_voltage_count, DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
  82. DataField.other_temp_value, DataField.bms_sta]
  83. df_data = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
  84. logger_main.info(f'process-{process_num},获取到{len(df_data)}条数据,取数耗时:{time.time()-time_st}')
  85. except Exception as e:
  86. logger_main.error(f"process-{process_num}-{pack_code}:获取原始数据出错{sn_list}")
  87. logger_main.error(f"process-{process_num}:{e}")
  88. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  89. continue
  90. # 数据清洗
  91. try:
  92. time_st = time.time()
  93. logger_main.info(f'process-{process_num}数据清洗')
  94. if len(df_data) == 0:
  95. logger_main.info(f"process-{process_num}: 无有效数据,跳过本次运算")
  96. continue
  97. df_data, df_table, cellvolt_name, celltemp_name = iotp_service.datacleaning(df_algo_pack_param,df_data)#进行数据清洗
  98. if len(df_data) == 0:
  99. logger_main.info(f"process-{process_num}: 数据清洗耗时{time.time()-time_st}, 无有效数据,跳过本次运算")
  100. continue
  101. else:
  102. logger_main.info(f"process-{process_num}: {pack_code}, time_type:{df_data.loc[0, 'time']} ~ {df_data.iloc[-1]['time']}, 数据清洗完成耗时{time.time()-time_st}")
  103. except Exception as e:
  104. logger_main.error(f"process-{process_num}:数据清洗出错{sn_list}")
  105. logger_main.error(f"process-{process_num}:{e}")
  106. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  107. continue
  108. # 算法缓存数据获取
  109. try:
  110. time_st = time.time()
  111. logger_main.info(f'process-{process_num}开始读取redis')
  112. df_bms_ram = pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp', 'packsoc','packcrnt'])
  113. df_alarm_ram = pd.DataFrame(columns=['time','sn','safetywarning1','safetywarning2'])
  114. for sn in sn_list:
  115. redis_ram_data = rc.get("Algo:FaultDiag:SafetyAlarm:df_bms_ram:{}".format(sn))
  116. if pd.isnull(redis_ram_data):
  117. pass
  118. else:
  119. df_bms_ram1 = pd.read_json(redis_ram_data)
  120. df_bms_ram = pd.concat([df_bms_ram, df_bms_ram1])
  121. redis_ram_data = rc.get("Algo:FaultDiag:SafetyAlarm:df_alarm_ram:{}".format(sn))
  122. if pd.isnull(redis_ram_data):
  123. pass
  124. else:
  125. df_alarm_ram1 = pd.read_json(redis_ram_data)
  126. df_alarm_ram = pd.concat([df_alarm_ram, df_alarm_ram1])
  127. logger_main.info(f'process-{process_num}读取redis耗时{time.time()-time_st}')
  128. except Exception as e:
  129. logger_main.error(f"process-{process_num}:读取redis出错{sn_list}")
  130. logger_main.error(f"process-{process_num}:{e}")
  131. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  132. continue
  133. # mysql数据读取
  134. try:
  135. time_st = time.time()
  136. logger_main.info(f'process-{process_num}开始读取mysql故障数据')
  137. sql = "select start_time,end_time,sn,imei,model,fault_level,fault_code,fault_info,fault_reason,fault_advice,fault_location,device_status,odo \
  138. from algo_all_fault_info_ing where fault_code='{}' or fault_code='{}'".format('C599','C590')
  139. df_diag_ram = pd.read_sql(sql, mysql_algo_conn)
  140. logger_main.info(f'process-{process_num}读取mysql耗时{time.time()-time_st}')
  141. except Exception as e:
  142. logger_main.error(f"process-{process_num}:读取redis出错{sn_list}")
  143. logger_main.error(f"process-{process_num}:{e}")
  144. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  145. continue
  146. # 算法调用
  147. try:
  148. time_st = time.time()
  149. loggers['safety_alarm'].info(f'开始执行算法{pack_code}')
  150. safety_alarm = SafetyAlarm(df_data, df_diag_ram, df_bms_ram, df_alarm_ram, df_algo_list, df_table, cellvolt_name, celltemp_name, pack_code, df_snlist)
  151. df_res_new, df_res_end, df_bms_ram, df_alarm_ram = safety_alarm.safety_alarm_diag()
  152. loggers['safety_alarm'].info(f'算法运行完成{pack_code},算法耗时{time.time()-time_st}')
  153. except Exception as e:
  154. loggers['safety_alarm'].error(f'算法运行出错{sn_list}')
  155. loggers['safety_alarm'].error(str(e))
  156. loggers['safety_alarm'].error(traceback.format_exc())
  157. df_res_new=pd.DataFrame()
  158. df_res_end=pd.DataFrame()
  159. df_bms_ram=pd.DataFrame()
  160. df_alarm_ram=pd.DataFrame()
  161. try:
  162. #充电剩余时间预测,只计算重卡
  163. if pack_code in ('JX18020', 'JX19220'):
  164. time_st = time.time()
  165. df_chargeremainder = df_data[['sn', 'pack_soc', 'pack_crnt', 'cell_temp_max', 'cell_temp_min', 'cell_volt_max', 'cell_volt_min', 'time']]
  166. chargeremainder = ChargeRemainder(df_chargeremainder, pack_code)
  167. df_results = chargeremainder.process()
  168. df_results.insert(loc=0, column='pack', value=pack_code)
  169. df_results.to_sql("algo_charge_remainder", con=mysql_algo_conn, if_exists="append", index=False)
  170. loggers['charge_remainder'].info(f'算法运行完成{pack_code},算法耗时{time.time()-time_st}')
  171. except Exception as e:
  172. loggers['charge_remainder'].error(f'算法运行出错{sn_list}')
  173. loggers['charge_remainder'].error(str(e))
  174. loggers['charge_remainder'].error(traceback.format_exc())
  175. #更新redis
  176. try:
  177. time_st = time.time()
  178. logger_main.info(f'process-{process_num}开始更新redis数据')
  179. df_bms_ram.groupby('sn').apply(lambda x : rc.set("Algo:FaultDiag:SafetyAlarm:df_bms_ram:{}".format(x['sn'].values[0]), json.dumps(x.to_dict()), ex=24*3600))
  180. df_alarm_ram.groupby('sn').apply(lambda x : rc.set("Algo:FaultDiag:SafetyAlarm:df_alarm_ram:{}".format(x['sn'].values[0]), json.dumps(x.to_dict()), ex=24*3600))
  181. logger_main.info(f'process-{process_num}更新redis数据耗时{time.time()-time_st}')
  182. except Exception as e:
  183. logger_main.error(f"process-{process_num}:更新redis出错{sn_list}")
  184. logger_main.error(f"process-{process_num}:{e}")
  185. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  186. #结果写入mysql
  187. try:
  188. session = mysql_algo_Session()
  189. if not df_res_end.empty:
  190. df_res_end=df_res_end.where(pd.notnull(df_res_end),None)
  191. df_res_end=df_res_end.fillna(0)
  192. for index in df_res_end.index:
  193. df_t = df_res_end.loc[index:index]
  194. sql = 'delete from algo_all_fault_info_ing where start_time=:start_time and fault_code=:fault_code and sn=:sn'
  195. params = {'start_time': df_t['start_time'].values[0],
  196. 'fault_code': df_t['fault_code'].values[0], 'sn': df_t['sn'].values[0]}
  197. session.execute(sql, params=params)
  198. sql = 'insert into algo_all_fault_info_done (date_info, start_time, end_time, sn, imei, model, fault_level, fault_code, fault_info,\
  199. fault_reason, fault_advice, fault_location, device_status,odo, create_time, create_by,update_time, update_by, is_delete,comment) values \
  200. (:date_info, :start_time, :end_time, :sn, :imei, :model,:fault_level, :fault_code, :fault_info,\
  201. :fault_reason, :fault_advice, :fault_location, :device_status, :odo, :create_time, :create_by, :update_time,:update_by, :is_delete , :comment)'
  202. params = {'date_info': datetime.now(),
  203. 'start_time': df_t['start_time'].values[0],
  204. 'end_time': df_t['end_time'].values[0],
  205. 'sn': df_t['sn'].values[0],
  206. 'imei': df_t['imei'].values[0],
  207. 'model' :pack_code,
  208. 'fault_level': df_t['fault_level'].values[0],
  209. 'fault_code': df_t['fault_code'].values[0],
  210. 'fault_info': df_t['fault_info'].values[0],
  211. 'fault_reason': df_t['fault_reason'].values[0],
  212. 'fault_advice': df_t['fault_advice'].values[0],
  213. 'fault_location': df_t['fault_location'].values[0],
  214. 'device_status': df_t['device_status'].values[0],
  215. 'odo': df_t['odo'].values[0],
  216. 'create_time': datetime.now(),
  217. 'create_by': 'algo',
  218. 'update_time': datetime.now(),
  219. 'update_by': None,
  220. 'is_delete': 0,
  221. 'comment': None}
  222. session.execute(sql, params=params)
  223. session.commit()
  224. logger_main.info(f'process-{process_num}结束故障入库{pack_code}完成')
  225. else:
  226. pass
  227. if not df_res_new.empty:
  228. # 如果该故障未结束且不存在于ing,则写入ing
  229. df_res_new['date_info'] = datetime.now()
  230. df_res_new['create_time'] = datetime.now()
  231. df_res_new['create_by'] = 'algo'
  232. df_res_new['update_by'] = None
  233. df_res_new['is_delete'] = 0
  234. df_res_new.to_sql("algo_all_fault_info_ing", con=mysql_algo_conn,
  235. if_exists="append", index=False)
  236. logger_main.info(f'process-{process_num}新增故障入库{pack_code}完成')
  237. else:
  238. pass
  239. except Exception as e:
  240. logger_main.error(f"process-{process_num}:结果入库出错{sn_list}")
  241. logger_main.error(f"process-{process_num}:{e}")
  242. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  243. finally:
  244. df_data = None
  245. session.close()
  246. except Exception as e:
  247. logger_main.error(f'process-{process_num}: {e}')
  248. logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
  249. cleanUtils.clean(mysql_algo_conn, mysql_algo_engine, mysql_iotp_conn, mysql_iotp_engine, kafka_consumer, rc)
  250. if __name__ == '__main__':
  251. while(True):
  252. try:
  253. # 配置量
  254. cur_env = 'dev' # 设置运行环境
  255. app_path = "/home/shouxueqi/projects/zlwl-algos/zlwl-algos/" # 设置app绝对路径
  256. log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
  257. app_name = "task_min_5" # 应用名
  258. sysUtils = SysUtils(cur_env, app_path)
  259. logger_main = sysUtils.get_logger(app_name, log_base_path)
  260. logger_main.info(f"本次主进程号: {os.getpid()}")
  261. # 读取配置文件 (该部分请不要修改)
  262. processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1')) # 默认为1个进程
  263. pool = Pool(processes = int(processes))
  264. logger_main.info("开始分配子进程")
  265. for i in range(int(processes)):
  266. pool.apply_async(main, (i, ))
  267. pool.close()
  268. logger_main.info("进程分配结束,堵塞主进程")
  269. pool.join()
  270. except Exception as e:
  271. print(str(e))
  272. print(traceback.format_exc())
  273. logger_main.error(str(e))
  274. logger_main.error(traceback.format_exc())
  275. finally:
  276. handlers = logger_main.handlers.copy()
  277. for h in handlers:
  278. logger_main.removeHandler(h)
  279. pool.terminate()