from datetime import datetime
from multiprocessing import Pool
import json
import os
import time
import traceback
import warnings

from sqlalchemy import text, delete, and_, or_, update
import pandas as pd

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *
from healthscore.V_1_0_0.BatHealthScore import HealthScore
# from sohdiag.V_1_0_0.SOHBatDiag import sohdiag
from OffLineAlarm.V1_0_0 import off_line_warning


def get_battery_info(mysql_iotp_conn):
    """Read the full ``ff_battery_status`` table from the IoTP MySQL database.

    :param mysql_iotp_conn: live SQLAlchemy connection to the IoTP database
    :return: DataFrame with every row of ``ff_battery_status``
    """
    sql = "select * from ff_battery_status "
    t_battery = pd.read_sql(sql, mysql_iotp_conn)
    return t_battery


def get_bat_health_info(mysql_algo_conn, sn_list):
    """Read the per-SN algorithm tables used by the health-score / diag algorithms.

    :param mysql_algo_conn: live SQLAlchemy connection to the algo database
    :param sn_list: non-empty list of device serial numbers
    :return: tuple ``(df_diag_ram, df_soh, df_uniform, df_sor)`` of DataFrames
    """
    # A Python 1-tuple renders as "('sn',)" whose trailing comma is invalid
    # SQL, so the single-SN case is formatted by hand.
    if len(sn_list) == 1:
        sn_tuple = f"('{sn_list[0]}')"
    else:
        sn_tuple = tuple(sn_list)
    # NOTE(review): string-formatted IN clauses; SNs come from internal kafka
    # scheduling, but bound parameters would be safer — confirm upstream trust.
    sql = "select * from algo_all_fault_info_ing where sn in {}".format(sn_tuple)
    df_diag_ram = pd.read_sql(sql, mysql_algo_conn)
    sql = "SELECT * FROM algo_soh where sn in {}".format(sn_tuple)
    df_soh = pd.read_sql(sql, mysql_algo_conn)
    sql = "SELECT * FROM algo_mid_uniform_result where sn in {}".format(sn_tuple)
    df_uniform = pd.read_sql(sql, mysql_algo_conn)
    sql = "SELECT * FROM algo_mid_sorout where sn in {}".format(sn_tuple)
    df_sor = pd.read_sql(sql, mysql_algo_conn)
    return df_diag_ram, df_soh, df_uniform, df_sor


def main(process_num):
    """Worker-process entry point: consume kafka schedules and run the algorithms.

    For every kafka message: parse the schedule parameters, load the per-SN
    data from MySQL, run the health-score and offline-diagnosis algorithms,
    then persist new / ended / updated fault rows back to MySQL.

    :param process_num: index of this worker process (used for logging only)
    """
    # The worker must never exit: any unhandled failure loops back and
    # re-initialises all middleware connections.
    while True:
        try:
            warnings.filterwarnings("ignore")
            try:
                # Pre-algorithm setup: loggers and middleware connections.
                kafka_topic_key = 'topic_task_day_1_1'
                kafka_groupid_key = 'group_id_task_day_1_1'
                algo_list = ['healthscore', 'sohdiag', 'offline_diag']  # algorithms scheduled here
                loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # one logger per algorithm
                logger_main.info(f"process-{process_num}: 配置中间件")
                # mysql (algo results database)
                mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
                mysqlUtils = MysqlUtils()
                mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
                mysql_algo_conn = mysql_algo_engine.connect()
                # mysql (IoTP platform database)
                mysql_iotp_data = sysUtils.get_cf_param('mysql-iotp')
                mysqlUtils = MysqlUtils()
                mysql_iotp_engine, mysql_iopt_Session = mysqlUtils.get_mysql_engine(mysql_iotp_data)
                mysql_iotp_conn = mysql_iotp_engine.connect()
                # kafka
                kafka_params = sysUtils.get_cf_param('kafka')
                kafkaUtils = KafkaUtils()
                kafka_consumer = kafkaUtils.get_kafka_consumer(
                    kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
                # hbase
                hbase_params = sysUtils.get_cf_param('hbase')
                iotp_service = IotpAlgoService(hbase_params=hbase_params)
                # redis
                redis_params = sysUtils.get_cf_param('redis')
                reidsUtils = RedisUtils()
                rc = reidsUtils.get_redis_conncect(redis_params)
            except Exception as e:
                logger_main.error(f'process-{process_num}: {e}')
                logger_main.error(f'process-{process_num}: {traceback.format_exc()}')

            # Scheduling loop: block on kafka and process each message.
            logger_main.info(f"process-{process_num}: 监听topic {kafka_params[kafka_topic_key]}等待kafka 调度")
            for message in kafka_consumer:
                try:
                    logger_main.info(f'收到调度')
                    if mysql_algo_conn.closed:
                        mysql_algo_conn = mysql_algo_engine.connect()  # fetch a fresh connection from the pool

                    schedule_params = json.loads(message.value)
                    if (schedule_params is None) or (schedule_params == ''):
                        logger_main.info('{} kafka数据异常,跳过本次运算'.format(str(message.value)))
                        continue
                    # Parse kafka scheduling parameters.
                    df_snlist = pd.DataFrame(schedule_params['snlist'])
                    df_algo_adjustable_param = pd.DataFrame(
                        [(d['algo_id'], d['param'], d['param_ai']) for d in schedule_params['adjustable_param']],
                        columns=['algo_id', 'param', 'param_ai'])
                    df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
                    # NOTE(review): eval() on values received over kafka is unsafe
                    # if the topic is not fully trusted — consider ast.literal_eval.
                    df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v
                                          for k, v in df_algo_pack_param.items()}
                    df_algo_param = pd.DataFrame(schedule_params['algo_list'])
                    start_time = schedule_params['start_time']
                    end_time = schedule_params['end_time']
                    pack_code = schedule_params['pack_code']
                    cell_type = schedule_params['cell_type']
                    sn_list = df_snlist['sn'].tolist()
                    df_res_new = pd.DataFrame()
                    df_res_update = pd.DataFrame()
                    df_res_end = pd.DataFrame()
                except Exception as e:
                    # Fixed message: this stage parses kafka parameters, not MySQL data.
                    logger_main.error(f"process-{process_num}:解析kafka调度参数出错")
                    logger_main.error(f"process-{process_num}:{e}")
                    logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                    continue

                # MySQL data read.
                try:
                    time_st = time.time()
                    logger_main.info(f'process-{process_num}开始读取mysql故障数据')
                    df_diag_ram, df_soh, df_uniform, df_sor = get_bat_health_info(mysql_algo_conn, sn_list)
                    logger_main.info(f'process-{process_num}读取mysql耗时{time.time()-time_st}')
                except Exception as e:
                    # Fixed message: this stage reads MySQL, not redis.
                    logger_main.error(f"process-{process_num}:读取mysql出错")
                    logger_main.error(f"process-{process_num}:{e}")
                    logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                    continue

                # Algorithm invocation.
                # Health-score algorithm: one score row per SN, concatenated.
                try:
                    time_st = time.time()
                    loggers['healthscore'].info('开始执行算法')
                    health_scorer = HealthScore(df_soh, df_uniform, df_sor)
                    df_res_healthscore = df_snlist['sn'].apply(lambda x: health_scorer.health_score(x))
                    df_res_healthscore = pd.concat(df_res_healthscore.tolist(), axis=0)
                    loggers['healthscore'].info(f'算法运行完成,算法耗时{time.time()-time_st}')
                except Exception as e:
                    loggers['healthscore'].error('算法运行出错')
                    loggers['healthscore'].error(str(e))
                    loggers['healthscore'].error(traceback.format_exc())
                    df_res_healthscore = pd.DataFrame()

                # SOH diagnosis (currently disabled).
                # try:
                #     time_st = time.time()
                #     loggers['sohdiag'].info('开始执行算法')
                #     df_res_new_soh, df_res_end_soh = sohdiag(df_soh, df_diag_ram, df_sn_process, df_adjustable_param)
                #     loggers['sohdiag'].info(f'算法运行完成,算法耗时{time.time()-time_st}')
                # except Exception as e:
                #     loggers['sohdiag'].error('算法运行出错')
                #     loggers['sohdiag'].error(str(e))
                #     loggers['sohdiag'].error(traceback.format_exc())

                # Offline (device lost-contact) diagnosis.
                try:
                    time_st = time.time()
                    loggers['offline_diag'].info('开始执行算法')
                    t_battery = get_battery_info(mysql_iotp_conn)
                    offline_diag = off_line_warning.Off_Line_Warning()
                    df_res_new_ofl, df_res_update_ofl, df_res_end_ofl = offline_diag.diag(
                        t_battery, df_diag_ram, df_algo_adjustable_param, df_snlist, df_algo_param)
                    loggers['offline_diag'].info(f'算法运行完成,算法耗时{time.time()-time_st}')
                except Exception as e:
                    loggers['offline_diag'].error('算法运行出错')
                    loggers['offline_diag'].error(str(e))
                    loggers['offline_diag'].error(traceback.format_exc())
                    df_res_new_ofl = pd.DataFrame()
                    df_res_update_ofl = pd.DataFrame()
                    df_res_end_ofl = pd.DataFrame()

                # Persist results to MySQL.
                try:
                    df_res_new = df_res_new_ofl  # pd.concat([df_res_new_ofl, df_res_new_soh]) once sohdiag is enabled
                    df_res_update = df_res_update_ofl
                    df_res_end = df_res_end_ofl  # pd.concat([df_res_end_ofl, df_res_end_soh])
                    df_res_new.reset_index(drop=True, inplace=True)
                    df_res_update.reset_index(drop=True, inplace=True)
                    df_res_end.reset_index(drop=True, inplace=True)
                    time_st = time.time()
                    session = mysql_algo_Session()
                    if not df_res_healthscore.empty:
                        df_res_healthscore.to_sql("algo_health_score", con=mysql_algo_conn,
                                                  if_exists="append", index=False)
                    if not df_res_new.empty:
                        # New, still-open faults: stamp bookkeeping columns and append.
                        df_res_new['date_info'] = df_res_new['start_time']
                        df_res_new['create_time'] = datetime.now()
                        df_res_new['create_by'] = 'algo'
                        df_res_new['is_delete'] = 0
                        df_res_new.to_sql("algo_all_fault_info_ing", con=mysql_algo_conn,
                                          if_exists="append", index=False)
                        logger_main.info(f'process-{process_num}新增未结束故障入库{pack_code}完成')
                    if not df_res_end.empty:
                        # Ended faults: move each row from the "ing" table to "done".
                        df_res_end = df_res_end.where(pd.notnull(df_res_end), None)
                        df_res_end = df_res_end.fillna(0)
                        for index in df_res_end.index:
                            df_t = df_res_end.loc[index:index]
                            sql = 'delete from algo_all_fault_info_ing where start_time=:start_time and fault_code=:fault_code and sn=:sn'
                            params = {'start_time': df_t['start_time'].values[0],
                                      'fault_code': df_t['fault_code'].values[0],
                                      'sn': df_t['sn'].values[0]}
                            # text() wrapper: SQLAlchemy 1.4+ no longer accepts raw SQL strings.
                            session.execute(text(sql), params)
                            sql = 'insert into algo_all_fault_info_done (date_info, start_time, end_time, sn, imei, model, fault_level, fault_code, fault_info,\
                fault_reason, fault_advice, fault_location, device_status,odo, create_time, create_by,update_time, update_by, is_delete,comment) values \
                (:date_info, :start_time, :end_time, :sn, :imei, :model,:fault_level, :fault_code, :fault_info,\
                :fault_reason, :fault_advice, :fault_location, :device_status, :odo, :create_time, :create_by, :update_time,:update_by, :is_delete , :comment)'
                            params = {'date_info': datetime.now(),
                                      'start_time': df_t['start_time'].values[0],
                                      'end_time': df_t['end_time'].values[0],
                                      'sn': df_t['sn'].values[0],
                                      'imei': df_t['imei'].values[0],
                                      'model': pack_code,
                                      'fault_level': df_t['fault_level'].values[0],
                                      'fault_code': df_t['fault_code'].values[0],
                                      'fault_info': df_t['fault_info'].values[0],
                                      'fault_reason': df_t['fault_reason'].values[0],
                                      'fault_advice': df_t['fault_advice'].values[0],
                                      'fault_location': df_t['fault_location'].values[0],
                                      'device_status': df_t['device_status'].values[0],
                                      'odo': df_t['odo'].values[0],
                                      'create_time': datetime.now(),
                                      'create_by': 'algo',
                                      'update_time': datetime.now(),
                                      'update_by': None,
                                      'is_delete': 0,
                                      'comment': None}
                            session.execute(text(sql), params)
                        session.commit()
                        logger_main.info(f'process-{process_num}结束故障入库{pack_code}完成')
                    if not df_res_update.empty:
                        # Updated faults: rewrite fault_level / comment of existing rows.
                        df_res_update = df_res_update.where(pd.notnull(df_res_update), None)
                        df_res_update = df_res_update.fillna(0)
                        for index in df_res_update.index:
                            df_t = df_res_update.loc[index:index]
                            try:
                                with mysql_algo_Session() as session:
                                    session.execute(
                                        update(AlgoAllFaultInfoIng).where(
                                            and_((AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
                                                 (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
                                                 (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))).
                                        values(fault_level=df_t['fault_level'].values[0],
                                               comment=df_t['comment'].values[0]))
                                    session.commit()
                            except Exception as e:
                                logger_main.error(f"process-{process_num}:{pack_code}结果入库出错")
                                logger_main.error(f"process-{process_num}:{e}")
                                logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                            finally:
                                session.close()
                        logger_main.info(f"process-{process_num}: 更新入库完成")
                    else:
                        logger_main.info(f"process-{process_num}: 无更新故障")
                    logger_main.info(f"process-{process_num}: 结果入库耗时:{time.time()-time_st}")
                except Exception as e:
                    logger_main.error(f"process-{process_num}:结果入库出错")
                    logger_main.error(f"process-{process_num}:{e}")
                    logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                finally:
                    pass
        except Exception as e:
            logger_main.error(f'process-{process_num}: {e}')
            logger_main.error(f'process-{process_num}: {traceback.format_exc()}')


if __name__ == '__main__':
    # Supervisor loop: (re)builds the worker pool whenever it dies.
    while True:
        try:
            # Configuration.
            cur_env = 'dev'  # runtime environment
            app_path = "/home/shouxueqi/projects/zlwl-algos/"  # absolute app path
            log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
            app_name = "task_day_1_1"  # application name
            sysUtils = SysUtils(cur_env, app_path)
            logger_main = sysUtils.get_logger(app_name, log_base_path)
            logger_main.info(f"本次主进程号: {os.getpid()}")

            # Read config (do not modify this part): default is 1 process.
            processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))
            pool = Pool(processes=int(processes))
            logger_main.info("开始分配子进程")
            for i in range(int(processes)):
                pool.apply_async(main, (i, ))
            pool.close()
            logger_main.info("进程分配结束,堵塞主进程")
            pool.join()
        except Exception as e:
            print(str(e))
            print(traceback.format_exc())
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
        finally:
            # Detach handlers so the next iteration's logger does not double-log.
            handlers = logger_main.handlers.copy()
            for h in handlers:
                logger_main.removeHandler(h)
            pool.terminate()