main.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. import datetime, time
  2. from multiprocessing import Pool
  3. import json
  4. import os
  5. import traceback
  6. import warnings
  7. from sqlalchemy import text, delete, and_, or_, update
  8. import pandas as pd
  9. import traceback
  10. import SOC_pre_vltVV
  11. import OCV_SOC
  12. from ZlwlAlgosCommon.utils.ProUtils import *
  13. from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
  14. from ZlwlAlgosCommon.service.iotp.Beans import DataField
  15. from ZlwlAlgosCommon.orm.models import *
  16. import sys
  17. from sklearn.datasets import load_iris
  18. import joblib
  19. import json
  20. #...................................电池包电芯安全诊断函数......................................................................................................................
  21. def diag_cal(process_num):#, process
  22. # 环境变量配置(通过环境变量确定当前程序运行在开发、测试、生产环境)
  23. # 根据sn 获取对应的参数
  24. #Hbase
  25. algo_list = ['soc_pre'] # 本调度所包含的算法名列表。
  26. process = 10
  27. loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num) # 为每个算法分配一个logger
  28. logger_main.info(f"process-{process_num}: 配置中间件")
  29. hbase_params = sysUtils.get_cf_param('hbase-datafactory')
  30. iotp_service = IotpAlgoService(hbase_params=hbase_params)
  31. sql = f"select sn, imei, pack_model,organ_code, device_cell_type from t_device"
  32. mysql_kw_conn = mysql_kw_engine.connect()
  33. df_t_device = pd.read_sql(sql, mysql_kw_conn)
  34. sql = f"select algo_id, pack_code, param, param_ai from algo_adjustable_param"
  35. df_algo_adjustable_param = pd.read_sql(sql, mysql_kw_conn)
  36. sql = f"select pack_code, param from algo_pack_param"
  37. pack_code = 'JX18020'
  38. df_algo_pack_param = pd.read_sql(sql, mysql_kw_conn)
  39. adjustable_param = df_algo_adjustable_param[df_algo_adjustable_param['pack_code']==pack_code].drop(['pack_code'], axis=1)
  40. adjustable_param = adjustable_param.to_dict("records")
  41. pack_param = df_algo_pack_param[df_algo_pack_param['pack_code']==pack_code].drop(['pack_code'], axis=1)
  42. pack_param = pack_param.to_dict("records")
  43. df_algo_pack_param = json.loads(pack_param[0]['param'].replace(' ', ''))
  44. df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}
  45. # snlst = list(df_t_device[df_t_device['pack_model'] == pack_code]['sn'])[0:1]#[3*(process_num):3*(process_num+1)]
  46. pk_sn = list(df_t_device[(df_t_device['pack_model'] == pack_code)]['sn'])
  47. snlst_ort = [item for item in pk_sn if 'S23620' in item]#
  48. snlst = snlst_ort[3*(process_num):3*(process_num+1)]
  49. def getDateList(start_date, end_date):
  50. date_list = []
  51. start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d')
  52. end_date = datetime.datetime.strptime(end_date, '%Y-%m-%d')# %H:%M:%S
  53. date_list.append(start_date.strftime('%Y-%m-%d'))
  54. while start_date < end_date:
  55. start_date += datetime.timedelta(days=1)
  56. date_list.append(start_date.strftime('%Y-%m-%d'))
  57. return date_list
  58. # dnn_predict = joblib.load('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/SOC_Fct/Dnnmodel.pkl')
  59. # input_norm_X = joblib.load('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/SOC_Fct/normorlized_x')
  60. # input_norm_Y = joblib.load('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/SOC_Fct/normorlized_y')
  61. # 使用os.walk()函数遍历文件夹A及其子文件夹
  62. # snlst = ['PJXCLL532S234M030']#'PJXCLL532S234M007', 'PJXCLL532S234M010', 'PJXCLL532S234M004', 'PJXCLL532S234M030', 'PJXCLL532S234M018', , 'PJXCLL532S234M014', 'PJXCLL532S234M016'
  63. try:
  64. for itemsn in snlst:
  65. df_soc_pre = pd.DataFrame()
  66. try:
  67. time_st = time.time()
  68. # 获取文件的完整路径
  69. itemsn = 'PJXCLL128N234P002'#, 'PJXCLL532S234M012', 'PJXCLL532S234M015'
  70. loggers['soc_pre'].info(f'开始执行算法{itemsn}')
  71. data_lst = getDateList("2023-06-29","2023-06-30")
  72. for item_date in range(0, len(data_lst)-1):#
  73. try:
  74. st_time = '2023-06-28 10:30:00'#data_lst[item_date] + ' 00:00:00'
  75. et = '2023-06-29 01:00:00'#data_lst[item_date + 1] + ' 09:00:00'
  76. # st_time = (datetime.datetime.strptime(st, '%Y-%m-%d %H:%M:%S') - datetime.timedelta(hours=3)).strftime('%Y-%m-%d %H:%M:%S')
  77. columns = [DataField.sn, DataField.time,DataField.pack_crnt, DataField.pack_volt, DataField.cell_voltage_count, DataField.cell_temp_count,
  78. DataField.cell_voltage, DataField.cell_temp, DataField.pack_soc, DataField.other_temp_value, DataField.latitude,
  79. DataField.longitude, DataField.mileage, DataField.accum_chg_ah, DataField.accum_dschg_ah, DataField.accum_chg_wh,
  80. DataField.accum_dschg_wh]
  81. df_data_all = iotp_service.get_data(sn_list=[itemsn], columns=columns, start_time=st_time, end_time=et)
  82. loggers['soc_pre'].info(f'sn-{itemsn},起始-{st_time},结束-{et},获取到{len(df_data_all)}条数据,取数耗时:{time.time()-time_st}')
  83. if len(df_data_all) > 30:
  84. df_data_all['mileage'].replace(0, pd.np.nan, inplace=True)
  85. df_data_all['mileage'] = df_data_all['mileage'].fillna(method='backfill')
  86. df_data_all['mileage'] = df_data_all['mileage'].fillna(method='ffill')
  87. df_data_all.rename(columns={"mileage":"odo"}, inplace=True)
  88. # df_data_gps=iotp_service.gps_datacleaning(df_data_all)
  89. df_out, df_table, cellvolt_name, celltemp_name = iotp_service.datacleaning(df_algo_pack_param,df_data_all)#进行数据清洗
  90. if (not df_out.empty):# and (not df_data_gps.empty)
  91. # df_out['time'] = df_out['time'].dt.strftime('%Y-%m-%d %H:%M:%S')
  92. # df_merge_gab = pd.merge(df_out, df_data_gps,on=['sn','time','datatype'],how='outer')
  93. # df_merge_gab_fill = df_merge_gab.groupby("sn").fillna(method='backfill')
  94. # df_merge_gab_fill = pd.concat([df_merge_gab[['sn']], df_merge_gab_fill], axis=1)
  95. # df_merge_gab_fill = df_merge_gab.groupby("sn").fillna(method='ffill')
  96. # df_merge_gab_fill = pd.concat([df_merge_gab[['sn']], df_merge_gab_fill], axis=1)
  97. # df_merge = df_merge_gab_fill.sort_values(["sn","time"],ascending = [True, True])
  98. df_merge_cal = df_out[df_out['datatype']==12]
  99. if not df_merge_cal.empty:
  100. chrg_chracter = SOC_pre_vltVV.cell_chracter(df_merge_cal, df_algo_pack_param, cellvolt_name, celltemp_name)#计算内阻
  101. soc_pre = chrg_chracter.states_cal()
  102. OCV_chrac = OCV_SOC.cell_statistic(df_algo_pack_param, df_merge_cal)
  103. OCV_soc_rlt = OCV_chrac.soc_cal_est()
  104. df_soc_rlt = pd.concat([soc_pre, OCV_soc_rlt])
  105. if not df_soc_rlt.empty:
  106. df_soc_pre = df_soc_pre.append(df_soc_rlt)
  107. except Exception as e:
  108. loggers['soc_pre'].error('算法运行出错')
  109. loggers['soc_pre'].error(str(e))
  110. loggers['soc_pre'].error(traceback.format_exc())
  111. if not df_soc_pre.empty:
  112. df_soc_pre.drop_duplicates(subset = ['sn','time'], keep = 'last', inplace = True)
  113. df_soc_pre.to_csv('/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/SOC_Fct/SOC_rlt/' + itemsn + 'SOC估计结果VV.csv',index=False,encoding='GB18030')
  114. loggers['soc_pre'].info(f'算法运行完成{itemsn},算法耗时{time.time()-time_st}')
  115. except Exception as e:
  116. loggers['soc_pre'].error('算法运行出错')
  117. loggers['soc_pre'].error(str(e))
  118. loggers['soc_pre'].error(traceback.format_exc())
  119. except Exception as e:
  120. loggers['soc_pre'].error('算法运行出错')
  121. loggers['soc_pre'].error(str(e))
  122. loggers['soc_pre'].error(traceback.format_exc())
  123. #...............................................主函数起定时作用.......................................................................................................................
  124. if __name__ == "__main__":
  125. cur_env = 'dev' # 设置运行环境
  126. app_path = "/home/liuzhongxiao/zlwl-algos/" # 设置相对路径
  127. log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
  128. app_name = "schedule" # 应用名, 建议与topic的后缀相同
  129. sysUtils = SysUtils(cur_env, app_path)
  130. mysqlUtils = MysqlUtils()
  131. mysql_iotp_params = sysUtils.get_cf_param('mysql-iotp')
  132. mysql_iotp_engine, mysql_iotp_Session= mysqlUtils.get_mysql_engine(mysql_iotp_params)
  133. mysql_kw_params = sysUtils.get_cf_param('mysql-algo')
  134. mysql_kw_engine, mysql_kw_Session= mysqlUtils.get_mysql_engine(mysql_kw_params)
  135. logger_main = sysUtils.get_logger(app_name, log_base_path)
  136. logger_main.info(f"本次主进程号: {os.getpid()}")
  137. # 读取配置文件 (该部分请不要修改)
  138. processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1')) # 默认为1个进程
  139. pool = Pool(processes = int(processes))
  140. logger_main.info("开始分配子进程")
  141. for i in range(int(processes)):
  142. pool.apply_async(diag_cal, (i, ))#5,10,
  143. pool.close()
  144. logger_main.info("进程分配结束,堵塞主进程")
  145. pool.join()