|
@@ -0,0 +1,215 @@
|
|
|
+import pandas as pd
|
|
|
+import numpy as np
|
|
|
+import datetime
|
|
|
+from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam
|
|
|
+
|
|
|
+class SafetyAlarm:
|
|
|
+ def __init__(self,sn,celltype,df_bms,df_bms_ram): #参数初始化
|
|
|
+
|
|
|
+ self.sn=sn
|
|
|
+ self.celltype=celltype
|
|
|
+ self.param=BatParam.BatParam(celltype)
|
|
|
+ self.df_bms=df_bms
|
|
|
+ self.df_bms_ram=df_bms_ram
|
|
|
+ df_bms['时间戳']=pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
|
|
|
+
|
|
|
+ if not df_bms_ram.empty:
|
|
|
+ self.df_bms=self.df_bms[self.df_bms['时间戳'] > df_bms_ram.iloc[-1]['time']] #滤除原始数据中的重复数据
|
|
|
+ self.df_bms.reset_index(inplace=True,drop=True) #重置索引
|
|
|
+
|
|
|
+ self.packcrnt=df_bms['总电流[A]']*self.param.PackCrntDec
|
|
|
+ self.packvolt=df_bms['总电压[V]']
|
|
|
+ self.bmstime= df_bms['时间戳']
|
|
|
+
|
|
|
+ self.cellvolt_name=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)]
|
|
|
+ othertemp=['其他温度'+str(x) for x in range(1,self.param.OtherTempNums+1)]
|
|
|
+ celltemp=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)]
|
|
|
+ self.celltemp_name=celltemp+othertemp
|
|
|
+
|
|
|
+
|
|
|
+ def diag(self):
|
|
|
+ if self.celltype<=50:
|
|
|
+ df_res=self._ncm_diag()
|
|
|
+ return df_res
|
|
|
+ else:
|
|
|
+ df_res=self._ncm_diag()
|
|
|
+ return df_res
|
|
|
+
|
|
|
+
|
|
|
+ #定义滑动滤波函数.............................................................................................
|
|
|
+ def _np_move_avg(self,a, n, mode="same"):
|
|
|
+ return (np.convolve(a, np.ones((n,)) / n, mode=mode))
|
|
|
+
|
|
|
+ #寻找当前行数据的所有温度值...................................................................................
|
|
|
+ def _celltemp_get(self,num):
|
|
|
+ celltemp = list(self.df_bms.loc[num,self.celltemp_name])
|
|
|
+ return celltemp
|
|
|
+
|
|
|
+ #获取当前行所有电压数据............................................................................................
|
|
|
+ def _cellvolt_get(self,num):
|
|
|
+ cellvolt = list(self.df_bms.loc[num,self.cellvolt_name]/1000)
|
|
|
+ return cellvolt
|
|
|
+
|
|
|
+ #..........................................三元电池诊断功能..................................................................
|
|
|
+ def _ncm_diag(self):
|
|
|
+
|
|
|
+ column_name=['start_time', 'end_time', 'product_id', 'code', 'level', 'info','advice']
|
|
|
+ df_res=pd.DataFrame(columns=column_name)
|
|
|
+
|
|
|
+ end_time='0000-00-00 00:00:00'
|
|
|
+ celltemprise=0
|
|
|
+ celltemphigh=0
|
|
|
+ cellvoltfall=0
|
|
|
+ cellvoltdiff=0
|
|
|
+ packvoltfall=0
|
|
|
+
|
|
|
+ for i in range(len(self.df_bms)):
|
|
|
+
|
|
|
+ #温度诊断功能.............................................................................................................
|
|
|
+ temp2=np.array(self._celltemp_get(i))
|
|
|
+ celltempmin=min(temp2)
|
|
|
+ celltempmax=max(temp2)
|
|
|
+ #温度有效性判断...........................................................................
|
|
|
+ if celltempmax>self.param.CellTempUpLmt or celltempmin<self.param.CellTempLwLmt:
|
|
|
+ celltempvalid=0
|
|
|
+ else: #不作处理
|
|
|
+ celltempvalid=1
|
|
|
+
|
|
|
+ if celltempvalid==1:
|
|
|
+
|
|
|
+ #过温判断............................................................................................................................
|
|
|
+ if celltempmax>self.param.TrwTempHigh:
|
|
|
+ celltemphigh=1
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+ #温升判断.............................................................................................................................
|
|
|
+ if i<1:
|
|
|
+ if not self.df_bms_ram.empty:
|
|
|
+ time1=self.df_bms_ram.iloc[-1]['time']
|
|
|
+ time2=self.bmstime[i]
|
|
|
+ temp1=np.array(self.df_bms_ram.iloc[-1]['celltemp'])
|
|
|
+
|
|
|
+ delttime=(time2-time1).total_seconds()
|
|
|
+ celltemp_rate=(max(temp2-temp1)*60)/delttime #计算最大温升速率
|
|
|
+ if celltemp_rate>self.param.TrwTempRate:
|
|
|
+ celltemprise=1
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ time1=self.bmstime[i-1]
|
|
|
+ time2=self.bmstime[i]
|
|
|
+ temp1=np.array(self._celltemp_get(i-1))
|
|
|
+
|
|
|
+ delttime=(time2-time1).total_seconds()
|
|
|
+ celltemp_rate=(max(temp2-temp1)*60)/delttime #计算最大温升速率
|
|
|
+ if celltemp_rate>self.param.TrwTempRate:
|
|
|
+ celltemprise=1
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+
|
|
|
+ #电压诊断功能.........................................................................................................................................
|
|
|
+ cellvolt=self._cellvolt_get(i)
|
|
|
+ cellvolt2=np.array(cellvolt)
|
|
|
+ cellvoltmin=min(cellvolt)
|
|
|
+ cellvoltmax=max(cellvolt)
|
|
|
+ cellvoltmin_index=cellvolt.index(cellvoltmin)
|
|
|
+ cellvoltmax_index=cellvolt.index(cellvoltmax)
|
|
|
+
|
|
|
+ #电压有效性..........................................................................................................................................
|
|
|
+ if (cellvoltmin<2 and cellvoltmax>4.5 and (cellvoltmax_index-cellvoltmin_index)==1) or cellvoltmin<0.01: #电压断线故障进入
|
|
|
+ cellvoltvalid=0
|
|
|
+ else:
|
|
|
+ cellvoltvalid=1
|
|
|
+
|
|
|
+ if cellvoltvalid==1:
|
|
|
+ #单体电压跌落诊断...........................................................................................................................
|
|
|
+ if i<1:
|
|
|
+ if not self.df_bms_ram.empty:
|
|
|
+ time1=self.df_bms_ram.iloc[-1]['time']
|
|
|
+ time2=self.bmstime[i]
|
|
|
+ delttime=(time2-time1).total_seconds()
|
|
|
+ cellvolt1=np.array(self.df_bms_ram.iloc[-1]['cellvolt'])
|
|
|
+ if delttime<310:
|
|
|
+ if self.packcrnt[i]<0.5 and max(cellvolt2-cellvolt1)>self.param.TrwCellVoltFall:
|
|
|
+ cellvoltfall=1
|
|
|
+ elif self.packcrnt[i]>0.5 and max(cellvolt2-cellvolt1)-self.packcrnt[i]*0.01>self.param.TrwCellVoltFall:
|
|
|
+ cellvoltfall=1
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ if min(cellvolt2)<self.param.TrwCellVoltLow: #电压跌落至<1.5V
|
|
|
+ cellvoltfall=1
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ delttime=(self.bmstime[i]-self.bmstime[i-1]).total_seconds()
|
|
|
+ cellvolt1=np.array(self._cellvolt_get(i-1))
|
|
|
+ if self.packcrnt[i]<0.5 and max(cellvolt2-cellvolt1)>self.param.TrwCellVoltFall:
|
|
|
+ cellvoltfall=1
|
|
|
+ elif self.packcrnt[i]>0.5 and max(cellvolt2-cellvolt1)-self.packcrnt[i]*0.01>self.param.TrwCellVoltFall:
|
|
|
+ cellvoltfall=1
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+
|
|
|
+ #压差诊断........................................................................................................................................
|
|
|
+ if (max(cellvolt2)-min(cellvolt2))>self.param.TrwCellVoltDiff:
|
|
|
+ cellvoltdiff=1
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+
|
|
|
+ #电池包诊断.....................................................................................................................................
|
|
|
+ if i<1:
|
|
|
+ if not self.df_bms_ram.empty:
|
|
|
+ time1=self.df_bms_ram.iloc[-1]['time']
|
|
|
+ time2=self.bmstime[i]
|
|
|
+ delttime=(time2-time1).total_seconds()
|
|
|
+ packvolt1=self.df_bms_ram.iloc[-1]['packvolt']
|
|
|
+ packvolt2=self.packvolt[i]
|
|
|
+ if delttime<310:
|
|
|
+ if self.packcrnt[i]<5 and (packvolt2-packvolt1)>self.param.TrwPackVoltFall:
|
|
|
+ packvoltfall=1
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ packvolt2=self.packvolt[i]
|
|
|
+ else:
|
|
|
+ packvolt1=self.packvolt[i-1]
|
|
|
+ packvolt2=self.packvolt[i]
|
|
|
+ if self.packcrnt[i]<5 and (packvolt2-packvolt1)>self.param.TrwPackVoltFall:
|
|
|
+ packvoltfall=1
|
|
|
+ else:
|
|
|
+ pass
|
|
|
+
|
|
|
+ #热失控故障判断........................................................................................................................
|
|
|
+ df_bms_ram=pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp'])
|
|
|
+ df_bms_ram.loc[0]=[self.bmstime[0], self.sn, packvolt2, list(cellvolt2), list(temp2)]
|
|
|
+ if celltemprise==1 or celltemphigh==1:
|
|
|
+ trwtemp=1
|
|
|
+ else:
|
|
|
+ trwtemp=0
|
|
|
+
|
|
|
+ if cellvoltfall==1 or cellvoltdiff==1:
|
|
|
+ trwcellvolt=1
|
|
|
+ else:
|
|
|
+ trwcellvolt=0
|
|
|
+
|
|
|
+ if trwtemp+trwcellvolt+packvoltfall>1.5:
|
|
|
+ fltcode=119
|
|
|
+ df_res.loc[0]=[self.bmstime[0], end_time, self.sn, fltcode, 5, '电池发生热失控', '立刻远离电池']
|
|
|
+ return df_res, df_bms_ram
|
|
|
+ else:
|
|
|
+ return pd.DataFrame(), df_bms_ram
|