123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- import pandas as pd
- import numpy as np
- import datetime
- import time
- import pymannkendall as mk
- import BatParam
- class SafetyWarning:
- def __init__(self,sn,celltype,df_short,df_uniform,OutLineVol_Rate,df_soh): #参数初始化
- self.sn=sn
- self.celltype=celltype
- self.param=BatParam.BatParam(celltype)
- self.df_short=df_short
- self.df_uniform=df_uniform
- self.OutLineVol_Rate=OutLineVol_Rate
- self.df_soh=df_soh
- def diag(self):
- if self.celltype<=50:
- df_res=self._warning_diag()
- return df_res
- else:
- df_res=self._warning_diag()
- return df_res
- #电池热安全预警诊断功能.................................................................................................
- def _warning_diag(self):
- df_res=pd.DataFrame(columns=['time_st','time_sp','sn','faultcode','faultlv','faultinfo','faultadvice'])
- time_now=datetime.datetime.now()
- time_now=time_now.strftime('%Y-%m-%d %H:%M:%S')
- time_sp='0000-00-00 00:00:00'
- #参数初始化.......................................
- voltsigmafault=0
- uniformfault=0
- cellshortfault=0
- volt_rate=[]
- R2_list=[]
- voltsigmafault_list=[]
- uniformfault_list=[]
- mk_trend_list=[]
- mk_p_list=[]
- mk_z_list=[]
- mk_Tau_list=[]
- mk_slope_list=[]
- mk_s_list=[]
- mk_svar_list=[]
- if not self.df_short.empty:
- short_current=self.df_short['short_current']
- short_current=short_current.str.replace("[", '')
- short_current=short_current.str.replace("]", '')
- if not self.OutLineVol_Rate.empty:
- volt_column = ['CellVolt'+str(i) for i in range(1,self.param.CellVoltNums+1)]
- self.OutLineVol_Rate['VolChng_Uni']=self.OutLineVol_Rate['VolChng_Uni'].str.replace("[","")
- self.OutLineVol_Rate['VolChng_Uni']=self.OutLineVol_Rate['VolChng_Uni'].str.replace("]","")
- self.OutLineVol_Rate['VolOl_Uni']=self.OutLineVol_Rate['VolOl_Uni'].str.replace("[","")
- self.OutLineVol_Rate['VolOl_Uni']=self.OutLineVol_Rate['VolOl_Uni'].str.replace("]","")
- Volt_3Sigma=self.OutLineVol_Rate['VolOl_Uni'].str.split(',',expand=True)
- Volt_3Sigma.columns=volt_column
- #电压变化率
- VoltChange=self.OutLineVol_Rate['VolChng_Uni'].str.split(',',expand=True)
- VoltChange.columns=volt_column
- VoltChange['time']=self.OutLineVol_Rate['time']
- VoltChange = VoltChange.reset_index(drop=True)
- xtime1=VoltChange['time']
- time0=time.mktime(VoltChange.loc[0,'time'].timetuple())
- for i in range(0,len(VoltChange)):
- VoltChange.loc[i,'time']=(time.mktime(VoltChange.loc[i,'time'].timetuple())-time0)/36000
- #计算漏电流离群度
- if not self.df_short.empty:
- self.df_short['cellshort_sigma']=0
- for i in range(len(self.df_short)):
- cellshort=eval(self.df_short.loc[i,'short_current'])
- cellshort_std=np.std(cellshort)
- cellshort_mean=np.mean(cellshort)
- self.df_short.loc[i,'cellshort_sigma']=str(list((cellshort-cellshort_mean)/cellshort_std))
- if not self.df_uniform.empty:
- cellvolt_rank=self.df_uniform['cellvolt_rank']
- cellvolt_rank=cellvolt_rank.str.replace("[", '')
- cellvolt_rank=cellvolt_rank.str.replace("]", '')
- # plt.figure()
- for i in range(self.param.CellVoltNums):
- #漏电流故障判断...........................................................................
- if not self.df_short.empty:
- self.df_short['cellshort'+str(i+1)]=short_current.map(lambda x:eval(x.split(',')[i]))
- cellshort=self.df_short['cellshort'+str(i+1)]
- index_list=cellshort[cellshort<self.param.LeakCurrentLv2].index
- if len(index_list)>1:
- for j in range(1,len(index_list)):
- if index_list[j]-index_list[j-1]==1:
- cellshort_sigma1=eval(self.df_short.loc[index_list[j],'cellshort_sigma'])
- cellshort_sigma2=eval(self.df_short.loc[index_list[j-1],'cellshort_sigma'])
- if cellshort_sigma1[i]<-3 or cellshort_sigma2[i]<-3:
- cellshortfault=1
- else:
- pass
- else:
- pass
- #电压变化率及电压离群度.................................................................................
- if not self.OutLineVol_Rate.empty and VoltChange.iloc[-1]['time']*36000>18*3600 and len(VoltChange)>5:
- volt3sigma=np.array(Volt_3Sigma[volt_column[i]].map(lambda x:eval(x)))
- volt3sigma_sum=np.sum(volt3sigma<-3)
- #电压变化率
- VoltChange[volt_column[i]]=VoltChange[volt_column[i]].map(lambda x:eval(x))
- y=VoltChange[volt_column[i]]
- a1,b1=np.polyfit(VoltChange['time'].tolist(),y.tolist(),1)
- y1=a1*VoltChange['time']+b1
- y_mean=y.mean()
- R2=1-(np.sum((y1-y)**2))/(np.sum((y-y_mean)**2))
- R2_list.append(R2)
- volt_rate.append(a1)
- # plt.plot(xtime1,y1,label=a1)
- # plt.scatter( xtime1,VoltChange[volt_column[i]],marker='o')
- # plt.legend(loc='best')
- # plt.title('CellVolt'+str(i+1))
- # plt.rcParams['font.sans-serif']=['SimHei'] #用来正常显示中文标签
- # plt.rcParams['axes.unicode_minus']=False #用来正常显示负号
- # plt.show()
- if volt3sigma_sum>len(volt3sigma)/2:
- voltsigmafault=1
- else:
- voltsigmafault=0
- voltsigmafault_list.append(voltsigmafault)
- #mana-kendell趋势检验
- mk_res=mk.regional_test(np.array(y))
- mk_trend_list.append(mk_res.trend)
- mk_p_list.append(mk_res.p)
- mk_z_list.append(mk_res.z)
- mk_Tau_list.append(mk_res.Tau)
- mk_slope_list.append(mk_res.slope)
- mk_s_list.append(mk_res.s)
- mk_svar_list.append(mk_res.var_s)
- """
- trend:趋势;
- h:有无趋势;
- p:趋势的显著水平,越小趋势越明显;
- z:检验统计量,正代表随时间增大趋势,负代表随时间减小趋势;
- Tau:反映两个序列的相关性,接近1的值表示强烈的正相关,接近-1的值表示强烈的负相关;
- s:如果S是一个正数,那么后一部分的观测值相比之前的观测值会趋向于变大;如果S是一个负数,那么后一部分的观测值相比之前的观测值会趋向于变小
- slope:趋势斜率
- """
- # print('单体电压{}:\n'.format(i+1), mk_res)
- else:
- volt_rate.append(0)
- R2_list.append(0)
- voltsigmafault_list.append(0)
- mk_trend_list.append(0)
- mk_p_list.append(0)
- mk_z_list.append(0)
- mk_Tau_list.append(0)
- mk_slope_list.append(0)
- mk_s_list.append(0)
- mk_svar_list.append(0)
- #电芯SOC排名判断.............................................................................
- if not self.df_uniform.empty:
- self.df_uniform['cellvolt_rank'+str(i+1)]=cellvolt_rank.map(lambda x:eval(x.split(',')[i]))
- if max(self.df_uniform['cellvolt_rank'+str(i+1)])<5:
- uniformfault=1
- else:
- uniformfault=0
- else:
- uniformfault=0
- uniformfault_list.append(uniformfault)
- #漏电流热失控预警确认........................................................
- if cellshortfault==1:
- faultcode=110
- faultlv=4
- faultinfo='电芯{}发生热失控安全预警'.format(i+1)
- faultadvice='断开继电器,远离电池,并通知电池技术人员介入分析'
- df_res.loc[0]=[time_now, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice]
- break
- else:
- pass
- #电池电压变化率离群度计算...............................................................................
- volt_rate_std=np.std(volt_rate)
- volt_rate_mean=np.mean(volt_rate)
- volt_rate_3sigma=(np.array(volt_rate)-volt_rate_mean)/volt_rate_std
- #mk离群度计算
- mk_slope_std=np.std(mk_slope_list)
- mk_slope_mean=np.mean(mk_slope_list)
- mk_slope_3sigma=(np.array(mk_slope_list)-mk_slope_mean)/mk_slope_std
- mk_z_std=np.std(mk_z_list)
- mk_z_mean=np.mean(mk_z_list)
- mk_z_3sigma=(np.array(mk_z_list)-mk_z_mean)/mk_z_std
- if not self.df_soh.empty and self.celltype<50:
- cellsoh=eval(self.df_soh.loc[0,'cellsoh'])
- cellsoh_std=np.std(cellsoh)
- cellsoh_mean=np.mean(cellsoh)
- cellsoh_3sigma=((np.array(cellsoh)-cellsoh_mean)/cellsoh_std)
- else:
- cellsoh_3sigma=[0]*self.param.CellVoltNums
- #电压/SOC变化率
- # for i in range(len(volt_rate)):
- # if volt_rate[i]<self.param.TrwVoltRate and volt_rate_3sigma[i]<-3 and abs(cellsoh_3sigma[i])<2.5 and voltsigmafault_list[i]==1:
- # faultcode=110
- # faultlv=4
- # faultinfo='电芯{}发生热失控安全预警'.format(i+1)
- # faultadvice='2联系用户远离电池,立刻召回电池'
- # df_res.loc[0]=[time_now, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice]
- # else:
- # pass
- #mana-kendall趋势检测
- for i in range(len(mk_p_list)):
- #适用动态工况判断
- if mk_trend_list[i]=='decreasing' and mk_p_list[i]<self.param.mk_p and mk_z_list[i]<self.param.mk_z and mk_Tau_list[i]<self.param.mk_Tau and mk_s_list[i]<self.param.mk_s and mk_svar_list[i]>self.param.mk_svar and mk_slope_3sigma[i]<-3 and mk_slope_list[i]<self.param.mk_slope and abs(cellsoh_3sigma[i])<2.5 and volt_rate_3sigma[i]<-3:
- faultcode=110
- faultlv=4
- faultinfo='电芯{}发生热失控安全预警'.format(i+1)
- faultadvice='2联系用户远离电池,立刻召回电池'
- df_res.loc[0]=[time_now, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice]
- #适用静态工况判断
- elif self.celltype<=50 and mk_trend_list[i]=='decreasing' and mk_p_list[i]<self.param.mk_p and mk_z_list[i]<-6 and (mk_Tau_list[i]<-0.9 or mk_z_3sigma[i]<-3) and mk_s_list[i]<self.param.mk_s and mk_svar_list[i]>self.param.mk_svar and mk_slope_3sigma[i]<-3.5 and mk_slope_list[i]<-0.03 and volt_rate_3sigma[i]<-3:
- faultcode=110
- faultlv=4
- faultinfo='电芯{}发生热失控安全预警'.format(i+1)
- faultadvice='2联系用户远离电池,立刻召回电池'
- df_res.loc[0]=[time_now, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice]
- elif self.celltype>50 and mk_trend_list[i]=='decreasing' and mk_p_list[i]<self.param.mk_p and mk_z_list[i]<-6 and (mk_Tau_list[i]<-0.95 or mk_z_3sigma[i]<-3) and mk_s_list[i]<self.param.mk_s and mk_svar_list[i]>self.param.mk_svar and mk_slope_3sigma[i]<-3.5 and mk_slope_list[i]<-0.4 and volt_rate_3sigma[i]<-3:
- faultcode=110
- faultlv=4
- faultinfo='电芯{}发生热失控安全预警'.format(i+1)
- faultadvice='2联系用户远离电池,立刻召回电池'
- df_res.loc[0]=[time_now, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice]
- else:
- pass
- # plt.show()
- return df_res
|