123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- import pandas as pd
- import numpy as np
- import logging
- import logging.handlers
- import os
- import re
- class OfflineAlgoUtils:
-
-
- def __init__(self):
- pass
-
- def get_log_handler(self, log_file, level=logging.INFO):
- # 根据日期滚动
- if not os.path.exists(os.path.dirname(log_file)):
- os.makedirs(os.path.dirname(log_file))
- fh = logging.handlers.TimedRotatingFileHandler(filename=log_file, when="D", interval=1, backupCount=7,
- encoding="utf-8")
- formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
- fh.suffix = "%Y-%m-%d_%H-%M-%S"
- fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}")
- fh.setFormatter(formatter)
- fh.setLevel(level)
- return fh
-
- def get_logger(self, log_name, log_path):
- # 日志配置
- logger = logging.getLogger(log_name)
- logger.setLevel(logging.INFO)
- logger.addHandler(self.get_log_handler("{}/{}.info.log".format(log_path, log_name), logging.INFO))
- logger.addHandler(self.get_log_handler("{}/{}.error.log".format(log_path, log_name), logging.ERROR))
- return logger
-
- def datacleaning(self, df_algo_pack_param, df_in, droplmt=1):
- df_in=df_in[df_in['datatype']==12]
- df_in = df_in.drop(['latitude','longitude','mileage','accum_chg_wh','accum_dschg_wh','accum_dschg_ah','accum_chg_ah','vin','accum_energyfeed_wh','accum_energyfeed_ah'], axis=1, errors='ignore')
- if not df_in.empty:
- # df['Time'] = pd.to_datetime(list(df['Time']), utc=True, unit='ms').tz_convert('Asia/Shanghai')
- df_in=df_in.replace('[]', np.nan)
- df_in.dropna(axis=0,subset = ["time", "sn", "cell_voltage", "cell_temp", "pack_crnt"], inplace=True)
- df_in['time'] = pd.to_datetime(df_in['time'], format='%Y-%m-%d %H:%M:%S')
- if droplmt==1:
- df_in.drop(df_in.index[(df_in['pack_volt'] < 0.001) | (df_in['pack_volt'] > 1000) | (df_in['pack_soc'] > 100) | (df_in['pack_soc'] < 0) | (df_in['pack_crnt'] > 1000) | (df_in['pack_crnt'] < -1000)], inplace=True)
- else:
- pass
- if not df_in.empty:
- df_in = df_in.groupby('sn',group_keys=False).apply(lambda x:x.sort_values('time'))
- df_in.reset_index(drop=True, inplace=True)
- #电压、温度分列
- CellVoltNums=int(df_algo_pack_param['CellVoltTotalCount'])
- CellTempNums = int(df_algo_pack_param['CellTempTotalCount'])
- cellvolt_name=['cell_voltage'+str(x) for x in range(1, CellVoltNums+1)]
- celltemp_name=['cell_temp'+str(x) for x in range(1, CellTempNums+1)]
- df_volt = df_in['cell_voltage'].apply(lambda x : pd.Series(list(x)[:CellVoltNums]))
- df_volt.columns = cellvolt_name
- df_volt=df_volt.astype('float')
- cellvoltmax = df_volt.max(axis=1)
- cellvoltmin = df_volt.min(axis=1)
- df_volt[['cell_volt_max','cell_volt_min']] = pd.concat([cellvoltmax,cellvoltmin], axis=1)
- df_temp = df_in['cell_temp'].apply(lambda x : pd.Series(list(x)[:CellTempNums]))
- df_temp.columns = celltemp_name
- df_temp=df_temp.astype('float')
- celltempmax = df_temp.max(axis=1)
- celltempmin = df_temp.min(axis=1)
- df_temp[['cell_temp_max','cell_temp_min']] = pd.concat([celltempmax,celltempmin], axis=1)
- #其他温度分列
- if len(df_in['other_temp_value'].loc[0]):
- df_otherTemp_name=['mos_temp', 'env_temp', 'fastcharg_connector_temp',
- 'onc_connector_temp', 'heat_plate1_temp', 'heat_plate2_temp', 'connector_1_temp','connector_2_temp', 'pcb_temp', 'bat_inner_temp']
- df_otherTemp=pd.DataFrame([list(x[0]) for x in np.array(df_in[['other_temp_value']])]).iloc[:,list(range(len(df_otherTemp_name)))]
- df_otherTemp.columns=df_otherTemp_name
- df_otherTemp=df_otherTemp.astype('float')
- df_out = pd.concat([df_in, df_volt, df_temp, df_otherTemp],axis=1)
- else:
- df_out = pd.concat([df_in, df_volt, df_temp],axis=1)
-
- # df_out.dropna(axis=0, inplace=True)
- df_out.dropna(axis=0,subset = cellvolt_name+celltemp_name, inplace=True)
- df_out.reset_index(inplace=True, drop=True)
- df_table = df_out.drop_duplicates(subset=['sn'], keep='first', ignore_index=True)
- df_table = df_table.set_index('sn')
- else:
- df_out = pd.DataFrame()
- df_table = pd.DataFrame()
- cellvolt_name = []
- celltemp_name = []
- return df_out, df_table, cellvolt_name, celltemp_name
- else:
- return pd.DataFrame(), pd.DataFrame(), [], []
- def gps_datacleaning(self, df_in):
- df_in=df_in[df_in['datatype']==16]
- df_in=df_in[['sn','time','datatype','latitude','longitude','mileage']]
- if not df_in.empty:
- # df['Time'] = pd.to_datetime(list(df['Time']), utc=True, unit='ms').tz_convert('Asia/Shanghai')
- df_in=df_in.replace('[]', np.nan)
- df_in=df_in.replace('',np.nan)
- df_in=df_in.dropna(axis=0,how='any')
-
- if 'latitude' in df_in.columns:
- df_in["latitude"]=df_in["latitude"].astype(float)
- if 'longitude' in df_in.columns:
- df_in["longitude"]=df_in["longitude"].astype(float)
- if 'mileage' in df_in.columns:
- df_in["mileage"]=df_in["mileage"].astype(float)
- ##处理经纬度为0的情况
- df_in=df_in.replace(0,np.nan)
- df_in=df_in.sort_values(["sn","time"],ascending = [True, True])
- df_in_filled = df_in.groupby("sn").fillna(method='ffill')
- df_out = pd.concat([df_in[['sn']], df_in_filled], axis=1)
- df_out.reset_index(inplace=True, drop=True)
-
- return df_out
- else:
- return pd.DataFrame()
- def accum_datacleaning(self, df_in):
- df_in=df_in[df_in['datatype']==23]
- df_in=df_in[['sn','time','datatype','accum_chg_wh','accum_dschg_wh','accum_dschg_ah','accum_chg_ah','accum_energyfeed_wh','accum_energyfeed_ah']]
- if not df_in.empty:
- # df['Time'] = pd.to_datetime(list(df['Time']), utc=True, unit='ms').tz_convert('Asia/Shanghai')
- df_in=df_in.replace('[]', np.nan)
- df_in=df_in.replace('',np.nan)
- df_in=df_in.dropna(axis=0,how='any')
-
- if 'accum_energyfeed_wh' in df_in.columns:
- df_in["accum_energyfeed_wh"]=df_in["accum_energyfeed_wh"].astype(float)
- if 'accum_chg_wh' in df_in.columns:
- df_in["accum_chg_wh"]=df_in["accum_chg_wh"].astype(float)
- if 'accum_dschg_wh' in df_in.columns:
- df_in["accum_dschg_wh"]=df_in["accum_dschg_wh"].astype(float)
- ##处理经纬度为0的情况
- df_in=df_in.replace(0,np.nan)
- df_in=df_in.sort_values(["sn","time"],ascending = [True, True])
- df_in_filled = df_in.groupby("sn").fillna(method='ffill')
- df_in_filled = df_in.groupby("sn").fillna(method='bfill')
- df_out = pd.concat([df_in[['sn']], df_in_filled], axis=1)
- df_out.reset_index(inplace=True, drop=True)
- return df_out
- else:
- return pd.DataFrame()
- def vin_datacleaning(self, df_in):
- df_in=df_in[df_in['datatype']==50]
- df_in=df_in[['sn','time','datatype','vin']]
- if not df_in.empty:
- df_in["vin"].replace("","z",inplace=True)
- df_out= df_in
- return df_out
- else:
- return pd.DataFrame()
|