"""Scheduled daily task that sends the '昨日新增工单.xlsx' workbook to a DingTalk group.

Run as a script: the ``__main__`` section builds the shared ``sysUtils`` and
``logger_main`` objects and registers :func:`main` with a blocking scheduler
that fires once a day.
"""
# NOTE: the import order is significant and is kept as-is.
# ``from datetime import datetime`` must come after ``import datetime`` so that
# the name ``datetime`` refers to the class (``main`` calls ``datetime.now()``),
# and the star import below may rebind further names.
import datetime
import gc
import re
import configparser
import pymysql
import pandas as pd
from multiprocessing import Pool
import json
import os
import traceback
import warnings
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
from sshtunnel import SSHTunnelForwarder
from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.station.StationAlgoService import StationAlgoService


def main():
    """Upload the daily workbook and post it to the DingTalk group chat.

    Relies on the module-level ``sysUtils`` and ``logger_main`` objects that
    the ``__main__`` section creates before this job is scheduled.

    Every failure is logged and swallowed, so no exception propagates to the
    scheduler. Always returns ``None``.
    """
    process_num = 1
    try:
        warnings.filterwarnings("ignore")
        dingding_utils = DingdingUtils()
        mysqlUtils = MysqlUtils()
        file_name = '昨日新增工单.xlsx'
        today_str = datetime.now().strftime('%Y-%m-%d')

        # Remove any workbook left over from a previous run.
        # NOTE(review): nothing in this function writes `file_name` again
        # before it is uploaded below, so the report-generation step appears
        # to be missing (unless `upload_file` produces the file itself) --
        # confirm against the helper.
        if os.path.exists(file_name):
            os.remove(file_name)

        # Step 1: DingTalk credentials. The send step cannot work without
        # them, so stop here on failure instead of falling through and
        # hitting an UnboundLocalError that would hide the real cause.
        try:
            logger_main.info(f"process-{process_num}: 配置中间件")
            # Read the 'dingding' config section once and reuse it.
            dingding_params = sysUtils.get_cf_param('dingding')
            app_key = dingding_params.get("app_key")
            app_secret = dingding_params.get("app_secret")
            group_chat_access_token_zk = dingding_params.get("group_chat_access_token_zk")
            access_token = dingding_utils.get_access_token(app_key, app_secret)
        except Exception as e:
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
            return

        # Step 2: MySQL engine. A failure here is logged but, as before,
        # does not prevent the file from being sent.
        # NOTE(review): the engine/session are neither used nor released in
        # this function -- confirm whether they are still needed.
        try:
            zk_mysql_algo_params = sysUtils.get_cf_param('mysql-algo-zk')
            mysql_engine, mysql_Session = mysqlUtils.get_mysql_engine(zk_mysql_algo_params)
        except Exception as e:
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())

        # Step 3: upload the workbook and send it to the DingTalk group.
        try:
            media_id_zk = dingding_utils.upload_file(access_token, file_name)
            dingding_utils.send_file_to_dingGroup(
                media_id_zk,
                '昨日新增工单',
                today_str,
                group_chat_access_token_zk,
                access_token,
                'xlsx',
                'xlsx',
            )
        except Exception as e:
            logger_main.error('发送数据运行出错')
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
    except Exception as e:
        # Last-resort guard so the scheduled job never raises.
        logger_main.error(f'process-{process_num}: {e}')
        logger_main.error(f'process-{process_num}: {traceback.format_exc()}')


if __name__ == '__main__':
    # Configuration
    cur_env = 'dev'  # runtime environment
    app_path = "/home/wangliming/project/zlwl-algos/"  # absolute path of the app
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
    app_name = "task_day_1_5"  # application name

    # `sysUtils` and `logger_main` are module-level globals read by main().
    # Original author's note: please do not modify the configuration loading.
    sysUtils = SysUtils(cur_env, app_path)
    logger_main = sysUtils.get_logger(app_name, log_base_path)
    logger_main.info(f"本次主进程号: {os.getpid()}")

    # To run the task once immediately instead of scheduling it, call:
    # main()
    scheduler = BlockingScheduler()
    # Run once a day, aligned to 08:00:00 of the given start date.
    scheduler.add_job(main, 'interval', start_date='2023-08-26 08:00:00', days=1, id='diag_job')
    try:
        logger_main.info(os.getpid())
        scheduler.start()  # blocks until the scheduler stops
    except Exception as e:
        print(str(e))
        print(traceback.format_exc())
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())