import pandas as pd import numpy as np import matplotlib.pyplot as plt # from pymysql import paramstyle from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam class BatInterShort(): def __init__(self,sn,celltype,df_bms,df_soh): #参数初始化 self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(celltype) self.df_bms=df_bms self.packcrnt=df_bms['总电流[A]']*self.param.PackCrntDec self.packvolt=df_bms['总电压[V]'] self.bms_soc=df_bms['SOC[%]'] self.bmstime= pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S') self.df_soh=df_soh self.cellvolt_name=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)] self.celltemp_name=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)] def intershort(self): if self.celltype<50: df_res=self._ncm_intershort() return df_res elif self.celltype>50: df_res=self._lfp_intershort() return df_res else: return pd.DataFrame() #定义滑动滤波函数.................................................................................... def _np_move_avg(self,a, n, mode="same"): return (np.convolve(a, np.ones((n,)) / n, mode=mode)) #寻找当前行数据的最小温度值............................................................................. def _celltemp_weight(self,num): celltemp = list(self.df_bms.loc[num,self.celltemp_name]) celltemp=np.mean(celltemp) self.celltemp=celltemp if self.celltype==99: if celltemp>=20: self.tempweight=1 self.StandardStandingTime=3600 elif celltemp>=10: self.tempweight=0.6 self.StandardStandingTime=7200 elif celltemp>=5: self.tempweight=0. self.StandardStandingTime=7200 else: self.tempweight=0.1 self.StandardStandingTime=10800 else: if celltemp>=20: self.tempweight=1 self.StandardStandingTime=3600 elif celltemp>=10: self.tempweight=0.8 self.StandardStandingTime=3600 elif celltemp>=5: self.tempweight=0.6 self.StandardStandingTime=7200 else: self.tempweight=0.2 self.StandardStandingTime=10800 #获取当前行所有电压数据........................................................................................ def _cellvolt_get(self,num): cellvolt = np.array(self.df_bms.loc[num,self.cellvolt_name])/1000 return cellvolt #获取当前行所有soc差........................................................................................... def _celldeltsoc_get(self,num,dict_baltime,capacity): cellsoc=[] celldeltsoc=[] for j in range(1, self.param.CellVoltNums+1): #获取每个电芯电压对应的SOC值 cellvolt=self.df_bms.loc[num,'单体电压' + str(j)]/1000 ocv_soc=np.interp(cellvolt,self.param.LookTab_OCV,self.param.LookTab_SOC) if j in dict_baltime.keys(): ocv_soc=ocv_soc+dict_baltime[j]*self.param.BalCurrent/(capacity*3600) #补偿均衡电流 else: pass cellsoc.append(ocv_soc) if self.celltype==1 or self.celltype==2: consum_num=7 cellsoc1=cellsoc[:self.param.CellVoltNums-consum_num] #切片,将bms耗电的电芯和非耗电的电芯分离开 cellsocmean1=(sum(cellsoc1)-max(cellsoc1)-min(cellsoc1))/(len(cellsoc1)-2) cellsoc2=cellsoc[self.param.CellVoltNums-consum_num:] cellsocmean2=(sum(cellsoc2)-max(cellsoc2)-min(cellsoc2))/(len(cellsoc2)-2) for j in range(len(cellsoc)): #计算每个电芯的soc差 if j峰值电压的充入As数 if j in dict_baltime.keys(): #补偿均衡电流 As=-self.param.BalCurrent*dict_baltime[j] else: As=0 As_tatol=0 symbol=0 for m in range(chrg_st+1,chrg_end): As=As-self.packcrnt[m]*(self.bmstime[m]-self.bmstime[m-1]).total_seconds() if symbol<5: if self.df_bms.loc[m,'单体电压'+str(j)]/1000>self.param.PeakCellVolt[symbol]: As_tatol=As_tatol+As symbol=symbol+1 else: continue else: cellAs.append(As_tatol/5) break if self.celltype==99: consum_num=10 cellAs1=cellAs[:self.param.CellVoltNums-consum_num] #切片,将bms耗电的电芯和非耗电的电芯分离开 cellAsmean1=(sum(cellAs1)-max(cellAs1)-min(cellAs1))/(len(cellAs1)-2) cellAs2=cellAs[self.param.CellVoltNums-consum_num:] cellAsmean2=(sum(cellAs2)-max(cellAs2)-min(cellAs2))/(len(cellAs2)-2) for j in range(len(cellAs)): #计算每个电芯的soc差 if j0: if x & 1==1: #判断最后一位是否为1 if count in dict_baltime.keys(): dict_baltime[count] = dict_baltime[count] + dict_bal[key] else: dict_baltime[count] = dict_bal[key] else: pass count += 1 x >>= 1 #右移一位 dict_baltime=dict(sorted(dict_baltime.items(),key=lambda dict_baltime:dict_baltime[0])) for key in dict_baltime: #解析均衡的电芯编号 if self.celltype==1: #科易6040 if key<14: dict_baltime1[key]=dict_baltime[key] elif key<18: dict_baltime1[key-1]=dict_baltime[key] else: dict_baltime1[key-3]=dict_baltime[key] elif self.celltype==1: #科易4840 if key<4: dict_baltime1[key-1]=dict_baltime[key] elif key<8: dict_baltime1[key-1]=dict_baltime[key] elif key<14: dict_baltime1[key-3]=dict_baltime[key] elif key<18: dict_baltime1[key-4]=dict_baltime[key] else: dict_baltime1[key-6]=dict_baltime[key] else: dict_baltime1=dict_baltime return dict_baltime1 #三元电池的内短路电流计算........................................................................................................................................................... def _ncm_intershort(self): column_name=['time_st', 'time_sp', 'sn', 'method','short_current','baltime'] df_res=pd.DataFrame(columns=column_name) if not self.df_bms.empty: if self.df_soh.empty: batsoh=self.df_bms.loc[0,'SOH[%]'] capacity=self.param.Capacity*batsoh/100 else: batsoh=self.df_soh.loc[len(self.df_soh)-1,'soh'] capacity=self.param.Capacity*batsoh/100 standingtime=0 standingtime1=0 firsttime=1 firsttime1=1 dict_bal={} dict_bal1={} for i in range(2,len(self.df_bms)-2): if firsttime1==0: #满电静置算法--计算均衡状态对应的均衡时间 try: balstat=int(self.df_bms.loc[i,'单体均衡状态']) if balstat>0.5: bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() #均衡步长 bal_step=int(bal_step) if str(balstat) in dict_bal1.keys(): dict_bal1[str(balstat)]=dict_bal1[str(balstat)]+bal_step else: dict_bal1[str(balstat)]=bal_step else: pass except: dict_bal1={} else: pass if abs(self.packcrnt[i]) < 0.1 and abs(self.packcrnt[i-1]) < 0.1 and abs(self.packcrnt[i+1]) < 0.1: delttime=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() standingtime=standingtime+delttime standingtime1=standingtime1+delttime self._celltemp_weight(i) #静置法计算内短路-开始..................................................................................................................................... if firsttime==1: if standingtime>self.StandardStandingTime*2: #静置时间满足要求 standingtime=0 cellvolt_now=self._cellvolt_get(i) cellvolt_min=min(cellvolt_now) cellvolt_max=max(cellvolt_now) cellvolt_last=self._cellvolt_get(i-1) deltvolt=max(abs(cellvolt_now-cellvolt_last)) if 23600*10: standingtime=0 cellvolt_now=self._cellvolt_get(i) cellvolt_min=min(cellvolt_now) cellvolt_max=max(cellvolt_now) cellvolt_last=self._cellvolt_get(i-1) deltvolt=max(abs(cellvolt_now-cellvolt_last)) if 20.5: bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() #均衡步长 bal_step=int(bal_step) if str(balstat) in dict_bal.keys(): dict_bal[str(balstat)]=dict_bal[str(balstat)]+bal_step else: dict_bal[str(balstat)]=bal_step else: pass except: dict_bal={} #满电静置法计算内短路-开始..................................................................................................................................................... if self.StandardStandingTime=self.param.FullChrgSoc-10 and 23600*12: deltsoc_now1=self._celldeltsoc_get(i,dict_baltime1,capacity) list_sub1=deltsoc_now1-deltsoc_last1 list_pud1=0.01*capacity*3600*1000/(time_now1-time_last1).total_seconds() leak_current1=list_sub1*list_pud1 # leak_current1=np.array(leak_current1) leak_current1=np.round(leak_current1,3) leak_current1=list(leak_current1) df_res.loc[len(df_res)]=[time_last1,time_now1,self.sn,2,str(leak_current1),str(dict_baltime1)] #计算结果存入Dataframe time_last1=time_now1 #更新时间 deltsoc_last1=deltsoc_now1 #更新soc差 dict_bal1={} else: pass else: pass else: pass else: dict_bal={} firsttime=1 standingtime=0 standingtime1=0 pass if df_res.empty: #返回计算结果 return pd.DataFrame() else: return df_res #磷酸铁锂电池内短路计算程序............................................................................................................................. def _lfp_intershort(self): column_name=['time_st', 'time_sp', 'sn', 'method','short_current','baltime'] df_res=pd.DataFrame(columns=column_name) if not self.df_bms.empty: if self.df_soh.empty: batsoh=self.df_bms.loc[0,'SOH[%]'] capacity=self.param.Capacity*batsoh/100 else: batsoh=self.df_soh.loc[len(self.df_soh)-1,'soh'] capacity=self.param.Capacity*batsoh/100 standingtime=0 standingtime1=0 firsttime=1 firsttime1=1 dict_bal={} dict_bal1={} chrg_start=[] chrg_end=[] dict_bal_list=[] charging=0 dict_bal3={} for i in range(3,len(self.df_bms)-3): #静置法计算内短路.......................................................................................................................... if firsttime1==0: #满电静置算法--计算均衡状态对应的均衡时间 try: balstat=int(self.df_bms.loc[i,'单体均衡状态']) if balstat>0.5: bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() #均衡步长 bal_step=int(bal_step) if str(balstat) in dict_bal1.keys(): dict_bal1[str(balstat)]=dict_bal1[str(balstat)]+bal_step else: dict_bal1[str(balstat)]=bal_step else: pass except: dict_bal1={} else: pass if abs(self.packcrnt[i]) < 0.1 and abs(self.packcrnt[i-1]) < 0.1 and abs(self.packcrnt[i+1]) < 0.1: delttime=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() standingtime=standingtime+delttime standingtime1=standingtime1+delttime self._celltemp_weight(i) #静置法计算内短路-开始..................................................................................................................................... if firsttime==1: if standingtime>self.StandardStandingTime: #静置时间满足要求 standingtime=0 cellvolt_now=self._cellvolt_get(i) cellvolt_min=min(cellvolt_now) cellvolt_max=max(cellvolt_now) cellvolt_last=self._cellvolt_get(i-1) deltvolt=max(abs(cellvolt_now-cellvolt_last)) if 23600*12: standingtime=0 cellvolt_now=np.array(self._cellvolt_get(i)) cellvolt_min=min(cellvolt_now) cellvolt_max=max(cellvolt_now) cellvolt_last=np.array(self._cellvolt_get(i-1)) deltvolt=max(abs(cellvolt_now-cellvolt_last)) if 20.5: bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() #均衡步长 bal_step=int(bal_step) if str(balstat) in dict_bal.keys(): dict_bal[str(balstat)]=dict_bal[str(balstat)]+bal_step else: dict_bal[str(balstat)]=bal_step else: pass except: dict_bal={} #非平台区间静置法计算内短路-开始..................................................................................................................................................... if standingtime1>self.StandardStandingTime: if abs(self.packcrnt[i+2]) >= 0.1: standingtime1=0 cellvolt_now1=self._cellvolt_get(i) cellvolt_max1=max(cellvolt_now1) cellvolt_min1=min(cellvolt_now1) cellvolt_last1=self._cellvolt_get(i-1) deltvolt1=max(abs(cellvolt_now1-cellvolt_last1)) if 23600*24: list_sub1=deltsoc_now1-deltsoc_last1 list_pud1=0.01*capacity*3600*1000/(time_now1-time_last1).total_seconds() leak_current1=list_sub1*list_pud1 # leak_current1=np.array(leak_current1) leak_current1=np.round(leak_current1,3) leak_current1=list(leak_current1) df_res.loc[len(df_res)]=[time_last1,time_now1,self.sn,2,str(leak_current1),str(dict_baltime1)] #计算结果存入Dataframe time_last1=time_now1 #更新时间 deltsoc_last1=deltsoc_now1 #更新soc差 dict_bal1={} else: pass else: pass else: pass else: pass else: dict_bal={} firsttime=1 standingtime=0 standingtime1=0 pass #获取充电数据——开始.............................................................................................................. try: balstat=int(self.df_bms.loc[i,'单体均衡状态']) #统计均衡状态 if balstat>0.5: bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() #均衡步长 bal_step=int(bal_step) if str(balstat) in dict_bal3.keys(): dict_bal3[str(balstat)]=dict_bal3[str(balstat)]+bal_step else: dict_bal3[str(balstat)]=bal_step else: pass except: dict_bal3={} if charging==0: if self.packcrnt[i]<=-1 and self.packcrnt[i+1]<=-1 and self.packcrnt[i+2]<=-1 and self.bms_soc[i]<40: #判断充电开始 cellvolt_now=self._cellvolt_get(i) if min(cellvolt_now)len(chrg_end): chrg_start[-1]=i else: chrg_start.append(i) else: pass else: pass else: #充电中 if (self.bmstime[i+1]-self.bmstime[i]).total_seconds()>180 or (self.packcrnt[i]>self.param.Capacity/3 and self.packcrnt[i+1]>self.param.Capacity/3): #如果充电过程中时间间隔>180s,则舍弃该次充电 chrg_start.remove(chrg_start[-1]) charging=0 continue elif self.packcrnt[i]<=-1 and self.packcrnt[i+1]<=-1 and self.packcrnt[i+2]<-1: cellvolt_now=self._cellvolt_get(i) if min(cellvolt_now)>self.param.CellFullChrgVolt-0.13: #电压>满充电压-0.13V,即3.37V self._celltemp_weight(i) if i-chrg_start[-1]>10 and self.celltemp>10: chrg_end.append(i+1) dict_bal_list.append(dict_bal3) dict_bal3={} charging=0 continue else: chrg_start.remove(chrg_start[-1]) charging=0 continue else: pass else: pass #基于充电数据计算单体电芯的漏电流.......................................................................................................... if len(chrg_end)>1: for i in range(len(chrg_end)): if i<1: dict_baltime={} deltAs_last=self._cellDeltAs_get(chrg_start[i],chrg_end[i],dict_baltime) time_last=self.bmstime[chrg_end[i]] else: dict_baltime=self._bal_time(dict_bal_list[i]) #获取每个电芯的均衡时间 deltAs_now=self._cellDeltAs_get(chrg_start[i],chrg_end[i],dict_baltime) #获取每个电芯的As差 time_now=self.bmstime[chrg_end[i]] list_sub=deltAs_now-deltAs_last list_pud=-1000/(time_now-time_last).total_seconds() leak_current=list_sub*list_pud # leak_current=np.array(leak_current) leak_current=np.round(leak_current,3) leak_current=list(leak_current) df_res.loc[len(df_res)]=[time_last,time_now,self.sn,3,str(leak_current),str(dict_baltime)] #计算结果存入Dataframe deltAs_last=deltAs_now time_last=time_now else: pass if df_res.empty: return pd.DataFrame() else: return df_res