- from datetime import datetime
- from multiprocessing import Pool
- import json
- import os
- import time
- import traceback
- import warnings
- from apscheduler.schedulers.blocking import BlockingScheduler
- from sqlalchemy import text, delete, and_, or_, update
- import pandas as pd
- from ZlwlAlgosCommon.utils.ProUtils import *
- from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
- from ZlwlAlgosCommon.service.iotp.Beans import DataField
- from ZlwlAlgosCommon.orm.models import *
- from healthscore.V_1_0_0.BatHealthScore import HealthScore
- from sohdiag.V_1_0_0.SOHBatDiag import sohdiag
- from OffLineAlarm.V1_0_0 import off_line_warning
- #from OmissionNotice.V_1_0_0 import omission_notice
-
def get_battery_info(mysql_iotp_conn):
    """Return every row of the ``ff_battery_status`` table as a DataFrame.

    Parameters
    ----------
    mysql_iotp_conn : open connection to the iotp database.

    Returns
    -------
    pandas.DataFrame with one row per battery status record.
    """
    query = "select * from ff_battery_status "
    frame = pd.read_sql(query, mysql_iotp_conn)
    return frame
def get_bat_health_info(mysql_algo_conn, sn_list):
    """Fetch the per-SN diagnosis inputs from the algo database.

    Parameters
    ----------
    mysql_algo_conn : open connection to the algo database.
    sn_list : list of battery serial numbers to query; must be non-empty
        (an empty list would produce an invalid ``IN ()`` clause, as in
        the original implementation).

    Returns
    -------
    (df_diag_ram, df_soh, df_uniform, df_sor) :
        ongoing faults, latest SOH record per sn, uniformity results and
        SOR results, each as a DataFrame.
    """
    # Build the IN-clause literal by joining quoted values. The previous
    # code special-cased len == 1 because str(tuple(...)) renders a
    # one-element tuple as "('x',)", which is invalid SQL; joining handles
    # every length >= 1 uniformly.
    # NOTE(review): values are interpolated into the SQL text. The sn
    # values come from our own t_device table, but parameterized queries
    # would still be safer — flagged, not changed, to keep the interface.
    sn_tuple = "({})".format(", ".join("'{}'".format(sn) for sn in sn_list))
    sql = "select * from algo_all_fault_info_ing where sn in {}".format(sn_tuple)
    df_diag_ram = pd.read_sql(sql, mysql_algo_conn)
    # Latest SOH record per sn (MAX(time_st) with GROUP BY sn).
    sql = "select sn,MAX(time_st) AS time_st,time_sp,soh,cellsoh_diff,cellsoh,odo from algo_soh WHERE sn in {} GROUP BY sn".format(sn_tuple)
    df_soh = pd.read_sql(sql, mysql_algo_conn)

    sql = "SELECT * FROM algo_mid_uniform_result where sn in {}".format(sn_tuple)
    df_uniform = pd.read_sql(sql, mysql_algo_conn)
    sql = "SELECT * FROM algo_mid_sorout where sn in {}".format(sn_tuple)
    df_sor = pd.read_sql(sql, mysql_algo_conn)
    return df_diag_ram, df_soh, df_uniform, df_sor
-
def update_param(db_engine, rc):
    """Load the algorithm parameter tables and the active device list.

    Intended to read parameters from redis first and fall back to the
    database when redis has nothing; the redis path (``rc``) is not
    implemented yet, so everything is read straight from mysql.

    Parameters
    ----------
    db_engine : mysql engine/connection to read from.
    rc : redis client, currently unused (kept for the planned fallback).

    Returns
    -------
    (df_algo_adjustable_param, df_algo_list, df_algo_pack_param, df_snpk_list)
        with scrapped devices (``scrap_status >= 4``) filtered out of the
        device list.
    """
    adjustable = pd.read_sql("select id, algo_id, pack_code, param from algo_adjustable_param", db_engine)
    algos = pd.read_sql("select id, algo_id, algo_name, is_activate, global_param, fault_code, fault_influence from algo_list", db_engine)
    pack_params = pd.read_sql("select id, pack_code, param from algo_pack_param", db_engine)
    devices = pd.read_sql("select sn, imei,pack_model,scrap_status from t_device", db_engine)
    devices = devices.loc[devices['scrap_status'] < 4]
    return adjustable, algos, pack_params, devices
def main():
    """Run one round of the daily battery-diagnosis pipeline.

    Sets up the middleware (two MySQL databases, Kafka, HBase, Redis),
    loads the algorithm parameter tables, and then — once per pack
    model — runs the health-score, SOH-diagnosis and offline-diagnosis
    algorithms and persists their results to the algo MySQL database.

    Relies on module-level globals assigned in the ``__main__`` block:
    ``sysUtils``, ``logger_main`` and ``log_base_path``.
    """
    # The program must never stop: every failure is logged and swallowed.
    try:
        warnings.filterwarnings("ignore")
        try:
            cleanUtils = CleanUtils()
            # Preparation before calling the algorithms. All handles start
            # as None so cleanUtils.clean() in the except clause is safe
            # even if a later connection step fails.
            mysql_algo_conn = None
            mysql_algo_engine = None
            mysql_iotp_conn = None
            mysql_iotp_engine= None
            kafka_consumer = None
            rc= None

            kafka_topic_key = 'topic_test_sxq'#topic_task_day_1_1 test_code
            kafka_groupid_key = 'group_id_test_sxq' #group_id_task_day_1_1 test_code
            algo_list = ['healthscore', 'sohdiag','offline_diag'] # names of the algorithms handled by this scheduler
            process_num=1
            loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num) # one dedicated logger per algorithm
            logger_main.info(f"process-{process_num}: 配置中间件")

            # mysql (algo result database)
            mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
            mysqlUtils = MysqlUtils()
            mysql_algo_engine, mysql_algo_Session= mysqlUtils.get_mysql_engine(mysql_algo_params)
            mysql_algo_conn = mysql_algo_engine.connect()


            # mysql (iotp source database)
            mysql_iotp_data = sysUtils.get_cf_param('mysql-iotp')
            mysqlUtils = MysqlUtils()
            mysql_iotp_engine, mysql_iopt_Session= mysqlUtils.get_mysql_engine(mysql_iotp_data)
            mysql_iotp_conn = mysql_iotp_engine.connect()


            # kafka (the consumer is created, but the message loop below is
            # commented out — this function currently runs on a timer)
            kafka_params = sysUtils.get_cf_param('kafka')
            kafkaUtils = KafkaUtils()
            kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
            #Hbase
            hbase_params = sysUtils.get_cf_param('hbase-datafactory')#test_code
            iotp_service = IotpAlgoService(hbase_params=hbase_params)  # NOTE(review): created but not used in this function
            #redis
            redis_params = sysUtils.get_cf_param('redis')
            reidsUtils = RedisUtils()
            rc = reidsUtils.get_redis_conncect(redis_params)
        except Exception as e:
            # Middleware setup failed: log it and release whatever was opened.
            logger_main.error(f'process-{process_num}: {e}')
            logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
            cleanUtils.clean(mysql_algo_conn, mysql_algo_engine, mysql_iotp_conn, mysql_iotp_engine, kafka_consumer, rc)

        # Ready to start this scheduling round.
        logger_main.info(f"process-{process_num}: 监听topic {kafka_params[kafka_topic_key]}等待kafka 调度")
        #for message in kafka_consumer:
        #print('test')
        #KafkaConsumer.commit()
        try:
            logger_main.info(f'收到调度')
            # Recycle both connections so the round starts with fresh ones
            # from the pool.
            if not mysql_algo_conn.closed:
                mysql_algo_conn.close()
            mysql_algo_conn = mysql_algo_engine.connect() # take a fresh mysql connection from the pool
            if not mysql_iotp_conn.closed:
                mysql_iotp_conn.close()
            mysql_iotp_conn = mysql_iotp_engine.connect() # take a fresh mysql connection from the pool

            # Parsing of the kafka scheduling parameters (disabled together
            # with the consumer loop above):
            # schedule_params = json.loads(message.value)
            # if (schedule_params is None) or (schedule_params ==''):
            # logger_main.info('{} kafka数据异常,跳过本次运算'.format(str(message.value)))
            # continue
            # df_snlist = pd.DataFrame(schedule_params['snlist'])
            # df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'],d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param','param_ai'])
            # df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
            # df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}
            # df_algo_param = pd.DataFrame(schedule_params['algo_list'])
            # # start_time = schedule_params['start_time']
            # # end_time = schedule_params['end_time']
            # pack_code = schedule_params['pack_code']
            # cell_type = schedule_params['cell_type']

            logger_main.info("获取算法参数")
            df_algo_adjustable_param, df_algo_list, df_algo_pack_param, df_snpk_list= update_param(mysql_algo_conn,rc)
            # Run the whole pipeline once per pack model.
            pack_code_list= list(set(df_snpk_list['pack_model']))
            for i in range(0,len(pack_code_list)):
                df_res_new=pd.DataFrame()
                df_res_update=pd.DataFrame()
                df_res_end=pd.DataFrame()
                df_snlist=df_snpk_list[df_snpk_list['pack_model']==pack_code_list[i]]
                sn_list=df_snlist['sn'].tolist()
                df_algo_adjustable_param_pack_code=df_algo_adjustable_param[df_algo_adjustable_param['pack_code']==pack_code_list[i]]
                df_algo_param=df_algo_list
                # Read the diagnosis inputs for this pack model from mysql.
                try:
                    time_st = time.time()
                    logger_main.info(f'process-{process_num}开始读取mysql故障数据')
                    df_diag_ram,df_soh,df_uniform,df_sor=get_bat_health_info(mysql_algo_conn,sn_list)
                    logger_main.info(f'process-{process_num}读取mysql耗时{time.time()-time_st}')
                except Exception as e:
                    # NOTE(review): the log text mentions redis, but this
                    # step actually reads mysql — message looks copy-pasted.
                    logger_main.error(f"process-{process_num}:读取redis出错")
                    logger_main.error(f"process-{process_num}:{e}")
                    logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                    continue
                # Algorithm invocations. Each is isolated in its own
                # try/except so a failure degrades to empty results
                # instead of aborting the whole round.
                # 1) Health-score algorithm.
                try:
                    time_st = time.time()
                    loggers['healthscore'].info(f'开始执行算法,电池包号为{pack_code_list[i]}')
                    healthscole = HealthScore(df_soh, df_uniform, df_sor)
                    df_res_healthscore = df_snlist['sn'].apply(lambda x : healthscole.health_score(x))
                    df_res_healthscore = pd.concat(df_res_healthscore.tolist(), axis=0)
                    loggers['healthscore'].info(f'算法运行完成,算法耗时{time.time()-time_st}')
                except Exception as e:
                    loggers['healthscore'].error('算法运行出错')
                    loggers['healthscore'].error(str(e))
                    loggers['healthscore'].error(traceback.format_exc())
                    df_res_healthscore=pd.DataFrame()
                # 2) SOH diagnosis algorithm.
                try:
                    time_st = time.time()
                    loggers['sohdiag'].info(f'开始执行算法,电池包号为{pack_code_list[i]}')
                    df_res_new_soh, df_res_end_soh = sohdiag(df_soh, df_diag_ram, df_snlist, df_algo_adjustable_param_pack_code)
                    loggers['sohdiag'].info(f'算法运行完成,算法耗时{time.time()-time_st}')
                except Exception as e:
                    loggers['sohdiag'].error('算法运行出错')
                    loggers['sohdiag'].error(str(e))
                    loggers['sohdiag'].error(traceback.format_exc())
                    df_res_new_soh=pd.DataFrame()
                    df_res_end_soh=pd.DataFrame()
                # 3) Offline (off-line alarm) diagnosis algorithm.
                try:
                    time_st = time.time()
                    loggers['offline_diag'].info(f'开始执行算法,电池包号为{pack_code_list[i]}')
                    t_battery=get_battery_info(mysql_iotp_conn)
                    offline_diag=off_line_warning.Off_Line_Warning()
                    df_res_new_ofl,df_res_update_ofl,df_res_end_ofl=offline_diag.diag(t_battery,df_diag_ram,df_algo_adjustable_param_pack_code,df_snlist,df_algo_param)
                    loggers['offline_diag'].info(f'算法运行完成,算法耗时{time.time()-time_st}')
                    #print(len(df_res_new_ofl))
                except Exception as e:
                    loggers['offline_diag'].error('算法运行出错')
                    loggers['offline_diag'].error(str(e))
                    loggers['offline_diag'].error(traceback.format_exc())
                    df_res_new_ofl=pd.DataFrame()
                    df_res_update_ofl=pd.DataFrame()
                    df_res_end_ofl=pd.DataFrame()


                # Write results to mysql: health scores, new ongoing
                # faults, finished faults (moved ing -> done) and updated
                # ongoing faults.
                try:
                    df_res_new = pd.concat([df_res_new_ofl,df_res_new_soh]) #, res1
                    df_res_update=df_res_update_ofl #df_res_update_lw_soc#pd.concat([df_res_update_lw_soc,df_res_update_crnt, df_res_update_temp]) #, res1
                    df_res_end = pd.concat([df_res_end_ofl,df_res_end_soh]) #, res2
                    df_res_new.reset_index(drop=True, inplace=True)
                    df_res_update.reset_index(drop=True, inplace=True)
                    df_res_end.reset_index(drop=True, inplace=True)
                    time_st = time.time()
                    session = mysql_algo_Session()
                    if not df_res_healthscore.empty:
                        df_res_healthscore.to_sql("algo_health_score",con=mysql_algo_conn, if_exists="append",index=False)
                    if not df_res_new.empty:
                        # Newly detected, still-ongoing faults.
                        df_res_new['date_info'] = df_res_new['start_time']
                        df_res_new['create_time'] = datetime.now()
                        df_res_new['create_by'] = 'algo'
                        df_res_new['is_delete'] = 0
                        df_res_new.to_sql("algo_all_fault_info_ing", con=mysql_algo_conn, if_exists="append", index=False)
                        logger_main.info(f'process-{process_num}新增未结束故障入库{pack_code_list[i]}完成')

                    if not df_res_end.empty:
                        # Finished faults: delete from the "ing" table and
                        # insert into the "done" table, row by row.
                        df_res_end=df_res_end.where(pd.notnull(df_res_end),None)
                        df_res_end=df_res_end.fillna(0)
                        for index in df_res_end.index:
                            df_t = df_res_end.loc[index:index]
                            sql = 'delete from algo_all_fault_info_ing where start_time=:start_time and fault_code=:fault_code and sn=:sn'
                            params = {'start_time': df_t['start_time'].values[0],
                                      'fault_code': df_t['fault_code'].values[0], 'sn': df_t['sn'].values[0]}
                            session.execute(sql, params=params)
                            sql = 'insert into algo_all_fault_info_done (date_info, start_time, end_time, sn, imei, model, fault_level, fault_code, fault_info,\
                                fault_reason, fault_advice, fault_location, device_status,odo, create_time, create_by,update_time, update_by, is_delete,comment) values \
                                (:date_info, :start_time, :end_time, :sn, :imei, :model,:fault_level, :fault_code, :fault_info,\
                                :fault_reason, :fault_advice, :fault_location, :device_status, :odo, :create_time, :create_by, :update_time,:update_by, :is_delete , :comment)'
                            params = {'date_info': datetime.now(),
                                      'start_time': df_t['start_time'].values[0],
                                      'end_time': df_t['end_time'].values[0],
                                      'sn': df_t['sn'].values[0],
                                      'imei': df_t['imei'].values[0],
                                      'model' :pack_code_list[i],
                                      'fault_level': df_t['fault_level'].values[0],
                                      'fault_code': df_t['fault_code'].values[0],
                                      'fault_info': df_t['fault_info'].values[0],
                                      'fault_reason': df_t['fault_reason'].values[0],
                                      'fault_advice': df_t['fault_advice'].values[0],
                                      'fault_location': df_t['fault_location'].values[0],
                                      'device_status': df_t['device_status'].values[0],
                                      'odo': df_t['odo'].values[0],
                                      'create_time': datetime.now(),
                                      'create_by': 'algo',
                                      'update_time': datetime.now(),
                                      'update_by': None,
                                      'is_delete': 0,
                                      'comment': None}
                            session.execute(sql, params=params)
                        session.commit()
                        logger_main.info(f'process-{process_num}结束故障入库{pack_code_list[i]}完成')
                    if not df_res_update.empty:
                        # Ongoing faults whose level/comment changed.
                        df_res_update=df_res_update.where(pd.notnull(df_res_update),None)
                        df_res_update=df_res_update.fillna(0)
                        for index in df_res_update.index:
                            df_t = df_res_update.loc[index:index]
                            try:
                                # Update the matching row in place.
                                with mysql_algo_Session() as session:
                                    session.execute(update(AlgoAllFaultInfoIng).where(
                                        and_((AlgoAllFaultInfoIng.start_time == df_t['start_time'].values[0]),
                                             (AlgoAllFaultInfoIng.fault_code == df_t['fault_code'].values[0]),
                                             (AlgoAllFaultInfoIng.sn == df_t['sn'].values[0]))).
                                        values(fault_level=df_t['fault_level'].values[0],
                                               comment=df_t['comment'].values[0]))
                                    session.commit()
                            except Exception as e:
                                logger_main.error(f"process-{process_num}:{pack_code_list[i]}结果入库出错")
                                logger_main.error(f"process-{process_num}:{e}")
                                logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                            finally:
                                session.close()
                        logger_main.info(f"process-{process_num}: 更新入库完成")
                    else:
                        logger_main.info(f"process-{process_num}: 无更新故障")

                    logger_main.info(f"process-{process_num}: 结果入库耗时:{time.time()-time_st}")
                except Exception as e:
                    logger_main.error(f"process-{process_num}:结果入库出错")
                    logger_main.error(f"process-{process_num}:{e}")
                    logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
                finally:
                    pass

        except Exception as e:
            logger_main.error(f"process-{process_num}:获取mysql数据库数据出错")
            logger_main.error(f"process-{process_num}:{e}")

            logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
        logger_main.info(f"process-{process_num}: 结束本轮计算")
    except Exception as e:
        logger_main.error(f'process-{process_num}: {e}')
        logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
        cleanUtils.clean(mysql_algo_conn, mysql_algo_engine, mysql_iotp_conn, mysql_iotp_engine, kafka_consumer, rc)
if __name__ == '__main__':

    # Scheduled-task entry point: run once now, wait until 2023-06-01 if
    # necessary, then hand control to a daily interval scheduler.
    cur_env = 'dev' # runtime environment
    app_path = "/home/shouxueqi/projects/zlwl-algos/" # absolute app path
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # log directory
    app_name = "task_day_1_1" # application name

    sysUtils = SysUtils(cur_env, app_path)
    logger_main = sysUtils.get_logger(app_name, log_base_path)

    # Run one round immediately.
    main()
    # Poll until the cutover date (2023-06-01), then run once more before
    # starting the scheduler. struct_time instances compare as tuples, so
    # ">" is a valid chronological comparison here.
    x = 0
    while x == 0:
        state_time = time.localtime(time.time())
        tt = '2023-06-01 00:00:00'
        t = time.strptime(tt, '%Y-%m-%d %H:%M:%S')
        if state_time > t:
            x = 1
            main()
        else:
            # Fix: the original loop busy-waited at 100% CPU; sleep
            # between checks instead.
            time.sleep(60)
    scheduler = BlockingScheduler()
    scheduler.add_job(main, 'interval', days=1, id='diag_job')

    try:
        logger_main.info(os.getpid())
        scheduler.start()


    except Exception as e:
        print(str(e))
        print(traceback.format_exc())
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())
|