import datetime
import json
import os
import sys
import time
import traceback
import warnings
from multiprocessing import Pool

import joblib
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sqlalchemy import and_, delete, or_, text, update

import OCV_SOC
import SOC_pre_vltVV
from ZlwlAlgosCommon.orm.models import *
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.utils.ProUtils import *


# .......................... battery-pack cell SOC diagnosis ..........................
def diag_cal(process_num):
    """Worker entry point: run the SOC estimation pipeline in one sub-process.

    Loads the device list and algorithm parameters from MySQL, pulls raw
    signal data from HBase for each serial number assigned to this worker,
    runs the SOC_pre_vltVV and OCV_SOC estimators on charging frames
    (datatype == 12) and writes the merged result to a per-SN CSV file.

    Relies on module-level globals created in ``__main__`` before the Pool
    forks: ``sysUtils``, ``log_base_path``, ``logger_main`` and
    ``mysql_kw_engine``.

    Parameters
    ----------
    process_num : int
        Index of this worker; selects a 3-SN slice of the filtered SN list.
    """
    algo_list = ['soc_pre']  # algorithm names handled by this scheduler
    loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # one logger per algorithm
    logger_main.info(f"process-{process_num}: 配置中间件")
    hbase_params = sysUtils.get_cf_param('hbase-datafactory')
    iotp_service = IotpAlgoService(hbase_params=hbase_params)

    # ---- load device list and algorithm parameters from MySQL ----
    # fix: the connection was previously never closed (leak); close it in finally
    mysql_kw_conn = mysql_kw_engine.connect()
    try:
        df_t_device = pd.read_sql(
            "select sn, imei, pack_model,organ_code, device_cell_type from t_device",
            mysql_kw_conn)
        df_algo_adjustable_param = pd.read_sql(
            "select algo_id, pack_code, param, param_ai from algo_adjustable_param",
            mysql_kw_conn)
        df_algo_pack_param = pd.read_sql(
            "select pack_code, param from algo_pack_param",
            mysql_kw_conn)
    finally:
        mysql_kw_conn.close()

    pack_code = 'JX18020'  # NOTE(review): pack model is hard-coded — confirm this is intended
    # adjustable_param is currently unused downstream; kept for parity with sibling schedulers
    adjustable_param = (df_algo_adjustable_param[df_algo_adjustable_param['pack_code'] == pack_code]
                        .drop(['pack_code'], axis=1)
                        .to_dict("records"))
    pack_param = (df_algo_pack_param[df_algo_pack_param['pack_code'] == pack_code]
                  .drop(['pack_code'], axis=1)
                  .to_dict("records"))
    df_algo_pack_param = json.loads(pack_param[0]['param'].replace(' ', ''))
    # SECURITY: eval() on DB-sourced strings executes arbitrary code. The values
    # come from our own parameter table, but ast.literal_eval would be safer.
    df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v
                          for k, v in df_algo_pack_param.items()}

    # serial numbers assigned to this worker (3 per process)
    pk_sn = list(df_t_device[df_t_device['pack_model'] == pack_code]['sn'])
    snlst_ort = [item for item in pk_sn if 'S23620' in item]
    snlst = snlst_ort[3 * process_num:3 * (process_num + 1)]

    def getDateList(start_date, end_date):
        """Return 'YYYY-MM-DD' strings from start_date to end_date inclusive."""
        date_list = []
        start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d')
        end_date = datetime.datetime.strptime(end_date, '%Y-%m-%d')
        date_list.append(start_date.strftime('%Y-%m-%d'))
        while start_date < end_date:
            start_date += datetime.timedelta(days=1)
            date_list.append(start_date.strftime('%Y-%m-%d'))
        return date_list

    try:
        for itemsn in snlst:
            df_soc_pre = pd.DataFrame()  # accumulated SOC results for this SN
            try:
                time_st = time.time()
                # NOTE(review): debug leftover — every iteration is forced to the
                # same SN, defeating the loop over snlst. Remove to process all SNs.
                itemsn = 'PJXCLL128N234P002'
                loggers['soc_pre'].info(f'开始执行算法{itemsn}')
                data_lst = getDateList("2023-06-29", "2023-06-30")
                for item_date in range(0, len(data_lst) - 1):
                    try:
                        # NOTE(review): debug leftover — this fixed window overrides
                        # the data_lst[item_date] day-by-day window.
                        st_time = '2023-06-28 10:30:00'
                        et = '2023-06-29 01:00:00'
                        columns = [DataField.sn, DataField.time, DataField.pack_crnt,
                                   DataField.pack_volt, DataField.cell_voltage_count,
                                   DataField.cell_temp_count, DataField.cell_voltage,
                                   DataField.cell_temp, DataField.pack_soc,
                                   DataField.other_temp_value, DataField.latitude,
                                   DataField.longitude, DataField.mileage,
                                   DataField.accum_chg_ah, DataField.accum_dschg_ah,
                                   DataField.accum_chg_wh, DataField.accum_dschg_wh]
                        df_data_all = iotp_service.get_data(sn_list=[itemsn], columns=columns,
                                                            start_time=st_time, end_time=et)
                        loggers['soc_pre'].info(f'sn-{itemsn},起始-{st_time},结束-{et},获取到{len(df_data_all)}条数据,取数耗时:{time.time()-time_st}')
                        if len(df_data_all) > 30:
                            # Treat zero mileage as missing, then back/forward fill.
                            # fix: pd.np was removed in pandas 1.0 — use numpy directly;
                            # fillna(method=...) is deprecated — use bfill()/ffill().
                            df_data_all['mileage'] = df_data_all['mileage'].replace(0, np.nan)
                            df_data_all['mileage'] = df_data_all['mileage'].bfill().ffill()
                            df_data_all.rename(columns={"mileage": "odo"}, inplace=True)
                            # data cleaning / signal extraction
                            df_out, df_table, cellvolt_name, celltemp_name = \
                                iotp_service.datacleaning(df_algo_pack_param, df_data_all)
                            if not df_out.empty:
                                # keep charging frames only (datatype == 12)
                                df_merge_cal = df_out[df_out['datatype'] == 12]
                                if not df_merge_cal.empty:
                                    # voltage/internal-resistance based SOC estimate
                                    chrg_chracter = SOC_pre_vltVV.cell_chracter(
                                        df_merge_cal, df_algo_pack_param,
                                        cellvolt_name, celltemp_name)
                                    soc_pre = chrg_chracter.states_cal()
                                    # OCV based SOC estimate
                                    OCV_chrac = OCV_SOC.cell_statistic(df_algo_pack_param, df_merge_cal)
                                    OCV_soc_rlt = OCV_chrac.soc_cal_est()
                                    df_soc_rlt = pd.concat([soc_pre, OCV_soc_rlt])
                                    if not df_soc_rlt.empty:
                                        # fix: DataFrame.append was removed in pandas 2.0
                                        df_soc_pre = pd.concat([df_soc_pre, df_soc_rlt])
                    except Exception as e:
                        loggers['soc_pre'].error('算法运行出错')
                        loggers['soc_pre'].error(str(e))
                        loggers['soc_pre'].error(traceback.format_exc())
                if not df_soc_pre.empty:
                    df_soc_pre.drop_duplicates(subset=['sn', 'time'], keep='last', inplace=True)
                    df_soc_pre.to_csv('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/SOC_Fct/SOC_rlt/'
                                      + itemsn + 'SOC估计结果VV.csv',
                                      index=False, encoding='GB18030')
                loggers['soc_pre'].info(f'算法运行完成{itemsn},算法耗时{time.time()-time_st}')
            except Exception as e:
                loggers['soc_pre'].error('算法运行出错')
                loggers['soc_pre'].error(str(e))
                loggers['soc_pre'].error(traceback.format_exc())
    except Exception as e:
        loggers['soc_pre'].error('算法运行出错')
        loggers['soc_pre'].error(str(e))
        loggers['soc_pre'].error(traceback.format_exc())


# .......................... main: dispatch worker processes ..........................
if __name__ == "__main__":
    cur_env = 'dev'                                  # runtime environment (dev / test / prod)
    app_path = "/home/liuzhongxiao/zlwl-algos/"      # project root
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
    app_name = "schedule"                            # app name, conventionally matches topic suffix

    sysUtils = SysUtils(cur_env, app_path)
    mysqlUtils = MysqlUtils()
    mysql_iotp_params = sysUtils.get_cf_param('mysql-iotp')
    mysql_iotp_engine, mysql_iotp_Session = mysqlUtils.get_mysql_engine(mysql_iotp_params)
    mysql_kw_params = sysUtils.get_cf_param('mysql-algo')
    mysql_kw_engine, mysql_kw_Session = mysqlUtils.get_mysql_engine(mysql_kw_params)
    logger_main = sysUtils.get_logger(app_name, log_base_path)
    logger_main.info(f"本次主进程号: {os.getpid()}")

    # one worker per configured process; each worker takes a 3-SN slice
    processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # default: 1 process
    pool = Pool(processes=processes)
    logger_main.info("开始分配子进程")
    for i in range(processes):
        pool.apply_async(diag_cal, (i,))
    pool.close()
    logger_main.info("进程分配结束,堵塞主进程")
    pool.join()