123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import datetime
- import gc
- import re
- import configparser
- import pymysql
- import pandas as pd
- from multiprocessing import Pool
- import json
- import os
- import traceback
- import warnings
- from datetime import datetime, timedelta
- from apscheduler.schedulers.blocking import BlockingScheduler
- from sshtunnel import SSHTunnelForwarder
- from ZlwlAlgosCommon.utils.ProUtils import *
- from ZlwlAlgosCommon.service.station.StationAlgoService import StationAlgoService
def main():
    """Run one job cycle: load configuration, then send the work-order file to DingTalk.

    Steps, in order:
      1. Delete any existing ``昨日新增工单.xlsx`` in the current working directory.
      2. Read the ``dingding`` and ``mysql-algo-zk`` configuration sections through
         the module-level ``sysUtils``, request a DingTalk access token and create
         the MySQL engine/session.
      3. Upload the workbook and post it to the ZK DingTalk group chat.

    If step 2 fails, the error is logged and the function returns without
    attempting step 3, because step 3 needs the access token and group-chat
    token produced by step 2.

    Relies on the module-level globals ``sysUtils`` and ``logger_main``.
    Takes no arguments and returns ``None``. Every ``Exception`` is logged via
    ``logger_main`` and swallowed; none propagates to the caller.
    """
    process_num = 1
    try:
        warnings.filterwarnings("ignore")
        dingding_utils = DingdingUtils()
        mysql_utils = MysqlUtils()
        file_name = '昨日新增工单.xlsx'
        today_str = datetime.now().strftime('%Y-%m-%d')

        # Remove the workbook left over from a previous run.
        if os.path.exists(file_name):
            os.remove(file_name)

        try:
            # Preparation: configure the middleware (DingTalk credentials, MySQL).
            logger_main.info(f"process-{process_num}: 配置中间件")
            dingding_params = sysUtils.get_cf_param('dingding')
            app_key = dingding_params.get("app_key")
            app_secret = dingding_params.get("app_secret")
            group_chat_access_token_zk = dingding_params.get("group_chat_access_token_zk")
            access_token = dingding_utils.get_access_token(app_key, app_secret)
            zk_mysql_algo_params = sysUtils.get_cf_param('mysql-algo-zk')
            # NOTE(review): the engine/session are created but nothing in this
            # function uses or releases them — confirm whether the step that
            # queries MySQL and writes `file_name` is missing from this file.
            mysql_engine, mysql_session = mysql_utils.get_mysql_engine(zk_mysql_algo_params)
        except Exception as e:
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
            # Without the tokens the send step below could only fail with a
            # NameError that hides the real cause, so stop here.
            return

        try:
            # Send the workbook to the DingTalk group.
            # NOTE(review): `file_name` was deleted above and no visible code
            # recreates it before this upload — verify the file exists here.
            media_id_zk = dingding_utils.upload_file(access_token, file_name)
            display_name = '昨日新增工单'
            dingding_utils.send_file_to_dingGroup(
                media_id_zk,
                display_name,
                today_str,
                group_chat_access_token_zk,
                access_token,
                'xlsx',
                'xlsx',
            )
        except Exception as e:
            logger_main.error('发送数据运行出错')
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())

    except Exception as e:
        logger_main.error(f'process-{process_num}: {e}')
        logger_main.error(f'process-{process_num}: {traceback.format_exc()}')
if __name__ == '__main__':
    # --- Static run configuration -------------------------------------------
    cur_env = 'dev'  # runtime environment
    app_path = "/home/wangliming/project/zlwl-algos/"  # absolute application path
    app_name = "task_day_1_5"  # application name
    # Log files go into a "log" directory next to this script.
    log_base_path = os.path.dirname(os.path.abspath(__file__)) + "/log"

    # --- Shared helpers -------------------------------------------------------
    # main() reads `sysUtils` and `logger_main` as module-level globals, so they
    # must be bound here before the scheduler fires the job.
    sysUtils = SysUtils(cur_env, app_path)
    logger_main = sysUtils.get_logger(app_name, log_base_path)
    logger_main.info(f"本次主进程号: {os.getpid()}")

    # --- Scheduling -----------------------------------------------------------
    # Interval trigger: run main() once per day, with the interval anchored at
    # the given start_date.
    scheduler = BlockingScheduler()
    scheduler.add_job(
        main,
        trigger='interval',
        days=1,
        start_date='2023-08-26 08:00:00',
        id='diag_job',
    )

    try:
        logger_main.info(os.getpid())
        # start() blocks this thread until the scheduler is shut down.
        scheduler.start()
    except Exception as e:
        # Report the failure on both stdout and the application log.
        message = str(e)
        trace = traceback.format_exc()
        print(message)
        print(trace)
        logger_main.error(message)
        logger_main.error(trace)
|