123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- import datetime
- import gc
- import re
- from multiprocessing import Pool
- import json
- import logging
- import logging.handlers
- import os
- import time
- import traceback
- import warnings
- from sqlalchemy import text, delete, and_, or_, update
- import pandas as pd
- from ZlwlAlgosCommon.utils.ProUtils import *
- from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
- from ZlwlAlgosCommon.service.iotp.Beans import DataField
- from ZlwlAlgosCommon.orm.models import *
-
def main(process_num):
    """Run one worker pass: connect to the middleware, then fetch a test window of data.

    The function reads the module-level names ``sysUtils``, ``log_base_path``
    and ``logger_main``, which the ``__main__`` block of this file assigns
    before the worker pool is created.
    NOTE(review): those globals only reach the worker when the pool forks the
    parent process; with the ``spawn`` start method they would be undefined
    here -- confirm the deployment platform.

    Args:
        process_num: Index of this worker inside the pool. It is passed to
            ``sysUtils.get_loggers`` and used as a prefix in log messages.

    Returns:
        None. Failures in either stage are logged to ``logger_main`` rather
        than raised.
    """
    # NOTE: the endless scheduling loop ("the program must not stop") is
    # disabled in this script, so each worker performs a single pass.
    warnings.filterwarnings("ignore")

    mysql_algo_conn = None
    try:
        # ---- Stage 1: preparation before calling the algorithm ----
        try:
            kafka_topic_key = 'topic_task_basic_test'
            kafka_groupid_key = 'group_id_task_basic_test'
            algo_list = ['DataBasic_Info']  # Algorithm names covered by this schedule.

            # Allocate one logger per algorithm.
            loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)
            logger_main.info(f"process-{process_num}: 配置中间件")

            # mysql
            mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
            mysqlUtils = MysqlUtils()
            mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
            mysql_algo_conn = mysql_algo_engine.connect()

            # redis
            redis_params = sysUtils.get_cf_param('redis')
            redisUtils = RedisUtils()
            redis_conn = redisUtils.get_redis_conncect(redis_params)

            # hbase
            hbase_params = sysUtils.get_cf_param('hbase')
            iotp_service = IotpAlgoService(hbase_params=hbase_params)

            # kafka (the consumer is created but not otherwise used in this pass)
            kafka_params = sysUtils.get_cf_param('kafka')
            kafkaUtils = KafkaUtils()
            kafka_consumer = kafkaUtils.get_kafka_consumer(
                kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)

            logger_main.info(f"process-{process_num}: 获取算法参数及电池参数")
        except Exception as e:
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
            # Without a working setup ``iotp_service`` may not exist; stop here
            # instead of producing a second, misleading NameError below.
            return

        # ---- Stage 2: fetch a fixed test window for a fixed serial number ----
        try:
            sn_list = ['LY9139BB0MALBZ793']
            start_time = '2022-01-01 00:00:00'
            end_time = '2022-02-12 00:00:00'
            columns = [
                DataField.error_level, DataField.error_code, DataField.pack_crnt, DataField.pack_volt,
                DataField.bms_sta, DataField.cell_voltage_count, DataField.cell_temp_count,
                DataField.cell_voltage, DataField.cell_temp,
                DataField.pack_soc, DataField.other_temp_value, DataField.bal_cell,
                DataField.pack_soh, DataField.charge_sta,
            ]
            df_data = iotp_service.get_data(
                sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
            logger_main.info(f"process-{process_num}: {str(sn_list)}获取到{str(len(df_data))}条数据")
        except Exception as e:
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
    finally:
        # Release the database connection opened in stage 1, if any.
        # presumably a SQLAlchemy Connection (returned by engine.connect()) -- verify.
        if mysql_algo_conn is not None:
            try:
                mysql_algo_conn.close()
            except Exception as e:
                logger_main.error(str(e))
                logger_main.error(traceback.format_exc())
-
if __name__ == '__main__':
    # Script entry point: build the shared utilities and logger, start a pool
    # of worker processes that each run ``main(i)``, and wait for them.
    #
    # ``logger_main`` and ``pool`` are pre-bound so that a failure early in
    # start-up cannot turn the except/finally clauses into a NameError that
    # hides the real exception.
    logger_main = None
    pool = None
    try:
        # Configuration values.
        cur_env = 'dev'  # Runtime environment.
        app_path = "/home/shouxueqi/projects/zlwl-algos"  # Absolute path of the app.
        log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # Log directory.
        app_name = "task_second_1"  # Application name.

        sysUtils = SysUtils(cur_env, app_path)
        logger_main = sysUtils.get_logger(app_name, log_base_path)
        logger_main.info(f"本次主进程号: {os.getpid()}")

        # Read the configuration (please do not modify this part).
        processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # Defaults to 1 process.
        pool = Pool(processes=processes)
        logger_main.info("开始分配子进程")
        # Keep the AsyncResult handles: apply_async stores a worker's exception
        # in the result object, and it is lost unless someone asks for it.
        results = [pool.apply_async(main, (i, )) for i in range(processes)]
        pool.close()
        logger_main.info("进程分配结束,堵塞主进程")
        pool.join()

        # All workers have finished; surface anything that escaped ``main``.
        for worker_index, result in enumerate(results):
            try:
                result.get()
            except Exception as worker_error:
                logger_main.error(f"process-{worker_index}: {worker_error}")
                logger_main.error(traceback.format_exc())
    except Exception as e:
        print(traceback.format_exc())
        if logger_main is not None:
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
    finally:
        if logger_main is not None:
            # Iterate over a copy because removeHandler mutates the list.
            for h in logger_main.handlers.copy():
                logger_main.removeHandler(h)
        if pool is not None:
            pool.terminate()
|