import pandas as pd import numpy as np import datetime import BatParam class SafetyAlarm: def __init__(self,sn,celltype,df_bms,df_diag_ram_sn, df_bms_ram_sn, df_alarm_ram_sn): #参数初始化 self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(celltype) self.df_bms=df_bms self.df_ram_alarm=df_alarm_ram_sn.copy() self.df_ram_bms=df_bms_ram_sn.copy() self.df_diag_ram=df_diag_ram_sn.copy() df_bms['time']=pd.to_datetime(df_bms['time'], format='%Y-%m-%d %H:%M:%S') if (not df_bms_ram_sn.empty) and (not self.df_bms.empty): self.df_bms=self.df_bms[self.df_bms['time'] > df_bms_ram_sn.iloc[-1]['time']] #滤除原始数据中的重复数据 self.df_bms.reset_index(inplace=True,drop=True) #重置索引 self.packcrnt=df_bms['PackCrnt']*self.param.PackCrntDec self.packvolt=df_bms['PackCrnt'] self.bmstime= df_bms['time'] self.cellvolt_name=['CellVolt'+str(x) for x in range(1,self.param.CellVoltNums+1)] # othertemp=['其他温度'+str(x) for x in range(1,self.param.OtherTempNums+1)] self.celltemp_name=['CellTemp'+str(x) for x in range(1,self.param.CellTempNums+1)] # self.celltemp_name=celltemp+othertemp def safety_alarm_diag(self): time_now=datetime.datetime.now() if not self.df_diag_ram.empty: fault_time=self.df_diag_ram.iloc[-1]['start_time'] if (time_now-fault_time).total_seconds()>24*3600: self.df_diag_ram['end_time']=time_now return self.df_diag_ram, self.df_ram_bms, self.df_ram_alarm else: return self.df_diag_ram, self.df_ram_bms, self.df_ram_alarm else: df_res=self._alarm_diag() return df_res #定义滑动滤波函数............................................................................................. def _np_move_avg(self,a, n, mode="same"): return (np.convolve(a, np.ones((n,)) / n, mode=mode)) #寻找当前行数据的所有温度值................................................................................... def _celltemp_get(self,num): celltemp = np.array(self.df_bms.loc[num,self.celltemp_name]) return celltemp #获取当前行所有电压数据............................................................................................ def _cellvolt_get(self,num): cellvolt = np.array(self.df_bms.loc[num,self.cellvolt_name]) return cellvolt #..........................................三元电池诊断功能.................................................................. def _alarm_diag(self): # column_name=['time_st','time_sp','sn','faultcode','faultlv','faultinfo','faultadvice'] # df_res=pd.DataFrame(columns=column_name) end_time='0000-00-00 00:00:00' time_now=datetime.datetime.now() celltemprise=0 celltemphigh=0 cellvoltfall=0 cellvoltdsc=0 packvoltfall=0 cellvolttime=0 if not self.df_ram_alarm.empty: safetywarning1=self.df_ram_alarm.iloc[-1]['safetywarning1'] safetywarning2=self.df_ram_alarm.iloc[-1]['safetywarning2'] time_last=self.df_ram_alarm.iloc[-1]['time'] else: safetywarning1=0 safetywarning2=0 if not self.df_bms.empty: for i in range(len(self.df_bms)): if i<1: if not self.df_ram_bms.empty: temp1=np.array(self.df_ram_bms.iloc[-1]['celltemp']) temp2=self._celltemp_get(i) time1=self.df_ram_bms.iloc[-1]['time'] time2=self.bmstime[i] temp_st=temp1 time_st=time1 cellvolt1=np.array(self.df_ram_bms.iloc[-1]['cellvolt']) cellvolt2=self._cellvolt_get(i) packvolt1=self.df_ram_bms.iloc[-1]['packvolt'] packvolt2=self.packvolt[i] delta_soc=abs(self.df_ram_bms.iloc[-1]['packsoc']-self.packsoc[i]) packcrnt1=self.df_ram_bms.iloc[-1]['packcrnt'] packcrnt2=self.packcrnt[i] if len(temp1)==len(temp2): pass else: temp1=temp2 temp_st=temp2 if len(cellvolt1)==len(cellvolt2): pass else: cellvolt1=cellvolt2 else: temp1=self._celltemp_get(i) temp2=temp1 time1=self.bmstime[i]-datetime.timedelta(seconds=1) time2=self.bmstime[i] temp_st=temp1 time_st=time1 cellvolt1=self._cellvolt_get(i) cellvolt2=cellvolt1 packvolt1=self.packvolt[i] packvolt2=packvolt1 delta_soc=0 packcrnt1=self.packcrnt[i] packcrnt2=packcrnt1 else: temp1=self._celltemp_get(i-1) temp2=self._celltemp_get(i) time1=self.bmstime[i-1] time2=self.bmstime[i] cellvolt1=self._cellvolt_get(i-1) cellvolt2=self._cellvolt_get(i) packvolt1=packvolt2 packvolt2=self.packvolt[i] packcrnt1=self.packcrnt[i-1] packcrnt2=self.packcrnt[i] delta_soc=abs(self.packsoc[i-1]-self.packsoc[i]) #温度有效性判断........................................................................... if max(temp2)>self.param.CellTempUpLmt or min(temp2)self.param.CellTempLwLmt),True,False) temp2_valid=np.where((temp2self.param.CellTempLwLmt),True,False) temp1=temp1[temp1_valid & temp2_valid] temp2=temp2[temp1_valid & temp2_valid] if len(temp1)>0.5 and len(temp2)>0.5: celltempmax=max(temp2) #过温判断............................................................................................................................ if celltempmax>self.param.TrwTempHigh and np.sum(temp2>self.param.TrwTempHigh)<=self.CellTempNums/2: celltemphigh=1 else: pass #温升判断............................................................................................................................. delttime=(time2-time_st).total_seconds() if delttime>5: temp_st=temp_st[temp1_valid & temp2_valid] celltemp_rate=((temp2-temp_st)*60)/delttime #计算最大温升速率 time_st=time2 temp_st=self._celltemp_get(i) else: celltemp_rate=np.array([0,0]) if delta_soc<5 and max(celltemp_rate)>self.param.TrwTempRate and np.sum(celltemp_rate>self.param.TrwTempRate)<=self.CellTempNums/2: celltemprise=celltemprise+1 else: pass else: pass #电压有效性.......................................................................................................................................... cellvolt1_valid=np.where((cellvolt1>0.1) & (cellvolt1<5),True,False) cellvolt2_valid=np.where((cellvolt2>0.1) & (cellvolt2<5),True,False) cellvoltfall1=cellvolt1[cellvolt1_valid & cellvolt2_valid] cellvoltfall2=cellvolt2[cellvolt1_valid & cellvolt2_valid] if len(cellvoltfall1)2: cellvoltvalid=0 else: cellvoltvalid=1 else: cellvolttime=0 cellvoltvalid=1 delttime=(time2-time1).total_seconds() if delttime<60: #单体电压跌落........................................................................................................................... if len(cellvoltfall1)>1 and len(cellvoltfall2)>1: if delta_soc<5 and abs(packcrnt1)<=1 and abs(packcrnt2)<=1 and max(cellvoltfall1-cellvoltfall2)>self.param.TrwCellVoltFall and np.sum(cellvoltfall1-cellvoltfall2>self.param.TrwCellVoltFall)<=self.CellVoltNums/2: cellvoltfall=cellvoltfall+1 else: pass #单体电压断线.............................................................................................................................. if 0self.param.TrwPackVoltFall and self.CellVoltNums0.5 and np.sum(trw_array)>3.5 and np.sum(trw_array>0.5)>1.5: fltcode=119 self.df_diag_ram.loc[0]=[self.bmstime[len(self.bmstime)-1], end_time, self.sn, fltcode, 5, '电池发生热失控', '断开继电器,远离电池,立刻消防处理,并通知技术人员介入'] elif safetywarning1>2.5 and np.sum(trw_array)>1.5 and np.sum(trw_array>0.5)>1.5: fltcode=119 self.df_diag_ram.loc[0]=[self.bmstime[len(self.bmstime)-1], end_time, self.sn, fltcode, 5, '电池发生热失控', '断开继电器,远离电池,立刻消防处理,并通知技术人员介入'] elif safetywarning1>2.5 and cellvoltvalid==0: fltcode=119 self.df_diag_ram.loc[0]=[self.bmstime[len(self.bmstime)-1], end_time, self.sn, fltcode, 5, '电池发生热失控', '断开继电器,远离电池,立刻消防处理,并通知技术人员介入'] else: #更新df_ram_alarm信息 if trwcellvolt>0.5 and np.sum(trw_array)>1.5 and np.sum(trw_array>0.5)>1.5: safetywarning1=3 self.df_ram_alarm.loc[0]=[self.sn,time2,safetywarning1,safetywarning2] elif (trwcellvolt<0.5 or np.sum(trw_array>0.5)<1.5) and celltempvalid==1 and cellvoltvalid==1: safetywarning1=0 self.df_ram_alarm.loc[0]=[self.sn,time2,safetywarning1,safetywarning2] else: pass return self.df_diag_ram, self.df_ram_bms, self.df_ram_alarm else: if (safetywarning1>2.5 or safetywarning2>2.5) and (time_now-time_last).total_seconds()>360: fltcode=119 time_now=time_now.strftime('%Y-%m-%d %H:%M:%S') time_now=datetime.datetime.strptime(time_now,'%Y-%m-%d %H:%M:%S') self.df_diag_ram.loc[0]=[time_now, end_time, self.sn, fltcode, 5, '电池发生热失控', '断开继电器,远离电池,立刻消防处理,并通知技术人员介入'] self.df_ram_alarm=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2']) else: pass return self.df_diag_ram, self.df_ram_bms, self.df_ram_alarm