main.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import datetime
  2. import json
  3. from multiprocessing import Pool
  4. import os
  5. import time
  6. import traceback
  7. from apscheduler.schedulers.blocking import BlockingScheduler
  8. import pandas as pd
  9. from ZlwlAlgosCommon.utils.ProUtils import *
  10. from ZlwlAlgosCommon.service.station.StationAlgoService import StationAlgoService
  11. from plc_base import plc_base
  12. def main():
  13. try:
  14. algo_list = ['plcbase'] # 本调度所包含的算法名列表。
  15. station_num = os.environ.get("station_num", "皋兰站")
  16. plc_sn=os.environ.get("plc_sn", "c100_1")
  17. loggers = sysUtils.get_loggers(algo_list, log_base_path, 0) # 为每个算法分配一个logger
  18. logger_main.info("配置中间件")
  19. influxdb_params = sysUtils.get_cf_param('influxdb')
  20. influxdb_params['token'] = os.environ.get("TOKEN", "r3UyJsR5ETic3bD-fg8-q9joytVWF1I8iqVBA0-TcGkiPb4XBHwXKy8OKiOA7ZMOCK1UB2H6JCmUH1M0_ZG16A==") # 路径覆盖
  21. mysql_local_params = sysUtils.get_cf_param('mysql-local')
  22. mysql_cloud_params = sysUtils.get_cf_param('mysql-cloud')
  23. mysqlUtils = MysqlUtils()
  24. mysql_local_engine, mysql_local_Session= mysqlUtils.get_mysql_engine(mysql_local_params)
  25. mysql_local_conn = mysql_local_engine.connect()
  26. station_algo_service = StationAlgoService(mysql_local_params, influxdb_params)
  27. now_time=datetime.datetime.now()
  28. start_time=now_time-datetime.timedelta(hours=24)
  29. start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
  30. end_time=now_time.strftime('%Y-%m-%d %H:%M:%S')
  31. except Exception as e:
  32. logger_main.error(str(e))
  33. logger_main.error(traceback.format_exc())
  34. time.sleep(10)
  35. return
  36. ##读取电机标准的趋势曲线参数
  37. #读取mysql数据
  38. try:
  39. sql = "select * from algo_pack_param"
  40. base_param = pd.read_sql(sql, mysql_local_conn)
  41. para_base=json.loads(base_param["param"][base_param["pack_code"]=="plc_base"].iloc[0])
  42. logger_main.info('读取趋势曲线参数')
  43. except Exception as e:
  44. logger_main.error("读取mysql出错")
  45. logger_main.error(str(e))
  46. logger_main.error(traceback.format_exc())
  47. time.sleep(10)
  48. return
  49. try:
  50. logger_main.info(f"准备获取{start_time}-{end_time}的数据")
  51. data = station_algo_service.get_data(1, start_time, end_time)
  52. logger_main.info(f"数据获取完成,共获取到{len(data)}的数据")
  53. logger_main.info(f"data:{data.shape}")
  54. if len(data[(data["车辆精定位位置"]>0)|(data["车辆精定位位置"]<=0)])>0 and len(set(data["换电步骤"]))>=44:
  55. group_steps_base,group_base=plc_base.plc_base(data,para_base)
  56. group_steps_base["error_code"]=group_steps_base["error_code"].astype(str)
  57. group_steps_base["time_date"]=pd.to_datetime(group_steps_base["step_time_b"]).dt.date
  58. ##基于换电过程和换电步骤的指标(需要对应建表)
  59. group_steps_base["plc_sn"]=plc_sn
  60. group_steps_base["station_num"]=station_num
  61. logger_main.info(f"group_steps_base:{group_steps_base.shape}")
  62. group_steps_base.to_sql("plc_health_base",con=mysql_local_conn, if_exists="append",index=False)
  63. logger_main.info("group_steps_base_tomysql_ok")
  64. group_base["error_code"]=group_base["error_code"].astype(str)
  65. group_base["pause_steps"]=group_base["pause_steps"].astype(str)
  66. group_base["stop_steps"]=group_base["stop_steps"].astype(str)
  67. group_base["fault_steps"]=group_base["fault_steps"].astype(str)
  68. group_base["time_date"]=pd.to_datetime(group_base["c_time_b"]).dt.date
  69. ##基于换电过程的指标(需要对应建表)
  70. group_base["plc_sn"]=plc_sn
  71. group_base["station_num"]=station_num
  72. logger_main.info(f"group_base:{group_base.shape}")
  73. group_base.to_sql("plc_health_group_base",con=mysql_local_conn, if_exists="append",index=False)
  74. logger_main.info("group_base_tomysql_ok")
  75. ##识别疑似问题
  76. ##获取指标参考值
  77. except Exception as e:
  78. print(str(e))
  79. print(traceback.format_exc())
  80. logger_main.error(str(e))
  81. logger_main.error(traceback.format_exc())
  82. if __name__ == '__main__':
  83. #定时任务.......................................................................................................................................................................
  84. cur_env = 'test' # 设置运行环境
  85. #app_path = r"/home/wangliming/project/zlwl-algos/" # 设置app绝对路径
  86. app_path = r"D:\work\zlwl-algos"
  87. log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
  88. app_name = "plc_early_warning_local" # 应用名
  89. sysUtils = SysUtils(cur_env, app_path)
  90. logger_main = sysUtils.get_logger(app_name, log_base_path)
  91. logger_main.info(f"本次主进程号: {os.getpid()}")
  92. main()
  93. scheduler = BlockingScheduler()
  94. scheduler.add_job(main, 'interval', days=1, id='plcbase')
  95. try:
  96. logger_main.info(f"定时任务开启......")
  97. scheduler.start()
  98. except Exception as e:
  99. print(str(e))
  100. print(traceback.format_exc())
  101. logger_main.error(str(e))
  102. logger_main.error(traceback.format_exc())