"""Daily PLC health job for battery-swap stations.

Pulls station data through ``StationAlgoService``, derives per-step and
per-swap indicators with ``plc_base``, stores them in MySQL, then compares the
per-step indicators against the reference values in ``plc_health_para`` with
``base_check`` and stores the check results.

When run as a script, the job executes once immediately and is then scheduled
to run once a day.
"""
import pandas as pd
import datetime
import os
import traceback

from apscheduler.schedulers.blocking import BlockingScheduler

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.station.StationAlgoService import StationAlgoService
from plcbase.V_1_0_0 import plc_base
from plcbase.V_1_0_0 import base_check

# Column names assigned, in order, to the frame returned by
# ``StationAlgoService.get_data``. Spellings (e.g. "toque") are kept exactly
# as-is because downstream code and tables are keyed on these names.
_PLC_RAW_COLUMNS = (
    "time", "sn", "steps", "status", "x_v", "y_v", "z_v", "r_v", "pause",
    "x_s", "y_s", "z_s", "r_s",
    "x1_toque", "x2_toque", "y_toque", "z_toque", "r_toque",
    "pressure_1", "pressure_2", "pressure_3", "pressure_4",
    "veh_coarse_position", "emer_stop", "veh_precise_position",
    "veh_precise_h", "veh_precise_deflect",
    "low_bat_local", "low_bat_h", "low_bat_seat_h",
    "trans_house_loc", "trans_house_h", "trans_house_angle",
    "low_bat_trans_house_loc", "low_bat_add_seat_h",
    "full_bat_house_loc", "full_bat_local", "full_bat_h",
    "full_bat_oncar_loc", "lift_from_th_h", "position_sense",
    "full_house_id", "lock_full_extended", "lock_full_retracted",
    "seat_all_in", "seat_all_notin", "grating_sensor",
    "power_exchange_mode", "low_bat_house_id", "change_num",
    "control_house_id",
    "base_sensor1", "base_sensor2", "base_sensor3", "base_sensor4",
    "base_sensor5", "base_sensor6", "base_sensor7", "base_sensor8",
    "light", "fan1", "fan2", "fan3", "fan4",
    "station_control_break", "entire_station_break",
    "error_code1", "error_code2", "error_code3", "error_code4",
    "error_code5", "error_code6", "error_code7", "error_code8",
    "error_code9", "error_code10",
    "h_axis_origin", "lock1_retracted",
    "pushrod1_extended", "pushrod1_retracted",
    "pushrod2_extended", "pushrod2_retracted",
    "pushrod3_extended", "pushrod3_retracted",
    "pushrod4_extended", "pushrod4_retracted",
    "lock3_retracted", "lock1_extended", "lift1_inplace",
    "lock3_extended", "lift2_inplace", "lift3_inplace", "lift4_inplace",
    "lock4_retracted", "lock4_extended",
    "lock2_retracted", "lock2_extended",
    "pixel_x", "pixel_y", "dis_ontime", "dis1_process", "dis2_process",
    "lift_code_dist", "deflect2",
    "pushrod2_out_time", "pushrod3_out_time",
    "pushrod2_in_time", "pushrod3_in_time",
    "pushrod1_out_time", "pushrod4_out_time",
    "pushrod1_in_time", "pushrod4_in_time",
)

# Indicator columns handed to ``base_check`` for comparison against the
# reference values read from ``plc_health_para``.
_MEASURE_COLUMNS = (
    'step_time', 'x_v_max', 'y_v_max', 'z_v_max',
    'x_s_max', 'y_s_max', 'z_s_max',
    'x1_tq_max', 'x2_tq_max', 'y_tq_max', 'z_tq_max',
    'pres_1_max', 'pres_2_max', 'pres_3_max', 'pres_4_max',
    'coarse_pos_max', 'precise_pos_max', 'precise_h_max',
    'low_bat_local_max', 'low_bat_h_max', 'low_bat_outseat_h_max',
    'trans_house_loc_max', 'trans_house_h_max', 'trans_house_angle_max',
    'low_bat_trans_h_loc_max', 'low_bat_seat_h_max',
    'full_bat_house_loc_max', 'full_bat_local_max', 'full_bat_h_max',
    'lift_from_th_h_max', 'lift_code_dist_max',
)

_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _run_health_pipeline(station_algo_service, conn, start_time, end_time):
    """Compute the PLC indicators for one time window and append them to MySQL.

    Args:
        station_algo_service: Object whose ``get_data(1, start, end)`` returns
            the raw station frame.
        conn: Open database connection used for every read and write.
        start_time: Window start, formatted as ``%Y-%m-%d %H:%M:%S``.
        end_time: Window end, formatted as ``%Y-%m-%d %H:%M:%S``.

    Any exception raised here propagates to the caller.
    """
    data = station_algo_service.get_data(1, start_time, end_time)
    print(data.columns)
    # A fresh list per run so nothing downstream can mutate the shared tuple.
    data.columns = list(_PLC_RAW_COLUMNS)

    group_steps_base, group_base = plc_base(data)

    # Indicators per swap process and swap step
    # (the target table must be created beforehand).
    group_steps_base["error_code"] = group_steps_base["error_code"].astype(str)
    group_steps_base["time_date"] = pd.to_datetime(
        group_steps_base["step_time_b"]
    ).dt.date
    group_steps_base.to_sql(
        "plc_health_base", con=conn, if_exists="append", index=False
    )

    # Indicators per swap process (the target table must be created beforehand).
    for column in ("error_code", "pause_steps", "stop_steps", "fault_steps"):
        group_base[column] = group_base[column].astype(str)
    group_base["time_date"] = pd.to_datetime(group_base["c_time_b"]).dt.date
    group_base.to_sql(
        "plc_health_group_base", con=conn, if_exists="append", index=False
    )

    # Identify suspected problems: fetch the indicator reference values first.
    sqls = "select * from plc_health_para"
    reference_df = pd.read_sql_query(sqls, conn)

    check_flow, check_result = base_check(
        group_steps_base, reference_df, list(_MEASURE_COLUMNS), p=0.1
    )
    check_result['act'] = check_result['act'].astype(str)
    check_result['con'] = check_result['con'].astype(str)
    check_flow.to_sql(
        "plc_check_flow", con=conn, if_exists="append", index=False
    )
    check_result.to_sql(
        "plc_check_result", con=conn, if_exists="append", index=False
    )


def main():
    """Run one PLC health pass over the last 30 days of station data.

    Relies on the module-level ``sysUtils`` and ``logger_main`` objects that
    the ``__main__`` section creates before calling this function.

    Errors raised while fetching, computing or storing the indicators are
    printed and logged rather than re-raised, so one failed run does not stop
    the scheduler. Errors while reading the configuration or opening the
    connections still propagate.
    """
    mysql_swap_battery_params = sysUtils.get_cf_param('mysql-swap-battery')
    influxdb_params = sysUtils.get_cf_param('influxdb')
    mysqlUtils = MysqlUtils()
    # The session factory returned alongside the engine is not used here.
    mysql_swap_battery_engine, _ = mysqlUtils.get_mysql_engine(
        mysql_swap_battery_params
    )
    mysql_swap_battery_conn = mysql_swap_battery_engine.connect()
    try:
        station_algo_service = StationAlgoService(
            mysql_swap_battery_params, influxdb_params
        )

        # NOTE(review): the window is 30 days but the job runs daily and every
        # table is written with if_exists="append", so consecutive runs look
        # like they re-insert overlapping rows — confirm this is intended.
        now_time = datetime.datetime.now()
        start_time = (now_time - datetime.timedelta(days=30)).strftime(_TIME_FORMAT)
        end_time = now_time.strftime(_TIME_FORMAT)

        try:
            _run_health_pipeline(
                station_algo_service,
                mysql_swap_battery_conn,
                start_time,
                end_time,
            )
        except Exception as e:
            print(str(e))
            print(traceback.format_exc())
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
    finally:
        # The job runs repeatedly in one long-lived process; without this the
        # connection opened above is never handed back.
        mysql_swap_battery_conn.close()


if __name__ == '__main__':
    # Scheduled task
    cur_env = 'dev'  # runtime environment
    app_path = r"/home/wangliming/project/zlwl-algos/"  # absolute path of the app
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
    app_name = "plcbase"  # application name
    sysUtils = SysUtils(cur_env, app_path)
    logger_main = sysUtils.get_logger(app_name, log_base_path)

    # Run once right away, then once a day.
    main()
    scheduler = BlockingScheduler()
    scheduler.add_job(main, 'interval', days=1, id='plcbase')
    try:
        logger_main.info(os.getpid())
        scheduler.start()
    except Exception as e:
        print(str(e))
        print(traceback.format_exc())
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())