main.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. from datetime import datetime
  2. from multiprocessing import Pool
  3. import json
  4. import os
  5. import time
  6. import traceback
  7. import warnings
  8. from sqlalchemy import text, delete, and_, or_, update
  9. import pandas as pd
  10. import traceback
  11. import sta_stacs
  12. from ZlwlAlgosCommon.utils.ProUtils import *
  13. from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
  14. from ZlwlAlgosCommon.service.iotp.Beans import DataField
  15. from ZlwlAlgosCommon.orm.models import *
  16. #...................................电池包电芯安全诊断函数......................................................................................................................
  17. def diag_cal(process_num, process):#
  18. # 环境变量配置(通过环境变量确定当前程序运行在开发、测试、生产环境)
  19. # 根据sn 获取对应的参数
  20. #Hbase
  21. algo_list = ['soc_chracter'] # 本调度所包含的算法名列表。
  22. process = 10
  23. loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num) # 为每个算法分配一个logger
  24. logger_main.info(f"process-{process_num}: 配置中间件")
  25. hbase_params = sysUtils.get_cf_param('hbase')
  26. iotp_service = IotpAlgoService(hbase_params=hbase_params)
  27. sql = f"select sn, imei, pack_model,organ_code, device_cell_type from t_device"
  28. mysql_kw_conn = mysql_kw_engine.connect()
  29. df_t_device = pd.read_sql(sql, mysql_kw_conn)
  30. sql = f"select algo_id, pack_code, param, param_ai from algo_adjustable_param"
  31. df_algo_adjustable_param = pd.read_sql(sql, mysql_kw_conn)
  32. sql = f"select pack_code, param from algo_pack_param"
  33. pack_code = 'CL3282A'
  34. df_algo_pack_param = pd.read_sql(sql, mysql_kw_conn)
  35. adjustable_param = df_algo_adjustable_param[df_algo_adjustable_param['pack_code']==pack_code].drop(['pack_code'], axis=1)
  36. adjustable_param = adjustable_param.to_dict("records")
  37. pack_param = df_algo_pack_param[df_algo_pack_param['pack_code']==pack_code].drop(['pack_code'], axis=1)
  38. pack_param = pack_param.to_dict("records")
  39. df_algo_pack_param = json.loads(pack_param[0]['param'])
  40. df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}
  41. folder_path = r"/data/common/benz"
  42. # 使用os.walk()函数遍历文件夹A及其子文件夹
  43. try:
  44. for root, dirs, files in os.walk(folder_path):
  45. # 遍历每个子文件夹
  46. for dir_name in dirs[5*(process_num):5*(process_num+1)]:#读取各个sn
  47. # 获取子文件夹的完整路径
  48. dir_path = os.path.join(root, dir_name)
  49. # print(f'Reading files from folder: {dir_path}')
  50. itemsn = dir_path.split('/')[-1]
  51. # 遍历当前子文件夹中的所有文件
  52. df_chrg_chrac = pd.DataFrame()
  53. for file_name in os.listdir(dir_path):#读取各sn下不同时间的数据
  54. # if (itemsn == 'LY9F49BC5MALBZ879') and (file_name == '2022-10-15-00-00-00_2022-10-22-00-00-00.zippkl'):
  55. try:
  56. time_st = time.time()
  57. # 获取文件的完整路径
  58. file_path = os.path.join(dir_path, file_name)
  59. loggers['soc_chracter'].info(f'开始执行算法{file_path}')
  60. df_data = pd.read_pickle(file_path, compression='zip')
  61. if len(df_data) > 30:
  62. df_data['mileage'].replace(0, pd.np.nan, inplace = True)
  63. df_data['mileage'].fillna(method = 'bfill', inplace = True)
  64. df_data['mileage'].fillna(method = 'ffill', inplace = True)
  65. df_out, df_table, cellvolt_name, celltemp_name = iotp_service.datacleaning(df_algo_pack_param,df_data)#进行数据清洗
  66. if len(df_out) > 30:
  67. Diagsts_temp = sta_stacs.cell_statistic(df_algo_pack_param, df_out)#计算内阻
  68. df_chrgrt_add = Diagsts_temp.rest_sta()
  69. if not df_chrgrt_add.empty:
  70. df_chrg_chrac = df_chrg_chrac.append(df_chrgrt_add)
  71. df_chrg_chrac.drop_duplicates(subset = ['sn','time_st'], keep = 'first', inplace = True)
  72. df_chrg_chrac.to_csv('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/SOC_pre/chracter/' + itemsn + '充电表单.csv',index=False,encoding='GB18030')
  73. loggers['soc_chracter'].info(f'算法运行完成{pack_code},算法耗时{time.time()-time_st}')
  74. except Exception as e:
  75. loggers['soc_chracter'].error('算法运行出错')
  76. loggers['soc_chracter'].error(str(e))
  77. loggers['soc_chracter'].error(traceback.format_exc())
  78. except Exception as e:
  79. loggers['soc_chracter'].error('算法运行出错')
  80. loggers['soc_chracter'].error(str(e))
  81. loggers['soc_chracter'].error(traceback.format_exc())
  82. #...............................................主函数起定时作用.......................................................................................................................
  83. if __name__ == "__main__":
  84. cur_env = 'dev' # 设置运行环境
  85. app_path = "/home/liuzhongxiao/zlwl-algos/" # 设置相对路径
  86. log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
  87. app_name = "schedule" # 应用名, 建议与topic的后缀相同
  88. sysUtils = SysUtils(cur_env, app_path)
  89. mysqlUtils = MysqlUtils()
  90. mysql_iotp_params = sysUtils.get_cf_param('mysql-iotp')
  91. mysql_iotp_engine, mysql_iotp_Session= mysqlUtils.get_mysql_engine(mysql_iotp_params)
  92. mysql_kw_params = sysUtils.get_cf_param('mysql-algo')
  93. mysql_kw_engine, mysql_kw_Session= mysqlUtils.get_mysql_engine(mysql_kw_params)
  94. logger_main = sysUtils.get_logger(app_name, log_base_path)
  95. logger_main.info(f"本次主进程号: {os.getpid()}")
  96. # 读取配置文件 (该部分请不要修改)
  97. processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '10')) # 默认为1个进程
  98. pool = Pool(processes = int(processes))
  99. logger_main.info("开始分配子进程")
  100. for i in range(int(processes)):
  101. pool.apply_async(diag_cal, (i, 10,))#
  102. pool.close()
  103. logger_main.info("进程分配结束,堵塞主进程")
  104. pool.join()
  105. #log信息配置
  106. #读取fault_code=C599的当前故障
  107. # df_chrgrt = pd.read_csv(r'C:\Users\zldc\project\User\lzx\hz-application-algo\USER\lzx\状态统计\充电过程统计\充电表单-单车.csv' ,encoding='GB18030')
  108. # df_dschrg = pd.read_csv(r'C:\Users\zldc\project\User\lzx\hz-application-algo\USER\lzx\状态统计\充放电温度压差统计\放电表单\用车表单-单车.csv' ,encoding='GB18030')
  109. #定时任务.......................................................................................................................................................................
  110. # diag_cal()