import datetime

import numpy as np
import pandas as pd

from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam


class BatDiag:
    """Rule-based battery fault diagnosis over one batch of BMS records.

    The constructor prepares row-aligned views of the BMS data (time,
    pack current, pack voltage, SOC, cell voltage / temperature column
    names); ``diag`` runs the diagnosis and returns its two result frames.
    """

    def __init__(self, sn, celltype, df_bms, df_diag_ram_sn, df_bms_ram_sn, df_soh, df_uniform):
        """Initialise parameters and the per-row series used by the diagnosis.

        Args:
            sn: Identifier written into the ``product_id`` column of fault rows.
            celltype: Cell type code, forwarded to ``BatParam.BatParam``.
            df_bms: Raw BMS records. Must contain the columns '时间戳',
                '总电流[A]', '总电压[V]', 'SOC[%]' and the '单体电压N' /
                '单体温度N' columns. NOTE: its '时间戳' column is converted
                to datetime in place (the caller's frame is modified), as in
                the original implementation.
            df_diag_ram_sn: Currently active faults for this sn; copied.
            df_bms_ram_sn: Cached state from the previous run for this sn;
                copied. Its last row's 'time' is used as the cut-off for
                already-processed records.
            df_soh: SOH results, stored as-is.
            df_uniform: Uniformity (consistency) results, stored as-is.
        """
        # Parameter initialisation.
        self.sn = sn
        self.celltype = celltype
        self.param = BatParam.BatParam(celltype)
        self.df_uniform = df_uniform
        self.df_soh = df_soh
        self.df_bms_ram = df_bms_ram_sn.copy()
        self.df_diag_ram = df_diag_ram_sn.copy()

        df_bms['时间戳'] = pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')

        # Drop records that were already processed in the previous run
        # (everything not newer than the last cached timestamp).
        bms = df_bms
        if not df_bms_ram_sn.empty:
            last_time = df_bms_ram_sn.iloc[-1]['time']
            bms = bms[bms['时间戳'] > last_time]
        # The diagnosis addresses rows by position through label lookups
        # (``.loc[i]`` / ``series[i]`` with i in range(len)), so the frame
        # needs a fresh 0..n-1 index. Non-inplace reset avoids mutating a slice.
        self.df_bms = bms.reset_index(drop=True)

        # All per-row series are derived from the *filtered* frame so that
        # index i refers to the same record everywhere. Previously they were
        # taken from the unfiltered input and went out of step with
        # self.df_bms whenever cached RAM data was present.
        # PackCrntDec is a multiplier from BatParam (presumably a current
        # sign/direction factor — confirm against BatParam).
        self.packcrnt = self.df_bms['总电流[A]'] * self.param.PackCrntDec
        self.packvolt = self.df_bms['总电压[V]']
        self.bmstime = self.df_bms['时间戳']
        self.bms_soc = self.df_bms['SOC[%]']

        self.cellvolt_name = ['单体电压' + str(x) for x in range(1, self.param.CellVoltNums + 1)]
        # NOTE: the '其他温度' (other temperature) channels were commented out
        # in the original code and are not part of celltemp_name.
        self.celltemp_name = ['单体温度' + str(x) for x in range(1, self.param.CellTempNums + 1)]

    def diag(self):
        """Run the battery diagnosis and return its two result objects.

        The original code branched on ``celltype <= 50`` but executed the
        same call in both branches, so a single path is used here.

        Returns:
            tuple: the two values produced by ``_bat_diag``.
        """
        df_res1, df_res2 = self._bat_diag()
        return df_res1, df_res2

    # Fetch all temperature values of the given row.
    def _celltemp_get(self, num):
        """Return the cell temperature readings of row ``num`` as a list."""
        celltemp = list(self.df_bms.loc[num, self.celltemp_name])
        return celltemp

    # Fetch all cell voltage values of the given row.
    def _cellvolt_get(self, num):
        """Return the cell voltages of row ``num`` as a list, divided by 1000.

        The scaling is presumably mV -> V; the diagnosis compares the
        results against thresholds such as 2 and 4.5.
        """
        cellvolt = list(self.df_bms.loc[num, self.cellvolt_name] / 1000)
        return cellvolt

    # .......................... Battery diagnosis ..........................
def _bat_diag(self): column_name=['start_time', 'end_time', 'product_id', 'code', 'level', 'info','advice'] df_res=pd.DataFrame(columns=column_name) end_time='0000-00-00 00:00:00' ah_accum=0 #SOC卡滞初始参数 as_chg=0 #过流诊断初始参数 as_dis=0 #过流诊断初始参数 cellvoltvalid=1 voltdsc_time=0 voltdsc_count=0 voltfail_time=0 voltfail_count=0 voltloose_time=0 voltloose_count=0 voltover_time=0 voltunder_time=0 cov_time=0 cuv_time=0 cdv_time=0 pov_time=0 puv_time=0 ot_time=0 ut_time=0 dt_time=0 for i in range(len(self.df_bms)): #电压诊断功能......................................................................................................................................... if i<1: cellvolt2=self._cellvolt_get(i) cellvoltmin2=min(cellvolt2) cellvoltmax2=max(cellvolt2) cellvoltmin_index2=cellvolt2.index(cellvoltmin2) cellvoltmax_index2=cellvolt2.index(cellvoltmax2) if not self.df_bms_ram.empty: time1=self.df_bms_ram.iloc[-1]['time'] time2=self.bmstime[i] cellvolt1=self.df_bms_ram.iloc[-1]['cellvolt'] cellvoltmin1=min(cellvolt1) cellvoltmax1=max(cellvolt1) cellvoltmin_index1=cellvolt1.index(cellvoltmin1) cellvoltmax_index1=cellvolt1.index(cellvoltmax1) else: cellvolt1=cellvolt2 time1=self.bmstime[i] time2=self.bmstime[i] cellvoltmin1=cellvoltmin2 cellvoltmax1=cellvoltmax2 cellvoltmin_index1=cellvoltmin_index2 cellvoltmax_index1=cellvoltmax_index2 else: time1=self.bmstime[i-1] time2=self.bmstime[i] cellvolt2=self._cellvolt_get(i) cellvoltmin2=min(cellvolt2) cellvoltmax2=max(cellvolt2) cellvoltmin_index2=cellvolt2.index(cellvoltmin2) cellvoltmax_index2=cellvolt2.index(cellvoltmax2) cellvolt1=self._cellvolt_get(i-1) cellvoltmin1=min(cellvolt1) cellvoltmax1=max(cellvolt1) cellvoltmin_index1=cellvolt1.index(cellvoltmin1) cellvoltmax_index1=cellvolt1.index(cellvoltmax1) #电压采样断线.......................................................................................................................................... 
if not 'C308' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if (cellvoltmin2<2 and cellvoltmax2>4.5 and abs(cellvoltmax_index2-cellvoltmin_index2)==1) and (cellvoltmin1<2 and cellvoltmax1>4.5 and abs(cellvoltmax_index1-cellvoltmin_index1)==1): #电压断线故障进入 cellvoltvalid=0 voltdsc_time=voltdsc_time+(time2-time1).total_seconds() if voltdsc_time>self.param.volt_time: #持续时间 time=self.bmstime[i] code='C308' faultlv=3 faultinfo='电芯{}和{}电压采样断线'.format(cellvoltmin_index2+1, cellvoltmax_index2+1) faultadvice='通知用户更换电池,电池返厂维修' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass elif (cellvoltmin2<2 and cellvoltmax2>4.5 and abs(cellvoltmax_index2-cellvoltmin_index2)==1) and (cellvoltmin1>2.5 and cellvoltmax1<4.3): #连续跳变 cellvoltvalid=0 voltdsc_count=voltdsc_count+1 if voltdsc_count>=3: time=self.bmstime[i] code='C308' faultlv=3 faultinfo='电芯{}和{}电压采样断线'.format(cellvoltmin_index2+1, cellvoltmax_index2+1) faultadvice='通知用户更换电池,电池返厂维修' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: voltdsc_time=0 else: cellvoltvalid=0 if cellvoltmin2>2 and cellvoltmax2<4.5 and cellvoltmin1>2 and cellvoltmax1<4.5: cellvoltvalid=1 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C308'].index, 'end_time'] = time else: pass #电压采样系统失效............................................................................................................. 
if not 'C309' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if (cellvoltmin2<1 and cellvoltmin1<1) or (cellvoltmax2>5 and cellvoltmax1>5): cellvoltvalid=0 voltfail_time=voltfail_time+(time2-time1).total_seconds() if voltfail_time>self.param.volt_time: #持续时间 time=self.bmstime[i] code='C309' faultlv=3 faultinfo='电芯电压采样系统失效' faultadvice='通知用户更换电池,电池返厂维修' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass elif (cellvoltmin2<1 and cellvoltmin1>2.5) or (cellvoltmax2>5 and cellvoltmax1<4.5): #连续跳变 cellvoltvalid=0 voltfail_count=voltfail_count+1 if voltfail_count>=3: time=self.bmstime[i] code='C309' faultlv=3 faultinfo='电芯电压采样系统失效' faultadvice='通知用户更换电池,电池返厂维修' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: voltfail_time=0 else: cellvoltvalid=0 if cellvoltmin2>2.5 and cellvoltmax2<4.5 and cellvoltmin1>2.5 and cellvoltmax1<4.5: cellvoltvalid=1 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C309'].index, 'end_time'] = time else: pass #电压采样线松动................................................................................................................. 
if not 'C208' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if cellvoltmin2>2 and cellvoltmax2<4.5 and cellvoltmin1>2 and cellvoltmax1<4.5: cellvolt1_std=np.std(cellvolt1) cellvolt1_mean=np.mean(cellvolt1) cellvolt1_3sigma=(np.array(cellvolt1)-cellvolt1_mean)/cellvolt1_std cellvolt2_std=np.std(cellvolt2) cellvolt2_mean=np.mean(cellvolt2) cellvolt2_3sigma=(np.array(cellvolt2)-cellvolt2_mean)/cellvolt2_std if (min(cellvolt1_3sigma)<-3 and max(cellvolt1_3sigma)>3) and (min(cellvolt2_3sigma)<-3 and max(cellvolt2_3sigma)>3): #连续发生 cellvoltvalid=0 voltloose_time=voltloose_time+(time2-time1).total_seconds() if voltloose_time>self.param.volt_time: time=self.bmstime[i] code='C208' faultlv=3 faultinfo='电芯电压{}和{}采样松动'.format(cellvoltmin_index2+1, cellvoltmax_index2+1) faultadvice='通知技术人员介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass elif (min(cellvolt1_3sigma)>-3 and max(cellvolt1_3sigma)<3) and (min(cellvolt2_3sigma)<-3 and max(cellvolt2_3sigma)>3): #连续跳变 cellvoltvalid=0 voltloose_count=voltloose_count+1 if voltloose_count>=3: time=self.bmstime[i] code='C208' faultlv=3 faultinfo='电芯电压{}和{}采样松动'.format(cellvoltmin_index2+1, cellvoltmax_index2+1) faultadvice='通知技术人员介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: voltloose_time=0 else: voltloose_time=0 else: cellvoltvalid=0 cellvolt1_std=np.std(cellvolt1) cellvolt1_mean=np.mean(cellvolt1) cellvolt1_3sigma=(np.array(cellvolt1)-cellvolt1_mean)/cellvolt1_std cellvolt2_std=np.std(cellvolt2) cellvolt2_mean=np.mean(cellvolt2) cellvolt2_3sigma=(np.array(cellvolt2)-cellvolt2_mean)/cellvolt2_std if(min(cellvolt1_3sigma)>-3 or max(cellvolt1_3sigma)<3) and (min(cellvolt2_3sigma)>-3 or max(cellvolt2_3sigma)<3): cellvoltvalid=1 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C208'].index, 'end_time'] = time else: pass if 
cellvoltvalid==1: #继电器粘连............................................................................................................................................. if not 'C310' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if cellvoltmax2>self.param.CellOvLv2 and cellvoltmax1>self.param.CellOvLv2 and self.packcrnt[i]<0: voltover_time=voltover_time+(time2-time1).total_seconds() if voltover_time>self.param.volt_time: time=self.bmstime[i] code='C310' faultlv=3 faultinfo='充电继电器粘连' faultadvice='通知技术人员介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: voltover_time=0 else: if cellvoltmax20: voltunder_time=voltunder_time+(time2-time1).total_seconds() if voltunder_time>self.param.volt_time: time=self.bmstime[i] code='C311' faultlv=3 faultinfo='放电继电器粘连' faultadvice='通知技术人员介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: voltunder_time=0 else: if cellvoltmin2>self.param.CellUvLv2+0.05 and cellvoltmin1>self.param.CellOvLv2+0.05: time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C311'].index, 'end_time'] = time else: pass #电芯过压............................................................................................................................................. 
if not 'C501' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if cellvoltmax2>self.param.CellOvLv2 and cellvoltmax1>self.param.CellOvLv2 and self.packcrnt[i]<0: #二级过压进入 cov_time=cov_time+(time2-time1).total_seconds() if cov_time>self.param.volt_time: time=self.bmstime[i] code='C501' faultlv=5 faultinfo='电芯{}过压'.format(cellvolt1.index(cellvoltmax1)+1) faultadvice='联系用户立即停止充电,并通知技术运维人员' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: cov_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if cellvoltmax2self.param.volt_time: time=self.bmstime[i] code='C202' faultlv=2 faultinfo='电芯{}欠压'.format(cellvolt1.index(cellvoltmin1)+1) faultadvice='联系用户询问用车场景,技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: cuv_time=0 else: if cellvoltmin2>self.param.CellUvLv1+0.1 and cellvoltmin1>self.param.CellUvLv1+0.1: time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C202'].index, 'end_time'] = time else: pass #电芯压差大..................................................................................................................................................... 
if not 'C104' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if (cellvoltmax2-cellvoltmin2)>self.param.CellVoltDiffLv2 and (cellvoltmax1-cellvoltmin1)>self.param.CellVoltDiffLv2: #二级电芯压差 cdv_time=cdv_time+(time2-time1).total_seconds() if cdv_time>self.param.volt_time: time=self.bmstime[i] code='C104' faultlv=0 faultinfo='电芯{}和{}压差大'.format(cellvolt1.index(cellvoltmax1)+1,cellvolt1.index(cellvoltmin1)+1) faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: cdv_time=0 else: if (cellvoltmax2-cellvoltmin2)4.5*self.param.CellVoltNums: #电池包电压有效性 packvoltvalid=0 else: packvoltvalid=1 if packvoltvalid==1: if not 'C502' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if packvolt1>self.param.PackVoltOvLv2 and packvolt2>self.param.PackVoltOvLv2 and self.packcrnt[i]<0: #电池包过压二级进入 pov_time=pov_time+(time2-time1).total_seconds() if pov_time>self.param.volt_time: time=self.bmstime[i] code='C502' faultlv=5 faultinfo='电池包过压' faultadvice='联系用户立即停止充电,并通知技术运维人员' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: pov_time=0 else: if packvolt1self.param.volt_time: time=self.bmstime[i] code='C203' faultlv=2 faultinfo='电池包欠压' faultadvice='联系用户询问用车场景,技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: puv_time=0 else: if packvolt1>self.param.PackVoltUvLv1+0.1*self.param.CellVoltNums and packvolt2>self.param.PackVoltUvLv1+0.1*self.param.CellVoltNums: #电池包二级欠压恢复 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C203'].index, 'end_time'] = time else: pass else: pov_time=0 puv_time=0 #温度诊断功能............................................................................................................. 
if i<1: if not self.df_bms_ram.empty: celltemp1=self.df_bms_ram.iloc[-1]['celltemp'] celltemp2=self._celltemp_get(i) time1=self.df_bms_ram.iloc[-1]['time'] time2=self.bmstime[i] else: celltemp1=self._celltemp_get(i) celltemp2=self._celltemp_get(i) time1=self.bmstime[i] time2=self.bmstime[i] else: celltemp1=self._celltemp_get(i-1) celltemp2=self._celltemp_get(i) time1=self.bmstime[i-1] time2=self.bmstime[i] celltempmin1=min(celltemp1) celltempmin2=min(celltemp2) celltempmax1=max(celltemp1) celltempmax2=max(celltemp2) #温度有效性判断.......................................................................... if celltempmax1>self.param.CellTempUpLmt or celltempmin1self.param.CellTempHighLv2 and celltempmax2>self.param.CellTempHighLv2: #二级高温进入 ot_time=ot_time+(time2-time1).total_seconds() if ot_time>self.param.temp_time: time=self.bmstime[i] code='C302' faultlv=3 faultinfo='温度{}高温'.format(celltemp1.index(celltempmax1)+1) faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: ot_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if celltempmax1self.param.temp_time: time=self.bmstime[i] code='C102' faultlv=1 faultinfo='温度{}低温'.format(celltemp1.index(celltempmin1)+1) faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: ut_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if celltempmax1>self.param.CellTempLowLv1+2 and celltempmax2>self.param.CellTempLowLv1+2: #二级高温恢复 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C102'].index, 'end_time'] = time else: pass #温差判断............................................................................................................................. 
if not 'C103' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if (celltempmax1-celltempmin1)>self.param.CellTempDiffLv2 and (celltempmax2-celltempmin2)>self.param.CellTempDiffLv2: #二级温差进入 dt_time=dt_time+(time2-time1).total_seconds() if dt_time>self.param.temp_time: time=self.bmstime[i] code='C103' faultlv=1 faultinfo='温度{}和{}温差大'.format(celltemp1.index(celltempmax1)+1,celltemp1.index(celltempmin1)+1) faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: dt_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if (celltempmax1-celltempmin1)self.param.CellTempDiffLv1-2: #二级温差恢复 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C103'].index, 'end_time'] = time else: pass else: ot_time=0 ut_time=0 dt_time=0 #电流过流诊断....................................................................................................................... if i>0.5: step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if step<120 and self.packcrnt[i]>self.param.PackDisOc and self.packcrnt[i-1]>self.param.PackDisOc: as_dis=as_dis+(self.packcrnt[i]-self.param.PackDisOc)*step #ah累计 elif step<120 and self.packcrnt[i]100: time=self.bmstime[i] code='C306' faultlv=3 faultinfo='电池包放电过流' faultadvice='联系用户询问用车场景,技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if self.packcrnt[i]100: time=self.bmstime[i] code='C305' faultlv=3 faultinfo='电池包充电过流' faultadvice='联系用户询问用车场景,技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if self.packcrnt[i]>self.param.PackChgOc+10: time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C305'].index, 'end_time'] = time else: pass #SOC卡滞、跳变诊断................................................................................................ 
if i<1: bmssoc_st=float(self.bms_soc[i]) bmssoc_last=float(self.bms_soc[i]) bmssoc_now=float(self.bms_soc[i]) else: step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if step<120: ah_accum=ah_accum-self.packcrnt[i]*step/3600 #ah累计 else: pass #SOC卡滞............................................................................................................ if abs(ah_accum)>self.param.Capacity*0.1: bmssoc_now=float(self.bms_soc[i]) if not 'C106' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if abs(bmssoc_now-bmssoc_st)self.param.SocClamp: #SOC卡滞故障退出 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C106'].index, 'end_time'] = time else: pass bmssoc_st=bmssoc_now ah_accum=0 else: pass #SOC跳变.................................................................................................................... bmssoc_last=float(self.bms_soc[i-1]) bmssoc_now=float(self.bms_soc[i]) if not 'C107' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if step<70 and abs(bmssoc_now-bmssoc_last)>self.param.SocJump: #SOC跳变进入 time=self.bmstime[i] code='C107' faultlv=0 faultinfo='电池SOC跳变{}%'.format(bmssoc_now-bmssoc_last) faultadvice='技术介入诊断,检修电池BMS软件' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if abs(bmssoc_now-bmssoc_st)self.param.SocDiff: #SOC一致性差故障进入 time=self.bmstime[0] code='C201' faultlv=0 faultinfo='电芯{}和{}SOC差过大:{}'.format(int(self.df_uniform.loc[0,'cellmin_num']),int(self.df_uniform.loc[0,'cellmax_num']),cellsoc_diff) faultadvice='技术介入诊断' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if cellsoc_diffself.param.SohLow+2: #soh过低故障恢复 time=self.bmstime[0] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C204'].index, 'end_time'] = time else: pass if not 'C205' in list(self.df_diag_ram['code']): #当前故障中没有该故障,则判断是否发生该故障 if 
cellsoh_diff>self.param.SohDiff: time=self.bmstime[0] code='C205' faultlv=2 faultinfo='电池包容量一致性差:电芯{}'.format(cellsoh_lowindex) faultadvice='检修电池,更换容量过低的电芯或模组' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if cellsoh_diff