import pandas as pd import numpy as np import datetime from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam class BatDiag: def __init__(self,sn,celltype,df_bms,df_soh,df_uniform,df_diag_Ram): #参数初始化 self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(celltype) self.df_bms=df_bms self.df_soh=df_soh self.df_uniform=df_uniform self.df_diag_ram=df_diag_Ram self.packcrnt=df_bms['总电流[A]']*self.param.PackCrntDec self.packvolt=df_bms['总电压[V]'] self.bms_soc=df_bms['SOC[%]'] self.bmstime= pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S') self.bmsfault1=self.df_bms['故障代码'] # self.bmsfault2=self.df_bms['alarm2'].tolist() # self.bmsfault3=self.df_bms['alarm3'].tolist() # self.bmsfault4=self.df_bms['fault'].tolist() self.cellvolt_name=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)] self.celltemp_name=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)] def diag(self): if self.celltype<=50: df_res=self._ncm_diag() return df_res else: df_res=self._ncm_diag() return df_res #定义滑动滤波函数............................................................................................. def _np_move_avg(self,a, n, mode="same"): return (np.convolve(a, np.ones((n,)) / n, mode=mode)) #寻找当前行数据的所有温度值................................................................................... def _celltemp_get(self,num): celltemp = list(self.df_bms.loc[num,self.celltemp_name]) return celltemp #获取当前行所有电压数据............................................................................................ def _cellvolt_get(self,num): cellvolt = np.array(self.df_bms.loc[num,self.cellvolt_name]/1000) return cellvolt #..........................................三元电池诊断功能.................................................................. def _ncm_diag(self): bmssoc_st=float(self.bms_soc[0]) #SOC卡滞初始参数 ah_accum=0 #SOC卡滞初始参数 as_chg=0 #过流诊断初始参数 as_dis=0 #过流诊断初始参数 time1=self.bmstime[0] #温升速率初始参数 temp1=np.array(self._celltemp_get(0)) #温升速率初始参数 temprate_cnt=0 end_time='0000-00-00 00:00:00' for i in range(1,len(self.df_bms)): #温度诊断功能............................................................................................................. celltemp0=self._celltemp_get(i-1) celltemp1=self._celltemp_get(i) celltempmin0=min(celltemp0) celltempmin1=min(celltemp1) celltempmax0=max(celltemp0) celltempmax1=max(celltemp1) #温度有效性判断.......................................................................... if celltempmax0>self.param.CellTempUpLmt or celltempmin0self.param.CellTempHighLv2 and celltempmax1>self.param.CellTempHighLv2: #二级高温进入 time=self.bmstime[i] code=4 faultlv=3 faultinfo='温度{}高温二级'.format(celltemp1.index(celltempmax1)+1) faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: #ram当前故障中有该故障,则判断是否退出该故障 if celltempmax0self.param.CellTempLowLv1+2 and celltempmax1>self.param.CellTempLowLv1+2: #二级高温恢复 time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==6].index, 'end_time'] = time else: pass #温差判断............................................................................................................................. if not 8 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if (celltempmax0-celltempmin0)>self.param.CellTempDiffLv2 and (celltempmax1-celltempmin1)>self.param.CellTempDiffLv2: #二级温差进入 time=self.bmstime[i] code=8 faultlv=3 faultinfo='温度{}和{}温差大二级'.format(celltemp1.index(celltempmax1)+1,celltemp1.index(celltempmin1)+1) faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: #ram当前故障中有该故障,则判断是否退出该故障 if (celltempmax0-celltempmin0)self.param.CellTempDiffLv1-2: #二级温差恢复 time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==8].index, 'end_time'] = time else: pass #温升判断 time2=self.bmstime[i] delttime=(time2-time1).total_seconds() if delttime>20: temp2=np.array(self._celltemp_get(i)) celltemp_rate=(max(temp2-temp1)*60)/delttime #计算最大温升速率 temp1=temp2 #更新初始温度 time1=time2 #更新初始时间 if celltemp_rate>self.param.CellTempRate: temprate_cnt=temprate_cnt+1 if not 9 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if temprate_cnt>2: #温升故障进入 time=self.bmstime[i] code=9 faultlv=3 faultinfo='温升速率过快:{}℃/min'.format(celltemp_rate) faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: #ram当前故障中有该故障,则判断是否退出该故障 if celltemp_rate4.5) or cellvoltmin0<0.1 or cellvoltmax0>5: cellvoltvalid=0 else: cellvoltvalid=1 if cellvoltvalid==1: #过压诊断............................................................................................................. if not 12 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if cellvoltmax0>self.param.CellOvLv2 and cellvoltmax1>self.param.CellOvLv2: #二级过压进入 time=self.bmstime[i] code=12 faultlv=4 faultinfo='电芯{}过压二级'.format(cellvolt1.index(cellvoltmax1)+1) faultadvice='联系用户询问用车场景,技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: #ram当前故障中有该故障,则判断是否退出该故障 if cellvoltmax0self.param.CellUvLv1+0.1 and cellvoltmin1>self.param.CellUvLv1+0.1: time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==14].index, 'end_time'] = time else: pass #电芯压差大..................................................................................................................................................... if not 16 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if (cellvoltmax0-cellvoltmin0)>self.param.CellVoltDiffLv2 and (cellvoltmax1-cellvoltmin1)>self.param.CellVoltDiffLv2: #二级电芯压差 time=self.bmstime[i] code=16 faultlv=3 faultinfo='电芯{}和{}压差大二级'.format(cellvolt1.index(cellvoltmax1)+1,cellvolt1.index(cellvoltmin1)+1) faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if (cellvoltmax0-cellvoltmin0)self.param.CellVoltDiffLv1-0.05: #二级欠压恢复 time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==16].index, 'end_time'] = time else: pass else: pass #电池包诊断..................................................................................................................................... if not 18 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if self.packvolt[i-1]>self.param.PackVoltOvLv2 and self.packvolt[i]>self.param.PackVoltOvLv2: #电池包过压二级进入 time=self.bmstime[i] code=18 faultlv=4 faultinfo='电池包过压二级' faultadvice='联系用户询问用车场景,技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if self.packvolt[i-1]self.param.PackVoltUvLv1+0.1*self.param.CellVoltNums and self.packvolt[i]>self.param.PackVoltUvLv1+0.1*self.param.CellVoltNums: #电池包二级欠压恢复 time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==20].index, 'end_time'] = time else: pass #电流过流诊断....................................................................................................................... step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if step<120 and self.packcrnt[i]>self.param.PackDisOc: as_dis=as_dis+(self.packcrnt[i]-self.param.self.PackDisOc)*step #ah累计 elif step<120 and self.packcrnt[i]100: time=self.bmstime[i] code=22 faultlv=3 faultinfo='电池包放电过流' faultadvice='联系用户询问用车场景,技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if self.packcrnt[i-1]100: time=self.bmstime[i] code=21 faultlv=3 faultinfo='电池包充电过流' faultadvice='联系用户询问用车场景,技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if self.packcrnt[i-1]>self.param.PackChgOc+10 and self.packcrnt[i]>self.param.PackChgOc+10: time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==22].index, 'end_time'] = time else: pass #SOC卡滞、跳变诊断................................................................................................ step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if step<120: ah_accum=ah_accum-self.packcrnt[i]*step/3600 #ah累计 else: pass #SOC卡滞............................................................................................................ if abs(ah_accum)>self.param.Capacity*0.05: bmssoc_now=float(self.bms_soc[i]) if not 27 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if abs(bmssoc_now-bmssoc_st)self.param.SocClamp: #SOC卡滞故障退出 time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==27].index, 'end_time'] = time else: pass bmssoc_st=bmssoc_now ah_accum=0 else: pass #SOC跳变.................................................................................................................... if step<30: bmssoc_last=float(self.bms_soc[i-1]) bmssoc_now=float(self.bms_soc[i]) if not 28 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if abs(bmssoc_now-bmssoc_last)>self.param.SocJump: #SOC跳变进入 time=self.bmstime[i] code=28 faultlv=1 faultinfo='电池SOC跳变' faultadvice='技术介入诊断,检修电池BMS软件' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if abs(bmssoc_now-bmssoc_st)self.param.SocLow and self.bms_soc[i]>self.param.SocLow: #SOC过低故障退出 time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==26].index, 'end_time'] = time else: pass #BMS故障报警........................................................................................................ if not 1 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if self.bmsfault1[i-1] is None or self.bmsfault1[i] is None: self.bmsfault1[i-1]=0 self.bmsfault1[i]=0 if self.bmsfault1[i-1]>0 or self.bmsfault1[i]>0: #BMS故障进入 time=self.bmstime[0] code=1 faultlv=2 faultinfo='BMS故障报警:{}'.format(self.bmsfault1[i-1]) faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if self.bmsfault1[i-1]==0 and self.bmsfault1[i]==0: #BMS故恢复 time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==1].index, 'end_time'] = time #SOC一致性故障报警.......................................................................................................... if not self.df_uniform.empty: cellsoc_diff=self.df_uniform.loc[0,'cellsoc_diff'] if not 25 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if cellsoc_diff>self.param.SocDiff: #SOC一致性差故障进入 time=self.bmstime[0] code=25 faultlv=1 faultinfo='电芯{}和{}SOC差过大:{}'.format(self.df_uniform.loc[0,'cellmin_num'],self.df_uniform.loc[0,'cellmax_num']),cellsoc_diff faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if cellsoc_diffself.param.SohLow+2: #soh过低故障恢复 time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==23].index, 'end_time'] = time else: pass if not 24 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if cellsoh_diff>self.param.SohDiff: time=self.bmstime[0] code=24 faultlv=1 faultinfo='电池包容量一致性差:电芯{}'.format(cellsoh_lowindex) faultadvice='检修电池,更换容量过低的电芯或模组' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if cellsoh_diff100: # health_state=100 # elif health_state<0: # health_state=0 # else: # pass # health_state=eval(format(health_state,'.1f')) #返回诊断结果........................................................................................................... df_res=self.df_diag_ram if not df_res.empty: return df_res else: return pd.DataFrame() #............................................................内短路故障诊断............................................................................. class ShortDiag(): def __init__(self,sn,celltype,df_short): #参数初始化 self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(celltype) self.df_short=df_short def shortdiag(self): if len(self.df_short)>1: df_res=self._short_diag() return df_res else: return pd.DataFrame() #内短路故障检测................................................................................................................................... def _short_diag(self): column_name=['start_time', 'end_time', 'product_id', 'code', 'level', 'info','advice'] df_res=pd.DataFrame(columns=column_name) time=datetime.datetime.now() end_time=datetime.datetime.strftime('0000-00-00 00:00:00') end_time=datetime.datetime.strptime(end_time,'%Y-%m-%d %H:%M:%S') # if self.df_diag.empty: # health_state=100 # else: # health_state=self.df_diag.loc[0,'health_state'] for i in range(self.param.CellVoltNums): #将字符串分割为多列 short_current=self.df_short['short_current'] short_current=short_current.str.replace("[", '') short_current=short_current.str.replace("]", '') self.df_short['cellshort'+str(i+1)]=short_current.map(lambda x:x.split(',')[i]) self.df_short['cellshort'+str(i+1)]=self.df_short['cellshort'+str(i+1)].map(lambda x:eval(x)) #漏电流故障判断 cellshort=np.array(self.df_short['cellshort'+str(i+1)]) shortlv3=np.sum(cellshort>self.param.LeakCurrentLv3) shortlv2=np.sum(cellshort>self.param.LeakCurrentLv2)-shortlv3 shortlv1=np.sum(cellshort>self.param.LeakCurrentLv1)-shortlv2-shortlv3 shortlv=shortlv3*3 + shortlv2*2 + shortlv1 if not 31 in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if shortlv>=5: time=self.bmstime[0] code=31 faultlv=3 faultinfo='电芯{}发生严重内短路'.format(i+1) faultadvice='禁止充放电,检修电池' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if shortlv<3: time=self.bmstime[i] self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['code']==31].index, 'end_time'] = time if not df_res.empty: return df_res else: return pd.DataFrame()