# NOTE(review): this file text is damaged and is not valid Python as it stands.
#   * Line breaks and indentation are missing: many statements share one physical
#     line, so everything after the first '#' on a line is read as a comment.
#   * Spans appear to have been cut out of the middle of expressions, e.g.
#     `elif 13600*24*30:` and `if 2self.param.CellFullChrgCrnt and ...` inside
#     _ncm_soc. The missing conditions and branches cannot be recovered from this
#     text; restore the file from version control before changing any logic.
#   The code below is left exactly as found; only this header has been added.
#
# BatSoc - SOC calculation from BMS rows. What the surviving text shows:
#
#   __init__(sn, celltype, df_bms, df_soh, df_ram_sn, df_socdiff)
#       Stores the inputs, builds self.param = BatParam.BatParam(celltype) and
#       parses df_bms['time'] with format '%Y-%m-%d %H:%M:%S' (the parsed column
#       is written back into the df_bms object that was passed in). When
#       df_ram_sn is not empty, self.df_bms keeps only the rows whose time is
#       later than the time of the last df_ram_sn row. Also builds the column
#       name lists CellVolt1..CellVoltN and CellTemp1..CellTempN from
#       self.param.CellVoltNums / self.param.CellTempNums.
#       NOTE(review): packcrnt, bms_soc, bms_soh and bmstime are read from the
#       df_bms argument, not from the filtered self.df_bms, while _ncm_soc and
#       _lfp_soc index them with positions derived from len(self.df_bms). This
#       looks like a mismatch once rows have been filtered out - confirm
#       against the undamaged file.
#       NOTE(review): with indentation lost it cannot be told from here whether
#       reset_index(inplace=True, drop=True) sits inside the `if` - confirm.
#
#   batsoc()
#       celltype < 50 -> returns self._ncm_soc(); celltype > 50 -> returns
#       self._lfp_soc(); in both cases an empty self.df_bms yields
#       (pd.DataFrame(), pd.DataFrame()) instead.
#       NOTE(review): celltype == 50 returns a single pd.DataFrame(), not a
#       pair, unlike every other path.
#
#   _celltemp_weight(num)
#       Averages the CellTemp* columns of row `num` with np.mean, stores the
#       result in self.celltemp and sets self.tempweight and
#       self.StandardStandingTime from temperature thresholds; celltype == 99
#       uses its own threshold table. Returns nothing.
#       NOTE(review): the original banner comment above this method says
#       "minimum temperature of the current row", but the code uses the mean.
#
#   _cellvolt_get(num)
#       Returns the CellVolt* values of row `num` as a numpy array.
#
#   _ocv_dispersion()
#       Computes mean and std of every CellVolt* column over all rows of
#       self.df_bms (NaN std replaced by 0). Returns the means as a numpy array
#       when the largest std is below 0.003, otherwise the list [0].
#
#   _ncm_soc()
#       SOC calculation used for celltype < 50. Clamps cellsoc to [0, 100],
#       derives min/max cell SOC and their 1-based positions, blends them into
#       packsoc, and returns (df_res, df_ram) with one row each; df_ram takes
#       its columns from self.df_ram_sn.columns.
#       NOTE(review): calls eval() on self.df_soh.loc[0, 'cellsoh']; that is
#       unsafe if the value can originate outside this program -
#       ast.literal_eval is the usual replacement. Confirm the data source.
#       NOTE(review): uses scipy.interpolate.interp2d, which newer SciPy
#       releases no longer provide - check the SciPy version in use.
#       NOTE(review): the statement
#       `if self.bmstime[0]>datetime.datetime.strptime('2021-10-22 19:49:20',...): pass`
#       has no effect.
import pandas as pd import numpy as np from scipy.interpolate import interp2d import BatParam import datetime class BatSoc(): def __init__(self,sn,celltype,df_bms,df_soh,df_ram_sn,df_socdiff): #参数初始化 self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(celltype) self.df_soh=df_soh self.df_socdiff=df_socdiff self.df_ram_sn=df_ram_sn df_bms['time']=pd.to_datetime(df_bms['time'], format='%Y-%m-%d %H:%M:%S') self.df_bms=df_bms if not self.df_ram_sn.empty: self.df_bms=self.df_bms[self.df_bms['time'] > self.df_ram_sn.iloc[-1]['time']] #滤除原始数据中的重复数据 self.df_bms.reset_index(inplace=True,drop=True) #重置索引 self.packcrnt=df_bms['PackCrnt']*self.param.PackCrntDec self.bms_soc=df_bms['PackSOC'] self.bms_soh=df_bms['PackSOH'] self.bmstime= pd.to_datetime(df_bms['time'], format='%Y-%m-%d %H:%M:%S') self.cellvolt_name=['CellVolt'+str(x) for x in range(1,self.param.CellVoltNums+1)] self.celltemp_name=['CellTemp'+str(x) for x in range(1,self.param.CellTempNums+1)] def batsoc(self): if self.celltype<50: if not self.df_bms.empty: df_res, df_ram=self._ncm_soc() return df_res, df_ram else: return pd.DataFrame(), pd.DataFrame() elif self.celltype>50: if not self.df_bms.empty: df_res, df_ram=self._lfp_soc() return df_res, df_ram else: return pd.DataFrame(), pd.DataFrame() else: return pd.DataFrame() #........................................................寻找当前行数据的最小温度值........................................................................................... def _celltemp_weight(self,num): celltemp = list(self.df_bms.loc[num,self.celltemp_name]) celltemp=np.mean(celltemp) self.celltemp=celltemp if self.celltype==99: if celltemp>=25: self.tempweight=1 self.StandardStandingTime=3600 elif celltemp>=15: self.tempweight=0.6 self.StandardStandingTime=7200 elif celltemp>=5: self.tempweight=0. 
self.StandardStandingTime=10800 else: self.tempweight=0.1 self.StandardStandingTime=18000 else: if celltemp>=20: self.tempweight=1 self.StandardStandingTime=1800 elif celltemp>=10: self.tempweight=0.8 self.StandardStandingTime=3600 elif celltemp>=5: self.tempweight=0.6 self.StandardStandingTime=7200 else: self.tempweight=0.2 self.StandardStandingTime=10800 #.......................................................获取当前行所有电压数据............................................................................................... def _cellvolt_get(self,num): cellvolt = np.array(self.df_bms.loc[num,self.cellvolt_name]) return cellvolt #......................................................判断单体电压离散度,并返回OCV值.................................................................................... def _ocv_dispersion(self): cellvolt=self.df_bms[self.cellvolt_name] cellocv=cellvolt.mean(axis=0) cellocv_std=cellvolt.std(axis=0) cellocv_std=cellocv_std.fillna(0) if max(cellocv_std)<0.003: return np.array(cellocv) else: return [0] #...........................................................三元电池的soc计算............................................................................................... def _ncm_soc(self): column_name=['time', 'sn', 'bms_soc', 'packsoc', 'socdsp', 'cellsocmin', 'cellsocmax','socmin_num','socmax_num','cellsoc_diff','cellsoc','ocvweight','socstep'] df_res=pd.DataFrame(columns=column_name) df_bms_len=len(self.df_bms) rampackcrnt=self.packcrnt[df_bms_len-1] ocvweight=0 #获取电池包SOH................................................................................................ if self.df_soh.empty: batsoh=self.bms_soh[0] if batsoh>30: capacity=self.param.Capacity*batsoh/100 else: capacity=self.param.Capacity else: batsoh=eval(self.df_soh.loc[0,'cellsoh']) batsoh=np.array(batsoh) capacity=self.param.Capacity*batsoh/100 #计算静置时间和累计As量.................................................................................................. 
if not self.df_ram_sn.empty: standingtime=self.df_ram_sn.iloc[-1]['standingtime'] as_accum=0 for i in range(df_bms_len): if i==0: step=(self.bmstime[i]-self.df_ram_sn.iloc[-1]['time']).total_seconds() if abs(self.packcrnt[i])<0.1 and abs(self.df_ram_sn.iloc[-1]['rampackcrnt'])<0.1: standingtime=standingtime+step else: standingtime=0 if step<120: as_accum=as_accum-self.packcrnt[i]*step elif 13600*24*30: standingtime=3600*24*30 else: pass #计算单体SOC............................................................................................................................. if not self.df_ram_sn.empty: self._celltemp_weight(0) ramcellvolt=self.df_ram_sn.iloc[-1]['ramcellvolt'] ramcellsoc=self.df_ram_sn.iloc[-1]['cellsoc'] ocvweight_2dlook=interp2d(self.param.OcvWeight_StandingTime, self.param.OcvWeight_Temp, self.param.OcvWeight, kind = 'linear') ocvweight=ocvweight_2dlook(standingtime, self.celltemp) ocvweight=ocvweight[0] if self.bmstime[0]>datetime.datetime.strptime('2021-10-22 19:49:20','%Y-%m-%d %H:%M:%S'): pass if ocvweight>0.01: cellocv=self._ocv_dispersion() if df_bms_len>1: #数据字段长度≥2 if 2self.param.CellFullChrgCrnt and max(self.packcrnt)<-0.5: deltsoc=self.param.FullChrgSoc-max(cellsoc) cellsoc=cellsoc+deltsoc else: pass #计算电池包packsoc和最大最小SOC.................................................................................................................................................................................. 
cellsoc[cellsoc>100]=100 #限值最大值100 cellsoc[cellsoc<0]=0 #限值最小值0 cellsoc1=list(map(lambda x:float(format(x,'.1f')), cellsoc)) cellsocmin=min(cellsoc1) cellsocmax=max(cellsoc1) cellsoc_diff=cellsocmax-cellsocmin socmin_num=cellsoc1.index(cellsocmin)+1 socmax_num=cellsoc1.index(cellsocmax)+1 alpha=(cellsocmin+cellsocmax)*0.02-2 #blending系数计算 if cellsocmax>90 or alpha>1: alpha=1 elif cellsocmin<10 or alpha<-1: alpha=-1 else: pass packsoc=(cellsocmin+cellsocmax)*0.5+(cellsocmax-cellsocmin)*0.5*alpha packsoc=eval(format(packsoc,'.1f')) socdsp=packsoc #输出ram结果.................................................................................................................................................................... list_ram=[self.bmstime[df_bms_len-1], self.sn, self.bms_soc[df_bms_len-1], packsoc, cellsoc, standingtime, rampackcrnt, ramcellvolt,0,0,ocvweight,as_accum,socstep] df_ram=pd.DataFrame(columns=self.df_ram_sn.columns) df_ram.loc[0]=list_ram #输出计算结果........................................................................................................................................................................ df_res.loc[0]=[self.bmstime[df_bms_len-1], self.sn, self.bms_soc[df_bms_len-1], packsoc, socdsp, cellsocmin, cellsocmax,socmin_num,socmax_num,cellsoc_diff,str(cellsoc1),ocvweight,socstep] return df_res, df_ram #...........................................................磷酸铁锂电池的soc计算............................................................................................... 
# _lfp_soc(self) - SOC calculation used by batsoc() for celltype > 50.
#
# NOTE(review): the text of this method is damaged and is not valid Python as
# it stands. Line breaks and indentation are missing, and spans appear to have
# been cut out of the middle of expressions, e.g.
#   `elif 13600*24*30:`
#   `if 2=self.param.OcvInflexionBelow and ocvsocmin2 and max(cellocv)<4:`
#   `elif standingtime>130 and 2kocellvoltmin+0.005 and ...`
# The missing conditions and branches cannot be recovered from this text;
# restore the file from version control before changing any logic. The code
# below is left exactly as found; only this header has been added.
#
# What the surviving text shows, in order:
#   1. cellsoc_diff starts from the last row of self.df_socdiff, or 2 when
#      that frame is empty.
#   2. capacity = self.param.Capacity * soh / 100, with soh taken from
#      self.df_soh.loc[0, 'soh'], or from self.bms_soh[0] when df_soh is empty
#      (a bms_soh value of 30 or less falls back to the unscaled
#      self.param.Capacity).
#   3. When self.df_ram_sn is not empty, standingtime and ocvweight are
#      resumed from its last row and as_accum is accumulated from
#      self.packcrnt; the rest of that loop is among the damaged spans.
#      standingtime is capped at 3600*24*30.
#   4. ocvweight = ocvweight - (as_accum*10)/(capacity*3600), clamped to
#      [0, 1] and rounded to two decimals via eval(format(..., '.2f')).
#   5. A chain of branches (heavily truncated) assigns cellsocmin/cellsocmax
#      or cellsoc together with a socstep code. For socstep values outside
#      [8,17,21,22,25,26,27,28,29], every cell SOC is obtained by linear
#      interpolation between (min(cellocv), cellsocmin) and
#      (max(cellocv), cellsocmax).
#   6. cellsoc is clamped to [0, 100]; min/max cell SOC, their 1-based
#      positions and the blended packsoc are computed.
#   7. Returns (df_res, df_ram), one row each; df_ram takes its columns from
#      self.df_ram_sn.columns.
#
# NOTE(review): in the surviving text standingtime, ocvweight, as_accum,
# kocellvoltmin, kocellvoltmax and ramcellsoc are only assigned when
# self.df_ram_sn is not empty; presumably the branch for an empty df_ram_sn
# was lost with the damaged spans - confirm against the undamaged file.
# NOTE(review): eval(format(x, '.1f')) is used only to round numbers;
# _ncm_soc uses float(format(x, '.1f')) for the same cellsoc1 step.
def _lfp_soc(self): column_name=['time', 'sn', 'bms_soc', 'packsoc', 'socdsp', 'cellsocmin', 'cellsocmax','socmin_num','socmax_num','cellsoc_diff','cellsoc','ocvweight','socstep'] df_res=pd.DataFrame(columns=column_name) df_bms_len=len(self.df_bms) rampackcrnt=self.packcrnt[df_bms_len-1] if not self.df_socdiff.empty: cellsoc_diff=self.df_socdiff.iloc[-1]['cellsoc_diff'] else: cellsoc_diff=2 #获取电池包SOH................................................................................................ if self.df_soh.empty: batsoh=self.bms_soh[0] if batsoh>30: capacity=self.param.Capacity*batsoh/100 else: capacity=self.param.Capacity else: batsoh=self.df_soh.loc[0,'soh'] capacity=self.param.Capacity*batsoh/100 #计算静置时间和累计As量以及OCV权重值.................................................................................................. if not self.df_ram_sn.empty: standingtime=self.df_ram_sn.iloc[-1]['standingtime'] ocvweight=self.df_ram_sn.iloc[-1]['ocvweight'] as_accum=0 for i in range(df_bms_len): if i==0: step=(self.bmstime[i]-self.df_ram_sn.iloc[-1]['time']).total_seconds() if abs(self.packcrnt[i])<0.1 and abs(self.df_ram_sn.iloc[-1]['rampackcrnt'])<0.1: standingtime=standingtime+step else: standingtime=0 if step<120: as_accum=as_accum-self.packcrnt[i]*step elif 13600*24*30: standingtime=3600*24*30 else: pass #权重值计算................................................. ocvweight=ocvweight-(as_accum*10)/(capacity*3600) if ocvweight>1: ocvweight=1 elif ocvweight<0: ocvweight=0 else: pass ocvweight=eval(format(ocvweight,'.2f')) #计算单体最大最小SOC............................................................................................................................. 
cellocv=self._ocv_dispersion() #获取每一列电压的平均值 if not self.df_ram_sn.empty: self._celltemp_weight(0) kocellvoltmin=self.df_ram_sn.iloc[-1]['kocellvoltmin'] kocellvoltmax=self.df_ram_sn.iloc[-1]['kocellvoltmax'] ramcellsoc=self.df_ram_sn.iloc[-1]['cellsoc'] #满足静置时间 if standingtime>self.StandardStandingTime: #数据字段长度≥2............................................................................................................... if df_bms_len>1: ocvsocmin=np.interp(min(cellocv),self.param.LookTab_OCV,self.param.LookTab_SOC) ocvsocmax=np.interp(max(cellocv),self.param.LookTab_OCV,self.param.LookTab_SOC) if 2=self.param.OcvInflexionBelow and ocvsocmin2 and max(cellocv)<4: #第一平台向第二平台修正 if ocvsocmax>self.param.SocInflexion3+5 and max(ramcellsoc)self.param.SocInflexion3+5 and min(ramcellsoc)self.param.SocInflexion1+5 and max(ramcellsoc)self.param.SocInflexion1+5 and min(ramcellsoc)self.param.SocInflexion3+5: if ocvsocmaxself.param.SocInflexion3+5: cellsocmax=self.param.SocInflexion2 cellsocmin=cellsocmax-cellsoc_diff socstep=6 else: cellsocmin=self.param.SocInflexion2 cellsocmax=cellsocmin+cellsoc_diff socstep=7 else: cellsoc=ramcellsoc socstep=8 else: cellsoc=ramcellsoc socstep=8 #数据字段长度==1.............................................................................................. 
else: cellvolt0=self._cellvolt_get(0) ramcellvolt=self.df_ram_sn.iloc[-1]['ramcellvolt'] if max(abs(cellvolt0-ramcellvolt))<0.004: ocvmax=max(cellvolt0) ocvmin=min(cellvolt0) ocvsocmin=np.interp(ocvmin,self.param.LookTab_OCV,self.param.LookTab_SOC) ocvsocmax=np.interp(ocvmax,self.param.LookTab_OCV,self.param.LookTab_SOC) if 2=self.param.OcvInflexionBelow and ocvsocmin2 and ocvmax<4: #第一平台向第二平台修正 if ocvsocmax>self.param.SocInflexion3+5 and max(ramcellsoc)self.param.SocInflexion3+5 and min(ramcellsoc)self.param.SocInflexion1+5 and max(ramcellsoc)self.param.SocInflexion1+5 and min(ramcellsoc)self.param.SocInflexion3+5: if ocvsocmaxself.param.SocInflexion3+5: cellsocmax=self.param.SocInflexion2 cellsocmin=cellsocmax-cellsoc_diff socstep=15 else: cellsocmin=self.param.SocInflexion2 cellsocmax=cellsocmin+cellsoc_diff socstep=16 else: cellsoc=ramcellsoc socstep=17 else: cellsoc=ramcellsoc socstep=17 else: cellsoc=ramcellsoc socstep=17 #不满足静置时间,判断电压回弹方向 elif standingtime>130 and 2kocellvoltmin+0.005 and ocvsocmin1>min(ramcellsoc): if 2kocellvoltmax+0.005 and ocvsocmax1>max(ramcellsoc): #最大最小电压均在非平台区 if ocvsocmax1self.param.SocInflexion1+5 and max(ramcellsoc)max(ramcellsoc): #最大最小电压均在非平台区 if ocvsocmax1self.param.CellFullChrgCrnt and max(self.packcrnt)<-0.5: deltsoc=self.param.FullChrgSoc-max(cellsoc) cellsoc=cellsoc+deltsoc else: pass #计算所有电芯SOC........................................................................................... if not socstep in [8,17,21,22,25,26,27,28,29]: looktab_ocv=[min(cellocv),max(cellocv)] looktab_soc=[cellsocmin,cellsocmax] cellsoc=np.array(list(map(lambda x:np.interp(x,looktab_ocv,looktab_soc),cellocv))) else: pass #计算电池包packsoc和最大最小SOC.................................................................................................................................................................................. 
cellsoc[cellsoc>100]=100 #限值最大值100 cellsoc[cellsoc<0]=0 #限值最小值0 cellsoc1=list(map(lambda x:eval(format(x,'.1f')), cellsoc)) cellsocmin=min(cellsoc1) cellsocmax=max(cellsoc1) cellsoc_diff=cellsocmax-cellsocmin socmin_num=cellsoc1.index(cellsocmin)+1 socmax_num=cellsoc1.index(cellsocmax)+1 alpha=(cellsocmin+cellsocmax)*0.02-2 #blending系数计算 if cellsocmax>90 or alpha>1: alpha=1 elif cellsocmin<10 or alpha<-1: alpha=-1 else: pass packsoc=(cellsocmin+cellsocmax)*0.5+(cellsocmax-cellsocmin)*0.5*alpha packsoc=eval(format(packsoc,'.1f')) socdsp=packsoc #输出ram结果.................................................................................................................................................................... list_ram=[self.bmstime[df_bms_len-1], self.sn, self.bms_soc[df_bms_len-1], packsoc, cellsoc, standingtime, rampackcrnt, ramcellvolt,kocellvoltmin,kocellvoltmax,ocvweight,as_accum,socstep] df_ram=pd.DataFrame(columns=self.df_ram_sn.columns) df_ram.loc[0]=list_ram #输出计算结果........................................................................................................................................................................ df_res.loc[0]=[self.bmstime[df_bms_len-1], self.sn, self.bms_soc[df_bms_len-1], packsoc, socdsp, cellsocmin, cellsocmax,socmin_num,socmax_num,cellsoc_diff,str(cellsoc1),ocvweight,socstep] return df_res, df_ram