import configparser
import json
import logging
import logging.handlers
import os
import re
from urllib import parse

import oss2
import redis
import requests
from alibabacloud_dingtalk.robot_1_0.client import Client as dingtalkrobot_1_0Client
from alibabacloud_dingtalk.robot_1_0 import models as dingtalkrobot__1__0_models
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
# python-docx enums used by DocxUtils below (assumes python-docx is installed)
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.enum.text import WD_ALIGN_PARAGRAPH
from kafka import KafkaConsumer, KafkaProducer
from redis import RedisCluster
# redis-py (>= 4.1) expects ClusterNode objects in startup_nodes
from redis.cluster import ClusterNode
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


class SysUtils:
    """Read settings for the current environment from the RESOURCES config directory."""

    def __init__(self, cur_env='dev',
                 app_path=os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
                 type=1):
        if type == 1:
            # Called from inside the project: load the config file eagerly.
            self._init(cur_env, app_path)
        else:
            # External caller that only needs a subset of the helpers.
            pass

    def _init(self, cur_env='dev',
              app_path=os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))):
        self.cur_env = os.environ.get("PRO_ENV", cur_env)      # environment override
        self.app_path = os.environ.get("APP_PATH", app_path)   # path override
        if not os.path.exists(os.path.join(self.app_path, "RESOURCES")):
            raise Exception("$APP_PATH is set to {}, but {}/RESOURCES does not exist".format(
                self.app_path, self.app_path))
        if not os.path.exists(os.path.join(self.app_path, "RESOURCES", f"config-{self.cur_env}.ini")):
            raise Exception("config-{}.ini does not exist".format(self.cur_env))
        print("start env: {}".format(self.cur_env))
        self._get_cf()
        self.env_params = os.environ  # expose environment variables to callers

    def _get_cf(self):
        self.cf = configparser.ConfigParser()
        self.cf.read(os.path.join(self.app_path, "RESOURCES", f"config-{self.cur_env}.ini"))

    def get_cf_param(self, section):
        """Return every option of the given ini section as a dict of strings."""
        options = self.cf.options(section)
        params = {}
        for o in options:
            params.update({o: self.cf.get(section, o)})
        return params

    def get_log_handler(self, log_file, level=logging.INFO):
        # Time-based rotation: one file per day, keep 7 backups.
        if not os.path.exists(os.path.dirname(log_file)):
            os.makedirs(os.path.dirname(log_file))
        fh = logging.handlers.TimedRotatingFileHandler(
            filename=log_file, when="D", interval=1, backupCount=7, encoding="utf-8")
        formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
        fh.suffix = "%Y-%m-%d_%H-%M-%S"
        fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}")
        fh.setFormatter(formatter)
        fh.setLevel(level)
        return fh

    def get_logger(self, log_name, log_path):
        # One logger writing separate info and error files.
        logger = logging.getLogger(log_name)
        logger.setLevel(logging.INFO)
        logger.addHandler(self.get_log_handler("{}/{}.info.log".format(log_path, log_name), logging.INFO))
        logger.addHandler(self.get_log_handler("{}/{}.error.log".format(log_path, log_name), logging.ERROR))
        return logger

    def get_loggers(self, algo_list, log_path, process_num):
        """Build one logger per algorithm in algo_list."""
        loggers = {}
        for algo in algo_list:
            algo_log_path = f"{log_path}/{algo}/process_{process_num}"  # per-algorithm log directory
            logger = self.get_logger(algo, algo_log_path)
            logger.info("pid is " + str(os.getpid()))
            loggers.update({algo: logger})
        return loggers
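
# A minimal usage sketch for SysUtils. The "mysql" section name and the log
# directory are illustrative assumptions about the deployment, not something
# this module guarantees:
#
#   sys_util = SysUtils(cur_env="dev")
#   mysql_params = sys_util.get_cf_param("mysql")  # configparser returns all values as strings
#   logger = sys_util.get_logger("my_app", "/tmp/logs/my_app")
#   logger.info("configured for %s", sys_util.cur_env)
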
class MysqlUtils:
    def __init__(self):
        pass

    def get_mysql_engine(self, params):
        self.db_engine = create_engine(
            "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
                params['user'], parse.quote_plus(params['password']),
                params['host'], int(params['port']), params['db']),
            pool_recycle=3600, pool_size=5)
        self.Session = sessionmaker(bind=self.db_engine)
        return self.db_engine, self.Session


class RedisUtils:
    def __init__(self):
        pass

    def get_redis_connect(self, params):
        # cluster_enable comes from the ini file as the string 'true'/'false';
        # anything else (including a missing key) returns None.
        rc = None
        cluster_enable = params.get('cluster_enable')
        if cluster_enable == 'false':
            rc = redis.Redis(host=params['host'], port=int(params['port']),
                             password=params['password'], decode_responses=True)
        elif cluster_enable == 'true':
            rc = RedisCluster(startup_nodes=[ClusterNode(params['host'], int(params['port']))],
                              password=params['password'], decode_responses=True)
        return rc


class KafkaUtils:
    def __init__(self):
        pass

    def get_kafka_consumer(self, params, topic_key, group_id_key, client_id="none"):
        consumer = KafkaConsumer(
            params[topic_key],
            bootstrap_servers=['{}:{}'.format(params['host'], params['port'])],
            client_id=client_id,
            auto_offset_reset='latest',    # start from the newest records; 'earliest' would replay unconsumed history
            enable_auto_commit=True,       # commit consumer offsets automatically
            auto_commit_interval_ms=3000,  # offset auto-commit interval
            max_poll_records=1,            # maximum records per poll
            max_poll_interval_ms=6000000,  # maximum allowed gap between two polls
            group_id=params[group_id_key])
        return consumer

    def get_kafka_producer(self, params, client_id="none"):
        producer = KafkaProducer(
            bootstrap_servers=['{}:{}'.format(params['host'], params['port'])],
            client_id=client_id,
            retries=10)
        return producer


class OssUtils:
    def __init__(self):
        pass

    def get_bucket(self, params):
        # params comes from SysUtils.get_cf_param and is expected to carry
        # 'bucket', 'accesskeyId', 'accesskeySecret' and 'endpoint'.
        auth = oss2.Auth(params['accesskeyId'], params['accesskeySecret'])
        bucket = oss2.Bucket(auth, "https://{}".format(params['endpoint']), params['bucket'])
        return bucket

    def download_file_oss(self, params, url, filepath='', filename=''):
        bucket = self.get_bucket(params)
        url = url.replace('://', '')
        path = url[url.index('/') + 1:]  # strip the host prefix, keep the object key
        bucket.get_object_to_file(path, filepath + filename)
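
# How the helpers above compose. The ini section names ("mysql", "redis",
# "kafka") and the "topic"/"group_id" option keys are assumptions about the
# config layout, not enforced anywhere:
#
#   sys_util = SysUtils(cur_env="dev")
#   engine, Session = MysqlUtils().get_mysql_engine(sys_util.get_cf_param("mysql"))
#   rc = RedisUtils().get_redis_connect(sys_util.get_cf_param("redis"))
#   consumer = KafkaUtils().get_kafka_consumer(sys_util.get_cf_param("kafka"),
#                                              "topic", "group_id", client_id="worker-0")
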
class CleanUtils:
    def clean(self, mysql_algo_conn=None, mysql_algo_engine=None,
              mysql_iotp_conn=None, mysql_iotp_engine=None,
              kafka_consumer=None, rc=None):
        # Close whatever resources the caller handed in; None means "not used".
        if mysql_algo_conn is not None:
            mysql_algo_conn.close()
            mysql_algo_engine.dispose()
        if mysql_iotp_conn is not None:
            mysql_iotp_conn.close()
            mysql_iotp_engine.dispose()
        if kafka_consumer is not None:
            kafka_consumer.close()
        if rc is not None:
            rc.close()


class DingdingUtils:
    def __init__(self):
        pass

    # Fetch an access token for the DingTalk open API.
    def get_access_token(self, APP_KEY, APP_SECRET):
        url = f"https://oapi.dingtalk.com/gettoken?appkey={APP_KEY}&appsecret={APP_SECRET}"
        response = requests.get(url)
        data = response.json()
        return data.get('access_token')

    # Upload a file and return its media_id.
    def upload_file(self, access_token, file_path):
        url = f"https://oapi.dingtalk.com/media/upload?access_token={access_token}&type=file"
        with open(file_path, 'rb') as f:
            response = requests.post(url, files={'media': f})
        data = response.json()
        return data.get('media_id')

    def _create_client(self) -> dingtalkrobot_1_0Client:
        """Initialize the DingTalk robot client.
        @return: Client
        @throws Exception
        """
        config = open_api_models.Config()
        config.protocol = 'https'
        config.region_id = 'central'
        return dingtalkrobot_1_0Client(config)

    def send_file_to_dingGroup(self, media_id, fileName, date_string, Group_Chat_access_token,
                               access_token, suffix='docx', filetype='docx'):
        client = self._create_client()
        org_group_send_headers = dingtalkrobot__1__0_models.OrgGroupSendHeaders()
        org_group_send_headers.x_acs_dingtalk_access_token = access_token
        file_send_request = dingtalkrobot__1__0_models.OrgGroupSendRequest(
            msg_param=json.dumps({  # msgParam must be a JSON string, not the repr of a dict
                'mediaId': media_id,
                'fileName': '{}_{}.{}'.format(fileName, date_string, suffix),
                'fileType': filetype}),
            msg_key='sampleFile',
            token=Group_Chat_access_token)
        try:
            client.org_group_send_with_options(file_send_request, org_group_send_headers,
                                               util_models.RuntimeOptions())
        except Exception as err:
            if not UtilClient.empty(err.code) and not UtilClient.empty(err.message):
                # err carries code and message attributes that help locate the problem
                pass

    def send_markdown_to_dingtalk(self, message_title, markdown_content,
                                  Group_Chat_access_token, access_token):
        client = self._create_client()
        org_group_send_headers = dingtalkrobot__1__0_models.OrgGroupSendHeaders()
        org_group_send_headers.x_acs_dingtalk_access_token = access_token
        markdown_send_request = dingtalkrobot__1__0_models.OrgGroupSendRequest(
            msg_param=json.dumps({"title": message_title, "text": markdown_content}),
            msg_key='sampleMarkdown',
            token=Group_Chat_access_token)
        try:
            client.org_group_send_with_options(markdown_send_request, org_group_send_headers,
                                               util_models.RuntimeOptions())
        except Exception as err:
            if not UtilClient.empty(err.code) and not UtilClient.empty(err.message):
                # err carries code and message attributes that help locate the problem
                pass
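
# Sketch of the end-to-end DingTalk flow above. APP_KEY, APP_SECRET and the
# group robot token are placeholders the caller must provide; "report.docx"
# is illustrative:
#
#   ding = DingdingUtils()
#   access_token = ding.get_access_token(APP_KEY, APP_SECRET)
#   media_id = ding.upload_file(access_token, "report.docx")
#   ding.send_file_to_dingGroup(media_id, "report", "2024-01-01",
#                               Group_Chat_access_token, access_token)
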
class DocxUtils:
    def __init__(self):
        pass

    def add_table_to_word(self, doc, df):
        # Write a DataFrame into a new table in the document.
        table = doc.add_table(rows=1, cols=len(df.columns))
        table.style = 'Table Grid'
        # Header row
        header_cells = table.rows[0].cells
        for col, header in enumerate(df.columns):
            header_cells[col].text = header
            header_cells[col].paragraphs[0].runs[0].bold = True
            header_cells[col].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
        # Data rows
        for row_index, row_data in df.iterrows():
            row_cells = table.add_row().cells
            for col, value in enumerate(row_data):
                row_cells[col].text = str(value)
                row_cells[col].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
                row_cells[col].vertical_alignment = WD_ALIGN_VERTICAL.CENTER

    def add_table_to_word_with_combine(self, doc, df, combine_by, combine_columns):
        # Write a DataFrame into a table, merging cells group by group.
        # combine_by: grouping column(s) that decide which rows are merged
        # combine_columns: the columns whose cells get merged within each group
        table = doc.add_table(rows=1, cols=len(df.columns))
        table.style = 'Table Grid'
        # Header row
        header_cells = table.rows[0].cells
        for col, header in enumerate(df.columns):
            header_cells[col].text = header
            header_cells[col].paragraphs[0].runs[0].bold = True
            header_cells[col].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
        last_row_count = 0
        cur_row_count = 0
        if len(df) > 0:
            for g, df_t in df.groupby(by=combine_by, sort=False):
                # Write one group's rows, then merge its cells; repeat per group.
                cur_row_count = cur_row_count + len(df_t)
                for row_index, row_data in df_t.iterrows():
                    row_cells = table.add_row().cells
                    for col, value in enumerate(row_data):
                        row_cells[col].text = str(value)
                        row_cells[col].paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER
                        row_cells[col].vertical_alignment = WD_ALIGN_VERTICAL.CENTER
                # Merge this group's cells (row 0 is the header row).
                start_row_num = 1 + last_row_count
                for c in combine_columns:
                    c_idx = df.columns.get_loc(c)
                    text = table.cell(start_row_num, c_idx).text
                    for i in range(start_row_num, start_row_num + len(df_t)):
                        table.cell(start_row_num, c_idx).merge(table.cell(i, c_idx))
                    table.cell(start_row_num, c_idx).text = text
                last_row_count = len(df_t) + last_row_count

    def _set_font_and_style(self, run, font_name, font_size, bold=False, italic=False, underline=False):
        font = run.font
        font.name = font_name
        # font.size = Pt(font_size)  # needs `from docx.shared import Pt` if re-enabled
        run.bold = bold
        run.italic = italic
        run.underline = underline

    def set_font_and_style(self, doc):
        # Walk every paragraph run in the document and apply the font.
        for paragraph in doc.paragraphs:
            for run in paragraph.runs:
                self._set_font_and_style(run, 'Arial', 12, bold=True, italic=False, underline=False)
        # Do the same for every table cell.
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    for paragraph in cell.paragraphs:
                        for run in paragraph.runs:
                            self._set_font_and_style(run, 'Arial', 12, bold=True, italic=False, underline=False)
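
# A minimal DocxUtils sketch (assumes python-docx and pandas are installed;
# the DataFrame content is made up for illustration):
#
#   import pandas as pd
#   from docx import Document
#
#   doc = Document()
#   df = pd.DataFrame({"device": ["a", "a", "b"], "metric": [1, 2, 3]})
#   utils = DocxUtils()
#   utils.add_table_to_word(doc, df)
#   utils.add_table_to_word_with_combine(doc, df, combine_by="device", combine_columns=["device"])
#   utils.set_font_and_style(doc)
#   doc.save("report.docx")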