# NOTE(review): the physical line structure of this file has been collapsed:
# many statements share one line, so every inline `#` comment also swallows the
# statements that follow it.  It also looks as if text lying between a `<` and
# a later `>` was stripped (see the fragment `if j峰值电压的充入As数`), which removed
# the end of `_celldeltsoc_get` and the `def` line of the helper that
# `_lfp_intershort` calls as `self._cellDeltAs_get(chrg_start, chrg_end,
# dict_baltime)`.  The code is therefore left byte-for-byte unchanged; restore
# it from version control before making functional edits.
#
# What the visible code does:
#
# class BatInterShort
#   __init__(sn, celltype, df_bms, df_volt, df_temp, df_accum)
#     * Reads the cell count from df_volt.loc[5, '单体电池总数'].  A count of 110
#       forces self.celltype to 99.  In the other branch the statement is
#       `self.celltype==100`, a comparison whose result is discarded, so the
#       caller's celltype is kept -- presumably `=` was intended; confirm.
#     * Builds self.param = BatParam.BatParam(self.celltype) and stores the
#       frames plus derived series: packcrnt (current column cast to float and
#       multiplied by param.PackCrntDec), packvolt, bms_soc, bmsstat and
#       bmstime (the '上报时间' column parsed with '%Y-%m-%d %H:%M:%S').
#     * Overwrites param.CellVoltNums with the cell count and sets
#       param.CellTempNums from df_temp.loc[5, '可充电储能温度探针个数'].
#     * df_accum is accepted but never referenced in the visible code.
#   intershort()
#     Dispatches on self.celltype: 1, 2, 3, 4 or 100 -> _ncm_intershort(),
#     99 -> _lfp_intershort(), anything else -> empty DataFrame.
#   _np_move_avg(a, n, mode="same")
#     Moving average: np.convolve(a, np.ones((n,)) / n, mode=mode).
#   _celltemp_weight(num)
#     Averages the probe columns '1.0' .. '<CellTempNums>.0' of df_temp at row
#     label `num` (the original comment speaks of the minimum temperature, but
#     the code takes the mean) and stores the result in self.celltemp.  It then
#     sets self.tempweight / self.StandardStandingTime (compared against elapsed
#     seconds by the callers) from the mean:
#       celltype 99:  >=20 -> 1 / 3600,  >=10 -> 0.6 / 7200,
#                     >=5  -> `0.` / 7200,  else -> 0.1 / 10800
#       otherwise:    >=20 -> 1 / 3600,  >=10 -> 0.8 / 3600,
#                     >=5  -> 0.6 / 7200,   else -> 0.2 / 10800
#     NOTE(review): the literal `0.` evaluates to 0.0 and sits exactly at a
#     physical line break -- a digit may have been lost; confirm.
#   _cellvolt_get(num)
#     Returns the list of df_volt.loc[num, '<j>.0'] for j = 1 .. CellVoltNums.
#   _celldeltsoc_get(num, dict_baltime, capacity)
#     Maps every cell voltage to SOC with np.interp over param.LookTab_OCV /
#     param.LookTab_SOC and, for cell numbers present in dict_baltime, adds
#     dict_baltime[j] * param.BalCurrent / (capacity * 3600) as balancing
#     compensation.  For celltype 1 or 2 the last 7 cells are separated from
#     the others (the original comment says these are the cells that power the
#     BMS) and a mean without the max and min is computed for each group.  The
#     remainder of the method is missing.
#   (As-difference helper, `def` line missing)
#     The visible remainder walks rows chrg_st .. chrg_end-1, accumulates
#     -packcrnt[m] * (seconds since the previous row) per cell, adds the running
#     value to As_tatol each time the cell voltage / 1000 exceeds the next of
#     the first five param.PeakCellVolt entries, appends As_tatol / 5, and
#     finally returns each cell's deviation from the mean without max and min.
#   _bal_time(dict_bal)
#     dict_bal maps a balance-state bitmask (as a string) to a duration.  Each
#     key is turned into a number with eval(); for every set bit the duration
#     is added to dict_baltime[bit position], counting from 1 at the least
#     significant bit.  The result is sorted by bit position and then remapped
#     to cell numbers for celltype 1 (key < 14 unchanged, key < 18 -> key - 1,
#     otherwise key - 3).  The second `elif self.celltype==1` branch can never
#     run because the first branch tests the same value -- presumably another
#     celltype was meant; confirm.  All other cell types get the unmapped dict.
#     NOTE(review): the visible callers build the keys with str(int(...)), so
#     int(key) would presumably do the same job without eval(); confirm that
#     no other caller passes a different key format.
import pandas as pd import numpy as np import matplotlib.pyplot as plt import BatParam class BatInterShort: def __init__(self,sn,celltype,df_bms,df_volt,df_temp,df_accum): #参数初始化 self.sn=sn self.celltype=celltype CellVoltNums=int(df_volt.loc[5,'单体电池总数']) if CellVoltNums==110: self.celltype=99 else: self.celltype==100 self.param=BatParam.BatParam(self.celltype) self.df_volt=df_volt self.df_temp=df_temp self.df_bms=df_bms self.packcrnt=(df_volt['可充电储能装置电流(A)'].astype('float'))*self.param.PackCrntDec self.packvolt=df_volt['可充电储能装置电压(V)'].astype('float') self.bms_soc=df_bms['SOC'] self.bmsstat=df_bms['充电状态'] self.bmstime= pd.to_datetime(df_volt['上报时间'], format='%Y-%m-%d %H:%M:%S') self.param.CellVoltNums=CellVoltNums self.param.CellTempNums=int(df_temp.loc[5,'可充电储能温度探针个数']) def intershort(self): if self.celltype==1 or self.celltype==2 or self.celltype==3 or self.celltype==4 or self.celltype==100: df_res=self._ncm_intershort() return df_res elif self.celltype==99: df_res=self._lfp_intershort() return df_res else: return pd.DataFrame() #定义滑动滤波函数.................................................................................... def _np_move_avg(self,a, n, mode="same"): return (np.convolve(a, np.ones((n,)) / n, mode=mode)) #寻找当前行数据的最小温度值............................................................................. def _celltemp_weight(self,num): celltemp = [] for j in range(1, self.param.CellTempNums+1): s = str(j) celltemp.append(self.df_temp.loc[num,s+'.0']) celltemp=np.mean(celltemp) self.celltemp=celltemp if self.celltype==99: if celltemp>=20: self.tempweight=1 self.StandardStandingTime=3600 elif celltemp>=10: self.tempweight=0.6 self.StandardStandingTime=7200 elif celltemp>=5: self.tempweight=0. 
self.StandardStandingTime=7200 else: self.tempweight=0.1 self.StandardStandingTime=10800 else: if celltemp>=20: self.tempweight=1 self.StandardStandingTime=3600 elif celltemp>=10: self.tempweight=0.8 self.StandardStandingTime=3600 elif celltemp>=5: self.tempweight=0.6 self.StandardStandingTime=7200 else: self.tempweight=0.2 self.StandardStandingTime=10800 #获取当前行所有电压数据........................................................................................ def _cellvolt_get(self,num): cellvolt=[] for j in range(1, self.param.CellVoltNums+1): s = str(j) cellvolt.append(self.df_volt.loc[num, s+'.0']) return cellvolt #获取当前行所有soc差........................................................................................... def _celldeltsoc_get(self,num,dict_baltime,capacity): cellsoc=[] celldeltsoc=[] for j in range(1, self.param.CellVoltNums+1): #获取每个电芯电压对应的SOC值 cellvolt=self.df_volt.loc[num,str(j)+'.0'] ocv_soc=np.interp(cellvolt,self.param.LookTab_OCV,self.param.LookTab_SOC) if j in dict_baltime.keys(): ocv_soc=ocv_soc+dict_baltime[j]*self.param.BalCurrent/(capacity*3600) #补偿均衡电流 else: pass cellsoc.append(ocv_soc) if self.celltype==1 or self.celltype==2: consum_num=7 cellsoc1=cellsoc[:self.param.CellVoltNums-consum_num] #切片,将bms耗电的电芯和非耗电的电芯分离开 cellsocmean1=(sum(cellsoc1)-max(cellsoc1)-min(cellsoc1))/(len(cellsoc1)-2) cellsoc2=cellsoc[self.param.CellVoltNums-consum_num:] cellsocmean2=(sum(cellsoc2)-max(cellsoc2)-min(cellsoc2))/(len(cellsoc2)-2) for j in range(len(cellsoc)): #计算每个电芯的soc差 if j峰值电压的充入As数 if j in dict_baltime.keys(): #补偿均衡电流 As=-self.param.BalCurrent*dict_baltime[j] else: As=0 As_tatol=0 symbol=0 for m in range(chrg_st,chrg_end): As=As-self.packcrnt[m]*(self.bmstime[m]-self.bmstime[m-1]).total_seconds() if symbol<5: if self.df_volt.loc[m, str(j)+'.0']/1000>self.param.PeakCellVolt[symbol]: As_tatol=As_tatol+As symbol=symbol+1 else: continue else: cellAs.append(As_tatol/5) break if cellAs: cellAsmean=(sum(cellAs)-max(cellAs)-min(cellAs))/(len(cellAs)-2) for j 
in range(len(cellAs)): #计算每个电芯的soc差 celldeltAs.append(cellAs[j]-cellAsmean) return celldeltAs #计算每个电芯的均衡时长.......................................................................................................................... def _bal_time(self,dict_bal): dict_baltime={} dict_baltime1={} for key in dict_bal: count=1 x=eval(key) while x>0: if x & 1==1: #判断最后一位是否为1 if count in dict_baltime.keys(): dict_baltime[count] = dict_baltime[count] + dict_bal[key] else: dict_baltime[count] = dict_bal[key] else: pass count += 1 x >>= 1 #右移一位 dict_baltime=dict(sorted(dict_baltime.items(),key=lambda dict_baltime:dict_baltime[0])) for key in dict_baltime: #解析均衡的电芯编号 if self.celltype==1: #科易6040 if key<14: dict_baltime1[key]=dict_baltime[key] elif key<18: dict_baltime1[key-1]=dict_baltime[key] else: dict_baltime1[key-3]=dict_baltime[key] elif self.celltype==1: #科易4840 if key<4: dict_baltime1[key-1]=dict_baltime[key] elif key<8: dict_baltime1[key-1]=dict_baltime[key] elif key<14: dict_baltime1[key-3]=dict_baltime[key] elif key<18: dict_baltime1[key-4]=dict_baltime[key] else: dict_baltime1[key-6]=dict_baltime[key] else: dict_baltime1=dict_baltime return dict_baltime1 #三元电池的内短路电流计算........................................................................................................................................................... 
# _ncm_intershort(self): internal-short-circuit (leak current) estimation used
# by intershort() for cell types 1, 2, 3, 4 and 100.
#
# NOTE(review): this method is collapsed onto two physical lines and is
# corrupted in at least two places -- `if 30.5:` and `if 3= 0.1:` -- where text
# between a `<` and a later `>` appears to have been stripped.  Among other
# things the `try:` that belongs to the second `except: dict_bal={}` and the
# row-writing part of method 1 are no longer present.  The code is left
# byte-for-byte unchanged; the summary covers only what is still visible.
#
# Returns: a DataFrame with the columns time_st, time_sp, sn, method,
# short_current, baltime (short_current and baltime are stored as str(...)),
# or an empty DataFrame when no row was produced.
#
# Visible behaviour, per row i in range(2, len(df_volt) - 2), only when
# df_volt is not empty:
#   * While firsttime1 == 0, the balance state int(df_bms.loc[i, '单体均衡状态'])
#     is read; if it is > 0.5, the whole seconds between bmstime[i] and
#     bmstime[i + 1] are added to dict_bal1[str(balstat)].  A bare `except:`
#     resets dict_bal1 to {} on any error.
#   * A row counts as resting when |packcrnt| < 0.1 at i - 1, i and i + 1.  The
#     seconds since the previous row are then added to both standingtime and
#     standingtime1, and _celltemp_weight(i) refreshes
#     self.StandardStandingTime.
#   * Rest method: once standingtime exceeds StandardStandingTime the first
#     reference (deltsoc_last, time_last) is taken via _bal_time(dict_bal) and
#     _celldeltsoc_get(); afterwards a branch guarded by
#     `standingtime > 3600*12` follows, whose body is lost.
#   * Full-charge rest method (rows written with method = 2): standingtime1 is
#     reset and the highest cell voltage is converted to SOC with np.interp.
#     If that SOC is >= param.FullChrgSoc - 50, the first hit stores the
#     reference (deltsoc_last1, time_last1).  Later hits that lie more than
#     3600*24 seconds after the reference compute, per cell,
#         (deltsoc_now1 - deltsoc_last1)
#             * 0.01 * capacity * 3600 * 1000 / elapsed_seconds
#     rounded to 3 decimals, append a row to df_res, add a column to df_res1,
#     move the reference forward and clear dict_bal1.
#   * The trailing `else:` (apparently the counterpart of the resting check --
#     confirm against the original layout) clears dict_bal and resets
#     firsttime, standingtime and standingtime1.
# After the loop, a non-empty df_res1 is plotted, shown with plt.show() and
# saved to './' + str(self.sn) + '漏电流1.png'.
def _ncm_intershort(self): column_name=['time_st', 'time_sp', 'sn', 'method','short_current','baltime'] df_res=pd.DataFrame(columns=column_name) df_res1=pd.DataFrame() if not self.df_volt.empty: capacity=self.param.Capacity standingtime=0 standingtime1=0 firsttime=1 firsttime1=1 dict_bal={} dict_bal1={} for i in range(2,len(self.df_volt)-2): if firsttime1==0: #满电静置算法--计算均衡状态对应的均衡时间 try: balstat=int(self.df_bms.loc[i,'单体均衡状态']) if balstat>0.5: bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() #均衡步长 bal_step=int(bal_step) if str(balstat) in dict_bal1.keys(): dict_bal1[str(balstat)]=dict_bal1[str(balstat)]+bal_step else: dict_bal1[str(balstat)]=bal_step else: pass except: dict_bal1={} else: pass if abs(self.packcrnt[i]) < 0.1 and abs(self.packcrnt[i-1]) < 0.1 and abs(self.packcrnt[i+1]) < 0.1: delttime=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() standingtime=standingtime+delttime standingtime1=standingtime1+delttime self._celltemp_weight(i) #静置法计算内短路-开始..................................................................................................................................... if firsttime==1: if standingtime>self.StandardStandingTime: #静置时间满足要求 dict_baltime=self._bal_time(dict_bal) #获取每个电芯的均衡时间 deltsoc_last=self._celldeltsoc_get(i,dict_baltime,capacity) time_last=self.bmstime[i] firsttime=0 standingtime=0 else: pass elif standingtime>3600*12: cellvolt=self._cellvolt_get(i) if 30.5: bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() #均衡步长 bal_step=int(bal_step) if str(balstat) in dict_bal.keys(): dict_bal[str(balstat)]=dict_bal[str(balstat)]+bal_step else: dict_bal[str(balstat)]=bal_step else: pass except: dict_bal={} #满电静置法计算内短路-开始..................................................................................................................................................... 
if standingtime1>self.StandardStandingTime: cellvolt=self._cellvolt_get(i) if 3= 0.1: standingtime1=0 cellvolt_now1=self._cellvolt_get(i) cellsoc_now1=np.interp(max(cellvolt_now1),self.param.LookTab_OCV,self.param.LookTab_SOC) if cellsoc_now1>=self.param.FullChrgSoc-50: if firsttime1==1: dict_baltime1=self._bal_time(dict_bal1) #获取每个电芯的均衡时间 deltsoc_last1=self._celldeltsoc_get(i,dict_baltime1,capacity) time_last1=self.bmstime[i] firsttime1=0 else: dict_baltime1=self._bal_time(dict_bal1) #获取每个电芯的均衡时间 time_now1=self.bmstime[i] if (time_now1-time_last1).total_seconds()>3600*24: deltsoc_now1=self._celldeltsoc_get(i,dict_baltime1,capacity) list_sub1=[a-b for a, b in zip(deltsoc_now1, deltsoc_last1)] list_pud1=[0.01*capacity*3600*1000/(time_now1-time_last1).total_seconds()]*self.param.CellVoltNums leak_current1=[a*b for a,b in zip(list_sub1,list_pud1)] leak_current1=np.array(leak_current1) leak_current1=np.round(leak_current1,3) leak_current1=list(leak_current1) df_res.loc[len(df_res)]=[time_last1,time_now1,self.sn,2,str(leak_current1),str(dict_baltime1)] #计算结果存入Dataframe df_res1[time_now1]=leak_current1 time_last1=time_now1 #更新时间 deltsoc_last1=deltsoc_now1 #更新soc差 dict_bal1={} else: pass else: pass else: pass else: pass else: dict_bal={} firsttime=1 standingtime=0 standingtime1=0 pass #内短路结果画图................................................................................................. if not df_res1.empty: ax=df_res1.plot(marker='*',figsize=(10,5)) plt.xlabel('电芯序列') plt.ylabel('内短路指数') plt.title(str(self.sn)) plt.rcParams['font.sans-serif']=['SimHei'] #用来正常显示中文标签 plt.rcParams['axes.unicode_minus']=False #用来正常显示负号 plt.legend() plt.show() fig = ax.get_figure() fig.savefig('./'+str(self.sn)+'漏电流1.png') if df_res.empty: #返回计算结果 return pd.DataFrame() else: return df_res #磷酸铁锂电池内短路计算程序............................................................................................................................. 
# _lfp_intershort(self): internal-short-circuit (leak current) estimation used
# by intershort() for cell type 99 (the original comment calls it the
# lithium-iron-phosphate routine).
#
# NOTE(review): this method is collapsed onto three physical lines and is
# corrupted in several places -- `if max(cellvolt)3600*12:`,
# `if max(cellvolt)0.5:`, `if max(cellvolt_now1)3600*24:` and
# `if min(cellvolt_now)len(chrg_end):` -- where text between a `<` and a later
# `>` appears to have been stripped.  The code is left byte-for-byte unchanged;
# the summary covers only what is still visible.
#
# Returns: a DataFrame with the columns time_st, time_sp, sn, method,
# short_current, baltime (short_current and baltime are stored as str(...)),
# or an empty DataFrame when no row was produced.
#
# Visible behaviour, per row i in range(3, len(df_bms) - 3), only when df_bms
# is not empty:
#   * Balance-time bookkeeping and the resting check (|packcrnt| < 0.1 at
#     i - 1, i, i + 1, then _celltemp_weight(i)) follow the same pattern as in
#     _ncm_intershort; the voltage-window conditions that follow are lost.
#   * Rest method outside the voltage plateau (rows written with method = 2):
#     for references more than 3600*24 seconds apart, the per-cell value is
#         (deltsoc_now1 - deltsoc_last1)
#             * 0.01 * capacity * 3600 * 1000 / elapsed_seconds
#     rounded to 3 decimals.  Each result is also stored as a column of
#     df_res1.
#   * Charge-segment detection:
#       - Balance durations are collected in dict_bal3 the same way as above.
#         No initialisation of dict_bal3 is visible before the loop, so the
#         first access presumably raises NameError, which the bare `except:`
#         turns into `dict_bal3={}` -- confirm.
#       - With charging == 0, a start candidate needs packcrnt <= -1 at i,
#         i + 1 and i + 2 and float(bms_soc[i].strip('%')) < 40.  The
#         candidate either replaces chrg_start[-1] or is appended; the
#         condition that decides this, and the place where `charging` is set
#         to a non-zero value, are lost.
#       - While charging, the segment is dropped (last chrg_start removed,
#         charging = 0) when the gap to the next row exceeds 180 s or when
#         packcrnt is above param.Capacity / 3 at i and i + 1.
#       - While charging, if packcrnt is still <= -1 (i, i + 1) and < -1
#         (i + 2) and the lowest cell voltage is above
#         param.CellFullChrgVolt - 0.13, the segment is closed: it is kept
#         (chrg_end.append(i + 1), dict_bal_list.append(dict_bal3)) only if it
#         spans more than 10 rows and self.celltemp > 10; otherwise it is
#         dropped.
#   * After the loop, if more than one charge segment was kept (rows written
#     with method = 3): the first segment provides the reference As
#     differences (with an empty balance dict); every later segment computes
#         (deltAs_now - deltAs_last) * (-1000 / elapsed_seconds)
#     rounded to 3 decimals, appends a row and becomes the new reference.
# A non-empty df_res1 is plotted, shown with plt.show() and saved to
# './' + str(self.sn) + '漏电流1.png'.
def _lfp_intershort(self): column_name=['time_st', 'time_sp', 'sn', 'method','short_current','baltime'] df_res=pd.DataFrame(columns=column_name) df_res1=pd.DataFrame() if not self.df_bms.empty: capacity=self.param.Capacity standingtime=0 standingtime1=0 firsttime=1 firsttime1=1 dict_bal={} dict_bal1={} chrg_start=[] chrg_end=[] dict_bal_list=[] charging=0 for i in range(3,len(self.df_bms)-3): #静置法计算内短路.......................................................................................................................... if firsttime1==0: #满电静置算法--计算均衡状态对应的均衡时间 try: balstat=int(self.df_bms.loc[i,'单体均衡状态']) if balstat>0.5: bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() #均衡步长 bal_step=int(bal_step) if str(balstat) in dict_bal1.keys(): dict_bal1[str(balstat)]=dict_bal1[str(balstat)]+bal_step else: dict_bal1[str(balstat)]=bal_step else: pass except: dict_bal1={} else: pass if abs(self.packcrnt[i]) < 0.1 and abs(self.packcrnt[i-1]) < 0.1 and abs(self.packcrnt[i+1]) < 0.1: delttime=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() standingtime=standingtime+delttime standingtime1=standingtime1+delttime self._celltemp_weight(i) #静置法计算内短路-开始..................................................................................................................................... if firsttime==1: if standingtime>self.StandardStandingTime: #静置时间满足要求 cellvolt=self._cellvolt_get(i) if max(cellvolt)3600*12: cellvolt=self._cellvolt_get(i) if max(cellvolt)0.5: bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() #均衡步长 bal_step=int(bal_step) if str(balstat) in dict_bal.keys(): dict_bal[str(balstat)]=dict_bal[str(balstat)]+bal_step else: dict_bal[str(balstat)]=bal_step else: pass except: dict_bal={} #非平台区间静置法计算内短路-开始..................................................................................................................................................... 
if standingtime1>self.StandardStandingTime: if abs(self.packcrnt[i+2]) >= 0.1: standingtime1=0 cellvolt_now1=self._cellvolt_get(i) if max(cellvolt_now1)3600*24: list_sub1=[a-b for a, b in zip(deltsoc_now1, deltsoc_last1)] list_pud1=[0.01*capacity*3600*1000/(time_now1-time_last1).total_seconds()]*self.param.CellVoltNums leak_current1=[a*b for a,b in zip(list_sub1,list_pud1)] leak_current1=np.array(leak_current1) leak_current1=np.round(leak_current1,3) leak_current1=list(leak_current1) df_res.loc[len(df_res)]=[time_last1,time_now1,self.sn,2,str(leak_current1),str(dict_baltime1)] #计算结果存入Dataframe df_res1[time_now1]=leak_current1 time_last1=time_now1 #更新时间 deltsoc_last1=deltsoc_now1 #更新soc差 dict_bal1={} else: pass else: pass else: pass else: pass else: dict_bal={} firsttime=1 standingtime=0 standingtime1=0 pass #获取充电数据——开始.............................................................................................................. try: balstat=int(self.df_bms.loc[i,'单体均衡状态']) #统计均衡状态 if balstat>0.5: bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() #均衡步长 bal_step=int(bal_step) if str(balstat) in dict_bal3.keys(): dict_bal3[str(balstat)]=dict_bal3[str(balstat)]+bal_step else: dict_bal3[str(balstat)]=bal_step else: pass except: dict_bal3={} if charging==0: if self.packcrnt[i]<=-1 and self.packcrnt[i+1]<=-1 and self.packcrnt[i+2]<=-1 and float(self.bms_soc[i].strip('%'))<40: #判断充电开始 cellvolt_now=self._cellvolt_get(i) if min(cellvolt_now)len(chrg_end): chrg_start[-1]=i else: chrg_start.append(i) else: pass else: pass else: #充电中 if (self.bmstime[i+1]-self.bmstime[i]).total_seconds()>180 or (self.packcrnt[i]>self.param.Capacity/3 and self.packcrnt[i+1]>self.param.Capacity/3): #如果充电过程中时间间隔>180s,则舍弃该次充电 chrg_start.remove(chrg_start[-1]) charging=0 continue elif self.packcrnt[i]<=-1 and self.packcrnt[i+1]<=-1 and self.packcrnt[i+2]<-1: cellvolt_now=self._cellvolt_get(i) if min(cellvolt_now)>self.param.CellFullChrgVolt-0.13: #电压>满充电压-0.13V,即3.37V 
self._celltemp_weight(i) if i-chrg_start[-1]>10 and self.celltemp>10: chrg_end.append(i+1) dict_bal_list.append(dict_bal3) dict_bal3={} charging=0 continue else: chrg_start.remove(chrg_start[-1]) charging=0 continue else: pass else: pass #基于充电数据计算单体电芯的漏电流.......................................................................................................... if len(chrg_end)>1: for i in range(len(chrg_end)): if i<1: dict_baltime={} deltAs_last=self._cellDeltAs_get(chrg_start[i],chrg_end[i],dict_baltime) time_last=self.bmstime[chrg_end[i]] else: dict_baltime=self._bal_time(dict_bal_list[i]) #获取每个电芯的均衡时间 deltAs_now=self._cellDeltAs_get(chrg_start[i],chrg_end[i],dict_baltime) #获取每个电芯的As差 time_now=self.bmstime[chrg_end[i]] list_sub=[a-b for a, b in zip(deltAs_now, deltAs_last)] list_pud=[-1000/(time_now-time_last).total_seconds()]*self.param.CellVoltNums leak_current=[a*b for a,b in zip(list_sub,list_pud)] leak_current=np.array(leak_current) leak_current=np.round(leak_current,3) leak_current=list(leak_current) df_res.loc[len(df_res)]=[time_last,time_now,self.sn,3,str(leak_current),str(dict_baltime)] #计算结果存入Dataframe deltAs_last=deltAs_now time_last=time_now else: pass #漏电流结果画图........................................................................................... if not df_res1.empty: ax=df_res1.plot(marker='*',figsize=(10,5)) plt.xlabel('电芯序列') plt.ylabel('内短路指数') plt.title(str(self.sn)) plt.rcParams['font.sans-serif']=['SimHei'] #用来正常显示中文标签 plt.rcParams['axes.unicode_minus']=False #用来正常显示负号 plt.legend() plt.show() fig = ax.get_figure() fig.savefig('./'+str(self.sn)+'漏电流1.png') if df_res.empty: return pd.DataFrame() else: return df_res