# Raw data acquisition for labelled SNs
from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
import pandas as pd
# from DataSplit.V_1_0_0 import data_status as ds  # charging-status standardisation
# from DataSplit.V_1_0_0 import data_split as dt   # segment-splitting routines
import os
import traceback
import datetime
import sys
sys.path.append("/home/zhuxi/project/zlwl-algos")
from ALGOS.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import data_split as dt
from ALGOS.PERIODIC.task_day_1.group_1.DataSplit.V_1_0_0 import data_status as ds
from dateutil.relativedelta import relativedelta

cur_env = 'dev'                                  # runtime environment
app_path = "/home/zhuxi/project/zlwl-algos"      # project root
sysUtils = SysUtils(cur_env, app_path)

# HBase connections: one for the raw IoT platform, one for the data factory.
hbase_params = sysUtils.get_cf_param('hbase')
hbase_datafactory_params = sysUtils.get_cf_param('hbase-datafactory')
iotp_service = IotpAlgoService(hbase_params=hbase_params)
iotp_datafactory_service = IotpAlgoService(hbase_params=hbase_datafactory_params)

# MySQL connection for the pack-parameter table.
mysql_datafactory_params = sysUtils.get_cf_param('mysql-datafactory')
mysqlUtils = MysqlUtils()
mysql_datafactory_engine, mysql_datafactory_Session = mysqlUtils.get_mysql_engine(mysql_datafactory_params)
mysql_datafactory_conn = mysql_datafactory_engine.connect()

# SN list of the labelled packs, taken from the SOH spreadsheet.
dataSOH = pd.read_excel('sn-20210903.xlsx', sheet_name='科易6040')
fileNames = list(dataSOH['SN号'])

# Fields to pull from the dataset for each labelled SN.
columns = [DataField.error_level, DataField.error_code, DataField.pack_crnt, DataField.pack_volt,
           DataField.bms_sta, DataField.cell_voltage_count, DataField.cell_temp_count,
           DataField.cell_voltage, DataField.cell_temp, DataField.pack_soc, DataField.other_temp_value,
           DataField.cell_balance, DataField.latitude, DataField.longitude, DataField.pack_soh,
           DataField.charge_sta, DataField.mileage, DataField.accum_chg_wh, DataField.accum_dschg_wh,
           DataField.accum_chg_ah, DataField.accum_dschg_ah, DataField.vin]

# Load the cleaning parameters for the target pack type.
sql = "select pack_code, param from algo_pack_param"
df_algo_pack_param = pd.read_sql(sql, mysql_datafactory_conn)
# pack_code = 'GM02010'
pack_code = 'KY01710'
pack_param = df_algo_pack_param[df_algo_pack_param['pack_code'] == pack_code].drop(['pack_code'], axis=1)
pack_param = pack_param.to_dict("records")
df_algo_pack_param = eval(pack_param[0]['param'])  # 'param' is stored as a Python dict literal

log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
app_name = "download"                                                # application name
logger_main = sysUtils.get_logger(app_name, log_base_path)

# One accumulator list per output column; charge segments from all SNs and
# months are appended here and periodically checkpointed to feather.
list_col = [[] for _ in range(100)]

# Query month by month (freq='MS' = month starts) to keep each request small.
datelist = list(pd.date_range('20210501', '20230531', freq='MS'))
for sn in fileNames[3:]:
# for sn in ['MGMCLN750N215I068']:
    for date in datelist:
        try:
            start_time = str(date)
            st = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S')
            # end = st + datetime.timedelta(days=1)
            end = st + relativedelta(months=+1)
            end_time = str(end)
            # start_time = '2021-05-01 00:00:00'
            # end_time = '2023-05-31 00:00:00'
            df_data = iotp_datafactory_service.get_data(sn_list=[sn], columns=columns,
                                                        start_time=start_time, end_time=end_time)
            if len(df_data) > 0:
                # Clean each signal family: BMS frames, accumulated energy/charge
                # counters, GPS fixes and VIN records.
                df_bms, df_table, cellvolt_name, celltemp_name = iotp_service.datacleaning(df_algo_pack_param, df_data)
                df_data_accum = iotp_service.accum_datacleaning(df_data)
                df_data_gps = iotp_service.gps_datacleaning(df_data)
                df_data_vin = iotp_service.vin_datacleaning(df_data)
                if len(df_data_vin) == 0:
                    df_data_vin = pd.DataFrame(columns=['sn', 'time', 'datatype', 'vin'])
                df_data_accum['time'] = pd.to_datetime(df_data_accum['time'], format='%Y-%m-%d %H:%M:%S')
                df_data_gps['time'] = pd.to_datetime(df_data_gps['time'], format='%Y-%m-%d %H:%M:%S')
                df_data_vin['time'] = pd.to_datetime(df_data_vin['time'], format='%Y-%m-%d %H:%M:%S')
                # Merge the cleaned sources on (sn, time, datatype), then forward-
                # and back-fill the remaining gaps within each SN (groupby drops
                # the key column, so it is re-attached after each pass).
                df_merge = pd.merge(df_bms, df_data_accum, on=['sn', 'time', 'datatype'], how='outer')
                df_merge = pd.merge(df_merge, df_data_gps, on=['sn', 'time', 'datatype'], how='outer')
                df_merge = pd.merge(df_merge, df_data_vin, on=['sn', 'time', 'datatype'], how='outer')
                df_merge = df_merge.sort_values(['sn', 'time'], ascending=[True, True])
                df_merge_filled = df_merge.groupby('sn').ffill()
                df_merge = pd.concat([df_merge[['sn']], df_merge_filled], axis=1)
                df_merge_filled = df_merge.groupby('sn').bfill()
                df_merge = pd.concat([df_merge[['sn']], df_merge_filled], axis=1)
                df_merge['time'] = pd.to_datetime(df_merge['time'], format='%Y-%m-%d %H:%M:%S')

                # Drop rows with no payload at all and keep only datatype 12 frames.
                df_merge = df_merge.dropna(subset=['pack_crnt', 'latitude', 'longitude', 'vin'], axis=0, how='all')
                df_merge = df_merge[df_merge['datatype'] == 12]

                # Standardise the charge/stand status codes, then split the stream
                # into drive / charge / stand segments.
                df_merge = ds.data_status(df_merge, c_soc_dif_p=0.05, s_soc_dif_p=0,
                                          c_order_delta=1200, s_order_delta=300)
                # df_merge['vin'] = df_merge['vin'].astype('str')
                # df_merge['vin'] = sn
                df_merge.reset_index(drop=True, inplace=True)
                df_drive, df_charge, df_stand, df_data_split_rlt = dt.split(
                    df_merge, celltemp_name, cellvolt_name,
                    drive_interval_time_min=1200, charge_interval_time_min=1200,
                    stand_interval_time_min=1200, single_num_min=3,
                    drive_sts=3, charge_sts=[21, 22], stand_sts=0)
                df_charge.drop(['cell_balance', 'cell_temp', 'cell_voltage', 'other_temp_value'],
                               axis=1, errors='ignore', inplace=True)

                if len(df_charge) > 0:
                    # Append this month's charge segments to the accumulators and
                    # checkpoint everything collected so far.
                    chrg_columns = list(df_charge.columns)
                    l = len(chrg_columns)
                    for i in range(l):
                        list_col[i].extend(list(df_charge[chrg_columns[i]]))
                    data_charge = pd.DataFrame({chrg_columns[i]: list_col[i] for i in range(l)})
                    data_charge.to_feather('data_chargePK5002.feather')
                    # data_charge.to_csv('data_charge.csv')
        except Exception as e:
            logger_main.error(f"process-0:{sn} data cleaning failed")
            logger_main.error(f"process-0:{e}")
            logger_main.error(f"process-0:{traceback.format_exc()}")
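
# Optional sanity check, added here as an illustration (not part of the
# original pipeline): reload the checkpointed feather file and report its
# coverage. It assumes the loop above produced data_chargePK5002.feather and
# that the charge segments keep their 'sn' and 'time' columns.
if os.path.exists('data_chargePK5002.feather'):
    data_charge = pd.read_feather('data_chargePK5002.feather')
    print(f"{len(data_charge)} charge rows, {data_charge['sn'].nunique()} SNs, "
          f"from {data_charge['time'].min()} to {data_charge['time'].max()}")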