import pandas as pd import numpy as np import datetime from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam class SafetyAlarm: def __init__(self,sn,celltype,df_bms,df_bms_ram_sn,df_alarm_ram_sn): #参数初始化 self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(celltype) self.df_bms=df_bms self.df_ram_bms=df_bms_ram_sn.copy() df_bms['时间戳']=pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S') self.df_ram_alarm=df_alarm_ram_sn.copy() if (not df_bms_ram_sn.empty) and (not self.df_bms.empty): self.df_bms=self.df_bms[self.df_bms['时间戳'] > df_bms_ram_sn.iloc[-1]['time']] #滤除原始数据中的重复数据 self.df_bms.reset_index(inplace=True,drop=True) #重置索引 self.packcrnt=df_bms['总电流[A]']*self.param.PackCrntDec self.packvolt=df_bms['总电压[V]'] self.bmstime= df_bms['时间戳'] self.cellvolt_name=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)] # othertemp=['其他温度'+str(x) for x in range(1,self.param.OtherTempNums+1)] self.celltemp_name=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)] # self.celltemp_name=celltemp+othertemp def safety_alarm_diag(self): if self.celltype<=50: df_res=self._alarm_diag() return df_res else: df_res=self._alarm_diag() return df_res #定义滑动滤波函数............................................................................................. def _np_move_avg(self,a, n, mode="same"): return (np.convolve(a, np.ones((n,)) / n, mode=mode)) #寻找当前行数据的所有温度值................................................................................... def _celltemp_get(self,num): celltemp = np.array(self.df_bms.loc[num,self.celltemp_name]) return celltemp #获取当前行所有电压数据............................................................................................ def _cellvolt_get(self,num): cellvolt = np.array(self.df_bms.loc[num,self.cellvolt_name]/1000) return cellvolt #..........................................三元电池诊断功能.................................................................. def _alarm_diag(self): df_res=pd.DataFrame(columns=['start_time', 'end_time', 'product_id', 'code', 'level', 'info','advice']) end_time='0000-00-00 00:00:00' time_now=datetime.datetime.now() celltemprise=0 celltemphigh=0 cellvoltfall=0 packvoltfall=0 if not self.df_ram_alarm.empty: safetywarning1=self.df_ram_alarm.iloc[-1]['safetywarning1'] safetywarning2=self.df_ram_alarm.iloc[-1]['safetywarning2'] else: safetywarning1=0 safetywarning2=0 if not self.df_bms.empty: for i in range(len(self.df_bms)): #温度诊断功能............................................................................................................. if i<1: if not self.df_ram_bms.empty: temp1=self.df_ram_bms.iloc[-1]['celltemp'] temp2=self._celltemp_get(i) time1=self.df_ram_bms.iloc[-1]['time'] time2=self.bmstime[i] else: temp1=self._celltemp_get(i) temp2=self._celltemp_get(i) time1=self.bmstime[i]-datetime.timedelta(seconds=10) time2=self.bmstime[i] else: temp1=self._celltemp_get(i-1) temp2=self._celltemp_get(i) time1=self.bmstime[i-1] time2=self.bmstime[i] temp1=temp1[np.where((temp1self.param.CellTempLwLmt))] temp2=temp2[np.where((temp2self.param.CellTempLwLmt))] #温度有效性判断........................................................................... if len(temp1)>0.5 and len(temp2)>0.5 and len(temp1)==len(temp2): celltempvalid=1 else: #不作处理 celltempvalid=0 if celltempvalid==1: celltempmax=max(temp2) #过温判断............................................................................................................................ if celltempmax>self.param.TrwTempHigh: celltemphigh=1 else: pass #温升判断............................................................................................................................. delttime=(time2-time1).total_seconds() if delttime>1: celltemp_rate=(max(temp2-temp1)*60)/delttime #计算最大温升速率 else: celltemp_rate=0 if celltemp_rate>self.param.TrwTempRate and self.param.CellTempLwLmt4.5 and abs(cellvoltmax_index2-cellvoltmin_index2)==1) or (cellvoltmin1<2 and cellvoltmax1>4.5 and abs(cellvoltmax_index1-cellvoltmin_index1)==1): #电压断线故障进入 cellvoltvalid=0 else: cellvolt1=cellvolt1[np.where((cellvolt1>0.01) & (cellvolt1<4.5))] cellvolt2=cellvolt2[np.where((cellvolt2>0.01) & (cellvolt2<4.5))] if len(cellvolt1)>1 and len(cellvolt2)>1 and len(cellvolt1)==len(cellvolt2): cellvoltvalid=1 else: cellvoltvalid=0 if cellvoltvalid==1: #单体电压跌落诊断........................................................................................................................... delttime=(time2-time1).total_seconds() if delttime<360: if self.packcrnt[i]<0.5 and max(cellvolt1-cellvolt2)>self.param.TrwCellVoltFall: cellvoltfall=cellvoltfall+1 elif self.packcrnt[i]>0.5 and max(cellvolt1-cellvolt2)-self.packcrnt[i]*0.01>self.param.TrwCellVoltFall: cellvoltfall=cellvoltfall+1 else: pass else: pass else: pass #电池包诊断..................................................................................................................................... packvolt2=self.packvolt[i] #电池包电压有效性............................................................................................................................ if self.param.CellVoltNumsself.param.TrwPackVoltFall and self.param.CellVoltNumsself.param.TrwPackVoltFall and self.param.CellVoltNums3.5 and np.sum(trw_array>0.5)>1.5: fltcode='C599' df_res.loc[0]=[self.bmstime[len(self.bmstime)-1], end_time, self.sn, fltcode, 5, '电池发生热失控', '联系用户立即远离电池,并通知技术人员介入'] df_ram_alarm=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2']) elif safetywarning1>2.5 and np.sum(trw_array)>1.5 and np.sum(trw_array>0.5)>1.5: fltcode='C599' df_res.loc[0]=[self.bmstime[len(self.bmstime)-1], end_time, self.sn, fltcode, 5, '电池发生热失控', '联系用户立即远离电池,并通知技术人员介入'] df_ram_alarm=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2']) else: df_res=pd.DataFrame() #更新df_ram_alarm信息 if np.sum(trw_array)>1.5 and np.sum(trw_array>0.5)>1.5: safetywarning1=3 df_ram_alarm.loc[0]=[self.sn,time2,safetywarning1,safetywarning2] else: safetywarning1=0 df_ram_alarm=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2']) if max(cellvolt2)>5 or min(cellvolt2)<1: safetywarning2=3 df_ram_alarm.loc[0]=[self.sn,time2,safetywarning1,safetywarning2] elif max(cellvolt2)<4.5 and min(cellvolt2)>2.5: safetywarning2=0 if safetywarning1==0: df_ram_alarm=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2']) else: df_ram_alarm.loc[0]=[self.sn,time2,safetywarning1,safetywarning2] else: df_ram_alarm.loc[0]=[self.sn,time2,safetywarning1,safetywarning2] return df_res, df_bms_ram, df_ram_alarm else: df_ram_alarm=self.df_ram_alarm df_ram_bms=self.df_ram_bms if (safetywarning1>2.5 or safetywarning2>2.5) and (time_now-df_ram_alarm.iloc[-1]['time']).total_seconds()>120: fltcode='C599' time_now=time_now.strftime('%Y-%m-%d %H:%M:%S') time_now=datetime.datetime.strptime(time_now,'%Y-%m-%d %H:%M:%S') df_res.loc[0]=[time_now, end_time, self.sn, fltcode, 5, '电池发生热失控', '联系用户立即远离电池,并通知技术人员介入'] df_ram_alarm=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2']) else: df_res=pd.DataFrame() return df_res, df_ram_bms, df_ram_alarm