12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826 |
- import pandas as pd
- import numpy as np
- import datetime
- import BatParam
- class BatInterShort():
def __init__(self, sn, celltype, df_bms, df_soh, df_last, df_last1, df_last2, df_last3, df_lfp):
    """Store the BMS data and the state carried over from the previous run.

    When ``df_lfp`` is non-empty and ``celltype`` is above 50, its rows are
    placed in front of ``df_bms`` before anything else is derived.

    Args:
        sn: Identifier copied into every result / RAM row.
        celltype: Cell type code; passed to ``BatParam.BatParam`` and used
            to select the calculation path (``<= 50`` vs ``> 50``).
        df_bms: BMS samples. Must provide ``PackCrnt``, ``PackVolt``,
            ``PackSOC`` and ``time``; the ``time`` column is converted to
            datetimes in place.
        df_soh: Earlier state-of-health results (may be empty).
        df_last, df_last1, df_last2, df_last3: State frames carried over
            from the previous run (may be empty).
        df_lfp: BMS rows carried over from the previous run (may be empty).
    """
    # Prepend carried-over rows so the data forms one continuous frame.
    if not df_lfp.empty and celltype > 50:
        df_bms = pd.concat([df_lfp, df_bms], ignore_index=True)
        df_bms.reset_index(inplace=True, drop=True)

    self.sn = sn
    self.celltype = celltype
    self.param = BatParam.BatParam(celltype)

    # Pack-level signals; the current is scaled by the parameter-set factor.
    self.packcrnt = df_bms['PackCrnt'] * self.param.PackCrntDec
    self.packvolt = df_bms['PackVolt']
    self.bms_soc = df_bms['PackSOC']
    df_bms['time'] = pd.to_datetime(df_bms['time'], format='%Y-%m-%d %H:%M:%S')
    self.bmstime = df_bms['time']

    # Input frames are kept as-is for the calculation methods.
    self.df_bms = df_bms
    self.df_soh = df_soh
    self.df_last = df_last
    self.df_last1 = df_last1
    self.df_last2 = df_last2
    self.df_last3 = df_last3
    self.df_lfp = df_lfp

    # Column names of the per-cell voltage and temperature signals.
    volt_count = self.param.CellVoltNums
    temp_count = self.param.CellTempNums
    self.cellvolt_name = [f'CellVolt{idx}' for idx in range(1, volt_count + 1)]
    self.celltemp_name = [f'CellTemp{idx}' for idx in range(1, temp_count + 1)]
- def intershort(self):
- if self.celltype<=50:
- df_res, df_ram_last, df_ram_last1, df_ram_last3=self._ncm_intershort()
- return df_res, df_ram_last, df_ram_last1,self.df_last2, df_ram_last3,self.df_lfp
- else:
- df_res, df_ram_last, df_ram_last1, df_ram_last2, df_ram_last3, df_ram_lfp=self._lfp_intershort()
- return df_res, df_ram_last, df_ram_last1, df_ram_last2, df_ram_last3, df_ram_lfp
- #定义滑动滤波函数....................................................................................
- def _np_move_avg(self,a, n, mode="same"):
- return (np.convolve(a, np.ones((n,)) / n, mode=mode))
- #寻找当前行数据的最小温度值.............................................................................
- def _celltemp_weight(self,num):
- celltemp = list(self.df_bms.loc[num,self.celltemp_name])
- celltemp=min(celltemp)
- self.celltemp=celltemp
- if self.celltype>50:
- if celltemp>=25:
- self.tempweight=1
- self.StandardStandingTime=4500
- elif celltemp>=15:
- self.tempweight=0.6
- self.StandardStandingTime=7200
- elif celltemp>=5:
- self.tempweight=0.2
- self.StandardStandingTime=10800
- else:
- self.tempweight=0.1
- self.StandardStandingTime=10800
- else:
- if celltemp>=25:
- self.tempweight=1
- self.StandardStandingTime=3600
- elif celltemp>=15:
- self.tempweight=0.8
- self.StandardStandingTime=5400
- elif celltemp>=5:
- self.tempweight=0.6
- self.StandardStandingTime=7200
- else:
- self.tempweight=0.2
- self.StandardStandingTime=10800
- #获取前5min每个电压的平均值........................................................................................
- def _avgvolt_get(self,num):
- time_now=self.df_bms.loc[num, 'time']
- time_last=time_now-datetime.timedelta(seconds=300)
- df_volt=self.df_bms[(self.df_bms['time']>=time_last) & (self.df_bms['time']<=time_now)]
- df_volt=df_volt[self.cellvolt_name]
- df_volt=df_volt[(df_volt>2) & (df_volt<4.5)]
- df_volt=df_volt.dropna()
- cellvolt_std=df_volt.std(axis=0)
- if len(df_volt)>2 and max(cellvolt_std)<0.005:
- cellvolt_sum=df_volt.sum(0)-df_volt.max(0)-df_volt.min(0)
- cellvolt_mean=cellvolt_sum/(len(df_volt)-2)
- cellvolt=cellvolt_mean
- elif len(df_volt)==2:
- # df_volt=pd.DataFrame(df_volt,dtype=np.float)
- if max(abs(df_volt.iloc[1]-df_volt.iloc[0]))<0.003:
- cellvolt=df_volt.mean(0)
- else:
- cellvolt=pd.DataFrame()
- elif len(df_volt)==1:
- cellvolt=df_volt.iloc[0]
- else:
- cellvolt=pd.DataFrame()
- return cellvolt
- #获取当前行所有电压数据........................................................................................
- def _cellvolt_get(self,num):
- cellvolt = np.array(self.df_bms.loc[num,self.cellvolt_name])
- return cellvolt
- #获取单个电压值.................................................................................................
- def _singlevolt_get(self,num,series,mode): #mode==1取当前行单体电压值,mode==2取某个单体所有电压值
- s=str(series)
- if mode==1:
- singlevolt=self.df_bms.loc[num,'CellVolt' + s]
- return singlevolt
- else:
- singlevolt=self.df_bms['CellVolt' + s]
- return singlevolt
- #获取当前行所有soc差...........................................................................................
- def _celldeltsoc_get(self,cellvolt_list,dict_baltime,capacity):
- cellsoc=[]
- celldeltsoc=[]
- for j in range(1, self.param.CellVoltNums+1): #获取每个电芯电压对应的SOC值
- cellvolt=cellvolt_list[j-1]
- ocv_soc=np.interp(cellvolt,self.param.LookTab_OCV,self.param.LookTab_SOC)
- if j in dict_baltime.keys():
- ocv_soc=ocv_soc+dict_baltime[j]*self.param.BalCurrent/(capacity*3600) #补偿均衡电流
- else:
- pass
- cellsoc.append(ocv_soc)
- cellsocmean=(sum(cellsoc)-max(cellsoc)-min(cellsoc))/(len(cellsoc)-2)
- for j in range(len(cellsoc)): #计算每个电芯的soc差
- celldeltsoc.append(cellsoc[j]-cellsocmean)
- return np.array(celldeltsoc), np.array(cellsoc)
-
- #寻找DVDQ的峰值点,并返回..........................................................................................................................
- def _dvdq_peak(self, time, soc, cellvolt, packcrnt):
- cellvolt = self._np_move_avg(cellvolt, 5, mode="same")
- Soc = 0
- Ah = 0
- Ah_total=0
- Volt = cellvolt[0]
- DV_Volt = []
- DQ_Ah = []
- DVDQ = []
- time1 = []
- soc1 = []
- soc2 = []
- xvolt=[]
- for m in range(1, len(time)):
- Step = (time[m] - time[m - 1]).total_seconds()
- Soc = Soc - packcrnt[m] * Step * 100 / (3600 * float(self.param.Capacity))
- Ah = Ah - packcrnt[m] * Step / 3600
- if (cellvolt[m]-Volt)>0.0012 and Ah>0.01:
- Ah_total=Ah_total+Ah
- DQ_Ah.append(Ah_total)
- DV_Volt.append(cellvolt[m]-Volt)
- DVDQ.append((DV_Volt[-1])/Ah)
- xvolt.append(cellvolt[m])
- Volt=cellvolt[m]
- Ah = 0
- soc1.append(Soc)
- time1.append(time[m])
- soc2.append(soc[m])
- #切片,去除前后10min的数据
- df_Data1 = pd.DataFrame({'time': time1,
- 'SOC': soc2,
- 'DVDQ': DVDQ,
- 'AhSoc': soc1,
- 'DQ_Ah':DQ_Ah,
- 'DV_Volt':DV_Volt,
- 'XVOLT':xvolt})
- start_time=df_Data1.loc[0,'time']
- start_time=start_time+datetime.timedelta(seconds=900)
- end_time=df_Data1.loc[len(time1)-1,'time']
- end_time=end_time-datetime.timedelta(seconds=1200)
- if soc2[0]<36:
- df_Data1=df_Data1[(df_Data1['SOC']>40) & (df_Data1['SOC']<80)]
- else:
- df_Data1=df_Data1[(df_Data1['time']>start_time) & (df_Data1['SOC']<80)]
- df_Data1=df_Data1[(df_Data1['XVOLT']>self.param.PeakVoltLowLmt) & (df_Data1['XVOLT']<self.param.PeakVoltUpLmt)]
- # print(packcrnt[int(len(time)/2)], min(self.celltemp))
- # ax1 = plt.subplot(3, 1, 1)
- # plt.plot(df_Data1['SOC'],df_Data1['DQ_Ah'],'g*-')
- # plt.xlabel('SOC/%')
- # plt.ylabel('DQ_Ah')
- # plt.legend()
- # ax1 = plt.subplot(3, 1, 2)
- # plt.plot(df_Data1['SOC'],df_Data1['XVOLT'],'y*-')
- # plt.xlabel('SOC/%')
- # plt.ylabel('Volt/V')
- # plt.legend()
- # ax1 = plt.subplot(3, 1, 3)
- # plt.plot(df_Data1['SOC'], df_Data1['DVDQ'], 'r*-')
- # plt.xlabel('SOC/%')
- # plt.ylabel('DV/DQ')
- # plt.legend()
- # # plt.show()
- if len(df_Data1)>2:
- PeakIndex = df_Data1['DVDQ'].idxmax()
- df_Data2 = df_Data1[(df_Data1['SOC'] > (df_Data1['SOC'][PeakIndex] - 0.5)) & (df_Data1['SOC'] < (df_Data1['SOC'][PeakIndex] + 0.5))]
- if len(df_Data2) > 1 and min(df_Data1['SOC'])+0.5<df_Data1['SOC'][PeakIndex]<max(df_Data1['SOC'])-1:
- return df_Data1['DQ_Ah'][PeakIndex]
-
- else:
- if min(df_Data1['SOC'])+0.5<df_Data1['SOC'][PeakIndex]<max(df_Data1['SOC'])-1:
- df_Data1=df_Data1.drop([PeakIndex])
- elif df_Data1['SOC'][PeakIndex]>max(df_Data1['SOC'])-1:
- df_Data1=df_Data1[df_Data1['SOC']<(df_Data1['SOC'][PeakIndex]-1)]
- else:
- df_Data1=df_Data1[df_Data1['SOC']>(df_Data1['SOC'][PeakIndex]+0.5)]
- if len(df_Data1)>2:
- PeakIndex = df_Data1['DVDQ'].idxmax()
- df_Data2 = df_Data1[(df_Data1['SOC'] > (df_Data1['SOC'][PeakIndex] - 0.5)) & (df_Data1['SOC'] < (df_Data1['SOC'][PeakIndex] + 0.5))]
- if len(df_Data2) > 1 and min(df_Data1['SOC'])+0.5<df_Data1['SOC'][PeakIndex]<max(df_Data1['SOC'])-1: #
- return df_Data1['DQ_Ah'][PeakIndex]
- else:
- return 0
- else:
- return 0
- else:
- return 0
- #获取所有电芯的As差
- def _cellDeltAs_get(self,chrg_st,chrg_end,dict_baltime):
- cellAs=[]
- celldeltAs=[]
- for j in range(1, self.param.CellVoltNums+1): #获取每个电芯电压>峰值电压的充入As数
- if j in dict_baltime.keys(): #补偿均衡电流
- As=-self.param.BalCurrent*dict_baltime[j]
- else:
- As=0
- As_tatol=0
- symbol=0
- for m in range(chrg_st+1,chrg_end):
- As=As-self.packcrnt[m]*(self.bmstime[m]-self.bmstime[m-1]).total_seconds()
- if symbol<5:
- if self.df_bms.loc[m,'CellVolt'+str(j)]>self.param.PeakCellVolt[symbol]:
- As_tatol=As_tatol+As
- symbol=symbol+1
- else:
- continue
- else:
- cellAs.append(As_tatol/5)
- break
- cellAsmean=(sum(cellAs)-max(cellAs)-min(cellAs))/(len(cellAs)-2)
- for j in range(len(cellAs)): #计算每个电芯的soc差
- celldeltAs.append(cellAs[j]-cellAsmean)
- return np.array(celldeltAs)
- #计算每个电芯的均衡时长..........................................................................................................................
- def _bal_time(self,dict_bal):
- dict_baltime={}
- dict_baltime1={}
- for key in dict_bal:
- count=1
- x=eval(key)
- while x>0:
- if x & 1==1: #判断最后一位是否为1
- if count in dict_baltime.keys():
- dict_baltime[count] = dict_baltime[count] + dict_bal[key]
- else:
- dict_baltime[count] = dict_bal[key]
- else:
- pass
- count += 1
- x >>= 1 #右移一位
- dict_baltime=dict(sorted(dict_baltime.items(),key=lambda dict_baltime:dict_baltime[0]))
- for key in dict_baltime: #解析均衡的电芯编号
- if self.celltype==1: #科易6040
- if key<14:
- dict_baltime1[key]=dict_baltime[key]
- elif key<18:
- dict_baltime1[key-1]=dict_baltime[key]
- else:
- dict_baltime1[key-3]=dict_baltime[key]
- elif self.celltype==1: #科易4840
- if key<4:
- dict_baltime1[key-1]=dict_baltime[key]
- elif key<8:
- dict_baltime1[key-1]=dict_baltime[key]
- elif key<14:
- dict_baltime1[key-3]=dict_baltime[key]
- elif key<18:
- dict_baltime1[key-4]=dict_baltime[key]
- else:
- dict_baltime1[key-6]=dict_baltime[key]
- else:
- dict_baltime1=dict_baltime
- return dict_baltime1
- #三元电池的内短路电流计算...........................................................................................................................................................
- def _ncm_intershort(self):
- # Estimate internal-short-circuit (leakage) current on the celltype<=50 path (called from intershort()).
- # Walks self.df_bms row by row, accumulates how long |pack current| has stayed below 0.1 and compares
- # the per-cell SOC offsets of two rest points. Result rows carry method=1 (long rest) or method=2
- # (rest with the highest cell SOC above FullChrgSoc-10).
- # Returns (df_res, df_ram_last, df_ram_last1, df_ram_last3); df_res is an empty pd.DataFrame() when
- # no result row was produced.
- df_res=pd.DataFrame(columns=['time_st', 'time_sp', 'sn', 'method','short_current','baltime'])
- df_ram_last=self.df_last
- df_ram_last1=self.df_last1
- df_ram_last3=self.df_last3
- # Capacity initialisation: scale the rated capacity by the latest SOH (or by PackSOH of the first row)
- if self.df_soh.empty:
- batsoh=self.df_bms.loc[0,'PackSOH']
- capacity=self.param.Capacity*batsoh/100
- else:
- batsoh=self.df_soh.loc[len(self.df_soh)-1,'soh']
- capacity=self.param.Capacity*batsoh/100
- # Parameter initialisation: restore the previous rest points from the RAM frames when present
- if df_ram_last.empty:
- firsttime=1
- dict_bal={}
- else:
- deltsoc_last=df_ram_last.loc[0,'deltsoc']
- cellsoc_last=df_ram_last.loc[0,'cellsoc']
- time_last=df_ram_last.loc[0,'time']
- firsttime=0
- dict_bal={}
- if df_ram_last1.empty:
- firsttime1=1
- dict_bal1={}
- else:
- deltsoc_last1=df_ram_last1.loc[0,'deltsoc1']
- time_last1=df_ram_last1.loc[0,'time1']
- firsttime1=0
- dict_bal1={}
- if df_ram_last3.empty:
- standingtime=0
- standingtime1=0
- standingtime2=0
- else:
- standingtime=df_ram_last3.loc[0,'standingtime']
- standingtime1=df_ram_last3.loc[0,'standingtime1']
- standingtime2=df_ram_last3.loc[0,'standingtime2']
- dict_bal1={}
- # Still resting at the start of this data: add the gap since the stored timestamp to both counters
- if abs(self.packcrnt[0])<0.01 and standingtime>1 and standingtime1>1:
- standingtime=standingtime+(self.bmstime[0]-df_ram_last3.loc[0,'time3']).total_seconds()
- standingtime1=standingtime1+(self.bmstime[0]-df_ram_last3.loc[0,'time3']).total_seconds()
- else:
- pass
- for i in range(1,len(self.df_bms)-1):
- if firsttime1==0: # full-charge rest method: accumulate balancing time per balance-state value
- try:
- balstat=int(self.df_bms.loc[i,'单体均衡状态'])
- if balstat>0.5:
- bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() # balancing step length
- bal_step=int(bal_step)
- if str(balstat) in dict_bal1.keys():
- dict_bal1[str(balstat)]=dict_bal1[str(balstat)]+bal_step
- else:
- dict_bal1[str(balstat)]=bal_step
- else:
- pass
- except: # NOTE(review): bare except - any error (e.g. missing column) silently resets the tally
- dict_bal1={}
- else:
- pass
- if abs(self.packcrnt[i]) < 0.1 and abs(self.packcrnt[i-1]) < 0.1 and abs(self.packcrnt[i+1]) < 0.1:
- delttime=(self.bmstime[i]-self.bmstime[i-1]).total_seconds()
- standingtime=standingtime+delttime
- standingtime1=standingtime1+delttime
- self._celltemp_weight(i)
- # Long-rest method for internal short circuit - start
- if firsttime==1:
- if standingtime>self.StandardStandingTime*2: # rest time is long enough
- standingtime=0
- cellvolt_now=self._avgvolt_get(i)
- if not cellvolt_now.empty:
- cellvolt_min=min(cellvolt_now)
- cellvolt_max=max(cellvolt_now)
- # cellvolt_last=self._avgvolt_get(i-1)
- # deltvolt=max(abs(cellvolt_now-cellvolt_last))
- cellsoc_max=np.interp(cellvolt_max,self.param.LookTab_OCV,self.param.LookTab_SOC)
- cellsoc_min=np.interp(cellvolt_min,self.param.LookTab_OCV,self.param.LookTab_SOC)
- if 2<cellvolt_min<4.5 and 2<cellvolt_max<4.5 and (45<cellsoc_max or cellsoc_max<30) and (45<cellsoc_min or cellsoc_min<30):
- dict_baltime={} # no balancing compensation for the first rest point
- deltsoc_last, cellsoc_last=self._celldeltsoc_get(cellvolt_now,dict_baltime,capacity)
- time_last=self.bmstime[i]
- firsttime=0
- df_ram_last.loc[0]=[self.sn,time_last,deltsoc_last,cellsoc_last] # update RAM info
- else:
- pass
- elif standingtime>3600*12:
- standingtime=0
- cellvolt_now=self._avgvolt_get(i)
- if not cellvolt_now.empty:
- cellvolt_min=min(cellvolt_now)
- cellvolt_max=max(cellvolt_now)
- # cellvolt_last=self._avgvolt_get(i-1)
- # deltvolt=max(abs(cellvolt_now-cellvolt_last))
- cellsoc_max=np.interp(cellvolt_max,self.param.LookTab_OCV,self.param.LookTab_SOC)
- cellsoc_min=np.interp(cellvolt_min,self.param.LookTab_OCV,self.param.LookTab_SOC)
- if 2<cellvolt_min<4.5 and 2<cellvolt_max<4.5 and (45<cellsoc_max or cellsoc_max<30) and (45<cellsoc_min or cellsoc_min<30):
- dict_baltime=self._bal_time(dict_bal) # balancing time per cell
- deltsoc_now, cellsoc_now=self._celldeltsoc_get(cellvolt_now,dict_baltime,capacity)
- time_now=self.bmstime[i]
- if -5<max(cellsoc_now-cellsoc_last)<5:
- df_ram_last.loc[0]=[self.sn,time_now,deltsoc_now,cellsoc_now] # update RAM info
- list_sub=deltsoc_now-deltsoc_last
- list_pud=(0.01*capacity*3600*1000)/(time_now-time_last).total_seconds()
- leak_current=list_sub*list_pud
- # leak_current=np.array(leak_current)
- leak_current=np.round(leak_current,3)
- leak_current=list(leak_current)
- df_res.loc[len(df_res)]=[time_last,time_now,self.sn,1,str(leak_current),str(dict_baltime)] # store the result row
- time_last=time_now # update time
- deltsoc_last=deltsoc_now # update SOC offsets
- # NOTE(review): cellsoc_last is not updated here, only time_last and deltsoc_last - confirm intent
- dict_bal={}
- else:
- firsttime=1
- else:
- try:
- balstat=int(self.df_bms.loc[i,'单体均衡状态'])
- if balstat>0.5:
- bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() # balancing step length
- bal_step=int(bal_step)
- if str(balstat) in dict_bal.keys():
- dict_bal[str(balstat)]=dict_bal[str(balstat)]+bal_step
- else:
- dict_bal[str(balstat)]=bal_step
- else:
- pass
- except: # NOTE(review): bare except - any error silently resets the tally
- dict_bal={}
- # Full-charge rest method for internal short circuit - start
- if self.StandardStandingTime<standingtime1:
- standingtime1=0
- cellvolt_now1=self._avgvolt_get(i)
- if not cellvolt_now1.empty:
- cellvolt_max1=max(cellvolt_now1)
- cellvolt_min1=min(cellvolt_now1)
- # cellvolt_last1=self._avgvolt_get(i-1)
- # deltvolt1=max(abs(cellvolt_now1-cellvolt_last1))
- cellsoc_now1=np.interp(cellvolt_max1,self.param.LookTab_OCV,self.param.LookTab_SOC)
- if cellsoc_now1>self.param.FullChrgSoc-10 and 2<cellvolt_min1<4.5 and 2<cellvolt_max1<4.5:
- if firsttime1==1:
- dict_baltime1={} # no balancing compensation for the first rest point
- deltsoc_last1, cellsoc_last1=self._celldeltsoc_get(cellvolt_now1,dict_baltime1,capacity)
- time_last1=self.bmstime[i]
- firsttime1=0
- df_ram_last1.loc[0]=[self.sn,time_last1,deltsoc_last1] # update RAM info
- else:
- dict_baltime1=self._bal_time(dict_bal1) # balancing time per cell
- time_now1=self.bmstime[i]
- if (time_now1-time_last1).total_seconds()>3600*20:
- deltsoc_now1, cellsoc_now1=self._celldeltsoc_get(cellvolt_now1,dict_baltime1,capacity)
- df_ram_last1.loc[0]=[self.sn,time_now1,deltsoc_now1] # update RAM info
- list_sub1=deltsoc_now1-deltsoc_last1
- list_pud1=(0.01*capacity*3600*1000)/(time_now1-time_last1).total_seconds()
- leak_current1=list_sub1*list_pud1
- # leak_current1=np.array(leak_current1)
- leak_current1=np.round(leak_current1,3)
- leak_current1=list(leak_current1)
- df_res.loc[len(df_res)]=[time_last1,time_now1,self.sn,2,str(leak_current1),str(dict_baltime1)] # store the result row
- time_last1=time_now1 # update time
- deltsoc_last1=deltsoc_now1 # update SOC offsets
- dict_bal1={}
- else:
- pass
- else:
- pass
- else:
- pass
- else:
- df_ram_last=pd.DataFrame(columns=['sn','time','deltsoc','cellsoc']) # not resting: clear the SOC offsets stored from the previous rest
- dict_bal={}
- firsttime=1
- standingtime=0
- standingtime1=0
- pass
- # Store the standing-time counters in RAM together with the last timestamp
- df_ram_last3.loc[0]=[self.sn,self.bmstime[len(self.bmstime)-1],standingtime,standingtime1,standingtime2]
- # Return the results
- if df_res.empty:
- return pd.DataFrame(), df_ram_last, df_ram_last1, df_ram_last3
- else:
- return df_res, df_ram_last, df_ram_last1, df_ram_last3
- #磷酸铁锂电池内短路计算程序.............................................................................................................................
- def _lfp_intershort(self):
- # Estimate internal-short-circuit (leakage) current on the celltype>50 path (called from intershort()).
- # Three kinds of result rows are written to df_res: method=1 (two rest points, the second after more
- # than 12 h of standing), method=2 (two rest points more than 24 h apart) and method=3 (shift of the
- # per-cell dV/dQ peak charge between two charges, via _dvdq_peak()).
- # Returns (df_res, df_ram_last, df_ram_last1, df_ram_last2, df_ram_last3, df_ram_lfp); df_res is an
- # empty pd.DataFrame() when no result row was produced.
- column_name=['time_st', 'time_sp', 'sn', 'method','short_current','baltime']
- df_res=pd.DataFrame(columns=column_name)
- df_ram_last=self.df_last
- df_ram_last1=self.df_last1
- df_ram_last2=self.df_last2
- df_ram_last3=self.df_last3
- df_ram_lfp=pd.DataFrame(columns=self.df_bms.columns.tolist())
- # Capacity initialisation: scale the rated capacity by the latest SOH (or by PackSOH of the first row)
- if self.df_soh.empty:
- batsoh=self.df_bms.loc[0,'PackSOH']
- capacity=self.param.Capacity*batsoh/100
- else:
- batsoh=self.df_soh.loc[len(self.df_soh)-1,'soh']
- capacity=self.param.Capacity*batsoh/100
- # Parameter initialisation: restore the previous reference points from the RAM frames when present
- if df_ram_last.empty:
- firsttime=1
- dict_bal={}
- else:
- deltsoc_last=df_ram_last.loc[0,'deltsoc']
- cellsoc_last=df_ram_last.loc[0,'cellsoc']
- time_last=df_ram_last.loc[0,'time']
- firsttime=0
- dict_bal={}
- if df_ram_last1.empty:
- firsttime1=1
- dict_bal1={}
- else:
- deltsoc_last1=df_ram_last1.loc[0,'deltsoc1']
- time_last1=df_ram_last1.loc[0,'time1']
- firsttime1=0
- dict_bal1={}
- if df_ram_last2.empty:
- firsttime2=1
- charging=0
- dict_bal2={}
- else:
- deltAs_last2=df_ram_last2.loc[0,'deltAs2']
- time_last2=df_ram_last2.loc[0,'time2']
- firsttime2=0
- charging=0
- dict_bal2={}
- if df_ram_last3.empty:
- standingtime=0
- standingtime1=0
- standingtime2=0
- else:
- standingtime=df_ram_last3.loc[0,'standingtime']
- standingtime1=df_ram_last3.loc[0,'standingtime1']
- standingtime2=df_ram_last3.loc[0,'standingtime2']
- dict_bal1={}
- # Still resting at the start of this data: add the gap since the stored timestamp to both counters
- if abs(self.packcrnt[0])<0.01 and standingtime>1 and standingtime1>1:
- standingtime=standingtime+(self.bmstime[0]-df_ram_last3.loc[0,'time3']).total_seconds()
- standingtime1=standingtime1+(self.bmstime[0]-df_ram_last3.loc[0,'time3']).total_seconds()
- else:
- pass
- for i in range(1,len(self.df_bms)-1):
- # Rest methods for internal short circuit
- if firsttime1==0: # second rest method: accumulate balancing time per balance-state value
- try:
- balstat=int(self.df_bms.loc[i,'单体均衡状态'])
- if balstat>0.5:
- bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() # balancing step length
- bal_step=int(bal_step)
- if str(balstat) in dict_bal1.keys():
- dict_bal1[str(balstat)]=dict_bal1[str(balstat)]+bal_step
- else:
- dict_bal1[str(balstat)]=bal_step
- else:
- pass
- except: # NOTE(review): bare except - any error (e.g. missing column) silently resets the tally
- dict_bal1={}
- else:
- pass
- if abs(self.packcrnt[i]) < 0.1 and abs(self.packcrnt[i-1]) < 0.1:
- delttime=(self.bmstime[i]-self.bmstime[i-1]).total_seconds()
- standingtime=standingtime+delttime
- standingtime1=standingtime1+delttime
- self._celltemp_weight(i)
- # Long-rest method for internal short circuit - start
- if firsttime==1:
- if standingtime>self.StandardStandingTime: # rest time is long enough
- cellvolt_now=self._avgvolt_get(i)
- if not cellvolt_now.empty:
- standingtime=0
- cellvolt_min=min(cellvolt_now)
- cellvolt_max=max(cellvolt_now)
- # cellvolt_last=self._avgvolt_get(i-1)
- # deltvolt=max(abs(cellvolt_now-cellvolt_last))
- cellsoc_max=np.interp(cellvolt_max,self.param.LookTab_OCV,self.param.LookTab_SOC)
- cellsoc_min=np.interp(cellvolt_min,self.param.LookTab_OCV,self.param.LookTab_SOC)
- if cellsoc_max<self.param.SocInflexion1-2 and 12<cellsoc_min:
- dict_baltime={} # no balancing compensation for the first rest point
- deltsoc_last, cellsoc_last=self._celldeltsoc_get(cellvolt_now,dict_baltime,capacity)
- time_last=self.bmstime[i]
- firsttime=0
- df_ram_last.loc[0]=[self.sn,time_last,deltsoc_last,cellsoc_last] # update RAM info
- else:
- pass
- else:
- pass
- elif standingtime>3600*12:
- # NOTE(review): the result is wrapped in np.array, and NumPy arrays have no `.empty` attribute,
- # so the check on the next-but-one line raises AttributeError whenever this branch runs.
- cellvolt_now=np.array(self._avgvolt_get(i))
- if not cellvolt_now.empty:
- standingtime=0
- cellvolt_min=min(cellvolt_now)
- cellvolt_max=max(cellvolt_now)
- # cellvolt_last=np.array(self._avgvolt_get(i-1))
- # deltvolt=max(abs(cellvolt_now-cellvolt_last))
- cellsoc_max=np.interp(cellvolt_max,self.param.LookTab_OCV,self.param.LookTab_SOC)
- cellsoc_min=np.interp(cellvolt_min,self.param.LookTab_OCV,self.param.LookTab_SOC)
- if cellsoc_max<self.param.SocInflexion1-2 and 12<cellsoc_min:
- dict_baltime=self._bal_time(dict_bal) # balancing time per cell
- deltsoc_now, cellsoc_now=self._celldeltsoc_get(cellvolt_now, dict_baltime,capacity) # SOC offset per cell
- time_now=self.bmstime[i]
- if -5<max(cellsoc_now-cellsoc_last)<5:
- df_ram_last.loc[0]=[self.sn,time_now,deltsoc_now,cellsoc_now] # update RAM info
- list_sub=deltsoc_now-deltsoc_last
- list_pud=(0.01*capacity*3600*1000)/(time_now-time_last).total_seconds()
- leak_current=list_sub*list_pud
- # leak_current=np.array(leak_current)
- leak_current=np.round(leak_current,3)
- leak_current=list(leak_current)
- df_res.loc[len(df_res)]=[time_last,time_now,self.sn,1,str(leak_current),str(dict_baltime)] # store the result row
- time_last=time_now # update time
- deltsoc_last=deltsoc_now # update SOC offsets
- dict_bal={}
- else:
- firsttime=1
- else:
- pass
- else:
- try:
- balstat=int(self.df_bms.loc[i,'单体均衡状态'])
- if balstat>0.5:
- bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() # balancing step length
- bal_step=int(bal_step)
- if str(balstat) in dict_bal.keys():
- dict_bal[str(balstat)]=dict_bal[str(balstat)]+bal_step
- else:
- dict_bal[str(balstat)]=bal_step
- else:
- pass
- except: # NOTE(review): bare except - any error silently resets the tally
- dict_bal={}
- # Rest method outside the voltage plateau - start
- if standingtime1>self.StandardStandingTime:
- cellvolt_now1=self._avgvolt_get(i)
- if not cellvolt_now1.empty:
- standingtime1=0
- cellvolt_max1=max(cellvolt_now1)
- cellvolt_min1=min(cellvolt_now1)
- # cellvolt_last1=self._avgvolt_get(i-1)
- # deltvolt1=max(abs(cellvolt_now1-cellvolt_last1))
- cellsoc_max1=np.interp(cellvolt_max1,self.param.LookTab_OCV,self.param.LookTab_SOC)
- cellsoc_min1=np.interp(cellvolt_min1,self.param.LookTab_OCV,self.param.LookTab_SOC)
- if cellsoc_max1<self.param.SocInflexion1-2 and 1<cellsoc_min1:
- if firsttime1==1:
- dict_baltime1=self._bal_time(dict_bal1) # balancing time per cell
- deltsoc_last1, cellsoc_last1=self._celldeltsoc_get(cellvolt_now1,dict_baltime1,capacity)
- time_last1=self.bmstime[i]
- firsttime1=0
- df_ram_last1.loc[0]=[self.sn,time_last1,deltsoc_last1] # update RAM info
- else:
- dict_baltime1=self._bal_time(dict_bal1) # balancing time per cell
- deltsoc_now1, cellsoc_now1=self._celldeltsoc_get(cellvolt_now1,dict_baltime1,capacity)
- time_now1=self.bmstime[i]
- df_ram_last1.loc[0]=[self.sn,time_now1,deltsoc_now1] # update RAM info
- if (time_now1-time_last1).total_seconds()>3600*24:
- list_sub1=deltsoc_now1-deltsoc_last1
- list_pud1=(0.01*capacity*3600*1000)/(time_now1-time_last1).total_seconds()
- leak_current1=list_sub1*list_pud1
- # leak_current1=np.array(leak_current1)
- leak_current1=np.round(leak_current1,3)
- leak_current1=list(leak_current1)
- df_res.loc[len(df_res)]=[time_last1,time_now1,self.sn,2,str(leak_current1),str(dict_baltime1)] # store the result row
- time_last1=time_now1 # update time
- deltsoc_last1=deltsoc_now1 # update SOC offsets
- dict_bal1={}
- else:
- pass
- else:
- pass
- else:
- pass
- else:
- df_ram_last=pd.DataFrame(columns=['sn','time','deltsoc','cellsoc']) # not resting: clear the SOC offsets stored from the previous rest
- dict_bal={}
- firsttime=1
- standingtime=0
- standingtime1=0
- pass
- if i==len(self.df_bms)-2 and abs(self.packcrnt[i+1]) < 0.1: # still resting when the data ends: keep the last row in RAM
- df_ram_lfp.loc[0]=self.df_bms.iloc[-1]
- else:
- pass
- # Collect charging data - start
- try:
- balstat=int(self.df_bms.loc[i,'单体均衡状态']) # tally the balance state
- if balstat>0.5:
- bal_step=(self.bmstime[i+1]-self.bmstime[i]).total_seconds() # balancing step length
- bal_step=int(bal_step)
- if str(balstat) in dict_bal2.keys():
- dict_bal2[str(balstat)]=dict_bal2[str(balstat)]+bal_step
- else:
- dict_bal2[str(balstat)]=bal_step
- else:
- pass
- except: # NOTE(review): bare except - any error silently resets the tally
- dict_bal2={}
- # Determine the charging state
- if charging==0:
- if self.packcrnt[i]<=-1 and self.packcrnt[i+1]<=-1 and self.packcrnt[i-1]<=-1:
- if self.bms_soc[i]<40:
- cellvolt_now=self._cellvolt_get(i)
- if min(cellvolt_now)<self.param.CellFullChrgVolt-0.18:
- charging=1
- chrg_start=i
- else:
- pass
- else:
- pass
- else: # charging in progress
- cellvolt_now=self._cellvolt_get(i)
- if (self.bmstime[i+1]-self.bmstime[i]).total_seconds()>180 or (self.packcrnt[i]>-0.1 and self.packcrnt[i-1]>-0.1) or (self.packcrnt[i]<-self.param.Capacity and self.packcrnt[i+1]<-self.param.Capacity): # discard this charge if the sample gap exceeds 180 s, the current has stopped, or the current exceeds the capacity value
- charging=0
- continue
- elif min(cellvolt_now)>self.param.CellFullChrgVolt-0.15:
- self._celltemp_weight(i)
- if i-chrg_start>10 and self.celltemp>15:
- chrg_end=i+1
- charging=0
- # Compute the leakage current
- if firsttime2==1:
- firsttime2=0
- dict_baltime={}
- peaksoc_list=[]
- time = list(self.bmstime[chrg_start:chrg_end])
- packcrnt = list(self.packcrnt[chrg_start:chrg_end])
- soc = list(self.bms_soc[chrg_start:chrg_end])
- for j in range(1, self.param.CellVoltNums + 1):
- cellvolt = self._singlevolt_get(i,j,2) # whole voltage column of cell j
- cellvolt = list(cellvolt[chrg_start:chrg_end])
- peaksoc = self._dvdq_peak(time, soc, cellvolt, packcrnt)
- if peaksoc>1:
- peaksoc_list.append(peaksoc)
- else:
- break
- if len(peaksoc_list)==self.param.CellVoltNums:
- cellsocmean=(sum(peaksoc_list)-max(peaksoc_list)-min(peaksoc_list))/(len(peaksoc_list)-2)
- celldeltsoc=[x-cellsocmean for x in peaksoc_list] # offset of each cell from the trimmed mean
- deltAs_last2=np.array(celldeltsoc)
- time_last2=self.bmstime[chrg_end]
- df_ram_last2.loc[0]=[self.sn,time_last2,deltAs_last2] # update RAM info
- else:
- dict_baltime={}
- peaksoc_list=[]
- time = list(self.bmstime[chrg_start:chrg_end])
- packcrnt = list(self.packcrnt[chrg_start:chrg_end])
- soc = list(self.bms_soc[chrg_start:chrg_end])
- for j in range(1, self.param.CellVoltNums + 1):
- cellvolt = self._singlevolt_get(i,j,2) # whole voltage column of cell j
- cellvolt = list(cellvolt[chrg_start:chrg_end])
- peaksoc = self._dvdq_peak(time, soc, cellvolt, packcrnt)
- if peaksoc>1:
- peaksoc_list.append(peaksoc)
- else:
- break
- 
- if len(peaksoc_list)==self.param.CellVoltNums:
- cellsocmean=(sum(peaksoc_list)-max(peaksoc_list)-min(peaksoc_list))/(len(peaksoc_list)-2)
- celldeltsoc=[x-cellsocmean for x in peaksoc_list] # offset of each cell from the trimmed mean
- deltAs_now2=np.array(celldeltsoc)
- time_now2=self.bmstime[chrg_end]
- list_sub2=deltAs_now2-deltAs_last2
- list_pud2=(3600*1000)/(time_now2-time_last2).total_seconds()
- leak_current2=list_sub2*list_pud2
- leak_current2=np.round(leak_current2,3)
- leak_current2=list(leak_current2)
- df_res.loc[len(df_res)]=[time_last2,time_now2,self.sn,3,str(leak_current2),str(dict_baltime)] # store the result row
- deltAs_last2=deltAs_now2
- time_last2=time_now2
- dict_bal2={}
- df_ram_last2.loc[0]=[self.sn,time_last2,deltAs_last2] # update RAM info
- else:
- charging=0
- continue
- elif i==len(self.df_bms)-2: # still charging when the data ends: carry the charging segment over in RAM
- df_ram_lfp=self.df_bms.iloc[chrg_start:]
- df_ram_lfp['sn']=self.sn # NOTE(review): assigns into a slice of self.df_bms; pandas may emit SettingWithCopyWarning
- else:
- pass
- # Store the standing-time counters in RAM together with the last timestamp
- df_ram_last3.loc[0]=[self.sn,self.bmstime[len(self.bmstime)-1],standingtime,standingtime1,standingtime2]
- # Return the results
- if df_res.empty:
- return pd.DataFrame(), df_ram_last, df_ram_last1, df_ram_last2, df_ram_last3,df_ram_lfp
- else:
- return df_res, df_ram_last, df_ram_last1, df_ram_last2, df_ram_last3, df_ram_lfp
|