import pandas as pd import numpy as np import peakbotm from scipy.signal import savgol_filter as svflter from scipy import interpolate import os import joblib class cell_chracter: def __init__(self, df_bms, df_param, cellvolt_name, celltemp_name): #参数初始化 self.df_bms=pd.DataFrame(df_bms) self.bmstime= pd.to_datetime(df_bms['time'], format='%Y-%m-%d %H:%M:%S') self.cellvolt_list=cellvolt_name self.celltemp_name=celltemp_name self.capty = float(df_param['capacity']) np_chrg_ocv = df_param['charge_ocv_v'] np_chrg_soc = df_param['charge_ocv_soc'] np_dschrg_ocv = df_param['discharge_ocv_v'] np_dschrg_soc = df_param['discharge_ocv_soc'] np_ocv = (np.array(np_chrg_ocv) + np.array(np_dschrg_ocv))/2 np_soc = (np.array(np_chrg_soc) + np.array(np_dschrg_soc))/2 self.LookTab_OCV = np_ocv self.LookTab_SOC = np_soc abs_model_file=f"{os.path.dirname(os.path.abspath(__file__))}/model/V_1_0_0/" self.Dnn_predict = joblib.load(abs_model_file + 'Dnnmodel.pkl') self.inputnorm_x = joblib.load(abs_model_file + 'normorlized_x') self.inputnorm_y = joblib.load(abs_model_file + 'normorlized_y') # self.Dnn_predict = dnn_predict # self.inputnorm_x = input_norm_x # self.inputnorm_y = input_norm_y def states_cal(self): def soc_cal_ncm(df_data_temp): data_ori_temp = df_data_temp.copy() data_ori_delNan = data_ori_temp.dropna(axis = 0, subset = ["pack_crnt", "pack_soc"])#删除电压、电流、bms状态值为Nan值所在行"CellVoltage", data_ori_delnone = data_ori_delNan.drop(data_ori_delNan[(data_ori_delNan['pack_soc'] == "") | (data_ori_delNan['pack_crnt'] == "")].index)#删除电压、电流、bms状态值为空值所在行(data_ori_delNan['CellVoltage'] == "") | data_ori = data_ori_delnone.fillna(method ='backfill', inplace = False , axis = 0) celltemp_name = self.celltemp_name#电芯温度数量 #---------------------------------充电过程判断-------------------------------------- def clean_dead_value(series, num_dead_thresh): slide_list = [series.index[0]] slide_list_all = [] for i in range(series.index[0],series.index[-1]): j = i + 1 diff = series[j] - series[i] if diff == 0: slide_list.append(j) else: slide_list.clear() slide_list.append(j) if len(slide_list) >= num_dead_thresh: target_list = slide_list.copy() slide_list_all.append(target_list) index= [] # 将找到的满足条件的index合并 for i in range(len(slide_list_all) - 1): if set(slide_list_all[i]) < set(slide_list_all[i + 1]): index.append(i) m = {i: element for i, element in enumerate(slide_list_all)} [m.pop(i) for i in index] return list(m.values()) df_data_chrg_chek = data_ori.copy() df_data_chrg_chek.reset_index(drop = True, inplace = True) df_data_chrg_chek['crnt_flg'] = 0 df_data_chrg_chek.loc[df_data_chrg_chek['pack_crnt'] > 0, 'crnt_flg'] = 1 df_data_chrg_chek.loc[df_data_chrg_chek['pack_crnt'] < 0, 'crnt_flg'] = -1 df_data_chrg_chek['sts_flg'] = 2 df_sts_chrg = pd.DataFrame(columns=list(df_data_chrg_chek.columns)) df_crnt_flg = df_data_chrg_chek['crnt_flg'] num_dead_thresh = 15#判断连续多少个数据为阈值 indexs_to_delelte = clean_dead_value(df_crnt_flg, num_dead_thresh)#获得连续数据所在的行 rest_num = len(indexs_to_delelte) if rest_num > 0:#仅有一个连续数据时 for splice_item in range(0, rest_num):#rest_num df_data_temp = df_data_chrg_chek.iloc[indexs_to_delelte[splice_item][0]:indexs_to_delelte[splice_item][-1]]#获得电流连续数据 df_data_temp.reset_index(drop = True, inplace = True) df_soc_temp = df_data_temp['pack_soc'] delta_soc = round((df_soc_temp.iloc[-1] - df_soc_temp.iloc[0]), 3) df_time_temp = pd.to_datetime(df_data_temp['time']) delta_time = (df_time_temp.iloc[-1] - df_time_temp.iloc[0])/pd.Timedelta(1, 'hours') if all(df_data_temp['crnt_flg'] == 0):#静置判断 if delta_time > 0.17: df_data_temp['sts_flg'] = 0 df_sts_chrg = df_sts_chrg.append(df_data_temp) df_sts_chrg.reset_index(drop = True, inplace = True) elif all(df_data_temp['crnt_flg'] == -1):#充电判断 if (delta_time >= 0.03) and delta_soc >= 30: df_data_temp['sts_flg'] = 1 df_sts_chrg = df_sts_chrg.append(df_data_temp) df_sts_chrg.reset_index(drop = True, inplace = True) elif all(df_data_temp['crnt_flg'] == 1):#充电判断 if (delta_time >= 0.03) and delta_soc >= 30: df_data_temp['sts_flg'] = 1 df_sts_chrg = df_sts_chrg.append(df_data_temp) df_sts_chrg.reset_index(drop = True, inplace = True) df_dschrg = pd.concat([df_data_chrg_chek, df_sts_chrg, df_sts_chrg]).drop_duplicates(subset = ['time', 'pack_soc'], keep = False) data_temp = pd.concat([df_dschrg, df_sts_chrg]) data_temp.sort_values(by = 'time', inplace = True) data_temp.reset_index(drop = True, inplace = True) df_chrg_temp = data_temp.loc[data_temp['sts_flg'] == 1] df_chrg_temp.reset_index(inplace = True, drop = True) SOC_pre_cal = pd.DataFrame(columns = ['sn', 'time', 'soc_ori', 'soc_pre', 'soc_error', 'odo', 'method', 'battery_type']) data_temp_rest = data_temp[abs(data_temp['pack_crnt']) > 0.02*self.capty] np_dschrg_time = pd.to_datetime(data_temp_rest['time']) dff_dschrg_time = np.diff(np_dschrg_time)/pd.Timedelta(1, 'min')#放电过程前后时间差 rest_dschrg_pos = np.where(dff_dschrg_time > 120)#筛选前后时间差大于30min的位置及时间点,判断为两次放电 if (len(rest_dschrg_pos[0]) > 0): df_celvlt = data_temp_rest[self.cellvolt_list] df_celvlt_min = df_celvlt.min(axis = 1) df_celvlt_max = df_celvlt.max(axis = 1) LookTab_OCV = self.LookTab_OCV LookTab_SOC = self.LookTab_SOC for item_rest in rest_dschrg_pos[0]: if (abs(data_temp_rest['pack_crnt'].iloc[item_rest + 1]) < 0.03*self.capty) and (abs(data_temp_rest['pack_crnt'].iloc[item_rest + 1]) > 0.1): min_volt = min(LookTab_OCV) max_volt = max(LookTab_OCV) soc_interplt = interpolate.interp1d(LookTab_OCV, LookTab_SOC, kind = 'linear')#OCV-SOC插值函数 df_celvlt_min[df_celvlt_min < min_volt] = min_volt + 0.0005 df_celvlt_max[df_celvlt_max > max_volt] = max_volt - 0.0005 cellvltmin = df_celvlt_min[item_rest + 1] cellvltmax = df_celvlt_max[item_rest + 1] SOC_now = data_temp_rest['pack_soc'].iloc[item_rest + 1] SOC_min = round(soc_interplt(cellvltmin)[0], 1) SOC_max = round(soc_interplt(cellvltmax)[0], 1) if SOC_min < 30: SOC_cal = 0.8*SOC_min + 0.2*SOC_max SOC_del = abs(SOC_cal - SOC_now) SOC_real = str([SOC_max, SOC_min, SOC_cal]) method = 1 elif SOC_max > 90: SOC_cal = 0.8*SOC_max + 0.2*SOC_min SOC_del = abs(SOC_cal - SOC_now) SOC_real = str([SOC_max, SOC_min, SOC_cal]) method = 2 else: SOC_cal = 0.5*(SOC_max + SOC_min) SOC_del = abs(SOC_cal - SOC_now) SOC_real = str([SOC_max, SOC_min, SOC_cal]) method = 3 if SOC_del > 5: time_pre = data_temp_rest['time'].iloc[item_rest + 1] odo_now = data_temp_rest['odo'].iloc[item_rest + 1] SOC_pre_cal.loc[len(SOC_pre_cal)] = [data_temp_rest['sn'].iloc[0], time_pre, SOC_now, SOC_real, SOC_del, odo_now, method, 'NCM'] return SOC_pre_cal def soc_cal_lfp(df_data_temp): data_ori_temp = df_data_temp.copy() data_ori_delNan = data_ori_temp.dropna(axis = 0, subset = ["pack_crnt", "pack_soc"])#删除电压、电流、bms状态值为Nan值所在行"CellVoltage", data_ori_delnone = data_ori_delNan.drop(data_ori_delNan[(data_ori_delNan['pack_soc'] == "") | (data_ori_delNan['pack_crnt'] == "")].index)#删除电压、电流、bms状态值为空值所在行(data_ori_delNan['CellVoltage'] == "") | data_ori = data_ori_delnone.fillna(method ='backfill', inplace = False , axis = 0) itemsn = data_ori['sn'].iloc[0] def clean_dead_value(series, num_dead_thresh): slide_list = [series.index[0]] slide_list_all = [] for i in range(series.index[0],series.index[-1]): j = i + 1 diff = series[j] - series[i] if diff == 0: slide_list.append(j) else: slide_list.clear() slide_list.append(j) if len(slide_list) >= num_dead_thresh: target_list = slide_list.copy() slide_list_all.append(target_list) index= [] # 将找到的满足条件的index合并 for i in range(len(slide_list_all) - 1): if set(slide_list_all[i]) < set(slide_list_all[i + 1]): index.append(i) m = {i: element for i, element in enumerate(slide_list_all)} [m.pop(i) for i in index] return list(m.values()) df_data_chrg_chek = data_ori.copy() df_data_chrg_chek.reset_index(drop = True, inplace = True) df_data_chrg_chek['crnt_flg'] = 0 df_data_chrg_chek.loc[df_data_chrg_chek['pack_crnt'] > 0, 'crnt_flg'] = 1 df_data_chrg_chek.loc[df_data_chrg_chek['pack_crnt'] < 0, 'crnt_flg'] = -1 df_data_chrg_chek['sts_flg'] = 2 df_sts_chrg = pd.DataFrame(columns=list(df_data_chrg_chek.columns)) df_crnt_flg = df_data_chrg_chek['crnt_flg'] num_dead_thresh = 15#判断连续多少个数据为阈值 indexs_to_delelte = clean_dead_value(df_crnt_flg, num_dead_thresh)#获得连续数据所在的行 rest_num = len(indexs_to_delelte) if rest_num > 0:#仅有一个连续数据时 for splice_item in range(0, rest_num):#rest_num df_data_temp = df_data_chrg_chek.iloc[indexs_to_delelte[splice_item][0]:indexs_to_delelte[splice_item][-1]]#获得电流连续数据 df_data_temp.reset_index(drop = True, inplace = True) if len(df_data_temp) > 10: df_soc_temp = df_data_temp['pack_soc'] delta_soc = round((df_soc_temp.iloc[-1] - df_soc_temp.iloc[0]), 3) df_time_temp = pd.to_datetime(df_data_temp['time']) delta_time = (df_time_temp.iloc[-1] - df_time_temp.iloc[0])/pd.Timedelta(1, 'hours') if all(df_data_temp['crnt_flg'] == 0):#静置判断 if delta_time > 0.17: df_data_temp['sts_flg'] = 0 df_sts_chrg = df_sts_chrg.append(df_data_temp) df_sts_chrg.reset_index(drop = True, inplace = True) elif all(df_data_temp['crnt_flg'] == -1):#充电判断 if (delta_time >= 0.03) and delta_soc >= 30: df_data_temp['sts_flg'] = 1 df_sts_chrg = df_sts_chrg.append(df_data_temp) df_sts_chrg.reset_index(drop = True, inplace = True) elif all(df_data_temp['crnt_flg'] == 1):#充电判断 if (delta_time >= 0.03) and delta_soc >= 30: df_data_temp['sts_flg'] = 1 df_sts_chrg = df_sts_chrg.append(df_data_temp) df_sts_chrg.reset_index(drop = True, inplace = True) df_dschrg = pd.concat([df_data_chrg_chek, df_sts_chrg, df_sts_chrg]).drop_duplicates(subset = ['time', 'pack_soc'], keep = False) data_temp = pd.concat([df_dschrg, df_sts_chrg]) data_temp.sort_values(by = 'time', inplace = True) data_temp.reset_index(inplace = True, drop = True) celltemp_name = self.celltemp_name#电芯温度数量 #---------------------------------充电过程判断-------------------------------------- df_chrg_temp = data_temp.loc[data_temp['sts_flg'] == 1] df_chrg_temp.reset_index(inplace = True, drop = True) df_chracter_ofc = pd.DataFrame() SOC_rlt_final = pd.DataFrame() SOC_pre_cal = pd.DataFrame(columns = ['sn', 'time', 'soc_ori', 'soc_pre', 'soc_error', 'odo', 'method']) SOC_pre_cal_dnn = pd.DataFrame(columns = ['sn', 'time', 'soc_ori', 'soc_pre', 'soc_error', 'odo', 'method']) data_temp_rest = data_temp[abs(data_temp['pack_crnt']) > 0.02*self.capty] np_dschrg_time = pd.to_datetime(data_temp_rest['time']) dff_dschrg_time = np.diff(np_dschrg_time)/pd.Timedelta(1, 'min')#放电过程前后时间差 rest_dschrg_pos = np.where(dff_dschrg_time > 120)#筛选前后时间差大于30min的位置及时间点,判断为两次放电 if (len(rest_dschrg_pos[0]) > 0): df_celvlt = data_temp_rest[self.cellvolt_list] df_celvlt_min = df_celvlt.min(axis = 1) df_celvlt_max = df_celvlt.max(axis = 1) LookTab_OCV = self.LookTab_OCV LookTab_SOC = self.LookTab_SOC ocv_interplt = interpolate.interp1d(LookTab_SOC, LookTab_OCV, kind = 'linear')#OCV-SOC插值函数 ocv_pre_val = ocv_interplt(25) for item_rest in rest_dschrg_pos[0]: if (abs(data_temp_rest['pack_crnt'].iloc[item_rest + 1]) < 0.02*self.capty) and (abs(data_temp_rest['pack_crnt'].iloc[item_rest + 1]) > 0.3) and\ (df_celvlt_min.iloc[item_rest + 1] < ocv_pre_val) and (df_celvlt_min.iloc[item_rest + 1] > 2): min_volt = min(LookTab_OCV) soc_interplt = interpolate.interp1d(LookTab_OCV, LookTab_SOC, kind = 'linear')#OCV-SOC插值函数 df_celvlt_min[df_celvlt_min < min_volt] = min_volt + 0.0005 cellvltmin = df_celvlt_min[item_rest + 1] cellvltmax = df_celvlt_max[item_rest + 1] SOC_now = data_temp_rest['pack_soc'].iloc[item_rest + 1] SOC_min = round(soc_interplt(cellvltmin)[0], 1) SOC_max = round(soc_interplt(cellvltmax)[0], 1) if SOC_min < 30: SOC_cal = 0.8*SOC_min + 0.2*SOC_max SOC_del = abs(SOC_cal - SOC_now) SOC_real = str([SOC_max, SOC_min, SOC_cal]) method = 2 elif SOC_max > 90: SOC_cal = 0.8*SOC_max + 0.2*SOC_min SOC_del = abs(SOC_cal - SOC_now) SOC_real = str([SOC_max, SOC_min, SOC_cal]) method = 2 else: SOC_cal = 0.5*(SOC_max + SOC_min) SOC_del = abs(SOC_cal - SOC_now) SOC_real = str([SOC_max, SOC_min, SOC_cal]) method = 0 if SOC_del > 5: time_pre = data_temp_rest['time'].iloc[item_rest + 1] odo_now = data_temp_rest['odo'].iloc[item_rest + 1] SOC_pre_cal.loc[len(SOC_pre_cal)] = [itemsn, time_pre, SOC_now, SOC_real, SOC_del, odo_now, method] #------------------------------------DNN方法估计SOC if not df_chrg_temp.empty: para_lvbo = 7 chrgr_time = pd.to_datetime(df_chrg_temp['time']) delta_time = (np.diff(chrgr_time)/pd.Timedelta(1, 'min'))#计算时间差的分钟数 pos = np.where(delta_time > 10)#充电数据分段,大于10min时,认为是两个充电过程 dvsocchrcter = [] splice_num = [] if len(pos[0]) >= 1: pos_ful_tem = np.insert(pos, 0, 0) pos_len = len(pos_ful_tem) data_len = len(chrgr_time) pos_ful = np.insert(pos_ful_tem, pos_len, data_len-1) for item in range(0,len(pos_ful)-1): splice_num.extend(item*np.ones(pos_ful[item +1]-pos_ful[item])) splice_num = np.insert(splice_num, 0, 0) else: splice_num = np.zeros(len(chrgr_time)) pos_ful = np.array([0]) if len(splice_num) > 0: df_chrg_temp['chrgr_rest'] = splice_num chrgr_splice_num = np.unique(df_chrg_temp['chrgr_rest'])#判断有几段充电数据 if len(chrgr_splice_num) > 0:#特征参数中增加一列,电压微分中波峰到满充位置电量,进行SOH计算 for item_chrgr in chrgr_splice_num: df_chrgr_splice_data = df_chrg_temp.loc[(df_chrg_temp['chrgr_rest'] == item_chrgr)] df_chrgr_splice_data.reset_index(inplace = True, drop = True) df_soc_temp = df_chrgr_splice_data['pack_soc'] df_time_temp = pd.to_datetime(df_chrgr_splice_data['time']) delta_soc = round((df_soc_temp.iloc[-1] - df_soc_temp.iloc[0]), 3) if (df_soc_temp.iloc[0] < 40) and (len(df_soc_temp) > 30): delta_time = round((df_time_temp.iloc[-1] - df_time_temp.iloc[0])/pd.Timedelta(1, 'hours'), 3) rate_chrg = round(delta_soc/(100*delta_time), 2) df_celvlt_max = df_chrgr_splice_data['cell_volt_max'] df_crnt = abs(df_chrgr_splice_data['pack_crnt']) df_soc = df_chrgr_splice_data['pack_soc'] if (df_celvlt_max.iloc[-1] > 3.55) and (df_crnt.iloc[-1] < 0.1*self.capty) and (df_soc.iloc[-1] < 95): SOC_now = df_soc.iloc[-1] SOC_real = 100 SOC_del = 100 - SOC_now method = 3 time_pre = df_chrgr_splice_data['time'].iloc[-1] odo_now = df_chrgr_splice_data['odo'].iloc[0] if SOC_del > 5: SOC_pre_cal.loc[len(SOC_pre_cal)] = [itemsn, time_pre, SOC_now, SOC_pre_cal, SOC_del, odo_now, method] df_celvltdff = df_celvlt_max.diff() df_celvltdff.fillna(method ='bfill', inplace = True , axis = 0) df_celvltdff.reset_index(inplace = True, drop = True) df_celvltdffsmth = svflter(df_celvltdff, para_lvbo, 3) df_time_temp = pd.to_datetime(df_chrgr_splice_data['time']) np_dff_time = np.diff(df_time_temp)/pd.Timedelta(1, 'hours') np_dff_time_new = np.append(np_dff_time, 0) np_chrg_ah = np.multiply(np.array(np_dff_time_new),-1*np.array(df_chrgr_splice_data['pack_crnt'])) chrg_ahlst = [sum(np_chrg_ah[:i]) for i in range(1, len(np_chrg_ah)+1)] df_chrg_ah = pd.DataFrame(chrg_ahlst) df_chrg_ah.columns = ['chrgah'] df_data_tem = df_chrgr_splice_data[['time','sn', 'pack_crnt','pack_volt','pack_soc', 'odo', 'cell_volt_max', 'cell_temp_max', 'cell_temp_min']] df_data = pd.concat([df_data_tem, df_chrg_ah], axis = 1) df_data.rename(columns = {'cell_temp_max':'max_T', 'cell_temp_min':'min_T', 'odo':'odo', 'cell_volt_max':'frst'}, inplace = True) df_data.drop(df_data[(df_data[['max_T', 'min_T']] < -30).any(axis = 1) | (df_data[['max_T', 'min_T']] > 125).any(axis = 1)].index, inplace = True) df_data.drop(df_data[(df_data['frst'] < 2) | (df_data['frst'] > 5)].index, inplace = True) df_data.reset_index(drop = True, inplace = True) datalen = len(df_data) diff_vlt = df_data['frst'].diff() # 使用 cumsum() 函数标记下降区域 down_regions = (diff_vlt >= 0).cumsum() # 使用 groupby() 和 transform() 函数计算每个区域的长度 region_sizes = down_regions.groupby(down_regions).transform('size') # 使用 where() 函数删除连续3次以上下降位置的数据 data_series_filtered = df_data.where(region_sizes < 10) nan_elements = data_series_filtered.isna() # 使用 np.where() 函数查找所有 NaN 元素的位置 nan_indices = np.where(nan_elements == True)[0] if len(nan_indices)>0: last_num = nan_indices[-1] else: last_num = 0 df_data_real = df_data.iloc[last_num:] df_data_real.reset_index(drop= True, inplace=True) df_dataana = df_data_real.loc[(df_data_real['pack_soc'] > 30) & (df_data_real['pack_soc'] < 80)] df_dataana.reset_index(drop = True, inplace = True) if (len(df_dataana) > 20):# and (rate_chrg <= 0.95) def dats_interp(input1, input2, datalen): cap_frst = list(np.linspace(input1.iloc[0], input1.iloc[-1], datalen, endpoint = False))#len(df_dataana['chrgah']) fuc_inter = interpolate.interp1d(input1, input2, kind = 'nearest')#定义差值函数 vlt_splce_temp = list(fuc_inter(cap_frst)) return vlt_splce_temp crntabs = list(abs(df_dataana['pack_crnt'])) df_dataana['abscrnt'] = crntabs crntnum = df_dataana.loc[:,'abscrnt'].value_counts() #统计电芯数量这一列每个元素出现的个数 crntmaxNums = int(crntnum.idxmax()) df_data_cal = df_dataana.loc[(abs(df_dataana['pack_crnt']) > crntmaxNums-4) & (abs(df_dataana['pack_crnt']) < crntmaxNums+4)] df_data_cal = df_data_cal.iloc[3:-2] df_data_cal.reset_index(drop= True, inplace=True) if (len(df_data_cal) > 10):# and (all(abs(dff_soc) < 2))and (crntmaxNums/self.capty < 0.95) rate_end = round(abs(df_data['pack_crnt'].iloc[-1])/self.capty, 3) df_data_cal['frst'] = svflter(df_data_cal['frst'], para_lvbo, 3) split_len = int(1.2*(df_data_cal['pack_soc'].iloc[-1] - df_data_cal['pack_soc'].iloc[0])) if split_len > 10: cap_frst = list(np.linspace(df_data_cal['chrgah'].iloc[0], df_data_cal['chrgah'].iloc[-1], split_len, endpoint = False))#len(df_dataana['chrgah']) vlt_splce_temp = dats_interp(df_data_cal['chrgah'], df_data_cal['frst'], split_len) soc_splt = dats_interp(df_data_cal['chrgah'], df_data_cal['pack_soc'], split_len) crnt_splt = dats_interp(df_data_cal['chrgah'], df_data_cal['pack_crnt'], split_len) maxt_splt = dats_interp(df_data_cal['chrgah'], df_data_cal['max_T'], split_len) mint_splt = dats_interp(df_data_cal['chrgah'], df_data_cal['min_T'], split_len) odo_splt = [df_data_cal['odo'].iloc[0]]*split_len time_splt = dats_interp(df_data_cal['chrgah'], df_data_cal['time'], split_len) pkvlt_splt_temp = dats_interp(df_data_cal['chrgah'], df_data_cal['pack_volt'], split_len) pkvlt_splt = svflter(pkvlt_splt_temp, para_lvbo, 3) vlt_splce = svflter(vlt_splce_temp, para_lvbo, 3) np_dff_vlt = np.diff(vlt_splce) np_dff_cap = np.diff(cap_frst) np_dff_dv_dq_temp = np.divide(np_dff_vlt, np_dff_cap) np_dff_dv_dq = np.insert(np_dff_dv_dq_temp, 0, np_dff_dv_dq_temp[0]) df_dvdq_data = pd.DataFrame() df_dvdq_data['pack_soc'] = soc_splt df_dvdq_data['pack_crnt'] = crnt_splt df_dvdq_data['frst'] = vlt_splce df_dvdq_data['max_T'] = maxt_splt df_dvdq_data['min_T'] = mint_splt df_dvdq_data['odo'] = odo_splt df_dvdq_data['chrgah'] = cap_frst df_dvdq_data['pack_volt'] = pkvlt_splt df_dvdq_data['time'] = pd.to_datetime(time_splt)#, utc=True, unit='ms').tz_convert('Asia/Shanghai') df_dvdq_data['dffsoc'] = df_dvdq_data['pack_soc'].diff() df_dvdq_data['dffah'] = df_dvdq_data['chrgah'].diff() df_dvdq_data['dv_dq'] = np_dff_dv_dq df_dvdq_data['fr_vlt_dvdq'] = vlt_splce#电压 df_dvdq_data.fillna(method = 'bfill', inplace = True , axis = 0) delta_soc = df_dvdq_data['pack_soc'].iloc[-1] - df_dvdq_data['pack_soc'].iloc[0] if all(df_dvdq_data['dffsoc'] < 3) and delta_soc > 10: df_dvdq_data['df_dq_lvb'] = svflter(df_dvdq_data['dv_dq'], para_lvbo, 3, mode = 'nearest')# df_data = df_dvdq_data.copy()#loc[(df_dataana['pack_soc'] > 30) & (df_dataana['pack_soc'] < 70)] df_data.reset_index(drop = True, inplace = True) dvsocchrcter = [] if len(df_data) > 10: cellvltname = ['fr_vlt_dvdq']#, 'send', 'thrd' cellvltahname = ['df_dq_lvb']#, 'dffsendvlt', 'dffthdvlt' maxsoc = 0 for itemnum in range(0, 1): snchrcter = [] np_vlt_ori = np.array(df_data[cellvltahname[itemnum]])#dffminvlt np_vltori_smth = svflter(np_vlt_ori, para_lvbo, 3) rcoumax, peak_indexmax, rcoumin, peak_indexmin = peakbotm.find_peak_triangleNum(np_vltori_smth, 7)#, rcoumin, peak_indexmin if (len(peak_indexmax) > 0) and (len(peak_indexmin) > 0): peak_fst = np_vltori_smth[peak_indexmax[0]] botm_fst = np_vltori_smth[peak_indexmin[0]] pkbtmratfst = botm_fst/peak_fst else: peak_fst = -1 if (peak_fst>0):#and (botm_fst>0) and (abs(pkbtmratfst) >= 0.2) and (abs(pkbtmratfst) <= 0.8) maxindex = peak_indexmax[0] maxsoc = np.array(df_data['pack_soc'])[maxindex] snchrcter.append(itemsn) snchrcter.append(np.array(df_data['time'])[maxindex])#df_data.loc[0, 'time'] snchrcter.append(np.array(df_data[cellvltname[itemnum]])[maxindex])#电压 snchrcter.append(maxsoc) snchrcter.append(pkbtmratfst) snchrcter.append(np.array(df_data['max_T'])[maxindex])#最大电芯温度 snchrcter.append(np.array(df_data['min_T'])[maxindex])#最大电芯温度 snchrcter.append(round(np.array(df_data['pack_crnt'])[maxindex]/self.capty, 3))#倍率 snchrcter.append(df_data['odo'].iloc[0])#里程 snchrcter.append(np.array(df_data['pack_volt'])[maxindex])#里程 dvsocchrcter.append(snchrcter) df_chracter_ofc= pd.DataFrame(dvsocchrcter)#各电芯的电压微分特征值 peak_pos = abs(maxsoc - df_dvdq_data['pack_soc'].iloc[0]) if (not df_chracter_ofc.empty) and (peak_pos > 3): setcolumns = ['sn', 'time', 'frstvlt', 'soc_ori', 'pkbtmfst', 'frstmaxT', 'frstminT', 'rate', 'odo', 'pack_vlt'] df_chracter_ofc.columns = setcolumns df_chracter_ofc['rate'] = np.abs(df_chracter_ofc['rate']) else: df_chracter_ofc = pd.DataFrame() if (not df_chracter_ofc.empty): DataArray = df_chracter_ofc.values Y = DataArray[:, 3]#SOC值 X = DataArray[:, [2,5,6,7,8,9]] #电压、峰谷比,温度,rate,odo4, X = np.array(X)#转化为array,自变量 Y = np.array(Y)#转化为array,因变量soc paraminput_x = self.inputnorm_x.fit_transform(X) Soc_ori = np.array(Y) SOC_pre_dnn_ori = self.Dnn_predict.predict(paraminput_x) SOC_pre = self.inputnorm_y.inverse_transform(SOC_pre_dnn_ori.reshape(-1, 1)).reshape(-1) SOC_error = (SOC_pre - Soc_ori) SOC_error_abs = abs(SOC_error) # if max(SOC_error_abs) > 5: df_chracter_ofc['soc_pre'] = SOC_pre df_chracter_ofc['soc_error'] = SOC_error.astype('float') df_chracter_ofc['soc_pre'] = df_chracter_ofc['soc_pre'].round(3) df_chracter_ofc['soc_error'] = df_chracter_ofc['soc_error'].round(3) df_soc_rlt = df_chracter_ofc[(df_chracter_ofc['soc_pre'] < 60) & (df_chracter_ofc['soc_pre'] > 35) & (df_chracter_ofc['soc_ori'] < 60) & (df_chracter_ofc['soc_ori'] > 35)] df_soc_rlt.reset_index(drop = True, inplace = True) if not df_soc_rlt.empty: SOC_rlt = df_soc_rlt[['sn', 'time', 'soc_ori', 'soc_pre', 'soc_error', 'frstvlt', 'pkbtmfst', 'frstmaxT', 'frstminT', 'rate', 'odo']] SOC_rlt['soc_pre'] = SOC_rlt['soc_pre'].round(1) SOC_rlt['soc_error'] = SOC_rlt['soc_error'].round(1) # SOC_clms = ['frstsoc', 'SOC_pre', 'SOC_error', 'frstvlt', 'pkbtmfst', 'frstmaxT', 'frstminT', 'rate'] # df_cellname = SOC_rlt.groupby(['time'])['cellname'].apply(list) # cellname = df_cellname.values[0] # namenum = 'cellnum:' + str(list(map(int, [i[12:] for i in cellname]))) # SOC_rltlst = [] # SOC_rltlst.append(self.sn) # SOC_rltlst.append(namenum) # SOC_rltlst.append(SOC_rlt['time'].iloc[0]) # for itemsoc in SOC_clms: # df_cellname = SOC_rlt.groupby(['time'])[itemsoc].apply(list) # cellname = df_cellname.values[0] # SOC_rltlst.append(str(cellname)) # SOC_rltlst.append(SOC_rlt['odo'].iloc[0]) # SOC_rlt_final = pd.DataFrame(SOC_rltlst).T # SOC_rlt_final.columns = ['sn', 'cellname', 'time', 'soc_ori', 'soc_pre', 'soc_error', 'frstvlt', 'pkbtmfst', 'frstmaxT', 'frstminT', 'rate', 'odo'] SOC_pre_cal_dnn = SOC_rlt[['sn', 'time', 'soc_ori', 'soc_pre', 'soc_error', 'odo']] SOC_pre_cal_dnn['method'] = 1 df_soc_rlt_con = pd.concat([SOC_pre_cal, SOC_pre_cal_dnn]) df_soc_rlt_con.reset_index(drop = True, inplace = True) if not df_soc_rlt_con.empty: df_soc_rlt_con['battery_type'] = 'lfp' soc_pre_method = list(df_soc_rlt_con['method']) if 3 in soc_pre_method: SOC_out = df_soc_rlt_con[(df_soc_rlt_con['method'] == 3)] elif 2 in soc_pre_method: SOC_out = df_soc_rlt_con[(df_soc_rlt_con['method'] == 2)] elif 1 in soc_pre_method: SOC_out = df_soc_rlt_con[(df_soc_rlt_con['method'] == 1)] else: SOC_out = pd.DataFrame() else: SOC_out = pd.DataFrame() return SOC_out #SOC_rlt_final, SOH_rlt_final # SOC_rlt = pd.DataFrame(columns = ['sn', 'time', 'soc_ori', 'soc_pre', 'soc_error', 'odo', 'method', 'battery_type']) df_data = self.df_bms.copy() df_data = df_data.loc[:, ~df_data.columns.duplicated()] if max(self.LookTab_OCV) > 4: SOC_rlt = soc_cal_ncm(df_data)#df_data.groupby('sn').apply(soc_cal_ncm) else: SOC_rlt = soc_cal_lfp(df_data)#df_data.groupby('sn').apply(soc_cal_lfp) if not SOC_rlt.empty: SOC_rlt.rename(columns={"soc_ori":"soc", "soc_pre":"soc_real", "soc_error":"soc_delta"}, inplace = True) SOC_rlt.drop(columns=['battery_type'], inplace = True) fillna = len(SOC_rlt)*[np.nan] SOC_rlt['soc_max'] = fillna SOC_rlt['soc_min'] = fillna SOC_rlt['cell_soc_delta'] = fillna return SOC_rlt else: return pd.DataFrame(columns = ['sn', 'time', 'soc_max', 'soc_min', 'soc', 'soc_real', 'soc_delta', 'odo', 'method', 'cell_soc_delta'])