import pandas as pd import numpy as np import logging import logging.handlers import os import re class OfflineAlgoUtils: def __init__(self): pass def get_log_handler(self, log_file, level=logging.INFO): # 根据日期滚动 if not os.path.exists(os.path.dirname(log_file)): os.makedirs(os.path.dirname(log_file)) fh = logging.handlers.TimedRotatingFileHandler(filename=log_file, when="D", interval=1, backupCount=7, encoding="utf-8") formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s") fh.suffix = "%Y-%m-%d_%H-%M-%S" fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}") fh.setFormatter(formatter) fh.setLevel(level) return fh def get_logger(self, log_name, log_path): # 日志配置 logger = logging.getLogger(log_name) logger.setLevel(logging.INFO) logger.addHandler(self.get_log_handler("{}/{}.info.log".format(log_path, log_name), logging.INFO)) logger.addHandler(self.get_log_handler("{}/{}.error.log".format(log_path, log_name), logging.ERROR)) return logger def datacleaning(self, df_algo_pack_param, df_in, droplmt=1): df_in=df_in[df_in['datatype']==12] df_in = df_in.drop(['latitude','longitude','mileage','accum_chg_wh','accum_dschg_wh','accum_dschg_ah','accum_chg_ah','vin','accum_energyfeed_wh','accum_energyfeed_ah'], axis=1, errors='ignore') if not df_in.empty: # df['Time'] = pd.to_datetime(list(df['Time']), utc=True, unit='ms').tz_convert('Asia/Shanghai') df_in=df_in.replace('[]', np.nan) df_in.dropna(axis=0,subset = ["time", "sn", "cell_voltage", "cell_temp", "pack_crnt"], inplace=True) df_in['time'] = pd.to_datetime(df_in['time'], format='%Y-%m-%d %H:%M:%S') if droplmt==1: df_in.drop(df_in.index[(df_in['pack_volt'] < 0.001) | (df_in['pack_volt'] > 1000) | (df_in['pack_soc'] > 100) | (df_in['pack_soc'] < 0) | (df_in['pack_crnt'] > 1000) | (df_in['pack_crnt'] < -1000)], inplace=True) else: pass if not df_in.empty: df_in = df_in.groupby('sn',group_keys=False).apply(lambda x:x.sort_values('time')) df_in.reset_index(drop=True, inplace=True) #电压、温度分列 CellVoltNums=int(df_algo_pack_param['CellVoltTotalCount']) CellTempNums = int(df_algo_pack_param['CellTempTotalCount']) cellvolt_name=['cell_voltage'+str(x) for x in range(1, CellVoltNums+1)] celltemp_name=['cell_temp'+str(x) for x in range(1, CellTempNums+1)] df_volt = df_in['cell_voltage'].apply(lambda x : pd.Series(list(x)[:CellVoltNums])) df_volt.columns = cellvolt_name df_volt=df_volt.astype('float') cellvoltmax = df_volt.max(axis=1) cellvoltmin = df_volt.min(axis=1) df_volt[['cell_volt_max','cell_volt_min']] = pd.concat([cellvoltmax,cellvoltmin], axis=1) df_temp = df_in['cell_temp'].apply(lambda x : pd.Series(list(x)[:CellTempNums])) df_temp.columns = celltemp_name df_temp=df_temp.astype('float') celltempmax = df_temp.max(axis=1) celltempmin = df_temp.min(axis=1) df_temp[['cell_temp_max','cell_temp_min']] = pd.concat([celltempmax,celltempmin], axis=1) #其他温度分列 if len(df_in['other_temp_value'].loc[0]): df_otherTemp_name=['mos_temp', 'env_temp', 'fastcharg_connector_temp', 'onc_connector_temp', 'heat_plate1_temp', 'heat_plate2_temp', 'connector_1_temp','connector_2_temp', 'pcb_temp', 'bat_inner_temp'] df_otherTemp=pd.DataFrame([list(x[0]) for x in np.array(df_in[['other_temp_value']])]).iloc[:,list(range(len(df_otherTemp_name)))] df_otherTemp.columns=df_otherTemp_name df_otherTemp=df_otherTemp.astype('float') df_out = pd.concat([df_in, df_volt, df_temp, df_otherTemp],axis=1) else: df_out = pd.concat([df_in, df_volt, df_temp],axis=1) # df_out.dropna(axis=0, inplace=True) df_out.dropna(axis=0,subset = cellvolt_name+celltemp_name, inplace=True) df_out.reset_index(inplace=True, drop=True) df_table = df_out.drop_duplicates(subset=['sn'], keep='first', ignore_index=True) df_table = df_table.set_index('sn') else: df_out = pd.DataFrame() df_table = pd.DataFrame() cellvolt_name = [] celltemp_name = [] return df_out, df_table, cellvolt_name, celltemp_name else: return pd.DataFrame(), pd.DataFrame(), [], [] def gps_datacleaning(self, df_in): df_in=df_in[df_in['datatype']==16] df_in=df_in[['sn','time','datatype','latitude','longitude','mileage']] if not df_in.empty: # df['Time'] = pd.to_datetime(list(df['Time']), utc=True, unit='ms').tz_convert('Asia/Shanghai') df_in=df_in.replace('[]', np.nan) df_in=df_in.replace('',np.nan) df_in=df_in.dropna(axis=0,how='any') if 'latitude' in df_in.columns: df_in["latitude"]=df_in["latitude"].astype(float) if 'longitude' in df_in.columns: df_in["longitude"]=df_in["longitude"].astype(float) if 'mileage' in df_in.columns: df_in["mileage"]=df_in["mileage"].astype(float) ##处理经纬度为0的情况 df_in=df_in.replace(0,np.nan) df_in=df_in.sort_values(["sn","time"],ascending = [True, True]) df_in_filled = df_in.groupby("sn").fillna(method='ffill') df_out = pd.concat([df_in[['sn']], df_in_filled], axis=1) df_out.reset_index(inplace=True, drop=True) return df_out else: return pd.DataFrame() def accum_datacleaning(self, df_in): df_in=df_in[df_in['datatype']==23] df_in=df_in[['sn','time','datatype','accum_chg_wh','accum_dschg_wh','accum_dschg_ah','accum_chg_ah','accum_energyfeed_wh','accum_energyfeed_ah']] if not df_in.empty: # df['Time'] = pd.to_datetime(list(df['Time']), utc=True, unit='ms').tz_convert('Asia/Shanghai') df_in=df_in.replace('[]', np.nan) df_in=df_in.replace('',np.nan) df_in=df_in.dropna(axis=0,how='any') if 'accum_energyfeed_wh' in df_in.columns: df_in["accum_energyfeed_wh"]=df_in["accum_energyfeed_wh"].astype(float) if 'accum_chg_wh' in df_in.columns: df_in["accum_chg_wh"]=df_in["accum_chg_wh"].astype(float) if 'accum_dschg_wh' in df_in.columns: df_in["accum_dschg_wh"]=df_in["accum_dschg_wh"].astype(float) ##处理经纬度为0的情况 df_in=df_in.replace(0,np.nan) df_in=df_in.sort_values(["sn","time"],ascending = [True, True]) df_in_filled = df_in.groupby("sn").fillna(method='ffill') df_in_filled = df_in.groupby("sn").fillna(method='bfill') df_out = pd.concat([df_in[['sn']], df_in_filled], axis=1) df_out.reset_index(inplace=True, drop=True) return df_out else: return pd.DataFrame() def vin_datacleaning(self, df_in): df_in=df_in[df_in['datatype']==50] df_in=df_in[['sn','time','datatype','vin']] if not df_in.empty: df_in["vin"].replace("","z",inplace=True) df_out= df_in return df_out else: return pd.DataFrame()