main_offline_test.py

from datetime import datetime, timedelta
from multiprocessing import Pool
import json
import os
import time
import traceback
import warnings
import sys
from sqlalchemy import text, delete, and_, or_, update
import pandas as pd
from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *
from socdiag.V_1_0_0.SOCBatDiag_test import SocDiag
import matplotlib.pyplot as plt
# from LowSocAlarm.V1_0_0.low_soc_alarm import Low_soc_alarm


def main(process_num):
    # the program must never stop
    if (True):
        warnings.filterwarnings("ignore")
        try:
            # Preparation before invoking the algorithms
            kafka_topic_key = 'topic_test_cez'
            kafka_groupid_key = 'group_id_test_cez'
            algo_list = ['socdiag']  # names of the algorithms covered by this schedule
            loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # assign one logger per algorithm
            logger_main.info(f"process-{process_num}: configuring middleware")
            # mysql
            mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
            mysqlUtils = MysqlUtils()
            mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
            mysql_algo_conn = mysql_algo_engine.connect()
            mysql_iotp_data = sysUtils.get_cf_param('mysql-iotp')
            mysqlUtils = MysqlUtils()
            mysql_iotp_engine, mysql_iotp_Session = mysqlUtils.get_mysql_engine(mysql_iotp_data)
            mysql_iotp_conn = mysql_iotp_engine.connect()
            # # kafka
            # kafka_params = sysUtils.get_cf_param('kafka')
            # kafkaUtils = KafkaUtils()
            # kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
            # hbase
            hbase_params = sysUtils.get_cf_param('hbase-datafactory')
            iotp_service = IotpAlgoService(hbase_params=hbase_params)
            # redis
            redis_params = sysUtils.get_cf_param('redis')
            redisUtils = RedisUtils()
            rc = redisUtils.get_redis_conncect(redis_params)
        except Exception as e:
            logger_main.error(f'process-{process_num}: {e}')
            logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
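        # NOTE (editor's assumption): if the middleware setup above fails, the
        # except block only logs the error and execution still falls through to
        # the loop below with the connections undefined. If early exit of this
        # worker is acceptable, a `return` at the end of the except block would
        # be a minimal guard; it is left out here because the scheduler is
        # apparently meant to keep the process alive.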
        # Prepare scheduling
        # logger_main.info(f"process-{process_num}: listening on topic {kafka_params[kafka_topic_key]}, waiting for kafka scheduling")
        logger_main.info(f"process-{process_num}: starting loop")
        path = '/data/common/benchi/data/'
        sn_list = ['LY9139BB4MALBZ795', 'LY9139BB8MALBZ329', 'LY9F49BCXMALBZ876', 'LY9F49BC7MALBZ883']
        pack_code = 'CL3282A'
        start_time_dts = [pd.to_datetime('2022-07-30 20:00:09'), pd.to_datetime('2022-01-11 14:00:09'), pd.to_datetime('2022-09-07 11:00:09'), pd.to_datetime('2022-01-29 08:00:09')]
        end_time_dts = [pd.to_datetime('2022-07-30 22:00:09'), pd.to_datetime('2022-01-11 15:00:09'), pd.to_datetime('2022-09-07 12:00:09'), pd.to_datetime('2022-01-29 10:00:09')]
        sql = "select * from algo_pack_param"
        df_algo_pack_param_all = pd.read_sql(sql, mysql_algo_conn)
        sql = "select * from algo_list"
        df_algo_param = pd.read_sql(sql, mysql_algo_conn)
        df_algo_pack_param = json.loads(df_algo_pack_param_all[df_algo_pack_param_all['pack_code'] == pack_code]['param'].iloc[0])
        sql = f"select sn, imei from t_device where sn in {tuple(sn_list)}"
        df_snlist = pd.read_sql(sql, mysql_algo_conn)
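        # Caveat: f-string formatting of `tuple(sn_list)` produces invalid SQL
        # when sn_list has exactly one element (trailing comma, e.g. "('X',)").
        # It works here because sn_list has four entries; for the general case
        # a single-element branch like the fault query below, or bind
        # parameters, would be safer. A sketch with an expanding bind parameter
        # (assumes SQLAlchemy 1.4+; not part of the original script):
        # from sqlalchemy import bindparam
        # stmt = text("select sn, imei from t_device where sn in :sns").bindparams(
        #     bindparam('sns', expanding=True))
        # df_snlist = pd.read_sql(stmt, mysql_algo_conn, params={'sns': sn_list})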
        for sn, start_time_dt, end_time_dt in zip(sn_list, start_time_dts, end_time_dts):
            try:
                logger_main.info(f'received schedule for {sn}')
                if mysql_algo_conn.closed:
                    mysql_algo_conn = mysql_algo_engine.connect()  # fetch a mysql connection from the pool
                try:
                    # df_snlist = [sn]
                    # df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'], d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param', 'param_ai'])
                    # df_algo_param = pd.DataFrame(schedule_params['algo_list'])
                    cell_type = 'CLL027A'
                    sn_list = [sn]  # narrow to the current sn (rebinding does not affect the zip iterator above)
                    start_time = start_time_dt.strftime('%Y-%m-%d %H:%M:%S')
                    end_time = end_time_dt.strftime('%Y-%m-%d %H:%M:%S')
                    # Data retrieval
                    time_st = time.time()
                    logger_main.info(f"process-{process_num}: start fetching data for {sn_list}")
                    columns = [DataField.time, DataField.sn, DataField.pack_crnt, DataField.pack_volt, DataField.pack_soc,
                               DataField.cell_voltage_count, DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
                               DataField.other_temp_value, DataField.bms_sta]
                    df_data = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
                    logger_main.info(f'process-{process_num}: fetched {len(df_data)} rows from {start_time} to {end_time}, fetch took {time.time()-time_st:.2f}s')
                except Exception as e:
                    logger_main.error(f"process-{process_num}: failed to fetch raw data for {sn}")
                    logger_main.error(f"process-{process_num}: {e}")
                    logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                    continue
                # Data cleaning
                try:
                    time_st = time.time()
                    logger_main.info(f'process-{process_num}: data cleaning')
                    df_data, df_table, cellvolt_name, celltemp_name = iotp_service.datacleaning(df_algo_pack_param, df_data)  # run data cleaning
                    if len(df_data) == 0:
                        logger_main.info(f"process-{process_num}: data cleaning took {time.time()-time_st:.2f}s, no valid data, skipping this run")
                        continue
                    else:
                        logger_main.info(f"process-{process_num}: {sn}, time range: {df_data.iloc[0]['time']} ~ {df_data.iloc[-1]['time']}, data cleaning finished in {time.time()-time_st:.2f}s")
                except Exception as e:
                    logger_main.error(f"process-{process_num}: data cleaning failed for {sn}")
                    logger_main.error(f"process-{process_num}: {e}")
                    logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                    continue  # the steps below depend on the cleaned data
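                # datacleaning() (as used above) is assumed to return the
                # cleaned frame plus a metadata table and the per-cell
                # voltage/temperature column names derived from
                # df_algo_pack_param; only df_data (and df_diag_ram, read
                # below) feed the SOC diagnosis in this script.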
                # # Plotting
                # in_range = df_data[(df_data['pack_soc'] > 0) & (df_data['pack_soc'] < 100)]
                # plt.figure(figsize=(30, 6))
                # ax1 = plt.subplot(1, 2, 1)
                # ax1.scatter(list(in_range['time']), list(in_range['pack_soc']), s=2, color='b', label='SOC')
                # ax1.set_xlabel('Date', fontsize=16)
                # ax1.set_ylabel('SOC%', fontsize=16)
                # ax1.set_title(f'sn:{sn}', fontsize=16)
                # ax2 = ax1.twinx()
                # ax2.scatter(list(in_range['time']), list(in_range['pack_volt']), color='r', s=1, label='PackVoltage')
                # ax2.set_ylabel('PackVoltage', fontsize=16)
                # ax1.legend(loc=(.75, .13), fontsize=14)
                # ax2.legend(loc=(.75, .05), fontsize=14)
                # ax3 = plt.subplot(1, 2, 2)
                # ax3.scatter(list(in_range['time']), list(in_range['pack_soc']), s=2, color='b', label='SOC')
                # ax3.set_xlabel('Date', fontsize=16)
                # ax3.set_ylabel('SOC%', fontsize=16)
                # ax4 = ax3.twinx()
                # ax4.scatter(list(in_range['time']), list(in_range['pack_crnt']), color='r', s=1, label='PackCrnt')
                # ax4.set_ylabel('PackCrnt', fontsize=16)
                # ax3.set_title(f'start time: {start_time} end time: {end_time}', fontsize=16)
                # ax3.legend(loc=(.75, .13), fontsize=14)
                # ax4.legend(loc=(.75, .05), fontsize=14)
                # st = start_time_dt.strftime('%Y%m%d%H%M%S')
                # et = end_time_dt.strftime('%Y%m%d%H%M%S')
                # plt.savefig(f'./{sn}_{st}_{et}.jpg')
                # os._exit(0)
                # continue
                # Read in-progress fault data from mysql
                try:
                    time_st = time.time()
                    logger_main.info(f'process-{process_num}: start reading mysql fault data')
                    if len(sn_list) == 1:
                        sql = "select * from algo_all_fault_info_ing where sn = '{}'".format(sn_list[0])
                    else:
                        sql = "select * from algo_all_fault_info_ing where sn in {}".format(tuple(sn_list))  # optionally also filter by fault_code, e.g. 'C599', 'C590'
                    df_diag_ram = pd.read_sql(sql, mysql_algo_conn)
                    logger_main.info(f'process-{process_num}: mysql read took {time.time()-time_st:.2f}s')
                except Exception as e:
                    logger_main.error(f"process-{process_num}: failed to read mysql fault data for {sn}")
                    logger_main.error(f"process-{process_num}: {e}")
                    logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                    continue
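                # The string-formatted queries above would be unsafe if sn
                # values ever came from untrusted input; a bound-parameter
                # sketch for the single-sn case (assumes SQLAlchemy text(),
                # already imported above; not part of the original script):
                # stmt = text("select * from algo_all_fault_info_ing where sn = :sn")
                # df_diag_ram = pd.read_sql(stmt, mysql_algo_conn, params={'sn': sn_list[0]})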
                # Algorithm invocation
                try:
                    time_st = time.time()
                    loggers['socdiag'].info(f'start running algorithm for {sn}, time: {start_time}~{end_time},\n sn_list: {sn_list}')
                    period = 24 * 60  # algorithm period in minutes
                    first = 1
                    for t in pd.date_range(start_time_dt, end_time_dt, freq=timedelta(minutes=period)):
                        start = t
                        end = start + timedelta(minutes=period)
                        df_data_range = df_data[(df_data['time'] >= start) & (df_data['time'] < end)]
                        if len(df_data_range) == 0:
                            continue
                        soc_diag = SocDiag(cell_type, df_algo_pack_param, df_algo_param, end.strftime('%Y-%m-%d %H:%M:%S'), period, pack_code, df_snlist, df_data_range)
                        df_res_end_C107 = soc_diag.soc_jump()
                        df_res_new_C109, df_res_end_C109 = soc_diag.soc_block(df_diag_ram)
                        df_res_new_soc_1 = df_res_new_C109
                        df_res_end_soc_1 = pd.concat([df_res_end_C107, df_res_end_C109])
                        if first:
                            df_res_new_soc = df_res_new_soc_1
                            df_res_end_soc = df_res_end_soc_1
                            first = 0
                        else:
                            df_res_new_soc = pd.concat([df_res_new_soc, df_res_new_soc_1])
                            df_res_end_soc = pd.concat([df_res_end_soc, df_res_end_soc_1])
                    loggers['socdiag'].info(f'algorithm finished for {sn}, took {time.time()-time_st:.2f}s')
                except Exception as e:
                    loggers['socdiag'].error('algorithm run failed for {}'.format(pack_code))
                    loggers['socdiag'].error(str(e))
                    loggers['socdiag'].error(traceback.format_exc())
                    continue
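                # The first/concat bookkeeping above can be expressed more
                # idiomatically by collecting per-window results in lists and
                # concatenating once after the loop (a sketch with the same
                # variable names; not enabled here):
                # new_parts, end_parts = [], []
                # for t in pd.date_range(start_time_dt, end_time_dt, freq=timedelta(minutes=period)):
                #     ...
                #     new_parts.append(df_res_new_C109)
                #     end_parts.append(pd.concat([df_res_end_C107, df_res_end_C109]))
                # df_res_new_soc = pd.concat(new_parts) if new_parts else pd.DataFrame()
                # df_res_end_soc = pd.concat(end_parts) if end_parts else pd.DataFrame()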
                # # Algorithm invocation (low SOC alarm, disabled)
                # # try:
                # #     time_st = time.time()
                # #     # loggers['low_soc_diag'].info(f'start running algorithm for {pack_code}, time: {start_time}~{end_time},\n sn_list: {sn_list}')
                # #     # low_soc_warning = Low_soc_alarm(df_data, cellvolt_name)
                # #     # df_res_new_lw_soc, df_res_update_lw_soc, df_res_end_lw_soc = low_soc_warning.diag(df_algo_pack_param, df_algo_param, df_algo_adjustable_param, df_data, df_table, df_diag_ram, df_snlist)
                # #     loggers['low_soc_diag'].info(f'algorithm finished for {pack_code}, took {time.time()-time_st}s')
                # # except Exception as e:
                # #     loggers['low_soc_diag'].error('algorithm run failed for {}'.format(pack_code))
                # #     loggers['low_soc_diag'].error(str(e))
                # #     loggers['low_soc_diag'].error(traceback.format_exc())
                # #     continue
                # df_res_new = df_res_new_soc
                # # df_res_update = df_res_update_lw_soc  # pd.concat([df_res_update_lw_soc, df_res_update_crnt, df_res_update_temp])
                # df_res_end = df_res_end_soc
                # df_res_new.reset_index(drop=True, inplace=True)
                # # df_res_update.reset_index(drop=True, inplace=True)
                # df_res_end.reset_index(drop=True, inplace=True)
                # # Write results to mysql
                # try:
                #     time_st = time.time()
                #     session = mysql_algo_Session()
                #     if not df_res_new.empty:
                #         df_res_new['date_info'] = df_res_new['start_time']
                #         df_res_new['create_time'] = datetime.now()
                #         df_res_new['create_by'] = 'algo'
                #         df_res_new['is_delete'] = 0
                #         df_res_new.to_sql("algo_all_fault_info_ing", con=mysql_algo_conn, if_exists="append", index=False)
                #         logger_main.info(f'process-{process_num}: new unresolved faults stored for {pack_code}')
                #     if not df_res_end.empty:
                #         df_res_end = df_res_end.where(pd.notnull(df_res_end), None)
                #         df_res_end = df_res_end.fillna(0)
                #         for index in df_res_end.index:
                #             df_t = df_res_end.loc[index:index]
                #             sql = 'delete from algo_all_fault_info_ing where start_time=:start_time and fault_code=:fault_code and sn=:sn'
                #             params = {'start_time': df_t['start_time'].values[0],
                #                       'fault_code': df_t['fault_code'].values[0], 'sn': df_t['sn'].values[0]}
                #             session.execute(text(sql), params)
                #             sql = 'insert into algo_all_fault_info_done (date_info, start_time, end_time, sn, imei, model, fault_level, fault_code, fault_info,\
                #                    fault_reason, fault_advice, fault_location, device_status, odo, create_time, create_by, update_time, update_by, is_delete, comment) values \
                #                    (:date_info, :start_time, :end_time, :sn, :imei, :model, :fault_level, :fault_code, :fault_info,\
                #                    :fault_reason, :fault_advice, :fault_location, :device_status, :odo, :create_time, :create_by, :update_time, :update_by, :is_delete, :comment)'
                #             params = {'date_info': datetime.now(),
                #                       'start_time': df_t['start_time'].values[0],
                #                       'end_time': df_t['end_time'].values[0],
                #                       'sn': df_t['sn'].values[0],
                #                       'imei': df_t['imei'].values[0],
                #                       'model': pack_code,
                #                       'fault_level': df_t['fault_level'].values[0],
                #                       'fault_code': df_t['fault_code'].values[0],
                #                       'fault_info': df_t['fault_info'].values[0],
                #                       'fault_reason': df_t['fault_reason'].values[0],
                #                       'fault_advice': df_t['fault_advice'].values[0],
                #                       'fault_location': df_t['fault_location'].values[0],
                #                       'device_status': df_t['device_status'].values[0],
                #                       'odo': df_t['odo'].values[0],
                #                       'create_time': datetime.now(),
                #                       'create_by': 'algo',
                #                       'update_time': datetime.now(),
                #                       'update_by': None,
                #                       'is_delete': 0,
                #                       'comment': None}
                #             session.execute(text(sql), params)
                #         session.commit()
                #         logger_main.info(f'process-{process_num}: resolved faults stored for {pack_code}')
                #     # if not df_res_update.empty:
                #     #     df_res_update = df_res_update.where(pd.notnull(df_res_update), None)
                #     #     df_res_update = df_res_update.fillna(0)
                #     #     for index in df_res_update.index:
                #     #         df_t = df_res_update.loc[index:index]
                #     #         try:
                #     #             # update records
                #     #             with mysql_algo_Session() as session:
                #     #                 session.execute(update(AlgoAllFaultInfoIng).where(
                #     #                     and_((AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
                #     #                          (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
                #     #                          (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))).
                #     #                     values(fault_level=df_t['fault_level'].values[0],
                #     #                            comment=df_t['comment'].values[0]))
                #     #                 session.commit()
                #     #         except Exception as e:
                #     #             logger_main.error(f"process-{process_num}: failed to store results for {pack_code}")
                #     #             logger_main.error(f"process-{process_num}: {e}")
                #     #             logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                #     #         finally:
                #     #             session.close()
                #     #     logger_main.info(f"process-{process_num}: updates stored")
                #     # else:
                #     #     logger_main.info(f"process-{process_num}: no faults to update")
                #     logger_main.info(f"process-{process_num}: result storage took {time.time()-time_st}s")
                # except Exception as e:
                #     logger_main.error(f"process-{process_num}: failed to store results for {sn}")
                #     logger_main.error(f"process-{process_num}: {e}")
                #     logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
            finally:
                pass


if __name__ == '__main__':
    if (True):
        try:
            # Configuration
            cur_env = 'dev'  # runtime environment
            app_path = "/home/chenenze/zlwl-algos/"  # absolute path of the app
            log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log path
            app_name = "task_day_1_test_offline_test"  # application name
            sysUtils = SysUtils(cur_env, app_path)
            logger_main = sysUtils.get_logger(app_name, log_base_path)
            logger_main.info(f"main process id: {os.getpid()}")
            # Read config files (do not modify this part)
            processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # defaults to 1 process
            pool = Pool(processes=processes)
            logger_main.info("start spawning worker processes")
            for i in range(processes):
                pool.apply_async(main, (i,))
            pool.close()
            logger_main.info("workers spawned, blocking main process")
            pool.join()
        except Exception as e:
            print(str(e))
            print(traceback.format_exc())
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
        finally:
            handlers = logger_main.handlers.copy()
            for h in handlers:
                logger_main.removeHandler(h)
            pool.terminate()
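
# Usage (a sketch, assuming the ZlwlAlgosCommon config files for the 'dev'
# environment exist under app_path, and that sysUtils.env_params mirrors OS
# environment variables):
#   PROCESS_NUM_PER_NODE=2 python main_offline_test.py
# Each worker process replays the hard-coded sn/time windows above against
# HBase/MySQL and runs the SOC diagnosis offline.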