import pandas as pd import numpy as np import datetime from LIB.MIDDLE.CellStateEstimation.Common import BatParam class BatUniform(): def __init__(self,sn,celltype,df_bms,df_uniform,df_last3,df_lfp1): #参数初始化 if (not df_lfp1.empty) and celltype>50: df_lfp1.drop(['sn'],axis=1) df_bms=pd.concat([df_lfp1, df_bms], ignore_index=True) else: pass self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(celltype) self.df_bms=df_bms self.packcrnt=df_bms['总电流[A]']*self.param.PackCrntDec self.packvolt=df_bms['总电压[V]'] self.bms_soc=df_bms['SOC[%]'] self.bmstime= pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S') # df_uniform['time']=pd.to_datetime(df_uniform['time'], format='%Y-%m-%d %H:%M:%S') self.df_uniform=df_uniform self.df_last3=df_last3 self.df_lfp1=df_lfp1 self.cellvolt_name=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)] self.celltemp_name=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)] def batuniform(self): if self.celltype<50: df_res, df_ram_last3=self._ncm_uniform() return df_res, df_ram_last3, self.df_lfp1 else: df_res, df_ram_last3, df_ram_lfp1=self._lfp_uniform() return df_res, df_ram_last3, df_ram_lfp1 #定义滑动滤波函数........................................................................................................................................ def _np_move_avg(self,a, n, mode="same"): return (np.convolve(a, np.ones((n,)) / n, mode=mode)) #寻找当前行数据的最小温度值................................................................................................................................ def _celltemp_weight(self,num): celltemp = list(self.df_bms.loc[num,self.celltemp_name]) celltemp.remove(min(celltemp)) self.celltemp=celltemp if self.celltype>50: if min(celltemp)>=25: self.tempweight=1 self.StandardStandingTime=2400 elif min(celltemp)>=15: self.tempweight=0.6 self.StandardStandingTime=3600 elif min(celltemp)>=5: self.tempweight=0.2 self.StandardStandingTime=4800 else: self.tempweight=0.1 self.StandardStandingTime=7200 else: if min(celltemp)>=25: self.tempweight=1 self.StandardStandingTime=1800 elif min(celltemp)>=15: self.tempweight=0.8 self.StandardStandingTime=2400 elif min(celltemp)>=5: self.tempweight=0.6 self.StandardStandingTime=3600 else: self.tempweight=0.2 self.StandardStandingTime=7200 #获取当前行所有电压数据............................................................................................................................ def _cellvolt_get(self,num): cellvolt = np.array(self.df_bms.loc[num,self.cellvolt_name])/1000 return cellvolt #获取单个电压值................................................................................................. def _singlevolt_get(self,num,series,mode): #mode==1取当前行单体电压值,mode==2取某个单体所有电压值 s=str(series) if mode==1: singlevolt=self.df_bms.loc[num,'单体电压' + s]/1000 return singlevolt else: singlevolt=self.df_bms['单体电压' + s]/1000 return singlevolt #寻找DVDQ的峰值点,并返回.......................................................................................................................... def _dvdq_peak(self, time, soc, cellvolt, packcrnt): cellvolt = self._np_move_avg(cellvolt, 3, mode="same") Soc = 0 Ah = 0 Volt = cellvolt[0] DV_Volt = [] DQ_Ah = [] DVDQ = [] time1 = [] soc1 = [] soc2 = [] xvolt=[] for m in range(1, len(time)): Step = (time[m] - time[m - 1]).total_seconds() Soc = Soc - packcrnt[m] * Step * 100 / (3600 * self.param.Capacity) Ah = Ah - packcrnt[m] * Step / 3600 if (cellvolt[m]-Volt)>0.0019 and Ah>0: DQ_Ah.append(Ah) DV_Volt.append(cellvolt[m]-Volt) DVDQ.append((DV_Volt[-1])/Ah) xvolt.append(cellvolt[m]) Volt=cellvolt[m] Ah = 0 soc1.append(Soc) time1.append(time[m]) soc2.append(soc[m]) #切片,去除前后10min的数据 df_Data1 = pd.DataFrame({'time': time1, 'SOC': soc2, 'DVDQ': DVDQ, 'AhSoc': soc1, 'DQ_Ah':DQ_Ah, 'DV_Volt':DV_Volt, 'XVOLT':xvolt}) start_time=df_Data1.loc[0,'time'] start_time=start_time+datetime.timedelta(seconds=900) end_time=df_Data1.loc[len(time1)-1,'time'] end_time=end_time-datetime.timedelta(seconds=1200) if soc2[0]<36: df_Data1=df_Data1[(df_Data1['SOC']>40) & (df_Data1['SOC']<80)] else: df_Data1=df_Data1[(df_Data1['time']>start_time) & (df_Data1['SOC']<80)] df_Data1=df_Data1[(df_Data1['XVOLT']>self.param.PeakVoltLowLmt) & (df_Data1['XVOLT']2: #寻找峰值点,且峰值点个数>2 PeakIndex = df_Data1['DVDQ'].idxmax() df_Data2 = df_Data1[(df_Data1['SOC'] > (df_Data1['SOC'][PeakIndex] - 0.5)) & (df_Data1['SOC'] < (df_Data1['SOC'][PeakIndex] + 0.5))] if len(df_Data2) > 2 and df_Data1.loc[PeakIndex,'XVOLT'] (df_Data1['SOC'][PeakIndex] - 0.5)) & (df_Data1['SOC'] < (df_Data1['SOC'][PeakIndex] + 0.5))] if len(df_Data2) > 2 and df_Data1.loc[PeakIndex,'XVOLT']1: standingtime2=standingtime2+(self.bmstime[0]-df_ram_last3.loc[0,'time3']).total_seconds() else: pass for i in range(1,len(self.df_bms)-2): if abs(self.packcrnt[i]) < 0.1 and abs(self.packcrnt[i-1]) < 0.1 and abs(self.packcrnt[i+1]) < 0.1: #电流为0 delttime=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() standingtime2=standingtime2+delttime self._celltemp_weight(i) #获取不同温度对应的静置时间 if standingtime2>self.StandardStandingTime: #静置时间满足要求 if abs(self.packcrnt[i+2]) >= 0.1: standingtime2=0 cellvolt_now=self._cellvolt_get(i) cellvolt_min=min(cellvolt_now) cellvolt_max=max(cellvolt_now) cellvolt_last=self._cellvolt_get(i-1) deltvolt=max(abs(cellvolt_now-cellvolt_last)) if 33600*6: standingtime2=0 cellvolt_now=self._cellvolt_get(i) cellvolt_min=min(cellvolt_now) cellvolt_max=max(cellvolt_now) cellvolt_last=self._cellvolt_get(i-1) deltvolt=max(abs(cellvolt_now-cellvolt_last)) if 3=len(self.df_bms)-3: standingtime2=0 cellvolt_now=self._cellvolt_get(i) cellvolt_min=min(cellvolt_now) cellvolt_max=max(cellvolt_now) cellvolt_last=self._cellvolt_get(i-1) deltvolt=max(abs(cellvolt_now-cellvolt_last)) if 31: standingtime2=standingtime2+(self.bmstime[0]-df_ram_last3.loc[0,'time3']).total_seconds() else: pass for i in range(2,len(self.df_bms)-2): #静置电压法计算电芯一致性 if abs(self.packcrnt[i]) < 0.1 and abs(self.packcrnt[i-1]) < 0.1 and abs(self.packcrnt[i+1]) < 0.1: #电流为0 delttime=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() standingtime2=standingtime2+delttime self._celltemp_weight(i) #获取不同温度对应的静置时间 if standingtime2>self.StandardStandingTime: #静置时间满足要求 if abs(self.packcrnt[i+2]) >= 0.1: standingtime2=0 cellvolt_now=self._cellvolt_get(i) cellvolt_min=min(cellvolt_now) cellvolt_max=max(cellvolt_now) cellvolt_last=self._cellvolt_get(i-1) deltvolt=max(abs(cellvolt_now-cellvolt_last)) if 2 < cellvolt_max < self.param.OcvInflexionBelow-0.002 and 23600*6: standingtime2=0 cellvolt_now=self._cellvolt_get(i) cellvolt_min=min(cellvolt_now) cellvolt_max=max(cellvolt_now) cellvolt_last=self._cellvolt_get(i-1) deltvolt=max(abs(cellvolt_now-cellvolt_last)) if 2 < cellvolt_max < self.param.OcvInflexionBelow-0.002 and 2=len(self.df_bms)-3: standingtime2=0 cellvolt_now=self._cellvolt_get(i) cellvolt_min=min(cellvolt_now) cellvolt_max=max(cellvolt_now) cellvolt_last=self._cellvolt_get(i-1) deltvolt=max(abs(cellvolt_now-cellvolt_last)) if 2 < cellvolt_max < self.param.OcvInflexionBelow-0.002 and 2len(chrg_end): chrg_start[-1]=i else: chrg_start.append(i) else: pass else: #充电中 if (self.bmstime[i+1]-self.bmstime[i]).total_seconds()>180 or (self.packcrnt[i]<-self.param.Capacity/2 and self.packcrnt[i+1]<-self.param.Capacity/2): #如果充电过程中时间间隔>180s,则舍弃该次充电 chrg_start.remove(chrg_start[-1]) charging=0 continue elif self.packcrnt[i]<=-1 and self.packcrnt[i+1]<=-1 and self.packcrnt[i+2]>-1: #判断电流波动时刻 cellvolt_now=self._cellvolt_get(i+1) if max(cellvolt_now)>self.param.CellFullChrgVolt: #电压>满充电压 chrg_end.append(i+1) charging=0 continue else: pass elif self.packcrnt[i+1]>-0.1 and self.packcrnt[i+2]>-0.1: #判断充电结束 charging=0 if len(chrg_start)>len(chrg_end): if self.bms_soc[i]>90: chrg_end.append(i) else: chrg_start.remove(chrg_start[-1]) continue else: continue elif i==len(self.packcrnt)-3 and self.packcrnt[i+1]<-1 and self.packcrnt[i+2]<-1: charging=0 if len(chrg_start)>len(chrg_end) and self.bms_soc[i]>90: #soc>90 chrg_end.append(i) continue else: df_ram_lfp1=self.df_bms.iloc[chrg_start[-1]:] df_ram_lfp1['sn']=self.sn chrg_start.remove(chrg_start[-1]) continue else: continue if chrg_end: #DVDQ方法计算soc差 peaksoc_list=[] for i in range(len(chrg_end)): peaksoc_list = [] self._celltemp_weight(chrg_start[i]) if min(self.celltemp)>10: for j in range(1, self.param.CellVoltNums + 1): cellvolt = self._singlevolt_get(i,j,2) #取单体电压j的所有电压值 cellvolt = list(cellvolt[chrg_start[i]:chrg_end[i]]) time = list(self.bmstime[chrg_start[i]:chrg_end[i]]) packcrnt = list(self.packcrnt[chrg_start[i]:chrg_end[i]]) soc = list(self.bms_soc[chrg_start[i]:chrg_end[i]]) peaksoc = self._dvdq_peak(time, soc, cellvolt, packcrnt) if peaksoc>1: peaksoc_list.append(peaksoc) #计算到达峰值点的累计Soc else: pass if len(peaksoc_list)>self.param.CellVoltNums/2: peaksoc_max=max(peaksoc_list) peaksoc_min=min(peaksoc_list) peaksoc_maxnum=peaksoc_list.index(peaksoc_min)+1 peaksoc_minnum=peaksoc_list.index(peaksoc_max)+1 cellsoc_diff=peaksoc_max-peaksoc_min cellsoc_diff=eval(format(cellsoc_diff,'.1f')) if not df_res.empty: cellvolt_rank=df_res.iloc[-1]['cellvolt_rank'] df_res.loc[len(df_res)]=[self.bmstime[chrg_start[i]], self.sn, cellsoc_diff, 0, peaksoc_minnum, peaksoc_maxnum, cellvolt_rank] elif not self.df_uniform.empty: cellvolt_rank=self.df_uniform.iloc[-1]['cellvolt_rank'] df_res.loc[len(df_res)]=[self.bmstime[chrg_start[i]], self.sn, cellsoc_diff, 0, peaksoc_minnum, peaksoc_maxnum, cellvolt_rank] else: pass else: pass else: pass #更新RAM的standingtime df_ram_last3.loc[0]=[self.sn,self.bmstime[len(self.bmstime)-1],standingtime,standingtime1,standingtime2] if df_res.empty: return pd.DataFrame(), df_ram_last3, df_ram_lfp1 else: df_res.sort_values(by='time', ascending=True, inplace=True) return df_res, df_ram_last3, df_ram_lfp1