import datetime
import gc
import json
import logging
import logging.handlers
import os
import re
import time
import traceback
import warnings
from multiprocessing import Pool

import pandas as pd
from sqlalchemy import and_, delete, or_, text, update

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *


def main(process_num):
    """Worker entry point for one subprocess.

    Sets up the middleware clients (MySQL, Redis, HBase, Kafka) from the
    config loaded by ``sysUtils``, then runs a single test query against
    HBase for a hard-coded SN/time window and logs the row count.

    NOTE: relies on module-level globals (``sysUtils``, ``logger_main``,
    ``log_base_path``) created in the ``__main__`` block and inherited by
    the child process via ``Pool`` fork.

    :param process_num: index of this worker process (0-based).
    """
    # The service is intended to loop forever; the loop is currently disabled.
    # while(True):
    warnings.filterwarnings("ignore")
    try:
        # --- preparation before invoking the algorithms ---
        kafka_topic_key = 'topic_task_basic_test'
        kafka_groupid_key = 'group_id_task_basic_test'
        algo_list = ['DataBasic_Info']  # algorithms covered by this schedule
        # One logger per algorithm.
        loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)
        logger_main.info(f"process-{process_num}: 配置中间件")
        # mysql
        mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
        mysqlUtils = MysqlUtils()
        mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
        mysql_algo_conn = mysql_algo_engine.connect()
        # redis
        redis_params = sysUtils.get_cf_param('redis')
        redisUtils = RedisUtils()
        redis_conn = redisUtils.get_redis_conncect(redis_params)
        # hbase
        hbase_params = sysUtils.get_cf_param('hbase')
        iotp_service = IotpAlgoService(hbase_params=hbase_params)
        # kafka
        kafka_params = sysUtils.get_cf_param('kafka')
        kafkaUtils = KafkaUtils()
        kafka_consumer = kafkaUtils.get_kafka_consumer(
            kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
        logger_main.info(f"process-{process_num}: 获取算法参数及电池参数")
    except Exception as e:
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())
        # BUGFIX: originally execution fell through to the data fetch below,
        # which then raised NameError on the undefined iotp_service and
        # masked the real setup failure. Bail out instead.
        return

    try:
        # Test fetch: one SN over a fixed time window.
        sn_list = ['LY9139BB0MALBZ793']
        start_time = '2022-01-01 00:00:00'
        end_time = '2022-02-12 00:00:00'
        columns = [DataField.error_level, DataField.error_code, DataField.pack_crnt,
                   DataField.pack_volt, DataField.bms_sta, DataField.cell_voltage_count,
                   DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
                   DataField.pack_soc, DataField.other_temp_value, DataField.bal_cell,
                   DataField.pack_soh, DataField.charge_sta]
        df_data = iotp_service.get_data(sn_list=sn_list, columns=columns,
                                        start_time=start_time, end_time=end_time)
        logger_main.info(f"process-{process_num}: {str(sn_list)}获取到{str(len(df_data))}条数据")
    except Exception as e:
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())


if __name__ == '__main__':
    # while(True):
    # Pre-initialize so the finally block below cannot raise NameError when
    # setup fails before these are assigned (BUGFIX: the original referenced
    # them unconditionally and could mask the real traceback).
    logger_main = None
    pool = None
    try:
        # Configuration
        cur_env = 'dev'                                                    # runtime environment
        app_path = "/home/shouxueqi/projects/zlwl-algos"                   # absolute app path
        log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
        app_name = "task_second_1"                                         # application name
        sysUtils = SysUtils(cur_env, app_path)
        logger_main = sysUtils.get_logger(app_name, log_base_path)
        logger_main.info(f"本次主进程号: {os.getpid()}")

        # Read the process count from config (do not modify this part).
        processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # default: 1 process
        pool = Pool(processes=processes)
        logger_main.info("开始分配子进程")
        for i in range(processes):
            pool.apply_async(main, (i, ))
        pool.close()
        logger_main.info("进程分配结束,堵塞主进程")
        pool.join()
    except Exception as e:
        print(traceback.format_exc())
        if logger_main is not None:
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
    finally:
        # Detach log handlers and tear the pool down, guarding against
        # failures that happened before either object was created.
        if logger_main is not None:
            for h in logger_main.handlers.copy():
                logger_main.removeHandler(h)
        if pool is not None:
            pool.terminate()