from datetime import datetime
from multiprocessing import Pool
import json
import os
import time
import traceback
import warnings
from li_plted.V1_0_0.corepro_V1 import *
#from keras.models import load_model
import pickle
import numpy as np  # np.nan is used in the mileage fill below
from sqlalchemy import text, delete, and_, or_, update
import pandas as pd
from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *
from socdiag.V_1_0_0.SOCBatDiag import SocDiag
from LowSocAlarm.V1_0_0.low_soc_alarm import Low_soc_alarm
from SorCal.V_1_0_0.sorcal import sor_est
from DataSplit.V_1_0_0 import data_status as ds    # charge-status normalization
from DataSplit.V_1_0_0 import data_split as dt     # segmentation routines
from DataSplit.V_1_0_0 import data_drive_stat as ddt    # per-drive-segment statistics
from DataSplit.V_1_0_0 import data_charge_stat as dct   # per-charge-segment statistics
from DataSplit.V_1_0_0 import data_stand_stat as dst    # per-rest-segment statistics
from DataSplit.V_1_0_0 import data_drive_stat_period as ddtp  # drive statistics per charge cycle
from DataSplit.V_1_0_0 import trans_day as trd     # handles segments that cross midnight
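
# Offline test harness: instead of consuming kafka scheduling messages, main() walks local
# per-SN pickle dumps under D:/data/, cleans each two-file batch, runs the low-SOC alarm and
# the daily data-split algorithms, and logs the results. The SOC-diagnosis, SOR and
# Li-plating algorithms, as well as most database writes, are kept below but commented out.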


def update_param(db_engine, rc):
    # Read parameters from redis; fall back to the database when redis has no cached copy.
    data = rc.get("algo_param_from_mysql:algo_adjustable_param")
    # data = pd.DataFrame()
    if pd.isnull(data):
        df_algo_adjustable_param = pd.read_sql("select id, algo_id, pack_code, param from algo_adjustable_param", db_engine)
    else:
        df_algo_adjustable_param = pd.DataFrame(json.loads(data))

    data = rc.get("algo_param_from_mysql:algo_list")  # pd.DataFrame()
    if pd.isnull(data):
        df_algo_list = pd.read_sql("select id, algo_id, algo_name, is_activate, global_param, fault_code, fault_influence from algo_list", db_engine)
    else:
        df_algo_list = pd.DataFrame(json.loads(data))

    data = rc.get("algo_param_from_mysql:algo_pack_param")
    if pd.isnull(data):
        df_algo_pack_param = pd.read_sql("select id, pack_code, param from algo_pack_param", db_engine)
    else:
        df_algo_pack_param = pd.DataFrame(json.loads(data))

    data = rc.get("algo_param_from_mysql:app_device")
    if pd.isnull(data):
        df_snpk_list = pd.read_sql("select sn, imei, pack_model, scrap_status from t_device", db_engine)
        df_snpk_list = df_snpk_list[df_snpk_list['scrap_status'] < 4]
    else:
        df_snpk_list = pd.DataFrame(json.loads(data))

    return df_algo_adjustable_param, df_algo_list, df_algo_pack_param, df_snpk_list


def main():  # process_num
    # This job is meant to keep running; the original outer while(True) loop is left disabled.
    # while(True):
    try:
        process_num = 1
        warnings.filterwarnings("ignore")
        try:
            # Preparation before the algorithms are called
            kafka_topic_key = 'topic_task_day_1_sxqtest'
            kafka_groupid_key = 'group_id_task_day_1_sxqtest'
            algo_list = ['socdiag', 'low_soc_diag', 'Sor_Diag', 'Li_Plted', 'Data_Split']  # algorithms covered by this scheduler
            loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # one logger per algorithm
            logger_main.info(f"process-{process_num}: 配置中间件")

            # mysql
            mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
            mysqlUtils = MysqlUtils()
            mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
            mysql_algo_conn = mysql_algo_engine.connect()
            # mysql_iotp_data = sysUtils.get_cf_param('mysql-iotp')
            # mysqlUtils = MysqlUtils()
            # mysql_iotp_engine, mysql_iopt_Session = mysqlUtils.get_mysql_engine(mysql_iotp_data)
            # mysql_iotp_conn = mysql_iotp_engine.connect()

            # kafka
            kafka_params = sysUtils.get_cf_param('kafka')
            kafkaUtils = KafkaUtils()
            kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)

            # hbase
            hbase_params = sysUtils.get_cf_param('hbase-datafactory')
            iotp_service = IotpAlgoService(hbase_params=hbase_params)

            # redis
            redis_params = sysUtils.get_cf_param('redis')
            redisUtils = RedisUtils()
            rc = redisUtils.get_redis_conncect(redis_params)
        except Exception as e:
            logger_main.error(f'process-{process_num}: {e}')
            logger_main.error(f'process-{process_num}: {traceback.format_exc()}')

        # Start scheduling
        logger_main.info(f"process-{process_num}: 监听topic {kafka_params[kafka_topic_key]}等待kafka 调度")
        # for message in kafka_consumer:
        path = 'D:/data/'  # '/data/common/benchi/data/'
        sn_list = os.listdir(path)
        pack_code = 'CL3282A'
        df_algo_adjustable_param, df_algo_list, df_algo_pack_param, df_snpk_list = update_param(mysql_algo_conn, rc)

        sql = "select * from algo_pack_param"
        df_algo_pack_param_all = pd.read_sql(sql, mysql_algo_conn)
        sql = "select * from algo_list"
        df_algo_param = pd.read_sql(sql, mysql_algo_conn)
        df_algo_pack_param = json.loads(df_algo_pack_param_all[df_algo_pack_param_all['pack_code'] == pack_code]['param'].iloc[0])

        sql = f"select sn, imei from t_device where sn in {tuple(sn_list)}"
        df_snlist = pd.read_sql(sql, mysql_algo_conn)

        start_time_dt = pd.to_datetime('2022-01-01')
        end_time_dt = pd.to_datetime('2022-11-30')
        path = 'D:/data/'  # '/data/common/benchi/data/'
        sn_list = os.listdir(path)
        sn_list = sn_list[30:40]  # only process this slice of SN folders in the test run
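        # Each SN folder is expected to hold zipped pickle files named <start_time>_<end_time>.<ext>;
        # they are read two at a time and processed as one batch per pass.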
        for sn in sn_list:
            st_time = time.time()
            try:
                snpath = path + sn + '/'
                times = os.listdir(snpath)
                times = sorted(times)
                for i in range(0, len(times), 2):
                    # Take the next two files for this pass
                    files_to_read = times[i:i+2]
                    # Read the files one by one and stack them
                    df_data_all = pd.DataFrame()
                    rd_data_st_time = time.time()
                    for sntime in files_to_read:
                        # The file name encodes the time range as <start>_<end>
                        start_time = sntime.split('.')[0].split('_')[0]
                        end_time = sntime.split('.')[0].split('_')[1]
                        # Load the data
                        time_st = time.time()
                        logger_main.info(f"process-{process_num}: 开始取数{sn_list}")
                        columns = [DataField.time, DataField.sn, DataField.pack_crnt, DataField.pack_volt, DataField.pack_soc,
                                   DataField.cell_voltage_count, DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
                                   DataField.other_temp_value, DataField.bms_sta]
                        df_data_t = pd.read_pickle(snpath + sntime, compression='zip')
                        if len(df_data_all):
                            df_data_all = pd.concat([df_data_all, df_data_t])  # DataFrame.append is deprecated
                        else:
                            df_data_all = df_data_t
                    df_data_all = df_data_all.reset_index(drop=True)
                    print(sn + '_第' + str(i) + '次/共' + str(len(times)) + '次取数耗时:' + str(time.time() - rd_data_st_time) + ',共{}条数据'.format(str(len(df_data_t))))

                    if mysql_algo_conn.closed:
                        mysql_algo_conn = mysql_algo_engine.connect()  # get a fresh connection from the pool
                    # if mysql_algo_conn.closed:
                    #     mysql_algo_conn = mysql_algo_engine.connect()  # get a fresh connection from the pool
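                    # The block below is the original kafka-driven path (schedule parameters parsed
                    # from the message, raw data pulled from HBase). It is kept for reference but
                    # disabled while this script reads local pickle files instead.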
                    # schedule_params = json.loads(message.value)
                    # if (schedule_params is None) or (schedule_params == ''):
                    #     logger_main.info('{} kafka数据异常,跳过本次运算'.format(str(message.value)))
                    #     continue
                    # # parse the kafka scheduling parameters
                    # df_snlist = pd.DataFrame(schedule_params['snlist'])
                    # df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'], d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param', 'param_ai'])
                    # df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
                    # df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}
                    # df_algo_param = pd.DataFrame(schedule_params['algo_list'])
                    # start_time = schedule_params['start_time']
                    # end_time = schedule_params['end_time']
                    # pack_code = schedule_params['pack_code']
                    # cell_type = schedule_params['cell_type']
                    # sn_list = df_snlist['sn'].tolist()
                    # # fetch the data
                    # time_st = time.time()
                    # logger_main.info(f"process-{process_num}: 开始取数{sn_list}")
                    # columns = [DataField.time, DataField.sn, DataField.pack_crnt, DataField.pack_volt, DataField.pack_soc,
                    #            DataField.cell_voltage_count, DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
                    #            DataField.other_temp_value, DataField.bms_sta, DataField.charge_sta, DataField.latitude, DataField.longitude]
                    # df_data = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
                    # logger_main.info(f'process-{process_num},获取到{len(df_data)}条数据,取数耗时:{time.time()-time_st}')
                    # # convert the start-time string to a datetime object
                    # str_date = start_time
                    # date_time = datetime.datetime.strptime(str_date, '%Y-%m-%d %H:%M:%S')
                    # # shift the datetime back by 8 hours
                    # new_date_time = date_time - datetime.timedelta(hours=8)
                    # # convert the datetime back to a string
                    # start_time_8h = new_date_time.strftime('%Y-%m-%d %H:%M:%S')
                    # df_data_8h = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time_8h, end_time=start_time)
                    # logger_main.info(f'process-{process_num},获取到{len(df_data_8h)}条数据,取数耗时:{time.time()-time_st}')
                    # except Exception as e:
                    #     logger_main.error(f"process-{process_num}:获取原始数据出错")
                    #     logger_main.error(f"process-{process_num}:{e}")
                    #     logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                    #     continue
                    # Data cleaning
                    try:
                        time_st = time.time()
                        logger_main.info(f'process-{process_num}数据清洗')
                        # fill the mileage column
                        df_data_all['mileage'] = df_data_all['mileage'].replace(0, np.nan).ffill()
                        df_data_all['mileage'] = df_data_all['mileage'].replace(0, np.nan).bfill()
                        df_data_all['mileage'] = df_data_all['mileage'] / 1000
                        df_data, df_table, cellvolt_name, celltemp_name = iotp_service.datacleaning(df_algo_pack_param, df_data_all)  # run the data cleaning
                        # df_data_8h, df_table_t, cellvolt_name_t, celltemp_name_t = iotp_service.datacleaning(df_algo_pack_param, df_data_8h)
                        print('洗数耗时:' + str(time.time() - time_st))
                        if len(df_data) == 0:
                            logger_main.info(f"process-{process_num}: 数据清洗耗时{time.time()-time_st}, 无有效数据,跳过本次运算")
                            continue
                        else:
                            logger_main.info(f"process-{process_num}: {pack_code}, time_type:{df_data.loc[0, 'time']} ~ {df_data.iloc[-1]['time']}, 数据清洗完成耗时{time.time()-time_st}")
                    except Exception as e:
                        logger_main.error(f"process-{process_num}:数据清洗出错")
                        logger_main.error(f"process-{process_num}:{e}")
                        logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                        continue
                    # Read fault data from mysql
                    try:
                        time_st = time.time()
                        logger_main.info(f'process-{process_num}开始读取mysql故障数据')
                        if len(sn_list) == 1:
                            sn_tuple = f"('{sn_list[0]}')"  # avoid the trailing comma a one-element tuple would leave in the SQL
                        else:
                            sn_tuple = tuple(sn_list)
                        sql = "select * from algo_all_fault_info_ing where sn in {}".format(sn_tuple)  # fault_code='{}' or fault_code='{}') and 'C599','C590',
                        df_diag_ram = pd.read_sql(sql, mysql_algo_conn)
                        sql = "select * from algo_ailipltd_result where sn in {}".format(sn_tuple)  # fault_code='{}' or fault_code='{}') and 'C599','C590',
                        Li_pltd_his = pd.read_sql(sql, mysql_algo_conn)
                        logger_main.info(f'process-{process_num}读取mysql耗时{time.time()-time_st}')
                    except Exception as e:
                        logger_main.error(f"process-{process_num}:读取mysql出错")
                        logger_main.error(f"process-{process_num}:{e}")
                        logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                        continue
                    # Algorithm 1: SOC diagnosis (disabled)
                    # try:
                    #     time_st = time.time()
                    #     loggers['socdiag'].info(f'开始执行算法{pack_code}, time:{start_time}~{end_time},\n sn_list:{sn_list}')
                    #     period = 24 * 60  # algorithm period in minutes
                    #     soc_diag = SocDiag(cell_type, df_algo_pack_param, df_algo_adjustable_param, df_algo_param, end_time, period, pack_code, df_snlist, df_data)
                    #     df_res_new_C109, df_res_end_C109 = soc_diag.soc_block(df_diag_ram)
                    #     df_res_end_C107 = soc_diag.soc_jump()
                    #     df_res_new_soc = df_res_new_C109
                    #     df_res_end_soc = pd.concat([df_res_end_C107, df_res_end_C109])
                    #     loggers['socdiag'].info(f'算法运行完成{pack_code},算法耗时{time.time()-time_st}')
                    # except Exception as e:
                    #     loggers['socdiag'].error('算法运行出错')
                    #     loggers['socdiag'].error(str(e))
                    #     loggers['socdiag'].error(traceback.format_exc())
                    #     df_res_end_soc = pd.DataFrame()
                    #     df_res_new_soc = pd.DataFrame()

                    # Algorithm 2: low-SOC alarm
                    try:
                        time_st = time.time()
                        loggers['low_soc_diag'].info(f'开始执行算法{pack_code}, time:{start_time}~{end_time},\n sn_list:{sn_list}')
                        low_soc_warning = Low_soc_alarm(df_data, cellvolt_name)
                        df_res_new_lw_soc, df_res_update_lw_soc, df_res_end_lw_soc = low_soc_warning.diag(df_algo_pack_param, df_algo_param, df_algo_adjustable_param, df_data, df_table, df_diag_ram, df_snlist)
                        # month = date.month
                        # day = date.day
                        df_res_new_lw_soc.to_excel('lowsoc_sn_date_{}_{}.xlsx'.format(sn, start_time[0:10]))
                        loggers['low_soc_diag'].info(f'算法运行完成{pack_code},算法耗时{time.time()-time_st}')
                    except Exception as e:
                        loggers['low_soc_diag'].error('算法运行出错')
                        loggers['low_soc_diag'].error(str(e))
                        loggers['low_soc_diag'].error(traceback.format_exc())
                        df_res_new_lw_soc = pd.DataFrame()
                        df_res_update_lw_soc = pd.DataFrame()
                        df_res_end_lw_soc = pd.DataFrame()
                    # Algorithm 3: SOR estimation (disabled)
                    # try:
                    #     time_st = time.time()
                    #     loggers['Sor_Diag'].info(f'开始执行算法{pack_code}, time:{start_time}~{end_time},\n sn_list:{sn_list}')
                    #     Diagsor_temp = sor_est(df_data, df_algo_pack_param)  # estimate the internal resistance
                    #     df_sor_add = Diagsor_temp.sor_cal()
                    #     loggers['Sor_Diag'].info(f'算法运行完成{pack_code},算法耗时{time.time()-time_st}')
                    # except Exception as e:
                    #     loggers['Sor_Diag'].error('算法运行出错')
                    #     loggers['Sor_Diag'].error(str(e))
                    #     loggers['Sor_Diag'].error(traceback.format_exc())
                    #     df_sor_add = pd.DataFrame()

                    # Algorithm 4: lithium-plating detection (disabled)
                    # try:
                    #     time_st = time.time()
                    #     loggers['Li_Plted'].info(f'开始执行算法{pack_code}, time:{start_time}~{end_time},\n sn_list:{sn_list}')
                    #     pkl_path = 'li_plted/V1_0_0/scaler.pkl'
                    #     md_path = 'li_plted/V1_0_0/model.h5'
                    #     scaler = pickle.load(open(pkl_path, 'rb'))  # load the scaler parameters
                    #     model = load_model(md_path)  # load the model weights
                    #     data_set = df_data.groupby('sn').apply(prediction, scaler, model, cellvolt_name)
                    #     if not data_set.empty:
                    #         df_result = data_set.groupby('sn').apply(out_final, Li_pltd_his, df_algo_param, df_algo_pack_param)
                    #     # if not df_result.empty:
                    #     #     df_res_lipltdchange, df_res_lipltd_new = zip(*df_result.groupby('sn').apply(alarme_final, Li_pltd_his, df_algo_pack_param))
                    #     loggers['Li_Plted'].info(f'算法运行完成{pack_code},算法耗时{time.time()-time_st}')
                    # except Exception as e:
                    #     loggers['Li_Plted'].error('算法运行出错')
                    #     loggers['Li_Plted'].error(str(e))
                    #     loggers['Li_Plted'].error(traceback.format_exc())
                    #     df_res_lipltdchange = pd.DataFrame()
                    #     df_res_lipltd_new = pd.DataFrame()
                    # Algorithm 5: daily data segmentation
                    try:
                        cal_time = time.time()
                        loggers['Data_Split'].info(f'开始执行算法{pack_code}, time:{start_time}~{end_time},\n sn_list:{sn_list}')
                        df_merge = ds.data_status(df_data, c_soc_dif_p=0.05, s_soc_dif_p=0, c_order_delta=1200, s_order_delta=300)
                        # split the data into segments based on the status codes
                        df_drive, df_charge, df_stand, df_data_split_rlt = dt.split(df_merge, celltemp_name, cellvolt_name, drive_interval_time_min=1200, charge_interval_time_min=1200, stand_interval_time_min=1200, single_num_min=3, drive_sts=3, charge_sts=[21, 22], stand_sts=0)
                        # date = datetime.datetime.strptime(start_time, '%Y-%m-%d-%H-%M-%S')
                        # # get the month and day from the datetime object
                        # month = date.month
                        # day = date.day
                        print('计算耗时:' + str(time.time() - cal_time))
                        df_data_split_rlt.to_excel('data_split_sn_date_{}_{}.xlsx'.format(sn, start_time[0:10]))
                        loggers['Data_Split'].info(f'算法运行完成{pack_code},算法耗时{time.time()-cal_time}')
                    except Exception as e:
                        loggers['Data_Split'].error('算法运行出错')
                        loggers['Data_Split'].error(str(e))
                        loggers['Data_Split'].error(traceback.format_exc())
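                    # The original result-persistence path (writing fault and SOR results to MySQL)
                    # is kept below for reference but is fully commented out in this test run.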
                    # Write the results to mysql (disabled)
                    # try:
                    #     df_res_new = pd.concat([df_res_new_soc, df_res_new_lw_soc])  # , res1
                    #     df_res_update = df_res_update_lw_soc  # pd.concat([df_res_update_lw_soc, df_res_update_crnt, df_res_update_temp])  # , res1
                    #     df_res_end = pd.concat([df_res_end_soc, df_res_end_lw_soc])  # , res2
                    #     df_res_new.reset_index(drop=True, inplace=True)
                    #     df_res_update.reset_index(drop=True, inplace=True)
                    #     df_res_end.reset_index(drop=True, inplace=True)
                    #     time_st = time.time()
                    #     session = mysql_algo_Session()
                    #     if not df_res_new.empty:
                    #         df_res_new['date_info'] = df_res_new['start_time']
                    #         df_res_new['create_time'] = datetime.now()
                    #         df_res_new['create_by'] = 'algo'
                    #         df_res_new['is_delete'] = 0
                    #         df_res_new.to_sql("algo_all_fault_info_ing", con=mysql_algo_conn, if_exists="append", index=False)
                    #         logger_main.info(f'process-{process_num}新增未结束故障入库{pack_code}完成')
                    #     if not df_res_end.empty:
                    #         df_res_end = df_res_end.where(pd.notnull(df_res_end), None)
                    #         df_res_end = df_res_end.fillna(0)
                    #         for index in df_res_end.index:
                    #             df_t = df_res_end.loc[index:index]
                    #             sql = 'delete from algo_all_fault_info_ing where start_time=:start_time and fault_code=:fault_code and sn=:sn'
                    #             params = {'start_time': df_t['start_time'].values[0],
                    #                       'fault_code': df_t['fault_code'].values[0], 'sn': df_t['sn'].values[0]}
                    #             session.execute(sql, params=params)
                    #             sql = 'insert into algo_all_fault_info_done (date_info, start_time, end_time, sn, imei, model, fault_level, fault_code, fault_info,\
                    #                    fault_reason, fault_advice, fault_location, device_status, odo, create_time, create_by, update_time, update_by, is_delete, comment) values \
                    #                    (:date_info, :start_time, :end_time, :sn, :imei, :model, :fault_level, :fault_code, :fault_info,\
                    #                    :fault_reason, :fault_advice, :fault_location, :device_status, :odo, :create_time, :create_by, :update_time, :update_by, :is_delete, :comment)'
                    #             params = {'date_info': datetime.now(),
                    #                       'start_time': df_t['start_time'].values[0],
                    #                       'end_time': df_t['end_time'].values[0],
                    #                       'sn': df_t['sn'].values[0],
                    #                       'imei': df_t['imei'].values[0],
                    #                       'model': pack_code,
                    #                       'fault_level': df_t['fault_level'].values[0],
                    #                       'fault_code': df_t['fault_code'].values[0],
                    #                       'fault_info': df_t['fault_info'].values[0],
                    #                       'fault_reason': df_t['fault_reason'].values[0],
                    #                       'fault_advice': df_t['fault_advice'].values[0],
                    #                       'fault_location': df_t['fault_location'].values[0],
                    #                       'device_status': df_t['device_status'].values[0],
                    #                       'odo': df_t['odo'].values[0],
                    #                       'create_time': datetime.now(),
                    #                       'create_by': 'algo',
                    #                       'update_time': datetime.now(),
                    #                       'update_by': None,
                    #                       'is_delete': 0,
                    #                       'comment': None}
                    #             session.execute(sql, params=params)
                    #             session.commit()
                    #         logger_main.info(f'process-{process_num}结束故障入库{pack_code}完成')
                    #     if not df_res_update.empty:
                    #         df_res_update = df_res_update.where(pd.notnull(df_res_update), None)
                    #         df_res_update = df_res_update.fillna(0)
                    #         for index in df_res_update.index:
                    #             df_t = df_res_update.loc[index:index]
                    #             try:
                    #                 # update the existing record
                    #                 with mysql_algo_Session() as session:
                    #                     session.execute(update(AlgoAllFaultInfoIng).where(
                    #                         and_((AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
                    #                              (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
                    #                              (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))).
                    #                         values(fault_level=df_t['fault_level'].values[0],
                    #                                comment=df_t['comment'].values[0]))
                    #                     session.commit()
                    #             except Exception as e:
                    #                 logger_main.error(f"process-{process_num}:结果入库出错")
                    #                 logger_main.error(f"process-{process_num}:{e}")
                    #                 logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                    #             finally:
                    #                 session.close()
                    #         logger_main.info(f"process-{process_num}: 更新入库完成")
                    #     else:
                    #         logger_main.info(f"process-{process_num}: 无更新故障")
                    #     if not df_sor_add.empty:
                    #         time_record = time.time()
                    #         df_sor_rlt = df_sor_add  # df_sor_rlt.append()
                    #         df_sor_rlt.reset_index(drop=True, inplace=True)
                    #         df_sor_rlt.to_sql("algo_mid_sorout", con=mysql_algo_conn, if_exists="append", index=False)
                    #         write_mysql_time = write_mysql_time + time.time() - time_record
                    #         logger_main.info(f'process-{process_num}新增未结束故障入库{pack_code}完成')
                    #     logger_main.info(f"process-{process_num}: 结果入库耗时:{time.time()-time_st}")
                    # except Exception as e:
                    #     logger_main.error(f"process-{process_num}:结果入库出错")
                    #     logger_main.error(f"process-{process_num}:{e}")
                    #     logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
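                    # Only the data-split result handling below is still active; the actual to_sql
                    # write is commented out, so for now it just resets the index and logs timing.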
                    try:
                        if not df_data_split_rlt.empty:
                            time_record = time.time()
                            df_data_split_rlt.reset_index(drop=True, inplace=True)
                            # df_data_split_rlt.to_sql("algo_charge_info", con=mysql_algo_conn, if_exists="append", index=False)
                            # write_mysql_time = write_mysql_time + time.time() - time_record
                            logger_main.info(f'process-{process_num}新增未结束故障入库{pack_code}完成')
                        logger_main.info(f"process-{process_num}: 结果入库耗时:{time.time()-time_st}")
                    except Exception as e:
                        logger_main.error(f"process-{process_num}:数据分段结果入库出错")
                        logger_main.error(f"process-{process_num}:{e}")
                        logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
            except Exception as e:
                logger_main.error(f'process-{process_num}: {e}')
                logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
            print('本次计算耗时:' + str(time.time() - st_time))
    except Exception as e:
        logger_main.error(f'process-{process_num}: {e}')
        logger_main.error(f'process-{process_num}: {traceback.format_exc()}')


if __name__ == '__main__':
    # while(True):
    try:
        # Configuration
        cur_env = 'dev'  # runtime environment
        app_path = "D:/ZLWORK/code/zlwl-algos/"  # absolute path of the app
        log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
        app_name = "task_day_1_sxqtest"  # application name
        sysUtils = SysUtils(cur_env, app_path)
        logger_main = sysUtils.get_logger(app_name, log_base_path)
        logger_main.info(f"本次主进程号: {os.getpid()}")
        main()
        # Multiprocessing dispatch from the config file (do not modify this part), currently disabled
        # processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # default to one process
        # pool = Pool(processes=int(processes))
        # logger_main.info("开始分配子进程")
        # for i in range(int(processes)):
        #     pool.apply_async(main, (i, ))
        # pool.close()
        # logger_main.info("进程分配结束,堵塞主进程")
        # pool.join()
    except Exception as e:
        print(str(e))
        print(traceback.format_exc())
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())
    finally:
        handlers = logger_main.handlers.copy()
        for h in handlers:
            logger_main.removeHandler(h)
        # pool.terminate()