123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- import pandas as pd
- import numpy as np
- import datetime
- from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam
class SafetyAlarm:
    """Thermal-runaway safety alarm diagnosis for one battery pack.

    The diagnosis walks the new BMS rows, counts temperature, cell-voltage
    and pack-voltage anomalies, and raises fault code ``C599`` when the
    combined evidence crosses the thresholds. Two small "RAM" frames carry
    state from one invocation to the next:

    * BMS snapshot: columns ``time, sn, packvolt, cellvolt, celltemp``
    * alarm state:  columns ``sn, time, safetywarning1, safetywarning2``
    """

    # Column layouts of the frames built by this class.
    _RESULT_COLUMNS = ['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice']
    _BMS_RAM_COLUMNS = ['time', 'sn', 'packvolt', 'cellvolt', 'celltemp']
    _ALARM_RAM_COLUMNS = ['sn', 'time', 'safetywarning1', 'safetywarning2']

    def __init__(self, sn, celltype, df_bms, df_bms_ram_sn, df_alarm_ram_sn):
        """Store the inputs and prepare the per-row series used by the diagnosis.

        Args:
            sn: Pack identifier; copied into every result/RAM row.
            celltype: Cell type code, forwarded to ``BatParam.BatParam``.
            df_bms: New BMS rows. Its '时间戳' column is converted to
                datetime in place (the caller's frame is modified).
            df_bms_ram_sn: BMS snapshot persisted by the previous run
                (may be empty). Only its last row is used.
            df_alarm_ram_sn: Alarm state persisted by the previous run
                (may be empty). Only its last row is used.
        """
        self.sn = sn
        self.celltype = celltype
        self.param = BatParam.BatParam(celltype)
        self.df_ram_bms = df_bms_ram_sn.copy()
        self.df_ram_alarm = df_alarm_ram_sn.copy()

        df_bms['时间戳'] = pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
        self.df_bms = df_bms
        if (not df_bms_ram_sn.empty) and (not self.df_bms.empty):
            # Drop rows that were already processed by the previous run.
            self.df_bms = self.df_bms[self.df_bms['时间戳'] > df_bms_ram_sn.iloc[-1]['time']]
        # Always work on a 0..n-1 index: the diagnosis addresses rows by
        # loop counter. reset_index returns a new frame, so the caller's
        # index is left alone.
        self.df_bms = self.df_bms.reset_index(drop=True)

        # These series MUST come from the filtered frame. Taking them from the
        # unfiltered input paired row i of the cell data with the timestamp,
        # current and pack voltage of a different row whenever rows were dropped.
        self.packcrnt = self.df_bms['总电流[A]'] * self.param.PackCrntDec
        self.packvolt = self.df_bms['总电压[V]']
        self.bmstime = self.df_bms['时间戳']
        self.cellvolt_name = ['单体电压' + str(x) for x in range(1, self.param.CellVoltNums + 1)]
        self.celltemp_name = ['单体温度' + str(x) for x in range(1, self.param.CellTempNums + 1)]

    def safety_alarm_diag(self):
        """Run the diagnosis and return ``(df_res, df_bms_ram, df_ram_alarm)``.

        The same routine is used for every ``celltype``.
        """
        return self._alarm_diag()

    def _np_move_avg(self, a, n, mode="same"):
        """Return the moving average of ``a`` over a window of ``n`` samples."""
        return (np.convolve(a, np.ones((n,)) / n, mode=mode))

    def _celltemp_get(self, num):
        """Return all cell temperatures of row ``num`` as a numpy array."""
        return np.array(self.df_bms.loc[num, self.celltemp_name])

    def _cellvolt_get(self, num):
        """Return all cell voltages of row ``num`` divided by 1000.

        Presumably this converts mV to V -- confirm against the data source.
        """
        return np.array(self.df_bms.loc[num, self.cellvolt_name] / 1000)

    def _empty_alarm_ram(self):
        """Return an empty alarm-state frame (i.e. no pending warning)."""
        return pd.DataFrame(columns=list(self._ALARM_RAM_COLUMNS))

    def _set_runaway_row(self, df_res, start_time):
        """Write the C599 (level 5) thermal-runaway row into ``df_res``."""
        df_res.loc[0] = [start_time, '0000-00-00 00:00:00', self.sn, 'C599', 5,
                         '电池发生热失控', '联系用户立即远离电池,并通知技术人员介入']

    @staticmethod
    def _volt_wire_break(cellvolt):
        """Return True when min < 2 and max > 4.5 sit on adjacent cell positions.

        The original code treats this pattern as a broken voltage sense wire
        and skips the cell-voltage diagnosis for the sample.
        """
        voltmin = min(cellvolt)
        voltmax = max(cellvolt)
        if not (voltmin < 2 and voltmax > 4.5):
            return False
        values = list(cellvolt)
        return abs(values.index(voltmax) - values.index(voltmin)) == 1

    def _packvolt_valid(self, packvolt):
        """Return True when the pack voltage lies strictly between 1x and 4.5x the cell count."""
        return self.param.CellVoltNums < packvolt < self.param.CellVoltNums * 4.5

    def _diag_without_new_data(self, df_res, safetywarning1, safetywarning2):
        """Handle an invocation that received no new BMS rows.

        A pending warning that is older than 120 s is escalated to a C599
        alarm and the alarm state is cleared; otherwise nothing is reported.
        The BMS snapshot is passed through unchanged.
        """
        df_ram_alarm = self.df_ram_alarm
        time_now = datetime.datetime.now()
        # A warning level > 2.5 can only come from a non-empty alarm frame, so
        # the iloc[-1] access is protected by the short-circuit `and`.
        if (safetywarning1 > 2.5 or safetywarning2 > 2.5) and \
                (time_now - df_ram_alarm.iloc[-1]['time']).total_seconds() > 120:
            # Report with whole-second resolution.
            self._set_runaway_row(df_res, time_now.replace(microsecond=0))
            df_ram_alarm = self._empty_alarm_ram()
        else:
            df_res = pd.DataFrame()
        return df_res, self.df_ram_bms, df_ram_alarm

    def _alarm_diag(self):
        """Diagnose thermal runaway from the new BMS rows.

        Returns:
            tuple: ``(df_res, df_bms_ram, df_ram_alarm)`` where ``df_res`` holds
            one C599 row when an alarm is raised (an empty, column-less frame
            otherwise), ``df_bms_ram`` is the snapshot of the last processed
            row, and ``df_ram_alarm`` is the alarm state to persist.
        """
        df_res = pd.DataFrame(columns=list(self._RESULT_COLUMNS))

        if not self.df_ram_alarm.empty:
            last_alarm = self.df_ram_alarm.iloc[-1]
            safetywarning1 = last_alarm['safetywarning1']
            safetywarning2 = last_alarm['safetywarning2']
        else:
            safetywarning1 = 0
            safetywarning2 = 0

        if self.df_bms.empty:
            return self._diag_without_new_data(df_res, safetywarning1, safetywarning2)

        param = self.param
        has_ram = not self.df_ram_bms.empty
        ram_last = self.df_ram_bms.iloc[-1] if has_ram else None

        celltemprise = 0
        celltemphigh = 0
        cellvoltfall = 0
        packvoltfall = 0

        for i in range(len(self.df_bms)):
            time2 = self.bmstime[i]

            # ---- temperature diagnosis -------------------------------------
            temp2 = self._celltemp_get(i)
            if i >= 1:
                temp1 = self._celltemp_get(i - 1)
                time1 = self.bmstime[i - 1]
            elif has_ram:
                temp1 = ram_last['celltemp']
                time1 = ram_last['time']
            else:
                # No history at all: compare the row with itself, 10 s apart.
                temp1 = temp2
                time1 = time2 - datetime.timedelta(seconds=10)

            # Keep only readings strictly inside the valid temperature window.
            temp1 = temp1[np.where((temp1 < param.CellTempUpLmt) & (temp1 > param.CellTempLwLmt))]
            temp2 = temp2[np.where((temp2 < param.CellTempUpLmt) & (temp2 > param.CellTempLwLmt))]

            if len(temp1) > 0 and len(temp1) == len(temp2):
                # Over-temperature is a flag, not a counter.
                if max(temp2) > param.TrwTempHigh:
                    celltemphigh = 1

                # Maximum temperature rise, scaled to "per 60 s".
                delttime = (time2 - time1).total_seconds()
                if delttime > 1:
                    celltemp_rate = (max(temp2 - temp1) * 60) / delttime
                else:
                    celltemp_rate = 0
                if celltemp_rate > param.TrwTempRate \
                        and param.CellTempLwLmt < min(temp1) \
                        and max(temp1) < param.CellTempUpLmt:
                    celltemprise = celltemprise + 1

            # ---- cell-voltage diagnosis ------------------------------------
            cellvolt2 = self._cellvolt_get(i)
            if i >= 1:
                cellvolt1 = self._cellvolt_get(i - 1)
                volt_time1 = self.bmstime[i - 1]
            elif has_ram:
                cellvolt1 = ram_last['cellvolt']
                volt_time1 = ram_last['time']
            else:
                cellvolt1 = cellvolt2
                volt_time1 = time2

            if self._volt_wire_break(cellvolt2) or self._volt_wire_break(cellvolt1):
                cellvoltvalid = False
            else:
                cellvolt1 = cellvolt1[np.where((cellvolt1 > 0.01) & (cellvolt1 < 4.5))]
                cellvolt2 = cellvolt2[np.where((cellvolt2 > 0.01) & (cellvolt2 < 4.5))]
                cellvoltvalid = len(cellvolt1) > 1 and len(cellvolt1) == len(cellvolt2)

            if cellvoltvalid and (time2 - volt_time1).total_seconds() < 360:
                crnt = self.packcrnt[i]
                voltdrop = max(cellvolt1 - cellvolt2)
                if crnt < 0.5 and voltdrop > param.TrwCellVoltFall:
                    cellvoltfall = cellvoltfall + 1
                elif crnt > 0.5 and voltdrop - crnt * 0.01 > param.TrwCellVoltFall:
                    # Above 0.5 the drop is reduced by 0.01 per unit of current
                    # before it is compared with the threshold.
                    cellvoltfall = cellvoltfall + 1

            # ---- pack-voltage diagnosis ------------------------------------
            packvolt2 = self.packvolt[i]
            if self._packvolt_valid(packvolt2):
                if i >= 1:
                    packvolt1 = self.packvolt[i - 1]
                    comparable = True
                elif has_ram:
                    packvolt1 = ram_last['packvolt']
                    # The persisted sample only counts if it is recent enough.
                    comparable = (time2 - ram_last['time']).total_seconds() < 360
                else:
                    packvolt1 = None
                    comparable = False
                if comparable \
                        and self.packcrnt[i] < 5 \
                        and (packvolt1 - packvolt2) > param.TrwPackVoltFall \
                        and self._packvolt_valid(packvolt1):
                    packvoltfall = packvoltfall + 1

        # ---- thermal-runaway decision (uses the last processed row) --------
        # The snapshot is stamped with the time of the row its values come
        # from. The previous code stored the FIRST row's time here, which made
        # the next run's de-duplication filter and delta-time checks wrong.
        df_bms_ram = pd.DataFrame(columns=list(self._BMS_RAM_COLUMNS))
        df_bms_ram.loc[0] = [time2, self.sn, packvolt2, cellvolt2, temp2]

        df_ram_alarm = self.df_ram_alarm
        trw_array = np.array([max(celltemprise, celltemphigh), cellvoltfall, packvoltfall])
        trw_total = np.sum(trw_array)
        several_kinds = np.sum(trw_array > 0.5) > 1.5  # at least two anomaly kinds

        if several_kinds and (trw_total > 3.5 or (safetywarning1 > 2.5 and trw_total > 1.5)):
            # Strong evidence now, or weaker evidence on top of a pending warning.
            self._set_runaway_row(df_res, self.bmstime[len(self.bmstime) - 1])
            df_ram_alarm = self._empty_alarm_ram()
        else:
            df_res = pd.DataFrame()
            # NOTE(review): the source this was recovered from had lost its
            # indentation. The state update below is placed inside this
            # no-alarm branch because running it after an alarm would undo the
            # reset above -- confirm against the original file.
            if several_kinds and trw_total > 1.5:
                safetywarning1 = 3
                df_ram_alarm.loc[0] = [self.sn, time2, safetywarning1, safetywarning2]
            else:
                safetywarning1 = 0
                df_ram_alarm = self._empty_alarm_ram()

            # NOTE(review): max()/min() raise ValueError if the validity filter
            # above removed every cell voltage of the last row.
            if max(cellvolt2) > 5 or min(cellvolt2) < 1:
                safetywarning2 = 3
                df_ram_alarm.loc[0] = [self.sn, time2, safetywarning1, safetywarning2]
            elif max(cellvolt2) < 4.5 and min(cellvolt2) > 2.5:
                safetywarning2 = 0
                if safetywarning1 == 0:
                    df_ram_alarm = self._empty_alarm_ram()
                else:
                    df_ram_alarm.loc[0] = [self.sn, time2, safetywarning1, safetywarning2]
            else:
                # In-between voltages: keep the current warning levels.
                df_ram_alarm.loc[0] = [self.sn, time2, safetywarning1, safetywarning2]

        return df_res, df_bms_ram, df_ram_alarm
-
|