import pandas as pd import numpy as np import matplotlib.pyplot as plt from pymysql import paramstyle from scipy.interpolate import interp2d from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam class BatSoc(): def __init__(self,sn,celltype,df_bms,df_soh,df_ram_sn,df_socdiff): #参数初始化 self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(celltype) self.df_soh=df_soh self.df_socdiff=df_socdiff self.df_ram_sn=df_ram_sn df_bms['时间戳']=pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S') self.df_bms=df_bms if not self.df_ram_sn.empty: self.df_bms=self.df_bms[self.df_bms['时间戳'] > self.df_ram_sn.iloc[-1]['time']] #滤除原始数据中的重复数据 self.df_bms.reset_index(inplace=True,drop=True) #重置索引 self.packcrnt=self.df_bms['总电流[A]']*self.param.PackCrntDec self.packvolt=self.df_bms['总电压[V]'] self.bms_soc=self.df_bms['SOC[%]'] self.bms_soh=self.df_bms['SOH[%]'] self.bmstime=self.df_bms['时间戳'] self.cellvolt_name=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)] self.celltemp_name=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)] def batsoc(self): if self.celltype==1 or self.celltype==2 or self.celltype==3 or self.celltype==4: if not self.df_bms.empty: df_res, df_ram=self._ncm_soc() return df_res, df_ram else: return pd.DataFrame(), pd.DataFrame() elif self.celltype==99: if not self.df_bms.empty: df_res, df_ram=self._lfp_soc() return df_res, df_ram else: return pd.DataFrame(), pd.DataFrame() else: return pd.DataFrame() #..........................................................定义滑动滤波函数.................................................................................................. def _np_move_avg(self,a, n, mode="same"): return (np.convolve(a, np.ones((n,)) / n, mode=mode)) #........................................................寻找当前行数据的最小温度值........................................................................................... def _celltemp_weight(self,num): celltemp = list(self.df_bms.loc[num,self.celltemp_name]) celltemp=np.mean(celltemp) self.celltemp=celltemp if self.celltype==99: if celltemp>=25: self.tempweight=1 self.StandardStandingTime=3600 elif celltemp>=15: self.tempweight=0.6 self.StandardStandingTime=7200 elif celltemp>=5: self.tempweight=0. self.StandardStandingTime=10800 else: self.tempweight=0.1 self.StandardStandingTime=18000 else: if celltemp>=20: self.tempweight=1 self.StandardStandingTime=1800 elif celltemp>=10: self.tempweight=0.8 self.StandardStandingTime=3600 elif celltemp>=5: self.tempweight=0.6 self.StandardStandingTime=7200 else: self.tempweight=0.2 self.StandardStandingTime=10800 #.......................................................获取当前行所有电压数据............................................................................................... def _cellvolt_get(self,num): cellvolt = list(self.df_bms.loc[num,self.cellvolt_name])/1000 return cellvolt #......................................................判断单体电压离散度,并返回OCV值.................................................................................... def _ocv_dispersion(self,ocv): cellvolt=self._cellvolt_get(0) max_index=cellvolt.index(max(cellvolt))+1 min_index=cellvolt.index(min(cellvolt))+1 cellvoltmax=(self.df_bms['单体电压'+str(max_index)]/1000).tolist() cellvoltmin=(self.df_bms['单体电压'+str(min_index)]/1000).tolist() if ocv==1: max_std=np.std(cellvoltmax,ddof=1) min_std=np.std(cellvoltmin,ddof=1) if max_std<0.0015 and min_std<0.0015: ocvmin=np.mean(cellvoltmin) ocvmax=np.mean(cellvoltmax) return ocvmin, ocvmax else: return 0, 0 else: return np.mean(cellvoltmin), np.mean(cellvoltmax) #...........................................................三元电池的soc计算............................................................................................... def _ncm_soc(self): column_name=['time', 'sn', 'bms_soc', 'packsoc', 'socdsp', 'cellsocmin', 'cellsocmax','ocvweight','socstep'] df_res=pd.DataFrame(columns=column_name) df_bms_len=len(self.df_bms) rampackcrnt=self.packcrnt[df_bms_len-1] ocvweight=0 #获取电池包SOH................................................................................................ if self.df_soh.empty: batsoh=self.bms_soh[0] capacity=self.param.Capacity*batsoh/100 else: batsoh=self.df_soh.loc[0,'soh'] capacity=self.param.Capacity*batsoh/100 #计算静置时间和累计As量.................................................................................................. if not self.df_ram_sn.empty: standingtime=self.df_ram_sn.iloc[-1]['standingtime'] as_accum=0 for i in range(df_bms_len): if i==0: step=(self.bmstime[i]-self.df_ram_sn.iloc[-1]['time']).total_seconds() if abs(self.packcrnt[i])<0.1 and abs(self.df_ram_sn.iloc[-1]['rampackcrnt'])<0.1: standingtime=standingtime+step else: standingtime=0 if step<120: as_accum=as_accum-self.packcrnt[i]*step else: as_accum=as_accum+((self.bms_soc[i]-self.df_ram_sn.iloc[-1]['bms_soc'])*capacity*3600/100) else: step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if abs(self.packcrnt[i])<0.1 and abs(self.packcrnt[i-1])<0.1: standingtime=standingtime+step as_accum=as_accum-self.packcrnt[i]*step else: standingtime=0 as_accum=as_accum-self.packcrnt[i]*step else: standingtime=0 as_accum=0 for i in range(df_bms_len): if i==0: pass else: step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if abs(self.packcrnt[i])<0.1 and abs(self.packcrnt[i-1])<0.1: standingtime=standingtime+step as_accum=as_accum-self.packcrnt[i]*step else: standingtime=0 as_accum=as_accum-self.packcrnt[i]*step if standingtime>3600*24*30: standingtime=3600*24*30 else: pass #计算单体最大最小SOC............................................................................................................................. if not self.df_ram_sn.empty: self._celltemp_weight(0) ramcellvoltmin=self.df_ram_sn.iloc[-1]['ramcellvoltmin'] ramcellvoltmax=self.df_ram_sn.iloc[-1]['ramcellvoltmax'] ramcellsocmin=self.df_ram_sn.iloc[-1]['cellsocmin'] ramcellsocmax=self.df_ram_sn.iloc[-1]['cellsocmax'] ocvweight_2dlook=interp2d(self.param.OcvWeight_StandingTime, self.param.OcvWeight_Temp, self.param.OcvWeight, kind = 'linear') ocvweight=ocvweight_2dlook(standingtime, self.celltemp) ocvweight=ocvweight[0] if ocvweight>0.01: if df_bms_len>1: #数据字段长度≥2 ocvmin, ocvmax=self._ocv_dispersion(1) if 21: #数据字段长度≥2 cellvolt0=self._cellvolt_get(df_bms_len-1) cellvolt1=self._cellvolt_get(df_bms_len-2) if self.param.CellFullChrgVolt-0.01最小SOC a=cellsocmin cellsocmin=cellsocmax cellsocmax=a else: pass if cellsocmax>100: #限制最大最小SOC的范围 cellsocmax=100 elif cellsocmax<0: cellsocmax=0 else: pass if cellsocmin>100: cellsocmin=100 elif cellsocmin<0: cellsocmin=0 else: pass alpha=(cellsocmin+cellsocmax)*0.02-2 #blending系数计算 if cellsocmax>90 or alpha>1: alpha=1 elif cellsocmin<10 or alpha<-1: alpha=-1 else: pass packsoc=(cellsocmin+cellsocmax)*0.5+(cellsocmax-cellsocmin)*0.5*alpha packsoc=eval(format(packsoc,'.1f')) socdsp=packsoc #输出ram结果.................................................................................................................................................................... list_ram=[self.bmstime[df_bms_len-1], self.sn, self.bms_soc[df_bms_len-1], packsoc, cellsocmin, cellsocmax, standingtime, rampackcrnt, ramcellvoltmin, ramcellvoltmax,0,0,ocvweight,as_accum,socstep] df_ram=pd.DataFrame(columns=self.df_ram_sn.columns) df_ram.loc[0]=list_ram #输出计算结果........................................................................................................................................................................ df_res.loc[0]=[self.bmstime[df_bms_len-1], self.sn, self.bms_soc[df_bms_len-1], packsoc, socdsp, cellsocmin1, cellsocmax1,ocvweight,socstep] return df_res, df_ram #...........................................................磷酸铁锂电池的soc计算............................................................................................... def _lfp_soc(self): column_name=['time', 'sn', 'bms_soc', 'packsoc', 'socdsp', 'cellsocmin', 'cellsocmax','ocvweight','socstep'] df_res=pd.DataFrame(columns=column_name) df_bms_len=len(self.df_bms) rampackcrnt=self.packcrnt[df_bms_len-1] cellsoc_diff=self.df_socdiff.iloc[-1]['cellsoc_diff'] #获取电池包SOH................................................................................................ if self.df_soh.empty: batsoh=self.bms_soh[0] capacity=self.param.Capacity*batsoh/100 else: batsoh=self.df_soh.loc[0,'soh'] capacity=self.param.Capacity*batsoh/100 #计算静置时间和累计As量以及OCV权重值.................................................................................................. if not self.df_ram_sn.empty: standingtime=self.df_ram_sn.iloc[-1]['standingtime'] ocvweight=self.df_ram_sn.iloc[-1]['ocvweight'] as_accum=0 for i in range(df_bms_len): if i==0: step=(self.bmstime[i]-self.df_ram_sn.iloc[-1]['time']).total_seconds() if abs(self.packcrnt[i])<0.1 and abs(self.df_ram_sn.iloc[-1]['rampackcrnt'])<0.1: standingtime=standingtime+step else: standingtime=0 if step<120: as_accum=as_accum-self.packcrnt[i]*step else: as_accum=as_accum+((self.bms_soc[i]-self.df_ram_sn.iloc[-1]['bms_soc'])*capacity*3600/100) else: step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if abs(self.packcrnt[i])<0.1 and abs(self.packcrnt[i-1])<0.1: standingtime=standingtime+step as_accum=as_accum-self.packcrnt[i]*step else: standingtime=0 as_accum=as_accum-self.packcrnt[i]*step else: standingtime=0 ocvweight=0.5 as_accum=0 for i in range(df_bms_len): if i==0: pass else: step=(self.bmstime[i]-self.bmstime[i-1]).total_seconds() if abs(self.packcrnt[i])<0.1 and abs(self.packcrnt[i-1])<0.1: standingtime=standingtime+step as_accum=as_accum-self.packcrnt[i]*step else: standingtime=0 as_accum=as_accum-self.packcrnt[i]*step #静置时间限值.............................................. if standingtime>3600*24*30: standingtime=3600*24*30 else: pass #权重值计算................................................. ocvweight=ocvweight-as_accum*10/(capacity*3600) if ocvweight>1: ocvweight=1 elif ocvweight<0: ocvweight=0 else: pass ocvweight=eval(format(ocvweight,'.2f')) #计算单体最大最小SOC............................................................................................................................. if not self.df_ram_sn.empty: self._celltemp_weight(0) kocellvoltmin=self.df_ram_sn.iloc[-1]['kocellvoltmin'] kocellvoltmax=self.df_ram_sn.iloc[-1]['kocellvoltmax'] ramcellvoltmin=self.df_ram_sn.iloc[-1]['ramcellvoltmin'] ramcellvoltmax=self.df_ram_sn.iloc[-1]['ramcellvoltmax'] ramcellsocmin=self.df_ram_sn.iloc[-1]['cellsocmin'] ramcellsocmax=self.df_ram_sn.iloc[-1]['cellsocmax'] #满足静置时间 if standingtime>self.StandardStandingTime: #数据字段长度≥2............................................................................................................... if df_bms_len>1: ocvmin, ocvmax=self._ocv_dispersion(1) ocvsocmin=np.interp(ocvmin,self.param.LookTab_OCV,self.param.LookTab_SOC) ocvsocmax=np.interp(ocvmax,self.param.LookTab_OCV,self.param.LookTab_SOC) if 2=self.param.OcvInflexionBelow and ocvsocminself.param.SocInflexion3+5 and ramcellsocmaxself.param.SocInflexion3+5 and ramcellsocminself.param.SocInflexion1+5 and ramcellsocmaxself.param.SocInflexion1+5 and ramcellsocminself.param.SocInflexion3+5: if ocvsocmaxself.param.SocInflexion3+5: cellsocmax=self.param.SocInflexion2 cellsocmin=cellsocmax-cellsoc_diff socstep=6 else: cellsocmin=self.param.SocInflexion2 cellsocmax=cellsocmin+cellsoc_diff socstep=7 else: cellsocmin=ramcellsocmin cellsocmax=ramcellsocmax socstep=8 #数据字段长度==1.............................................................................................. else: cellvolt0=self._cellvolt_get(0) ocvmax=max(cellvolt0) ocvmin=min(cellvolt0) ocvsocmin=np.interp(ocvmin,self.param.LookTab_OCV,self.param.LookTab_SOC) ocvsocmax=np.interp(ocvmax,self.param.LookTab_OCV,self.param.LookTab_SOC) if 2=self.param.OcvInflexionBelow and ocvsocminself.param.SocInflexion3+5 and ramcellsocmaxself.param.SocInflexion3+5 and ramcellsocminself.param.SocInflexion1+5 and ramcellsocmaxself.param.SocInflexion1+5 and ramcellsocminself.param.SocInflexion3+5: if ocvsocmaxself.param.SocInflexion3+5: cellsocmax=self.param.SocInflexion2 cellsocmin=cellsocmax-cellsoc_diff socstep=15 else: cellsocmin=self.param.SocInflexion2 cellsocmax=cellsocmin+cellsoc_diff socstep=16 else: cellsocmin=ramcellsocmin cellsocmax=ramcellsocmax socstep=17 #更新ram电芯电压 ramcellvoltmin=ocvmin ramcellvoltmax=ocvmax #不满足静置时间,判断电压回弹方向 elif standingtime>130 and 2kocellvoltmin+0.005 and ocvsocmin1>ramcellsocmin: if 2kocellvoltmax+0.005 and ocvsocmax1>ramcellsocmax: #最大最小电压均在非平台区 if ocvsocmax1self.param.SocInflexion1+5 and ramcellsocmaxramcellsocmax: #最大最小电压均在非平台区 if ocvsocmax11: #数据字段长度≥2 cellvolt0=self._cellvolt_get(df_bms_len-1) cellvolt1=self._cellvolt_get(df_bms_len-2) if self.param.CellFullChrgVolt最小SOC a=cellsocmin cellsocmin=cellsocmax cellsocmax=a else: pass if cellsocmax>100: #限制最大最小SOC的范围 cellsocmax=100 elif cellsocmax<0: cellsocmax=0 else: pass if cellsocmin>100: cellsocmin=100 elif cellsocmin<0: cellsocmin=0 else: pass alpha=(cellsocmin+cellsocmax)*0.02-2 #blending系数计算 if cellsocmax>90 or alpha>1: alpha=1 elif cellsocmin<10 or alpha<-1: alpha=-1 else: pass packsoc=(cellsocmin+cellsocmax)*0.5+(cellsocmax-cellsocmin)*0.5*alpha packsoc=eval(format(packsoc,'.1f')) socdsp=packsoc #输出ram结果.................................................................................................................................................................... list_ram=[self.bmstime[df_bms_len-1], self.sn, self.bms_soc[df_bms_len-1], packsoc, cellsocmin, cellsocmax, standingtime, rampackcrnt, ramcellvoltmin, ramcellvoltmax,kocellvoltmin,kocellvoltmax,ocvweight,as_accum,socstep] df_ram=pd.DataFrame(columns=self.df_ram_sn.columns) df_ram.loc[0]=list_ram #输出计算结果........................................................................................................................................................................ df_res.loc[0]=[self.bmstime[df_bms_len-1], self.sn, self.bms_soc[df_bms_len-1], packsoc, socdsp, cellsocmin1, cellsocmax1,ocvweight,socstep] return df_res, df_ram