"""Clean raw battery-pack CSV exports and split them into rest/charge segments.

Pipeline (see ``main``): read every CSV in a folder, clean the rows
(``datacleaning``), pad long sampling gaps (``crnt_insert``), flag rest and
charge segments (``split_data``) and write the charging rows to a feather file.
"""
import os

import numpy as np
import pandas as pd

# Current magnitude above which a row counts as "current flowing".
# NOTE(review): presumably 0.05 C of a 102 Ah pack -- confirm the capacity.
_CRNT_FLAG_THRESH = 0.05 * 102
# Minimum number of consecutive rows with the same current flag for a segment.
_MIN_RUN_ROWS = 15
# Values written to ``sts_flg``: rest, charge, everything else.
_STS_REST = 0
_STS_CHARGE = 1
_STS_OTHER = 2

folder_path = "/data/common/hz/lifecycle/LUZAGAAAXKA008957"  # replace with the actual folder path
out_put_folder_path = "/home/shouxueqi/projects/zlwl-algos/USER/shouxueqi/hz_datacleaning/LUZAGAAAXKA008957_charge.feather"


def clean_dead_value(series, num_dead_thresh):
    """Find maximal runs of consecutive identical values.

    Args:
        series: pandas Series to scan, in row order.
        num_dead_thresh: minimum run length (in rows) for a run to be reported.

    Returns:
        A list with one entry per qualifying run; each entry is the list of
        index labels of the rows in that run. Empty when ``series`` is empty
        or no run is long enough.
    """
    if len(series) == 0:
        return []

    labels = series.index.tolist()
    values = series.to_numpy()

    # A new run starts at every row whose value differs from the previous row.
    run_starts = np.flatnonzero(~(np.diff(values) == 0)) + 1
    starts = np.concatenate(([0], run_starts))
    ends = np.concatenate((run_starts, [len(values)]))

    return [
        labels[start:end]
        for start, end in zip(starts, ends)
        if end - start >= num_dead_thresh
    ]


def crnt_insert(df_data_temp):
    """Pad sampling gaps of 20 minutes or more with placeholder rows.

    For every pair of consecutive samples at least 20 minutes apart, rows are
    inserted every 5 minutes strictly between them. Inserted rows carry NaN in
    every column except ``time`` and ``packcrnt``.

    Args:
        df_data_temp: DataFrame with at least ``time`` and ``packcrnt``
            columns. The input frame is not modified.

    Returns:
        A new DataFrame sorted by ``time`` with unique timestamps, a fresh
        0..n-1 index, and every missing ``packcrnt`` (inserted rows and
        pre-existing NaN alike) replaced by 0.2.
    """
    interval = pd.Timedelta(minutes=5)
    threshold = pd.Timedelta(minutes=20)

    df = df_data_temp.copy()
    df['time'] = pd.to_datetime(df['time'])
    df = df.sort_values(by='time', kind='stable')

    times = df['time']
    # Position i is flagged when the step from row i-1 to row i is too long.
    gap_positions = np.flatnonzero((times.diff() >= threshold).to_numpy())
    filler_times = []
    for pos in gap_positions:
        filler_times.extend(
            pd.date_range(
                start=times.iloc[pos - 1] + interval,
                end=times.iloc[pos] - interval,
                freq=interval,
            )
        )

    if filler_times:
        # Columns missing from the filler frame come out as NaN after concat.
        filler = pd.DataFrame({'time': filler_times})
        df = pd.concat([df, filler], ignore_index=True)

    df['packcrnt'] = df['packcrnt'].fillna(0.2)
    df = df.drop_duplicates(subset=['time'], keep='first')
    return df.sort_values(by='time', kind='stable').reset_index(drop=True)


def _gap_minutes(times):
    """Return the minutes between consecutive timestamps (length n-1)."""
    seconds = pd.to_datetime(times).diff().dt.total_seconds().to_numpy()
    return seconds[1:] / 60.0


def _segment_ids(breaks):
    """Number rows by segment; ``breaks[k]`` starts a new segment at row k+1.

    Returns a float array one element longer than ``breaks``, starting at 0.0.
    """
    flags = np.asarray(breaks, dtype=bool)
    return np.concatenate(([0.0], np.cumsum(flags, dtype=float)))


def split_data(df_data2):
    """Flag rest and charge segments and number the resulting sections.

    Rows are first padded with ``crnt_insert`` and given a current flag
    ``crnt_flg`` (1 above the current threshold, -1 below its negative,
    0 otherwise). Runs of at least 15 rows with a constant flag are then
    classified in ``sts_flg``:

    * 0 (rest): flag 0 and the run spans more than 0.17 h;
    * 1 (charge): flag -1 and ``packsoc`` rises by more than 2 over the run;
    * 2: everything else.

    Charge runs also get ``charge_method`` ('offboard_charge' when the SOC
    rate exceeds 0.3 per hour as a fraction of 100, else 'onboard_charge')
    and ``full_charge_flg`` (1 when the final ``packsoc`` is >= 99). These two
    columns exist only when at least one charge run was found.

    Args:
        df_data2: DataFrame with ``time``, ``packcrnt`` and ``packsoc``
            columns. It is not modified.

    Returns:
        Tuple ``(data_temp, df_chrg_temp, chrgr_splice_num, state_splice_num)``:
        ``data_temp`` is the padded data with ``crnt_flg``, ``sts_flg`` and
        ``state_bms`` (section number; a new section starts when ``sts_flg``
        changes or the time gap exceeds 40 minutes). ``df_chrg_temp`` holds
        the charge rows with ``chrgr_rest`` (charge session number; a new
        session starts after a gap of more than 30 minutes); it is an empty
        frame unless more than 30 rows were classified as rest or charge.
        The last two items are the unique session and section numbers
        (``[]`` when not computed).
    """
    if df_data2.empty:
        return pd.DataFrame(), pd.DataFrame(), [], []

    df_data = crnt_insert(df_data2.reset_index(drop=True))
    row_count = len(df_data)

    current = df_data['packcrnt']
    crnt_flg = np.zeros(row_count, dtype=np.int64)
    crnt_flg[(current > _CRNT_FLAG_THRESH).to_numpy()] = 1
    crnt_flg[(current < -_CRNT_FLAG_THRESH).to_numpy()] = -1
    df_data['crnt_flg'] = crnt_flg

    times = df_data['time']
    soc = df_data['packsoc']
    one_hour = pd.Timedelta(hours=1)

    sts_flg = np.full(row_count, _STS_OTHER, dtype=np.int64)
    charge_method = np.full(row_count, np.nan, dtype=object)
    full_charge_flg = np.full(row_count, np.nan)
    classified_rows = 0
    found_charge = False

    # Index labels equal row positions because crnt_insert resets the index.
    for run in clean_dead_value(df_data['crnt_flg'], _MIN_RUN_ROWS):
        first, last = run[0], run[-1]
        rows = slice(first, last + 1)
        hours = (times.iloc[last] - times.iloc[first]) / one_hour
        delta_soc = soc.iloc[last] - soc.iloc[first]
        run_flag = crnt_flg[first]  # constant over the whole run

        if run_flag == 0:
            if hours > 0.17:
                sts_flg[rows] = _STS_REST
                classified_rows += len(run)
        elif run_flag == -1 and delta_soc > 2:
            delta_soc = round(delta_soc, 3)
            # Very short runs give an unreliable rate; treat them as rate 0.
            rate = round(delta_soc / (100 * hours), 3) if hours >= 0.03 else 0
            sts_flg[rows] = _STS_CHARGE
            charge_method[rows] = (
                'offboard_charge' if abs(rate) > 0.3 else 'onboard_charge'
            )
            full_charge_flg[rows] = 1 if soc.iloc[last] >= 99 else 0
            classified_rows += len(run)
            found_charge = True

    df_data['sts_flg'] = sts_flg
    if found_charge:
        df_data['charge_method'] = charge_method
        df_data['full_charge_flg'] = full_charge_flg
    data_temp = df_data

    # A row still flagged 2 whose neighbours 13 rows before and after are
    # both rest is treated as part of the rest period.
    status = data_temp['sts_flg']
    isolated = (
        (status == _STS_OTHER)
        & (status.shift(13) == _STS_REST)
        & (status.shift(-13) == _STS_REST)
    )
    data_temp.loc[isolated, 'sts_flg'] = _STS_REST

    # ------------------------- charge sessions -------------------------
    df_chrg_temp = pd.DataFrame()
    chrgr_splice_num = []
    if classified_rows > 30:
        is_charge = data_temp['sts_flg'] == _STS_CHARGE
        df_chrg_temp = data_temp.loc[is_charge].reset_index(drop=True)
        if not df_chrg_temp.empty:
            session_ids = _segment_ids(_gap_minutes(df_chrg_temp['time']) > 30)
            df_chrg_temp['chrgr_rest'] = session_ids
            chrgr_splice_num = np.unique(session_ids)

    # ------------------------- state sections --------------------------
    state_splice_num = []
    if not data_temp.empty:
        status_changed = np.diff(data_temp['sts_flg'].to_numpy()) != 0
        long_gap = _gap_minutes(data_temp['time']) > 40
        section_ids = _segment_ids(status_changed | long_gap)
        data_temp['state_bms'] = section_ids
        state_splice_num = np.unique(section_ids)

    return data_temp, df_chrg_temp, chrgr_splice_num, state_splice_num


def _split_cells(raw, names):
    """Expand comma-separated strings into one float column per name.

    Raises IndexError when a frame built from ``raw`` has fewer fields than
    ``names``.
    """
    cells = pd.DataFrame([text.split(",") for text in raw])
    cells = cells.iloc[:, list(range(len(names)))]
    cells.columns = names
    return cells.astype('float')


def datacleaning(df, cell_volt_nums=96, cell_temp_nums=30):
    """Drop invalid rows and expand the per-cell voltage/temperature strings.

    Steps: treat empty strings as missing; drop rows missing ``time``,
    ``cellvoltage``, ``celltemp`` or ``packcrnt``; parse ``time`` as epoch
    seconds; drop rows with ``packvoltage`` > 1000, ``|packcrnt|`` > 999 or
    ``packsoc`` > 100, and rows where both ``packvoltage`` and ``|packcrnt|``
    are below 0.001; sort by time and drop duplicate timestamps; expand
    ``cellvoltage`` / ``celltemp`` into numbered float columns plus per-row
    max/min columns; finally drop rows whose ``cellvoltmin`` is below 0.1.

    Args:
        df: raw DataFrame; it is not modified.
        cell_volt_nums: number of cell-voltage fields to keep per row.
        cell_temp_nums: number of cell-temperature fields to keep per row.

    Returns:
        Tuple ``(cleaned, cellvolt_name, celltemp_name)``. When the input is
        empty or no row survives the range checks, returns
        ``(pd.DataFrame(), [], [])``.
    """
    if df.empty:
        return pd.DataFrame(), [], []

    df = df.replace('', np.nan)
    df.dropna(
        axis=0,
        subset=["time", "cellvoltage", "celltemp", "packcrnt"],
        inplace=True,
    )
    df['time'] = pd.to_datetime(df['time'], unit='s')

    abs_current = df['packcrnt'].abs()
    out_of_range = (
        (df['packvoltage'] > 1000) | (abs_current > 999) | (df['packsoc'] > 100)
    )
    dead_sample = (df['packvoltage'] < 0.001) & (abs_current < 0.001)
    df = df.loc[~(out_of_range | dead_sample)]
    if df.empty:
        return pd.DataFrame(), [], []

    df = (
        df.sort_values(by="time")
        .drop_duplicates(subset="time")
        .reset_index(drop=True)
    )

    cellvolt_name = ['cellvoltage' + str(x) for x in range(1, cell_volt_nums + 1)]
    celltemp_name = ['celltemp' + str(x) for x in range(1, cell_temp_nums + 1)]

    df_volt = _split_cells(df['cellvoltage'], cellvolt_name)
    volt_max, volt_min = df_volt.max(axis=1), df_volt.min(axis=1)
    df_volt['cellvoltmax'] = volt_max
    df_volt['cellvoltmin'] = volt_min

    df_temp = _split_cells(df['celltemp'], celltemp_name)
    temp_max, temp_min = df_temp.max(axis=1), df_temp.min(axis=1)
    df_temp['celltempmax'] = temp_max
    df_temp['celltempmin'] = temp_min

    df = pd.concat([df, df_volt, df_temp], axis=1)
    df = df.loc[~(df['cellvoltmin'] < 0.1)].reset_index(drop=True)
    return df, cellvolt_name, celltemp_name


def get_all_csv_files_in_folder(folder_path):
    """Return the paths of all ``.csv`` files (any letter case) directly in a folder."""
    return [
        os.path.join(folder_path, file)
        for file in os.listdir(folder_path)
        if file.lower().endswith('.csv')
    ]


def concat_csv_files(csv_files):
    """Read every CSV and stack them into one frame with a fresh index.

    Returns an empty DataFrame when ``csv_files`` is empty.
    """
    dfs = [pd.read_csv(file) for file in csv_files]
    if not dfs:
        return pd.DataFrame()
    return pd.concat(dfs, ignore_index=True)


def main():
    """Run the cleaning and splitting pipeline on ``folder_path``."""
    csv_files_list = get_all_csv_files_in_folder(folder_path)
    if not csv_files_list:
        print("No CSV files found in the folder.")
        return

    concatenated_df = concat_csv_files(csv_files_list)
    print('启动数据清洗')
    df_cleaned, cellvolt_name, celltemp_name = datacleaning(concatenated_df)
    df_cleaned.to_csv('clearned_data.csv')
    print('完成数据清洗')
    data_temp, df_chrg_splited, chrgr_splice_num, state_splice_num = split_data(df_cleaned)
    print('完成计算')
    df_chrg_splited.to_feather(out_put_folder_path)


if __name__ == "__main__":
    main()