123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- import os
- import pandas as pd
- import numpy as np
def clean_dead_value(series, num_dead_thresh):
    """Find maximal runs of consecutive equal values in ``series``.

    The series is walked by index *label* from ``series.index[0]`` to
    ``series.index[-1]`` in steps of one, so it must be indexed by
    consecutive integers (e.g. a freshly reset ``RangeIndex``).

    Args:
        series: pandas Series of numeric values.
        num_dead_thresh: minimum number of labels a run must contain to be
            reported.

    Returns:
        A list of runs, each run being the list of consecutive index labels
        whose values are all equal. Only maximal runs are returned, in order
        of appearance. An empty series yields ``[]``.
    """
    if len(series) == 0:
        return []

    first_label = series.index[0]
    last_label = series.index[-1]

    runs = []
    current_run = [first_label]
    # A run only counts once it has been checked after a comparison step; the
    # initial single-label run has not been checked yet.
    qualifies = False
    for i in range(first_label, last_label):
        j = i + 1
        if series[j] - series[i] == 0:
            current_run.append(j)
        else:
            # Value changed: close the previous run and start a new one at j.
            if qualifies:
                runs.append(current_run)
            current_run = [j]
        qualifies = len(current_run) >= num_dead_thresh

    if qualifies:
        runs.append(current_run)
    return runs
def crnt_insert(df_data_temp):
    """Fill large time gaps with placeholder rows on a 5-minute grid.

    Wherever two consecutive samples are at least 20 minutes apart, rows are
    inserted every 5 minutes strictly between them. Inserted rows carry only
    a ``time`` value; every other column is NaN. Afterwards all missing
    ``packcrnt`` values (inserted rows and pre-existing NaNs alike) are set
    to 0.2, duplicate timestamps are dropped (first kept) and the frame is
    sorted by time with a fresh index.

    Note: the input frame is modified in place (its ``time`` column is
    converted to datetime and the frame is sorted); the returned frame is a
    new object.

    Args:
        df_data_temp: DataFrame with at least ``time`` and ``packcrnt``
            columns.

    Returns:
        The gap-filled DataFrame, sorted by ``time`` with a 0..n-1 index.
    """
    interval = pd.Timedelta(minutes=5)
    threshold = pd.Timedelta(minutes=20)

    df_data_temp['time'] = pd.to_datetime(df_data_temp['time'])
    df_data_temp.sort_values(by=['time'], inplace=True)

    # Locate every gap in one vectorised pass instead of a per-row iloc loop.
    times = df_data_temp['time']
    prev_times = times.shift(1)
    is_gap = (times - prev_times) >= threshold

    frames = [df_data_temp]
    for prev_time, current_time in zip(prev_times[is_gap], times[is_gap]):
        new_times = pd.date_range(
            start=prev_time + interval,
            end=current_time - interval,
            freq=interval,
        )
        # Only 'time' is supplied; concat fills the remaining columns with NaN.
        frames.append(pd.DataFrame({'time': new_times}))

    # DataFrame.append was removed in pandas 2.0; build the result with concat.
    df_data_temp = pd.concat(frames, ignore_index=True).sort_values('time')
    # df_data_temp['PackCrnt'] = df_data_temp['PackCrnt'].interpolate()
    df_data_temp['packcrnt'] = df_data_temp['packcrnt'].fillna(0.2)
    df_data_temp.drop_duplicates(subset=['time'], keep='first', inplace=True)
    df_data_temp.sort_values(by=['time'], inplace=True)
    df_data_temp.reset_index(drop=True, inplace=True)
    return df_data_temp
def _minutes_between_rows(times):
    """Return the gap in minutes between each row and the previous one (length n-1)."""
    stamps = pd.to_datetime(times)
    return (stamps.diff().iloc[1:] / pd.Timedelta(minutes=1)).to_numpy()


def _label_segments(break_after):
    """Number segments 0, 1, 2, ... given a boolean 'new segment starts after row i' array.

    The result has one more element than ``break_after`` and is float-valued.
    """
    return np.concatenate(([0.0], np.cumsum(break_after, dtype=float)))


def split_data(df_data2):
    """Label every sample as rest / charge / other and number the segments.

    Steps:
      1. Gap-fill the data with ``crnt_insert`` and derive ``crnt_flg`` from
         ``packcrnt`` (1 above +0.05*102, -1 below -0.05*102, else 0).
      2. Find runs of at least 15 consecutive samples with identical
         ``crnt_flg`` via ``clean_dead_value``.
      3. A run with ``crnt_flg == 0`` lasting more than 0.17 h gets
         ``sts_flg = 0`` (rest). A run with ``crnt_flg == -1`` and an SOC rise
         above 2 gets ``sts_flg = 1`` (charge) plus ``charge_method`` and
         ``full_charge_flg`` columns. Everything else keeps ``sts_flg = 2``.
      4. Number charge segments (``chrgr_rest``, split at gaps > 30 min) and
         state segments (``state_bms``, split where ``sts_flg`` changes or
         the gap exceeds 40 min).

    Args:
        df_data2: cleaned DataFrame with at least ``time``, ``packcrnt`` and
            ``packsoc`` columns. It is copied, not modified.

    Returns:
        Tuple ``(data_temp, df_chrg_temp, chrgr_splice_num, state_splice_num)``:
        the fully labelled frame, the charge-only frame (empty when 30 or
        fewer rows were labelled), and the unique segment numbers of each
        (empty list when not computed).
    """
    # NOTE(review): 102 looks like the pack capacity (a commented-out line in
    # the original referred to self.capty) — confirm before parameterising.
    current_threshold = 0.05 * 102
    num_dead_thresh = 15
    one_hour = pd.Timedelta(hours=1)

    df_data_ori = df_data2.copy()
    df_data_ori.reset_index(drop=True, inplace=True)
    df_data = crnt_insert(df_data_ori)

    df_data['crnt_flg'] = 0
    df_data.loc[df_data['packcrnt'] > current_threshold, 'crnt_flg'] = 1
    df_data.loc[df_data['packcrnt'] < -current_threshold, 'crnt_flg'] = -1
    df_data['sts_flg'] = 2

    # Rows (positions == labels after the index reset) of constant current flag.
    runs = clean_dead_value(df_data['crnt_flg'], num_dead_thresh) if len(df_data) else []

    labelled_segments = []
    for run in runs:
        # Work on a copy so column assignments never touch a view of df_data.
        segment = df_data.iloc[run[0]:run[-1] + 1].copy()
        segment.reset_index(drop=True, inplace=True)
        # cal_ah_temp = cal_ah(segment)
        seg_time = pd.to_datetime(segment['time'])
        seg_soc = segment['packsoc']
        delta_soc = seg_soc.iloc[-1] - seg_soc.iloc[0]
        delta_time = (seg_time.iloc[-1] - seg_time.iloc[0]) / one_hour
        flags = segment['crnt_flg']

        if (flags == 0).all():  # rest detection
            if delta_time > 0.17:
                segment['sts_flg'] = 0
                labelled_segments.append(segment)
        elif (flags == -1).all() and delta_soc > 2:  # charge detection
            segment['sts_flg'] = 1
            delta_soc = round(delta_soc, 3)
            if delta_time >= 0.03:
                rate = round(delta_soc / (100 * delta_time), 3)
            else:
                rate = 0
            segment['charge_method'] = 'offboard_charge' if abs(rate) > 0.3 else 'onboard_charge'
            segment['full_charge_flg'] = 1 if seg_soc.iloc[-1] >= 99 else 0
            labelled_segments.append(segment)

    if labelled_segments:
        # DataFrame.append was removed in pandas 2.0; concatenate once instead.
        df_sts_chrg = pd.concat(labelled_segments, ignore_index=True)
        # Listing the labelled rows twice and dropping every duplicated
        # (time, packsoc) pair leaves only the rows that were NOT labelled.
        df_dschrg = pd.concat([df_data, df_sts_chrg, df_sts_chrg]).drop_duplicates(
            subset=['time', 'packsoc'], keep=False)
        data_temp = pd.concat([df_dschrg, df_sts_chrg])
    else:
        df_sts_chrg = pd.DataFrame(columns=list(df_data.columns))
        data_temp = df_data.copy()
    data_temp.sort_values(by='time', inplace=True)
    data_temp.reset_index(drop=True, inplace=True)

    # A sample flagged 2 whose neighbours 13 rows before and after are both
    # rest is relabelled as rest.
    sts = data_temp['sts_flg']
    isolated = (sts == 2) & (sts.shift(13) == 0) & (sts.shift(-13) == 0)
    data_temp.loc[isolated, 'sts_flg'] = 0

    df_chrg_temp = pd.DataFrame()
    chrgr_splice_num = []
    if len(df_sts_chrg) > 30:
        # --------------------------- charge segments ---------------------------
        df_chrg_temp = data_temp.loc[data_temp['sts_flg'] == 1].copy()
        if not df_chrg_temp.empty:
            df_chrg_temp.reset_index(drop=True, inplace=True)
            gap_minutes = _minutes_between_rows(df_chrg_temp['time'])
            # More than 30 minutes between charge rows starts a new charge.
            df_chrg_temp['chrgr_rest'] = _label_segments(gap_minutes > 30)
            chrgr_splice_num = np.unique(df_chrg_temp['chrgr_rest'])

    # Initialised so the return statement never hits an unbound name.
    state_splice_num = []
    if not data_temp.empty:
        flag_changed = np.diff(data_temp['sts_flg'].to_numpy(dtype=float)) != 0
        gap_minutes = _minutes_between_rows(data_temp['time'])
        # A new state segment starts when the state changes or after a gap
        # longer than 40 minutes.
        data_temp['state_bms'] = _label_segments(flag_changed | (gap_minutes > 40))
        state_splice_num = np.unique(data_temp['state_bms'])

    return data_temp, df_chrg_temp, chrgr_splice_num, state_splice_num
def _expand_cell_column(column, count, prefix):
    """Split a comma-separated string column into ``count`` float columns named prefix1..prefixN."""
    names = [prefix + str(i) for i in range(1, count + 1)]
    cells = pd.DataFrame([value.split(",") for value in column]).iloc[:, list(range(count))]
    cells.columns = names
    return cells.astype('float'), names


def datacleaning(df, cell_volt_count=96, cell_temp_count=30):
    """Clean raw pack data and expand per-cell voltage / temperature columns.

    Processing:
      * empty strings become NaN; rows missing ``time``, ``cellvoltage``,
        ``celltemp`` or ``packcrnt`` are dropped;
      * ``time`` is parsed as seconds since the epoch;
      * rows with ``packvoltage > 1000``, ``|packcrnt| > 999`` or
        ``packsoc > 100`` are dropped, as are rows where both
        ``packvoltage`` and ``|packcrnt|`` are below 0.001;
      * rows are sorted by time and duplicate timestamps removed;
      * the comma-separated ``cellvoltage`` / ``celltemp`` strings are split
        into numbered float columns, with per-row max / min columns added;
      * rows whose minimum cell voltage is below 0.1 are dropped.

    Args:
        df: raw DataFrame with columns ``time``, ``cellvoltage``,
            ``celltemp``, ``packcrnt``, ``packvoltage`` and ``packsoc``.
        cell_volt_count: number of cell-voltage values to keep per row.
        cell_temp_count: number of cell-temperature values to keep per row.

    Returns:
        Tuple ``(df, cellvolt_name, celltemp_name)``: the cleaned frame and
        the generated column names. If the input is empty or nothing
        survives the filters, an empty DataFrame and two empty lists.
    """
    if df.empty:
        return pd.DataFrame(), [], []

    # df['Time'] = pd.to_datetime(list(df['Time']), utc=True, unit='ms').tz_convert('Asia/Shanghai')
    df = df.replace('', np.nan)
    df.dropna(axis=0, subset=["time", "cellvoltage", "celltemp", "packcrnt"], inplace=True)
    df['time'] = pd.to_datetime(df['time'], unit='s')  # + pd.Timedelta(hours=8)

    # abs() must wrap the current itself, not the comparison result;
    # otherwise large negative currents slip through the filter.
    out_of_range = (
        (df['packvoltage'] > 1000)
        | (df['packcrnt'].abs() > 999)
        | (df['packsoc'] > 100)
    )
    df.drop(df.index[out_of_range], inplace=True)
    all_zero = (df['packvoltage'] < 0.001) & (df['packcrnt'].abs() < 0.001)
    df.drop(df.index[all_zero], inplace=True)

    if df.empty:
        return pd.DataFrame(), [], []

    df.sort_values(by="time", inplace=True)
    df.drop_duplicates(subset="time", inplace=True)
    # The expanded cell frames use a 0..n-1 index, so df must match it.
    df.reset_index(drop=True, inplace=True)

    df_volt, cellvolt_name = _expand_cell_column(df['cellvoltage'], cell_volt_count, 'cellvoltage')
    cellvoltmax = df_volt.max(axis=1)
    cellvoltmin = df_volt.min(axis=1)
    df_volt['cellvoltmax'] = cellvoltmax
    df_volt['cellvoltmin'] = cellvoltmin

    df_temp, celltemp_name = _expand_cell_column(df['celltemp'], cell_temp_count, 'celltemp')
    celltempmax = df_temp.max(axis=1)
    celltempmin = df_temp.min(axis=1)
    df_temp['celltempmax'] = celltempmax
    df_temp['celltempmin'] = celltempmin

    df = pd.concat([df, df_volt, df_temp], axis=1)
    df.drop(df.index[df['cellvoltmin'] < 0.1], inplace=True)
    df.reset_index(inplace=True, drop=True)
    return df, cellvolt_name, celltemp_name
-
def get_all_csv_files_in_folder(folder_path):
    """Return the paths of all CSV files directly inside ``folder_path``.

    The extension match is case-insensitive. Sub-directories are not
    searched, and directory entries that merely end in ``.csv`` are skipped.
    Names are sorted so the result does not depend on the order in which the
    file system lists them.

    Args:
        folder_path: directory to scan.

    Returns:
        List of ``folder_path``-joined file paths, sorted by file name.
    """
    csv_files = []
    for name in sorted(os.listdir(folder_path)):
        path = os.path.join(folder_path, name)
        if name.lower().endswith('.csv') and os.path.isfile(path):
            csv_files.append(path)
    return csv_files
def concat_csv_files(csv_files):
    """Read every CSV in ``csv_files`` and stack them into one DataFrame.

    Args:
        csv_files: iterable of CSV file paths.

    Returns:
        A single DataFrame with a fresh 0..n-1 index, or ``None`` when no
        paths are given (``pd.concat`` would otherwise raise on an empty
        list).
    """
    dfs = [pd.read_csv(file) for file in csv_files]
    if not dfs:
        return None
    return pd.concat(dfs, ignore_index=True)
# Driver: read every CSV of one vehicle, clean it, split it into states and
# store the charge segments as a feather file.
folder_path = "/data/common/hz/lifecycle/LUZAGAAAXKA008957"  # replace with the actual folder path
out_put_folder_path = "/home/shouxueqi/projects/zlwl-algos/USER/shouxueqi/hz_datacleaning/LUZAGAAAXKA008957_charge.feather"
csv_files_list = get_all_csv_files_in_folder(folder_path)
if csv_files_list:
    concatenated_df = concat_csv_files(csv_files_list)
    if concatenated_df is not None:
        print('启动数据清洗')
        df_cleaned, cellvolt_name, celltemp_name = datacleaning(concatenated_df)
        df_cleaned.to_csv('clearned_data.csv')
        print('完成数据清洗')
        if df_cleaned.empty:
            # datacleaning returns a column-less frame when nothing survives;
            # split_data needs the 'time' column and would fail on it.
            print("No valid rows left after cleaning; skipping charge splitting.")
        else:
            data_temp, df_chrg_splited, chrgr_splice_num, state_splice_num = split_data(df_cleaned)
            print('完成计算')
            df_chrg_splited.to_feather(out_put_folder_path)
else:
    print("No CSV files found in the folder.")
- # def get_all_files_in_folder(folder_path):
- # all_files = []
- # for root, dirs, files in os.walk(folder_path):
- # for file in files:
- # file_path = os.path.join(root, file)
- # all_files.append(file_path)
- # return all_files
- # def concat_csv_files(csv_files):
- # dfs = []
- # for file in csv_files:
- # df = pd.read_csv(file)
- # dfs.append(df)
- # return dfs#pd.concat(dfs, ignore_index=True)
- # def open_first_csv_file(files_list):
- # for file_path in files_list:
- # if file_path.lower().endswith('.csv'):
- # df = pd.read_csv(file_path)
- # return df
- # return None
- # folder_path = "/data/common/hz/lifecycle/LUZAGAAAXKA008957" # 替换为实际的文件夹路径
- # out_put_folder_path="/home/shouxueqi/projects/zlwl-algos/USER/shouxueqi/hz_datacleaning/LUZAGAAAXKA008957_charge.feather"
- # file_list = get_all_files_in_folder(folder_path)
- # if len(file_list) > 0:
- # concatenated_df = concat_csv_files(file_list)
- # if concatenated_df is not None:
- # df_cleaned,cellvolt_name,celltemp_name=datacleaning(concatenated_df)
- # data_temp, df_chrg_splited, chrgr_splice_num, state_splice_num=split_data(df_cleaned)
- # df_chrg_splited.to_feather(out_put_folder_path)
- # print("Finished.")
- # else:
- # print("No CSV files found in the folder.")
|