|
@@ -0,0 +1,122 @@
|
|
|
+import pandas as pd
|
|
|
+import numpy as np
|
|
|
+import datetime
|
|
|
+import time, datetime
|
|
|
+import scipy
|
|
|
+import math
|
|
|
+import itertools
|
|
|
+import log
|
|
|
+from scipy import interpolate
|
|
|
+from sklearn.linear_model import LinearRegression
|
|
|
+from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam
|
|
|
+
|
|
|
+class sor_est:
|
|
|
+ def __init__(self,sn,celltype,df_bms): #参数初始化
|
|
|
+
|
|
|
+ self.sn=sn
|
|
|
+ self.celltype=celltype
|
|
|
+ self.param=BatParam.BatParam(celltype)#鹏飞param中为BatParam,学琦为BatteryInfo
|
|
|
+ self.df_bms=pd.DataFrame(df_bms)
|
|
|
+ self.packcrnt=df_bms['总电流[A]']*self.param.PackCrntDec
|
|
|
+ self.packvolt=df_bms['总电压[V]']
|
|
|
+ self.bms_soc=df_bms['SOC[%]']
|
|
|
+ self.bmstime= pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
|
|
|
+ self.LookTab_OCV = self.param.LookTab_OCV
|
|
|
+ self.LookTab_SOC = self.param.LookTab_SOC
|
|
|
+
|
|
|
+ self.cellvolt_list=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)]
|
|
|
+ self.celltemp_name=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)]
|
|
|
+ self.bmssta = df_bms['充电状态']
|
|
|
+ #定义加权滤波函数..................................................................................................
|
|
|
+ def moving_average(interval, windowsize):
|
|
|
+ window = np.ones(int(windowsize)) / float(windowsize)
|
|
|
+ re = np.convolve(interval, window, 'same')
|
|
|
+ return re
|
|
|
+#.............................................内阻及电压估计............................................................................
|
|
|
+ def sor_cal(self):
|
|
|
+ start_time = time.time()
|
|
|
+ data = self.df_bms
|
|
|
+ data.fillna(0, inplace=True)
|
|
|
+ LookTab_OCV = self.LookTab_OCV
|
|
|
+ LookTab_SOC = self.LookTab_SOC
|
|
|
+ max_volt = max(LookTab_OCV)
|
|
|
+ min_volt = min(LookTab_OCV)
|
|
|
+ crnt = data['总电流[A]']
|
|
|
+ zero_group = itertools.groupby(crnt, lambda x : x == 0)#判断电流是否为0,并根据0和非0分组
|
|
|
+ zero_flg = []
|
|
|
+ zero_value = []
|
|
|
+ for k, v in zero_group:
|
|
|
+ zero_flg.append(k)#0和非0状态
|
|
|
+ zero_value.append(len(list(v)))#各个连续0和非0值的个数
|
|
|
+ zero_flg_new = zero_flg.copy()
|
|
|
+ for item in range(0, len(zero_value)):
|
|
|
+ if (zero_flg[item] == True) & (zero_value[item] < 6):#筛选出充电或放电过程中跳零的点,0点数据少于等于3个采样的时刻
|
|
|
+ zero_flg_new[item] = False
|
|
|
+ def get_index1(lst=None, item=''):
|
|
|
+ return [index for (index,value) in enumerate(lst) if value == item]#获取某个元素的位置
|
|
|
+ true_num = (get_index1(zero_flg_new, True))#获取电流为0的位置
|
|
|
+ zero_value_fit = []
|
|
|
+ true_num_flg = []
|
|
|
+ for item in range(len(true_num) - 1):
|
|
|
+ if ((true_num[item + 1] - true_num[item]) == 2):#重新赋值零点和非零点并计算各数据段个数
|
|
|
+ true_num_flg.append(True)
|
|
|
+ true_num_flg.append(False)
|
|
|
+ zero_value_fit.append(zero_value[true_num[item]])#[2*item]
|
|
|
+ zero_value_fit.append(zero_value[true_num[item] + 1])#[2*item + 1]
|
|
|
+ else:
|
|
|
+ true_num_flg.append(True)
|
|
|
+ true_num_flg.append(False)
|
|
|
+ zero_value_fit.append(zero_value[true_num[item]])
|
|
|
+ zero_value_fit.append(sum(zero_value[(true_num[item] + 1):(true_num[item + 1])]))
|
|
|
+ true_num_flg_fit = []
|
|
|
+ for item in range(1,len(zero_value_fit) + 1):
|
|
|
+ true_num_flg_fit.append(sum(zero_value_fit[0:item]) - 1)
|
|
|
+ true_num_flg_fit.insert(0,0)
|
|
|
+ soc_interplt = interpolate.interp1d(LookTab_OCV, LookTab_SOC, kind = 'linear')#OCV-SOC插值函数
|
|
|
+ cellvolt_name = self.cellvolt_list#电芯数量
|
|
|
+ celltemp_name = self.celltemp_name#电芯温度数量
|
|
|
+ df_sor_result = pd.DataFrame(columns = ["sn", "time", "sor", "sor_sigma", "soc", "temp", "delta_time"])
|
|
|
+ for range_num in range(0, len(true_num_flg) - 1):#
|
|
|
+ sor_temp = []
|
|
|
+ df_time = pd.to_datetime(data['时间戳'])
|
|
|
+ delta_time = (df_time.iloc[true_num_flg_fit[range_num + 1]] - df_time.iloc[true_num_flg_fit[range_num] + 1])/pd.Timedelta(1, 'min')#电流为0阶段的时间长度
|
|
|
+ if (true_num_flg[range_num] == True) & (delta_time > 10):#电流为0时间大于30 min时,利用下一段数据计算阻值
|
|
|
+ df_sor_soc_data = data.iloc[(true_num_flg_fit[range_num] + 1):(true_num_flg_fit[range_num + 1] + 1)]#获取电流为零数据段,计算SOC
|
|
|
+ df_sor_cal_temp = data.iloc[(true_num_flg_fit[range_num + 1] + 1):(true_num_flg_fit[range_num + 2] + 1)]#获取电流非零数据,计算内阻
|
|
|
+ df_sor_cal_temp.reset_index(drop = True, inplace = True)
|
|
|
+ df_sor_cal_len = len(df_sor_cal_temp)
|
|
|
+ cal_range_num = true_num_flg_fit[range_num + 1] + 1
|
|
|
+ if df_sor_cal_len > 20:#放电数据
|
|
|
+ df_sor_cal = df_sor_cal_temp.iloc[:30]#筛选前30行放电数据
|
|
|
+ df_sor_time = pd.to_datetime(df_sor_cal['时间戳'])
|
|
|
+ delta_time_sor = (df_sor_time.iloc[-1] - df_sor_time.iloc[0])/pd.Timedelta(1, 'min')#计算内阻使用数据段的时长
|
|
|
+ df_soc_volt = df_sor_soc_data[cellvolt_name].iloc[-1]/1000
|
|
|
+ df_soc_volt[df_soc_volt > max_volt] = max_volt - 0.005
|
|
|
+ df_soc_volt[df_soc_volt < min_volt] = min_volt + 0.005
|
|
|
+ df_soc = soc_interplt(list(df_soc_volt))#插值计算电芯SOC
|
|
|
+ df_sor_volt = df_sor_cal[cellvolt_name]/1000
|
|
|
+ df_sor_crnt = df_sor_cal['总电流[A]']
|
|
|
+ df_sorvolt_dif = np.diff(df_sor_volt, axis = 0)
|
|
|
+ df_sorcrnt_dif = np.diff(df_sor_crnt, axis = 0)
|
|
|
+ df_sorvolt_dif_csv = pd.DataFrame(df_sorvolt_dif)
|
|
|
+ df_sorvolt_dif_csv.columns = cellvolt_name
|
|
|
+ if np.sum(df_sorcrnt_dif != 0) > 0.5*len(df_sorcrnt_dif):
|
|
|
+ for item in cellvolt_name:
|
|
|
+ lineModel = LinearRegression()
|
|
|
+ lineModel.fit(np.array(df_sorcrnt_dif).reshape(-1, 1), df_sorvolt_dif_csv[item])
|
|
|
+ sor_temp.append(lineModel.coef_[0])
|
|
|
+ sor_sort_temp = np.sort(sor_temp)
|
|
|
+ sor_sort_del = sor_sort_temp[1:-2]
|
|
|
+ sor_sort_del_mean = np.mean(sor_sort_del)#去除最值的均值
|
|
|
+ sor_sort_del_std = np.std(sor_sort_del)#去除最值的均方差
|
|
|
+ sor_sigma = list((sor_temp - sor_sort_del_mean)/sor_sort_del_std)
|
|
|
+ df_sor_result_temp = pd.DataFrame({"sn":[self.sn], "time":[df_sor_cal_temp['时间戳'][0]], "sor":[str(sor_temp)], "sor_sigma":[str(sor_sigma)],
|
|
|
+ "soc":[str(list(df_soc))], "temp":[str(list(df_sor_cal_temp[celltemp_name].iloc[0]))], "delta_time":[delta_time_sor]})
|
|
|
+ df_sor_result = df_sor_result.append(df_sor_result_temp)
|
|
|
+ df_sor_result.reset_index(drop = True, inplace = True)
|
|
|
+ end_time = time.time()
|
|
|
+ print(end_time - start_time)
|
|
|
+ if not df_sor_result.empty:
|
|
|
+ return df_sor_result
|
|
|
+ else:
|
|
|
+ return pd.DataFrame()
|