import pandas as pd
import numpy as np
import datetime
import time
from matplotlib import pyplot as plt
import pymannkendall as mk
from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam


class SafetyWarning:
    """Battery thermal-safety early-warning diagnosis for one pack.

    The class reads previously computed per-cell tables (leakage/short
    current, cell-voltage ranking, voltage outlier / voltage-change values,
    SOH) and opens or closes the alarm codes 'C490' and 'C491' in a copy of
    the caller's active-alarm table.

    NOTE(review): in this copy of the file all line breaks and indentation
    were lost, and two statements in ``_warning_diag`` are visibly truncated.
    The layout below is a reconstruction; the truncated statements are kept
    verbatim and marked, and must be restored from version control before
    this module can be imported.
    """

    def __init__(self,sn,celltype,df_short,df_uniform,OutLineVol_Rate,df_soh,df_fault_ram_sn):
        """Store the inputs used by the diagnosis.

        Args:
            sn: Product identifier; written to the alarm row and used as the
                plot title.
            celltype: Cell-type code; passed to ``BatParam.BatParam`` and
                compared against 50 in ``diag`` / ``_warning_diag``.
            df_short: DataFrame with a 'short_current' column holding a
                bracketed, comma-separated list as text (it is stripped of
                brackets, split on ',' and evaluated).
            df_uniform: DataFrame with a 'cellvolt_rank' column in the same
                textual list form.
            OutLineVol_Rate: DataFrame with 'VolChng_Uni', 'VolOl_Uni' (textual
                lists) and 'time' columns. NOTE: it is modified in place by
                ``_warning_diag``.
            df_soh: DataFrame whose row 0 'cellsoh' value is a textual list.
            df_fault_ram_sn: Currently active alarms; copied, so the caller's
                frame is not modified. Must contain 'code' and 'end_time'.
        """
        # Parameter initialisation
        self.sn=sn
        self.celltype=celltype
        self.param=BatParam.BatParam(celltype)
        self.df_short=df_short
        self.df_uniform=df_uniform
        self.OutLineVol_Rate=OutLineVol_Rate
        self.df_soh=df_soh
        self.df_alarm_ram=df_fault_ram_sn.copy()

    def diag(self):
        """Run the warning diagnosis and return the updated alarm table.

        NOTE(review): both branches are identical in the visible code, so the
        ``celltype<=50`` test currently has no effect.
        """
        if self.celltype<=50:
            df_res=self._warning_diag()
            return df_res
        else:
            df_res=self._warning_diag()
            return df_res

    # Battery thermal-safety early-warning diagnosis .............................
    def _warning_diag(self):
        """Evaluate the per-cell warning criteria and update ``self.df_alarm_ram``.

        Steps visible in the code:
          1. Parse the textual list columns of the input frames.
          2. For every cell index in ``range(self.param.CellVoltNums)``:
             leakage-current check, linear fit + Mann-Kendall test on the
             voltage-change series, and cell ranking check.
          3. Open alarm 'C490' (leakage) / 'C491' (Mann-Kendall trend) when a
             cell is flagged and the code is not yet active, or stamp
             'end_time' when the code is active and no cell is flagged.

        Side effects: adds columns to ``self.df_short`` and ``self.df_uniform``,
        rewrites columns of ``self.OutLineVol_Rate``, draws matplotlib plots
        and calls ``plt.show()``.

        Returns:
            ``self.df_alarm_ram`` (the copied alarm table, possibly updated).
        """
        # NOTE(review): df_res is never used in the visible code; the method
        # returns self.df_alarm_ram instead.
        df_res=pd.DataFrame(columns=['start_time', 'end_time', 'product_id', 'code', 'level', 'info','advice'])

        time_now=datetime.datetime.now()
        time_now=time_now.strftime('%Y-%m-%d %H:%M:%S')
        # Placeholder written as 'end_time' for an alarm that is still open.
        time_sp='0000-00-00 00:00:00'

        # Parameter initialisation .......................................
        voltsigmafault=0
        uniformfault=0
        # NOTE(review): this scalar is immediately overwritten by the list below.
        cellshortfault=0
        cellshortfault=[]
        volt_rate=[]
        R2_list=[]
        voltsigmafault_list=[]
        uniformfault_list=[]
        mk_trend_list=[]
        mk_p_list=[]
        mk_z_list=[]
        mk_Tau_list=[]
        mk_slope_list=[]
        mk_s_list=[]
        mk_svar_list=[]

        if not self.df_short.empty:
            # Strip the list brackets so each row can be split on ',' later.
            short_current=self.df_short['short_current']
            short_current=short_current.str.replace("[", '')
            short_current=short_current.str.replace("]", '')

        if not self.OutLineVol_Rate.empty:
            # One column name per cell: '单体电压1' .. '单体电压N'.
            volt_column = ['单体电压'+str(i) for i in range(1,self.param.CellVoltNums+1)]
            # Strip brackets in place (this mutates the caller's DataFrame).
            self.OutLineVol_Rate['VolChng_Uni']=self.OutLineVol_Rate['VolChng_Uni'].str.replace("[","")
            self.OutLineVol_Rate['VolChng_Uni']=self.OutLineVol_Rate['VolChng_Uni'].str.replace("]","")
            self.OutLineVol_Rate['VolOl_Uni']=self.OutLineVol_Rate['VolOl_Uni'].str.replace("[","")
            self.OutLineVol_Rate['VolOl_Uni']=self.OutLineVol_Rate['VolOl_Uni'].str.replace("]","")
            # Expand the comma-separated text into one (still textual) column per cell.
            Volt_3Sigma=self.OutLineVol_Rate['VolOl_Uni'].str.split(',',expand=True)
            Volt_3Sigma.columns=volt_column

            # Voltage change rate
            VoltChange=self.OutLineVol_Rate['VolChng_Uni'].str.split(',',expand=True)
            VoltChange.columns=volt_column
            VoltChange['time']=self.OutLineVol_Rate['time']
            VoltChange = VoltChange.reset_index(drop=True)
            # Kept for the x axis of the plots.
            xtime1=VoltChange['time']
            # Replace each timestamp by (seconds elapsed since the first row) / 36000.
            # presumably 'time' holds datetime-like values (``.timetuple()`` is called) — TODO confirm
            time0=time.mktime(VoltChange.loc[0,'time'].timetuple())
            for i in range(0,len(VoltChange)):
                VoltChange.loc[i,'time']=(time.mktime(VoltChange.loc[i,'time'].timetuple())-time0)/36000

        # Leakage-current outlier score: per row, z-score of every cell's value.
        if not self.df_short.empty:
            self.df_short['cellshort_sigma']=0
            for i in range(len(self.df_short)):
                # NOTE(review): eval() on DataFrame text; if this data can come
                # from an untrusted source consider ast.literal_eval.
                cellshort=eval(self.df_short.loc[i,'short_current'])
                cellshort_std=np.std(cellshort)
                cellshort_mean=np.mean(cellshort)
                # Stored as text; evaluated again in the per-cell loop below.
                # NOTE(review): no guard for cellshort_std == 0.
                self.df_short.loc[i,'cellshort_sigma']=str(list((cellshort-cellshort_mean)/cellshort_std))

        if not self.df_uniform.empty:
            cellvolt_rank=self.df_uniform['cellvolt_rank']
            cellvolt_rank=cellvolt_rank.str.replace("[", '')
            cellvolt_rank=cellvolt_rank.str.replace("]", '')

        for i in range(self.param.CellVoltNums):
            # Leakage-current fault check ..........................................
            if not self.df_short.empty:
                # Column 'cellshort<k>' = value of cell k (1-based) in every row.
                self.df_short['cellshort'+str(i+1)]=short_current.map(lambda x:eval(x.split(',')[i]))
                cellshort=self.df_short['cellshort'+str(i+1)]
                # NOTE(review): the next statement is TRUNCATED in this copy of the
                # file and is not valid Python. It appears that the text between a
                # '<' and a later '>' was lost (a threshold comparison that selects
                # row indices, followed by an ``if ...:`` header). Kept verbatim;
                # restore from version control. The nesting of the loop and of the
                # trailing ``else: pass`` below is a reconstruction.
                index_list=cellshort[cellshort1:
                    for j in range(1,len(index_list)):
                        # Only consecutive row indices are considered.
                        if index_list[j]-index_list[j-1]==1:
                            cellshort_sigma1=eval(self.df_short.loc[index_list[j],'cellshort_sigma'])
                            cellshort_sigma2=eval(self.df_short.loc[index_list[j-1],'cellshort_sigma'])
                            # Flag when this cell's z-score is below -3 in either row.
                            if cellshort_sigma1[i]<-3 or cellshort_sigma2[i]<-3:
                                cellshortfault.append(1)
                            else:
                                cellshortfault.append(0)
                        else:
                            cellshortfault.append(0)
                else:
                    pass

            # Voltage change rate and voltage outlier score ..........................
            # Requires more than 18 h of span (time*36000 is seconds) and more than 5 rows.
            if not self.OutLineVol_Rate.empty and VoltChange.iloc[-1]['time']*36000>18*3600 and len(VoltChange)>5:
                # Convert this cell's textual values to numbers.
                VoltChange[volt_column[i]]=VoltChange[volt_column[i]].map(lambda x:eval(x))
                y=VoltChange[volt_column[i]]
                volt3sigma=np.array(Volt_3Sigma[volt_column[i]].map(lambda x:eval(x)))
                # Number of samples whose outlier score is below -3.
                volt3sigma_sum=np.sum(volt3sigma<-3)
                # Voltage change rate: degree-1 least-squares fit, a1 is the slope.
                a1,b1=np.polyfit(VoltChange['time'].tolist(),y.tolist(),1)
                y1=a1*VoltChange['time']+b1
                y_mean=y.mean()
                # Coefficient of determination of the linear fit.
                R2=1-(np.sum((y1-y)**2))/(np.sum((y-y_mean)**2))
                R2_list.append(R2)
                volt_rate.append(a1)
                plt.plot(xtime1,y1,label='单体'+str(i+1))
                plt.xlabel('时间', fontsize=25)
                plt.ylabel('SOC差', fontsize=25)
                plt.xticks(fontsize=20)
                plt.yticks(fontsize=20)
                plt.scatter( xtime1,VoltChange[volt_column[i]],marker='o')
                plt.legend(bbox_to_anchor=(1, 0), loc=3, fontsize=13)
                plt.title(self.sn)
                plt.rcParams['font.sans-serif']=['SimHei'] # so that Chinese labels render correctly
                plt.rcParams['axes.unicode_minus']=False # so that the minus sign renders correctly
                # plt.show()
                # Flag the cell when more than half of its samples are below -3.
                if volt3sigma_sum>len(volt3sigma)/2:
                    voltsigmafault=1
                else:
                    voltsigmafault=0
                voltsigmafault_list.append(voltsigmafault)
                # Mann-Kendall trend test
                # NOTE(review): uses pymannkendall's regional_test on a single
                # series — confirm this is intended rather than original_test.
                mk_res=mk.regional_test(np.array(y))
                mk_trend_list.append(mk_res.trend)
                mk_p_list.append(mk_res.p)
                mk_z_list.append(mk_res.z)
                mk_Tau_list.append(mk_res.Tau)
                mk_slope_list.append(mk_res.slope)
                mk_s_list.append(mk_res.s)
                mk_svar_list.append(mk_res.var_s)
                # The string statement below is the author's legend for the result
                # fields (kept verbatim). In English:
                #   trend: trend direction; h: whether a trend exists;
                #   p: significance level, smaller means a clearer trend;
                #   z: test statistic, positive = increasing over time,
                #      negative = decreasing over time;
                #   Tau: correlation of the two sequences, near 1 strong positive,
                #      near -1 strong negative;
                #   s: Mann-Kendall score, positive = later observations tend to be
                #      larger, negative = later observations tend to be smaller;
                #   slope: trend slope.
                """ trend:趋势; h:有无趋势; p:趋势的显著水平,越小趋势越明显; z:检验统计量,正代表随时间增大趋势,负代表随时间减小趋势; Tau:反映两个序列的相关性,接近1的值表示强烈的正相关,接近-1的值表示强烈的负相关; s:Mann-Kendal的分数,如果S是一个正数,那么后一部分的观测值相比之前的观测值会趋向于变大;如果S是一个负数,那么后一部分的观测值相比之前的观测值会趋向于变小 slope:趋势斜率 """
                # print('单体电压{}:\n'.format(i+1), mk_res)
            else:
                # Not enough data: append neutral placeholders so every list keeps
                # one entry per cell.
                volt_rate.append(0)
                R2_list.append(0)
                voltsigmafault_list.append(0)
                mk_trend_list.append(0)
                mk_p_list.append(0)
                mk_z_list.append(0)
                mk_Tau_list.append(0)
                mk_slope_list.append(0)
                mk_s_list.append(0)
                mk_svar_list.append(0)

            # Cell SOC ranking check .................................................
            if not self.df_uniform.empty:
                self.df_uniform['cellvolt_rank'+str(i+1)]=cellvolt_rank.map(lambda x:eval(x.split(',')[i]))
                # Flag the cell when its rank value stays below 5 in every row.
                if max(self.df_uniform['cellvolt_rank'+str(i+1)])<5:
                    uniformfault=1
                else:
                    uniformfault=0
            else:
                uniformfault=0
            uniformfault_list.append(uniformfault)
        # NOTE(review): shows the accumulated figure; whether this blocks depends
        # on the matplotlib backend in use.
        plt.show()

        # Outlier score (z-score across cells) of the voltage change rate ...........
        # NOTE(review): none of the *_3sigma values below, nor voltsigmafault_list /
        # uniformfault_list, are referenced in the visible code; they may belong to
        # the truncated condition further down. No guard for a zero standard
        # deviation (e.g. when every entry is the placeholder 0).
        volt_rate_std=np.std(volt_rate)
        volt_rate_mean=np.mean(volt_rate)
        volt_rate_3sigma=(np.array(volt_rate)-volt_rate_mean)/volt_rate_std
        # Outlier score of the Mann-Kendall slope and z statistic
        mk_slope_std=np.std(mk_slope_list)
        mk_slope_mean=np.mean(mk_slope_list)
        mk_slope_3sigma=(np.array(mk_slope_list)-mk_slope_mean)/mk_slope_std
        mk_z_std=np.std(mk_z_list)
        mk_z_mean=np.mean(mk_z_list)
        mk_z_3sigma=(np.array(mk_z_list)-mk_z_mean)/mk_z_std

        # Per-cell SOH z-score, only for celltype < 50 (note: diag() tests <= 50).
        if not self.df_soh.empty and self.celltype<50:
            cellsoh=eval(self.df_soh.loc[0,'cellsoh'])
            cellsoh_std=np.std(cellsoh)
            cellsoh_mean=np.mean(cellsoh)
            cellsoh_3sigma=((np.array(cellsoh)-cellsoh_mean)/cellsoh_std)
        else:
            cellsoh_3sigma=[0]*self.param.CellVoltNums

        # Leakage-current thermal-runaway warning confirmation ......................
        if len(cellshortfault)>1:
            if not 'C490' in list(self.df_alarm_ram['code']):  # code not active yet: check whether it occurs now
                if max(cellshortfault)==1:
                    faultcode='C490'
                    faultlv=4
                    # NOTE(review): the reported number is the position of the first 1
                    # in cellshortfault plus one — confirm that this position really
                    # corresponds to a cell number.
                    faultinfo='电芯{}发生热失控安全预警'.format(cellshortfault.index(1)+1)
                    faultadvice='请于24h内联系技术人员确认故障'
                    # Append a new open alarm row.
                    self.df_alarm_ram.loc[len(self.df_alarm_ram)]=[time_now, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice]
                else:
                    pass
            else:
                if max(cellshortfault)==1:
                    pass
                else:
                    # Code is active but no flag is set any more: stamp its end time.
                    self.df_alarm_ram.loc[self.df_alarm_ram[self.df_alarm_ram['code']=='C490'].index, 'end_time'] = time_now
        else:
            pass

        # Mann-Kendall trend detection
        mk_fault_list=[]
        for i in range(len(mk_p_list)):
            # Criterion applicable to dynamic operating conditions
            # NOTE(review): the next statement is TRUNCATED in this copy of the file
            # and is not valid Python. Text appears to have been lost twice between
            # a '<' and a later '>': the thresholds of this condition, the branches
            # that fill mk_fault_list, and the header of the 'C491' confirmation
            # block (by analogy with 'C490' above, presumably a length check on
            # mk_fault_list — unconfirmed). Kept verbatim; restore from version
            # control. The indentation of the following block is a reconstruction.
            if mk_trend_list[i]=='decreasing' and mk_p_list[i]50 and mk_trend_list[i]=='decreasing' and mk_p_list[i]1:
            if not 'C491' in list(self.df_alarm_ram['code']):  # code not active yet: check whether it occurs now
                if max(mk_fault_list)==1:
                    faultcode='C491'
                    faultlv=4
                    faultinfo='电芯{}发生热失控安全预警'.format(mk_fault_list.index(1)+1)
                    faultadvice='请于24h内联系技术人员确认故障'
                    # Append a new open alarm row.
                    self.df_alarm_ram.loc[len(self.df_alarm_ram)]=[time_now, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice]
                else:
                    pass
            else:
                if max(mk_fault_list)==1:
                    pass
                else:
                    # Code is active but no flag is set any more: stamp its end time.
                    self.df_alarm_ram.loc[self.df_alarm_ram[self.df_alarm_ram['code']=='C491'].index, 'end_time'] = time_now
        else:
            pass

        return self.df_alarm_ram