123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573 |
import os

import joblib
import numpy as np
import pandas as pd
from scipy import interpolate
from scipy.signal import savgol_filter as svflter
class cell_chracter:
    """Detect rows where the BMS-reported SOC disagrees with an estimate.

    Two estimators are implemented and selected from the OCV table:

    * table maximum above 4 V  -> ``_soc_cal_ncm`` (OCV lookup after a long
      gap in the loaded data);
    * otherwise                -> ``_soc_cal_lfp`` (OCV lookup, end-of-charge
      check and a dV/dQ-peak + DNN estimate on charge segments).

    The input frame must provide at least the columns ``sn``, ``time``,
    ``pack_crnt``, ``pack_soc``, ``odo`` and the cell-voltage columns named
    in ``cellvolt_name``.  The LFP charge analysis additionally reads
    ``pack_volt``, ``cell_volt_max``, ``cell_temp_max`` and ``cell_temp_min``.
    """

    # Columns of the per-pack result frames built by the two estimators.
    _LFP_COLUMNS = ['sn', 'time', 'soc_ori', 'soc_pre', 'soc_error', 'odo', 'method']
    _NCM_COLUMNS = _LFP_COLUMNS + ['battery_type']

    def __init__(self, df_bms, df_param, cellvolt_name, celltemp_name, pack_code):
        """Store the inputs, build the OCV/SOC table and load the DNN model.

        Args:
            df_bms: BMS records (anything ``pd.DataFrame`` accepts) with a
                ``time`` column formatted ``%Y-%m-%d %H:%M:%S``.
            df_param: mapping with ``capacity``, ``charge_ocv_v``,
                ``charge_ocv_soc``, ``discharge_ocv_v`` and
                ``discharge_ocv_soc``.
            cellvolt_name: list of cell-voltage column names.
            celltemp_name: list of cell-temperature column names.
            pack_code: pack type code; the DNN estimate only runs for
                ``'JX18020'``.
        """
        self.df_bms = pd.DataFrame(df_bms)
        self.bmstime = pd.to_datetime(df_bms['time'], format='%Y-%m-%d %H:%M:%S')
        self.cellvolt_list = cellvolt_name
        self.celltemp_name = celltemp_name
        self.capty = float(df_param['capacity'])
        # The lookup table is the mean of the charge and discharge OCV curves.
        np_chrg_ocv = df_param['charge_ocv_v']
        np_chrg_soc = df_param['charge_ocv_soc']
        np_dschrg_ocv = df_param['discharge_ocv_v']
        np_dschrg_soc = df_param['discharge_ocv_soc']
        self.LookTab_OCV = (np.array(np_chrg_ocv) + np.array(np_dschrg_ocv)) / 2
        self.LookTab_SOC = (np.array(np_chrg_soc) + np.array(np_dschrg_soc)) / 2
        # NOTE: joblib.load unpickles the files; only load trusted model files.
        abs_model_file = f"{os.path.dirname(os.path.abspath(__file__))}/model/V_1_0_0/"
        self.Dnn_predict = joblib.load(abs_model_file + 'Dnnmodel.pkl')
        self.inputnorm_x = joblib.load(abs_model_file + 'normorlized_x')
        self.inputnorm_y = joblib.load(abs_model_file + 'normorlized_y')
        self.pack_code = pack_code

    # ------------------------------------------------------------------
    # Generic helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _find_peak_triangleNum(filter_signal, peak_width):
        """Locate prominent peaks and troughs of a smoothed 1-D signal.

        A sample is a peak candidate when it is a strict local maximum (or
        equals its predecessor) and exceeds 0.7 x the 95th percentile; it is
        a trough candidate when it is a strict local minimum (or equals its
        predecessor) and lies below the 20th percentile.  Candidates closer
        than ``peak_width`` samples are merged.

        Returns:
            ``[peak_count, peak_indices, trough_count, trough_indices]``.
        """
        filter_signal = np.asarray(filter_signal)
        length_data = len(filter_signal)
        if length_data == 0:
            # np.percentile is undefined on an empty signal.
            return [0, [], 0, []]
        thre = 0.7 * np.percentile(filter_signal, 95)   # peak height threshold
        botthre = np.percentile(filter_signal, 20)      # trough depth threshold

        # Collect the extremum candidates over the whole signal.
        peaks = []
        bottoms = []
        for i in range(1, length_data - 1):
            prev, cur, nxt = filter_signal[i - 1], filter_signal[i], filter_signal[i + 1]
            if (prev < cur) and (cur > nxt) and (cur > thre):
                peaks.append(i)
            elif (cur == prev) and (cur > thre):
                peaks.append(i)      # plateau around a maximum
            if (prev > cur) and (cur < nxt) and (cur < botthre):
                bottoms.append(i)
            elif (cur == prev) and (cur < botthre):
                bottoms.append(i)    # plateau around a minimum

        # Merge peak candidates belonging to the same peak: keep the last
        # index of each cluster as the cluster boundary.
        merged = 0
        peak_marks = peaks.copy()
        for j in range(1, len(peaks)):
            if peaks[j] - peaks[j - 1] < peak_width:
                if peaks[j] > peaks[j - 1]:
                    peak_marks[j - 1] = 0
                else:
                    peak_marks[j] = 0
                merged += 1
        rcou = len(peaks) - merged
        peak_marks = [i for i in peak_marks if i > 0]

        # Within each cluster take the candidate with the largest value.
        np_peaks = np.array(peaks)
        peak_index = []
        for i, mark in enumerate(peak_marks):
            if i == 0:
                index_range = np_peaks[np_peaks <= mark]
            else:
                index_range = np_peaks[(np_peaks <= mark) & (np_peaks > peak_marks[i - 1])]
            if len(index_range) > 0:
                peak_index.append(index_range[np.argmax(filter_signal[index_range], axis=0)])

        # Same merge for the trough candidates.
        merged = 0
        bottom_marks = bottoms.copy()
        for j in range(1, len(bottoms)):
            if bottoms[j] - bottoms[j - 1] < peak_width:
                if bottoms[j] < bottoms[j - 1]:
                    bottom_marks[j] = 0
                else:
                    bottom_marks[j - 1] = 0
                merged += 1
        Dcou = len(bottoms) - merged
        bottom_marks = [i for i in bottom_marks if i > 0]

        # NOTE(review): the marks are ascending, so the interval used for
        # every mark after the first is empty and at most one trough (the
        # lowest candidate at or after the first mark) is returned.  Kept
        # as-is because callers only read the first trough; confirm intent.
        np_bottoms = np.array(bottoms)
        bot_index = []
        for i, mark in enumerate(bottom_marks):
            if i == 0:
                botindex_range = np_bottoms[np_bottoms >= mark]
            else:
                botindex_range = np_bottoms[(np_bottoms >= mark) & (np_bottoms < bottom_marks[i - 1])]
            if len(botindex_range) > 0:
                bot_index.append(botindex_range[np.argmin(filter_signal[botindex_range], axis=0)])
        return [rcou, peak_index, Dcou, bot_index]

    @staticmethod
    def _constant_runs(series, min_len):
        """Return the index lists of maximal constant runs of >= ``min_len``.

        ``series`` is expected to carry a contiguous integer index (it is
        called right after ``reset_index``).  Runs are found in one pass.
        """
        if len(series) == 0:
            return []
        values = series.to_numpy()
        first_label = series.index[0]
        total = len(values)
        runs = []
        start = 0
        for pos in range(1, total + 1):
            if pos == total or values[pos] != values[pos - 1]:
                if pos - start >= min_len:
                    runs.append(list(range(first_label + start, first_label + pos)))
                start = pos
        return runs

    @staticmethod
    def _gap_positions(times, minutes):
        """Positions ``p`` where row ``p + 1`` follows row ``p`` by > ``minutes``."""
        gaps = np.diff(pd.to_datetime(times)) / pd.Timedelta(minutes=1)
        return np.where(gaps > minutes)[0]

    @staticmethod
    def _resample_nearest(input1, input2, datalen):
        """Sample ``input2`` (as a function of ``input1``) on an even grid.

        The grid has ``datalen`` points from the first to the last value of
        ``input1`` (end excluded); nearest-neighbour interpolation is used.
        """
        grid = list(np.linspace(input1.iloc[0], input1.iloc[-1], datalen, endpoint=False))
        fuc_inter = interpolate.interp1d(input1, input2, kind='nearest')
        return list(fuc_inter(grid))

    def _ocv_to_soc(self, cell_volt):
        """Look up the SOC (rounded to 0.1) for one cell voltage.

        Voltages outside the table are pulled 0.5 mV inside its range so the
        interpolation never raises.  Only the looked-up value is clamped; the
        source data is left untouched so later validity checks see raw values.
        """
        min_volt = min(self.LookTab_OCV)
        max_volt = max(self.LookTab_OCV)
        if cell_volt < min_volt:
            cell_volt = min_volt + 0.0005
        elif cell_volt > max_volt:
            cell_volt = max_volt - 0.0005
        soc_interplt = interpolate.interp1d(self.LookTab_OCV, self.LookTab_SOC, kind='linear')
        # interp1d returns a 0-d array for a scalar; ravel handles 0-d and 1-d.
        return round(float(np.ravel(soc_interplt(cell_volt))[0]), 1)

    def _label_states(self, df_group):
        """Clean one pack's records and tag rest / charge segments.

        Rows with missing or blank current/SOC are removed and remaining gaps
        are back-filled.  Runs of at least 15 rows with the same current sign
        are tagged in ``sts_flg``: 0 for zero current lasting more than
        0.17 h, 1 for non-zero current lasting at least 0.03 h with an SOC
        rise of at least 30; every other row keeps 2.  The result is sorted
        by ``time`` and re-indexed.
        """
        data_ori = df_group.dropna(axis=0, subset=["pack_crnt", "pack_soc"])
        blank = (data_ori['pack_soc'] == "") | (data_ori['pack_crnt'] == "")
        data_ori = data_ori.drop(data_ori[blank].index)
        data_ori = data_ori.bfill(axis=0)

        checked = data_ori.copy()
        checked.reset_index(drop=True, inplace=True)
        checked['crnt_flg'] = 0
        checked.loc[checked['pack_crnt'] > 0, 'crnt_flg'] = 1
        checked.loc[checked['pack_crnt'] < 0, 'crnt_flg'] = -1
        checked['sts_flg'] = 2

        num_dead_thresh = 15   # minimum run length, in rows
        segments = []
        for run in self._constant_runs(checked['crnt_flg'], num_dead_thresh):
            # NOTE(review): the slice end is exclusive, so the last row of
            # each run is left untagged (behaviour kept from the original).
            segment = checked.iloc[run[0]:run[-1]].copy()
            segment.reset_index(drop=True, inplace=True)
            soc = segment['pack_soc']
            delta_soc = round((soc.iloc[-1] - soc.iloc[0]), 3)
            stamps = pd.to_datetime(segment['time'])
            delta_time = (stamps.iloc[-1] - stamps.iloc[0]) / pd.Timedelta(hours=1)
            if segment['crnt_flg'].iloc[0] == 0:
                if delta_time > 0.17:            # rest
                    segment['sts_flg'] = 0
                    segments.append(segment)
            elif (delta_time >= 0.03) and delta_soc >= 30:
                # Either current sign counts as charging when the SOC rises.
                segment['sts_flg'] = 1
                segments.append(segment)

        # Rows that were tagged replace their untagged originals; matching is
        # done on (time, pack_soc) and drops every duplicated pair.
        if segments:
            tagged = pd.concat(segments, ignore_index=True)
            pool = pd.concat([checked, tagged, tagged])
        else:
            tagged = None
            pool = checked
        untagged = pool.drop_duplicates(subset=['time', 'pack_soc'], keep=False)
        data_temp = untagged if tagged is None else pd.concat([untagged, tagged])
        data_temp = data_temp.sort_values(by='time')
        data_temp.reset_index(drop=True, inplace=True)
        return data_temp

    # ------------------------------------------------------------------
    # NCM estimator
    # ------------------------------------------------------------------
    def _soc_cal_ncm(self, df_data_temp1):
        """Compare BMS SOC with the OCV lookup after long gaps (NCM packs).

        Only rows with ``|pack_crnt| > 0.02 * capacity`` are considered.
        When two consecutive such rows are more than 120 min apart and the
        later row's current is below ``0.03 * capacity`` (and above 0.1),
        the min/max cell voltages of that row are converted to SOC and
        blended; rows whose blended SOC differs from ``pack_soc`` by more
        than 5 are reported.
        """
        itemsn = df_data_temp1['sn'].iloc[0]
        data_temp = self._label_states(df_data_temp1)
        rows = []
        data_temp_rest = data_temp[abs(data_temp['pack_crnt']) > 0.02 * self.capty]
        rest_dschrg_pos = self._gap_positions(data_temp_rest['time'], 120)
        if len(rest_dschrg_pos) > 0:
            df_celvlt = data_temp_rest[self.cellvolt_list]
            df_celvlt_min = df_celvlt.min(axis=1)
            df_celvlt_max = df_celvlt.max(axis=1)
            for item_rest in rest_dschrg_pos:
                nxt = item_rest + 1   # first row after the gap (positional)
                crnt = abs(data_temp_rest['pack_crnt'].iloc[nxt])
                if not ((crnt < 0.03 * self.capty) and (crnt > 0.1)):
                    continue
                SOC_now = data_temp_rest['pack_soc'].iloc[nxt]
                SOC_min = self._ocv_to_soc(df_celvlt_min.iloc[nxt])
                SOC_max = self._ocv_to_soc(df_celvlt_max.iloc[nxt])
                if SOC_min < 30:        # low end: weight the weakest cell
                    SOC_cal = 0.8 * SOC_min + 0.2 * SOC_max
                    method = 1
                elif SOC_max > 90:      # high end: weight the fullest cell
                    SOC_cal = 0.8 * SOC_max + 0.2 * SOC_min
                    method = 2
                else:
                    SOC_cal = 0.5 * (SOC_max + SOC_min)
                    method = 3
                SOC_del = abs(SOC_cal - SOC_now)
                SOC_real = str([SOC_max, SOC_min, SOC_cal])
                if SOC_del > 5:
                    time_pre = data_temp_rest['time'].iloc[nxt]
                    odo_now = data_temp_rest['odo'].iloc[nxt]
                    rows.append([itemsn, time_pre, SOC_now, SOC_real, SOC_del, odo_now, method, 'NCM'])
        return pd.DataFrame(rows, columns=self._NCM_COLUMNS)

    # ------------------------------------------------------------------
    # LFP estimator
    # ------------------------------------------------------------------
    def _soc_cal_lfp(self, df_data_temp2):
        """Estimate SOC errors for an LFP pack.

        Three sources are combined (method code in brackets): OCV lookup
        after a long gap [2, or 0 in the mid range], end-of-charge check
        [3] and the dV/dQ-peak DNN estimate [1].  Only the rows of the
        highest-priority method present (3, then 2, then 1) are returned;
        an empty frame is returned otherwise.
        """
        itemsn = df_data_temp2['sn'].iloc[0]
        data_temp = self._label_states(df_data_temp2)
        df_chrg_temp = data_temp.loc[data_temp['sts_flg'] == 1].copy()
        df_chrg_temp.reset_index(inplace=True, drop=True)
        rows = []
        SOC_pre_cal_dnn = pd.DataFrame(columns=self._LFP_COLUMNS)

        data_temp_rest = data_temp[abs(data_temp['pack_crnt']) > 0.02 * self.capty]
        rest_dschrg_pos = self._gap_positions(data_temp_rest['time'], 120)
        if len(rest_dschrg_pos) > 0:
            df_celvlt = data_temp_rest[self.cellvolt_list]
            df_celvlt_min = df_celvlt.min(axis=1)
            df_celvlt_max = df_celvlt.max(axis=1)
            ocv_interplt = interpolate.interp1d(self.LookTab_SOC, self.LookTab_OCV, kind='linear')
            ocv_pre_val = ocv_interplt(25)   # OCV at 25 % SOC
            for item_rest in rest_dschrg_pos:
                nxt = item_rest + 1   # first row after the gap (positional)
                crnt = abs(data_temp_rest['pack_crnt'].iloc[nxt])
                volt_min = df_celvlt_min.iloc[nxt]
                # NOTE(review): the rows were filtered to |I| > 0.02*capacity
                # above, so `crnt < 0.02*capacity` can never hold and this
                # branch is currently unreachable (the NCM path uses 0.03).
                if not ((crnt < 0.02 * self.capty) and (crnt > 0.3)
                        and (volt_min < ocv_pre_val) and (volt_min > 2)):
                    continue
                SOC_now = data_temp_rest['pack_soc'].iloc[nxt]
                SOC_min = self._ocv_to_soc(volt_min)
                SOC_max = self._ocv_to_soc(df_celvlt_max.iloc[nxt])
                if SOC_min < 30:
                    SOC_cal = 0.8 * SOC_min + 0.2 * SOC_max
                    method = 2
                elif SOC_max > 90:
                    SOC_cal = 0.8 * SOC_max + 0.2 * SOC_min
                    method = 2
                else:
                    SOC_cal = 0.5 * (SOC_max + SOC_min)
                    method = 0   # never selected by the priority filter below
                SOC_del = abs(SOC_cal - SOC_now)
                SOC_real = str([SOC_max, SOC_min, SOC_cal])
                if SOC_del > 5:
                    time_pre = data_temp_rest['time'].iloc[nxt]
                    odo_now = data_temp_rest['odo'].iloc[nxt]
                    rows.append([itemsn, time_pre, SOC_now, SOC_real, SOC_del, odo_now, method])

        # Charge-curve analysis (end-of-charge check and DNN estimate).
        if (not df_chrg_temp.empty) and (self.pack_code == 'JX18020'):
            SOC_pre_cal_dnn = self._charge_segments_soc(df_chrg_temp, itemsn, rows)

        SOC_pre_cal = pd.DataFrame(rows, columns=self._LFP_COLUMNS)
        parts = [part for part in (SOC_pre_cal, SOC_pre_cal_dnn) if not part.empty]
        if not parts:
            return pd.DataFrame()
        df_soc_rlt_con = pd.concat(parts)
        df_soc_rlt_con.reset_index(drop=True, inplace=True)
        df_soc_rlt_con['battery_type'] = 'lfp'
        soc_pre_method = list(df_soc_rlt_con['method'])
        for wanted in (3, 2, 1):
            if wanted in soc_pre_method:
                return df_soc_rlt_con[(df_soc_rlt_con['method'] == wanted)]
        return pd.DataFrame()

    def _charge_segments_soc(self, df_chrg_temp, itemsn, rows):
        """Analyse each charge segment of an LFP pack.

        Rows tagged as charging are split wherever consecutive rows are more
        than 10 min apart.  For every segment starting below 40 % SOC with
        more than 30 rows:

        * an end-of-charge row (method 3, estimate 100) is appended to
          ``rows`` when the segment ends with max cell voltage > 3.55 V,
          current below 0.1 x capacity and reported SOC below 95;
        * the first dV/dQ peak in the 30-80 % SOC window is located and fed
          to the DNN; the prediction is kept when both the predicted and the
          reported SOC at the peak lie in (35, 60).

        Returns:
            The DNN result frame of the last segment that produced one
            (method 1), or an empty frame with the result columns.
        """
        para_lvbo = 7   # Savitzky-Golay window length
        SOC_pre_cal_dnn = pd.DataFrame(columns=self._LFP_COLUMNS)
        gap_min = np.diff(pd.to_datetime(df_chrg_temp['time'])) / pd.Timedelta(minutes=1)
        # Segment number of each row: number of >10 min gaps before it.
        df_chrg_temp['chrgr_rest'] = np.concatenate(([0.0], np.cumsum(gap_min > 10, dtype=float)))

        for item_chrgr in np.unique(df_chrg_temp['chrgr_rest']):
            df_chrgr_splice_data = df_chrg_temp.loc[df_chrg_temp['chrgr_rest'] == item_chrgr]
            df_chrgr_splice_data = df_chrgr_splice_data.reset_index(drop=True)
            df_soc = df_chrgr_splice_data['pack_soc']
            if not ((df_soc.iloc[0] < 40) and (len(df_soc) > 30)):
                continue

            # End-of-charge check: the cell is full but the BMS SOC lags.
            df_celvlt_max = df_chrgr_splice_data['cell_volt_max']
            df_crnt = abs(df_chrgr_splice_data['pack_crnt'])
            if (df_celvlt_max.iloc[-1] > 3.55) and (df_crnt.iloc[-1] < 0.1 * self.capty) and (df_soc.iloc[-1] < 95):
                SOC_now = df_soc.iloc[-1]
                SOC_real = 100
                SOC_del = 100 - SOC_now
                if SOC_del > 5:
                    time_pre = df_chrgr_splice_data['time'].iloc[-1]
                    odo_now = df_chrgr_splice_data['odo'].iloc[0]
                    rows.append([itemsn, time_pre, SOC_now, SOC_real, SOC_del, odo_now, 3])

            # Cumulative charged capacity (Ah); the sign flip presumably
            # reflects a negative-charging-current convention - TODO confirm.
            np_dff_time = np.diff(pd.to_datetime(df_chrgr_splice_data['time'])) / pd.Timedelta(hours=1)
            np_dff_time_new = np.append(np_dff_time, 0)
            np_chrg_ah = np.multiply(np.array(np_dff_time_new), -1 * np.array(df_chrgr_splice_data['pack_crnt']))
            df_chrg_ah = pd.DataFrame({'chrgah': np.cumsum(np_chrg_ah)})

            df_data_tem = df_chrgr_splice_data[['time', 'sn', 'pack_crnt', 'pack_volt', 'pack_soc', 'odo',
                                                'cell_volt_max', 'cell_temp_max', 'cell_temp_min']]
            df_data = pd.concat([df_data_tem, df_chrg_ah], axis=1)
            df_data.rename(columns={'cell_temp_max': 'max_T', 'cell_temp_min': 'min_T', 'cell_volt_max': 'frst'},
                           inplace=True)
            # Drop rows with implausible temperatures or voltages.
            bad_temp = ((df_data[['max_T', 'min_T']] < -30).any(axis=1)
                        | (df_data[['max_T', 'min_T']] > 125).any(axis=1))
            df_data.drop(df_data[bad_temp].index, inplace=True)
            df_data.drop(df_data[(df_data['frst'] < 2) | (df_data['frst'] > 5)].index, inplace=True)
            df_data.reset_index(drop=True, inplace=True)

            # Find stretches where the voltage keeps falling: each
            # non-decreasing step starts a new region, so a region of >= 10
            # rows is a long decline.  Everything up to the last such row is
            # discarded.
            diff_vlt = df_data['frst'].diff()
            down_regions = (diff_vlt >= 0).cumsum()
            region_sizes = down_regions.groupby(down_regions).transform('size')
            data_series_filtered = df_data.where(region_sizes < 10)
            nan_indices = np.where(data_series_filtered.isna() == True)[0]
            last_num = nan_indices[-1] if len(nan_indices) > 0 else 0

            df_data_real = df_data.iloc[last_num:]
            df_data_real = df_data_real.reset_index(drop=True)
            df_dataana = df_data_real.loc[(df_data_real['pack_soc'] > 30) & (df_data_real['pack_soc'] < 80)].copy()
            df_dataana.reset_index(drop=True, inplace=True)
            if not (len(df_dataana) > 20):
                continue

            # Keep only rows within +/-4 A of the most frequent current.
            df_dataana['abscrnt'] = list(abs(df_dataana['pack_crnt']))
            crntnum = df_dataana.loc[:, 'abscrnt'].value_counts()
            crntmaxNums = int(crntnum.idxmax())
            df_data_cal = df_dataana.loc[(abs(df_dataana['pack_crnt']) > crntmaxNums - 4)
                                         & (abs(df_dataana['pack_crnt']) < crntmaxNums + 4)]
            df_data_cal = df_data_cal.iloc[3:-2].copy()
            df_data_cal.reset_index(drop=True, inplace=True)
            if not (len(df_data_cal) > 10):
                continue

            df_data_cal['frst'] = svflter(df_data_cal['frst'], para_lvbo, 3)
            split_len = int(1.2 * (df_data_cal['pack_soc'].iloc[-1] - df_data_cal['pack_soc'].iloc[0]))
            if not (split_len > 10):
                continue

            # Resample every signal on an even capacity grid.
            chrgah = df_data_cal['chrgah']
            cap_frst = list(np.linspace(chrgah.iloc[0], chrgah.iloc[-1], split_len, endpoint=False))
            vlt_splce_temp = self._resample_nearest(chrgah, df_data_cal['frst'], split_len)
            soc_splt = self._resample_nearest(chrgah, df_data_cal['pack_soc'], split_len)
            crnt_splt = self._resample_nearest(chrgah, df_data_cal['pack_crnt'], split_len)
            maxt_splt = self._resample_nearest(chrgah, df_data_cal['max_T'], split_len)
            mint_splt = self._resample_nearest(chrgah, df_data_cal['min_T'], split_len)
            odo_splt = [df_data_cal['odo'].iloc[0]] * split_len
            time_splt = self._resample_nearest(chrgah, df_data_cal['time'], split_len)
            pkvlt_splt_temp = self._resample_nearest(chrgah, df_data_cal['pack_volt'], split_len)
            pkvlt_splt = svflter(pkvlt_splt_temp, para_lvbo, 3)
            vlt_splce = svflter(vlt_splce_temp, para_lvbo, 3)
            np_dff_dv_dq_temp = np.divide(np.diff(vlt_splce), np.diff(cap_frst))
            np_dff_dv_dq = np.insert(np_dff_dv_dq_temp, 0, np_dff_dv_dq_temp[0])

            df_dvdq_data = pd.DataFrame()
            df_dvdq_data['pack_soc'] = soc_splt
            df_dvdq_data['pack_crnt'] = crnt_splt
            df_dvdq_data['frst'] = vlt_splce
            df_dvdq_data['max_T'] = maxt_splt
            df_dvdq_data['min_T'] = mint_splt
            df_dvdq_data['odo'] = odo_splt
            df_dvdq_data['chrgah'] = cap_frst
            df_dvdq_data['pack_volt'] = pkvlt_splt
            df_dvdq_data['time'] = pd.to_datetime(time_splt)
            df_dvdq_data['dffsoc'] = df_dvdq_data['pack_soc'].diff()
            df_dvdq_data['dffah'] = df_dvdq_data['chrgah'].diff()
            df_dvdq_data['dv_dq'] = np_dff_dv_dq
            df_dvdq_data['fr_vlt_dvdq'] = vlt_splce   # cell voltage
            df_dvdq_data = df_dvdq_data.bfill(axis=0)
            delta_soc = df_dvdq_data['pack_soc'].iloc[-1] - df_dvdq_data['pack_soc'].iloc[0]
            if not (all(df_dvdq_data['dffsoc'] < 3) and delta_soc > 10):
                continue

            df_dvdq_data['df_dq_lvb'] = svflter(df_dvdq_data['dv_dq'], para_lvbo, 3, mode='nearest')
            df_data = df_dvdq_data.copy()
            df_data.reset_index(drop=True, inplace=True)
            if not (len(df_data) > 10):
                continue

            # Locate the first dV/dQ peak and collect the features at it.
            np_vltori_smth = svflter(np.array(df_data['df_dq_lvb']), para_lvbo, 3)
            _, peak_indexmax, _, peak_indexmin = self._find_peak_triangleNum(np_vltori_smth, 7)
            dvsocchrcter = []
            maxsoc = 0
            if (len(peak_indexmax) > 0) and (len(peak_indexmin) > 0):
                peak_fst = np_vltori_smth[peak_indexmax[0]]
                pkbtmratfst = np_vltori_smth[peak_indexmin[0]] / peak_fst
            else:
                peak_fst = -1
            if peak_fst > 0:
                maxindex = peak_indexmax[0]
                maxsoc = np.array(df_data['pack_soc'])[maxindex]
                dvsocchrcter.append([
                    itemsn,
                    np.array(df_data['time'])[maxindex],
                    np.array(df_data['fr_vlt_dvdq'])[maxindex],                       # cell voltage
                    maxsoc,
                    pkbtmratfst,                                                     # trough/peak ratio
                    np.array(df_data['max_T'])[maxindex],                            # max cell temperature
                    np.array(df_data['min_T'])[maxindex],                            # min cell temperature
                    round(np.array(df_data['pack_crnt'])[maxindex] / self.capty, 3),  # C-rate
                    df_data['odo'].iloc[0],                                          # odometer
                    np.array(df_data['pack_volt'])[maxindex],                        # pack voltage
                ])
            df_chracter_ofc = pd.DataFrame(dvsocchrcter)
            peak_pos = abs(maxsoc - df_dvdq_data['pack_soc'].iloc[0])
            # The peak must sit more than 3 % SOC after the window start.
            if df_chracter_ofc.empty or not (peak_pos > 3):
                continue
            df_chracter_ofc.columns = ['sn', 'time', 'frstvlt', 'soc_ori', 'pkbtmfst', 'frstmaxT', 'frstminT',
                                       'rate', 'odo', 'pack_vlt']
            df_chracter_ofc['rate'] = np.abs(df_chracter_ofc['rate'])

            DataArray = df_chracter_ofc.values
            Soc_ori = np.array(DataArray[:, 3])                  # reported SOC at the peak
            X = np.array(DataArray[:, [2, 5, 6, 7, 8, 9]])       # voltage, temps, rate, odo, pack voltage
            # NOTE(review): fit_transform re-fits the loaded scaler on the
            # inference row(s); if this is a fitted scaler, `transform` is
            # probably what was intended - confirm against the training code.
            paraminput_x = self.inputnorm_x.fit_transform(X)
            SOC_pre_dnn_ori = self.Dnn_predict.predict(paraminput_x)
            SOC_pre = self.inputnorm_y.inverse_transform(SOC_pre_dnn_ori.reshape(-1, 1)).reshape(-1)
            SOC_error = (SOC_pre - Soc_ori)
            df_chracter_ofc['soc_pre'] = SOC_pre
            df_chracter_ofc['soc_error'] = SOC_error.astype('float')
            df_chracter_ofc['soc_pre'] = df_chracter_ofc['soc_pre'].round(3)
            df_chracter_ofc['soc_error'] = df_chracter_ofc['soc_error'].round(3)
            df_soc_rlt = df_chracter_ofc[(df_chracter_ofc['soc_pre'] < 60) & (df_chracter_ofc['soc_pre'] > 35) &
                                         (df_chracter_ofc['soc_ori'] < 60) & (df_chracter_ofc['soc_ori'] > 35)]
            df_soc_rlt = df_soc_rlt.reset_index(drop=True)
            if not df_soc_rlt.empty:
                SOC_rlt = df_soc_rlt[['sn', 'time', 'soc_ori', 'soc_pre', 'soc_error', 'frstvlt', 'pkbtmfst',
                                      'frstmaxT', 'frstminT', 'rate', 'odo']].copy()
                SOC_rlt['soc_pre'] = SOC_rlt['soc_pre'].round(1)
                SOC_rlt['soc_error'] = SOC_rlt['soc_error'].round(1)
                SOC_pre_cal_dnn = SOC_rlt[['sn', 'time', 'soc_ori', 'soc_pre', 'soc_error', 'odo']].copy()
                SOC_pre_cal_dnn['method'] = 1
        return SOC_pre_cal_dnn

    # ------------------------------------------------------------------
    # Entry point
    # ------------------------------------------------------------------
    def states_cal(self):
        """Run the SOC check for every ``sn`` in the stored BMS data.

        Returns:
            A DataFrame indexed by ``(sn, row number)`` with the columns
            ``sn, time, soc, soc_real, soc_delta, odo, method, soc_max,
            soc_min, cell_soc_delta`` (the last three are NaN placeholders).
            When nothing is detected, an empty frame with the columns
            ``sn, time, soc_max, soc_min, soc, soc_real, soc_delta, odo,
            method, cell_soc_delta`` is returned.
        """
        df_data = self.df_bms.copy()
        df_data = df_data.loc[:, ~df_data.columns.duplicated()]
        # A table reaching above 4 V is treated as NCM chemistry, else LFP.
        soc_cal = self._soc_cal_ncm if max(self.LookTab_OCV) > 4 else self._soc_cal_lfp

        # Iterate the groups explicitly so each estimator always receives the
        # `sn` column, independent of the pandas groupby.apply behaviour.
        results = {}
        for sn, df_group in df_data.groupby('sn'):
            result = soc_cal(df_group)
            if not result.empty:
                results[sn] = result
        if not results:
            return pd.DataFrame(columns=['sn', 'time', 'soc_max', 'soc_min', 'soc', 'soc_real', 'soc_delta',
                                         'odo', 'method', 'cell_soc_delta'])

        SOC_rlt = pd.concat(results, names=['sn'])
        SOC_rlt.rename(columns={"soc_ori": "soc", "soc_pre": "soc_real", "soc_error": "soc_delta"}, inplace=True)
        SOC_rlt.drop(columns=['battery_type'], inplace=True)
        SOC_rlt['soc_max'] = np.nan
        SOC_rlt['soc_min'] = np.nan
        SOC_rlt['cell_soc_delta'] = np.nan
        return SOC_rlt
|