import pandas as pd import numpy as np import bisect import datetime import BatParam class BatDiag: def __init__(self,sn,celltype,df_bms,df_volt,df_temp,df_diag,df_diag_ram, df_soh, df_sor): #参数初始化 self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(self.celltype) self.df_volt=df_volt self.df_temp=df_temp self.df_soh=df_soh self.df_sor=df_sor self.packcrnt=(df_volt['可充电储能装置电流(A)'].astype('float'))*self.param.PackCrntDec self.packvolt=df_volt['可充电储能装置电压(V)'].astype('float') self.bmssoc=df_bms['SOC'].map(lambda x:x.strip('%')) self.HIVLLk=df_diag['高压互锁状态'] self.ISO=df_diag['绝缘'] self.enmtemp=df_bms['EnmTemp'] # self.bms_soh=df_volt['SOH[%]'] self.bmstime= pd.to_datetime(df_volt['上报时间'], format='%Y-%m-%d %H:%M:%S') self.param.CellTempNums=int(df_temp.loc[5,'可充电储能温度探针个数']) self.df_diag_ram=df_diag_ram self.cellvolt_name=[str(x)+'.0' for x in range(1,self.param.CellVoltNums+1)] self.celltemp_name=[str(x)+'.0' for x in range(1,self.param.CellTempNums+1)] def diag(self): if self.celltype<=50: df_res=self._ncm_diag() return df_res else: df_res=self._ncm_diag() return df_res #寻找当前行数据的所有温度值................................................................................... def _celltemp_get(self,num): celltemp = list(self.df_bms.loc[num,self.celltemp_name]) return celltemp #获取当前行所有电压数据............................................................................................ def _cellvolt_get(self,num): cellvolt = list(self.df_bms.loc[num,self.cellvolt_name]/1000) return cellvolt #电芯温升..................................................................................................... def _celltemp_rate(self,num): celltemp_rate={} if num>0: for j in range(1, self.param.CellTempNums+1): time_now=self.bmstime[num] for k in range(1,num): if (time_now-self.bmstime[num-k]).total_seconds()>60: num_last=num-k celltemp_now=self.df_temp.loc[num,str(j)+'.0'] celltemp_last=self.df_temp.loc[num_last,str(j)+'.0'] time_last=self.bmstime[num_last] temp_rate=(celltemp_now-celltemp_last)*60/(time_now-time_last).total_seconds() if temp_rate>self.param.CellTempRate: celltemp_rate[j]=temp_rate break else: pass else: pass return celltemp_rate #..........................................三元电池诊断功能.................................................................. def _ncm_diag(self): column_name=['start_time', 'end_time', 'product_id', 'code', 'level', 'info','advice'] df_res=pd.DataFrame(columns=column_name) end_time='0000-00-00 00:00:00' ah_accum=0 #SOC卡滞初始参数 as_chg=0 #过流诊断初始参数 as_dis=0 #过流诊断初始参数 cellvoltvalid=1 voltdsc_time=0 voltdsc_count=0 voltfail_time=0 voltfail_count=0 voltloose_time=0 voltloose_count=0 voltstray_time=0 tempstray_time=0 packvoltvalid_time=0 tempvalid_time=0 cov_time=0 cuv_time=0 cdv_time=0 pov_time=0 puv_time=0 ot_time=0 ut_time=0 dt_time=0 sor=eval(self.df_sor.loc[0,'sor']) sor_mean=np.mean(sor) for i in range(1,len(self.df_volt)-1): df_diag_now=self.df_diag_ram[self.df_diag_ram['end_time']=='0000-00-00 00:00:00'] #电压诊断功能......................................................................................................................................... if i<=1: time1=self.bmstime[i-1] time2=self.bmstime[i] cellvolt2=self._cellvolt_get(i) cellvoltmin2=min(cellvolt2) cellvoltmax2=max(cellvolt2) cellvoltmin_index2=cellvolt2.index(cellvoltmin2) cellvoltmax_index2=cellvolt2.index(cellvoltmax2) cellvolt1=self._cellvolt_get(i-1) cellvoltmin1=min(cellvolt1) cellvoltmax1=max(cellvolt1) cellvoltmin_index1=cellvolt1.index(cellvoltmin1) cellvoltmax_index1=cellvolt1.index(cellvoltmax1) cellvolt1_std=np.std(cellvolt1) cellvolt1_mean=np.mean(cellvolt1) cellvolt1_3sigma=(np.array(cellvolt1)-cellvolt1_mean)/cellvolt1_std cellvolt2_std=np.std(cellvolt2) cellvolt2_mean=np.mean(cellvolt2) cellvolt2_3sigma=(np.array(cellvolt2)-cellvolt2_mean)/cellvolt2_std celltemp1=self._celltemp_get(i-1) celltemp2=self._celltemp_get(i) celltempmin1=min(celltemp1) celltempmin2=min(celltemp2) celltempmax1=max(celltemp1) celltempmax2=max(celltemp2) celltemp1_std=np.std(celltemp1) celltemp1_mean=np.mean(celltemp1) celltemp1_3sigma=(np.array(celltemp1)-celltemp1_mean)/celltemp1_std celltemp2_std=np.std(celltemp2) celltemp2_mean=np.mean(celltemp2) celltemp2_3sigma=(np.array(celltemp2)-celltemp2_mean)/celltemp2_std else: time1=self.bmstime[i-1] time2=self.bmstime[i] cellvolt1=cellvolt2 cellvoltmin1=cellvoltmin2 cellvoltmax1=cellvoltmax2 cellvoltmin_index1=cellvoltmin_index2 cellvoltmax_index1=cellvoltmax_index2 cellvolt2=self._cellvolt_get(i) cellvoltmin2=min(cellvolt2) cellvoltmax2=max(cellvolt2) cellvoltmin_index2=cellvolt2.index(cellvoltmin2) cellvoltmax_index2=cellvolt2.index(cellvoltmax2) cellvolt1_std=cellvolt2_std cellvolt1_mean=cellvolt2_mean cellvolt1_3sigma=cellvolt2_3sigma cellvolt2_std=np.std(cellvolt2) cellvolt2_mean=np.mean(cellvolt2) cellvolt2_3sigma=(np.array(cellvolt2)-cellvolt2_mean)/cellvolt2_std #电压采样断线.......................................................................................................................................... if not 'C308' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 if (cellvoltmin2<2 and cellvoltmax2>4.5 and abs(cellvoltmax_index2-cellvoltmin_index2)==1) and (cellvoltmin1<2 and cellvoltmax1>4.5 and abs(cellvoltmax_index1-cellvoltmin_index1)==1): #电压断线故障进入 cellvoltvalid=0 voltdsc_time=voltdsc_time+(time2-time1).total_seconds() if voltdsc_time>self.param.volt_time: #持续时间 time=self.bmstime[i] code='C308' faultlv=3 faultinfo='电芯电压无效' reason='电芯电压采样线断线' faultlocation='电芯{}'.format([cellvoltmin_index2+1, cellvoltmax_index2+1]) faultadvice='召回电池包,进行检修' influence='失去对电芯电压监测' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass elif (cellvoltmin2<2 and cellvoltmax2>4.5 and abs(cellvoltmax_index2-cellvoltmin_index2)==1) and (cellvoltmin1>2.5 and cellvoltmax1<4.3): #连续跳变 cellvoltvalid=0 voltdsc_count=voltdsc_count+1 if voltdsc_count>=3: time=self.bmstime[i] code='C308' faultlv=3 faultinfo='电芯电压无效' reason='电芯电压采样线断线' faultlocation='电芯{}'.format([cellvoltmin_index2+1, cellvoltmax_index2+1]) faultadvice='召回电池包,进行检修' influence='失去对电芯电压监测' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: voltdsc_time=0 else: cellvoltvalid=0 if cellvoltmin2>2 and cellvoltmax2<4.5 and cellvoltmin1>2 and cellvoltmax1<4.5: cellvoltvalid=1 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C309'].index[-1], 'end_time'] = time else: pass #电压采样系统失效............................................................................................................. if not 'C309' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 if ((cellvoltmin2<1 and cellvoltmin1<1) or (cellvoltmax2>5 and cellvoltmax1>5)): cellvoltvalid=0 voltfail_time=voltfail_time+(time2-time1).total_seconds() if voltfail_time>self.param.volt_time and (not 'C308' in list(df_diag_now['code'])): #持续时间 time=self.bmstime[i] code='C309' faultlv=3 faultinfo='电芯电压无效' reason='电芯电压采样电路异常' faultlocation='电芯{}'.format(list(np.argwhere(np.array(cellvolt2)<1)+1)+list(np.argwhere(np.array(cellvolt2)>5)+1)) faultadvice='召回电池包,进行检修' influence='失去对电芯电压监测' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass elif (cellvoltmin2<1 and cellvoltmin1>2.5) or (cellvoltmax2>5 and cellvoltmax1<4.5): #连续跳变 cellvoltvalid=0 voltfail_count=voltfail_count+1 if voltfail_count>=3 and (not 'C308' in list(df_diag_now['code'])): time=self.bmstime[i] code='C309' faultlv=3 faultinfo='电芯电压无效' reason='数据通讯异常' faultlocation='电芯{}'.format(list(np.argwhere(np.array(cellvolt2)<1)+1)+list(np.argwhere(np.array(cellvolt2)>5)+1)) faultadvice='检修数据传输链路' influence='失去对电芯电压监测' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: voltfail_time=0 else: cellvoltvalid=0 if cellvoltmin2>2.5 and cellvoltmax2<4.5 and cellvoltmin1>2.5 and cellvoltmax1<4.5: cellvoltvalid=1 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C309'].index[-1], 'end_time'] = time else: pass #电压采样线松动................................................................................................................. if not 'C208' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 if cellvoltmin2>2 and cellvoltmax2<4.5 and cellvoltmin1>2 and cellvoltmax1<4.5: if (min(cellvolt1_3sigma)<-3 and max(cellvolt1_3sigma)>3) and (min(cellvolt2_3sigma)<-3 and max(cellvolt2_3sigma)>3) and (cellvoltmax2-cellvoltmin2)>0.2 and (cellvoltmax1-cellvoltmin1)>0.2: #连续发生 cellvoltvalid=0 voltloose_time=voltloose_time+(time2-time1).total_seconds() if voltloose_time>self.param.volt_time: time=self.bmstime[i] code='C208' faultinfo='电芯电压无效' reason='电芯电压采样线松动' faultlocation='电芯{}'.format([cellvoltmin_index2+1, cellvoltmax_index2+1]) faultadvice='召回电池包,进行检修' influence='失去对电芯电压监测' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass elif (min(cellvolt1_3sigma)>-3 and max(cellvolt1_3sigma)<3) and (min(cellvolt2_3sigma)<-3 and max(cellvolt2_3sigma)>3) and (cellvoltmax2-cellvoltmin2)>0.2 and (cellvoltmax1-cellvoltmin1)>0.2: #连续跳变 cellvoltvalid=0 voltloose_count=voltloose_count+1 if voltloose_count>=3: time=self.bmstime[i] code='C208' faultlv=3 faultinfo='电压电压无效' reason='电芯电压采样线松动' faultlocation='电芯{}'.format([cellvoltmin_index2+1, cellvoltmax_index2+1]) faultadvice='召回电池包,进行检修' influence='失去对电芯电压监测' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: voltloose_time=0 else: voltloose_time=0 else: if(cellvoltmax2-cellvoltmin2)<0.1 and (cellvoltmax1-cellvoltmin1)<0.1: cellvoltvalid=1 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C208'].index[-1], 'end_time'] = time else: pass #电芯电压诊断............................................................................................................................................ if cellvoltvalid==1: #电芯过压............................................................................................................................................. if not 'C401' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 if cellvoltmax2>self.param.CellOvLv2 and cellvoltmax1>self.param.CellOvLv2: #二级过压进入 cov_time=cov_time+(time2-time1).total_seconds() if cov_time>self.param.volt_time: time=self.bmstime[i] code='C401' faultlv=4 faultinfo='电芯过压' faultlocation='电芯{}'.format(list(np.argwhere(np.array(cellvolt2)>self.param.CellOvLv2)+1)) influence='长时间过压会导致电池析锂,存在电池安全与寿命衰减过快风险' cellocvmax=cellvoltmax2-self.packcrnt[i]*sor_mean*2 #内阻反推OCV cellsocmax=np.interp(cellocvmax,self.param.LookTab_OCV,self.param.LookTab_SOC) #ocv反查表得到SOC if self.bmssoc[i]<90 and self.packcrnt[i]>-self.param.Capacity/10 and self.packcrnt[i-1]>-self.param.Capacity/10: reason='BMS计算SOC偏低' faultadvice='优化SOC算法' elif self.bmssoc[i]self.param.volt_time: time=self.bmstime[i] code='C202' faultlv=2 faultinfo='电芯{}欠压' faultlocation='电芯{}'.format([cellvoltmin_index2+1, cellvoltmax_index2+1]) influence='欠压可能导致电池过放,严重过放会导致负极集流体溶解,进而发生内短路风险' cellocvmin=cellvoltmin2+-self.packcrnt[i]*sor_mean*2 #内阻反推OCV cellsocmin=np.interp(cellocvmin,self.param.LookTab_OCV,self.param.LookTab_SOC) #ocv反查表得到SOC if self.bmssoc[i]>10 and self.packcrnt[i]cellsocmin+10: reason='BMS计算SOC偏高' faultadvice='优化SOC算法' elif self.packcrnt[i-1]>self.param.Capacity/2 or self.packcrnt[i]>self.param.Capacity/2: reason='放电电流过大' faultadvice='优化充放电功率限值' else: reason='1.BMS软件策略BUG,2.继电器粘连' faultadvice='检修电池包' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: cuv_time=0 else: if cellvoltmin2>self.param.CellUvLv1+0.1 and cellvoltmin1>self.param.CellUvLv1+0.1: time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C202'].index[-1], 'end_time'] = time else: pass #电芯压差大..................................................................................................................................................... if not 'C104' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 if (cellvoltmax2-cellvoltmin2)>self.param.CellVoltDiffLv2 and (cellvoltmax1-cellvoltmin1)>self.param.CellVoltDiffLv2: #二级电芯压差 cdv_time=cdv_time+(time2-time1).total_seconds() if cdv_time>self.param.volt_time: time=self.bmstime[i] code='C104' faultlv=0 faultinfo='电芯电压一致性差' faultlocation='电芯{}'.format(list(np.argwhere(np.array(cellvolt2)5: reason='电芯容量一致性差' elif ('C316' in list(df_diag_now['code'])) or ('C317' in list(df_diag_now['code'])): reason='电芯内阻一致性差' elif 'C490' in list(df_diag_now['code']): reason='电芯自放电异常' elif celltempmin1<-5 and celltempmin2<-5 and abs(self.packcrnt[i-1])>self.param.Capacity/10 and abs(self.packcrnt[i])>self.param.Capacity/10: reason='电芯低温性能差' faultadvice='优化电芯低温性能' elif self.bmssoc[i-1]<3 and self.self.bmssoc[i]<3: reason='SOC过低' faultadvice='通知用户充电' else: reason='BMS均衡逻辑异常' faultadvice='优化均衡策略' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: cdv_time=0 else: if (cellvoltmax2-cellvoltmin2)0.05: cellvoltvalid=0 voltstray_time=voltstray_time+(time2-time1).total_seconds() if voltstray_time>self.param.volt_time: time=self.bmstime[i] code='C206' faultlv=2 faultinfo='电芯电压离群' faultlocation='电芯{}'.format([cellvoltmin_index2+1]) faultadvice='更换模组' influence='容量/内阻/自放电不一致,影响电池充放电性能' if (not self.df_soh.empty) and (self.df_soh.loc[0,'cellsohmax']-self.df_soh.loc[0,'cellsohmin'])>5: reason='电芯容量一致性差' elif ('C316' in list(df_diag_now['code'])) or ('C317' in list(df_diag_now['code'])): reason='电芯内阻一致性差' elif 'C490' in list(df_diag_now['code']): reason='电芯自放电异常' elif celltempmin1<-5 and celltempmin2<-5 and abs(self.packcrnt[i-1])>self.param.Capacity/10 and abs(self.packcrnt[i])>self.param.Capacity/10: reason='电芯低温性能差' faultadvice='优化电芯低温性能' elif self.bmssoc[i-1]<3 and self.self.bmssoc[i]<3: reason='SOC过低' faultadvice='通知用户充电' else: reason='BMS均衡逻辑异常' faultadvice='优化均衡策略' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass elif max(cellvolt1_3sigma)>4 and max(cellvolt2_3sigma)>4 and cellvoltmax2-cellvolt2_mean>0.05: #连续发生 cellvoltvalid=0 voltstray_time=voltstray_time+(time2-time1).total_seconds() if voltstray_time>self.param.volt_time: time=self.bmstime[i] code='C206' faultlv=2 faultinfo='电芯电压离群' faultlocation='电芯{}'.format([cellvoltmax_index2+1]) faultadvice='更换模组' influence='容量/内阻/自放电不一致,影响电池充放电性能' if (not self.df_soh.empty) and (self.df_soh.loc[0,'cellsohmax']-self.df_soh.loc[0,'cellsohmin'])>5: reason='电芯容量一致性差' elif ('C316' in list(df_diag_now['code'])) or ('C317' in list(df_diag_now['code'])): reason='电芯内阻一致性差' elif 'C490' in list(df_diag_now['code']): reason='电芯自放电异常' else: reason='BMS均衡逻辑异常' faultadvice='优化均衡策略' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: voltstray_time=0 else: if min(cellvolt1_3sigma)>-3 and min(cellvolt2_3sigma)>-3 and max(cellvolt1_3sigma)<3 and max(cellvolt2_3sigma)<3: self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C206'].index[-1], 'end_time'] = self.bmstime[i] else: pass else: cov_time=0 cuv_time=0 cdv_time=0 voltstray_time=0 #电池包诊断..................................................................................................................................... packvolt1=self.packvolt[i-1] packvolt2=self.packvolt[i] time1=self.bmstime[i-1] time2=self.bmstime[i] if not 'C304' in list(df_diag_now['code']): if packvolt2<2*self.param.CellVoltNums or packvolt2>4.5*self.param.CellVoltNums or (cellvoltvalid==1 and abs(packvolt2-sum(cellvolt2))>10): #电池包电压有效性 packvoltvalid=0 packvoltvalid_time=packvoltvalid_time+(time2-time1).total_seconds() if packvoltvalid_time>self.param.volt_time: time=self.bmstime[i] code='304' faultlv=3 faultinfo='电池包电压无效' reason='1.电池包电压采样电路异常,2.数据通讯异常' faultlocation='电池包电压' faultadvice='召回电池包,进行检修' influence='失去对电池包电压监测' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: packvoltvalid=1 packvoltvalid_time=0 else: packvoltvalid=1 packvoltvalid_time=0 if packvolt1>2.2*self.param.CellVoltNums or packvolt2<4.3*self.param.CellVoltNums: self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C304'].index[-1], 'end_time'] = self.bmstime[i] else: pass if packvoltvalid==1: if not 'C402' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 if packvolt1>self.param.PackVoltOvLv2 and packvolt2>self.param.PackVoltOvLv2 and self.packcrnt[i]<0: #电池包过压二级进入 pov_time=pov_time+(time2-time1).total_seconds() if pov_time>self.param.volt_time: time=self.bmstime[i] code='C402' faultlv=4 faultinfo='电池包过压' faultlocation='电池包电压' influence='长时间过压会导致电池析锂,存在电池安全与寿命衰减过快风险' cellocvmax=cellvoltmax2-self.packcrnt[i]*sor_mean*2 #内阻反推OCV cellsocmax=np.interp(cellocvmax,self.param.LookTab_OCV,self.param.LookTab_SOC) #ocv反查表得到SOC if self.bmssoc[i]<90 and self.packcrnt[i]>-self.param.Capacity/10 and self.packcrnt[i-1]>-self.param.Capacity/10: reason='BMS计算SOC偏低' faultadvice='优化SOC算法' elif self.bmssoc[i]self.param.volt_time: time=self.bmstime[i] code='C203' faultlv=2 faultinfo='电池包欠压' faultlocation='电池包电压' faultadvice='禁止放电' influence='欠压可能导致电池过放,严重过放会导致负极集流体溶解,进而发生内短路风险' cellocvmin=cellvoltmin2+-self.packcrnt[i]*sor_mean*2 #内阻反推OCV cellsocmin=np.interp(cellocvmin,self.param.LookTab_OCV,self.param.LookTab_SOC) #ocv反查表得到SOC if self.bmssoc[i]>10 and self.packcrnt[i]cellsocmin+10: reason='BMS计算SOC偏高' faultadvice='优化SOC算法' elif self.packcrnt[i-1]>self.param.Capacity/2 or self.packcrnt[i]>self.param.Capacity/2: reason='放电电流过大' faultadvice='优化充放电功率限值' else: reason='1.BMS软件策略BUG,2.继电器粘连' faultadvice='检修电池包' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: puv_time=0 else: if packvolt1>self.param.PackVoltUvLv1+0.1*self.param.CellVoltNums and packvolt2>self.param.PackVoltUvLv1+0.1*self.param.CellVoltNums: #电池包二级欠压恢复 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C203'].index[-1], 'end_time'] = time else: pass else: pov_time=0 puv_time=0 packvoltvalid_time=0 #温度有效性判断................................................................................................................................................ if not 'C301' in list(df_diag_now['code']): if celltempmax2>self.param.CellTempUpLmt or celltempmin210 or min(np.array(celltemp2)-np.array(celltemp1))<-10: celltempvalid=0 tempvalid_time=tempvalid_time+(time2-time1).total_seconds() if tempvalid_time>self.param.temp_time: time=self.bmstime[i] code='301' faultlv=3 faultinfo='电芯温度无效' reason='1.温度采样电路异常,2.数据通讯异常' faultlocation='温度探针{}'.format(list(np.argwhere(np.array(celltemp2)self.param.CellTempUpLmt)+1)) faultadvice='召回电池包,进行检修' influence='失去对电池温度监测' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: celltempvalid=1 tempvalid_time=0 else: celltempvalid=1 tempvalid_time=0 if celltempmax1self.param.CellTempHighLv2 and celltempmax2>self.param.CellTempHighLv2: #二级高温进入 ot_time=ot_time+(time2-time1).total_seconds() if ot_time>self.param.temp_time: time=self.bmstime[i] code='C302' faultlv=3 faultinfo='电芯温度过高' faultlocation='温度探针{}'.format(list(np.argwhere(np.array(celltemp2)>self.param.CellTempHighLv2)+1)) faultadvice='禁止充放电,并开启电池冷却功能' influence='高温下充放电,SEI膜增长加速,导致容量衰减过快,温度过高则存在热失控风险' if max(celltemp1_3sigma)>3 and max(celltemp2_3sigma)>3: reason='Busbar连接异常' faultadvice='检修电池包' elif sum(self.packcrnt[:i]*self.packvolt[:i])/(1000*(i+1))>50: reason='电池输出功率过大' faultadvice='优化电池充放电功率限值' else: reason='1.冷却液温度过高,2.冷却水泵异常,3.Busbar连接异常' faultadvice='检修电池包' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: ot_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if celltempmax1self.param.temp_time: time=self.bmstime[i] code='C102' faultlv=1 faultinfo='电芯温度过低' faultlocation='温度探针{}'.format(list(np.argwhere(np.array(celltemp2)10 and self.enmtemp[i]-celltempmin2>10: reason='温度检测系统异常' faultadvice='检修电池包' elif self.bmssoc[i-1]<5 and self.bmssoc[i]<5: reason='电池包SOC过低' faultadvice='通知用户进行充电' else: reason='PTC加热系统异常' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: ut_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if celltempmax1>self.param.CellTempLowLv1+2 and celltempmax2>self.param.CellTempLowLv1+2: #二级高温恢复 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C102'].index[-1], 'end_time'] = time else: pass #温差判断............................................................................................................................. if not 'C103' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 if (celltempmax1-celltempmin1)>self.param.CellTempDiffLv2 and (celltempmax2-celltempmin2)>self.param.CellTempDiffLv2: #二级温差进入 dt_time=dt_time+(time2-time1).total_seconds() if dt_time>self.param.temp_time: time=self.bmstime[i] code='C103' faultlv=1 faultinfo='电芯温差过大' faultlocation='温度探针{}'.format([celltemp2.index(celltempmin2)+1,celltemp2.index(celltempmax2)+1]) influence='存在电芯的老化速率不一致的风险' if self.enmtemp[i-1]>0 and self.enmtemp>0 and (('C316' in list(df_diag_now['code'])) or ('C317' in list(df_diag_now['code']))): reason='电芯内阻不一致' faultadvice='更换模组' elif self.enmtemp[i-1]>10 and self.enmtemp>10: reason='Busbar连接异常' faultadvice='检修电池包' elif self.enmtemp[i-1]<-10 and self.enmtemp<-10: reason='环境温度过低' faultadvice='合理优化低温充放电功率限值,并开启水泵' else: reason='电池包冷却/加热系统设计不合理' faultadvice='合理优化电池包冷却/加热系统' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: dt_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if (celltempmax1-celltempmin1)self.param.CellTempDiffLv1-2: #二级温差恢复 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C103'].index[-1], 'end_time'] = time else: pass #温度离群判断............................................................................................................................. if not 'C105' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 if max(celltemp1_3sigma)>3 and max(celltemp2_3sigma)>3: tempstray_time=tempstray_time+(time2-time1).total_seconds() if tempstray_time>self.param.temp_time: time=self.bmstime[i] code='C105' faultlv=1 faultinfo='电芯温度离群' faultlocation='温度探针{}'.format([celltemp2.index(celltempmax2)+1]) influence='存在电芯的老化速率不一致的风险' if self.enmtemp[i-1]>0 and self.enmtemp>0 and (('C316' in list(df_diag_now['code'])) or ('C317' in list(df_diag_now['code']))): reason='电芯内阻不一致' faultadvice='更换模组' elif self.enmtemp[i-1]>10 and self.enmtemp>10: reason='Busbar连接异常' faultadvice='检修电池包' elif self.enmtemp[i-1]<-10 and self.enmtemp<-10: reason='环境温度过低' faultadvice='合理优化低温充放电功率限值,并开启水泵' else: reason='电池包冷却/加热系统设计不合理' faultadvice='合理优化电池包冷却/加热系统' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] elif min(celltemp1_3sigma)<-3 and min(celltemp2_3sigma)<-3: tempstray_time=tempstray_time+(time2-time1).total_seconds() if tempstray_time>self.param.temp_time: time=self.bmstime[i] code='C105' faultlv=1 faultinfo='电芯温度离群' faultlocation='温度探针{}'.format([celltemp2.index(celltempmin2)+1]) influence='存在电芯的老化速率不一致的风险' if self.enmtemp[i-1]>0 and self.enmtemp>0 and (('C316' in list(df_diag_now['code'])) or ('C317' in list(df_diag_now['code']))): reason='电芯内阻不一致' faultadvice='更换模组' elif self.enmtemp[i-1]>10 and self.enmtemp>10: reason='Busbar连接异常' faultadvice='检修电池包' elif self.enmtemp[i-1]<-10 and self.enmtemp<-10: reason='环境温度过低' faultadvice='合理优化低温充放电功率限值,并开启水泵' else: reason='电池包冷却/加热系统设计不合理' faultadvice='合理优化电池包冷却/加热系统' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: tempstray_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if (celltempmax1-celltempmin1)self.param.CellTempDiffLv1-2: #二级温差恢复 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C105'].index[-1], 'end_time'] = time else: pass else: ot_time=0 ut_time=0 dt_time=0 tempstray_time=0 #电流过流诊断....................................................................................................................... if i>0.5: step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if step<120 and self.packcrnt[i]>self.param.PackDisOc and self.packcrnt[i-1]>self.param.PackDisOc: as_dis=as_dis+(self.packcrnt[i]-self.param.PackDisOc)*step #ah累计 elif step<120 and self.packcrnt[i]100: time=self.bmstime[i] code='C306' faultlv=3 faultinfo='电池放电过流' faultlocation='电池包' influence='长时间过流会导致电池欠压及温升过快' if cellvoltmin1100: time=self.bmstime[i] code='C305' faultlv=3 faultinfo='电池充电过流' faultlocation='电池包' influence='过流会导致电池析锂,存在电池安全与寿命衰减过快风险' if cellvoltmax1>self.param.CellOvLv2 and cellvoltmax2>self.param.CellOvLv2: reason='BMS控制策略异常' faultadvice='优化BMS控制策略' else: reason='1.充电器异常,2.继电器粘连' faultadvice='停止放电' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, reason, faultlocation, faultadvice, influence] else: pass else: if self.packcrnt[i]>self.param.PackChgOc+10: time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C305'].index[-1], 'end_time'] = time else: pass #SOC卡滞、跳变诊断................................................................................................ if i<2: bmssoc_st=float(self.bms_soc[i]) bmssoc_last=float(self.bms_soc[i]) bmssoc_now=float(self.bms_soc[i]) else: step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if step<120: ah_accum=ah_accum-self.packcrnt[i]*step/3600 #ah累计 else: pass #SOC卡滞............................................................................................................ if abs(ah_accum)>self.param.Capacity*0.1: bmssoc_now=float(self.bms_soc[i]) if not 'C106' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 if abs(bmssoc_now-bmssoc_st)self.param.SocClamp: #SOC卡滞故障退出 time=self.bmstime[i] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C106'].index[-1], 'end_time'] = time else: pass bmssoc_st=bmssoc_now ah_accum=0 else: pass #SOC跳变.................................................................................................................... bmssoc_last=float(self.bms_soc[i-1]) bmssoc_now=float(self.bms_soc[i]) if not 'C107' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 if step<70 and abs(bmssoc_now-bmssoc_last)>self.param.SocJump: #SOC跳变进入 time=self.bmstime[i] code='C107' faultlv=0 faultinfo='电池SOC跳变{}%'.format(bmssoc_now-bmssoc_last) faultadvice='技术介入诊断,检修电池BMS软件' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] else: pass else: if abs(bmssoc_now-bmssoc_st)self.param.SocDiff: #SOC一致性差故障进入 # time=self.bmstime[0] # code='C201' # faultlv=0 # faultinfo='电芯{}和{}SOC差过大:{}'.format(int(self.df_uniform.loc[0,'cellmin_num']),int(self.df_uniform.loc[0,'cellmax_num']),cellsoc_diff) # faultadvice='技术介入诊断' # self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] # else: # pass # else: # if cellsoc_diffself.param.SohLow+2: #soh过低故障恢复 # time=self.bmstime[0] # self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['code']=='C204'].index[-1], 'end_time'] = time # else: # pass # if not 'C205' in list(df_diag_now['code']): #当前故障中没有该故障,则判断是否发生该故障 # if cellsoh_diff>self.param.SohDiff: # time=self.bmstime[0] # code='C205' # faultlv=2 # faultinfo='电池包容量一致性差:电芯{}'.format(cellsoh_lowindex) # faultadvice='检修电池,更换容量过低的电芯或模组' # self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, end_time, self.sn, code, faultlv, faultinfo, faultadvice] # else: # pass # else: # if cellsoh_diff