123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- import configparser
- import logging
- import logging.handlers
- import os
- import re
- from urllib import parse
- import json
- from sqlalchemy import create_engine
- from sqlalchemy.orm import sessionmaker
- import oss2
- import redis
- from kafka import KafkaConsumer,KafkaProducer
- from redis import RedisCluster
- import pandas as pd
- import requests
- from alibabacloud_dingtalk.robot_1_0.client import Client as dingtalkrobot_1_0Client
- from alibabacloud_tea_openapi import models as open_api_models
- from alibabacloud_dingtalk.robot_1_0 import models as dingtalkrobot__1__0_models
- from alibabacloud_tea_util import models as util_models
- from alibabacloud_tea_util.client import Client as UtilClient
class SysUtils:
    """Load environment-specific configuration and build rotating-file loggers.

    Configuration is read from ``<app_path>/RESOURCES/config-<env>.ini``.
    The environment name and application path may be overridden with the
    ``PRO_ENV`` and ``APP_PATH`` environment variables.
    """

    def __init__(self, cur_env='dev',
                 app_path=os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
                 type=1):
        # NOTE: `type` shadows the builtin, but the name is kept so existing
        # keyword callers (type=...) keep working.
        if type == 1:
            # Internal project call: resolve paths and load the config file now.
            self._init__(cur_env, app_path)
        else:
            # External call: only the standalone helpers (e.g. the logging
            # utilities) are used, so skip config loading entirely.
            pass

    def _init__(self, cur_env='dev',
                app_path=os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))):
        """Resolve env/path overrides, validate the layout and load the config.

        Raises:
            Exception: if ``RESOURCES`` or the per-environment ini file is missing.
        """
        self.cur_env = os.environ.get("PRO_ENV", cur_env)      # environment override
        self.app_path = os.environ.get("APP_PATH", app_path)   # path override
        if not os.path.exists(os.path.join(self.app_path, "RESOURCES")):
            raise Exception("APP_PATH error: {0}/RESOURCES does not exist".format(self.app_path))
        if not os.path.exists(os.path.join(self.app_path, "RESOURCES", f"config-{self.cur_env}.ini")):
            raise Exception("config-{} does not exist".format(self.cur_env))
        print("start env: {} ".format(self.cur_env))
        self._get_cf()
        self.env_params = os.environ  # process environment variables for later lookup

    def _get_cf(self):
        """Parse the per-environment ini file into ``self.cf``."""
        self.cf = configparser.ConfigParser()
        self.cf.read(os.path.join(self.app_path, "RESOURCES", f"config-{self.cur_env}.ini"))

    def get_cf_param(self, section):
        """Return every option of *section* as a plain ``{option: value}`` dict."""
        return {o: self.cf.get(section, o) for o in self.cf.options(section)}

    def get_log_handler(self, log_file, level=logging.INFO):
        """Build a daily-rotating file handler (7 backups) for *log_file*.

        The parent directory is created on demand.
        """
        if not os.path.exists(os.path.dirname(log_file)):
            os.makedirs(os.path.dirname(log_file))
        fh = logging.handlers.TimedRotatingFileHandler(filename=log_file, when="D", interval=1, backupCount=7,
                                                       encoding="utf-8")
        formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
        fh.suffix = "%Y-%m-%d_%H-%M-%S"
        fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}")
        fh.setFormatter(formatter)
        fh.setLevel(level)
        return fh

    def get_logger(self, log_name, log_path):
        """Return the named logger with info/error rotating-file handlers.

        Handlers are attached only once per logger name, so repeated calls do
        not produce duplicated log lines (the original re-added handlers on
        every call).
        """
        logger = logging.getLogger(log_name)
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            logger.addHandler(self.get_log_handler("{}/{}.info.log".format(log_path, log_name), logging.INFO))
            logger.addHandler(self.get_log_handler("{}/{}.error.log".format(log_path, log_name), logging.ERROR))
        return logger

    def get_loggers(self, algo_list, log_path, process_num):
        """Return one logger per algorithm name, keyed by that name.

        Each logger writes under ``<log_path>/<algo>/process_<process_num>``.
        """
        loggers = {}
        for algo in algo_list:
            algo_log_path = f"{log_path}/{algo}/process_{process_num}"  # per-algo log directory
            logger = self.get_logger(algo, algo_log_path)
            logger.info("pid is " + str(os.getpid()))
            loggers.update({algo: logger})
        return loggers
-
-
-
class MysqlUtils:
    """Factory for pooled SQLAlchemy MySQL engines and session makers."""

    def __init__(self):
        pass

    def get_mysql_engine(self, params):
        """Create a MySQL engine plus a bound session factory.

        *params* must provide ``user``, ``password``, ``host``, ``port`` and
        ``db``.  The password is URL-quoted so special characters survive the
        DSN.  Returns ``(engine, Session)``.
        """
        dsn = "mysql+pymysql://{user}:{pwd}@{host}:{port}/{db}?charset=utf8".format(
            user=params['user'],
            pwd=parse.quote_plus(params['password']),
            host=params['host'],
            port=int(params['port']),
            db=params['db'],
        )
        self.db_engine = create_engine(dsn, pool_recycle=3600, pool_size=5)
        self.Session = sessionmaker(bind=self.db_engine)
        return self.db_engine, self.Session
-
class RedisUtils:
    """Factory for standalone or cluster Redis connections."""

    def __init__(self):
        pass

    def get_redis_conncect(self, params):
        """Return a Redis client according to ``params['cluster_enable']``.

        ``'false'`` -> standalone ``redis.Redis``; ``'true'`` -> ``RedisCluster``;
        missing or any other value -> ``None``.
        """
        mode = params.get('cluster_enable', None)
        if mode == 'false':
            return redis.Redis(host=params['host'], port=int(params['port']),
                               password=params['password'], decode_responses=True)
        if mode == 'true':
            return RedisCluster(startup_nodes=[{"host": params['host'], "port": int(params['port'])}],
                                password=params['password'], decode_responses=True)
        return None
-
class KafkaUtils:
    """Factories for Kafka consumers and producers."""

    def __init__(self):
        pass

    def get_kafka_consumer(self, params, topic_key, group_id_key, client_id="none"):
        """Build a consumer subscribed to ``params[topic_key]``.

        Starts from the latest offset, auto-commits every 3 s, and pulls a
        single record per poll with a very long max poll interval.
        """
        bootstrap = ['{}:{}'.format(params['host'], params['port'])]
        return KafkaConsumer(
            params[topic_key],
            bootstrap_servers=bootstrap,
            client_id=client_id,
            auto_offset_reset='latest',    # consume only new messages; 'earliest' would replay
            enable_auto_commit=True,       # commit consumer offsets automatically
            auto_commit_interval_ms=3000,  # offset auto-commit interval
            max_poll_records=1,            # upper bound of records per poll
            max_poll_interval_ms=6000000,  # max allowed gap between two polls
            group_id=params[group_id_key],
        )

    def get_kafka_producer(self, params, client_id="none"):
        """Build a producer with up to 10 send retries."""
        bootstrap = ['{}:{}'.format(params['host'], params['port'])]
        return KafkaProducer(bootstrap_servers=bootstrap, client_id=client_id, retries=10)
-
class OssUtils:
    """Helpers for downloading objects from Alibaba Cloud OSS."""

    def __init__(self, sys_util=None):
        # `sys_util` supplies OSS credentials via its get_oss() method.  The
        # attribute was previously never assigned anywhere, so get_bucket()
        # always raised AttributeError; the parameter defaults to None to keep
        # the old no-argument constructor working.
        self.sys_util = sys_util

    def get_bucket(self):
        """Build an authenticated Bucket from ``self.sys_util`` credentials.

        NOTE(review): relies on ``self.sys_util.get_oss()`` returning
        (bucket, accessKeyId, accessKeySecret, endpoint) — confirm against the
        provider object, which is not visible here.
        """
        oss_bucket, oss_accesskeyID, oss_accesskeySecret, oss_endpoint = self.sys_util.get_oss()
        auth = oss2.Auth(oss_accesskeyID, oss_accesskeySecret)
        bucket = oss2.Bucket(auth, "https://{}".format(oss_endpoint), oss_bucket)
        return bucket

    def _oss_url_to_key(self, url):
        """Convert ``scheme://bucket/key...`` into the object key ``key...``."""
        url = url.replace('://', '')
        return url[url.index('/') + 1:]

    def download_file_oss(self, url, filepath='', filename=''):
        """Download the object addressed by *url* to ``filepath + filename``.

        The destination is plain string concatenation (original behavior), so
        *filepath* must already carry its trailing separator if one is needed.
        """
        bucket = self.get_bucket()
        bucket.get_object_to_file(self._oss_url_to_key(url), filepath + filename)
-
class CleanUtils:
    """Teardown helper that closes whichever shared resources were passed in."""

    def clean(self, mysql_algo_conn=None, mysql_algo_engine=None, mysql_iotp_conn=None,
              mysql_iotp_engine=None, kafka_consumer=None, rc=None):
        """Close connections / dispose engines / close consumers that are not None.

        Each engine is disposed only when its paired connection was supplied,
        mirroring how the resources are created together.
        """
        if mysql_algo_conn is not None:
            mysql_algo_conn.close()
            mysql_algo_engine.dispose()
        if mysql_iotp_conn is not None:
            mysql_iotp_conn.close()
            mysql_iotp_engine.dispose()
        if kafka_consumer is not None:
            kafka_consumer.close()
        if rc is not None:
            rc.close()
-
class DingdingUtils():
    """Helpers for sending files and markdown messages to DingTalk group robots."""

    def __init__(self):
        pass

    def get_access_token(self, APP_KEY, APP_SECRET):
        """Fetch an access token from the DingTalk OAuth endpoint (None on failure)."""
        url = f"https://oapi.dingtalk.com/gettoken?appkey={APP_KEY}&appsecret={APP_SECRET}"
        response = requests.get(url)
        data = response.json()
        return data.get('access_token')

    def upload_file(self, access_token, file_path):
        """Upload *file_path* as a media file and return its media_id.

        Uses a context manager so the file handle is always released (the
        original opened the file without ever closing it).
        """
        url = f"https://oapi.dingtalk.com/media/upload?access_token={access_token}&type=file"
        with open(file_path, 'rb') as f:
            response = requests.post(url, files={'media': f})
        data = response.json()
        return data.get('media_id')

    def _create_client(self) -> dingtalkrobot_1_0Client:
        """Build a DingTalk robot client (HTTPS, 'central' region)."""
        config = open_api_models.Config()
        config.protocol = 'https'
        config.region_id = 'central'
        return dingtalkrobot_1_0Client(config)

    def send_file_to_dingGroup(self, media_id, fileName, date_string, Group_Chat_access_token, access_token, suffix='docx', filetype='docx'):
        """Send an uploaded file (*media_id*) to a group chat; errors are best-effort."""
        client = self._create_client()
        send_headers = dingtalkrobot__1__0_models.OrgGroupSendHeaders()
        send_headers.x_acs_dingtalk_access_token = access_token
        file_send_request = dingtalkrobot__1__0_models.OrgGroupSendRequest(
            # msgParam must be a JSON document; str(dict) produced
            # single-quoted pseudo-JSON the API cannot parse.
            msg_param=json.dumps({'mediaId': media_id,
                                  'fileName': '{}_{}.{}'.format(fileName, date_string, suffix),
                                  'fileType': filetype}),
            msg_key='sampleFile',
            token=Group_Chat_access_token
        )
        try:
            client.org_group_send_with_options(file_send_request, send_headers, util_models.RuntimeOptions())
        except Exception as err:
            # SDK exceptions carry code/message attributes, plain exceptions do
            # not; getattr avoids raising AttributeError inside the handler.
            code = getattr(err, 'code', None)
            message = getattr(err, 'message', None)
            if not UtilClient.empty(code) and not UtilClient.empty(message):
                # code/message identify the API failure; delivery is deliberately
                # best-effort, so the error is swallowed here.
                pass

    def send_markdown_to_dingtalk(self, message_title, markdown_content, Group_Chat_access_token, access_token):
        """Send a markdown message to a group chat; errors are best-effort."""
        client = self._create_client()
        send_headers = dingtalkrobot__1__0_models.OrgGroupSendHeaders()
        send_headers.x_acs_dingtalk_access_token = access_token
        md_send_request = dingtalkrobot__1__0_models.OrgGroupSendRequest(
            # Same JSON requirement as above.
            msg_param=json.dumps({"title": message_title, "text": markdown_content}),
            msg_key='sampleMarkdown',
            token=Group_Chat_access_token
        )
        try:
            client.org_group_send_with_options(md_send_request, send_headers, util_models.RuntimeOptions())
        except Exception as err:
            code = getattr(err, 'code', None)
            message = getattr(err, 'message', None)
            if not UtilClient.empty(code) and not UtilClient.empty(message):
                # Best-effort: failure details are available but intentionally ignored.
                pass
-
-
-
class DocxUtils():
    """Helpers that render pandas DataFrames as tables in python-docx documents."""
    def __init__(self):
        pass

    def add_table_to_word(self,doc, df):
        """Append *df* to *doc* as a 'Table Grid' table: header row + one row per record."""
        # Write the DataFrame into a freshly created table
        table = doc.add_table(rows=1, cols=len(df.columns))
        table.style = 'Table Grid'
        # Header row: column names in bold
        header_cells = table.rows[0].cells
        for col, header in enumerate(df.columns):
            header_cells[col].text = header
            header_cells[col].paragraphs[0].runs[0].bold = True
            # NOTE(review): paragraph alignment normally takes a WD_ALIGN_PARAGRAPH
            # value; True coerces to 1 (center) — confirm this is intentional.
            header_cells[col].paragraphs[0].alignment = True
        # Data rows, one table row per DataFrame record
        for row_index, row_data in df.iterrows():
            row_cells = table.add_row().cells
            for col, value in enumerate(row_data):
                row_cells[col].text = str(value)
                row_cells[col].paragraphs[0].alignment = 1
                row_cells[col].vertical_alignment = 1

    def add_table_to_word_with_conbine(self, doc, df, combine_by, combine_columns):
        """Append *df* to *doc* as a table, merging cells within each group.

        combine_by: grouping column(s); each group's rows become a merge span.
        combine_columns: the columns whose cells are merged inside each group.
        """
        table = doc.add_table(rows=1, cols=len(df.columns))
        table.style = 'Table Grid'
        # Header row: column names in bold
        header_cells = table.rows[0].cells
        for col, header in enumerate(df.columns):
            header_cells[col].text = header
            header_cells[col].paragraphs[0].runs[0].bold = True
            # NOTE(review): True coerces to 1 (center) — see add_table_to_word.
            header_cells[col].paragraphs[0].alignment = True
        # Running row counters: rows written before the current group vs. in total
        last_row_count = 0
        cur_row_count = 0
        if len(df) > 0:
            for g, df_t in df.groupby(by=combine_by, sort=False):
                # Write one group's rows, then merge its cells; repeat per group
                cur_row_count = cur_row_count + len(df_t)
                for row_index, row_data in df_t.iterrows():
                    row_cells = table.add_row().cells
                    for col, value in enumerate(row_data):
                        # print(col, value)
                        row_cells[col].text = str(value)
                        row_cells[col].paragraphs[0].alignment = 1
                        row_cells[col].vertical_alignment = 1
                # Merge cells column-by-column over this group's row span.
                # Row 0 is the header, hence the +1 offset.
                start_row_num = 1 + last_row_count
                for c in combine_columns:
                    c_idx = df.columns.get_loc(c)
                    # Keep the first cell's text: merging concatenates contents,
                    # so it is re-assigned after the merge.
                    text = table.cell(start_row_num, c_idx).text
                    for i in range(start_row_num, start_row_num + len(df_t)):
                        table.cell(start_row_num, c_idx).merge(table.cell(i, c_idx))
                    table.cell(start_row_num, c_idx).text = text
                last_row_count = len(df_t) + last_row_count
    def _set_font_and_style(self, run, font_name, font_size, bold=False, italic=False, underline=False):
        """Apply font name and bold/italic/underline flags to a single run.

        font_size is currently unused (the Pt(...) line is disabled below).
        """
        font = run.font
        font.name = font_name
        #font.size = Pt(font_size)
        run.bold = bold
        run.italic = italic
        run.underline = underline
    def set_font_and_style(self, doc):
        """Apply Arial 12 bold to every run in the document's paragraphs and tables."""
        # Walk every paragraph run in the document body
        for paragraph in doc.paragraphs:
            for run in paragraph.runs:
                self._set_font_and_style(run, 'Arial', 12, bold=True, italic=False, underline=False)
        # Walk every run inside every table cell
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    for paragraph in cell.paragraphs:
                        for run in paragraph.runs:
                            self._set_font_and_style(run, 'Arial', 12, bold=True, italic=False, underline=False)
|