123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- import pandas as pd
- import numpy as np
- import datetime
- import BatParam
class SafetyAlarm:
    """Thermal-runaway safety diagnosis for one battery pack.

    The object is built from a batch of BMS samples plus three "RAM" frames
    that carry state between calls: the currently open fault record
    (``df_diag_ram``), the last processed BMS sample (``df_ram_bms``) and the
    last warning state (``df_ram_alarm``).  ``safety_alarm_diag`` returns the
    updated versions of those three frames.
    """

    # Column layout of the warning-state frame rebuilt on every diagnosis.
    _ALARM_COLUMNS = ('sn', 'time', 'safetywarning1', 'safetywarning2')
    # Content of the single fault record this class can raise.
    _FAULT_CODE = 119
    _FAULT_LEVEL = 5
    _FAULT_INFO = '电池发生热失控'
    _FAULT_ADVICE = '断开继电器,远离电池,立刻消防处理,并通知技术人员介入'
    # Placeholder written as end time while a fault is still open.
    _OPEN_END_TIME = '0000-00-00 00:00:00'

    def __init__(self, sn, celltype, df_bms, df_diag_ram_sn, df_bms_ram_sn, df_alarm_ram_sn):
        """Store the inputs and prepare the BMS batch for diagnosis.

        Args:
            sn: Pack serial number, copied into every record that is written.
            celltype: Cell type identifier passed to ``BatParam.BatParam``.
            df_bms: New BMS samples.  Its ``'time'`` column is converted to
                datetimes in place (the caller's frame is modified, as before).
            df_diag_ram_sn: Open fault record(s) for this pack; copied.
            df_bms_ram_sn: Last processed BMS sample for this pack; copied.
            df_alarm_ram_sn: Last warning state for this pack; copied.
        """
        self.sn = sn
        self.celltype = celltype
        self.param = BatParam.BatParam(celltype)
        self.df_ram_alarm = df_alarm_ram_sn.copy()
        self.df_ram_bms = df_bms_ram_sn.copy()
        self.df_diag_ram = df_diag_ram_sn.copy()

        df_bms['time'] = pd.to_datetime(df_bms['time'], format='%Y-%m-%d %H:%M:%S')
        self.df_bms = df_bms
        self._prepare_bms_data()

    def _prepare_bms_data(self):
        """Drop already-processed samples and bind the per-sample signals.

        All signal series are taken from the filtered, re-indexed frame so
        that position ``i`` means the same sample in every series and in
        ``_celltemp_get`` / ``_cellvolt_get``.
        """
        if (not self.df_ram_bms.empty) and (not self.df_bms.empty):
            # Keep only samples newer than the one stored in RAM.
            last_seen = self.df_ram_bms.iloc[-1]['time']
            self.df_bms = self.df_bms[self.df_bms['time'] > last_seen]
        # Always use a 0..n-1 index: the diagnosis loop addresses rows by position.
        self.df_bms = self.df_bms.reset_index(drop=True)

        self.packcrnt = self.df_bms['PackCrnt'] * self.param.PackCrntDec
        # NOTE(review): column names 'PackVolt' and 'PackSOC' are assumed from
        # the naming of the other BMS columns -- confirm against the data source.
        self.packvolt = self.df_bms['PackVolt']
        self.packsoc = self.df_bms['PackSOC']
        self.bmstime = self.df_bms['time']

        self.cellvolt_name = ['CellVolt' + str(x) for x in range(1, self.param.CellVoltNums + 1)]
        # Only the cell temperature channels are used; "other" temperature
        # channels are not part of the diagnosis.
        self.celltemp_name = ['CellTemp' + str(x) for x in range(1, self.param.CellTempNums + 1)]

    def safety_alarm_diag(self):
        """Run the diagnosis and return ``(df_diag_ram, df_ram_bms, df_ram_alarm)``.

        While a fault record is open no new diagnosis is made; once the last
        record is older than 24 hours its ``end_time`` column is set to the
        current time.  Without an open record the BMS batch is analysed.
        """
        if self.df_diag_ram.empty:
            return self._alarm_diag()

        time_now = datetime.datetime.now()
        fault_time = self.df_diag_ram.iloc[-1]['start_time']
        if (time_now - fault_time).total_seconds() > 24 * 3600:
            self.df_diag_ram['end_time'] = time_now
        return self.df_diag_ram, self.df_ram_bms, self.df_ram_alarm

    def _np_move_avg(self, a, n, mode="same"):
        """Return the moving average of ``a`` over a window of ``n`` samples."""
        return np.convolve(a, np.ones((n,)) / n, mode=mode)

    def _celltemp_get(self, num):
        """Return all cell temperatures of BMS row ``num`` as an array."""
        return np.array(self.df_bms.loc[num, self.celltemp_name])

    def _cellvolt_get(self, num):
        """Return all cell voltages of BMS row ``num`` as an array."""
        return np.array(self.df_bms.loc[num, self.cellvolt_name])

    def _fault_row(self, start_time):
        """Build the fault record row written to ``df_diag_ram``."""
        return [start_time, self._OPEN_END_TIME, self.sn, self._FAULT_CODE,
                self._FAULT_LEVEL, self._FAULT_INFO, self._FAULT_ADVICE]

    def _alarm_diag(self):
        """Analyse the BMS batch for thermal-runaway symptoms.

        Consecutive samples are compared (the first sample against the one
        stored in ``df_ram_bms`` when available) and three symptom counters
        are accumulated: temperature (over-temperature or fast rise), single
        cell voltage drop and pack voltage drop.  Depending on the counters
        and the previous warning state either a fault record is written to
        ``df_diag_ram`` or the warning state in ``df_ram_alarm`` is updated.

        Returns:
            ``(df_diag_ram, df_ram_bms, df_ram_alarm)``.
        """
        time_now = datetime.datetime.now()
        cell_volt_nums = self.param.CellVoltNums
        cell_temp_nums = self.param.CellTempNums

        if not self.df_ram_alarm.empty:
            last_alarm = self.df_ram_alarm.iloc[-1]
            safetywarning1 = last_alarm['safetywarning1']
            safetywarning2 = last_alarm['safetywarning2']
            time_last = last_alarm['time']
        else:
            safetywarning1 = 0
            safetywarning2 = 0
            time_last = None  # only read when a warning is pending

        if self.df_bms.empty:
            # No new data: escalate a pending warning that is older than 360 s.
            warning_pending = safetywarning1 > 2.5 or safetywarning2 > 2.5
            if warning_pending and (time_now - time_last).total_seconds() > 360:
                self.df_diag_ram.loc[0] = self._fault_row(time_now.replace(microsecond=0))
                self.df_ram_alarm = pd.DataFrame(columns=list(self._ALARM_COLUMNS))
            return self.df_diag_ram, self.df_ram_bms, self.df_ram_alarm

        celltemprise = 0
        celltemphigh = 0
        cellvoltfall = 0
        packvoltfall = 0
        cellvolttime = 0

        for i in range(len(self.df_bms)):
            # Current sample.
            time2 = self.bmstime[i]
            temp2 = self._celltemp_get(i)
            cellvolt2 = self._cellvolt_get(i)
            packvolt2 = self.packvolt[i]
            packcrnt2 = self.packcrnt[i]

            # Previous sample: preceding row, RAM sample, or the row itself.
            if i >= 1:
                temp1 = self._celltemp_get(i - 1)
                time1 = self.bmstime[i - 1]
                cellvolt1 = self._cellvolt_get(i - 1)
                packvolt1 = self.packvolt[i - 1]
                packcrnt1 = self.packcrnt[i - 1]
                delta_soc = abs(self.packsoc[i - 1] - self.packsoc[i])
            elif not self.df_ram_bms.empty:
                ram_row = self.df_ram_bms.iloc[-1]
                temp1 = np.array(ram_row['celltemp'])
                time1 = ram_row['time']
                cellvolt1 = np.array(ram_row['cellvolt'])
                packvolt1 = ram_row['packvolt']
                packcrnt1 = ram_row['packcrnt']
                delta_soc = abs(ram_row['packsoc'] - self.packsoc[i])
                # A stored sample with a different channel count cannot be
                # compared element-wise; fall back to the current sample.
                if len(temp1) != len(temp2):
                    temp1 = temp2
                if len(cellvolt1) != len(cellvolt2):
                    cellvolt1 = cellvolt2
                temp_st = temp1
                time_st = time1
            else:
                temp1 = temp2
                time1 = time2 - datetime.timedelta(seconds=1)
                cellvolt1 = cellvolt2
                packvolt1 = packvolt2
                packcrnt1 = packcrnt2
                delta_soc = 0
                temp_st = temp1
                time_st = time1

            # Temperature validity ..........................................
            temp_up = self.param.CellTempUpLmt
            temp_lw = self.param.CellTempLwLmt
            if max(temp2) > temp_up or min(temp2) < temp_lw:
                celltempvalid = 0
            else:
                celltempvalid = 1
            temp_ok = ((temp1 < temp_up) & (temp1 > temp_lw)
                       & (temp2 < temp_up) & (temp2 > temp_lw))
            temp1 = temp1[temp_ok]
            temp2 = temp2[temp_ok]

            if len(temp1) > 0 and len(temp2) > 0:
                # Over-temperature: hot, but on at most half of the sensors.
                if (max(temp2) > self.param.TrwTempHigh
                        and np.sum(temp2 > self.param.TrwTempHigh) <= cell_temp_nums / 2):
                    celltemphigh = 1

                # Temperature rise, evaluated against a reference sample that
                # is refreshed whenever it is more than 5 s old.
                delttime = (time2 - time_st).total_seconds()
                if delttime > 5:
                    temp_st = temp_st[temp_ok]
                    celltemp_rate = ((temp2 - temp_st) * 60) / delttime  # per minute
                    time_st = time2
                    temp_st = self._celltemp_get(i)
                else:
                    celltemp_rate = np.array([0, 0])
                if (delta_soc < 5
                        and max(celltemp_rate) > self.param.TrwTempRate
                        and np.sum(celltemp_rate > self.param.TrwTempRate) <= cell_temp_nums / 2):
                    celltemprise = celltemprise + 1

            # Cell voltage validity .........................................
            volt_ok = ((cellvolt1 > 0.1) & (cellvolt1 < 5)
                       & (cellvolt2 > 0.1) & (cellvolt2 < 5))
            cellvoltfall1 = cellvolt1[volt_ok]
            cellvoltfall2 = cellvolt2[volt_ok]

            sample_gap = (time2 - time1).total_seconds()
            if len(cellvoltfall1) < cell_volt_nums - 5 and len(cellvoltfall2) < cell_volt_nums - 5:
                # Too many implausible cell voltages; invalid once this lasts > 2 s.
                cellvolttime = cellvolttime + sample_gap
                cellvoltvalid = 0 if cellvolttime > 2 else 1
            else:
                cellvolttime = 0
                cellvoltvalid = 1

            # Single cell voltage drop ......................................
            if sample_gap < 60 and len(cellvoltfall1) > 1 and len(cellvoltfall2) > 1:
                volt_drop = cellvoltfall1 - cellvoltfall2
                if (delta_soc < 5 and abs(packcrnt1) <= 1 and abs(packcrnt2) <= 1
                        and max(volt_drop) > self.param.TrwCellVoltFall
                        and np.sum(volt_drop > self.param.TrwCellVoltFall) <= cell_volt_nums / 2):
                    cellvoltfall = cellvoltfall + 1

            # Pack voltage drop (only for plausible pack voltages) ..........
            pack_lw = cell_volt_nums
            pack_up = cell_volt_nums * 4.5
            if pack_lw < packvolt2 < pack_up and sample_gap < 360:
                if (delta_soc < 5 and abs(packcrnt1) < 5 and abs(packcrnt2) < 5
                        and (packvolt1 - packvolt2) > self.param.TrwPackVoltFall
                        and pack_lw < packvolt1 < pack_up):
                    packvoltfall = packvoltfall + 1

        # Thermal-runaway decision, made once on the whole batch ............
        # `i`, `time2`, `packvolt2`, `packcrnt2` refer to the last sample.
        self.df_ram_bms.loc[0] = [time2, self.sn, packvolt2, list(self._cellvolt_get(i)),
                                  list(self._celltemp_get(i)), self.packsoc[i], packcrnt2]
        self.df_ram_alarm = pd.DataFrame(columns=list(self._ALARM_COLUMNS))

        trwtemp = max(celltemprise, celltemphigh)
        trw_array = np.array([trwtemp, cellvoltfall, packvoltfall])
        symptom_total = np.sum(trw_array)
        symptom_kinds = np.sum(trw_array > 0.5)
        had_warning = safetywarning1 > 2.5

        is_runaway = (
            (cellvoltfall > 0.5 and symptom_total > 3.5 and symptom_kinds > 1.5)
            or (had_warning and symptom_total > 1.5 and symptom_kinds > 1.5)
            or (had_warning and cellvoltvalid == 0)
        )
        if is_runaway:
            self.df_diag_ram.loc[0] = self._fault_row(self.bmstime.iloc[-1])
        elif cellvoltfall > 0.5 and symptom_total > 1.5 and symptom_kinds > 1.5:
            # Symptoms present but not yet conclusive: raise the warning state.
            safetywarning1 = 3
            self.df_ram_alarm.loc[0] = [self.sn, time2, safetywarning1, safetywarning2]
        elif (cellvoltfall < 0.5 or symptom_kinds < 1.5) and celltempvalid == 1 and cellvoltvalid == 1:
            # Valid data without symptoms: clear the warning state.
            safetywarning1 = 0
            self.df_ram_alarm.loc[0] = [self.sn, time2, safetywarning1, safetywarning2]

        return self.df_diag_ram, self.df_ram_bms, self.df_ram_alarm
|