123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554 |
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from USER.Common import BatParam
class BatInterShort():
    """Estimate per-cell internal-short (leakage) current from BMS records.

    Three estimation methods are implemented; each result row carries the
    method id in the ``method`` column:

    * ``1`` - compare per-cell SOC deviations between two points of one long
      rest period.
    * ``2`` - compare per-cell SOC deviations between two separate rest
      periods (NCM: near full charge; LFP: below the OCV plateau).
    * ``3`` - LFP only: compare the charge (A*s) each cell has received when
      it crosses the peak voltages, between two charging sessions.

    The code indexes ``df_bms``, the current, the SOC and the timestamps by
    integer label (``loc[i]`` / ``series[i]``), so ``df_bms`` is expected to
    carry a default ``0..n-1`` index.
    """

    # Column of ``df_bms`` holding the cell-balancing bitmask of each row.
    _BAL_STATE_COL = '单体均衡状态'
    # Columns of the result frame returned by the estimation methods.
    _RESULT_COLUMNS = ['time_st', 'time_sp', 'sn', 'method', 'short_current', 'baltime']
    # Failures that mean "balancing state of this row is unusable"
    # (missing column/row, NaN/None/non-numeric state, NaT time step).
    _BAL_READ_ERRORS = (KeyError, IndexError, ValueError, TypeError, OverflowError)
    # Number of peak-voltage crossings averaged by ``_cellDeltAs_get``.
    _PEAK_COUNT = 5

    def __init__(self, sn, celltype, df_bms, df_soh):
        """Store the inputs and derive the series used by the algorithms.

        Args:
            sn: Battery serial number, copied into every result row.
            celltype: Cell type id; selects the parameter set and algorithm.
            df_bms: BMS records (current, SOC, receive time, cell voltages,
                temperatures, balancing state, ``PackSOH``).
            df_soh: SOH history with a ``soh`` column; may be empty.
        """
        self.sn = sn
        self.celltype = celltype
        self.param = BatParam.BatParam(celltype)
        self.df_bms = df_bms
        self.packcrnt = df_bms['电流'] * self.param.PackCrntDec
        self.bms_soc = df_bms['SOC']
        self.bmstime = pd.to_datetime(df_bms['接收时间'], format='%Y-%m-%d %H:%M:%S')
        self.df_soh = df_soh
        self.cellvolt_name = ['单体电压' + str(x) for x in range(1, self.param.CellVoltNums + 1)]
        self.celltemp_name = ['温度采样' + str(x) for x in range(1, self.param.CellTempNums + 1)]

    def intershort(self):
        """Run the algorithm matching ``celltype`` and return its result frame.

        ``celltype < 50`` uses the NCM routine, ``celltype > 50`` the LFP
        routine; exactly 50 yields an empty DataFrame.
        """
        if self.celltype < 50:
            return self._ncm_intershort()
        if self.celltype > 50:
            return self._lfp_intershort()
        return pd.DataFrame()

    def _np_move_avg(self, a, n, mode="same"):
        """Return the moving average of *a* over a window of *n* samples."""
        return np.convolve(a, np.ones((n,)) / n, mode=mode)

    def _celltemp_weight(self, num):
        """Set temperature-dependent settings from the coldest probe of row *num*.

        Writes ``self.celltemp`` (minimum temperature of the row),
        ``self.tempweight`` and ``self.StandardStandingTime`` (required rest
        time in seconds).
        """
        celltemp = min(list(self.df_bms.loc[num, self.celltemp_name]))
        self.celltemp = celltemp
        if self.celltype == 99:
            # NOTE(review): the weight for 5 <= T < 10 is written ``0.`` in the
            # original source (possibly a truncated literal); kept as 0.0.
            steps = ((20, 1, 3600), (10, 0.6, 7200), (5, 0., 7200))
            coldest = (0.1, 10800)
        else:
            steps = ((20, 1, 3600), (10, 0.8, 3600), (5, 0.6, 7200))
            coldest = (0.2, 10800)
        for limit, weight, standing in steps:
            if celltemp >= limit:
                self.tempweight = weight
                self.StandardStandingTime = standing
                break
        else:
            self.tempweight, self.StandardStandingTime = coldest

    def _cellvolt_get(self, num):
        """Return all cell voltages of row *num* as a NumPy array."""
        return np.array(self.df_bms.loc[num, self.cellvolt_name])

    def _celldeltsoc_get(self, num, dict_baltime, capacity):
        """Return each cell's SOC deviation from the trimmed mean at row *num*.

        Args:
            num: Row label in ``df_bms``.
            dict_baltime: ``{cell number (1-based): balancing seconds}``.
            capacity: Capacity used to convert balancing time into SOC.

        Returns:
            ``np.ndarray`` with one entry per cell: the OCV-derived SOC
            (balancing-compensated) minus the mean of all cells excluding one
            maximum and one minimum.
        """
        cell_count = self.param.CellVoltNums
        cellvolt = self._cellvolt_get(num).astype(float)
        cellsoc = np.interp(cellvolt, self.param.LookTab_OCV, self.param.LookTab_SOC)
        for j in range(1, cell_count + 1):
            if j in dict_baltime:
                # Compensate the charge removed by balancing.
                # NOTE(review): the rest of the class treats SOC as percent
                # (see the 0.01 factor in the leak-current scale), while this
                # term is a plain capacity fraction - confirm whether a x100
                # is missing.
                cellsoc[j - 1] = cellsoc[j - 1] + dict_baltime[j] * self.param.BalCurrent / (capacity * 3600)

        soc_list = list(cellsoc)
        cellsocmean = (sum(soc_list) - max(soc_list) - min(soc_list)) / (len(soc_list) - 2)
        return cellsoc - cellsocmean

    def _cellDeltAs_get(self, chrg_st, chrg_end, dict_baltime):
        """Return each cell's charged A*s deviation from the trimmed mean.

        For every cell the pack charge accumulated since ``chrg_st`` is sampled
        each time the cell voltage exceeds the next entry of
        ``param.PeakCellVolt``; the first five samples are averaged.

        The result always has ``param.CellVoltNums`` entries so that arrays of
        different charging sessions stay aligned cell by cell. A cell that
        does not reach five crossings inside the window is reported as NaN and
        is left out of the trimmed mean (the original code silently dropped
        such cells, shifting the position of all following cells).

        Args:
            chrg_st: Row label where the charging session starts.
            chrg_end: Row label (exclusive) where the session ends.
            dict_baltime: ``{cell number (1-based): balancing seconds}``.
        """
        peak_volts = self.param.PeakCellVolt
        # The charge delivered in each time step is identical for all cells.
        steps = []
        for m in range(chrg_st + 1, chrg_end):
            step_seconds = (self.bmstime[m] - self.bmstime[m - 1]).total_seconds()
            steps.append((m, self.packcrnt[m] * step_seconds))

        cell_as = []
        for j in range(1, self.param.CellVoltNums + 1):
            if j in dict_baltime:
                charge = -self.param.BalCurrent * dict_baltime[j]  # balancing compensation
            else:
                charge = 0
            column = '单体电压' + str(j)
            total = 0
            hits = 0
            for m, step_as in steps:
                if hits >= self._PEAK_COUNT:
                    break
                charge = charge - step_as
                if self.df_bms.loc[m, column] > peak_volts[hits]:
                    total = total + charge
                    hits += 1
            cell_as.append(total / self._PEAK_COUNT if hits >= self._PEAK_COUNT else np.nan)

        cell_as = np.array(cell_as, dtype=float)
        valid = cell_as[~np.isnan(cell_as)].tolist()
        if len(valid) > 2:
            mean_as = (sum(valid) - max(valid) - min(valid)) / (len(valid) - 2)
        else:
            mean_as = np.nan  # too few cells to form a trimmed mean
        return cell_as - mean_as

    def _bal_time(self, dict_bal):
        """Convert balancing bitmask durations into per-cell durations.

        Args:
            dict_bal: ``{bitmask as decimal string: seconds}``. Bit ``k``
                (1-based, counted from the least significant bit) set means
                balancing channel ``k`` was active.

        Returns:
            ``{cell number: seconds}``. For cell types 1 and 2 the channel
            numbers are remapped to cell numbers; otherwise channel numbers
            are returned unchanged.
        """
        per_channel = {}
        for key, seconds in dict_bal.items():
            mask = int(key)  # keys are produced with str(int); no eval needed
            channel = 1
            while mask > 0:
                if mask & 1:
                    per_channel[channel] = per_channel.get(channel, 0) + seconds
                channel += 1
                mask >>= 1
        per_channel = dict(sorted(per_channel.items(), key=lambda item: item[0]))

        if self.celltype == 1:  # KeYi 6040
            offsets = ((14, 0), (18, 1))
            last_offset = 3
        elif self.celltype == 2:  # KeYi 4840
            # NOTE(review): the original guarded this branch with a second
            # ``celltype == 1`` test, which made it unreachable. 2 is assumed
            # to be the 4840 pack id - confirm against BatParam.
            offsets = ((8, 1), (14, 3), (18, 4))
            last_offset = 6
        else:
            return per_channel

        per_cell = {}
        for channel, seconds in per_channel.items():
            for limit, offset in offsets:
                if channel < limit:
                    per_cell[channel - offset] = seconds
                    break
            else:
                per_cell[channel - last_offset] = seconds
        return per_cell

    def _pack_capacity(self):
        """Return the rated capacity scaled by the latest known SOH (percent)."""
        if self.df_soh.empty:
            batsoh = self.df_bms.loc[0, 'PackSOH']
        else:
            batsoh = self.df_soh.loc[len(self.df_soh) - 1, 'soh']
        return self.param.Capacity * batsoh / 100

    def _accumulate_balance(self, num, dict_bal):
        """Add the balancing time of row *num* to *dict_bal* and return it.

        The time from row ``num`` to row ``num + 1`` is added under the key
        ``str(bitmask)`` when the bitmask is non-zero. If the balancing state
        cannot be read, a new empty dict is returned (the accumulated history
        is discarded, as in the original best-effort handling).
        """
        try:
            balstat = int(self.df_bms.loc[num, self._BAL_STATE_COL])
            if balstat > 0.5:
                bal_step = int((self.bmstime[num + 1] - self.bmstime[num]).total_seconds())
                key = str(balstat)
                dict_bal[key] = dict_bal.get(key, 0) + bal_step
        except self._BAL_READ_ERRORS:
            return {}
        return dict_bal

    def _volt_snapshot(self, num):
        """Return ``(min, max, max_step)`` of the cell voltages at row *num*.

        ``max_step`` is the largest absolute per-cell change against row
        ``num - 1``.
        """
        volt_now = self._cellvolt_get(num)
        volt_last = self._cellvolt_get(num - 1)
        return min(volt_now), max(volt_now), max(abs(volt_now - volt_last))

    def _soc_scale(self, capacity, time_st, time_sp):
        """Return the factor turning a SOC change (percent) into mA."""
        return 0.01 * capacity * 3600 * 1000 / (time_sp - time_st).total_seconds()

    def _append_result(self, df_res, time_st, time_sp, method, delta, scale, dict_baltime):
        """Append one result row with the leak current ``delta * scale``.

        The currents are rounded to 3 decimals and converted to plain Python
        floats before being stringified, so the stored text is ``[0.1, ...]``
        regardless of the NumPy version (NumPy >= 2 would otherwise render
        ``np.float64(0.1)``).
        """
        leak_current = np.round(np.asarray(delta, dtype=float) * scale, 3).tolist()
        df_res.loc[len(df_res)] = [time_st, time_sp, self.sn, method,
                                   str(leak_current), str(dict_baltime)]

    def _ncm_intershort(self):
        """Compute leak currents for NCM cells (methods 1 and 2).

        Returns:
            DataFrame with columns ``time_st, time_sp, sn, method,
            short_current, baltime``; an empty DataFrame without columns when
            nothing could be computed.
        """
        df_res = pd.DataFrame(columns=self._RESULT_COLUMNS)

        if not self.df_bms.empty:
            capacity = self._pack_capacity()
            standingtime = 0    # rest time counted for method 1
            standingtime1 = 0   # rest time counted for method 2
            firsttime = 1       # 1: method 1 has no reference point yet
            firsttime1 = 1      # 1: method 2 has no reference point yet
            dict_bal = {}
            dict_bal1 = {}
            for i in range(2, len(self.df_bms) - 2):
                if firsttime1 == 0:
                    # Method 2: track balancing since its reference point.
                    dict_bal1 = self._accumulate_balance(i, dict_bal1)

                resting = (abs(self.packcrnt[i]) < 1 and abs(self.packcrnt[i - 1]) < 1
                           and abs(self.packcrnt[i + 1]) < 1)
                if not resting:
                    dict_bal = {}
                    firsttime = 1
                    standingtime = 0
                    standingtime1 = 0
                    continue

                delttime = (self.bmstime[i] - self.bmstime[i - 1]).total_seconds()
                standingtime = standingtime + delttime
                standingtime1 = standingtime1 + delttime
                self._celltemp_weight(i)

                # Method 1: two points inside one rest period.
                if firsttime == 1:
                    if standingtime > self.StandardStandingTime * 2:
                        standingtime = 0
                        cellvolt_min, cellvolt_max, deltvolt = self._volt_snapshot(i)
                        if 2 < cellvolt_min < 4.5 and 2 < cellvolt_max < 4.5 and deltvolt < 0.003:
                            dict_baltime = self._bal_time(dict_bal)
                            deltsoc_last = self._celldeltsoc_get(i, dict_baltime, capacity)
                            time_last = self.bmstime[i]
                            firsttime = 0
                elif standingtime > 3600 * 5:
                    standingtime = 0
                    cellvolt_min, cellvolt_max, deltvolt = self._volt_snapshot(i)
                    if 2 < cellvolt_min < 4.5 and 2 < cellvolt_max < 4.5 and deltvolt < 0.005:
                        dict_baltime = self._bal_time(dict_bal)
                        deltsoc_now = self._celldeltsoc_get(i, dict_baltime, capacity)
                        time_now = self.bmstime[i]
                        self._append_result(df_res, time_last, time_now, 1,
                                            deltsoc_now - deltsoc_last,
                                            self._soc_scale(capacity, time_last, time_now),
                                            dict_baltime)
                        time_last = time_now
                        deltsoc_last = deltsoc_now
                        dict_bal = {}
                else:
                    dict_bal = self._accumulate_balance(i, dict_bal)

                # Method 2: last rest sample before current flows again,
                # taken near full charge.
                if standingtime1 > self.StandardStandingTime and abs(self.packcrnt[i + 2]) >= 1:
                    standingtime1 = 0
                    cellvolt_min1, cellvolt_max1, deltvolt1 = self._volt_snapshot(i)
                    cellsoc_now1 = np.interp(cellvolt_max1, self.param.LookTab_OCV,
                                             self.param.LookTab_SOC)
                    if (cellsoc_now1 >= self.param.FullChrgSoc - 50
                            and 2 < cellvolt_min1 < 4.5 and 2 < cellvolt_max1 < 4.5
                            and deltvolt1 < 0.005):
                        dict_baltime1 = self._bal_time(dict_bal1)
                        if firsttime1 == 1:
                            deltsoc_last1 = self._celldeltsoc_get(i, dict_baltime1, capacity)
                            time_last1 = self.bmstime[i]
                            firsttime1 = 0
                        else:
                            time_now1 = self.bmstime[i]
                            if (time_now1 - time_last1).total_seconds() > 3600 * 12:
                                deltsoc_now1 = self._celldeltsoc_get(i, dict_baltime1, capacity)
                                self._append_result(df_res, time_last1, time_now1, 2,
                                                    deltsoc_now1 - deltsoc_last1,
                                                    self._soc_scale(capacity, time_last1, time_now1),
                                                    dict_baltime1)
                                time_last1 = time_now1
                                deltsoc_last1 = deltsoc_now1
                                dict_bal1 = {}

        if df_res.empty:
            return pd.DataFrame()
        return df_res

    def _lfp_intershort(self):
        """Compute leak currents for LFP cells (methods 1, 2 and 3).

        Returns:
            DataFrame with columns ``time_st, time_sp, sn, method,
            short_current, baltime``; an empty DataFrame without columns when
            nothing could be computed.
        """
        df_res = pd.DataFrame(columns=self._RESULT_COLUMNS)

        if not self.df_bms.empty:
            capacity = self._pack_capacity()
            standingtime = 0    # rest time counted for method 1
            standingtime1 = 0   # rest time counted for method 2
            firsttime = 1       # 1: method 1 has no reference point yet
            firsttime1 = 1      # 1: method 2 has no reference point yet
            dict_bal = {}
            dict_bal1 = {}
            chrg_start = []     # start rows of charging sessions
            chrg_end = []       # end rows of accepted charging sessions
            dict_bal_list = []  # balancing bitmask times preceding each accepted session end
            charging = 0
            dict_bal3 = {}
            # Voltages must stay below the lower OCV inflexion (off the plateau).
            volt_ceiling = self.param.OcvInflexionBelow - 0.002

            for i in range(3, len(self.df_bms) - 3):
                if firsttime1 == 0:
                    # Method 2: track balancing since its reference point.
                    dict_bal1 = self._accumulate_balance(i, dict_bal1)

                resting = (abs(self.packcrnt[i]) < 0.1 and abs(self.packcrnt[i - 1]) < 0.1
                           and abs(self.packcrnt[i + 1]) < 0.1)
                if resting:
                    delttime = (self.bmstime[i] - self.bmstime[i - 1]).total_seconds()
                    standingtime = standingtime + delttime
                    standingtime1 = standingtime1 + delttime
                    self._celltemp_weight(i)

                    # Method 1: two points inside one rest period.
                    if firsttime == 1:
                        if standingtime > self.StandardStandingTime:
                            standingtime = 0
                            cellvolt_min, cellvolt_max, deltvolt = self._volt_snapshot(i)
                            if (2 < cellvolt_max < volt_ceiling and 2 < cellvolt_min < 4.5
                                    and abs(deltvolt) < 0.003):
                                dict_baltime = self._bal_time(dict_bal)
                                deltsoc_last = self._celldeltsoc_get(i, dict_baltime, capacity)
                                time_last = self.bmstime[i]
                                firsttime = 0
                    elif standingtime > 3600 * 12:
                        standingtime = 0
                        cellvolt_min, cellvolt_max, deltvolt = self._volt_snapshot(i)
                        if (2 < cellvolt_max < volt_ceiling and 2 < cellvolt_min < 4.5
                                and abs(deltvolt) < 0.003):
                            dict_baltime = self._bal_time(dict_bal)
                            deltsoc_now = self._celldeltsoc_get(i, dict_baltime, capacity)
                            time_now = self.bmstime[i]
                            self._append_result(df_res, time_last, time_now, 1,
                                                deltsoc_now - deltsoc_last,
                                                self._soc_scale(capacity, time_last, time_now),
                                                dict_baltime)
                            time_last = time_now
                            deltsoc_last = deltsoc_now
                            dict_bal = {}
                    else:
                        dict_bal = self._accumulate_balance(i, dict_bal)

                    # Method 2: rest samples off the plateau, at least 24 h apart.
                    if standingtime1 > self.StandardStandingTime:
                        standingtime1 = 0
                        cellvolt_min1, cellvolt_max1, deltvolt1 = self._volt_snapshot(i)
                        if (2 < cellvolt_max1 < volt_ceiling and 2 < cellvolt_min1 < 4.5
                                and deltvolt1 < 0.003):
                            dict_baltime1 = self._bal_time(dict_bal1)
                            if firsttime1 == 1:
                                deltsoc_last1 = self._celldeltsoc_get(i, dict_baltime1, capacity)
                                time_last1 = self.bmstime[i]
                                firsttime1 = 0
                            else:
                                deltsoc_now1 = self._celldeltsoc_get(i, dict_baltime1, capacity)
                                time_now1 = self.bmstime[i]
                                if (abs(max(deltsoc_now1) - max(deltsoc_last1)) < 10
                                        and (time_now1 - time_last1).total_seconds() > 3600 * 24):
                                    self._append_result(df_res, time_last1, time_now1, 2,
                                                        deltsoc_now1 - deltsoc_last1,
                                                        self._soc_scale(capacity, time_last1, time_now1),
                                                        dict_baltime1)
                                    time_last1 = time_now1
                                    deltsoc_last1 = deltsoc_now1
                                    dict_bal1 = {}
                else:
                    dict_bal = {}
                    firsttime = 1
                    standingtime = 0
                    standingtime1 = 0

                # Method 3 bookkeeping: balancing time and charging sessions.
                dict_bal3 = self._accumulate_balance(i, dict_bal3)
                if charging == 0:
                    # A session starts on three consecutive charging samples at low SOC.
                    if (self.packcrnt[i] <= -1 and self.packcrnt[i + 1] <= -1
                            and self.packcrnt[i + 2] <= -1 and self.bms_soc[i] < 40):
                        cellvolt_now = self._cellvolt_get(i)
                        if min(cellvolt_now) < self.param.CellFullChrgVolt - 0.15:
                            charging = 1
                            if len(chrg_start) > len(chrg_end):
                                chrg_start[-1] = i
                            else:
                                chrg_start.append(i)
                else:
                    time_gap = (self.bmstime[i + 1] - self.bmstime[i]).total_seconds()
                    high_current = (self.packcrnt[i] > self.param.Capacity / 3
                                    and self.packcrnt[i + 1] > self.param.Capacity / 3)
                    if time_gap > 180 or high_current:
                        # Data gap > 180 s or large positive current: discard the session.
                        chrg_start.pop()
                        charging = 0
                    elif (self.packcrnt[i] <= -1 and self.packcrnt[i + 1] <= -1
                          and self.packcrnt[i + 2] < -1):
                        cellvolt_now = self._cellvolt_get(i)
                        if min(cellvolt_now) > self.param.CellFullChrgVolt - 0.11:
                            # Every cell is close to the full-charge voltage.
                            self._celltemp_weight(i)
                            if i - chrg_start[-1] > 10 and self.celltemp > 10:
                                chrg_end.append(i + 1)
                                dict_bal_list.append(dict_bal3)
                                dict_bal3 = {}
                            else:
                                chrg_start.pop()
                            charging = 0

            # Method 3: compare charged A*s between consecutive sessions.
            if len(chrg_end) > 1:
                for k, (start, end) in enumerate(zip(chrg_start, chrg_end)):
                    if k < 1:
                        deltAs_last = self._cellDeltAs_get(start, end, {})
                        time_last = self.bmstime[end]
                    else:
                        dict_baltime = self._bal_time(dict_bal_list[k])
                        deltAs_now = self._cellDeltAs_get(start, end, dict_baltime)
                        time_now = self.bmstime[end]
                        scale = -1000 / (time_now - time_last).total_seconds()
                        self._append_result(df_res, time_last, time_now, 3,
                                            deltAs_now - deltAs_last, scale, dict_baltime)
                        deltAs_last = deltAs_now
                        time_last = time_now

        if df_res.empty:
            return pd.DataFrame()
        return df_res
|