import os
import time
import datetime
import json
import traceback

from apscheduler.schedulers.blocking import BlockingScheduler
import pandas as pd

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService


def _send_batch(batch, adjustable_param, pack_param, algo_list, pack_code, cell_type):
    """Serialize one batch of SNs into a task message and publish it to the Kafka topic.

    Reads module globals: kafka_producer, topic, start_time, end_time, logger_main.
    """
    send_data = {'snlist': batch, 'adjustable_param': adjustable_param,
                 'pack_param': pack_param, 'algo_list': algo_list,
                 'pack_code': pack_code, 'cell_type': cell_type,
                 'start_time': start_time, 'end_time': end_time}
    kafka_producer.send(topic, bytes(json.dumps(send_data), 'utf-8'))
    logger_main.info(f"{batch}调度成功")
    time.sleep(3)  # throttle between messages so downstream consumers are not flooded


def main():
    """Look up device/algorithm parameters for the configured SN list, group the SNs by
    (pack_model, device_cell_type), and dispatch them to Kafka in batches of max_count.

    Reads module globals: sn_list, mysql_kw_engine, max_count, logger_main (plus the
    globals consumed by _send_batch). Errors are logged, never raised to the caller.
    """
    try:
        logger_main.info(f"执行调度{sn_list}")
        df_devcode = pd.DataFrame({'sn': sn_list})

        # Fetch device metadata and all algorithm parameter tables for the scheduled SNs.
        mysql_kw_conn = mysql_kw_engine.connect()
        try:
            df_t_device = pd.read_sql(
                "select sn, imei, pack_model, device_cell_type from t_device", mysql_kw_conn)
            df_algo_adjustable_param = pd.read_sql(
                "select algo_id, pack_code, param, param_ai from algo_adjustable_param_copy",
                mysql_kw_conn)
            df_algo_pack_param = pd.read_sql(
                "select pack_code, param from algo_pack_param", mysql_kw_conn)
            df_algo_list = pd.read_sql(
                "select algo_id, algo_name, is_activate, global_param, fault_level, fault_code from algo_list",
                mysql_kw_conn)
        finally:
            # BUG FIX: the original closed the connection only on the success path,
            # leaking it whenever any query or the Kafka send raised.
            mysql_kw_conn.close()

        algo_list = df_algo_list.to_dict("records")
        df_merge = pd.merge(df_devcode, df_t_device, on='sn', how='inner')
        if df_merge.empty:
            logger_main.info(f"未查到{sn_list}对应的参数,无法调度")
            return

        # Send one message per batch of max_count SNs, grouped by pack/cell type so
        # every message carries the parameter set matching its devices.
        for (pack_code, cell_type), df in df_merge.groupby(["pack_model", "device_cell_type"]):
            adjustable_param = (
                df_algo_adjustable_param[df_algo_adjustable_param['pack_code'] == pack_code]
                .drop(['pack_code'], axis=1)
                .to_dict("records"))
            pack_param = (
                df_algo_pack_param[df_algo_pack_param['pack_code'] == pack_code]
                .drop(['pack_code'], axis=1)
                .to_dict("records"))

            # NOTE: renamed from sn_list — the original reused and clobbered the
            # module-global SN list as its per-batch accumulator.
            batch = []
            for _, row in df.iterrows():
                batch.append({'sn': row['sn'], 'imei': row['imei']})
                if len(batch) >= max_count:
                    _send_batch(batch, adjustable_param, pack_param, algo_list,
                                pack_code, cell_type)
                    batch = []
            # BUG FIX: the original dropped the trailing partial batch — any SNs left
            # over after the loop (count < max_count) were never sent. Invisible today
            # only because max_count == 1.
            if batch:
                _send_batch(batch, adjustable_param, pack_param, algo_list,
                            pack_code, cell_type)
    except Exception as e:
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())


if __name__ == '__main__':
    cur_env = 'dev'                   # runtime environment
    app_path = "/home/zhuxi/project/zlwl-algos/"  # project root path
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
    app_name = "schedule"             # app name; recommended to match the topic suffix

    # Wire up config, DB engines, Redis, Kafka producer and the logger.
    sysUtils = SysUtils(cur_env, app_path)
    mysqlUtils = MysqlUtils()
    mysql_iotp_params = sysUtils.get_cf_param('mysql-iotp')
    mysql_iotp_engine, mysql_iotp_Session = mysqlUtils.get_mysql_engine(mysql_iotp_params)
    mysql_kw_params = sysUtils.get_cf_param('mysql-algo')
    mysql_kw_engine, mysql_kw_Session = mysqlUtils.get_mysql_engine(mysql_kw_params)
    redis_params = sysUtils.get_cf_param('redis')
    redisUtils = RedisUtils()
    rc = redisUtils.get_redis_conncect(redis_params)
    kafka_params = sysUtils.get_cf_param('kafka')
    kafkaUtils = KafkaUtils()
    kafka_producer = kafkaUtils.get_kafka_producer(kafka_params, client_id="test")
    logger_main = sysUtils.get_logger(app_name, log_base_path)

    max_count = 1                     # maximum number of SNs per Kafka message
    topic = "topic_task_hour_6"
    # SN list comes from the exported device spreadsheet.
    df_sn = pd.read_excel('/home/zhuxi/project/zlwl-algos/USER/zhuxi/FaultClass/V1_0_1/t_device.xlsx')
    sn_list = list(df_sn['sn'])
    # Fixed scheduling window passed through to every task message.
    start_time = "2023-05-04 08:00:00"
    end_time = "2023-05-04 11:00:00"
    main()