# datadownload.py
  1. import datetime
  2. import gc
  3. import re
  4. from multiprocessing import Pool
  5. import json
  6. import logging
  7. import logging.handlers
  8. import os
  9. import time
  10. import traceback
  11. import warnings
  12. from sqlalchemy import text, delete, and_, or_, update
  13. import pandas as pd
  14. from ZlwlAlgosCommon.utils.ProUtils import *
  15. from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
  16. from ZlwlAlgosCommon.service.iotp.Beans import DataField
  17. from ZlwlAlgosCommon.orm.models import *
  18. def main(process_num):
  19. # 程序不能停止
  20. # while(True):
  21. warnings.filterwarnings("ignore")
  22. try:
  23. # 调用算法前的准备工作
  24. kafka_topic_key = 'topic_task_basic_test'
  25. kafka_groupid_key = 'group_id_task_basic_test'
  26. algo_list = ['DataBasic_Info'] # 本调度所包含的算法名列表。
  27. loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num) # 为每个算法分配一个logger
  28. logger_main.info(f"process-{process_num}: 配置中间件")
  29. # mysql
  30. mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
  31. mysqlUtils = MysqlUtils()
  32. mysql_algo_engine, mysql_algo_Session= mysqlUtils.get_mysql_engine(mysql_algo_params)
  33. mysql_algo_conn = mysql_algo_engine.connect()
  34. # redis
  35. redis_params = sysUtils.get_cf_param('redis')
  36. redisUtils = RedisUtils()
  37. redis_conn = redisUtils.get_redis_conncect(redis_params)
  38. # hbase
  39. hbase_params = sysUtils.get_cf_param('hbase')
  40. iotp_service = IotpAlgoService(hbase_params=hbase_params)
  41. # kafka
  42. kafka_params = sysUtils.get_cf_param('kafka')
  43. kafkaUtils = KafkaUtils()
  44. kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
  45. logger_main.info(f"process-{process_num}: 获取算法参数及电池参数")
  46. except Exception as e:
  47. logger_main.error(str(e))
  48. logger_main.error(traceback.format_exc())
  49. try:
  50. sn_list=['LY9139BB0MALBZ793']
  51. start_time='2022-01-01 00:00:00'
  52. end_time='2022-02-12 00:00:00'
  53. columns = [DataField.error_level, DataField.error_code, DataField.pack_crnt, DataField.pack_volt,
  54. DataField.bms_sta, DataField.cell_voltage_count, DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
  55. DataField.pack_soc, DataField.other_temp_value, DataField.bal_cell,
  56. DataField.pack_soh, DataField.charge_sta]
  57. df_data = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
  58. current_path = os.path.dirname(os.path.abspath(__file__))
  59. excel_path = os.path.join(current_path , 'temp.xlsx')
  60. logger_main.info(f"process-{process_num}: {str(sn_list)}获取到{str(len(df_data))}条数据")
  61. except Exception as e:
  62. logger_main.error(str(e))
  63. logger_main.error(traceback.format_exc())
  64. if __name__ == '__main__':
  65. #while(True):
  66. try:
  67. # 配置量
  68. cur_env = 'dev' # 设置运行环境
  69. app_path = "/home/shouxueqi/projects/zlwl-algos" # 设置app绝对路径
  70. log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
  71. app_name = "task_second_1" # 应用名
  72. sysUtils = SysUtils(cur_env, app_path)
  73. logger_main = sysUtils.get_logger(app_name, log_base_path)
  74. logger_main.info(f"本次主进程号: {os.getpid()}")
  75. # 读取配置文件 (该部分请不要修改)
  76. processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1')) # 默认为1个进程
  77. pool = Pool(processes = int(processes))
  78. logger_main.info("开始分配子进程")
  79. for i in range(int(processes)):
  80. pool.apply_async(main, (i, ))
  81. pool.close()
  82. logger_main.info("进程分配结束,堵塞主进程")
  83. pool.join()
  84. except Exception as e:
  85. print(traceback.format_exc())
  86. logger_main.error(str(e))
  87. logger_main.error(traceback.format_exc())
  88. finally:
  89. handlers = logger_main.handlers.copy()
  90. for h in handlers:
  91. logger_main.removeHandler(h)
  92. pool.terminate()