123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- import pandas as pd
- import numpy as np
- import datetime
- import time
- from matplotlib import pyplot as plt
- import pymannkendall as mk
- from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam
- class SafetyWarning:
- def __init__(self,sn,celltype,df_short,df_uniform,OutLineVol_Rate,df_soh,df_fault_ram_sn): #参数初始化
- self.sn=sn
- self.celltype=celltype
- self.param=BatParam.BatParam(celltype)
- self.df_short=df_short
- self.df_uniform=df_uniform
- self.OutLineVol_Rate=OutLineVol_Rate
- self.df_soh=df_soh
- self.df_alarm_ram=df_fault_ram_sn.copy()
-
- def diag(self):
- if self.celltype<=50:
- df_res=self._warning_diag()
- return df_res
- else:
- df_res=self._warning_diag()
- return df_res
-
- #电池热安全预警诊断功能.................................................................................................
- def _warning_diag(self):
- df_res=pd.DataFrame(columns=['start_time', 'end_time', 'product_id', 'code', 'level', 'info','advice'])
- time_now=datetime.datetime.now()
- time_now=time_now.strftime('%Y-%m-%d %H:%M:%S')
- time_sp='0000-00-00 00:00:00'
- #参数初始化.......................................
- voltsigmafault=0
- uniformfault=0
- cellshortfault=0
- cellshortfault=[]
- volt_rate=[]
- R2_list=[]
- voltsigmafault_list=[]
- uniformfault_list=[]
- mk_trend_list=[]
- mk_p_list=[]
- mk_z_list=[]
- mk_Tau_list=[]
- mk_slope_list=[]
- mk_s_list=[]
- mk_svar_list=[]
- if not self.df_short.empty:
- short_current=self.df_short['short_current']
- short_current=short_current.str.replace("[", '')
- short_current=short_current.str.replace("]", '')
-
- if not self.OutLineVol_Rate.empty:
- volt_column = ['单体电压'+str(i) for i in range(1,self.param.CellVoltNums+1)]
- self.OutLineVol_Rate['VolChng_Uni']=self.OutLineVol_Rate['VolChng_Uni'].str.replace("[","")
- self.OutLineVol_Rate['VolChng_Uni']=self.OutLineVol_Rate['VolChng_Uni'].str.replace("]","")
- self.OutLineVol_Rate['VolOl_Uni']=self.OutLineVol_Rate['VolOl_Uni'].str.replace("[","")
- self.OutLineVol_Rate['VolOl_Uni']=self.OutLineVol_Rate['VolOl_Uni'].str.replace("]","")
- Volt_3Sigma=self.OutLineVol_Rate['VolOl_Uni'].str.split(',',expand=True)
- Volt_3Sigma.columns=volt_column
- #电压变化率
- VoltChange=self.OutLineVol_Rate['VolChng_Uni'].str.split(',',expand=True)
- VoltChange.columns=volt_column
- VoltChange['time']=self.OutLineVol_Rate['time']
- VoltChange = VoltChange.reset_index(drop=True)
- xtime1=VoltChange['time']
- time0=time.mktime(VoltChange.loc[0,'time'].timetuple())
- for i in range(0,len(VoltChange)):
- VoltChange.loc[i,'time']=(time.mktime(VoltChange.loc[i,'time'].timetuple())-time0)/36000
-
- #计算漏电流离群度
- if not self.df_short.empty:
- self.df_short['cellshort_sigma']=0
- for i in range(len(self.df_short)):
- cellshort=eval(self.df_short.loc[i,'short_current'])
- cellshort_std=np.std(cellshort)
- cellshort_mean=np.mean(cellshort)
- self.df_short.loc[i,'cellshort_sigma']=str(list((cellshort-cellshort_mean)/cellshort_std))
-
-
- if not self.df_uniform.empty:
- cellvolt_rank=self.df_uniform['cellvolt_rank']
- cellvolt_rank=cellvolt_rank.str.replace("[", '')
- cellvolt_rank=cellvolt_rank.str.replace("]", '')
-
- for i in range(self.param.CellVoltNums):
- #漏电流故障判断...........................................................................
- if not self.df_short.empty:
- self.df_short['cellshort'+str(i+1)]=short_current.map(lambda x:eval(x.split(',')[i]))
- cellshort=self.df_short['cellshort'+str(i+1)]
- index_list=cellshort[cellshort<self.param.LeakCurrentLv2].index
- if len(index_list)>1:
- for j in range(1,len(index_list)):
- if index_list[j]-index_list[j-1]==1:
- cellshort_sigma1=eval(self.df_short.loc[index_list[j],'cellshort_sigma'])
- cellshort_sigma2=eval(self.df_short.loc[index_list[j-1],'cellshort_sigma'])
- if cellshort_sigma1[i]<-3 or cellshort_sigma2[i]<-3:
- cellshortfault.append(1)
- else:
- cellshortfault.append(0)
- else:
- cellshortfault.append(0)
- else:
- pass
-
- #电压变化率及电压离群度.................................................................................
- if not self.OutLineVol_Rate.empty and VoltChange.iloc[-1]['time']*36000>18*3600 and len(VoltChange)>5:
- VoltChange[volt_column[i]]=VoltChange[volt_column[i]].map(lambda x:eval(x))
- y=VoltChange[volt_column[i]]
- volt3sigma=np.array(Volt_3Sigma[volt_column[i]].map(lambda x:eval(x)))
- volt3sigma_sum=np.sum(volt3sigma<-3)
- #电压变化率
- a1,b1=np.polyfit(VoltChange['time'].tolist(),y.tolist(),1)
- y1=a1*VoltChange['time']+b1
- y_mean=y.mean()
- R2=1-(np.sum((y1-y)**2))/(np.sum((y-y_mean)**2))
- R2_list.append(R2)
-
- volt_rate.append(a1)
- plt.plot(xtime1,y1,label='单体'+str(i+1))
- plt.xlabel('时间', fontsize=25)
- plt.ylabel('SOC差', fontsize=25)
- plt.xticks(fontsize=20)
- plt.yticks(fontsize=20)
- plt.scatter( xtime1,VoltChange[volt_column[i]],marker='o')
- plt.legend(bbox_to_anchor=(1, 0), loc=3, fontsize=13)
- plt.title(self.sn)
- plt.rcParams['font.sans-serif']=['SimHei'] #用来正常显示中文标签
- plt.rcParams['axes.unicode_minus']=False #用来正常显示负号
- # plt.show()
-
- if volt3sigma_sum>len(volt3sigma)/2:
- voltsigmafault=1
- else:
- voltsigmafault=0
- voltsigmafault_list.append(voltsigmafault)
-
- #mana-kendell趋势检验
- mk_res=mk.regional_test(np.array(y))
- mk_trend_list.append(mk_res.trend)
- mk_p_list.append(mk_res.p)
- mk_z_list.append(mk_res.z)
- mk_Tau_list.append(mk_res.Tau)
- mk_slope_list.append(mk_res.slope)
- mk_s_list.append(mk_res.s)
- mk_svar_list.append(mk_res.var_s)
- """
- trend:趋势;
- h:有无趋势;
- p:趋势的显著水平,越小趋势越明显;
- z:检验统计量,正代表随时间增大趋势,负代表随时间减小趋势;
- Tau:反映两个序列的相关性,接近1的值表示强烈的正相关,接近-1的值表示强烈的负相关;
- s:Mann-Kendal的分数,如果S是一个正数,那么后一部分的观测值相比之前的观测值会趋向于变大;如果S是一个负数,那么后一部分的观测值相比之前的观测值会趋向于变小
- slope:趋势斜率
- """
- # print('单体电压{}:\n'.format(i+1), mk_res)
- else:
- volt_rate.append(0)
- R2_list.append(0)
- voltsigmafault_list.append(0)
- mk_trend_list.append(0)
- mk_p_list.append(0)
- mk_z_list.append(0)
- mk_Tau_list.append(0)
- mk_slope_list.append(0)
- mk_s_list.append(0)
- mk_svar_list.append(0)
- #电芯SOC排名判断.............................................................................
- if not self.df_uniform.empty:
- self.df_uniform['cellvolt_rank'+str(i+1)]=cellvolt_rank.map(lambda x:eval(x.split(',')[i]))
-
- if max(self.df_uniform['cellvolt_rank'+str(i+1)])<5:
- uniformfault=1
- else:
- uniformfault=0
- else:
- uniformfault=0
- uniformfault_list.append(uniformfault)
-
- plt.show()
-
- #电池电压变化率离群度计算...............................................................................
- volt_rate_std=np.std(volt_rate)
- volt_rate_mean=np.mean(volt_rate)
- volt_rate_3sigma=(np.array(volt_rate)-volt_rate_mean)/volt_rate_std
-
- #mk离群度计算
- mk_slope_std=np.std(mk_slope_list)
- mk_slope_mean=np.mean(mk_slope_list)
- mk_slope_3sigma=(np.array(mk_slope_list)-mk_slope_mean)/mk_slope_std
- mk_z_std=np.std(mk_z_list)
- mk_z_mean=np.mean(mk_z_list)
- mk_z_3sigma=(np.array(mk_z_list)-mk_z_mean)/mk_z_std
- if not self.df_soh.empty and self.celltype<50:
- cellsoh=eval(self.df_soh.loc[0,'cellsoh'])
- cellsoh_std=np.std(cellsoh)
- cellsoh_mean=np.mean(cellsoh)
- cellsoh_3sigma=((np.array(cellsoh)-cellsoh_mean)/cellsoh_std)
- else:
- cellsoh_3sigma=[0]*self.param.CellVoltNums
-
- #漏电流热失控预警确认.......................................................................................
- if len(cellshortfault)>1:
- if not 'C490' in list(self.df_alarm_ram['code']): #当前故障中没有该故障,则判断是否发生该故障
- if max(cellshortfault)==1:
- faultcode='C490'
- faultlv=4
- faultinfo='电芯{}发生热失控安全预警'.format(cellshortfault.index(1)+1)
- faultadvice='请于24h内联系技术人员确认故障'
- self.df_alarm_ram.loc[len(self.df_alarm_ram)]=[time_now, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice]
- else:
- pass
- else:
- if max(cellshortfault)==1:
- pass
- else:
- self.df_alarm_ram.loc[self.df_alarm_ram[self.df_alarm_ram['code']=='C490'].index, 'end_time'] = time_now
- else:
- pass
- #mana-kendall趋势检测
- mk_fault_list=[]
- for i in range(len(mk_p_list)):
- #适用动态工况判断
- if mk_trend_list[i]=='decreasing' and mk_p_list[i]<self.param.mk_p and mk_z_list[i]<self.param.mk_z and mk_Tau_list[i]<self.param.mk_Tau and mk_s_list[i]<self.param.mk_s and mk_svar_list[i]<self.param.mk_svar and mk_slope_3sigma[i]<-3 and mk_slope_list[i]<self.param.mk_slope and volt_rate_3sigma[i]<-3:
- mk_fault_list.append(1)
- #适用静态工况判断
- elif self.celltype<=50 and mk_trend_list[i]=='decreasing' and mk_p_list[i]<self.param.mk_p and mk_z_list[i]<-6 and (mk_Tau_list[i]<-0.9 or mk_z_3sigma[i]<-3) and mk_s_list[i]<self.param.mk_s and mk_svar_list[i]<self.param.mk_svar and mk_slope_3sigma[i]<-3.5 and mk_slope_list[i]<-0.03 and volt_rate_3sigma[i]<-3:
- mk_fault_list.append(1)
- elif self.celltype>50 and mk_trend_list[i]=='decreasing' and mk_p_list[i]<self.param.mk_p and mk_z_list[i]<-6 and (mk_Tau_list[i]<-0.95 or mk_z_3sigma[i]<-3) and mk_s_list[i]<self.param.mk_s and mk_svar_list[i]<self.param.mk_svar and mk_slope_3sigma[i]<-3.5 and mk_slope_list[i]<-0.4 and volt_rate_3sigma[i]<-3:
- mk_fault_list.append(1)
- else:
- mk_fault_list.append(0)
-
- if len(mk_fault_list)>1:
- if not 'C491' in list(self.df_alarm_ram['code']): #当前故障中没有该故障,则判断是否发生该故障
- if max(mk_fault_list)==1:
- faultcode='C491'
- faultlv=4
- faultinfo='电芯{}发生热失控安全预警'.format(mk_fault_list.index(1)+1)
- faultadvice='请于24h内联系技术人员确认故障'
- self.df_alarm_ram.loc[len(self.df_alarm_ram)]=[time_now, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice]
- else:
- pass
- else:
- if max(mk_fault_list)==1:
- pass
- else:
- self.df_alarm_ram.loc[self.df_alarm_ram[self.df_alarm_ram['code']=='C491'].index, 'end_time'] = time_now
- else:
- pass
- return self.df_alarm_ram
|