from datetime import datetime, timedelta
from multiprocessing import Pool
import json
import os
import time
import traceback
import warnings
import sys

from sqlalchemy import text, delete, and_, or_, update
import pandas as pd

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *
from socdiag.V_1_0_0.SOCBatDiag_test import SocDiag
import matplotlib.pyplot as plt
# from LowSocAlarm.V1_0_0.low_soc_alarm import Low_soc_alarm


def main(process_num):
    # The program must keep running.
    if (True):
        warnings.filterwarnings("ignore")
        try:
            # Preparation before invoking the algorithms.
            kafka_topic_key = 'topic_test_cez'
            kafka_groupid_key = 'group_id_test_cez'
            algo_list = ['socdiag']  # Algorithms handled by this schedule.
            loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # One logger per algorithm.
            logger_main.info(f"process-{process_num}: configuring middleware")

            # mysql
            mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
            mysqlUtils = MysqlUtils()
            mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
            mysql_algo_conn = mysql_algo_engine.connect()

            mysql_iotp_data = sysUtils.get_cf_param('mysql-iotp')
            mysqlUtils = MysqlUtils()
            mysql_iotp_engine, mysql_iotp_Session = mysqlUtils.get_mysql_engine(mysql_iotp_data)
            mysql_iotp_conn = mysql_iotp_engine.connect()

            # # kafka
            # kafka_params = sysUtils.get_cf_param('kafka')
            # kafkaUtils = KafkaUtils()
            # kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)

            # hbase
            hbase_params = sysUtils.get_cf_param('hbase-datafactory')
            iotp_service = IotpAlgoService(hbase_params=hbase_params)

            # redis
            redis_params = sysUtils.get_cf_param('redis')
            redisUtils = RedisUtils()
            rc = redisUtils.get_redis_conncect(redis_params)
        except Exception as e:
            logger_main.error(f'process-{process_num}: {e}')
            logger_main.error(f'process-{process_num}: {traceback.format_exc()}')

        # Ready to schedule.
        # logger_main.info(f"process-{process_num}: listening on topic {kafka_params[kafka_topic_key]}, waiting for kafka schedule")
        logger_main.info(f"process-{process_num}: starting loop")

        path = '/data/common/benchi/data/'
        sn_list = ['LY9139BB4MALBZ795', 'LY9139BB8MALBZ329', 'LY9F49BCXMALBZ876', 'LY9F49BC7MALBZ883']
        pack_code = 'CL3282A'
        start_time_dts = [pd.to_datetime('2022-07-30 20:00:09'), pd.to_datetime('2022-01-11 14:00:09'),
                          pd.to_datetime('2022-09-07 11:00:09'), pd.to_datetime('2022-01-29 08:00:09')]
        end_time_dts = [pd.to_datetime('2022-07-30 22:00:09'), pd.to_datetime('2022-01-11 15:00:09'),
                        pd.to_datetime('2022-09-07 12:00:09'), pd.to_datetime('2022-01-29 10:00:09')]

        sql = "select * from algo_pack_param"
        df_algo_pack_param_all = pd.read_sql(sql, mysql_algo_conn)
        sql = "select * from algo_list"
        df_algo_param = pd.read_sql(sql, mysql_algo_conn)
        df_algo_pack_param = json.loads(
            df_algo_pack_param_all[df_algo_pack_param_all['pack_code'] == pack_code]['param'].iloc[0])
        sql = f"select sn, imei from t_device where sn in {tuple(sn_list)}"
        df_snlist = pd.read_sql(sql, mysql_algo_conn)

        for sn, start_time_dt, end_time_dt in zip(sn_list, start_time_dts, end_time_dts):
            try:
                logger_main.info(f'received schedule for {sn}')
                if mysql_algo_conn.closed:  # `.closed` is the state flag; `.close` is a method and is always truthy
                    mysql_algo_conn = mysql_algo_engine.connect()  # Get a MySQL connection from the pool.
                try:
                    # df_snlist = [sn]
                    # df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'], d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param', 'param_ai'])
                    # df_algo_param = pd.DataFrame(schedule_params['algo_list'])
                    cell_type = 'CLL027A'
                    sn_list = [sn]  # Narrow the fetch to the current SN; zip() above already holds the full list.
                    start_time = start_time_dt.strftime('%Y-%m-%d %H:%M:%S')
                    end_time = end_time_dt.strftime('%Y-%m-%d %H:%M:%S')
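                    # Note (added): get_data below is assumed to return one DataFrame row
                    # per telemetry frame, with a 'time' column plus the requested
                    # DataField columns (pack current/voltage/SOC, per-cell voltages and
                    # temperatures, BMS status); the cleaning and windowing steps below
                    # rely on that 'time' column.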
                    # Fetch data.
                    time_st = time.time()
                    logger_main.info(f"process-{process_num}: fetching data for {sn_list}")
                    columns = [DataField.time, DataField.sn, DataField.pack_crnt, DataField.pack_volt,
                               DataField.pack_soc, DataField.cell_voltage_count, DataField.cell_temp_count,
                               DataField.cell_voltage, DataField.cell_temp, DataField.other_temp_value,
                               DataField.bms_sta]
                    df_data = iotp_service.get_data(sn_list=sn_list, columns=columns,
                                                    start_time=start_time, end_time=end_time)
                    logger_main.info(f'process-{process_num}, fetched {len(df_data)} rows from {start_time} to {end_time}, fetch took {time.time()-time_st}s')
                except Exception as e:
                    logger_main.error(f"process-{process_num}: {sn} failed to fetch raw data")
                    logger_main.error(f"process-{process_num}: {e}")
                    logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                    continue

                # Data cleaning.
                try:
                    time_st = time.time()
                    logger_main.info(f'process-{process_num} cleaning data')
                    df_data, df_table, cellvolt_name, celltemp_name = iotp_service.datacleaning(df_algo_pack_param, df_data)  # Run the data cleaning.
                    if len(df_data) == 0:
                        logger_main.info(f"process-{process_num}: cleaning took {time.time()-time_st}s, no valid data, skipping this run")
                        continue
                    else:
                        logger_main.info(f"process-{process_num}: {sn}, time range: {df_data.iloc[0]['time']} ~ {df_data.iloc[-1]['time']}, cleaning finished in {time.time()-time_st}s")
                except Exception as e:
                    logger_main.error(f"process-{process_num}: {sn} data cleaning failed")
                    logger_main.error(f"process-{process_num}: {e}")
                    logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                    continue  # Skip this SN; the algorithm must not run on uncleaned data.

                # # Plot the data.
                # plt.figure(figsize=(30, 6))
                # ax1 = plt.subplot(1, 2, 1)
                # ax1.scatter(list(df_data[(df_data['pack_soc'] > 0) & (df_data['pack_soc'] < 100)]['time']), list(df_data[(df_data['pack_soc'] > 0) & (df_data['pack_soc'] < 100)]['pack_soc']), s=2, color='b', label='SOC')
                # ax1.set_xlabel('Date', fontsize=16)
                # ax1.set_ylabel('SOC%', fontsize=16)
                # ax1.set_title(f'sn:{sn}', fontsize=16)
                # ax2 = ax1.twinx()
                # ax2.scatter(list(df_data[(df_data['pack_soc'] > 0) & (df_data['pack_soc'] < 100)]['time']), list(df_data[(df_data['pack_soc'] > 0) & (df_data['pack_soc'] < 100)]['pack_volt']), color='r', s=1, label='PackVoltage')
                # ax2.set_ylabel('PackVoltage', fontsize=16)
                # ax1.legend(loc=(.75, .13), fontsize=14)
                # ax2.legend(loc=(.75, .05), fontsize=14)
                # ax3 = plt.subplot(1, 2, 2)
                # ax3.scatter(list(df_data[(df_data['pack_soc'] > 0) & (df_data['pack_soc'] < 100)]['time']), list(df_data[(df_data['pack_soc'] > 0) & (df_data['pack_soc'] < 100)]['pack_soc']), s=2, color='b', label='SOC')
                # ax3.set_xlabel('Date', fontsize=16)
                # ax3.set_ylabel('SOC%', fontsize=16)
                # ax4 = ax3.twinx()
                # ax4.scatter(list(df_data[(df_data['pack_soc'] > 0) & (df_data['pack_soc'] < 100)]['time']), list(df_data[(df_data['pack_soc'] > 0) & (df_data['pack_soc'] < 100)]['pack_crnt']), color='r', s=1, label='PackCrnt')
                # ax4.set_ylabel('PackCrnt', fontsize=16)
                # ax3.set_title(f'start time: {start_time} end time: {end_time}', fontsize=16)
                # ax3.legend(loc=(.75, .13), fontsize=14)
                # ax4.legend(loc=(.75, .05), fontsize=14)
                # st = start_time_dt.strftime('%Y%m%d%H%M%S')
                # et = end_time_dt.strftime('%Y%m%d%H%M%S')
                # plt.savefig(f'./{sn}_{st}_{et}.jpg')
                # os._exit(0)
                # continue

                # Read fault data from MySQL.
                try:
                    time_st = time.time()
                    logger_main.info(f'process-{process_num} reading fault data from mysql')
                    if len(sn_list) == 1:
                        sql = "select * from algo_all_fault_info_ing where sn = '{}'".format(sn_list[0])
                    else:
                        sql = "select * from algo_all_fault_info_ing where sn in {}".format(tuple(sn_list))
                    # fault_code='{}' or fault_code='{}') and 'C599','C590',
                    df_diag_ram = pd.read_sql(sql, mysql_algo_conn)
                    logger_main.info(f'process-{process_num} mysql read took {time.time()-time_st}s')
                except Exception as e:
                    logger_main.error(f"process-{process_num}: {sn} failed to read mysql fault data")
                    logger_main.error(f"process-{process_num}: {e}")
                    logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                    continue
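                # Note (added): df_diag_ram carries the still-open fault rows from
                # algo_all_fault_info_ing; soc_block() below is assumed to use it to
                # avoid re-raising faults that are already active.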
                # Algorithm invocation.
                try:
                    time_st = time.time()
                    loggers['socdiag'].info(f'starting algorithm for {sn}, time: {start_time}~{end_time},\n sn_list: {sn_list}')
                    period = 24 * 60  # Algorithm period in minutes.
                    first = 1
                    for t in pd.date_range(start_time_dt, end_time_dt, freq=timedelta(minutes=period)):
                        start = t
                        end = start + timedelta(minutes=period)
                        df_data_range = df_data[(df_data['time'] >= start) & (df_data['time'] < end)]
                        if len(df_data_range) == 0:
                            continue
                        soc_diag = SocDiag(cell_type, df_algo_pack_param, df_algo_param,
                                           end.strftime('%Y-%m-%d %H:%M:%S'), period, pack_code,
                                           df_snlist, df_data_range)
                        df_res_end_C107 = soc_diag.soc_jump()
                        df_res_new_C109, df_res_end_C109 = soc_diag.soc_block(df_diag_ram)
                        df_res_new_soc_1 = df_res_new_C109
                        df_res_end_soc_1 = pd.concat([df_res_end_C107, df_res_end_C109])
                        if first:
                            df_res_new_soc = df_res_new_soc_1
                            df_res_end_soc = df_res_end_soc_1
                            first = 0
                        else:
                            df_res_new_soc = pd.concat([df_res_new_soc, df_res_new_soc_1])
                            df_res_end_soc = pd.concat([df_res_end_soc, df_res_end_soc_1])
                    loggers['socdiag'].info(f'algorithm finished for {sn}, took {time.time()-time_st}s')
                except Exception as e:
                    loggers['socdiag'].error('{} algorithm run failed'.format(pack_code))
                    loggers['socdiag'].error(str(e))
                    loggers['socdiag'].error(traceback.format_exc())
                    continue

                # # Algorithm invocation (low-SOC alarm, disabled).
                # # try:
                # #     time_st = time.time()
                # #     # loggers['low_soc_diag'].info(f'starting algorithm for {pack_code}, time: {start_time}~{end_time},\n sn_list: {sn_list}')
                # #     # low_soc_warning = Low_soc_alarm(df_data, cellvolt_name)
                # #     # df_res_new_lw_soc, df_res_update_lw_soc, df_res_end_lw_soc = low_soc_warning.diag(df_algo_pack_param, df_algo_param, df_algo_adjustable_param, df_data, df_table, df_diag_ram, df_snlist)
                # #     loggers['low_soc_diag'].info(f'algorithm finished for {pack_code}, took {time.time()-time_st}s')
                # # except Exception as e:
                # #     loggers['low_soc_diag'].error('{} algorithm run failed'.format(pack_code))
                # #     loggers['low_soc_diag'].error(str(e))
                # #     loggers['low_soc_diag'].error(traceback.format_exc())
                # #     continue

                # df_res_new = df_res_new_soc  # , res1
                # # df_res_update = df_res_update_lw_soc  # pd.concat([df_res_update_lw_soc, df_res_update_crnt, df_res_update_temp])  # , res1
                # df_res_end = df_res_end_soc  # , res2
                # df_res_new.reset_index(drop=True, inplace=True)
                # # df_res_update.reset_index(drop=True, inplace=True)
                # df_res_end.reset_index(drop=True, inplace=True)
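                # Note (added): the write-back below is disabled in this offline test.
                # If it is re-enabled, the raw SQL strings should be wrapped with
                # sqlalchemy.text() (already imported at the top), e.g.
                #     session.execute(text(sql), params)
                # since SQLAlchemy 2.x rejects plain strings in Session.execute
                # (deprecated since 1.4).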
                # # Write results to MySQL.
                # try:
                #     time_st = time.time()
                #     session = mysql_algo_Session()
                #     if not df_res_new.empty:
                #         df_res_new['date_info'] = df_res_new['start_time']
                #         df_res_new['create_time'] = datetime.now()
                #         df_res_new['create_by'] = 'algo'
                #         df_res_new['is_delete'] = 0
                #         df_res_new.to_sql("algo_all_fault_info_ing", con=mysql_algo_conn, if_exists="append", index=False)
                #         logger_main.info(f'process-{process_num} new open faults for {pack_code} written')
                #     if not df_res_end.empty:
                #         df_res_end = df_res_end.where(pd.notnull(df_res_end), None)
                #         df_res_end = df_res_end.fillna(0)
                #         for index in df_res_end.index:
                #             df_t = df_res_end.loc[index:index]
                #             sql = 'delete from algo_all_fault_info_ing where start_time=:start_time and fault_code=:fault_code and sn=:sn'
                #             params = {'start_time': df_t['start_time'].values[0],
                #                       'fault_code': df_t['fault_code'].values[0], 'sn': df_t['sn'].values[0]}
                #             session.execute(sql, params=params)
                #             sql = 'insert into algo_all_fault_info_done (date_info, start_time, end_time, sn, imei, model, fault_level, fault_code, fault_info, \
                #                    fault_reason, fault_advice, fault_location, device_status, odo, create_time, create_by, update_time, update_by, is_delete, comment) values \
                #                    (:date_info, :start_time, :end_time, :sn, :imei, :model, :fault_level, :fault_code, :fault_info, \
                #                    :fault_reason, :fault_advice, :fault_location, :device_status, :odo, :create_time, :create_by, :update_time, :update_by, :is_delete, :comment)'
                #             params = {'date_info': datetime.now(),
                #                       'start_time': df_t['start_time'].values[0],
                #                       'end_time': df_t['end_time'].values[0],
                #                       'sn': df_t['sn'].values[0],
                #                       'imei': df_t['imei'].values[0],
                #                       'model': pack_code,
                #                       'fault_level': df_t['fault_level'].values[0],
                #                       'fault_code': df_t['fault_code'].values[0],
                #                       'fault_info': df_t['fault_info'].values[0],
                #                       'fault_reason': df_t['fault_reason'].values[0],
                #                       'fault_advice': df_t['fault_advice'].values[0],
                #                       'fault_location': df_t['fault_location'].values[0],
                #                       'device_status': df_t['device_status'].values[0],
                #                       'odo': df_t['odo'].values[0],
                #                       'create_time': datetime.now(),
                #                       'create_by': 'algo',
                #                       'update_time': datetime.now(),
                #                       'update_by': None,
                #                       'is_delete': 0,
                #                       'comment': None}
                #             session.execute(sql, params=params)
                #         session.commit()
                #         logger_main.info(f'process-{process_num} closed faults for {pack_code} written')
                #     # if not df_res_update.empty:
                #     #     df_res_update = df_res_update.where(pd.notnull(df_res_update), None)
                #     #     df_res_update = df_res_update.fillna(0)
                #     #     for index in df_res_update.index:
                #     #         df_t = df_res_update.loc[index:index]
                #     #         try:
                #     #             # Update the row.
                #     #             with mysql_algo_Session() as session:
                #     #                 session.execute(update(AlgoAllFaultInfoIng).where(
                #     #                     and_((AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
                #     #                          (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
                #     #                          (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))).
                #     #                     values(fault_level=df_t['fault_level'].values[0],
                #     #                            comment=df_t['comment'].values[0]))
                #     #                 session.commit()
                #     #         except Exception as e:
                #     #             logger_main.error(f"process-{process_num}:{pack_code} failed to write results")
                #     #             logger_main.error(f"process-{process_num}:{e}")
                #     #             logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                #     #         finally:
                #     #             session.close()
                #     #     logger_main.info(f"process-{process_num}: update write finished")
                #     # else:
                #     #     logger_main.info(f"process-{process_num}: no faults to update")
                #     logger_main.info(f"process-{process_num}: result write took {time.time()-time_st}s")
                # except Exception as e:
                #     logger_main.error(f"process-{process_num}: {sn} failed to write results")
                #     logger_main.error(f"process-{process_num}: {e}")
                #     logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
            finally:
                pass


if __name__ == '__main__':
    if (True):
        try:
            # Configuration.
            cur_env = 'dev'  # Runtime environment.
            app_path = "/home/chenenze/zlwl-algos/"  # Absolute path of the app.
            log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # Log path.
            app_name = "task_day_1_test_offline_test"  # Application name.
            sysUtils = SysUtils(cur_env, app_path)
            logger_main = sysUtils.get_logger(app_name, log_base_path)
            logger_main.info(f"main process pid: {os.getpid()}")

            # Read the config file (please do not modify this part).
            processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # Defaults to one process.
            pool = Pool(processes=int(processes))
            logger_main.info("allocating child processes")
            for i in range(int(processes)):
                pool.apply_async(main, (i,))
            pool.close()
            logger_main.info("child processes allocated, blocking main process")
            pool.join()
        except Exception as e:
            print(str(e))
            print(traceback.format_exc())
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
        finally:
            handlers = logger_main.handlers.copy()
            for h in handlers:
                logger_main.removeHandler(h)
            pool.terminate()
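# Usage note (added): run this file directly, e.g. `python task_day_1_test_offline_test.py`.
# PROCESS_NUM_PER_NODE in the environment config sets how many worker processes the
# Pool spawns (default 1); each worker runs main(process_num) over the hard-coded
# SN list and time windows above.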