123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- from datetime import datetime
- from multiprocessing import Pool
- import json
- import os
- import time
- import traceback
- import warnings
- from sqlalchemy import text, delete, and_, or_, update
- import pandas as pd
- import OCV_cal_V5
- import sta_stacs
- import traceback
- import numpy as np
- from ZlwlAlgosCommon.utils.ProUtils import *
- from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
- from ZlwlAlgosCommon.service.iotp.Beans import DataField
- from ZlwlAlgosCommon.orm.models import *
-
- #...................................电池包电芯安全诊断函数......................................................................................................................
def _prepare_frame(df_data):
    """Clean a raw per-vehicle CSV frame before SOH estimation.

    Renames the export columns to the names used downstream, drops rows with
    missing key fields or out-of-range pack values, rescales the cell min/max
    voltage columns and orders the rows by ``time``.

    Args:
        df_data: DataFrame as returned by ``pd.read_csv`` for one vehicle file.

    Returns:
        The cleaned DataFrame with a fresh 0..n-1 index.
    """
    # Cells holding the literal string '[]' are treated as missing values.
    df_data = df_data.replace('[]', np.nan)
    # NOTE(review): "tm" is renamed to "Time" (capital T) while every later
    # step reads "time"; presumably the export already carries a "time"
    # column - confirm against the CSV schema.
    df_data.rename(columns={"tm": "Time", "packvoltage": "pack_volt", "packcrnt": "pack_crnt",
                            "packsoc": "pack_soc", "cellvoltage": "cell_voltage", "celltemp": "cell_temp",
                            "cellminvol": "cell_volt_min", "cellmaxvol": "cell_volt_max", "vehodo": "odo"},
                   inplace=True)
    df_data.dropna(axis=0, subset=["time", "sn", "pack_volt", "pack_soc", "pack_crnt", "cell_voltage"],
                   inplace=True)
    # The raw values are interpreted as epoch seconds.
    df_data['time'] = pd.to_datetime(df_data['time'], unit='s')
    out_of_range = (
        (df_data['pack_volt'] < 0.001) | (df_data['pack_volt'] > 1000)
        | (df_data['pack_soc'] > 100) | (df_data['pack_soc'] < 0)
        | (df_data['pack_crnt'] > 1000) | (df_data['pack_crnt'] < -1000)
    )
    df_data.drop(df_data.index[out_of_range], inplace=True)
    # presumably converts mV to V - TODO confirm the unit of the export
    df_data['cell_volt_max'] = df_data['cell_volt_max'] / 1000
    df_data['cell_volt_min'] = df_data['cell_volt_min'] / 1000
    df_data.sort_values(by=['time'], inplace=True)
    df_data.reset_index(drop=True, inplace=True)
    return df_data


def _estimate_soh_by_window(df_data, df_algo_soh, logger, itemsn, singlen=6000, overlap=600):
    """Run the OCV-based SOH estimator over consecutive row windows.

    Window ``k`` covers rows ``k*singlen`` to ``(k+1)*singlen + overlap``.
    Only ``len(df_data) // singlen`` windows are evaluated, so a frame shorter
    than ``singlen`` rows yields no window at all.
    A failure in one window is logged and does not stop the remaining windows.

    Args:
        df_data: Cleaned frame from ``_prepare_frame``.
        df_algo_soh: Frame handed to ``OCV_cal_V5.cell_statistic`` unchanged.
        logger: Logger receiving per-window error reports.
        itemsn: Vehicle identifier, used only in log messages.
        singlen: Number of rows each window advances by.
        overlap: Extra rows appended to the end of each window.

    Returns:
        All non-empty estimator outputs concatenated, or an empty DataFrame.
    """
    flg = 2  # NOTE(review): mode flag for cell_statistic; meaning not visible here
    window_results = []
    for item in range(len(df_data) // singlen):
        try:
            df_cal_data = df_data.iloc[item * singlen:(item + 1) * singlen + overlap].copy()
            df_cal_data.reset_index(drop=True, inplace=True)
            ocv_soh = OCV_cal_V5.cell_statistic(df_cal_data, df_algo_soh, flg)
            df_chrgrt_add = ocv_soh.soh_cal_est()
            if not df_chrgrt_add.empty:
                window_results.append(df_chrgrt_add)
        except Exception as e:
            logger.error(f'算法运行出错{itemsn}, 计算错误位置{item}')
            logger.error(str(e))
            logger.error(traceback.format_exc())
    if not window_results:
        return pd.DataFrame()
    # DataFrame.append was removed in pandas 2.0; one concat is the replacement.
    return pd.concat(window_results)


def diag_cal(process_num):
    """Compute SOH characteristics for the vehicle file assigned to this worker.

    Reads configuration tables from MySQL, intersects the CSV files found in
    the data folder with a fixed VIN list, picks the single file at position
    ``process_num`` of that (sorted) intersection, cleans it, runs the
    windowed SOH estimator and writes the de-duplicated result to a CSV file.

    Relies on the module globals ``sysUtils``, ``log_base_path``,
    ``logger_main`` and ``mysql_kw_engine`` being set before it is called.

    Args:
        process_num: Zero-based worker index; also selects the file to process.

    Returns:
        None. Results are written to disk and progress is logged.
    """
    algo_list = ['soh_chracter']  # algorithms covered by this schedule
    loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # one logger per algorithm
    logger = loggers['soh_chracter']
    logger_main.info(f"process-{process_num}: 配置中间件")
    hbase_params = sysUtils.get_cf_param('hbase')
    # NOTE(review): the service object is never used below; kept in case its
    # construction has side effects - confirm and remove if not.
    iotp_service = IotpAlgoService(hbase_params=hbase_params)

    pack_code = 'CL3282A'
    mysql_kw_conn = mysql_kw_engine.connect()
    try:
        # NOTE(review): df_t_device and df_algo_adjustable_param are loaded
        # but not consumed anywhere in this function.
        df_t_device = pd.read_sql(
            "select sn, imei, pack_model,organ_code, device_cell_type from t_device", mysql_kw_conn)
        df_algo_adjustable_param = pd.read_sql(
            "select algo_id, pack_code, param, param_ai from algo_adjustable_param", mysql_kw_conn)
        df_algo_pack_param = pd.read_sql("select pack_code, param from algo_pack_param", mysql_kw_conn)
    finally:
        # The connection was previously left open for the life of the worker.
        mysql_kw_conn.close()

    pack_param = df_algo_pack_param[df_algo_pack_param['pack_code'] == pack_code].drop(['pack_code'], axis=1)
    pack_param = pack_param.to_dict("records")
    # Raises IndexError when no row exists for pack_code.
    df_algo_pack_param = json.loads(pack_param[0]['param'])
    # SECURITY NOTE(review): eval() executes whatever string is stored in the
    # param column; safe only while that table is fully trusted. Consider
    # ast.literal_eval if the values are plain literals.
    df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}

    folder_path1 = r"/home/wangliming/project/single_task/1other_task/数仓/合众数据导出/需求/取数需求6/data"
    vinlst = ['LUZAGAAA0MA010431', 'LUZAGAAA3NA043599', 'LUZAGAAA6MA016623', 'LUZAGAGA0NA090337',
              'LUZAGAAA3MA005613', 'LUZAGAAA1NA028597', 'LUZAGAAA4NA025502', 'LUZAGAAA4NA031669',
              'LUZAGAAAXNA027982', 'LUZAGAAA8LA012992', 'LUZAGAAA4MA049426', 'LUZAGAAA1MA069570']
    vinflelst = [item + '.csv' for item in vinlst]
    csv_files1 = [file for file in os.listdir(folder_path1) if file.endswith('.csv')]
    # Sorted so that every worker derives the same ordering; a bare set
    # intersection has no defined order, which made the process_num -> file
    # mapping depend on hash seeding.
    unfinish = sorted(set(csv_files1) & set(vinflelst))
    df_algo_soh = pd.DataFrame()

    itemsn = None  # bound up front so the outer handler can always format it
    try:
        # Each worker handles exactly one file: the one at index process_num.
        for dir_name in unfinish[process_num:process_num + 1]:
            dir_path = os.path.join(folder_path1, dir_name)
            itemsn = dir_name.split('.')[0]
            try:
                time_st = time.time()
                logger.info(f'开始执行算法{dir_path}')
                df_data = pd.read_csv(dir_path)
                if len(df_data) <= 30:
                    # Too few rows to be worth processing.
                    continue
                df_data = _prepare_frame(df_data)
                df_chrg_chrac = _estimate_soh_by_window(df_data, df_algo_soh, logger, itemsn)
                if not df_chrg_chrac.empty:  # SOH results available
                    df_chrg_chrac.drop_duplicates(subset=['sn', 'time_st'], keep='first', inplace=True)
                    df_chrg_chrac.to_csv(
                        '/home/liuzhongxiao/zlwl-algos/USER/liuzhongxiao/hz_soh/soh_cal/' + itemsn + '-soh结果.csv',
                        index=False, encoding='GB18030')
                logger.info(f'算法运行完成{itemsn},算法耗时{time.time()-time_st}')
            except Exception as e:
                # The window index is deliberately not reported here: failures
                # reaching this handler occur outside the window loop, where
                # the index was unbound (NameError) or stale.
                logger.error(f'算法运行出错{itemsn}')
                logger.error(str(e))
                logger.error(traceback.format_exc())
    except Exception as e:
        logger.error(f'算法运行出错{itemsn}')
        logger.error(str(e))
        logger.error(traceback.format_exc())
- #...............................................主函数起定时作用.......................................................................................................................
if __name__ == "__main__":
    # Script entry point: build shared utilities, then fan diag_cal out over
    # a process pool and wait for every worker to finish.
    cur_env = 'dev'  # runtime environment
    app_path = "/home/liuzhongxiao/zlwl-algos/"  # application root path
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory next to this file
    app_name = "schedule"  # application name; suggested to match the topic suffix

    sysUtils = SysUtils(cur_env, app_path)

    mysqlUtils = MysqlUtils()
    mysql_iotp_params = sysUtils.get_cf_param('mysql-iotp')
    mysql_iotp_engine, mysql_iotp_Session = mysqlUtils.get_mysql_engine(mysql_iotp_params)

    mysql_kw_params = sysUtils.get_cf_param('mysql-algo')
    mysql_kw_engine, mysql_kw_Session = mysqlUtils.get_mysql_engine(mysql_kw_params)
    logger_main = sysUtils.get_logger(app_name, log_base_path)
    logger_main.info(f"本次主进程号: {os.getpid()}")

    # NOTE(review): diag_cal reads sysUtils, log_base_path, logger_main and
    # mysql_kw_engine as module globals, and they are assigned only inside
    # this guard. Workers therefore see them only when the pool starts its
    # processes by fork - confirm the start method on the target platform.

    # Read the configuration (please do not modify this part).
    processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # defaults to one process
    pool = Pool(processes=processes)
    logger_main.info("开始分配子进程")

    def _log_worker_error(exc):
        # apply_async drops worker exceptions unless the result is fetched;
        # report them so a failed worker is visible in the main log.
        logger_main.error(f"子进程执行出错: {exc!r}")

    for i in range(processes):
        pool.apply_async(diag_cal, (i,), error_callback=_log_worker_error)
    pool.close()
    logger_main.info("进程分配结束,堵塞主进程")
    pool.join()
|