123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- import json
- import traceback
- import pandas as pd
- from ZlwlAlgosCommon.utils.ProUtils import *
- from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
- import pandas as pd
- import datetime, time
def main():
    """Publish the scheduled devices to Kafka, batched per device group.

    Works entirely on module-level globals set by the caller:

    * ``sn_list`` - serial numbers to schedule (read only; no longer rebound here).
    * ``start_time`` / ``end_time`` - copied verbatim into every message.
    * ``mysql_kw_engine`` - engine used to read ``t_device``,
      ``algo_adjustable_param``, ``algo_pack_param`` and ``algo_list``.
    * ``kafka_producer`` / ``topic`` - destination of the JSON messages.
    * ``max_count`` - maximum number of devices per message (values below 1
      are treated as 1, which matches the previous effective behaviour).
    * ``logger_main`` - receives the start message and any error.

    The requested serial numbers are inner-joined with ``t_device`` on ``sn``,
    grouped by (pack_model, device_cell_type, organ_code), and each group is
    sent in slices of at most ``max_count`` devices. Unlike the previous
    version, the last (possibly shorter) slice of every group is sent too.

    Any exception is logged through ``logger_main`` and swallowed; nothing is
    returned.
    """
    # The connection is still published as a module global, as before.
    global mysql_kw_conn
    try:
        logger_main.info(f"执行调度{sn_list}")

        df_devcode = pd.DataFrame({'sn': sn_list})

        # Load device metadata and algorithm parameters. The connection is
        # closed in ``finally`` so it is released even when a query fails.
        mysql_kw_conn = mysql_kw_engine.connect()
        try:
            df_t_device = pd.read_sql(
                "select sn, imei, pack_model,organ_code, device_cell_type from t_device",
                mysql_kw_conn,
            )
            df_algo_adjustable_param = pd.read_sql(
                "select algo_id, pack_code, param, param_ai from algo_adjustable_param",
                mysql_kw_conn,
            )
            df_algo_pack_param = pd.read_sql(
                "select pack_code, param from algo_pack_param",
                mysql_kw_conn,
            )
            df_algo_list = pd.read_sql(
                "select id,algo_id, algo_name, is_activate, global_param, fault_level, fault_code from algo_list",
                mysql_kw_conn,
            )
        finally:
            mysql_kw_conn.close()

        algo_list = df_algo_list.to_dict("records")
        # Inner join: serial numbers unknown to t_device are silently skipped.
        df_merge = pd.merge(df_devcode, df_t_device, on='sn', how='inner')

        # The old counter-based loop flushed after every device whenever
        # max_count was <= 1, so clamp to 1 to keep that behaviour.
        batch_size = max(1, max_count)

        # Send per group.
        # NOTE(review): groupby skips rows whose key is NaN/NULL by default -
        # confirm every device has pack_model, device_cell_type and organ_code.
        group_keys = ["pack_model", "device_cell_type", 'organ_code']
        for (pack_code, cell_type, organ_code), df_group in df_merge.groupby(group_keys):
            adjustable_param = (
                df_algo_adjustable_param[df_algo_adjustable_param['pack_code'] == pack_code]
                .drop(['pack_code'], axis=1)
                .to_dict("records")
            )
            pack_param = (
                df_algo_pack_param[df_algo_pack_param['pack_code'] == pack_code]
                .drop(['pack_code'], axis=1)
                .to_dict("records")
            )

            devices = df_group[['sn', 'imei']].to_dict("records")
            # Slicing covers the trailing partial batch that was previously dropped.
            for offset in range(0, len(devices), batch_size):
                send_data = {
                    'snlist': devices[offset:offset + batch_size],
                    'adjustable_param': adjustable_param,
                    'pack_param': pack_param,
                    'algo_list': algo_list,
                    'pack_code': pack_code,
                    'cell_type': cell_type,
                    'start_time': start_time,
                    'end_time': end_time,
                    'organ_code': organ_code,
                }
                print(send_data)
                kafka_producer.send(topic, bytes(json.dumps(send_data), 'utf-8'))
    except Exception as e:
        # Top-level boundary of the scheduling run: log and keep the caller alive.
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())
if __name__ == '__main__':
    # ---- Runtime configuration -------------------------------------------
    cur_env = 'dev'  # runtime environment
    app_path = "/home/chenenze/zlwl-algos/"  # application base path
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory next to this script
    app_name = "schedule"  # application name; suggested to match the topic suffix

    sysUtils = SysUtils(cur_env, app_path)

    # ---- External services (same creation order as before) ----------------
    mysqlUtils = MysqlUtils()
    mysql_iotp_engine, mysql_iotp_Session = mysqlUtils.get_mysql_engine(
        sysUtils.get_cf_param('mysql-iotp')
    )
    mysql_kw_engine, mysql_kw_Session = mysqlUtils.get_mysql_engine(
        sysUtils.get_cf_param('mysql-algo')
    )

    redis_params = sysUtils.get_cf_param('redis')
    redisUtils = RedisUtils()
    rc = redisUtils.get_redis_conncect(redis_params)

    kafka_params = sysUtils.get_cf_param('kafka')
    kafkaUtils = KafkaUtils()
    kafka_producer = kafkaUtils.get_kafka_producer(kafka_params, client_id="test")

    logger_main = sysUtils.get_logger(app_name, log_base_path)

    # ---- Scheduling inputs read by main() through module globals -----------
    topic = "topic_test_cez_1"
    # Full device list kept for reference (currently disabled):
    # sn_list_all = [
    #     'LY9139BB0MALBZ308', 'LY9139BB0MALBZ325', 'LY9139BB0MALBZ423', 'LY9139BB0MALBZ504',
    #     'LY9139BB0MALBZ793', 'LY9139BB0MALBZ809', 'LY9139BB0MALBZ812', 'LY9139BB1MALBZ429',
    #     'LY9139BB1MALBZ799', 'LY9139BB1MALBZ804', 'LY9139BB2MALBZ424', 'LY9139BB2MALBZ505',
    #     'LY9139BB2MALBZ794', 'LY9139BB3MALBZ318', 'LY9139BB3MALBZ805', 'LY9139BB4MALBZ795',
    #     'LY9139BB4MALBZ800', 'LY9139BB5MALBZ319', 'LY9139BB5MALBZ806', 'LY9139BB6MALBZ426',
    #     'LY9139BB6MALBZ796', 'LY9139BB6MALBZ801', 'LY9139BB7MALBZ323', 'LY9139BB7MALBZ807',
    #     'LY9139BB7MALBZ810', 'LY9139BB8MALBZ329', 'LY9139BB8MALBZ427', 'LY9139BB8MALBZ430',
    #     'LY9139BB8MALBZ797', 'LY9139BB8MALBZ802', 'LY9139BB9MALBZ310', 'LY9139BB9MALBZ338',
    #     'LY9139BB9MALBZ808', 'LY9139BB9MALBZ811', 'LY9139BBXMALBZ428', 'LY9139BBXMALBZ431',
    #     'LY9139BBXMALBZ798', 'LY9139BBXMALBZ803', 'LY9F49BC1MALBZ877', 'LY9F49BC3MALBZ878',
    #     'LY9F49BC3MALBZ881', 'LY9F49BC4MALBZ081', 'LY9F49BC4MALBZ470', 'LY9F49BC5MALBZ364',
    #     'LY9F49BC5MALBZ879', 'LY9F49BC5MALBZ882', 'LY9F49BC7MALBZ480', 'LY9F49BC7MALBZ883',
    #     'LY9F49BC8MALBZ083', 'LY9F49BCXMALBZ876',
    # ]
    sn_list_all = ['LY9F49BC7MALBZ883']
    max_count = 1

    time_format = '%Y-%m-%d %H:%M:%S'
    start_time_str = "2022-02-16 00:00:00"
    end_time_str = "2022-02-17 00:00:00"
    period = datetime.timedelta(hours=24)
    start_time_dt = datetime.datetime.strptime(start_time_str, time_format)
    end_time_dt = datetime.datetime.strptime(end_time_str, time_format)

    # Walk backwards from the overall end time in windows of at most `period`;
    # the earliest window is clamped so it never starts before start_time_dt.
    while start_time_dt < end_time_dt:
        segment_start_time_dt = max(end_time_dt - period, start_time_dt)

        # main() picks these up as module globals.
        start_time = segment_start_time_dt.strftime(time_format)
        end_time = end_time_dt.strftime(time_format)
        sn_list = sn_list_all
        print(start_time, end_time)
        main()
        # NOTE(review): presumably this pause gives the producer time to deliver
        # before the next window - confirm whether an explicit flush is needed.
        time.sleep(1)
        end_time_dt = segment_start_time_dt
-
|