import pandas as pd import numpy as np import datetime from USER.SPF.alibaba.Common import BatParam class BatDiag: def __init__(self,sn,celltype,df_bms,df_soh,df_soc,df_uniform,df_diag_ram): #参数初始化 self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(celltype) self.df_bms=df_bms self.df_soh=df_soh self.df_soc=df_soc self.df_uniform=df_uniform self.df_diag_ram=df_diag_ram.copy() self.packcrnt=df_bms['PackCrnt']*self.param.PackCrntDec self.packvolt=df_bms['PackVolt'] self.bms_soc=df_bms['PackSOC'] self.bmstime= pd.to_datetime(df_bms['time'], format='%Y-%m-%d %H:%M:%S') self.cellvolt_name=['CellVolt'+str(x) for x in range(1,self.param.CellVoltNums+1)] self.celltemp_name=['CellTemp'+str(x) for x in range(1,self.param.CellTempNums+1)] def diag(self): if len(self.df_bms)>1: if self.celltype<=50: df_res=self._ncm_diag() return df_res else: df_res=self._ncm_diag() return df_res else: return pd.DataFrame() #定义滑动滤波函数............................................................................................. def _np_move_avg(self,a, n, mode="same"): return (np.convolve(a, np.ones((n,)) / n, mode=mode)) #寻找当前行数据的所有温度值................................................................................... def _celltemp_get(self,num): celltemp = list(self.df_bms.loc[num,self.celltemp_name]) return celltemp #获取当前行所有电压数据............................................................................................ def _cellvolt_get(self,num): cellvolt = list(self.df_bms.loc[num,self.cellvolt_name]) return cellvolt #..........................................三元电池诊断功能.................................................................. def _ncm_diag(self): #参数初始化 bmssoc_st=float(self.bms_soc[0]) #SOC卡滞初始参数 ah_accum=0 #SOC卡滞初始参数 as_chg=0 #过流诊断初始参数 as_dis=0 #过流诊断初始参数 time1=self.bmstime[0] #温升速率初始参数 temp1=np.array(self._celltemp_get(0)) #温升速率初始参数 temprate_cnt=0 ot_time=0 ut_time=0 dt_time=0 cov_time=0 cuv_time=0 cdv_time=0 pov_time=0 puv_time=0 vv_time=0 tv_time=0 time_sp='0000-00-00 00:00:00' for i in range(1,len(self.df_bms)): time=self.bmstime[i] #温度诊断功能............................................................................................................. celltemp0=self._celltemp_get(i-1) celltemp1=self._celltemp_get(i) celltempmin0=min(celltemp0) celltempmin1=min(celltemp1) celltempmax0=max(celltemp0) celltempmax1=max(celltemp1) #温度有效性判断............................................................................................ if not 2 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if celltempmax0>self.param.CellTempUpLmt or celltempmin055: celltempvalid=0 faultcode=2 faultlv=2 cellnum1=np.where(np.array(celltemp1)self.param.CellTempUpLmt)[0]+1 cellnum=list(cellnum1)+list(cellnum2) faultinfo='电芯温度{}无效'.format(cellnum) faultadvice='丢失温度信息,无法全面监控电池安全;请停止充放电,并检修电池采样系统及数据传输系统' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: #不作处理 celltempvalid=1 tv_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if celltempmax0self.param.CellTempLwLmt+10: celltempvalid=1 self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==2].index[0], 'time_sp'] = time else: celltempvalid=0 if celltempvalid==1: #过温判断............................................................................................................. if not 4 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if celltempmax0>self.param.CellTempHighLv2 and celltempmax1>self.param.CellTempHighLv2: #二级高温进入 ot_time=ot_time+(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if ot_time>self.param.temp_time: faultcode=4 faultlv=3 cellnum=np.where(np.array(celltemp1)>self.param.CellTempHighLv2)[0]+1 faultinfo='电芯温度{}过温'.format(list(cellnum)) faultadvice='高温下充放电,会导致容量衰减过快,并存在热失控风险;请停止充放电,开启电池冷却功能,并通知电池技术人员介入分析' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: ot_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if celltempmax0self.param.temp_time: faultcode=6 faultlv=3 cellnum=np.where(np.array(celltemp1)self.param.CellTempLowLv1+2 and celltempmax1>self.param.CellTempLowLv1+2: #二级高温恢复 self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==6].index[0], 'time_sp'] = time else: pass #温差判断............................................................................................................................. if not 8 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if (celltempmax0-celltempmin0)>self.param.CellTempDiffLv2 and (celltempmax1-celltempmax0)>self.param.CellTempDiffLv2: #二级温差进入 dt_time=dt_time+(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if dt_time>self.param.temp_time: faultcode=8 faultlv=3 faultinfo='电芯温度{}和{}温差大'.format(celltemp1.index(celltempmax1)+1,celltemp1.index(celltempmin1)+1) faultadvice='存在电芯的老化速率不一致的风险;请检查电池温度采样系统' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: dt_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if (celltempmax0-celltempmin0)self.param.CellTempDiffLv1-2: #二级温差恢复 self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==8].index[0], 'time_sp'] = time else: pass #温升判断 time2=self.bmstime[i] delttime=(time2-time1).total_seconds() if delttime>20: temp2=np.array(self._celltemp_get(i)) celltemp_rate=(max(temp2-temp1)*60)/delttime #计算最大温升速率 temp1=temp2 #更新初始温度 time1=time2 #更新初始时间 if not 9 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if celltemp_rate>self.param.CellTempRate: temprate_cnt=temprate_cnt+1 if temprate_cnt>2: #温升故障进入 faultcode=9 faultlv=3 faultinfo='电芯温升速率过快:{}℃/min'.format(celltemp_rate) faultadvice='温升速率过快,存在热失控风险;禁止充放电,并通知电池技术人员介入分析' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: #ram当前故障中有该故障,则判断是否退出该故障 pass else: if celltemp_rate4.5 and abs(cellvoltmax_index0-cellvoltmin_index0)==1) and (cellvoltmin1<2 and cellvoltmax1>4.5 and abs(cellvoltmax_index1-cellvoltmin_index1)==1): #电压断线故障进入 vv_time=vv_time+(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if vv_time>55: cellvoltvalid=0 faultcode=10 faultlv=2 faultinfo='{}号电芯和{}号电芯电压采样断线'.format(cellvolt0.index(cellvoltmax0)+1,cellvolt0.index(cellvoltmin0)+1) faultadvice='丢失部分电压信息,无法全面监控电池安全;禁止充放电,请检查电池电压采样系统' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: cellvoltvalid=1 vv_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if cellvoltmin0>2 and cellvoltmax0<4.5: #电压断线故障恢复 cellvoltvalid=1 self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==10].index[0], 'time_sp'] = time else: cellvoltvalid=0 #电压无效诊断....................................................................................................... if not 11 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if (cellvoltmin0<0.5 and cellvoltmin1<0.5) or (cellvoltmax0>4.5 and cellvoltmax1>4.5): cellvoltvalid=0 faultcode=11 faultlv=2 cellnum1=np.where(np.array(cellvolt1)<0.5)[0]+1 cellnum2=np.where(np.array(cellvolt1)>4.5)[0]+1 cellnum=list(cellnum1)+list(cellnum2) faultinfo='电芯电压{}采样无效'.format(cellnum) faultadvice='丢失电压信息,无法全面监控电池安全;禁止充放电,请检查电池电压采样系统' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: cellvoltvalid=1 else: if cellvoltmin0>2 and cellvoltmax0<4.5 and cellvoltmin1>2 and cellvoltmax1<4.5: #电压断线故障恢复 cellvoltvalid=1 self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==11].index[0], 'time_sp'] = time else: cellvoltvalid=0 if cellvoltvalid==1: #过压诊断............................................................................................................. if not 12 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if cellvoltmax0>self.param.CellOvLv2 and cellvoltmax1>self.param.CellOvLv2: #二级过压进入 cov_time=cov_time+(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if cov_time>self.param.volt_time: faultcode=12 faultlv=3 cellnum=np.where(np.array(cellvolt1)>self.param.CellOvLv2)[0]+1 faultinfo='{}号电芯过压'.format(list(cellnum)) faultadvice='过压可能导致电池析锂,存在电池安全与寿命衰减过快风险;禁止充电,若持续>10min,请通知电池技术人员介入分析' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: cov_time=0 else: #ram当前故障中有该故障,则判断是否退出该故障 if cellvoltmax0self.param.volt_time: faultcode=14 faultlv=3 cellnum=np.where(np.array(cellvolt1)self.param.CellUvLv1+0.1 and cellvoltmin1>self.param.CellUvLv1+0.1: self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==14].index[0], 'time_sp'] = time else: pass #电芯压差大..................................................................................................................................................... if not 16 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if (cellvoltmax0-cellvoltmin0)>self.param.CellVoltDiffLv2 and (cellvoltmax1-cellvoltmin1)>self.param.CellVoltDiffLv2: #二级电芯压差 cdv_time=cdv_time+(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if cdv_time>self.param.volt_time: faultcode=16 faultlv=3 faultinfo='{}号电芯和{}号电芯压差大'.format(cellvolt1.index(cellvoltmax1)+1,cellvolt1.index(cellvoltmin1)+1) faultadvice='容量/内阻不一致;请均衡电池,若均衡无法解决问题,请通知电池技术人员介入分析' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: cdv_time=0 else: if (cellvoltmax0-cellvoltmin0)2*self.param.CellVoltNums and self.packvolt[i]<4.5*self.param.CellVoltNums: #电池包电压有效性 packvoltvalid=1 else: packvoltvalid=0 if packvoltvalid==1: if not 18 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if self.packvolt[i-1]>self.param.PackVoltOvLv2 and self.packvolt[i]>self.param.PackVoltOvLv2: #电池包过压二级进入 pov_time=pov_time+(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if pov_time>self.param.volt_time: faultcode=18 faultlv=3 faultinfo='电池包过压' faultadvice='过压会导致电池析锂,存在电池安全与寿命衰减过快风险;请停止充电,若过压持续>10min,通知电池技术人员介入分析' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: pov_time=0 else: if self.packvolt[i-1]self.param.volt_time: faultcode=20 faultlv=3 faultinfo='电池包欠压' faultadvice='欠压可能导致电池过放;请停止放电,并充电,若欠压持续>10min,请通知电池技术人员介入分析' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: puv_time=0 else: if self.packvolt[i-1]>self.param.PackVoltUvLv1 and self.packvolt[i]>self.param.PackVoltUvLv1: #电池包二级欠压恢复 self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==20].index[0], 'time_sp'] = time else: pass else: pov_time=0 puv_time=0 #电流过流诊断....................................................................................................................... step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if step<60 and self.packcrnt[i]>self.param.PackDisOc and self.packcrnt[i-1]>self.param.PackDisOc: as_dis=as_dis+(self.packcrnt[i]-self.param.self.PackDisOc)*step #ah累计 elif step<60 and self.packcrnt[i]100: faultcode=22 faultlv=3 faultinfo='电池包放电过流' faultadvice='长时间过流会导致电池欠压及温升过快;请停止放电,若持续>5min,断开继电器,并通知电池技术人员介入分析' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: if self.packcrnt[i-1]100: faultcode=21 faultlv=3 faultinfo='电池包充电过流' faultadvice='过流会导致电池析锂,存在电池安全与寿命衰减过快风险;请停止充电,若持续>5min,断开继电器,并通知电池技术人员介入分析' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: if self.packcrnt[i-1]>self.param.PackChgOc+10 and self.packcrnt[i]>self.param.PackChgOc+10: self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==21].index[0], 'time_sp'] = time else: pass #SOC卡滞、跳变诊断................................................................................................ step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if step<120: ah_accum=ah_accum-self.packcrnt[i]*step/3600 #ah累计 else: pass #SOC卡滞............................................................................................................ if abs(ah_accum)>self.param.Capacity*0.1: bmssoc_now=float(self.bms_soc[i]) if not 27 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if abs(bmssoc_now-bmssoc_st)self.param.SocClamp: #SOC卡滞故障退出 self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==27].index[0], 'time_sp'] = time else: pass bmssoc_st=bmssoc_now ah_accum=0 else: pass #SOC跳变.................................................................................................................... bmssoc_last=float(self.bms_soc[i-1]) bmssoc_now=float(self.bms_soc[i]) if not 28 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if step<30 and abs(bmssoc_now-bmssoc_last)>self.param.SocJump and 10 or self.bmsfault1[i]>0: #BMS故障进入 # time=self.bmstime[0] # faultcode=1 # faultlv=2 # faultinfo='BMS故障报警:{}'.format(self.bmsfault1[i-1]) # faultadvice='检修电池' # self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] # else: # pass # else: # if self.bmsfault1[i-1]==0 and self.bmsfault1[i]==0: #BMS故恢复 # time=self.bmstime[i] # self.df_diag_ram[self.df_diag_ram[self.df_diag_ram['faultcode']==1].index[0], 'time_sp'] = time #SOC过低故障报警............................................................................................................ if not self.df_soc.empty: if not 26 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if self.df_soc.loc[0, 'packsoc']self.param.SocLow: #SOC过低故障退出 time=self.df_soc.loc[0, 'time'] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==26].index[0], 'time_sp'] = time else: pass else: pass #SOC一致性故障报警.......................................................................................................... if not self.df_uniform.empty: cellsoc_diff=self.df_uniform.loc[0,'cellsoc_diff'] if not 25 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 if cellsoc_diff>self.param.SocDiff: #SOC一致性差故障进入 time=self.bmstime[0] faultcode=25 faultlv=1 faultinfo='电芯{}和{}SOC差过大:{}'.format(self.df_uniform.loc[0,'cellmin_num'],self.df_uniform.loc[0,'cellmax_num'],cellsoc_diff) faultadvice='请均衡电池,若无法解决,请通知电池技术人员介入分析' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: if cellsoc_diffself.param.SohLow+2: #soh过低故障恢复 time=self.bmstime[0] self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==23].index[0], 'time_sp'] = time else: pass if not 24 in list(self.df_diag_ram['faultcode']): #当前故障中没有该故障,则判断是否发生该故障 cellsohmin=min(cellsoh) cellsohmax=max(cellsoh) if cellsoh_diff>self.param.SohDiff: time=self.bmstime[0] faultcode=24 faultlv=1 faultinfo='电池包容量一致性差:{}号和{}号电芯'.format(list(cellsoh).index(cellsohmin)+1,list(cellsoh).index(cellsohmax)+1) faultadvice='电池容量不一致性差,会导致电池放电电量不足;建议更换容量过低的电芯或模组' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: if cellsoh_diff30*24*3600: faultcode=100 faultlv=1 faultinfo='电池包状态长时间未标定' faultadvice='电池长时间未标定,存在电池状态估算不准确风险,请对电池进行:放电至SOC<10%-静置>1h-充满,操作' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass else: time=self.bmstime[0] if (time-self.df_soh.loc[0,'time_st']).total_seconds()<29*24*3600: self.df_diag_ram.loc[self.df_diag_ram[self.df_diag_ram['faultcode']==100].index[0], 'time_sp'] = time else: pass else: #soh长时间未标定 if not 100 in list(self.df_diag_ram['faultcode']): time=self.bmstime[0] faultcode=100 faultlv=1 faultinfo='电池包状态长时间未标定' faultadvice='电池长时间未标定,存在电池状态估算不准确风险,请对电池进行:放电至SOC<10%-静置>1h-充满,操作' self.df_diag_ram.loc[len(self.df_diag_ram)]=[time, time_sp, self.sn, faultcode, faultlv, faultinfo, faultadvice] else: pass #电池健康度评分..................................................................................................... column_name=['time', 'sn', 'healthstate', 'healthadvice'] df_health=pd.DataFrame(columns=column_name) healthstate=100 healthadvice='' df_fault_now=self.df_diag_ram[self.df_diag_ram['time_sp']=='0000-00-00 00:00:00'] df_fault_now.reset_index(inplace=True,drop=True) fault_num=len(df_fault_now) fltlv=df_fault_now['faultlv'] fltlv1=np.sum(fltlv<1.5) fltlv2=np.sum(fltlv<2.5)-fltlv1 fltlv3=np.sum(fltlv<3.5)-fltlv1-fltlv2 fltlv4=np.sum(fltlv<4.5)-fltlv1-fltlv2-fltlv3 fltlv5=np.sum(fltlv<5.5)-fltlv1-fltlv2-fltlv3-fltlv4 if fltlv5>0: healthstate=healthstate-fltlv1*10-fltlv2*20-fltlv3*30-fltlv4*50-fltlv5*100 healthadvice=healthadvice+',电池发生{}个故障'.format(fault_num) elif fltlv4>0: healthstate=healthstate-50 healthadvice=healthadvice+',电池发生{}个故障'.format(fault_num) elif fltlv3>0: healthstate=healthstate-30 healthadvice=healthadvice+',电池发生{}个故障'.format(fault_num) elif fltlv2>0: healthstate=healthstate-20 healthadvice=healthadvice+',电池发生{}个故障'.format(fault_num) elif fltlv1>0: healthstate=healthstate-10 healthadvice=healthadvice+',电池发生{}个故障'.format(fault_num) #电池寿命 if not self.df_soh.empty: if soh>85: pass elif soh>=80: healthstate=healthstate-5 healthadvice=healthadvice+',电池SOH较低' elif soh>=70: healthstate=healthstate-15 healthadvice=healthadvice+',电池SOH过低' else: healthstate=healthstate-30 healthadvice=healthadvice+',电池寿命用尽' #电池寿命一致性 if cellsoh_diff>15: healthstate=healthstate-10 healthadvice=healthadvice+',电池SOH一致性很差' elif cellsoh_diff>10: healthstate=healthstate-5 healthadvice=healthadvice+',电池SOH一致性较差' elif cellsoh_diff>5: healthstate=healthstate-1 else: pass #SOC一致性 if not self.df_uniform.empty: if cellsoc_diff>20: healthstate=healthstate-10 healthadvice=healthadvice+',电池SOC一致性很差' elif cellsoc_diff>10: healthstate=healthstate-5 healthadvice=healthadvice+',电池SOC一致性较差' elif cellsoc_diff>5: healthstate=healthstate-1 else: pass if healthstate<0: healthstate=0 elif healthstate>100: healthstate=100 else: pass healthstate=int(healthstate) if len(healthadvice)>0.5: healthadvice=healthadvice[1:] else: healthadvice='电池运行状态良好' df_health.loc[0]=[self.bmstime[len(self.bmstime)-1], self.sn, healthstate, healthadvice] #返回诊断结果........................................................................................................... df_res=self.df_diag_ram if not df_res.empty: pass else: df_res=pd.DataFrame() if not df_health.empty: pass else: df_health=pd.DataFrame() return df_res, df_health