123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- import datetime
- import json
- from multiprocessing import Pool
- import os
- import time
- import traceback
- from apscheduler.schedulers.blocking import BlockingScheduler
- import pandas as pd
- from ZlwlAlgosCommon.utils.ProUtils import *
- from ZlwlAlgosCommon.service.station.StationAlgoService import StationAlgoService
- from plc_base import plc_base
- def main():
- try:
- algo_list = ['plcbase'] # 本调度所包含的算法名列表。
- station_num = os.environ.get("station_num", "皋兰站")
- plc_sn=os.environ.get("plc_sn", "c100_1")
- loggers = sysUtils.get_loggers(algo_list, log_base_path, 0) # 为每个算法分配一个logger
- logger_main.info("配置中间件")
-
- influxdb_params = sysUtils.get_cf_param('influxdb')
- influxdb_params['token'] = os.environ.get("TOKEN", "r3UyJsR5ETic3bD-fg8-q9joytVWF1I8iqVBA0-TcGkiPb4XBHwXKy8OKiOA7ZMOCK1UB2H6JCmUH1M0_ZG16A==") # 路径覆盖
- mysql_local_params = sysUtils.get_cf_param('mysql-local')
- mysql_cloud_params = sysUtils.get_cf_param('mysql-cloud')
- mysqlUtils = MysqlUtils()
- mysql_local_engine, mysql_local_Session= mysqlUtils.get_mysql_engine(mysql_local_params)
- mysql_local_conn = mysql_local_engine.connect()
-
- station_algo_service = StationAlgoService(mysql_local_params, influxdb_params)
-
- now_time=datetime.datetime.now()
- start_time=now_time-datetime.timedelta(hours=24)
- start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
- end_time=now_time.strftime('%Y-%m-%d %H:%M:%S')
-
-
- except Exception as e:
- logger_main.error(str(e))
- logger_main.error(traceback.format_exc())
- time.sleep(10)
- return
-
- ##读取电机标准的趋势曲线参数
-
- #读取mysql数据
- try:
- sql = "select * from algo_pack_param"
- base_param = pd.read_sql(sql, mysql_local_conn)
-
- para_base=json.loads(base_param["param"][base_param["pack_code"]=="plc_base"].iloc[0])
-
- logger_main.info('读取趋势曲线参数')
- except Exception as e:
- logger_main.error("读取mysql出错")
- logger_main.error(str(e))
- logger_main.error(traceback.format_exc())
- time.sleep(10)
- return
-
-
- try:
- logger_main.info(f"准备获取{start_time}-{end_time}的数据")
- data = station_algo_service.get_data(1, start_time, end_time)
- logger_main.info(f"数据获取完成,共获取到{len(data)}的数据")
- logger_main.info(f"data:{data.shape}")
- if len(data[(data["车辆精定位位置"]>0)|(data["车辆精定位位置"]<=0)])>0 and len(set(data["换电步骤"]))>=44:
- group_steps_base,group_base=plc_base.plc_base(data,para_base)
- group_steps_base["error_code"]=group_steps_base["error_code"].astype(str)
- group_steps_base["time_date"]=pd.to_datetime(group_steps_base["step_time_b"]).dt.date
- ##基于换电过程和换电步骤的指标(需要对应建表)
- group_steps_base["plc_sn"]=plc_sn
- group_steps_base["station_num"]=station_num
-
- logger_main.info(f"group_steps_base:{group_steps_base.shape}")
-
- group_steps_base.to_sql("plc_health_base",con=mysql_local_conn, if_exists="append",index=False)
-
- logger_main.info("group_steps_base_tomysql_ok")
- group_base["error_code"]=group_base["error_code"].astype(str)
- group_base["pause_steps"]=group_base["pause_steps"].astype(str)
- group_base["stop_steps"]=group_base["stop_steps"].astype(str)
- group_base["fault_steps"]=group_base["fault_steps"].astype(str)
- group_base["time_date"]=pd.to_datetime(group_base["c_time_b"]).dt.date
- ##基于换电过程的指标(需要对应建表)
- group_base["plc_sn"]=plc_sn
- group_base["station_num"]=station_num
-
- logger_main.info(f"group_base:{group_base.shape}")
- group_base.to_sql("plc_health_group_base",con=mysql_local_conn, if_exists="append",index=False)
-
- logger_main.info("group_base_tomysql_ok")
-
- ##识别疑似问题
-
- ##获取指标参考值
-
- except Exception as e:
- print(str(e))
- print(traceback.format_exc())
- logger_main.error(str(e))
- logger_main.error(traceback.format_exc())
- if __name__ == '__main__':
-
- #定时任务.......................................................................................................................................................................
- cur_env = 'test' # 设置运行环境
- #app_path = r"/home/wangliming/project/zlwl-algos/" # 设置app绝对路径
- app_path = r"D:\work\zlwl-algos"
- log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
- app_name = "plc_early_warning_local" # 应用名
-
- sysUtils = SysUtils(cur_env, app_path)
- logger_main = sysUtils.get_logger(app_name, log_base_path)
- logger_main.info(f"本次主进程号: {os.getpid()}")
- main()
- scheduler = BlockingScheduler()
- scheduler.add_job(main, 'interval', days=1, id='plcbase')
-
- try:
- logger_main.info(f"定时任务开启......")
- scheduler.start()
-
- except Exception as e:
- print(str(e))
- print(traceback.format_exc())
- logger_main.error(str(e))
- logger_main.error(traceback.format_exc())
|