main_by_hand.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import json
  2. import traceback
  3. import pandas as pd
  4. from ZlwlAlgosCommon.utils.ProUtils import *
  5. from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
  6. import pandas as pd
  7. import time
  8. import datetime
  9. def main():
  10. global sn_list, start_time, end_time, mysql_iotp_engine, mysql_kw_conn, mysql_kw_engine, topic, max_count, logger_main
  11. try:
  12. logger_main.info(f"执行调度{sn_list}")
  13. df_devcode = pd.DataFrame({'sn':sn_list})
  14. # 根据sn 获取对应的参数
  15. sql = f"select sn, imei, pack_model,organ_code, device_cell_type from t_device"
  16. mysql_kw_conn = mysql_kw_engine.connect()
  17. df_t_device = pd.read_sql(sql, mysql_kw_conn)
  18. sql = f"select algo_id, pack_code, param, param_ai from algo_adjustable_param"
  19. df_algo_adjustable_param = pd.read_sql(sql, mysql_kw_conn)
  20. sql = f"select pack_code, param from algo_pack_param"
  21. df_algo_pack_param = pd.read_sql(sql, mysql_kw_conn)
  22. sql = f"select id,algo_id, algo_name, is_activate, global_param, fault_level, fault_code from algo_list"
  23. df_algo_list= pd.read_sql(sql, mysql_kw_conn)
  24. algo_list = df_algo_list.to_dict("records")
  25. df_merge = pd.merge(df_devcode, df_t_device, on='sn', how='inner')
  26. print()
  27. # 分组发送
  28. for (pack_code, cell_type,organ_code), df in df_merge.groupby(["pack_model", "device_cell_type",'organ_code']):
  29. adjustable_param = df_algo_adjustable_param[df_algo_adjustable_param['pack_code']==pack_code].drop(['pack_code'], axis=1)
  30. adjustable_param = adjustable_param.to_dict("records")
  31. pack_param = df_algo_pack_param[df_algo_pack_param['pack_code']==pack_code].drop(['pack_code'], axis=1)
  32. pack_param = pack_param.to_dict("records")
  33. count = 0
  34. sn_list = []
  35. for d in df.index:
  36. sn = df.loc[d, 'sn']
  37. imei = df.loc[d, 'imei']
  38. sn_list.append({'sn':sn, 'imei':imei})
  39. count = count + 1
  40. if count >= max_count:
  41. send_data = {'snlist':sn_list, 'adjustable_param':adjustable_param, 'pack_param':pack_param, 'algo_list':algo_list, 'pack_code':pack_code, 'cell_type':cell_type,
  42. 'start_time':start_time, 'end_time':end_time,'organ_code':organ_code}
  43. print(send_data)
  44. kafka_producer.send(topic, bytes(json.dumps(send_data),'utf-8'))
  45. count = 0
  46. sn_list = []
  47. time.sleep(1)
  48. mysql_kw_conn.close()
  49. except Exception as e:
  50. logger_main.error(str(e))
  51. logger_main.error(traceback.format_exc())
  52. if __name__ == '__main__':
  53. cur_env = 'dev' # 设置运行环境
  54. app_path = "/home/likun/project/zlwl-algos/" # 设置app绝对路径
  55. log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
  56. app_name = "task_day_7_split" # 应用名, 建议与topic的后缀相同
  57. sysUtils = SysUtils(cur_env, app_path)
  58. mysqlUtils = MysqlUtils()
  59. mysql_iotp_params = sysUtils.get_cf_param('mysql-iotp')
  60. mysql_iotp_engine, mysql_iotp_Session= mysqlUtils.get_mysql_engine(mysql_iotp_params)
  61. mysql_kw_params = sysUtils.get_cf_param('mysql-algo')
  62. mysql_kw_engine, mysql_kw_Session= mysqlUtils.get_mysql_engine(mysql_kw_params)
  63. redis_params = sysUtils.get_cf_param('redis')
  64. redisUtils = RedisUtils()
  65. rc = redisUtils.get_redis_conncect(redis_params)
  66. kafka_params = sysUtils.get_cf_param('kafka')
  67. kafkaUtils = KafkaUtils()
  68. kafka_producer = kafkaUtils.get_kafka_producer(kafka_params, client_id="test")
  69. logger_main = sysUtils.get_logger(app_name, log_base_path)
  70. max_count = 1
  71. topic = "topic_task_test_hour_half_lk"
  72. sn_list = ["LY9139BB0MALBZ308",
  73. "LY9139BB0MALBZ325",
  74. "LY9139BB0MALBZ423",
  75. "LY9139BB0MALBZ504",
  76. "LY9139BB0MALBZ793",
  77. "LY9139BB0MALBZ809",
  78. "LY9139BB0MALBZ812"]
  79. start_time = "2022-01-07 00:00:00"
  80. end_time = "2022-01-07 02:00:00"
  81. main()
  82. #while True:
  83. # main()
  84. # start_time =pd.to_datetime(start_time) + datetime.timedelta(minutes=30)
  85. # end_time = start_time + datetime.timedelta(minutes=30)
  86. # start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
  87. # end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")
  88. # print(start_time)
  89. # if start_time > "2022-01-10 00:00:00":
  90. # break