# ProUtils.py
  1. import configparser
  2. import logging
  3. import logging.handlers
  4. import os
  5. import re
  6. from urllib import parse
  7. import json
  8. from sqlalchemy import create_engine
  9. from sqlalchemy.orm import sessionmaker
  10. import oss2
  11. import redis
  12. from kafka import KafkaConsumer,KafkaProducer
  13. from redis import RedisCluster
  14. import pandas as pd
  15. import requests
  16. from alibabacloud_dingtalk.robot_1_0.client import Client as dingtalkrobot_1_0Client
  17. from alibabacloud_tea_openapi import models as open_api_models
  18. from alibabacloud_dingtalk.robot_1_0 import models as dingtalkrobot__1__0_models
  19. from alibabacloud_tea_util import models as util_models
  20. from alibabacloud_tea_util.client import Client as UtilClient
  21. class SysUtils:
  22. """通过配置文件目录,获取配置文件对应内容
  23. """
  24. def __init__(self, cur_env='dev', app_path=os.path.dirname(os.path.dirname(os.path.dirname((os.path.abspath(__file__))))), type=1):
  25. if type == 1:
  26. # 项目内部调用
  27. self._init__(cur_env, app_path)
  28. else:
  29. # 外部调用,只使用内部的部分功能
  30. pass
  31. def _init__(self, cur_env='dev', app_path=os.path.dirname(os.path.dirname(os.path.dirname((os.path.abspath(__file__)))))):
  32. self.cur_env = os.environ.get("PRO_ENV", cur_env) # 环境覆盖
  33. self.app_path = os.environ.get("APP_PATH", app_path) # 路径覆盖
  34. if not os.path.exists(os.path.join(self.app_path, "RESOURCES")):
  35. raise Exception("$APP_PATH exit, {} is error. {}/RESOURCES is not exist".format(self.app_path, self.app_path))
  36. if not os.path.exists(os.path.join(self.app_path, "RESOURCES", f"config-{self.cur_env}.ini")):
  37. raise Exception("config-{} is not exist".format(self.cur_env))
  38. # if not os.path.exists("{}/Log".format(self.app_path)):
  39. # os.makedirs("{}/Log".format(self.app_path))
  40. print("start env: {} ".format(self.cur_env))
  41. self._get_cf()
  42. self.env_params = os.environ # 获取环境变量参数
  43. def _get_cf(self):
  44. self.cf = configparser.ConfigParser()
  45. self.cf.read(os.path.join(self.app_path, "RESOURCES", f"config-{self.cur_env}.ini"))
  46. def get_cf_param(self, section):
  47. """根据不同的type 返回不同的参数
  48. """
  49. options = self.cf.options(section)
  50. params = {}
  51. for o in options:
  52. params.update({o:self.cf.get(section, o)})
  53. return params
  54. # if type == 'mysql':
  55. # host = self.cf.get(tag, 'host')
  56. # port = int(self.cf.get(tag, 'port'))
  57. # db = self.cf.get(tag, 'db')
  58. # user = self.cf.get(tag, 'user')
  59. # password = parse.quote_plus(self.cf.get(tag, 'password'))
  60. # print("mysql: {} - {}".format(host, port))
  61. # return {'host':host, 'port':port, 'db':db, 'user':user, 'password':password}
  62. # elif type == 'redis':
  63. # host = self.cf.get(tag, 'host')
  64. # port = int(self.cf.get(tag, 'port'))
  65. # password = self.cf.get(tag, 'password')
  66. # cluster_enable = self.cf.get(tag, 'cluster_enable', fallback=None)
  67. # print("redis: {} - {}".format(host, port))
  68. # return {'host':host, 'port':port, 'password':password, 'cluster_enable':cluster_enable}
  69. # elif type == 'hbase':
  70. # host = self.cf.get(tag, 'host')
  71. # port = int(self.cf.get(tag, 'port'))
  72. # db = self.cf.get(tag, 'db', fallback=None)
  73. # print("hbase: {} - {}".format(host, port))
  74. # return {'host':host, 'port':port, 'db':db}
  75. # elif type == 'kafka':
  76. # host = self.cf.get(tag, 'host')
  77. # port = int(self.cf.get(tag, 'port'))
  78. # topic = self.cf.get(tag, 'topic', fallback=None)
  79. # group_id = self.cf.get(tag, 'group_id', fallback=None)
  80. # print("kafka: {} - {}".format(host, port))
  81. # return {'host':host, 'port':port, 'topic':topic, 'group_id':group_id}
  82. # elif type == 'oss':
  83. # bucket = self.cf.get(tag, 'bucket')
  84. # accesskeyId = self.cf.get(tag, 'accesskeyId')
  85. # accesskeySecret = self.cf.get(tag, 'accesskeySecret')
  86. # endpoint = self.cf.get(tag, 'endpointout')
  87. # print("hbase: {} - {}".format(host, port))
  88. # return {'bucket':bucket, 'accesskeyId':accesskeyId, 'accesskeySecret':accesskeySecret, 'endpoint':endpoint}
  89. def get_log_handler(self, log_file, level=logging.INFO):
  90. # 根据日期滚动
  91. if not os.path.exists(os.path.dirname(log_file)):
  92. os.makedirs(os.path.dirname(log_file))
  93. fh = logging.handlers.TimedRotatingFileHandler(filename=log_file, when="D", interval=1, backupCount=7,
  94. encoding="utf-8")
  95. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  96. fh.suffix = "%Y-%m-%d_%H-%M-%S"
  97. fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}")
  98. fh.setFormatter(formatter)
  99. fh.setLevel(level)
  100. return fh
  101. def get_logger(self, log_name, log_path):
  102. # 日志配置
  103. logger = logging.getLogger(log_name)
  104. logger.setLevel(logging.INFO)
  105. logger.addHandler(self.get_log_handler("{}/{}.info.log".format(log_path, log_name), logging.INFO))
  106. logger.addHandler(self.get_log_handler("{}/{}.error.log".format(log_path, log_name), logging.ERROR))
  107. return logger
  108. def get_loggers(self, algo_list, log_path, process_num):
  109. '''根据算啦列表,返回多个logger
  110. '''
  111. loggers = {}
  112. for algo in algo_list:
  113. algo_log_path = f"{log_path}/{algo}/process_{process_num}" # 设置日志路径
  114. logger = self.get_logger(algo, algo_log_path)
  115. logger.info("pid is " + str(os.getpid()))
  116. loggers.update({algo:logger})
  117. return loggers
  118. class MysqlUtils:
  119. def __init__(self):
  120. pass
  121. def get_mysql_engine(self, params):
  122. self.db_engine = create_engine(
  123. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  124. params['user'],
  125. parse.quote_plus(params['password']),
  126. params['host'],
  127. int(params['port']),
  128. params['db']
  129. ),pool_recycle=3600,pool_size=5)
  130. self.Session = sessionmaker(bind=self.db_engine)
  131. return self.db_engine, self.Session
  132. class RedisUtils:
  133. def __init__(self):
  134. pass
  135. def get_redis_conncect(self, params):
  136. rc = None
  137. if (r:=params.get('cluster_enable', None)) and r == 'false':
  138. rc = redis.Redis(host=params['host'], port=int(params['port']), password=params['password'],decode_responses=True)
  139. elif (r:=params.get('cluster_enable', None)) and r == 'true':
  140. rc = RedisCluster(startup_nodes=[{"host":params['host'], "port":int(params['port'])}], password=params['password'],decode_responses=True)
  141. return rc
  142. class KafkaUtils:
  143. def __init__(self):
  144. pass
  145. def get_kafka_consumer(self, params, topic_key, group_id_key, client_id="none"):
  146. consumer = KafkaConsumer(params[topic_key], bootstrap_servers=['{}:{}'.format(params['host'], params['port'])],
  147. client_id=client_id,
  148. auto_offset_reset='latest',# 消费kafka中最近的数据,如果设置为earliest则消费最早的数据,不管这些数据是否消费
  149. enable_auto_commit=True, # 自动提交消费者的offset
  150. auto_commit_interval_ms=3000, ## 自动提交消费者offset的时间间隔,
  151. max_poll_records=1, # 每次拉取消息的最大上限
  152. max_poll_interval_ms=6000000, # 两次拉取消息的最大间隔时长
  153. group_id=params[group_id_key]
  154. )
  155. return consumer
  156. def get_kafka_producer(self, params, client_id="none"):
  157. producer = KafkaProducer(bootstrap_servers=['{}:{}'.format(params['host'], params['port'])],
  158. client_id=client_id, retries=10)
  159. return producer
  160. class OssUtils:
  161. def __init__(self):
  162. pass
  163. def get_bucket(self):
  164. oss_bucket, oss_accesskeyID, oss_accesskeySecret, oss_endpoint = self.sys_util.get_oss()
  165. auth = oss2.Auth(oss_accesskeyID, oss_accesskeySecret)
  166. bucket = oss2.Bucket(auth, "https://{}".format(oss_endpoint), oss_bucket)
  167. return bucket
  168. def download_file_oss(self, url, filepath='', filename=''):
  169. bucket = self.get_bucket()
  170. url = url.replace('://', '')
  171. path = url[url.index('/') + 1:]
  172. bucket.get_object_to_file(path, filepath + filename)
  173. class CleanUtils:
  174. def clean(self, mysql_algo_conn=None, mysql_algo_engine=None, mysql_iotp_conn=None,
  175. mysql_iotp_engine=None, kafka_consumer=None, rc=None):
  176. if not mysql_algo_conn is None:
  177. mysql_algo_conn.close()
  178. mysql_algo_engine.dispose()
  179. if not mysql_iotp_conn is None:
  180. mysql_iotp_conn.close()
  181. mysql_iotp_engine.dispose()
  182. if not kafka_consumer is None:
  183. kafka_consumer.close()
  184. if not rc is None:
  185. rc.close()
  186. class DingdingUtils():
  187. def __init__(self):
  188. pass
  189. # 获取 Access Token
  190. def get_access_token(self, APP_KEY,APP_SECRET):
  191. url = f"https://oapi.dingtalk.com/gettoken?appkey={APP_KEY}&appsecret={APP_SECRET}"
  192. response = requests.get(url)
  193. data = response.json()
  194. return data.get('access_token')
  195. # 上传文件并获取 MediaId
  196. def upload_file(self, access_token, file_path):
  197. url = f"https://oapi.dingtalk.com/media/upload?access_token={access_token}&type=file"
  198. files = {'media': open(file_path, 'rb')}
  199. response = requests.post(url, files=files)
  200. data = response.json()
  201. return data.get('media_id')
  202. def _create_client(self) -> dingtalkrobot_1_0Client:
  203. """
  204. 使用 Token 初始化账号Client
  205. @return: Client
  206. @throws Exception
  207. """
  208. config = open_api_models.Config()
  209. config.protocol = 'https'
  210. config.region_id = 'central'
  211. return dingtalkrobot_1_0Client(config)
  212. def send_file_to_dingGroup(self, media_id, fileName, date_string, Group_Chat_access_token, access_token, suffix='docx', filetype='docx'):
  213. client = self._create_client()
  214. org_group_send_headers = dingtalkrobot__1__0_models.OrgGroupSendHeaders()
  215. org_group_send_headers.x_acs_dingtalk_access_token = access_token
  216. File_send_request = dingtalkrobot__1__0_models.OrgGroupSendRequest(
  217. msg_param=str({ 'mediaId':media_id, 'fileName':'{}_{}.{}'.format(fileName,date_string,suffix), 'fileType':filetype}),
  218. msg_key='sampleFile',
  219. token=Group_Chat_access_token
  220. )
  221. try:
  222. client.org_group_send_with_options(File_send_request, org_group_send_headers, util_models.RuntimeOptions())
  223. except Exception as err:
  224. if not UtilClient.empty(err.code) and not UtilClient.empty(err.message):
  225. # err 中含有 code 和 message 属性,可帮助开发定位问题
  226. pass
  227. def send_markdown_to_dingtalk(self, message_title, markdown_content,Group_Chat_access_token,access_token):
  228. client = self._create_client()
  229. org_group_send_headers = dingtalkrobot__1__0_models.OrgGroupSendHeaders()
  230. org_group_send_headers.x_acs_dingtalk_access_token = access_token
  231. File_send_request = dingtalkrobot__1__0_models.OrgGroupSendRequest(
  232. msg_param=str({"title": message_title,"text": markdown_content}),
  233. msg_key='sampleMarkdown',
  234. token=Group_Chat_access_token
  235. )
  236. try:
  237. client.org_group_send_with_options(File_send_request, org_group_send_headers, util_models.RuntimeOptions())
  238. except Exception as err:
  239. if not UtilClient.empty(err.code) and not UtilClient.empty(err.message):
  240. # err 中含有 code 和 message 属性,可帮助开发定位问题
  241. pass
class DocxUtils():
    """Helpers for writing pandas DataFrames into python-docx documents and styling them."""

    def __init__(self):
        pass

    def add_table_to_word(self,doc, df):
        # Write the DataFrame into a new grid-styled table:
        # one bold header row, then one row per record.
        table = doc.add_table(rows=1, cols=len(df.columns))
        table.style = 'Table Grid'
        # Header row.
        # NOTE(review): alignment is set to True here but to 1 for data rows;
        # True == 1 in Python, so both presumably mean the same alignment value —
        # confirm against python-docx's WD_ALIGN_PARAGRAPH enum.
        header_cells = table.rows[0].cells
        for col, header in enumerate(df.columns):
            header_cells[col].text = header
            header_cells[col].paragraphs[0].runs[0].bold = True
            header_cells[col].paragraphs[0].alignment = True
        # Data rows: every value stringified, alignment value 1 set
        # both horizontally and vertically.
        for row_index, row_data in df.iterrows():
            row_cells = table.add_row().cells
            for col, value in enumerate(row_data):
                row_cells[col].text = str(value)
                row_cells[col].paragraphs[0].alignment = 1
                row_cells[col].vertical_alignment = 1

    def add_table_to_word_with_conbine(self, doc, df, combine_by, combine_columns):
        # Write df into a grid table, merging cells of `combine_columns` across
        # each run of consecutive rows sharing the same `combine_by` value.
        # combine_by: grouping column(s) deciding which rows are merged together
        # combine_columns: which columns have their cells merged within a group
        table = doc.add_table(rows=1, cols=len(df.columns))
        table.style = 'Table Grid'
        # Header row (same styling caveat as add_table_to_word).
        header_cells = table.rows[0].cells
        for col, header in enumerate(df.columns):
            header_cells[col].text = header
            header_cells[col].paragraphs[0].runs[0].bold = True
            header_cells[col].paragraphs[0].alignment = True
        last_row_count = 0  # data rows already written before the current group
        cur_row_count = 0   # running total of data rows written
        if len(df) > 0:
            for g, df_t in df.groupby(by=combine_by, sort=False):
                # Append this group's rows, then merge the requested columns;
                # repeat for each group. sort=False preserves df's row order.
                cur_row_count = cur_row_count + len(df_t)
                for row_index, row_data in df_t.iterrows():
                    row_cells = table.add_row().cells
                    for col, value in enumerate(row_data):
                        # print(col, value)
                        row_cells[col].text = str(value)
                        row_cells[col].paragraphs[0].alignment = 1
                        row_cells[col].vertical_alignment = 1
                # Merge the group's cells column by column. Row 0 is the header,
                # hence the +1 offset. The first cell's text is saved and written
                # back afterwards because merging combines the cells' contents.
                start_row_num = 1 + last_row_count
                for c in combine_columns:
                    c_idx = df.columns.get_loc(c)
                    text = table.cell(start_row_num, c_idx).text
                    for i in range(start_row_num, start_row_num + len(df_t)):
                        table.cell(start_row_num, c_idx).merge(table.cell(i, c_idx))
                    table.cell(start_row_num, c_idx).text = text
                last_row_count = len(df_t) + last_row_count

    def _set_font_and_style(self, run, font_name, font_size, bold=False, italic=False, underline=False):
        # Apply font name and emphasis flags to a single run.
        # NOTE(review): font_size is currently unused — the Pt(...) line is
        # commented out (and Pt is not imported in this file).
        font = run.font
        font.name = font_name
        #font.size = Pt(font_size)
        run.bold = bold
        run.italic = italic
        run.underline = underline

    def set_font_and_style(self, doc):
        # Force Arial bold on every run in the document's body paragraphs...
        for paragraph in doc.paragraphs:
            for run in paragraph.runs:
                self._set_font_and_style(run, 'Arial', 12, bold=True, italic=False, underline=False)
        # ...and on every run inside every table cell.
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    for paragraph in cell.paragraphs:
                        for run in paragraph.runs:
                            self._set_font_and_style(run, 'Arial', 12, bold=True, italic=False, underline=False)