|
@@ -0,0 +1,247 @@
|
|
|
|
+import pandas as pd
|
|
|
|
+import numpy as np
|
|
|
|
+import datetime
|
|
|
|
+import time, datetime
|
|
|
|
+import math
|
|
|
|
+import itertools
|
|
|
|
+from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam
|
|
|
|
+
|
|
|
|
+class vol_sor_est:
|
|
|
|
+ def __init__(self,sn,celltype,df_bms): #参数初始化
|
|
|
|
+
|
|
|
|
+ self.sn=sn
|
|
|
|
+ self.celltype=celltype
|
|
|
|
+ self.param=BatParam.BatParam(celltype)#鹏飞param中为BatParam,学琦为BatteryInfo
|
|
|
|
+ self.df_bms=pd.DataFrame(df_bms)
|
|
|
|
+ self.packcrnt=df_bms['总电流[A]']*self.param.PackCrntDec
|
|
|
|
+ self.packvolt=df_bms['总电压[V]']
|
|
|
|
+ self.bms_soc=df_bms['SOC[%]']
|
|
|
|
+ self.bmstime= pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
|
|
|
|
+
|
|
|
|
+ self.cellvolt_list=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)]
|
|
|
|
+ self.celltemp_name=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)]
|
|
|
|
+ self.bmssta = df_bms['充电状态']
|
|
|
|
+ #定义加权滤波函数..................................................................................................
|
|
|
|
+ def moving_average(interval, windowsize):
|
|
|
|
+ window = np.ones(int(windowsize)) / float(windowsize)
|
|
|
|
+ re = np.convolve(interval, window, 'same')
|
|
|
|
+ return re
|
|
|
|
+#.............................................内阻及电压估计............................................................................
|
|
|
|
+ def volsor_cal(self):
|
|
|
|
+ start_time = time.time()
|
|
|
|
+ dischrg_data_temp = self.df_bms.loc[(self.df_bms['充电状态'] == 3) & (self.df_bms['总电流[A]'] < -3) & (self.df_bms['SOC[%]'] > 60)]#放电数据
|
|
|
|
+ dischrg_data = dischrg_data_temp.reset_index(drop=True)
|
|
|
|
+ df_voltout_result = pd.DataFrame(columns = ['sn','time','volout_confr','volout_amplt'])
|
|
|
|
+ if not dischrg_data_temp.empty:
|
|
|
|
+ temp_rest_time = dischrg_data['时间戳']
|
|
|
|
+ rest_time = pd.to_datetime(temp_rest_time)
|
|
|
|
+ delta_time = (np.diff(rest_time)/pd.Timedelta(1, 'min'))#计算时间差的分钟数
|
|
|
|
+ pos = np.where(delta_time > 10)
|
|
|
|
+ pos_ful_tem = np.insert(pos, 0, 0)
|
|
|
|
+ pos_len = len(pos_ful_tem)
|
|
|
|
+ data_len = len(rest_time)
|
|
|
|
+ pos_ful = np.insert(pos_ful_tem, pos_len, data_len-1)
|
|
|
|
+ splice_num = []
|
|
|
|
+ for item in range(0,len(pos_ful)-1):
|
|
|
|
+ splice_num.extend(item*np.ones(pos_ful[item +1]-pos_ful[item]))
|
|
|
|
+ splice_num = np.insert(splice_num, 0, 0)
|
|
|
|
+ dischrg_data_temp['dischrgr_rest'] = splice_num
|
|
|
|
+ #---------------------------对分段数据使用RLS算法估计电压及内阻-------------------------------------------
|
|
|
|
+ def theta2RC(theta, dt):
|
|
|
|
+ #计算电池OCV及阻值
|
|
|
|
+ OCV = theta[0,0] / (1-theta[0,1])
|
|
|
|
+ R0 = (theta[0,3] - theta[0,2]) / (1 + theta[0,1])
|
|
|
|
+ R1 = -(theta[0,3] + theta[0,2]) / (1 - theta[0,1]) - R0
|
|
|
|
+ Tau = dt/2*(1 + theta[0,1]) / (1 - theta[0,1])
|
|
|
|
+ return [OCV,R0,R1,Tau]
|
|
|
|
+ cellvolt_name = self.cellvolt_list
|
|
|
|
+ dischrgr_check_data = dischrg_data_temp.drop(['GSM信号','故障等级','故障代码','开关状态','单体压差','绝缘电阻','总电压[V]','充电状态','单体压差'],axis=1,inplace=False)
|
|
|
|
+ dischrgr_check_data.fillna(value=0)
|
|
|
|
+ df_est_vol_ful = pd.DataFrame()
|
|
|
|
+ df_est_sor_ful = pd.DataFrame()
|
|
|
|
+ k = 0
|
|
|
|
+ for i in range(0,len(pos_ful)-1):#len(pos_ful)-1#每段放电数据计算
|
|
|
|
+ df_distest_data_temp = dischrgr_check_data.loc[dischrgr_check_data['dischrgr_rest'] == i]
|
|
|
|
+ df_distest_data = df_distest_data_temp.reset_index(drop = True)
|
|
|
|
+ df_distime = pd.to_datetime(df_distest_data['时间戳'])
|
|
|
|
+ df_disvolt = df_distest_data[cellvolt_name]/1000
|
|
|
|
+ df_discrnt = df_distest_data['总电流[A]']
|
|
|
|
+ df_dissoc = df_distest_data['SOC[%]']
|
|
|
|
+ df_est_vol = pd.DataFrame()
|
|
|
|
+ df_est_sor = pd.DataFrame()
|
|
|
|
+ if len(df_discrnt) > 120:
|
|
|
|
+ end = time.time()
|
|
|
|
+ print('第' + str(k) + '段数据' + str(df_distime[0]))
|
|
|
|
+ print(end - start_time)
|
|
|
|
+ for item in cellvolt_name:#每个电芯的循环计算
|
|
|
|
+ dt_temp = np.diff(df_distime)/pd.Timedelta(1, 'seconds')
|
|
|
|
+ data_len = len(dt_temp)#时间微分后的长度
|
|
|
|
+ dt = np.append(dt_temp, dt_temp[data_len-1])#计算时间间隔并补充
|
|
|
|
+ crnt = df_discrnt
|
|
|
|
+ Ut = df_disvolt[item]
|
|
|
|
+ unit_mat = np.mat(np.eye(4))
|
|
|
|
+
|
|
|
|
+ for j in range(data_len + 1):#每个电芯的实际计算过程
|
|
|
|
+ theta = [3.4, 0.448, -0.0108, 0.0106]
|
|
|
|
+ P = np.mat(10**6*np.eye(4))
|
|
|
|
+ lambd = 0.96
|
|
|
|
+ Err = [0]
|
|
|
|
+ Err2 = [0]*(data_len + 1)
|
|
|
|
+ OCV = [0]
|
|
|
|
+ R0 = [0]
|
|
|
|
+ R1 = [0]
|
|
|
|
+ Tau = [0]
|
|
|
|
+ Theta = np.mat(np.zeros([data_len + 1, 4]))
|
|
|
|
+ Theta[0,:] = theta
|
|
|
|
+ U1 = [0]
|
|
|
|
+ Vt = [0]
|
|
|
|
+ for m in range(1,data_len + 1):
|
|
|
|
+ phi = np.mat([1 , Ut[m], crnt[m], crnt[m-1]]).T #测量向量4×1的向量
|
|
|
|
+ K = np.dot(P, phi)/(lambd + np.dot(np.dot(phi.T, P),phi)) #增益的递推公式,为4×1的向量
|
|
|
|
+ P = np.dot((unit_mat - np.dot(K, phi.T)),P)/lambd #协方差矩阵的递归公式
|
|
|
|
+ Err.append(Ut[m] - np.dot(Theta[m-1,:], phi)) #误差
|
|
|
|
+ theta = Theta[m-1, :] + Err[m]*K.T #待估向量theta的递归公式
|
|
|
|
+ Theta[m,:] = theta #theta估计向量矩阵化
|
|
|
|
+ [ocv,r0,r1,tau] = theta2RC(Theta[m,:], dt[m])
|
|
|
|
+ Errtemp = []
|
|
|
|
+ for ss in range(0, min(m-1, 29) + 1):
|
|
|
|
+ ocv,r0,r1,tau = theta2RC(Theta[m - ss, :], dt[m])
|
|
|
|
+ U1.append(U1[m-1]*math.exp(-dt[m]/tau) + (1 - math.exp(-dt[m]/tau))*r1*crnt[m])
|
|
|
|
+ Vt.append(ocv - U1[m] - crnt[m]*r0)
|
|
|
|
+ Err2[m] = Ut[m] - Vt[m]
|
|
|
|
+ if abs(Err2[m] <= 0.05):
|
|
|
|
+ break
|
|
|
|
+ elif ss == 29:
|
|
|
|
+ id, Err2[m] = min(enumerate(Errtemp))
|
|
|
|
+ [ocv,r0,r1,tau] = theta2RC(Theta[m - id, :], dt[m])
|
|
|
|
+ else:
|
|
|
|
+ Errtemp.append(abs(Err2[m]))
|
|
|
|
+ OCV.append(ocv)
|
|
|
|
+ R0.append(r0)
|
|
|
|
+ R1.append(r1)
|
|
|
|
+ Tau.append(tau)
|
|
|
|
+ df_est_vol[item] = OCV
|
|
|
|
+ df_est_sor[item] = R0
|
|
|
|
+ df_est_vol['dischrgr_num'] = k
|
|
|
|
+ df_est_vol['时间戳'] = df_distest_data['时间戳']
|
|
|
|
+ df_est_sor['dischrgr_num'] = k
|
|
|
|
+ df_est_sor['时间戳'] = df_distest_data['时间戳']
|
|
|
|
+ k = k + 1
|
|
|
|
+ df_est_vol_ful = df_est_vol_ful.append(df_est_vol)
|
|
|
|
+ df_est_sor_ful = df_est_sor_ful.append(df_est_sor)
|
|
|
|
+ df_est_vol_ful.reset_index(drop = True)
|
|
|
|
+ df_est_sor_ful.reset_index(drop = True)
|
|
|
|
+ #---------------------------------------------------------统计内阻及电压估计状态-------------------------------------------------------
|
|
|
|
+ df_voltout_result = pd.DataFrame(columns = ['sn','time','volout_confr','volout_amplt'])
|
|
|
|
+ df_sortout_result = pd.DataFrame(columns = ['sn','time','sorout_confr','sorout_amplt'])
|
|
|
|
+ df_volsortout_result = pd.DataFrame(columns = ['sn','time','volsorout_confr','volsorout_amplt'])
|
|
|
|
+ df_delt_vol_total = pd.DataFrame()
|
|
|
|
+ df_delt_sor_total = pd.DataFrame()
|
|
|
|
+ m = 1
|
|
|
|
+ for i in range(0,k):#有多少段数据
|
|
|
|
+ df_ana_vol_temp = df_est_vol_ful.loc[df_est_vol_ful['dischrgr_num'] == i].reset_index(drop = True)#选取某段数据
|
|
|
|
+ df_ana_sor_temp = df_est_sor_ful.loc[df_est_sor_ful['dischrgr_num'] == i].reset_index(drop = True)#选取某段数据
|
|
|
|
+ df_vol_cho = df_ana_vol_temp[cellvolt_name[0]]#挑选某个电池的估计电压,判断估计范围是否正常
|
|
|
|
+ df_vol_logi = [(df_vol_cho > 2.5) & (df_vol_cho < 4.3)]
|
|
|
|
+ df_vol_arr = np.array(df_vol_logi).astype(int)
|
|
|
|
+ df_vol_cho_logi = df_vol_arr[0]
|
|
|
|
+ num_times = [(k, len(list(v))) for k, v in itertools.groupby(df_vol_cho_logi)]
|
|
|
|
+ num_times_len = len(num_times)
|
|
|
|
+ fitter_len = num_times[num_times_len - 1][1]#拟合好后的数据长度
|
|
|
|
+ ini_len = len(df_vol_cho)#初始数据长度
|
|
|
|
+ if (num_times[num_times_len - 1][0] == 1) & (num_times[num_times_len - 1][1] > 0.6*fitter_len):
|
|
|
|
+ df_delt_vol_accum = pd.DataFrame()
|
|
|
|
+ df_delt_sor_accum = pd.DataFrame()
|
|
|
|
+ df_delt_vol = pd.DataFrame()
|
|
|
|
+ df_delt_sor = pd.DataFrame()
|
|
|
|
+ volout_confr = []#偏离与否
|
|
|
|
+ volout_amplt = []#偏离幅度
|
|
|
|
+ sorout_confr = []#偏离与否
|
|
|
|
+ sorout_amplt = []#偏离幅度
|
|
|
|
+ volsorout_confr = []
|
|
|
|
+ volsorout_amplt = []
|
|
|
|
+ df_ana_vol_spl = df_ana_vol_temp[(ini_len-fitter_len):(ini_len-1)]#筛选出拟合至合理范围的电压及内阻
|
|
|
|
+ df_ana_sor_spl = df_ana_sor_temp[(ini_len-fitter_len):(ini_len-1)]
|
|
|
|
+ df_ana_vol = df_ana_vol_spl[cellvolt_name]#筛选出的拟合电压在合理范围内的数据
|
|
|
|
+ df_ana_sor = df_ana_sor_spl[cellvolt_name]
|
|
|
|
+ df_ana_vol_min = np.min(df_ana_vol, axis = 1)
|
|
|
|
+ df_ana_vol_max = np.max(df_ana_vol, axis = 1)
|
|
|
|
+ df_ana_sor_max = np.max(df_ana_sor, axis = 1)
|
|
|
|
+ df_ana_sor_min = np.min(df_ana_sor, axis = 1)
|
|
|
|
+ df_ana_vol_mean = (np.sum(df_ana_vol, axis = 1) - df_ana_vol_min - df_ana_vol_max)/(df_ana_vol.shape[1] - 2)#估计电压均值
|
|
|
|
+ df_ana_sor_mean = (np.sum(df_ana_sor, axis = 1) - df_ana_sor_min - df_ana_sor_max)/(df_ana_sor.shape[1] - 2)#估计内阻均值
|
|
|
|
+ for item in cellvolt_name:
|
|
|
|
+ df_delt_vol[item] = df_ana_vol[item] - df_ana_vol_mean#计算各电芯拟合电压与同时刻电压均值的差值
|
|
|
|
+ df_delt_sor[item] = df_ana_sor[item] - df_ana_sor_mean
|
|
|
|
+ df_delt_vol_min = np.min(df_delt_vol, axis = 1)
|
|
|
|
+ df_delt_vol_max = np.max(df_delt_vol, axis = 1)
|
|
|
|
+ df_delt_sor_min = np.min(df_delt_sor, axis = 1)
|
|
|
|
+ df_delt_sor_max = np.max(df_delt_sor, axis = 1)
|
|
|
|
+ df_delt_vol_mean = (np.sum(df_delt_vol, axis = 1) - df_delt_vol_min - df_delt_vol_max)/(df_delt_vol.shape[1] - 2)#估计电压差值的均值
|
|
|
|
+ df_delt_sor_mean = (np.sum(df_delt_sor, axis = 1) - df_delt_sor_min - df_delt_sor_max)/(df_delt_sor.shape[1] - 2)#估计电压差值的均值
|
|
|
|
+ df_delt_vol_std = np.std(df_delt_vol, axis = 1)
|
|
|
|
+ df_delt_sor_std = np.std(df_delt_sor, axis = 1)
|
|
|
|
+ df_delt_data_len = len(df_delt_vol_std)
|
|
|
|
+ for cell_num in cellvolt_name:
|
|
|
|
+ df_delt_vol_cal = (df_delt_vol[cell_num] - df_delt_vol_mean)/df_delt_vol_std#计算每个电压与均值的偏差
|
|
|
|
+ df_delt_sor_cal = (df_delt_sor[cell_num] - df_delt_sor_mean)/df_delt_sor_std#计算每个内阻与均值的偏差
|
|
|
|
+ df_delt_vol_confr = abs(df_delt_vol_cal) > 3#电压估计值与均值差值大于3sigma
|
|
|
|
+ df_delt_sor_confr = abs(df_delt_sor_cal) > 3#电压估计值与均值差值大于3sigma
|
|
|
|
+ df_delt_vol_confr_num = np.sum(df_delt_vol_confr!=0)#统计电压离群度的非零数
|
|
|
|
+ df_delt_sor_confr_num = np.sum(df_delt_sor_confr!=0)#统计电阻离群度的非零数
|
|
|
|
+ df_delt_vol_sor_cal = df_delt_vol_cal*df_delt_sor_cal
|
|
|
|
+ df_delt_vol_sor_confr = abs(df_delt_vol_sor_cal) > 9#电压估计值内阻估计值乘积与均值差值大于3sigma
|
|
|
|
+ df_delt_vol_sor_confr_num = np.sum(df_delt_vol_sor_confr!=0)#统计非零数
|
|
|
|
+ df_length_limit = df_delt_data_len/10
|
|
|
|
+ df_delt_vol_accum = pd.concat([df_delt_vol_accum, df_delt_vol_cal], axis = 1)
|
|
|
|
+ df_delt_sor_accum = pd.concat([df_delt_sor_accum, df_delt_sor_cal], axis = 1)
|
|
|
|
+ if df_delt_vol_confr_num > df_length_limit:
|
|
|
|
+ volout_confr.append(1)#1为偏离,0为非偏离
|
|
|
|
+ volout_amplt.append(max(abs(df_delt_vol_cal)))#偏离度
|
|
|
|
+ else:
|
|
|
|
+ volout_confr.append(0)#1为偏离,0为非偏离
|
|
|
|
+ volout_amplt.append(0)#偏离度
|
|
|
|
+ if df_delt_sor_confr_num > df_length_limit:
|
|
|
|
+ sorout_confr.append(1)#1为偏离,0为非偏离
|
|
|
|
+ sorout_amplt.append(max(abs(df_delt_vol_cal)))#偏离度
|
|
|
|
+ else:
|
|
|
|
+ sorout_confr.append(0)#1为偏离,0为非偏离
|
|
|
|
+ sorout_amplt.append(0)#偏离度
|
|
|
|
+ if df_delt_vol_sor_confr_num > df_length_limit:
|
|
|
|
+ volsorout_confr.append(1)
|
|
|
|
+ volsorout_amplt.append(max(abs(df_delt_vol_sor_cal)))#偏离度
|
|
|
|
+ else:
|
|
|
|
+ volsorout_confr.append(0)
|
|
|
|
+ volsorout_amplt.append(0)#偏离度
|
|
|
|
+ df_delt_vol_accum.columns = cellvolt_name
|
|
|
|
+ df_delt_sor_accum.columns = cellvolt_name
|
|
|
|
+ df_delt_vol_accum['dischrgr_num'] = m
|
|
|
|
+ df_delt_sor_accum['dischrgr_num'] = m
|
|
|
|
+ df_delt_vol_accum['时间戳'] = list(df_ana_vol_spl['时间戳'])
|
|
|
|
+ df_delt_sor_accum['时间戳'] = list(df_ana_vol_spl['时间戳'])
|
|
|
|
+ m = m + 1
|
|
|
|
+ if any(volout_confr):
|
|
|
|
+ df_volout_temp = pd.DataFrame({"sn":[self.sn], "time":[df_ana_vol_temp['时间戳'][0]], "volout_confr":[str(volout_confr)], "volout_amplt":[str(volout_amplt)]})
|
|
|
|
+ df_voltout_result = df_voltout_result.append(df_volout_temp)
|
|
|
|
+ df_voltout_result = df_voltout_result.reset_index(drop = True)
|
|
|
|
+ df_voltout_result.sort_values(by = ['time'], axis = 0, ascending=True,inplace=True)#对故障信息按照时间进行排序
|
|
|
|
+ # df_voltout_result.to_csv(r'D:\Work\Code_write\data_analyze_platform\USER\lzx\01算法开发\05内阻及电压估计\02算法检测\判断结果\电压偏离.csv',index=False,encoding='GB18030')
|
|
|
|
+ if any(sorout_confr):
|
|
|
|
+ df_sorout_temp = pd.DataFrame({"sn":[self.sn], "time":[df_ana_vol_temp['时间戳'][0]], "sorout_confr":[str(sorout_confr)], "sorout_amplt":[str(sorout_amplt)]})
|
|
|
|
+ df_sortout_result = df_sortout_result.append(df_sorout_temp)
|
|
|
|
+ df_sortout_result = df_sortout_result.reset_index(drop = True)
|
|
|
|
+ df_sortout_result.sort_values(by = ['time'], axis = 0, ascending=True,inplace=True)#对故障信息按照时间进行排序
|
|
|
|
+ # df_sortout_result.to_csv(r'D:\Work\Code_write\data_analyze_platform\USER\lzx\01算法开发\05内阻及电压估计\02算法检测\判断结果\内阻偏离.csv',index=False,encoding='GB18030')
|
|
|
|
+ if any(volsorout_confr):
|
|
|
|
+ df_volsorout_temp = pd.DataFrame({"sn":[self.sn], "time":[df_ana_vol_temp['时间戳'][0]], "volsorout_confr":[str(volsorout_confr)], "volsorout_amplt":[str(volsorout_amplt)]})
|
|
|
|
+ df_volsortout_result = df_volsortout_result.append(df_volsorout_temp)
|
|
|
|
+ df_volsortout_result = df_volsortout_result.reset_index(drop = True)
|
|
|
|
+ df_volsortout_result.sort_values(by = ['time'], axis = 0, ascending=True,inplace=True)#对故障信息按照时间进行排序
|
|
|
|
+ # df_volsortout_result.to_csv(r'D:\Work\Code_write\data_analyze_platform\USER\lzx\01算法开发\05内阻及电压估计\02算法检测\判断结果\电压内阻偏离.csv',index=False,encoding='GB18030')
|
|
|
|
+ end_time = time.time()
|
|
|
|
+ print(end_time - start_time)
|
|
|
|
+ if not df_voltout_result.empty:
|
|
|
|
+ return [df_sortout_result, df_voltout_result, df_volsortout_result]
|
|
|
|
+ else:
|
|
|
|
+ return [pd.DataFrame(), pd.DataFrame(), pd.DataFrame()]
|
|
|
|
+ #----------------------------------------------------对估计电压及内阻作图-----------------------------------------------------------------
|