from datetime import datetime
from multiprocessing import Pool
import json
import os
import time
import traceback
import warnings
from sqlalchemy import text, delete, and_, or_, update
import numpy as np
import pandas as pd
import sta_stacs
from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *


# ................................. Battery-pack cell diagnosis job .................................


def _load_rest_statistics(iotp_service, algo_pack_param, file_path, min_rows=30):
    """Return the rest-state statistics frame for one zip-compressed pickle file.

    The file is read with ``pd.read_pickle(..., compression='zip')``, its
    ``mileage`` column is cleaned, the frame is passed through
    ``iotp_service.datacleaning`` and finally through
    ``sta_stacs.cell_statistic(...).rest_sta()``.

    Args:
        iotp_service: Service object providing ``datacleaning(params, df)``.
        algo_pack_param: Pack parameter dict passed to cleaning and statistics.
        file_path: Path of the zipped pickle to read.
        min_rows: The raw frame and the cleaned frame must each have MORE than
            this many rows, otherwise an empty DataFrame is returned.

    Returns:
        Whatever ``rest_sta()`` returns, or an empty ``pd.DataFrame`` when the
        data is too short.
    """
    df_data = pd.read_pickle(file_path, compression='zip')
    if len(df_data) <= min_rows:
        return pd.DataFrame()
    # A mileage of 0 is treated as a missing reading. Fill it from the next valid
    # sample, then from the previous one for trailing gaps. Assigning the result
    # (instead of chained inplace calls) also works under pandas Copy-on-Write.
    df_data['mileage'] = df_data['mileage'].replace(0, np.nan).bfill().ffill()
    df_out, _df_table, _cellvolt_name, _celltemp_name = iotp_service.datacleaning(algo_pack_param, df_data)
    if len(df_out) <= min_rows:
        return pd.DataFrame()
    return sta_stacs.cell_statistic(algo_pack_param, df_out).rest_sta()


def diag_cal(process_num, process, *, pack_code='CL3282A',
             folder_path=r"/data/common/benz",
             output_dir='/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/SOC_pre/chracter/',
             dirs_per_process=5):
    """Compute rest-state statistics for this worker's share of vehicle folders.

    Each immediate subdirectory of ``folder_path`` is treated as one vehicle
    (its name is used as the sn). The subdirectories are sorted by name. Worker
    ``process_num`` handles the slice
    ``[dirs_per_process * process_num : dirs_per_process * (process_num + 1)]``.
    NOTE(review): with N workers only the first ``N * dirs_per_process`` folders
    are ever processed; confirm that is intended.

    For each vehicle, every file in its folder (sorted by name) is run through
    ``_load_rest_statistics``. Non-empty results are accumulated and
    de-duplicated on ``['sn', 'time_st']``, keeping the first occurrence. After
    each non-empty result, the accumulated frame is rewritten to
    ``<output_dir>/<sn>充电表单.csv`` (GB18030).

    Per-file errors, and errors while listing folders, are logged to the
    ``soc_chracter`` logger and do not stop the worker.

    Relies on the module globals ``sysUtils``, ``log_base_path``,
    ``logger_main`` and ``mysql_kw_engine``. These are created in the
    ``__main__`` block, so this only works when pool workers are forked (they
    are not re-created under the ``spawn`` start method).

    Args:
        process_num: Zero-based index of this worker.
        process: Unused; retained so existing callers keep working.
        pack_code: Pack model whose row in ``algo_pack_param`` is loaded.
        folder_path: Root folder containing one subfolder per vehicle.
        output_dir: Directory the per-vehicle CSV files are written to.
        dirs_per_process: Number of vehicle folders handled by each worker.

    Raises:
        ValueError: If ``algo_pack_param`` has no row for ``pack_code``.
    """
    algo_list = ['soc_chracter']  # algorithms covered by this schedule
    loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # one logger per algorithm
    logger = loggers['soc_chracter']

    logger_main.info(f"process-{process_num}: 配置中间件")
    hbase_params = sysUtils.get_cf_param('hbase')
    iotp_service = IotpAlgoService(hbase_params=hbase_params)

    # Only the pack-level parameters are used; the connection is closed on exit.
    with mysql_kw_engine.connect() as mysql_kw_conn:
        df_algo_pack_param = pd.read_sql("select pack_code, param from algo_pack_param", mysql_kw_conn)

    pack_rows = df_algo_pack_param.loc[df_algo_pack_param['pack_code'] == pack_code, 'param']
    if pack_rows.empty:
        raise ValueError(f"no algo_pack_param row for pack_code={pack_code!r}")
    algo_pack_param = json.loads(pack_rows.iloc[0])
    # NOTE(review): eval() executes arbitrary expressions stored in the database.
    # If these values are plain literals, ast.literal_eval would be a safer choice.
    algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in algo_pack_param.items()}

    try:
        # Sorted so every worker derives the same, non-overlapping partition.
        with os.scandir(folder_path) as entries:
            sn_dirs = sorted(entry.name for entry in entries if entry.is_dir())
        start = dirs_per_process * process_num
        for itemsn in sn_dirs[start:start + dirs_per_process]:
            dir_path = os.path.join(folder_path, itemsn)
            out_path = os.path.join(output_dir, itemsn + '充电表单.csv')
            df_chrg_chrac = pd.DataFrame()
            # Sorted so that keep='first' in drop_duplicates is deterministic.
            for file_name in sorted(os.listdir(dir_path)):
                try:
                    time_st = time.time()
                    file_path = os.path.join(dir_path, file_name)
                    logger.info(f'开始执行算法{file_path}')
                    df_chrgrt_add = _load_rest_statistics(iotp_service, algo_pack_param, file_path)
                    if not df_chrgrt_add.empty:
                        # pd.concat replaces DataFrame.append (removed in pandas 2.0);
                        # the empty-accumulator case is skipped to avoid dtype warnings.
                        df_chrg_chrac = (df_chrgrt_add.copy() if df_chrg_chrac.empty
                                         else pd.concat([df_chrg_chrac, df_chrgrt_add]))
                        df_chrg_chrac.drop_duplicates(subset=['sn', 'time_st'], keep='first', inplace=True)
                        # Rewritten after every file so partial results survive a crash.
                        df_chrg_chrac.to_csv(out_path, index=False, encoding='GB18030')
                    logger.info(f'算法运行完成{pack_code},算法耗时{time.time()-time_st}')
                except Exception as e:
                    logger.error('算法运行出错')
                    logger.error(str(e))
                    logger.error(traceback.format_exc())
    except Exception as e:
        logger.error('算法运行出错')
        logger.error(str(e))
        logger.error(traceback.format_exc())


# ................................. Main entry point (acts as the scheduler) .................................
if __name__ == "__main__":
    # Runtime configuration. These module-level names are read by diag_cal in the
    # forked worker processes.
    cur_env = 'dev'  # runtime environment (dev / test / prod)
    app_path = "/home/liuzhongxiao/zlwl-algos/"  # application root path
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
    app_name = "schedule"  # application name; ideally matches the topic suffix
    sysUtils = SysUtils(cur_env, app_path)
    mysqlUtils = MysqlUtils()
    mysql_iotp_params = sysUtils.get_cf_param('mysql-iotp')
    mysql_iotp_engine, mysql_iotp_Session = mysqlUtils.get_mysql_engine(mysql_iotp_params)
    mysql_kw_params = sysUtils.get_cf_param('mysql-algo')
    mysql_kw_engine, mysql_kw_Session = mysqlUtils.get_mysql_engine(mysql_kw_params)

    logger_main = sysUtils.get_logger(app_name, log_base_path)
    logger_main.info(f"本次主进程号: {os.getpid()}")

    # Number of worker processes (from the environment; do not change this lookup).
    # Defaults to 10 when PROCESS_NUM_PER_NODE is not set.
    processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '10'))

    def _make_error_callback(worker_id):
        """Build an apply_async error callback that logs worker `worker_id`'s failure."""
        def _on_error(exc):
            # Without this callback, exceptions raised in a worker are silently
            # dropped because AsyncResult.get() is never called.
            logger_main.error(
                f"process-{worker_id}: worker failed\n"
                + "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
            )
        return _on_error

    pool = Pool(processes=processes)
    logger_main.info("开始分配子进程")
    for i in range(processes):
        pool.apply_async(diag_cal, (i, processes), error_callback=_make_error_callback(i))
    pool.close()
    logger_main.info("进程分配结束,堵塞主进程")
    pool.join()