main_by_hand.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import time, datetime
  2. import json
  3. import traceback
  4. from apscheduler.schedulers.blocking import BlockingScheduler
  5. import pandas as pd
  6. from ZlwlAlgosCommon.utils.ProUtils import *
  7. from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
  8. def main():
  9. global sn_list, start_time, end_time, mysql_iotp_engine, mysql_kw_conn, mysql_kw_engine, topic, max_count, logger_main
  10. try:
  11. logger_main.info(f"执行调度{sn_list}")
  12. df_devcode = pd.DataFrame({'sn':sn_list})
  13. # 根据sn 获取对应的参数
  14. sql = f"select sn, imei, pack_model,organ_code, device_cell_type from t_device"
  15. mysql_kw_conn = mysql_kw_engine.connect()
  16. df_t_device = pd.read_sql(sql, mysql_kw_conn)
  17. sql = f"select algo_id, pack_code, param, param_ai from algo_adjustable_param"
  18. df_algo_adjustable_param = pd.read_sql(sql, mysql_kw_conn)
  19. sql = f"select pack_code, param from algo_pack_param"
  20. df_algo_pack_param = pd.read_sql(sql, mysql_kw_conn)
  21. sql = f"select id,algo_id, algo_name, is_activate, global_param, fault_level, fault_code from algo_list"
  22. df_algo_list= pd.read_sql(sql, mysql_kw_conn)
  23. algo_list = df_algo_list.to_dict("records")
  24. df_merge = pd.merge(df_devcode, df_t_device, on='sn', how='inner')
  25. print()
  26. # 分组发送
  27. for (pack_code, cell_type,organ_code), df in df_merge.groupby(["pack_model", "device_cell_type",'organ_code']):
  28. adjustable_param = df_algo_adjustable_param[df_algo_adjustable_param['pack_code']==pack_code].drop(['pack_code'], axis=1)
  29. adjustable_param = adjustable_param.to_dict("records")
  30. pack_param = df_algo_pack_param[df_algo_pack_param['pack_code']==pack_code].drop(['pack_code'], axis=1)
  31. pack_param = pack_param.to_dict("records")
  32. count = 0
  33. sn_list = []
  34. for d in df.index:
  35. sn = df.loc[d, 'sn']
  36. imei = df.loc[d, 'imei']
  37. sn_list.append({'sn':sn, 'imei':imei})
  38. count = count + 1
  39. if count >= max_count:
  40. send_data = {'snlist':sn_list, 'adjustable_param':adjustable_param, 'pack_param':pack_param, 'algo_list':algo_list, 'pack_code':pack_code, 'cell_type':cell_type,
  41. 'start_time':start_time, 'end_time':end_time,'organ_code':organ_code}
  42. print(send_data)
  43. kafka_producer.send(topic, bytes(json.dumps(send_data),'utf-8'))
  44. time.sleep(1)
  45. count = 0
  46. sn_list = []
  47. mysql_kw_conn.close()
  48. except Exception as e:
  49. logger_main.error(str(e))
  50. logger_main.error(traceback.format_exc())
  51. if __name__ == '__main__':
  52. cur_env = 'dev' # 设置运行环境
  53. app_path = "/home/shouxueqi/projects/zlwl-algos/zlwl-algos/" # 设置相对路径/home/shouxueqi/projects/zlwl-algos/zlwl-algos/ALGOS/PERIODIC/task_min_5/group_1/main.py
  54. log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
  55. app_name = "schedule" # 应用名, 建议与topic的后缀相同
  56. sysUtils = SysUtils(cur_env, app_path)
  57. mysqlUtils = MysqlUtils()
  58. mysql_iotp_params = sysUtils.get_cf_param('mysql-iotp')
  59. mysql_iotp_engine, mysql_iotp_Session= mysqlUtils.get_mysql_engine(mysql_iotp_params)
  60. mysql_kw_params = sysUtils.get_cf_param('mysql-algo')
  61. mysql_kw_engine, mysql_kw_Session= mysqlUtils.get_mysql_engine(mysql_kw_params)
  62. redis_params = sysUtils.get_cf_param('redis')
  63. redisUtils = RedisUtils()
  64. rc = redisUtils.get_redis_conncect(redis_params)
  65. kafka_params = sysUtils.get_cf_param('kafka')
  66. kafkaUtils = KafkaUtils()
  67. kafka_producer = kafkaUtils.get_kafka_producer(kafka_params, client_id="test")
  68. logger_main = sysUtils.get_logger(app_name, log_base_path)
  69. max_count = 1
  70. start_time = "2023-07-19 00:00:00"
  71. end_time = "2023-07-20 00:00:00"
  72. # sn_list_total = ['LY9139BB0MALBZ308','LY9139BB0MALBZ325','LY9139BB0MALBZ423','LY9139BB0MALBZ504','LY9139BB0MALBZ793','LY9139BB0MALBZ809','LY9139BB0MALBZ812','LY9139BB1MALBZ429','LY9139BB1MALBZ799','LY9139BB1MALBZ804',
  73. # 'LY9139BB2MALBZ424','LY9139BB2MALBZ505','LY9139BB2MALBZ794','LY9139BB3MALBZ318','LY9139BB3MALBZ805','LY9139BB4MALBZ795','LY9139BB4MALBZ800','LY9139BB5MALBZ319','LY9139BB5MALBZ806','LY9139BB6MALBZ426',
  74. # 'LY9139BB6MALBZ796','LY9139BB6MALBZ801','LY9139BB7MALBZ323','LY9139BB7MALBZ807','LY9139BB8MALBZ329','LY9139BB7MALBZ810','LY9139BB8MALBZ427','LY9139BB8MALBZ430','LY9139BB8MALBZ797','LY9139BB8MALBZ802',
  75. # 'LY9139BB9MALBZ338','LY9139BB9MALBZ808','LY9139BB9MALBZ811','LY9139BBXMALBZ428','LY9139BBXMALBZ431','LY9139BBXMALBZ798','LY9139BBXMALBZ803','LY9F49BC1MALBZ877','LY9F49BC3MALBZ878','LY9F49BC3MALBZ881',
  76. # 'LY9F49BC4MALBZ081','LY9F49BC4MALBZ470','LY9F49BC5MALBZ364','LY9F49BC5MALBZ879','LY9F49BC5MALBZ882','LY9F49BC7MALBZ480','LY9F49BC7MALBZ883','LY9F49BC8MALBZ083','LY9F49BCXMALBZ876','LY9139BB9MALBZ310']
  77. # sn_list_total =['PJXCLL128N234P005', 'PJXCLL128N234P007', 'PJXCLL128N234P003', 'PJXCLL128N234P010', 'PJXCLL128N234P002', 'PJXCLL128N234P004', 'PJXCLL128N234P001', 'PJXCLL128N234P006', 'PJXCLL128N234P008']#['PK50001A100000411','PK50001A100000173'] #,
  78. # sn_list=sn_list_total
  79. # # mysql
  80. mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
  81. mysqlUtils = MysqlUtils()
  82. mysql_algo_engine, mysql_algo_Session= mysqlUtils.get_mysql_engine(mysql_algo_params)
  83. mysql_algo_conn = mysql_algo_engine.connect()
  84. df_snpk_list = pd.read_sql("select sn, imei,pack_model,device_cell_type,scrap_status from t_device", mysql_algo_conn)
  85. df_snpk_list=df_snpk_list[df_snpk_list['scrap_status']<4]
  86. df_snpk_list=df_snpk_list.rename(columns={'pack_model':'pack_code'})
  87. df_snpk_list=df_snpk_list[df_snpk_list['sn'].str.contains('N234P')]
  88. df_sn_list=list(df_snpk_list['sn'])
  89. sn_list_total=df_sn_list
  90. sn_list_total=['662CC700041']
  91. #sn_list_total=['PJXCLL128N234P008']
  92. # while True:
  93. # start_time = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S") + datetime.timedelta(weeks=4)
  94. # end_time = start_time + datetime.timedelta(weeks=4)
  95. # start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
  96. # end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")
  97. # print(start_time)
  98. # if start_time > "2022-12-05 00:00:00":
  99. # break
  100. topic = "topic_test_sxq"#topic_task_min_10
  101. # main()
  102. for i in range(0,len(sn_list_total)): #
  103. sn_list=[sn_list_total[i]]
  104. main()
  105. # main()
  106. # task_day_1_sxq_test = task_day_1_1_sxq_test
  107. # group_id_task_day_1_sxq_test = group_task_day_1_sxq_test