main_hz.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. from datetime import datetime
  2. from multiprocessing import Pool
  3. import json
  4. import os
  5. import time
  6. import traceback
  7. import warnings
  8. from sqlalchemy import text, delete, and_, or_, update
  9. import pandas as pd
  10. import OCV_cal_V5
  11. import sta_stacs
  12. import traceback
  13. import numpy as np
  14. from ZlwlAlgosCommon.utils.ProUtils import *
  15. from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
  16. from ZlwlAlgosCommon.service.iotp.Beans import DataField
  17. from ZlwlAlgosCommon.orm.models import *
# ---------- Battery-pack SOH characteristic estimation worker (one vehicle CSV per pool process) ----------
  19. def diag_cal(process_num):#, process
  20. # 环境变量配置(通过环境变量确定当前程序运行在开发、测试、生产环境)
  21. # 根据sn 获取对应的参数
  22. #Hbase
  23. algo_list = ['soh_chracter'] # 本调度所包含的算法名列表。
  24. process = 10
  25. loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num) # 为每个算法分配一个logger
  26. logger_main.info(f"process-{process_num}: 配置中间件")
  27. hbase_params = sysUtils.get_cf_param('hbase')
  28. iotp_service = IotpAlgoService(hbase_params=hbase_params)
  29. sql = f"select sn, imei, pack_model,organ_code, device_cell_type from t_device"
  30. mysql_kw_conn = mysql_kw_engine.connect()
  31. df_t_device = pd.read_sql(sql, mysql_kw_conn)
  32. sql = f"select algo_id, pack_code, param, param_ai from algo_adjustable_param"
  33. df_algo_adjustable_param = pd.read_sql(sql, mysql_kw_conn)
  34. sql = f"select pack_code, param from algo_pack_param"
  35. pack_code = 'CL3282A'
  36. df_algo_pack_param = pd.read_sql(sql, mysql_kw_conn)
  37. adjustable_param = df_algo_adjustable_param[df_algo_adjustable_param['pack_code']==pack_code].drop(['pack_code'], axis=1)
  38. adjustable_param = adjustable_param.to_dict("records")
  39. pack_param = df_algo_pack_param[df_algo_pack_param['pack_code']==pack_code].drop(['pack_code'], axis=1)
  40. pack_param = pack_param.to_dict("records")
  41. df_algo_pack_param = json.loads(pack_param[0]['param'])
  42. df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}
  43. folder_path1 = r"/home/wangliming/project/single_task/1other_task/数仓/合众数据导出/需求/取数需求6/data"#
  44. # folder_path2 = r"/home/wangliming/project/single_task/1other_task/数仓/合众数据导出/需求/取数需求6/data"
  45. #/data_highspeed/common/hz/lifecycle/zhuxi_20230927/--------------12个车
  46. # folder_path_soh = r"/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/hz_soh/soh_cal"
  47. #/data_highspeed/common/hz/lifecycle/lzx_20230919/datavin2
  48. vinlst = ['LUZAGAAA0MA010431', 'LUZAGAAA3NA043599', 'LUZAGAAA6MA016623', 'LUZAGAGA0NA090337', 'LUZAGAAA3MA005613', 'LUZAGAAA1NA028597',
  49. 'LUZAGAAA4NA025502', 'LUZAGAAA4NA031669', 'LUZAGAAAXNA027982', 'LUZAGAAA8LA012992', 'LUZAGAAA4MA049426', 'LUZAGAAA1MA069570']
  50. vinflelst = [item + '.csv' for item in vinlst]
  51. csv_files1 = [file for file in os.listdir(folder_path1) if file.endswith('.csv')]
  52. # csv_files_sts = [file.split('_')[0] + '.csv' for file in os.listdir(folder_path_sts) if file.endswith('.csv')]
  53. # csv_files2 = [file.split('-')[0] + '.csv' for file in os.listdir(folder_path2) if file.endswith('.csv')]
  54. # print('sts计算数量:', len(set(csv_files_sts)))
  55. # print('soh计算数量:', len(set(csv_files_soh)))
  56. # df_unfinish = pd.read_csv('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/hz_soh/unfinish.csv', encoding='GB18030')
  57. # unfinishlst_tp = list(df_unfinish['vin'])
  58. # done_vin = list(set(csv_files_soh))
  59. # df_done = pd.DataFrame(done_vin, columns=['vin'])
  60. # df_done.to_csv('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/hz_soh/done.csv', index = False, encoding='GB18030')
  61. # unfinishlst = [item + '.csv' for item in unfinishlst_tp]
  62. unfinish = list(set(csv_files1) & set(vinflelst))
  63. # 使用os.walk()函数遍历文件夹A及其子文件夹
  64. df_algo_soh = pd.DataFrame()
  65. try:
  66. # for root, dirs, files in os.walk(folder_path):
  67. # 遍历每个子文件夹
  68. # calfiles = [i.split('.')[0] for i in files]
  69. # calsnlst = list(set(calfiles) - set(vinlst))
  70. # csv_files = [file for file in os.listdir(folder_path) if file.endswith('.csv')]
  71. # calsohfiles = [i+'.csv' for i in vinlst]
  72. for dir_name in unfinish[(process_num):(process_num+1)]:#:#csv_files[33*(process_num):33*(process_num+1)][11:20]:#calsohfiles[2*(process_num):2*(process_num+1)]:#读取各个sn[5*(process_num):5*(process_num+1)]
  73. # 获取子文件夹的完整路径
  74. dir_path = os.path.join(folder_path1, dir_name)
  75. # print(f'Reading files from folder: {dir_path}')
  76. # itemsn = dir_name.split('_')[1]
  77. itemsn = dir_name.split('.')[0]
  78. # df_data = pd.read_csv(dir_path)#, compression='zip'
  79. # 遍历当前子文件夹中的所有文件
  80. df_chrg_chrac = pd.DataFrame()
  81. df_chrg_stacs = pd.DataFrame()
  82. df_dschrg_stacs = pd.DataFrame()
  83. df_rest_stacs = pd.DataFrame()
  84. try:
  85. time_st = time.time()
  86. # 获取文件的完整路径
  87. loggers['soh_chracter'].info(f'开始执行算法{dir_path}')
  88. df_data = pd.read_csv(dir_path)
  89. if len(df_data) > 30:
  90. df_data=df_data.replace('[]', np.nan)
  91. df_data.rename(columns={"tm":"Time", "packvoltage":"pack_volt", "packcrnt":"pack_crnt",
  92. "packsoc":"pack_soc", "cellvoltage":"cell_voltage", "celltemp":"cell_temp",
  93. "cellminvol":"cell_volt_min", "cellmaxvol":"cell_volt_max", "vehodo":"odo"}, inplace=True)
  94. #,"cellminvol":"cell_volt_min", "cellmaxvol":"cell_volt_max", "vehodo":"odo"
  95. # df_data['sn'] = 'LUZAGAAAXKA008957'
  96. # df_data['odo'] = 456
  97. df_data.dropna(axis=0,subset = ["time", "sn", "pack_volt", "pack_soc", "pack_crnt", "cell_voltage"], inplace=True)
  98. df_data['time'] = pd.to_datetime(df_data['time'], unit = 's')
  99. df_data.drop(df_data.index[(df_data['pack_volt'] < 0.001) | (df_data['pack_volt'] > 1000) | (df_data['pack_soc'] > 100) | (df_data['pack_soc'] < 0) | (df_data['pack_crnt'] > 1000) | (df_data['pack_crnt'] < -1000)], inplace=True)
  100. df_data['cell_volt_max'] = df_data['cell_volt_max'].apply(lambda x: x/1000)
  101. df_data['cell_volt_min'] = df_data['cell_volt_min'].apply(lambda x: x/1000)
  102. df_data.sort_values(by = ['time'], inplace = True)
  103. df_data.reset_index(drop = True, inplace = True)
  104. datalen = len(df_data)
  105. singlen = 6000
  106. cyc_num = int(datalen/singlen)
  107. for item in range(cyc_num):
  108. try:
  109. df_cal_data = df_data.iloc[item*singlen:(item+1)*singlen+600].copy()
  110. df_cal_data.reset_index(drop = True, inplace = True)
  111. flg = 2
  112. ocv_soh = OCV_cal_V5.cell_statistic(df_cal_data, df_algo_soh, flg)
  113. df_chrgrt_add = ocv_soh.soh_cal_est()
  114. if not df_chrgrt_add.empty:
  115. df_chrg_chrac = df_chrg_chrac.append(df_chrgrt_add)
  116. # df_sts_stacs = sta_stacs.cell_statistic(df_cal_data)
  117. # chrg_temp, dschrg_temp, rest_temp = df_sts_stacs.rest_sta()
  118. # if not chrg_temp.empty:
  119. # df_chrg_stacs = df_chrg_stacs.append(chrg_temp)
  120. # if not dschrg_temp.empty:
  121. # df_dschrg_stacs = df_dschrg_stacs.append(dschrg_temp)
  122. # if not rest_temp.empty:
  123. # df_rest_stacs = df_rest_stacs.append(rest_temp)
  124. except Exception as e:
  125. loggers['soh_chracter'].error(f'算法运行出错{itemsn}, 计算错误位置{item}')
  126. loggers['soh_chracter'].error(str(e))
  127. loggers['soh_chracter'].error(traceback.format_exc())
  128. if not df_chrg_chrac.empty:#soh 计算结果
  129. df_chrg_chrac.drop_duplicates(subset = ['sn','time_st'], keep = 'first', inplace = True)
  130. df_chrg_chrac.to_csv('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/hz_soh/soh_cal/' + itemsn + '-soh结果.csv',index=False,encoding='GB18030')
  131. # if not df_chrg_stacs.empty:
  132. # df_chrg_stacs.drop_duplicates(subset = ['sn','time_st'], keep = 'first', inplace = True)
  133. # df_chrg_stacs.to_csv('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/hz_soh/stacs/' + itemsn + '_dwd_batt_persona_charge_proc_di.csv',index=False,encoding='GB18030')
  134. # if not df_dschrg_stacs.empty:
  135. # df_dschrg_stacs.drop_duplicates(subset = ['sn','time_st'], keep = 'first', inplace = True)
  136. # df_dschrg_stacs.to_csv('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/hz_soh/stacs/' + itemsn + '_dwd_batt_persona_drive_proc_di.csv',index=False,encoding='GB18030')
  137. # if not df_rest_stacs.empty:
  138. # df_rest_stacs.drop_duplicates(subset = ['sn','time_st'], keep = 'first', inplace = True)
  139. # df_rest_stacs.to_csv('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/hz_soh/stacs/' + itemsn + '_dwd_batt_persona_standing_proc_di.csv',index=False,encoding='GB18030')
  140. loggers['soh_chracter'].info(f'算法运行完成{itemsn},算法耗时{time.time()-time_st}')
  141. except Exception as e:
  142. loggers['soh_chracter'].error(f'算法运行出错{itemsn}, 计算错误位置{item}')
  143. loggers['soh_chracter'].error(str(e))
  144. loggers['soh_chracter'].error(traceback.format_exc())
  145. except Exception as e:
  146. loggers['soh_chracter'].error(f'算法运行出错{itemsn}')
  147. loggers['soh_chracter'].error(str(e))
  148. loggers['soh_chracter'].error(traceback.format_exc())
# ---------- Entry point: configure middleware, then dispatch diag_cal once per worker in a process pool ----------
  150. if __name__ == "__main__":
  151. cur_env = 'dev' # 设置运行环境
  152. app_path = "/home/liuzhongxiao/zlwl-algos/" # 设置相对路径
  153. log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
  154. app_name = "schedule" # 应用名, 建议与topic的后缀相同
  155. sysUtils = SysUtils(cur_env, app_path)
  156. mysqlUtils = MysqlUtils()
  157. mysql_iotp_params = sysUtils.get_cf_param('mysql-iotp')
  158. mysql_iotp_engine, mysql_iotp_Session= mysqlUtils.get_mysql_engine(mysql_iotp_params)
  159. mysql_kw_params = sysUtils.get_cf_param('mysql-algo')
  160. mysql_kw_engine, mysql_kw_Session= mysqlUtils.get_mysql_engine(mysql_kw_params)
  161. logger_main = sysUtils.get_logger(app_name, log_base_path)
  162. logger_main.info(f"本次主进程号: {os.getpid()}")
  163. # 读取配置文件 (该部分请不要修改)
  164. processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1')) # 默认为1个进程
  165. pool = Pool(processes = int(processes))
  166. logger_main.info("开始分配子进程")
  167. for i in range(int(processes)):
  168. pool.apply_async(diag_cal, (i, ))# 6,
  169. pool.close()
  170. logger_main.info("进程分配结束,堵塞主进程")
  171. pool.join()