123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333 |
- import datetime
- import gc
- import re
- from multiprocessing import Pool
- import json
- import logging
- import logging.handlers
- import os
- import time
- import traceback
- import warnings
- from sqlalchemy import text, delete, and_, or_, update
- import pandas as pd
- from ZlwlAlgosCommon.utils.ProUtils import *
- from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
- from ZlwlAlgosCommon.service.iotp.Beans import DataField
- from ZlwlAlgosCommon.orm.models import *
- from FaultWarning.V_1_0_0 import CBMSBatDiag
- from FaultWarning.V_1_0_0 import CBMSBatDiag_TempV3
- def get_cell_info(db_engine,rc):
- #获取SOH数据
- data = rc.get("algo_param_from_mysql:df_soh")
- if pd.isnull(data):
- df_soh = pd.read_sql("select * from `algo_soh`GROUP BY sn DESC", db_engine)
- else:
- df_soh = pd.DataFrame(json.loads(data))
- if len(df_soh) > 0:
- df_soh['time_st'] = pd.to_datetime(df_soh['time_st'], unit='ms')
- df_soh['time_sp'] = pd.to_datetime(df_soh['time_sp'], unit='ms')
- #获取SOR数据
- data = rc.get("algo_param_from_mysql:sor_result")
- if pd.isnull(data):
- df_sor = pd.read_sql("select * from `algo_mid_sorout`GROUP BY sn DESC", db_engine)
- else:
- df_sor = pd.DataFrame(json.loads(data))
- if len(df_sor) > 0:
- df_sor['time'] = pd.to_datetime(df_sor['time'], unit='ms')
- #获取一致性数据
- data = rc.get("Algo:FaultDiag:SafetyWarning:uniform_result:{}")
- if pd.isnull(data):
- df_uniform=pd.read_sql("select * from `algo_mid_uniform_result`GROUP BY sn DESC", db_engine)
- else:
- df_uniform = pd.read_json(data)
- if len(df_uniform) > 0:
- df_uniform['time'] = pd.to_datetime(df_uniform['time'],unit='ms')
- return df_sor,df_uniform #df_soh,
- def get_fault_info(db_engine):
- df_diag_ram = pd.read_sql("select * from algo_all_fault_info_ing", db_engine)
- df_diag_ram['end_flag'] = 0
- return df_diag_ram
- def fault_warning(logger, mysql_algo_conn, mysql_algo_Session, start_time, df_data, df_table, cellvolt_name, celltemp_name,pack_code, cell_type, df_diag_ram, df_algo_adjustable_param, df_algo_pack_param, df_algo_list):
- #电芯电压温度故障诊断
- time_start1 = time.time()
- FaultDiagVolt = CBMSBatDiag.BatDiagVolt(df_data, df_table, cellvolt_name, celltemp_name,cell_type)
- df_res_new_volt,df_res_update_volt, df_res_end_volt = FaultDiagVolt.diag(df_diag_ram, df_algo_adjustable_param, df_algo_pack_param, df_algo_list)#df_soh,
- logger.info(f"{datetime.datetime.now()},电压故障运行耗时{time.time()-time_start1} ")
- time_start1 = time.time()
- FaultDiagTemp = CBMSBatDiag_TempV3.BatDiag(df_data, df_table, celltemp_name, cellvolt_name)
- df_res_new_temp,df_res_update_temp, df_res_end_Temp = FaultDiagTemp.diag(df_diag_ram, df_algo_adjustable_param, df_algo_pack_param, df_algo_list)
- logger.info(f"{datetime.datetime.now()},温度故障运行耗时{time.time()-time_start1} ")
-
- df_res_new = pd.concat([df_res_new_volt, df_res_new_temp]) #, res1
- df_res_update=pd.concat([df_res_update_volt, df_res_update_temp]) #, res1
- df_res_end = pd.concat([df_res_end_volt, df_res_end_Temp]) #, res2
- df_res_new.reset_index(drop=True, inplace=True)
- df_res_update.reset_index(drop=True, inplace=True)
- df_res_end.reset_index(drop=True, inplace=True)
-
- if not df_res_end.empty:
- df_res_end=df_res_end.where(pd.notnull(df_res_end),None)
- df_res_end=df_res_end.fillna(0)
- for index in df_res_end.index:
- df_t = df_res_end.loc[index:index]
- try:
- # 删除数据
- with mysql_algo_Session() as session:
- session.execute(delete(AlgoAllFaultInfoIng).where(and_(
- (AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
- (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
- (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))))
- session.flush()
- # 插入数据
- session.add(AlgoAllFaultInfoDone(
- date_info=datetime.datetime.now(),
- start_time=df_t['start_time'].values[0],
- end_time=df_t['end_time'].values[0],
- sn=df_t['sn'].values[0],
- imei=df_t['imei'].values[0]),
- model=pack_code,
- fault_level=df_t['fault_level'].values[0],
- fault_code=df_t['fault_code'].values[0],
- fault_reason=df_t['fault_reason'].values[0],
- fault_advice=df_t['fault_advice'].values[0],
- fault_location=df_t['fault_location'].values[0],
- device_status=df_t['device_status'].values[0],
- odo=df_t['odo'].values[0],
- create_time=datetime.datetime.now(),
- create_by='algo',
- update_time=datetime.datetime.now(),
- update_by=None,
- is_delete=0,
- comment=df_t['comment'].values[0])
- session.flush()
- session.commit()
- except Exception as e:
- logger.error('{}运行出错'.format(pack_code))
- logging.error(str(e))
- logging.error(traceback.format_exc())
- finally:
- session.close()
- logger.info('更新入库完成')
- else:
- logger.info('无结束故障')
- #新增故障入库
- if not df_res_new.empty:
- df_res_new=df_res_new.where(pd.notnull(df_res_new),None)
- df_res_new=df_res_new.fillna(0)
- Scrap_Vin = pd.read_sql("select sn from t_scrap_device where create_time>='{}'".format(start_time), mysql_algo_conn)
- df_res_new=df_res_new[~df_res_new['sn'].isin(Scrap_Vin['sn'])]
- if not df_res_new.empty:
- # 如果该故障未结束且不存在于ing,则写入ing
- try:
- df_res_new['date_info'] = df_res_new['start_time']
- df_res_new['create_time'] = datetime.datetime.now()
- df_res_new['create_by'] = 'algo'
- df_res_new['update_by'] = None
- df_res_new['is_delete'] = 0
- #df_res_new['imei'] = imei
-
- df_res_new.to_sql("algo_all_fault_info_ing", con=mysql_algo_conn,
- if_exists="append", index=False)
- logger.info('新增故障{} 入库{}完成'.format(df_res_new, 'algo_all_fault_info_ing'))
- except Exception as e:
- logger.error('{}运行出错'.format(pack_code))
- logger.error(str(e))
- logger.error(traceback.format_exc())
- else:
- logger.info('新增故障为报废电池')
- else:
- logger.info('无新增故障')
- #更新故障入库
- if not df_res_update.empty:
- df_res_update=df_res_update.where(pd.notnull(df_res_update),None)
- df_res_update=df_res_update.fillna(0)
- for index in df_res_update.index:
- df_t = df_res_update.loc[index:index]
- try:
- # 更新数据
- with mysql_algo_Session() as session:
- session.execute(update(AlgoAllFaultInfoIng).where(
- and_((AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
- (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
- (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))).
- values(fault_level=df_t['fault_level'].values[0],
- comment=df_t['comment'].values[0]))
- session.commit()
- except Exception as e:
- logger.error('{}运行出错'.format(pack_code))
- logger.error(str(e))
- logger.error(traceback.format_exc())
- finally:
- session.close()
- logger.info('更新入库完成')
- else:
- logger.info('无更新故障')
- def main(process_num):
- # 程序不能停止
- while(True):
- warnings.filterwarnings("ignore")
- try:
- # 调用算法前的准备工作
- kafka_topic_key = 'topic_task_min_10'
- kafka_groupid_key = 'group_id_task_min_10'
- algo_list = ['FaultWarning', 'other'] # 本调度所包含的算法名列表。
-
- loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num) # 为每个算法分配一个logger
- logger_main.info(f"process-{process_num}: 配置中间件")
-
- # mysql
- mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
- mysqlUtils = MysqlUtils()
- mysql_algo_engine, mysql_algo_Session= mysqlUtils.get_mysql_engine(mysql_algo_params)
- mysql_algo_conn = mysql_algo_engine.connect()
-
- # redis
- redis_params = sysUtils.get_cf_param('redis')
- redisUtils = RedisUtils()
- redis_conn = redisUtils.get_redis_conncect(redis_params)
-
- # hbase
- hbase_params = sysUtils.get_cf_param('hbase')
- iotp_service = IotpAlgoService(hbase_params=hbase_params)
- # kafka
- kafka_params = sysUtils.get_cf_param('kafka')
- kafkaUtils = KafkaUtils()
- kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
-
- logger_main.info(f"process-{process_num}: 获取算法参数及电池参数")
- df_sor,df_uniform = get_cell_info(mysql_algo_conn, redis_conn)
- df_diag_ram = get_fault_info(mysql_algo_conn)
-
- except Exception as e:
- logger_main.error(str(e))
- logger_main.error(traceback.format_exc())
-
- # 开始准备调度
- try:
- logger_main.info(f"process-{process_num}: 监听topic {kafka_params[kafka_topic_key]}等待kafka 调度")
- param_update_timer = time.time()
- for message in kafka_consumer:
- try:
- logger_main.info(f'收到调度 {message.value}')
- if mysql_algo_conn.close:
- mysql_algo_conn = mysql_algo_engine.connect() # 从连接池中获取一个myslq连接
-
- if time.time()-param_update_timer > 1200: # 超过20分钟,更新参数
- df_sor,df_uniform=get_cell_info(mysql_algo_conn, redis_conn)#df_soh,
- param_update_timer = time.time()
-
- schedule_params = json.loads(message.value)
- if (schedule_params is None) or (schedule_params ==''):
- logger_main.info('{} kafka数据异常,跳过本次运算'.format(str(message.value)))
- continue
- # kafka 调度参数解析
- df_snlist = pd.DataFrame(schedule_params['snlist'])
- df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'],d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param','param_ai'])
- df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
- df_algo_list = pd.DataFrame(schedule_params['algo_list'])
- start_time = schedule_params['start_time']
- end_time = schedule_params['end_time']
- pack_code = schedule_params['pack_code']
- cell_type = schedule_params['cell_type']
- sn_list=df_snlist['sn'].tolist()
-
- # 取数
- logger_main.info(f"process-{process_num}: 开始取数")
- columns = [DataField.error_level, DataField.error_code, DataField.pack_crnt, DataField.pack_volt,
- DataField.bms_sta, DataField.cell_voltage_count, DataField.cell_temp_count, DataField.cell_voltage, DataField.cell_temp,
- DataField.pack_soc, DataField.other_temp_value, DataField.bal_cell,
- DataField.pack_soh, DataField.charge_sta]
- df_data = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
- logger_main.info(f"process-{process_num}: {str(sn_list)}获取到{str(len(df_data))}条数据")
- except Exception as e:
- logging.error('{}运行出错'.format(pack_code))
- logging.error(str(e))
- logging.error(traceback.format_exc())
-
- try:
- # 数据清洗
- if len(df_data) == 0:
- logger_main.info(f"process-{process_num}: 无数据跳过本次运算")
- continue
- df_data,df_table,cellvolt_name,celltemp_name=iotp_service.data_clean(df_data,df_algo_pack_param)#进行数据清洗
- if len(df_data) == 0:
- logger_main.info(f"process-{process_num}: 数据清洗完成, 无有效数据,跳过本次运算")
- continue
- else:
- logger_main.info(f"process-{process_num}: {pack_code}, time_type:{df_data.loc[0, 'time']} ~ {df_data.iloc[-1]['time']}, 数据清洗完成")
- except Exception as e:
- logger_main.error(f"process-{process_num}:{pack_code}数据清洗出错")
- logger_main.error(f"process-{process_num}:{e}")
- logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
-
- # 算法调用
- try:
- fault_warning(loggers['FaultWarning'], mysql_algo_conn, mysql_algo_Session, start_time, df_data, df_table, cellvolt_name,
- celltemp_name,pack_code, cell_type, df_diag_ram, df_algo_adjustable_param, df_algo_pack_param, df_algo_list)
- except Exception as e:
- loggers['FaultWarning'].error('{}运行出错'.format(pack_code))
- loggers['FaultWarning'].error(str(e))
- loggers['FaultWarning'].error(traceback.format_exc())
-
- # 第二个算法调用
- try:
- pass
- except Exception as e:
- pass
-
- except Exception as e:
- logging.error('{}运行出错'.format(pack_code))
- logging.error(str(e))
- logging.error(traceback.format_exc())
- finally:
- iotp_service.close()
- if __name__ == '__main__':
- while(True):
- try:
- # 配置量
- cur_env = 'dev' # 设置运行环境
- app_path = "." # 设置app绝对路径
- log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
- app_name = "task_min_10" # 应用名
-
- sysUtils = SysUtils(cur_env, app_path)
- logger_main = sysUtils.get_logger(app_name, log_base_path)
- logger_main.info(f"本次主进程号: {os.getpid()}")
- # 读取配置文件 (该部分请不要修改)
- processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '2')) # 默认为1个进程
- pool = Pool(processes = int(processes))
- logger_main.info("开始分配子进程")
- for i in range(int(processes)):
- pool.apply_async(main, (i, ))
- pool.close()
- logger_main.info("进程分配结束,堵塞主进程")
- pool.join()
- except Exception as e:
- logger_main.error(str(e))
- logger_main.error(traceback.format_exc())
- finally:
- handlers = logger_main.handlers.copy()
- for h in handlers:
- logger_main.removeHandler(h)
- pool.terminate()
|