import pandas as pd
import time
import datetime
import traceback
from ZlwlAlgos.PERIODIC.task_min_5.group_1.safetyalarm.V_1_0_0.CBMSSafetyAlarm import SafetyAlarm
from ZlwlAlgos.PERIODIC.task_min_10.group_1.FaultWarning.V_1_0_0 import CBMSBatDiag, CBMSBatDiag_SOCCrntIsnV1, CBMSBatDiag_TempV4
from ZlwlAlgos.PERIODIC.task_hour_1.group_2.safety_score.V1_0_0 import Safety_Score
from ZlwlAlgos.PERIODIC.task_hour_6.group_1.intershort.V_1_0_0.CBMSBatInterShort import BatInterShort
from ZlwlAlgos.PERIODIC.task_hour_6.group_1.uniform.V_1_0_0.CBMSBatUniform import CellUniform
from ZlwlAlgos.PERIODIC.task_hour_6.group_1.safetywarning.V_1_0_0.CBMSSafetyWarning import SafetyWarning
from ZlwlAlgos.PERIODIC.task_day_1.group_1.socdiag.V_1_0_0.SOCBatDiag import SocDiag
from ZlwlAlgos.PERIODIC.task_day_1.group_1.LowSocAlarm.V1_0_0.low_soc_alarm import Low_soc_alarm
from ZlwlAlgos.PERIODIC.task_day_1.group_1.SorCal.V_1_0_0.sorcal import sor_est
from ZlwlAlgos.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import data_status as ds  # charge-status normalisation
from ZlwlAlgos.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import data_split as dt  # segmentation routine
from ZlwlAlgos.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import data_drive_stat as ddt  # driving data summarised per driving segment
from ZlwlAlgos.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import data_charge_stat as dct  # charging data summarised per charging segment
from ZlwlAlgos.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import data_stand_stat as dst  # standing data summarised per standing segment
from ZlwlAlgos.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import data_drive_stat_period as ddtp  # driving data summarised per charge cycle
from ZlwlAlgos.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import trans_day as trd  # handles data that spans days
from ZlwlAlgos.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import stand_status as ss
from ZlwlAlgos.PERIODIC.task_day_7.group_1.soh.V_1_0_0.CBMSBatSoh import BatSoh
import lib.glo as glo


# Column layout of the empty "diagnosis RAM" frames handed to the algorithms.
_FAULT_COLUMNS = (
    'start_time', 'end_time', 'sn', 'imei', 'model', 'fault_level',
    'fault_code', 'fault_info', 'fault_reason', 'fault_advice',
    'fault_location', 'device_status', 'odo',
)

# Columns that together identify one fault record when merging results.
_FAULT_KEY = ('sn', 'start_time', 'fault_code')


class AlgosInvoke:
    """Run the periodic algorithms and merge their fault results.

    Each public method wraps one algorithm (or a group of them) in a
    ``try``/``except`` that logs the failure through ``self.logger`` and
    then returns the inputs / empty defaults instead of propagating.
    """

    def __init__(self, logger, df_algo_param, df_table, cellvolt_name, celltemp_name, pack_code,
                 df_snlist, df_algo_adjustable_param, df_algo_pack_param, cell_type):
        """Store the shared configuration that is passed on to the algorithms.

        ``df_algo_pack_param`` must provide ``.items()``; an evaluated copy
        of it is kept in ``self.df_algo_pack_param_eval``.
        """
        self.logger = logger
        self.df_algo_param = df_algo_param
        self.df_table = df_table
        self.cellvolt_name = cellvolt_name
        self.celltemp_name = celltemp_name
        self.pack_code = pack_code
        self.df_snlist = df_snlist
        self.df_algo_adjustable_param = df_algo_adjustable_param
        self.df_algo_pack_param = df_algo_pack_param
        # SECURITY NOTE(review): string values are passed to eval(). That is
        # only safe if the pack parameters come from a trusted source --
        # confirm, and consider ast.literal_eval if they are plain literals.
        self.df_algo_pack_param_eval = {
            k: eval(v) if isinstance(v, str) else v
            for k, v in df_algo_pack_param.items()
        }
        self.cell_type = cell_type

    def _log_exception(self, exc):
        """Log the message and the traceback of the exception being handled."""
        self.logger.error(str(exc))
        self.logger.error(traceback.format_exc())

    @staticmethod
    def _find_fault(df_faults, record):
        """Return the index label of the first row matching ``record``.

        Rows are matched on ``sn``, ``start_time`` and ``fault_code``.
        Returns ``None`` when ``df_faults`` lacks one of those columns or
        contains no matching row.
        """
        if any(col not in df_faults.columns for col in _FAULT_KEY):
            return None
        mask = (
            (df_faults['sn'] == record['sn'])
            & (df_faults['start_time'] == record['start_time'])
            & (df_faults['fault_code'] == record['fault_code'])
        )
        hits = df_faults.index[mask]
        return hits[0] if len(hits) else None

    def _process_ing_done(self, df_all_fault_ing, df_all_fault_done, df_res_new, df_res_end,
                          df_res_update=None, update_flag=False):
        """Fold one algorithm's results into the ongoing / finished fault tables.

        * rows of ``df_res_new`` are appended to the ongoing table;
        * when ``update_flag`` is set, each row of ``df_res_update`` replaces
          the ongoing row with the same (sn, start_time, fault_code);
        * each row of ``df_res_end`` removes the matching ongoing row and is
          appended to the finished table.

        A record without a matching ongoing row is still appended (nothing
        is removed). ``soc_diag`` relies on this: it passes ``soc_jump()``
        end records that have no corresponding "new" record.

        The caller's frames are not modified; new frames with a fresh
        0..n-1 index are returned as ``(ongoing, finished)``.
        """
        # Handle additions.
        df_all_fault_ing = pd.concat([df_all_fault_ing, df_res_new])
        df_all_fault_ing.reset_index(drop=True, inplace=True)

        # Handle updates.
        if update_flag and df_res_update is not None:
            for pos, record in enumerate(df_res_update.to_dict("records")):
                label = self._find_fault(df_all_fault_ing, record)
                if label is not None:
                    df_all_fault_ing.drop(label, inplace=True)
                # Positional selection: index labels of the result frame are
                # not guaranteed to be unique.
                df_all_fault_ing = pd.concat([df_all_fault_ing, df_res_update.iloc[[pos]]])
                # Labels must be unique again before the next drop().
                df_all_fault_ing.reset_index(drop=True, inplace=True)

        # Handle finished faults.
        for pos, record in enumerate(df_res_end.to_dict("records")):
            label = self._find_fault(df_all_fault_ing, record)
            if label is not None:
                df_all_fault_ing.drop(label, inplace=True)
            df_all_fault_done = pd.concat([df_all_fault_done, df_res_end.iloc[[pos]]])

        df_all_fault_ing.reset_index(drop=True, inplace=True)
        df_all_fault_done.reset_index(drop=True, inplace=True)
        return df_all_fault_ing, df_all_fault_done

    def safety_alarm(self, df_data, df_all_fault_ing, df_all_fault_done):
        """Run ``SafetyAlarm`` on ``df_data`` and merge its new/ended faults.

        Returns ``(df_all_fault_ing, df_all_fault_done)``; on failure the
        error is logged and the inputs are returned unchanged.
        """
        try:
            algo_name = 'safety_alarm'
            self.logger.info(f"开始执行{algo_name}算法")
            # Variable definitions: the algorithm starts from empty RAM frames.
            df_bms_ram = pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp', 'packsoc', 'packcrnt'])
            df_alarm_ram = pd.DataFrame(columns=['time', 'sn', 'safetywarning1', 'safetywarning2'])
            df_diag_ram = pd.DataFrame(columns=list(_FAULT_COLUMNS))
            # Run the algorithm.
            safety_alarm = SafetyAlarm(df_data, df_diag_ram, df_bms_ram, df_alarm_ram, self.df_algo_param,
                                       self.df_table, self.cellvolt_name, self.celltemp_name,
                                       self.pack_code, self.df_snlist)
            df_res_new, df_res_end, df_bms_ram, df_alarm_ram = safety_alarm.safety_alarm_diag()
            # Process the results.
            df_all_fault_ing, df_all_fault_done = self._process_ing_done(
                df_all_fault_ing, df_all_fault_done, df_res_new, df_res_end)
            self.logger.info(f"{algo_name}算法执行完成")
        except Exception as e:
            self._log_exception(e)
        return df_all_fault_ing, df_all_fault_done

    def fault_warning(self, df_data, df_all_fault_ing, df_all_fault_done):
        """Run the three fault diagnoses and merge their combined results.

        Each diagnosis runs in its own ``try`` so that a failure of one
        only leaves that diagnosis' results empty. Returns
        ``(df_all_fault_ing, df_all_fault_done)``.
        """
        algo_name = 'fault_warning'
        self.logger.info(f"开始执行{algo_name}算法")

        # Cell voltage / temperature fault diagnosis.
        df_res_new_volt = pd.DataFrame()
        df_res_update_volt = pd.DataFrame()
        df_res_end_volt = pd.DataFrame()
        try:
            FaultDiagVolt = CBMSBatDiag.BatDiagVolt(df_data, self.df_table, self.cellvolt_name,
                                                    self.celltemp_name, self.cell_type)
            df_res_new_volt, df_res_update_volt, df_res_end_volt = FaultDiagVolt.diag(
                df_all_fault_ing, self.df_algo_adjustable_param, self.df_algo_pack_param,
                self.df_algo_param, self.df_snlist)
        except Exception as e:
            self._log_exception(e)

        df_res_new_crnt = pd.DataFrame()
        df_res_update_crnt = pd.DataFrame()
        df_res_end_crnt = pd.DataFrame()
        try:
            FaultDiagTemp = CBMSBatDiag_SOCCrntIsnV1.BatDiag_SOCCrntIsn(df_data, self.df_table,
                                                                       self.cellvolt_name, self.celltemp_name)
            df_res_new_crnt, df_res_update_crnt, df_res_end_crnt = FaultDiagTemp.diag(
                df_all_fault_ing, self.df_algo_adjustable_param, self.df_algo_pack_param,
                self.df_algo_param, self.df_snlist)
        except Exception as e:
            self._log_exception(e)

        df_res_new_temp = pd.DataFrame()
        df_res_update_temp = pd.DataFrame()
        df_res_end_Temp = pd.DataFrame()
        try:
            # NOTE(review): celltemp_name precedes cellvolt_name here, unlike
            # the two constructors above -- confirm against BatDiag's signature.
            FaultDiagTemp = CBMSBatDiag_TempV4.BatDiag(df_data, self.df_table, self.celltemp_name,
                                                       self.cellvolt_name, self.pack_code)
            df_res_new_temp, df_res_update_temp, df_res_end_Temp = FaultDiagTemp.diag(
                df_all_fault_ing, self.df_algo_adjustable_param, self.df_algo_pack_param,
                self.df_algo_param, self.df_snlist)
        except Exception as e:
            self._log_exception(e)

        try:
            df_res_new = pd.concat([df_res_new_volt, df_res_new_crnt, df_res_new_temp])
            df_res_update = pd.concat([df_res_update_volt, df_res_update_crnt, df_res_update_temp])
            df_res_end = pd.concat([df_res_end_volt, df_res_end_crnt, df_res_end_Temp])
            df_res_new.reset_index(drop=True, inplace=True)
            df_res_update.reset_index(drop=True, inplace=True)
            df_res_end.reset_index(drop=True, inplace=True)
            # Process the results.
            df_all_fault_ing, df_all_fault_done = self._process_ing_done(
                df_all_fault_ing, df_all_fault_done, df_res_new, df_res_end, df_res_update, update_flag=True)
            self.logger.info(f"{algo_name}算法执行完成")
        except Exception as e:
            self._log_exception(e)
        return df_all_fault_ing, df_all_fault_done

    def safety_score(self, df_r_risk_model, df_r_risk_list, df_dept, df_allfltinfoing, df_allfltinfodone,
                     organ_code):
        """Run ``SafetyScore`` and return its two result frames.

        Returns ``(df_safety_score_h, df_safety_score_d)``; both are empty
        DataFrames when the algorithm fails (the error is logged).
        """
        # Defaults are set before the try so the return can never hit an
        # unbound local.
        df_safety_score_h = pd.DataFrame()
        df_safety_score_d = pd.DataFrame()
        try:
            algo_name = 'safety_score'
            self.logger.info(f"开始执行{algo_name}算法")
            df_safety_core_hour = pd.DataFrame(columns=["create_time", "sn", "score_h", "risk_index"])
            SafetyScole = Safety_Score.SafetyScore(self.df_snlist, df_allfltinfoing, df_allfltinfodone,
                                                   df_safety_core_hour, self.df_algo_param, df_r_risk_model,
                                                   df_r_risk_list, df_dept, organ_code)
            df_safety_score_h, df_safety_score_d = SafetyScole.safety_score()
            self.logger.info(f"{algo_name}算法执行完成")
        except Exception as e:
            self._log_exception(e)
        return df_safety_score_h, df_safety_score_d

    def bat_intershort_cell_uniform_safety_warning(self, df_data, df_all_fault_ing, df_all_fault_done, df_soh):
        """Run ``BatInterShort``, ``CellUniform`` and ``SafetyWarning`` per ``sn``.

        ``df_data`` is grouped by ``sn``; every group is processed with
        fresh, empty RAM frames. Returns ``(df_all_short, df_all_uniform,
        df_all_fault_ing, df_all_fault_done)``. On failure the error is
        logged and whatever was accumulated so far is returned.
        """
        df_all_short = pd.DataFrame()
        df_all_uniform = pd.DataFrame()
        try:
            algo_name = 'bat_intershort_cell_uniform_safety_warning'
            self.logger.info(f"开始执行{algo_name}算法")
            BatShort = BatInterShort(self.cell_type, self.df_algo_pack_param, self.celltemp_name,
                                     self.cellvolt_name)
            BatUniform = CellUniform(self.cell_type, self.df_algo_pack_param, self.celltemp_name,
                                     self.cellvolt_name)
            BatWarning = SafetyWarning(self.df_algo_adjustable_param, self.df_algo_param, self.cellvolt_name,
                                       self.pack_code, self.df_snlist)
            for sn, df in df_data.groupby('sn'):
                df_short, df_ram_res, df_ram_res1, df_ram_res2, df_ram_res3, df_lfp_ram, df_bal_ram = BatShort.intershort(
                    sn, df, df_soh,
                    pd.DataFrame(columns=['sn', 'time', 'deltsoc', 'cellsoc']),
                    pd.DataFrame(columns=['sn', 'time1', 'deltsoc1']),
                    pd.DataFrame(columns=['sn', 'time2', 'delt_volt', 'packcrnt_avg', 'temp_avg']),
                    pd.DataFrame(columns=['sn', 'time3', 'standingtime', 'standingtime1', 'standingtime2', 'standingtime3']),
                    [],
                    pd.DataFrame(columns=['sn', 'time', 'bal_time', 'bal_time1', 'bal_time2']))
                df_uniform, df_ram_res3 = BatUniform.batuniform(sn, df, df_ram_res3)
                df_new, df_update, df_end = BatWarning.warning_diag(
                    sn, df, df_short, df_uniform, pd.DataFrame(columns=list(_FAULT_COLUMNS)))
                df_all_short = pd.concat([df_all_short, df_short])
                df_all_uniform = pd.concat([df_all_uniform, df_uniform])
                # Process the results.
                # NOTE(review): the original indentation was lost; this is
                # placed inside the loop because df_new/df_update/df_end are
                # per-sn values -- confirm.
                df_all_fault_ing, df_all_fault_done = self._process_ing_done(
                    df_all_fault_ing, df_all_fault_done, df_new, df_end, df_update, update_flag=True)
            self.logger.info(f"{algo_name}算法执行完成")
        except Exception as e:
            self._log_exception(e)
        return df_all_short, df_all_uniform, df_all_fault_ing, df_all_fault_done

    def soc_diag(self, df_data, df_all_fault_ing, df_all_fault_done):
        """Run ``SocDiag`` (``soc_block`` and ``soc_jump``) and merge the results.

        Returns ``(df_all_fault_ing, df_all_fault_done)``; on failure the
        error is logged and the inputs are returned unchanged.
        """
        try:
            algo_name = 'soc_diag'
            self.logger.info(f"开始执行{algo_name}算法")
            # Only ongoing faults of the configured sn list are handed over.
            df_diag_ram = df_all_fault_ing[df_all_fault_ing['sn'].isin(self.df_snlist['sn'].tolist())]
            period = 24 * 60  # algorithm period in minutes
            end_time = str(df_data['time'].tolist()[-1])
            soc_diag = SocDiag(self.cell_type, self.df_algo_pack_param_eval, self.df_algo_adjustable_param,
                               self.df_algo_param, end_time, period, self.pack_code, self.df_snlist, df_data)
            df_res_new_C109, df_res_end_C109 = soc_diag.soc_block(df_diag_ram)
            df_res_end_C107 = soc_diag.soc_jump()
            df_res_new_soc = df_res_new_C109
            df_res_end_soc = pd.concat([df_res_end_C107, df_res_end_C109])
            df_all_fault_ing, df_all_fault_done = self._process_ing_done(
                df_all_fault_ing, df_all_fault_done, df_res_new_soc, df_res_end_soc)
        except Exception as e:
            self._log_exception(e)
        return df_all_fault_ing, df_all_fault_done

    def low_soc(self, df_data, df_all_fault_ing, df_all_fault_done):
        """Run ``Low_soc_alarm`` and merge its new/updated/ended faults.

        Returns ``(df_all_fault_ing, df_all_fault_done)``; on failure the
        error is logged and the inputs are returned unchanged.
        """
        try:
            algo_name = 'low_soc'
            self.logger.info(f"开始执行{algo_name}算法")
            # Only ongoing faults of the configured sn list are handed over.
            df_diag_ram = df_all_fault_ing[df_all_fault_ing['sn'].isin(self.df_snlist['sn'].tolist())]
            low_soc_warning = Low_soc_alarm(df_data, self.cellvolt_name)
            df_res_new_lw_soc, df_res_update_lw_soc, df_res_end_lw_soc = low_soc_warning.diag(
                self.df_algo_pack_param_eval, self.df_algo_param, self.df_algo_adjustable_param,
                df_data, self.df_table, df_diag_ram, self.df_snlist)
            df_all_fault_ing, df_all_fault_done = self._process_ing_done(
                df_all_fault_ing, df_all_fault_done, df_res_new_lw_soc, df_res_end_lw_soc,
                df_res_update_lw_soc, True)
        except Exception as e:
            self._log_exception(e)
        return df_all_fault_ing, df_all_fault_done

    def sor(self, df_data):
        """Run ``sor_est`` and return its result with a fresh index.

        Returns an empty DataFrame when the algorithm fails (the error is
        logged).
        """
        # Default set before the try so a failure cannot leave the return
        # value unbound.
        df_sor_add = pd.DataFrame()
        try:
            algo_name = 'sor'
            self.logger.info(f"开始执行{algo_name}算法")
            Diagsor_temp = sor_est(df_data, self.df_algo_pack_param)  # compute internal resistance
            df_sor_add = Diagsor_temp.sor_cal()
            df_sor_add.reset_index(drop=True, inplace=True)
        except Exception as e:
            self._log_exception(e)
        return df_sor_add

    # NOTE: disabled placeholder, kept as found (its body duplicates sor()).
    # def li_plted(self, df_data):
    #     try:
    #         algo_name = 'li_plted'
    #         self.logger.info(f"开始执行{algo_name}算法")
    #         Diagsor_temp = sor_est(df_data, self.df_algo_pack_param)  # compute internal resistance
    #         df_sor_add = Diagsor_temp.sor_cal()
    #     except Exception as e:
    #         self.logger.error(str(e))
    #         self.logger.error(traceback.format_exc())

    def data_split(self, df_merge):
        """Normalise the status of ``df_merge`` and split it into segments.

        Returns the fourth result of ``dt.split`` with a fresh index, or an
        empty DataFrame when the algorithm fails (the error is logged).
        """
        df_data_split_rlt = pd.DataFrame()
        try:
            algo_name = 'data_split'
            self.logger.info(f"开始执行{algo_name}算法")
            df_merge = ds.data_status(df_merge, c_soc_dif_p=0.05, s_soc_dif_p=0,
                                      c_order_delta=1200, s_order_delta=300)
            # Segment the data based on the status codes.
            _drive, _charge, _stand, df_data_split_rlt = dt.split(
                df_merge, self.celltemp_name, self.cellvolt_name,
                drive_interval_time_min=1200, charge_interval_time_min=1200,
                stand_interval_time_min=1200, single_num_min=3,
                drive_sts=3, charge_sts=[21, 22], stand_sts=0)
            df_data_split_rlt.reset_index(drop=True, inplace=True)
        except Exception as e:
            self._log_exception(e)
        return df_data_split_rlt

    def soh(self, df_data):
        """Run ``BatSoh`` and return its result re-indexed from 0.

        Returns an empty DataFrame when the algorithm fails (the error is
        logged).
        """
        df_soh_res = pd.DataFrame()
        try:
            algo_name = 'soh'
            self.logger.info(f"开始执行{algo_name}算法")
            df_ram_soh_result = pd.DataFrame(columns=['time_st', 'time_sp', 'sn', 'method', 'soh', 'soh_real',
                                                      'cellsoh_diff', 'cellsoh', 'odo'])
            bat_soh = BatSoh(self.cell_type, df_data, df_ram_soh_result, self.df_algo_pack_param_eval,
                             self.cellvolt_name, self.celltemp_name, self.pack_code)
            df_soh_res = bat_soh.batsoh(self.df_snlist)
            df_soh_res.index = list(range(len(df_soh_res)))
        except Exception as e:
            self._log_exception(e)
        return df_soh_res