main.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. from datetime import datetime
  2. from multiprocessing import Pool
  3. import json
  4. import os
  5. import time
  6. import traceback
  7. import warnings
  8. from apscheduler.schedulers.blocking import BlockingScheduler
  9. from sqlalchemy import text, delete, and_, or_, update
  10. import pandas as pd
  11. from ZlwlAlgosCommon.utils.ProUtils import *
  12. from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
  13. from ZlwlAlgosCommon.service.iotp.Beans import DataField
  14. from ZlwlAlgosCommon.orm.models import *
  15. from healthscore.V_1_0_0.BatHealthScore import HealthScore
  16. from sohdiag.V_1_0_0.SOHBatDiag import sohdiag
  17. from OffLineAlarm.V1_0_0 import off_line_warning
  18. #from OmissionNotice.V_1_0_0 import omission_notice
  19. def get_battery_info(mysql_iotp_conn):
  20. sql = "select * from ff_battery_status "
  21. t_battery = pd.read_sql(sql, mysql_iotp_conn)
  22. return t_battery
  23. def get_bat_health_info(mysql_algo_conn,sn_list):
  24. if len(sn_list) == 1:
  25. sn_tuple = f"('{sn_list[0]}')"
  26. else:
  27. sn_tuple = tuple(sn_list)
  28. sql = "select * from algo_all_fault_info_ing where sn in {}".format(sn_tuple)
  29. df_diag_ram = pd.read_sql(sql, mysql_algo_conn)
  30. sql = "select sn,MAX(time_st) AS time_st,time_sp,soh,cellsoh_diff,cellsoh,odo from algo_soh WHERE sn in {} GROUP BY sn".format(sn_tuple)
  31. df_soh = pd.read_sql(sql, mysql_algo_conn)
  32. sql = "SELECT * FROM algo_mid_uniform_result where sn in {}".format(sn_tuple)
  33. df_uniform = pd.read_sql(sql, mysql_algo_conn)
  34. sql = "SELECT * FROM algo_mid_sorout where sn in {}".format(sn_tuple)
  35. df_sor = pd.read_sql(sql, mysql_algo_conn)
  36. return df_diag_ram, df_soh, df_uniform, df_sor
  37. def update_param(db_engine, rc):#
  38. # 从redis中获取参数,如果redis中获取不到,则去数据库中获取
  39. df_algo_adjustable_param = pd.read_sql("select id, algo_id, pack_code, param from algo_adjustable_param", db_engine)
  40. df_algo_list = pd.read_sql("select id, algo_id, algo_name, is_activate, global_param, fault_code, fault_influence from algo_list", db_engine)
  41. df_algo_pack_param = pd.read_sql("select id, pack_code, param from algo_pack_param", db_engine)
  42. df_snpk_list = pd.read_sql("select sn, imei,pack_model,scrap_status from t_device", db_engine)
  43. df_snpk_list=df_snpk_list[df_snpk_list['scrap_status']<4]
  44. return df_algo_adjustable_param,df_algo_list,df_algo_pack_param,df_snpk_list
  45. def main():
  46. # 程序不能停止
  47. try:
  48. warnings.filterwarnings("ignore")
  49. try:
  50. cleanUtils = CleanUtils()
  51. # 调用算法前的准备工作
  52. mysql_algo_conn = None
  53. mysql_algo_engine = None
  54. mysql_iotp_conn = None
  55. mysql_iotp_engine= None
  56. kafka_consumer = None
  57. rc= None
  58. kafka_topic_key = 'topic_test_sxq'#topic_task_day_1_1 test_code
  59. kafka_groupid_key = 'group_id_test_sxq' #group_id_task_day_1_1 test_code
  60. algo_list = ['healthscore', 'sohdiag','offline_diag'] # 本调度所包含的算法名列表。
  61. process_num=1
  62. loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num) # 为每个算法分配一个logger
  63. logger_main.info(f"process-{process_num}: 配置中间件")
  64. # mysql
  65. mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
  66. mysqlUtils = MysqlUtils()
  67. mysql_algo_engine, mysql_algo_Session= mysqlUtils.get_mysql_engine(mysql_algo_params)
  68. mysql_algo_conn = mysql_algo_engine.connect()
  69. mysql_iotp_data = sysUtils.get_cf_param('mysql-iotp')
  70. mysqlUtils = MysqlUtils()
  71. mysql_iotp_engine, mysql_iopt_Session= mysqlUtils.get_mysql_engine(mysql_iotp_data)
  72. mysql_iotp_conn = mysql_iotp_engine.connect()
  73. # kafka
  74. kafka_params = sysUtils.get_cf_param('kafka')
  75. kafkaUtils = KafkaUtils()
  76. kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
  77. #Hbase
  78. hbase_params = sysUtils.get_cf_param('hbase-datafactory')#test_code
  79. iotp_service = IotpAlgoService(hbase_params=hbase_params)
  80. #redis
  81. redis_params = sysUtils.get_cf_param('redis')
  82. reidsUtils = RedisUtils()
  83. rc = reidsUtils.get_redis_conncect(redis_params)
  84. except Exception as e:
  85. logger_main.error(f'process-{process_num}: {e}')
  86. logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
  87. cleanUtils.clean(mysql_algo_conn, mysql_algo_engine, mysql_iotp_conn, mysql_iotp_engine, kafka_consumer, rc)
  88. # 开始准备调度
  89. logger_main.info(f"process-{process_num}: 监听topic {kafka_params[kafka_topic_key]}等待kafka 调度")
  90. #for message in kafka_consumer:
  91. #print('test')
  92. #KafkaConsumer.commit()
  93. try:
  94. logger_main.info(f'收到调度')
  95. if not mysql_algo_conn.closed:
  96. mysql_algo_conn.close()
  97. mysql_algo_conn = mysql_algo_engine.connect() # 从连接池中获取一个mysql连接
  98. if not mysql_iotp_conn.closed:
  99. mysql_iotp_conn.close()
  100. mysql_iotp_conn = mysql_iotp_engine.connect() # 从连接池中获取一个mysql连接
  101. # schedule_params = json.loads(message.value)
  102. # if (schedule_params is None) or (schedule_params ==''):
  103. # logger_main.info('{} kafka数据异常,跳过本次运算'.format(str(message.value)))
  104. # continue
  105. # kafka 调度参数解析
  106. # df_snlist = pd.DataFrame(schedule_params['snlist'])
  107. # df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'],d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param','param_ai'])
  108. # df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
  109. # df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}
  110. # df_algo_param = pd.DataFrame(schedule_params['algo_list'])
  111. # # start_time = schedule_params['start_time']
  112. # # end_time = schedule_params['end_time']
  113. # pack_code = schedule_params['pack_code']
  114. # cell_type = schedule_params['cell_type']
  115. logger_main.info("获取算法参数")
  116. df_algo_adjustable_param, df_algo_list, df_algo_pack_param, df_snpk_list= update_param(mysql_algo_conn,rc)
  117. pack_code_list= list(set(df_snpk_list['pack_model']))
  118. for i in range(0,len(pack_code_list)):
  119. df_res_new=pd.DataFrame()
  120. df_res_update=pd.DataFrame()
  121. df_res_end=pd.DataFrame()
  122. df_snlist=df_snpk_list[df_snpk_list['pack_model']==pack_code_list[i]]
  123. sn_list=df_snlist['sn'].tolist()
  124. df_algo_adjustable_param_pack_code=df_algo_adjustable_param[df_algo_adjustable_param['pack_code']==pack_code_list[i]]
  125. df_algo_param=df_algo_list
  126. # mysql数据读取
  127. try:
  128. time_st = time.time()
  129. logger_main.info(f'process-{process_num}开始读取mysql故障数据')
  130. df_diag_ram,df_soh,df_uniform,df_sor=get_bat_health_info(mysql_algo_conn,sn_list)
  131. logger_main.info(f'process-{process_num}读取mysql耗时{time.time()-time_st}')
  132. except Exception as e:
  133. logger_main.error(f"process-{process_num}:读取redis出错")
  134. logger_main.error(f"process-{process_num}:{e}")
  135. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  136. continue
  137. # 算法调用
  138. # 健康度评分算法调用
  139. try:
  140. time_st = time.time()
  141. loggers['healthscore'].info(f'开始执行算法,电池包号为{pack_code_list[i]}')
  142. healthscole = HealthScore(df_soh, df_uniform, df_sor)
  143. df_res_healthscore = df_snlist['sn'].apply(lambda x : healthscole.health_score(x))
  144. df_res_healthscore = pd.concat(df_res_healthscore.tolist(), axis=0)
  145. loggers['healthscore'].info(f'算法运行完成,算法耗时{time.time()-time_st}')
  146. except Exception as e:
  147. loggers['healthscore'].error('算法运行出错')
  148. loggers['healthscore'].error(str(e))
  149. loggers['healthscore'].error(traceback.format_exc())
  150. df_res_healthscore=pd.DataFrame()
  151. # SOH诊断算法调用
  152. try:
  153. time_st = time.time()
  154. loggers['sohdiag'].info(f'开始执行算法,电池包号为{pack_code_list[i]}')
  155. df_res_new_soh, df_res_end_soh = sohdiag(df_soh, df_diag_ram, df_snlist, df_algo_adjustable_param_pack_code)
  156. loggers['sohdiag'].info(f'算法运行完成,算法耗时{time.time()-time_st}')
  157. except Exception as e:
  158. loggers['sohdiag'].error('算法运行出错')
  159. loggers['sohdiag'].error(str(e))
  160. loggers['sohdiag'].error(traceback.format_exc())
  161. df_res_new_soh=pd.DataFrame()
  162. df_res_end_soh=pd.DataFrame()
  163. # 离线诊断算法调用
  164. try:
  165. time_st = time.time()
  166. loggers['offline_diag'].info(f'开始执行算法,电池包号为{pack_code_list[i]}')
  167. t_battery=get_battery_info(mysql_iotp_conn)
  168. offline_diag=off_line_warning.Off_Line_Warning()
  169. df_res_new_ofl,df_res_update_ofl,df_res_end_ofl=offline_diag.diag(t_battery,df_diag_ram,df_algo_adjustable_param_pack_code,df_snlist,df_algo_param)
  170. loggers['offline_diag'].info(f'算法运行完成,算法耗时{time.time()-time_st}')
  171. #print(len(df_res_new_ofl))
  172. except Exception as e:
  173. loggers['offline_diag'].error('算法运行出错')
  174. loggers['offline_diag'].error(str(e))
  175. loggers['offline_diag'].error(traceback.format_exc())
  176. df_res_new_ofl=pd.DataFrame()
  177. df_res_update_ofl=pd.DataFrame()
  178. df_res_end_ofl=pd.DataFrame()
  179. #结果写入mysql
  180. try:
  181. df_res_new = pd.concat([df_res_new_ofl,df_res_new_soh]) #, res1
  182. df_res_update=df_res_update_ofl #df_res_update_lw_soc#pd.concat([df_res_update_lw_soc,df_res_update_crnt, df_res_update_temp]) #, res1
  183. df_res_end = pd.concat([df_res_end_ofl,df_res_end_soh]) #, res2
  184. df_res_new.reset_index(drop=True, inplace=True)
  185. df_res_update.reset_index(drop=True, inplace=True)
  186. df_res_end.reset_index(drop=True, inplace=True)
  187. time_st = time.time()
  188. session = mysql_algo_Session()
  189. if not df_res_healthscore.empty:
  190. df_res_healthscore.to_sql("algo_health_score",con=mysql_algo_conn, if_exists="append",index=False)
  191. if not df_res_new.empty:
  192. df_res_new['date_info'] = df_res_new['start_time']
  193. df_res_new['create_time'] = datetime.now()
  194. df_res_new['create_by'] = 'algo'
  195. df_res_new['is_delete'] = 0
  196. df_res_new.to_sql("algo_all_fault_info_ing", con=mysql_algo_conn, if_exists="append", index=False)
  197. logger_main.info(f'process-{process_num}新增未结束故障入库{pack_code_list[i]}完成')
  198. if not df_res_end.empty:
  199. df_res_end=df_res_end.where(pd.notnull(df_res_end),None)
  200. df_res_end=df_res_end.fillna(0)
  201. for index in df_res_end.index:
  202. df_t = df_res_end.loc[index:index]
  203. sql = 'delete from algo_all_fault_info_ing where start_time=:start_time and fault_code=:fault_code and sn=:sn'
  204. params = {'start_time': df_t['start_time'].values[0],
  205. 'fault_code': df_t['fault_code'].values[0], 'sn': df_t['sn'].values[0]}
  206. session.execute(sql, params=params)
  207. sql = 'insert into algo_all_fault_info_done (date_info, start_time, end_time, sn, imei, model, fault_level, fault_code, fault_info,\
  208. fault_reason, fault_advice, fault_location, device_status,odo, create_time, create_by,update_time, update_by, is_delete,comment) values \
  209. (:date_info, :start_time, :end_time, :sn, :imei, :model,:fault_level, :fault_code, :fault_info,\
  210. :fault_reason, :fault_advice, :fault_location, :device_status, :odo, :create_time, :create_by, :update_time,:update_by, :is_delete , :comment)'
  211. params = {'date_info': datetime.now(),
  212. 'start_time': df_t['start_time'].values[0],
  213. 'end_time': df_t['end_time'].values[0],
  214. 'sn': df_t['sn'].values[0],
  215. 'imei': df_t['imei'].values[0],
  216. 'model' :pack_code_list[i],
  217. 'fault_level': df_t['fault_level'].values[0],
  218. 'fault_code': df_t['fault_code'].values[0],
  219. 'fault_info': df_t['fault_info'].values[0],
  220. 'fault_reason': df_t['fault_reason'].values[0],
  221. 'fault_advice': df_t['fault_advice'].values[0],
  222. 'fault_location': df_t['fault_location'].values[0],
  223. 'device_status': df_t['device_status'].values[0],
  224. 'odo': df_t['odo'].values[0],
  225. 'create_time': datetime.now(),
  226. 'create_by': 'algo',
  227. 'update_time': datetime.now(),
  228. 'update_by': None,
  229. 'is_delete': 0,
  230. 'comment': None}
  231. session.execute(sql, params=params)
  232. session.commit()
  233. logger_main.info(f'process-{process_num}结束故障入库{pack_code_list[i]}完成')
  234. if not df_res_update.empty:
  235. df_res_update=df_res_update.where(pd.notnull(df_res_update),None)
  236. df_res_update=df_res_update.fillna(0)
  237. for index in df_res_update.index:
  238. df_t = df_res_update.loc[index:index]
  239. try:
  240. # 更新数据
  241. with mysql_algo_Session() as session:
  242. session.execute(update(AlgoAllFaultInfoIng).where(
  243. and_((AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
  244. (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
  245. (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))).
  246. values(fault_level=df_t['fault_level'].values[0],
  247. comment=df_t['comment'].values[0]))
  248. session.commit()
  249. except Exception as e:
  250. logger_main.error(f"process-{process_num}:{pack_code_list[i]}结果入库出错")
  251. logger_main.error(f"process-{process_num}:{e}")
  252. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  253. finally:
  254. session.close()
  255. logger_main.info(f"process-{process_num}: 更新入库完成")
  256. else:
  257. logger_main.info(f"process-{process_num}: 无更新故障")
  258. logger_main.info(f"process-{process_num}: 结果入库耗时:{time.time()-time_st}")
  259. except Exception as e:
  260. logger_main.error(f"process-{process_num}:结果入库出错")
  261. logger_main.error(f"process-{process_num}:{e}")
  262. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  263. finally:
  264. pass
  265. except Exception as e:
  266. logger_main.error(f"process-{process_num}:获取mysql数据库数据出错")
  267. logger_main.error(f"process-{process_num}:{e}")
  268. logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
  269. logger_main.info(f"process-{process_num}: 结束本轮计算")
  270. except Exception as e:
  271. logger_main.error(f'process-{process_num}: {e}')
  272. logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
  273. cleanUtils.clean(mysql_algo_conn, mysql_algo_engine, mysql_iotp_conn, mysql_iotp_engine, kafka_consumer, rc)
  274. if __name__ == '__main__':
  275. #定时任务.......................................................................................................................................................................
  276. cur_env = 'dev' # 设置运行环境
  277. app_path = "/home/shouxueqi/projects/zlwl-algos/" # 设置app绝对路径fff
  278. log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
  279. app_name = "task_day_1_1" # 应用名
  280. sysUtils = SysUtils(cur_env, app_path)
  281. logger_main = sysUtils.get_logger(app_name, log_base_path)
  282. main()
  283. x=0
  284. while x==0:
  285. state_time=time.localtime(time.time())
  286. tt ='2023-06-01 00:00:00'
  287. t=time.strptime(tt,'%Y-%m-%d %H:%M:%S')
  288. if state_time>t:
  289. x=1
  290. main()
  291. scheduler = BlockingScheduler()
  292. scheduler.add_job(main, 'interval', days=1, id='diag_job')
  293. try:
  294. logger_main.info(os.getpid())
  295. scheduler.start()
  296. except Exception as e:
  297. print(str(e))
  298. print(traceback.format_exc())
  299. logger_main.error(str(e))
  300. logger_main.error(traceback.format_exc())