from datetime import datetime
from multiprocessing import Pool
import json
import os
import pickle
import time
import traceback
import warnings

import numpy as np
import pandas as pd
from sqlalchemy import text, delete, and_, or_, update

from li_plted.V1_0_0.corepro_V1 import *
# from keras.models import load_model
from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *
from socdiag.V_1_0_0.SOCBatDiag import SocDiag
from LowSocAlarm.V1_0_0.low_soc_alarm import Low_soc_alarm
from SorCal.V_1_0_0.sorcal import sor_est
from DataSplit.V_1_0_0 import data_status as ds    ## charge-status normalization
from DataSplit.V_1_0_0 import data_split as dt     ## segmentation functions
from DataSplit.V_1_0_0 import data_drive_stat as ddt     ## per-drive-segment statistics
from DataSplit.V_1_0_0 import data_charge_stat as dct    ## per-charge-segment statistics
from DataSplit.V_1_0_0 import data_stand_stat as dst     ## per-rest-segment statistics
from DataSplit.V_1_0_0 import data_drive_stat_period as ddtp    ## drive statistics aggregated per charge cycle
from DataSplit.V_1_0_0 import trans_day as trd     ## handles segments that cross day boundaries


def update_param(db_engine, rc):
    # Fetch parameters from redis; on a cache miss, fall back to mysql.
    data = rc.get("algo_param_from_mysql:algo_adjustable_param")
    if data is None:
        df_algo_adjustable_param = pd.read_sql("select id, algo_id, pack_code, param from algo_adjustable_param", db_engine)
    else:
        df_algo_adjustable_param = pd.DataFrame(json.loads(data))

    data = rc.get("algo_param_from_mysql:algo_list")
    if data is None:
        df_algo_list = pd.read_sql("select id, algo_id, algo_name, is_activate, global_param, fault_code, fault_influence from algo_list", db_engine)
    else:
        df_algo_list = pd.DataFrame(json.loads(data))

    data = rc.get("algo_param_from_mysql:algo_pack_param")
    if data is None:
        df_algo_pack_param = pd.read_sql("select id, pack_code, param from algo_pack_param", db_engine)
    else:
        df_algo_pack_param = pd.DataFrame(json.loads(data))

    data = rc.get("algo_param_from_mysql:app_device")
    if data is None:
        df_snpk_list = pd.read_sql("select sn, imei, pack_model, scrap_status from t_device", db_engine)
        df_snpk_list = df_snpk_list[df_snpk_list['scrap_status'] < 4]
    else:
        df_snpk_list = pd.DataFrame(json.loads(data))

    return df_algo_adjustable_param, df_algo_list, df_algo_pack_param, df_snpk_list
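
# A minimal sketch (not part of the original pipeline) of the cache-aside
# pattern that update_param() repeats four times: try redis first, fall back
# to mysql on a miss. The helper name `_cached_read` is hypothetical.
def _cached_read(rc, key, fallback_sql, db_engine):
    """Return a DataFrame from the redis cache, else read through to mysql."""
    raw = rc.get(key)
    if raw is None:  # cache miss -> read through to the database
        return pd.read_sql(fallback_sql, db_engine)
    return pd.DataFrame(json.loads(raw))
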

def main():  # process_num
    # The scheduler is expected to keep running; the outer try guards each pass.
    # while(True):
    try:
        process_num = 1
        warnings.filterwarnings("ignore")
        try:
            # Prepare middleware before running any algorithm.
            kafka_topic_key = 'topic_task_day_1_sxqtest'
            kafka_groupid_key = 'group_id_task_day_1_sxqtest'
            algo_list = ['socdiag', 'low_soc_diag', 'Sor_Diag', 'Li_Plted', 'Data_Split']  # algorithms handled by this schedule
            loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # one logger per algorithm
            logger_main.info(f"process-{process_num}: configuring middleware")
            # mysql
            mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
            mysqlUtils = MysqlUtils()
            mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
            mysql_algo_conn = mysql_algo_engine.connect()
            # kafka
            kafka_params = sysUtils.get_cf_param('kafka')
            kafkaUtils = KafkaUtils()
            kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
            # hbase
            hbase_params = sysUtils.get_cf_param('hbase-datafactory')
            iotp_service = IotpAlgoService(hbase_params=hbase_params)
            # redis
            redis_params = sysUtils.get_cf_param('redis')
            redisUtils = RedisUtils()
            rc = redisUtils.get_redis_conncect(redis_params)
        except Exception as e:
            logger_main.error(f'process-{process_num}: {e}')
            logger_main.error(f'process-{process_num}: {traceback.format_exc()}')

        # Ready to schedule; in production this loop would consume kafka messages.
        logger_main.info(f"process-{process_num}: listening on topic {kafka_params[kafka_topic_key]}, waiting for kafka scheduling")
        # for message in kafka_consumer:
        path = 'D:/data/'  # '/data/common/benchi/data/'
        sn_list = os.listdir(path)
        pack_code = 'CL3282A'
        df_algo_adjustable_param, df_algo_list, df_algo_pack_param, df_snpk_list = update_param(mysql_algo_conn, rc)
        sql = "select * from algo_pack_param"
        df_algo_pack_param_all = pd.read_sql(sql, mysql_algo_conn)
        sql = "select * from algo_list"
        df_algo_param = pd.read_sql(sql, mysql_algo_conn)
        df_algo_pack_param = json.loads(df_algo_pack_param_all[df_algo_pack_param_all['pack_code'] == pack_code]['param'].iloc[0])
        sql = f"select sn, imei from t_device where sn in {tuple(sn_list)}"
        df_snlist = pd.read_sql(sql, mysql_algo_conn)
        start_time_dt = pd.to_datetime('2022-01-01')
        end_time_dt = pd.to_datetime('2022-11-30')
        sn_list = sn_list[10:20]
        for sn in sn_list:
            st_time = time.time()
            try:
                snpath = path + sn + '/'
                times = sorted(os.listdir(snpath))
                for i in range(0, len(times), 2):
                    # Pick the next two files of this sn.
                    files_to_read = times[i:i + 2]
                    # Read the files one by one.
                    df_data_all = pd.DataFrame()
                    rd_data_st_time = time.time()
                    for sntime in files_to_read:
                        # File names encode the covered span: <start>_<end>.pkl.
                        start_time = sntime.split('.')[0].split('_')[0]
                        end_time = sntime.split('.')[0].split('_')[1]
                        # fetch data
                        time_st = time.time()
                        logger_main.info(f"process-{process_num}: start fetching data for {sn_list}")
                        # Kept for the commented-out HBase fetch path below.
                        columns = [DataField.time, DataField.sn, DataField.pack_crnt, DataField.pack_volt, DataField.pack_soc,
                                   DataField.cell_voltage_count, DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
                                   DataField.other_temp_value, DataField.bms_sta]
                        # Read the pickled data frame (not Excel) for this span.
                        df_data_t = pd.read_pickle(snpath + sntime, compression='zip')
                        if len(df_data_all):
                            df_data_all = pd.concat([df_data_all, df_data_t])
                        else:
                            df_data_all = df_data_t
                    df_data_all = df_data_all.reset_index(drop=True)
                    print(f"{sn}_batch {i} of {len(times)}: load took {time.time() - rd_data_st_time}s, {len(df_data_all)} rows in total")
                    if mysql_algo_conn.closed:
                        mysql_algo_conn = mysql_algo_engine.connect()  # get a mysql connection from the pool
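                    # Note: DataFrame.append() was removed in pandas 2.0, which is why the
                    # loop above concatenates instead. An equivalent sketch (hypothetical,
                    # same inputs) that collects the chunks and concatenates once:
                    #   frames = [pd.read_pickle(snpath + f, compression='zip') for f in files_to_read]
                    #   df_data_all = pd.concat(frames, ignore_index=True)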
                    # if mysql_algo_conn.closed:
                    #     mysql_algo_conn = mysql_algo_engine.connect()  # get a mysql connection from the pool
                    # schedule_params = json.loads(message.value)
                    # if (schedule_params is None) or (schedule_params == ''):
                    #     logger_main.info('{} kafka message invalid, skipping this run'.format(str(message.value)))
                    #     continue
                    # # parse kafka scheduling parameters
                    # df_snlist = pd.DataFrame(schedule_params['snlist'])
                    # df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'], d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param', 'param_ai'])
                    # df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
                    # df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}
                    # df_algo_param = pd.DataFrame(schedule_params['algo_list'])
                    # start_time = schedule_params['start_time']
                    # end_time = schedule_params['end_time']
                    # pack_code = schedule_params['pack_code']
                    # cell_type = schedule_params['cell_type']
                    # sn_list = df_snlist['sn'].tolist()
                    # # fetch data
                    # time_st = time.time()
                    # logger_main.info(f"process-{process_num}: start fetching data for {sn_list}")
                    # columns = [DataField.time, DataField.sn, DataField.pack_crnt, DataField.pack_volt, DataField.pack_soc,
                    #            DataField.cell_voltage_count, DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
                    #            DataField.other_temp_value, DataField.bms_sta, DataField.charge_sta, DataField.latitude, DataField.longitude]
                    # df_data = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
                    # logger_main.info(f'process-{process_num}, fetched {len(df_data)} rows, took {time.time() - time_st}s')
                    # # parse the start time string into a datetime
                    # str_date = start_time
                    # date_time = datetime.datetime.strptime(str_date, '%Y-%m-%d %H:%M:%S')
                    # # shift it back 8 hours
                    # new_date_time = date_time - datetime.timedelta(hours=8)
                    # # format it back into a string
                    # start_time_8h = new_date_time.strftime('%Y-%m-%d %H:%M:%S')
                    # df_data_8h = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time_8h, end_time=start_time)
                    # logger_main.info(f'process-{process_num}, fetched {len(df_data_8h)} rows, took {time.time() - time_st}s')
                    # except Exception as e:
                    #     logger_main.error(f"process-{process_num}: failed to fetch raw data")
                    #     logger_main.error(f"process-{process_num}: {e}")
                    #     logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                    #     continue

                    # data cleaning
                    try:
                        time_st = time.time()
                        logger_main.info(f'process-{process_num}: data cleaning')
                        # Mileage fill: zeros are treated as missing and filled both ways.
                        df_data_all['mileage'] = df_data_all['mileage'].replace(0, np.nan).ffill()
                        df_data_all['mileage'] = df_data_all['mileage'].replace(0, np.nan).bfill()
                        df_data_all['mileage'] = df_data_all['mileage'] / 1000
                        df_data, df_table, cellvolt_name, celltemp_name = iotp_service.datacleaning(df_algo_pack_param, df_data_all)  # run data cleaning
                        # df_data_8h, df_table_t, cellvolt_name_t, celltemp_name_t = iotp_service.datacleaning(df_algo_pack_param, df_data_8h)
                        print('cleaning took: ' + str(time.time() - time_st))
                        if len(df_data) == 0:
                            logger_main.info(f"process-{process_num}: data cleaning took {time.time() - time_st}s, no valid data, skipping this run")
                            continue
                        else:
                            logger_main.info(f"process-{process_num}: {pack_code}, time range: {df_data.loc[0, 'time']} ~ {df_data.iloc[-1]['time']}, data cleaning finished in {time.time() - time_st}s")
                    except Exception as e:
                        logger_main.error(f"process-{process_num}: data cleaning failed")
                        logger_main.error(f"process-{process_num}: {e}")
                        logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                        continue
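                    # Worked example of the zero-mileage fill above (hypothetical values):
                    #   pd.Series([0, 12000, 0, 13000]).replace(0, np.nan).ffill() leaves the
                    #   leading NaN, so the second pass backfills it; the series becomes
                    #   [12000, 12000, 12000, 13000] before the division by 1000
                    #   (presumably metres to kilometres).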
                    # read fault data from mysql
                    try:
                        time_st = time.time()
                        logger_main.info(f'process-{process_num}: start reading fault data from mysql')
                        if len(sn_list) == 1:
                            sn_tuple = f"('{sn_list[0]}')"
                        else:
                            sn_tuple = tuple(sn_list)
                        sql = "select * from algo_all_fault_info_ing where sn in {}".format(sn_tuple)
                        df_diag_ram = pd.read_sql(sql, mysql_algo_conn)
                        sql = "select * from algo_ailipltd_result where sn in {}".format(sn_tuple)
                        Li_pltd_his = pd.read_sql(sql, mysql_algo_conn)
                        logger_main.info(f'process-{process_num}: mysql read took {time.time() - time_st}s')
                    except Exception as e:
                        logger_main.error(f"process-{process_num}: failed to read fault data from mysql")
                        logger_main.error(f"process-{process_num}: {e}")
                        logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                        continue
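                    # The sn_tuple special-casing above exists because str(tuple(['x']))
                    # renders as "('x',)", and the trailing comma is invalid SQL. A safer
                    # (hypothetical) alternative is an expanding bound parameter
                    # (requires `from sqlalchemy import bindparam`):
                    #   stmt = text("select * from algo_all_fault_info_ing where sn in :sns")
                    #   stmt = stmt.bindparams(bindparam("sns", expanding=True))
                    #   df_diag_ram = pd.read_sql(stmt, mysql_algo_conn, params={"sns": sn_list})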
                    # Algorithm 1: SOC diagnosis
                    # try:
                    #     time_st = time.time()
                    #     loggers['socdiag'].info(f'start running algorithm for {pack_code}, time: {start_time}~{end_time},\n sn_list: {sn_list}')
                    #     period = 24 * 60  # algorithm period, minutes
                    #     soc_diag = SocDiag(cell_type, df_algo_pack_param, df_algo_adjustable_param, df_algo_param, end_time, period, pack_code, df_snlist, df_data)
                    #     df_res_new_C109, df_res_end_C109 = soc_diag.soc_block(df_diag_ram)
                    #     df_res_end_C107 = soc_diag.soc_jump()
                    #     df_res_new_soc = df_res_new_C109
                    #     df_res_end_soc = pd.concat([df_res_end_C107, df_res_end_C109])
                    #     loggers['socdiag'].info(f'algorithm finished for {pack_code}, took {time.time() - time_st}s')
                    # except Exception as e:
                    #     loggers['socdiag'].error('algorithm failed')
                    #     loggers['socdiag'].error(str(e))
                    #     loggers['socdiag'].error(traceback.format_exc())
                    #     df_res_end_soc = pd.DataFrame()
                    #     df_res_new_soc = pd.DataFrame()

                    # Algorithm 2: low-SOC alarm
                    try:
                        time_st = time.time()
                        loggers['low_soc_diag'].info(f'start running algorithm for {pack_code}, time: {start_time}~{end_time},\n sn_list: {sn_list}')
                        low_soc_warning = Low_soc_alarm(df_data, cellvolt_name)
                        df_res_new_lw_soc, df_res_update_lw_soc, df_res_end_lw_soc = low_soc_warning.diag(df_algo_pack_param, df_algo_param, df_algo_adjustable_param, df_data, df_table, df_diag_ram, df_snlist)
                        df_res_new_lw_soc.to_excel('lowsoc_sn_date_{}_{}.xlsx'.format(sn, start_time[0:10]))
                        loggers['low_soc_diag'].info(f'algorithm finished for {pack_code}, took {time.time() - time_st}s')
                    except Exception as e:
                        loggers['low_soc_diag'].error('algorithm failed')
                        loggers['low_soc_diag'].error(str(e))
                        loggers['low_soc_diag'].error(traceback.format_exc())
                        df_res_new_lw_soc = pd.DataFrame()
                        df_res_update_lw_soc = pd.DataFrame()
                        df_res_end_lw_soc = pd.DataFrame()

                    # Algorithm 3: SOR estimation
                    # try:
                    #     time_st = time.time()
                    #     loggers['Sor_Diag'].info(f'start running algorithm for {pack_code}, time: {start_time}~{end_time},\n sn_list: {sn_list}')
                    #     Diagsor_temp = sor_est(df_data, df_algo_pack_param)  # estimate internal resistance
                    #     df_sor_add = Diagsor_temp.sor_cal()
                    #     loggers['Sor_Diag'].info(f'algorithm finished for {pack_code}, took {time.time() - time_st}s')
                    # except Exception as e:
                    #     loggers['Sor_Diag'].error('algorithm failed')
                    #     loggers['Sor_Diag'].error(str(e))
                    #     loggers['Sor_Diag'].error(traceback.format_exc())
                    #     df_sor_add = pd.DataFrame()

                    # Algorithm 4: lithium-plating detection
                    # try:
                    #     time_st = time.time()
                    #     loggers['Li_Plted'].info(f'start running algorithm for {pack_code}, time: {start_time}~{end_time},\n sn_list: {sn_list}')
                    #     pkl_path = 'li_plted/V1_0_0/scaler.pkl'
                    #     md_path = 'li_plted/V1_0_0/model.h5'
                    #     scaler = pickle.load(open(pkl_path, 'rb'))  # load scaler parameters
                    #     model = load_model(md_path)  # load model weights
                    #     data_set = df_data.groupby('sn').apply(prediction, scaler, model, cellvolt_name)
                    #     if not data_set.empty:
                    #         df_result = data_set.groupby('sn').apply(out_final, Li_pltd_his, df_algo_param, df_algo_pack_param)
                    #         # if not df_result.empty:
                    #         #     df_res_lipltdchange, df_res_lipltd_new = zip(*df_result.groupby('sn').apply(alarme_final, Li_pltd_his, df_algo_pack_param))
                    #     loggers['Li_Plted'].info(f'algorithm finished for {pack_code}, took {time.time() - time_st}s')
                    # except Exception as e:
                    #     loggers['Li_Plted'].error('algorithm failed')
                    #     loggers['Li_Plted'].error(str(e))
                    #     loggers['Li_Plted'].error(traceback.format_exc())
                    #     df_res_lipltdchange = pd.DataFrame()
                    #     df_res_lipltd_new = pd.DataFrame()

                    # Algorithm 5: daily data segmentation
                    try:
                        cal_time = time.time()
                        loggers['Data_Split'].info(f'start running algorithm for {pack_code}, time: {start_time}~{end_time},\n sn_list: {sn_list}')
                        df_merge = ds.data_status(df_data, c_soc_dif_p=0.05, s_soc_dif_p=0, c_order_delta=1200, s_order_delta=300)
                        ## segment the data based on the normalized status codes
                        df_drive, df_charge, df_stand, df_data_split_rlt = dt.split(df_merge, celltemp_name, cellvolt_name, drive_interval_time_min=1200, charge_interval_time_min=1200, stand_interval_time_min=1200, single_num_min=3, drive_sts=3, charge_sts=[21, 22], stand_sts=0)
                        print('computation took: ' + str(time.time() - cal_time))
                        df_data_split_rlt.to_excel('data_split_sn_date_{}_{}.xlsx'.format(sn, start_time[0:10]))
                        loggers['Data_Split'].info(f'algorithm finished for {pack_code}, took {time.time() - cal_time}s')
                    except Exception as e:
                        loggers['Data_Split'].error('algorithm failed')
                        loggers['Data_Split'].error(str(e))
                        loggers['Data_Split'].error(traceback.format_exc())

                    # # write fault results to mysql
                    # try:
                    #     df_res_new = pd.concat([df_res_new_soc, df_res_new_lw_soc])
                    #     df_res_update = df_res_update_lw_soc  # pd.concat([df_res_update_lw_soc, df_res_update_crnt, df_res_update_temp])
                    #     df_res_end = pd.concat([df_res_end_soc, df_res_end_lw_soc])
                    #     df_res_new.reset_index(drop=True, inplace=True)
                    #     df_res_update.reset_index(drop=True, inplace=True)
                    #     df_res_end.reset_index(drop=True, inplace=True)
                    #     time_st = time.time()
                    #     session = mysql_algo_Session()
                    #     if not df_res_new.empty:
                    #         df_res_new['date_info'] = df_res_new['start_time']
                    #         df_res_new['create_time'] = datetime.now()
                    #         df_res_new['create_by'] = 'algo'
                    #         df_res_new['is_delete'] = 0
                    #         df_res_new.to_sql("algo_all_fault_info_ing", con=mysql_algo_conn, if_exists="append", index=False)
                    #         logger_main.info(f'process-{process_num}: new ongoing faults for {pack_code} stored')
                    #     if not df_res_end.empty:
                    #         df_res_end = df_res_end.where(pd.notnull(df_res_end), None)
                    #         df_res_end = df_res_end.fillna(0)
                    #         for index in df_res_end.index:
                    #             df_t = df_res_end.loc[index:index]
                    #             sql = 'delete from algo_all_fault_info_ing where start_time=:start_time and fault_code=:fault_code and sn=:sn'
                    #             params = {'start_time': df_t['start_time'].values[0],
                    #                       'fault_code': df_t['fault_code'].values[0], 'sn': df_t['sn'].values[0]}
                    #             session.execute(sql, params=params)
                    #             sql = 'insert into algo_all_fault_info_done (date_info, start_time, end_time, sn, imei, model, fault_level, fault_code, fault_info, \
                    #                    fault_reason, fault_advice, fault_location, device_status, odo, create_time, create_by, update_time, update_by, is_delete, comment) values \
                    #                    (:date_info, :start_time, :end_time, :sn, :imei, :model, :fault_level, :fault_code, :fault_info, \
                    #                    :fault_reason, :fault_advice, :fault_location, :device_status, :odo, :create_time, :create_by, :update_time, :update_by, :is_delete, :comment)'
                    #             params = {'date_info': datetime.now(),
                    #                       'start_time': df_t['start_time'].values[0],
                    #                       'end_time': df_t['end_time'].values[0],
                    #                       'sn': df_t['sn'].values[0],
                    #                       'imei': df_t['imei'].values[0],
                    #                       'model': pack_code,
                    #                       'fault_level': df_t['fault_level'].values[0],
                    #                       'fault_code': df_t['fault_code'].values[0],
                    #                       'fault_info': df_t['fault_info'].values[0],
                    #                       'fault_reason': df_t['fault_reason'].values[0],
                    #                       'fault_advice': df_t['fault_advice'].values[0],
                    #                       'fault_location': df_t['fault_location'].values[0],
                    #                       'device_status': df_t['device_status'].values[0],
                    #                       'odo': df_t['odo'].values[0],
                    #                       'create_time': datetime.now(),
                    #                       'create_by': 'algo',
                    #                       'update_time': datetime.now(),
                    #                       'update_by': None,
                    #                       'is_delete': 0,
                    #                       'comment': None}
                    #             session.execute(sql, params=params)
                    #             session.commit()
                    #         logger_main.info(f'process-{process_num}: closed faults for {pack_code} stored')
                    #     if not df_res_update.empty:
                    #         df_res_update = df_res_update.where(pd.notnull(df_res_update), None)
                    #         df_res_update = df_res_update.fillna(0)
                    #         for index in df_res_update.index:
                    #             df_t = df_res_update.loc[index:index]
                    #             try:
                    #                 # update the matching ongoing-fault row
                    #                 with mysql_algo_Session() as session:
                    #                     session.execute(update(AlgoAllFaultInfoIng).where(
                    #                         and_((AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
                    #                              (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
                    #                              (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))).
                    #                         values(fault_level=df_t['fault_level'].values[0],
                    #                                comment=df_t['comment'].values[0]))
                    #                     session.commit()
                    #             except Exception as e:
                    #                 logger_main.error(f"process-{process_num}: failed to write results")
                    #                 logger_main.error(f"process-{process_num}: {e}")
                    #                 logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                    #             finally:
                    #                 session.close()
                    #         logger_main.info(f"process-{process_num}: updates stored")
                    #     else:
                    #         logger_main.info(f"process-{process_num}: no faults to update")
                    #     if not df_sor_add.empty:
                    #         time_record = time.time()
                    #         df_sor_rlt = df_sor_add
                    #         df_sor_rlt.reset_index(drop=True, inplace=True)
                    #         df_sor_rlt.to_sql("algo_mid_sorout", con=mysql_algo_conn, if_exists="append", index=False)
                    #         write_mysql_time = write_mysql_time + time.time() - time_record
                    #         logger_main.info(f'process-{process_num}: SOR results for {pack_code} stored')
                    #     logger_main.info(f"process-{process_num}: result write took {time.time() - time_st}s")
                    # except Exception as e:
                    #     logger_main.error(f"process-{process_num}: failed to write results")
                    #     logger_main.error(f"process-{process_num}: {e}")
                    #     logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
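                    # Note on the commented-out block above: SQLAlchemy 1.4 deprecated and
                    # 2.0 removed passing bare SQL strings to session.execute(); statements
                    # must be wrapped in text() (already imported). A minimal sketch of the
                    # delete, assuming the same table and bind names:
                    #   stmt = text('delete from algo_all_fault_info_ing '
                    #               'where start_time=:start_time and fault_code=:fault_code and sn=:sn')
                    #   session.execute(stmt, params)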
                    # write the data split results to mysql
                    try:
                        if not df_data_split_rlt.empty:
                            time_record = time.time()
                            df_data_split_rlt.reset_index(drop=True, inplace=True)
                            # df_data_split_rlt.to_sql("algo_charge_info", con=mysql_algo_conn, if_exists="append", index=False)
                            # write_mysql_time = write_mysql_time + time.time() - time_record
                            logger_main.info(f'process-{process_num}: data split results for {pack_code} staged')
                            logger_main.info(f"process-{process_num}: result write took {time.time() - time_record}s")
                    except Exception as e:
                        logger_main.error(f"process-{process_num}: failed to write data split results")
                        logger_main.error(f"process-{process_num}: {e}")
                        logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
            except Exception as e:
                logger_main.error(f'process-{process_num}: {e}')
                logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
            print('this pass took: ' + str(time.time() - st_time))
    except Exception as e:
        logger_main.error(f'process-{process_num}: {e}')
        logger_main.error(f'process-{process_num}: {traceback.format_exc()}')


if __name__ == '__main__':
    # while(True):
    try:
        # configuration
        cur_env = 'dev'  # runtime environment
        app_path = "D:/ZLWORK/code/zlwl-algos/"  # absolute app path
        log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log path
        app_name = "task_day_1_sxqtest"  # application name
        sysUtils = SysUtils(cur_env, app_path)
        logger_main = sysUtils.get_logger(app_name, log_base_path)
        logger_main.info(f"main process id: {os.getpid()}")
        main()
        # Read config and fan out worker processes (do not modify this part).
        # processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # defaults to 1 process
        # pool = Pool(processes=int(processes))
        # logger_main.info("start spawning worker processes")
        # for i in range(int(processes)):
        #     pool.apply_async(main, (i,))
        # pool.close()
        # logger_main.info("workers spawned, blocking the main process")
        # pool.join()
    except Exception as e:
        print(str(e))
        print(traceback.format_exc())
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())
    finally:
        handlers = logger_main.handlers.copy()
        for h in handlers:
            logger_main.removeHandler(h)
        # pool.terminate()