# mysql_to_doris.py: synchronise MySQL tables into Apache Doris.
import datetime
import json
from urllib import parse

import numpy as np
import pandas as pd
from pydoris.doris_client import DorisClient
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from sshtunnel import SSHTunnelForwarder
  # Data-warehouse helper library: copies MySQL tables into Apache Doris.
class DwService():
    """Synchronise MySQL tables into Doris tables, optionally through an SSH tunnel.

    Rows are read from MySQL with SQLAlchemy, converted to JSON-serialisable
    dicts and sent to Doris in one HTTP PUT per table, using the load URL,
    options and credentials of a ``pydoris`` ``DorisClient``.

    Args:
        tables: for ``full_table_sync`` an iterable of MySQL table names; for
            ``inc_table_sync`` an iterable of dicts mapping a table name to a
            dict with a ``'time_field'`` key (the column filtered on).
        doris_host, doris_port, doris_user, doris_password: Doris connection.
        doris_db: target Doris database.
        doris_table_format: ``str.format`` template turning a MySQL table name
            into the Doris table name, e.g. ``'ods_{}'``.
        mysql_host, mysql_port, mysql_user, mysql_password, mysql_db: source
            MySQL connection; ``mysql_port`` is converted with ``int()``.
        start_time, end_time: half-open window ``[start_time, end_time)``
            used by ``inc_table_sync``.
        ssh_host, ssh_name, ssh_password: SSH jump host (port 22) used by the
            ``*_with_ssh`` methods.
        settings: overrides for the defaults
            ``{"skip_endtime_0000": "true", "skip_endtime_1970": "true"}``.
            ``"true"`` drops rows whose date/time columns hold a zero date
            (``0000-00-00...``) or a ``1970-01-01`` value respectively.
    """

    # Base MySQL types whose values are sent to Doris as strings and are
    # subject to the skip_endtime_* checks.
    _TEMPORAL_TYPES = frozenset(('datetime', 'date', 'timestamp'))

    def __init__(self, tables, doris_host, doris_port, doris_user, doris_password, doris_db,
                 doris_table_format, mysql_host, mysql_port, mysql_user, mysql_password, mysql_db,
                 start_time="", end_time="", ssh_host="", ssh_name="", ssh_password="",
                 settings=None):
        self.tables = tables
        self.doris_host = doris_host
        self.doris_port = doris_port
        self.doris_user = doris_user
        self.doris_password = doris_password
        self.doris_db = doris_db
        self.doris_table_format = doris_table_format
        self.mysql_host = mysql_host
        self.mysql_port = int(mysql_port)
        self.mysql_user = mysql_user
        self.mysql_password = mysql_password
        self.mysql_db = mysql_db
        self.start_time = start_time
        self.end_time = end_time
        self.ssh_host = ssh_host
        self.ssh_name = ssh_name
        self.ssh_password = ssh_password
        # Build a fresh dict so defaults are never shared or mutated across instances.
        self.settings = {"skip_endtime_0000": "true", "skip_endtime_1970": "true"}
        self.settings.update(settings or {})

    def _get_db(self):
        """(Re)create the Doris client, the MySQL engine and the session factory."""
        self.doris_client = DorisClient(self.doris_host, self.doris_port, self.doris_user, self.doris_password)
        # Rows are posted as one JSON array (see _get_and_write), hence strip_outer_array.
        self.doris_client.options \
            .set_json_format() \
            .set_auto_uuid_label() \
            .set_option('strip_outer_array', 'true')
        # utf8mb4 so 4-byte characters (e.g. emoji) are not mangled; the user name is
        # quoted like the password in case it contains URL-special characters.
        self.db_engine = create_engine(
            "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8mb4".format(
                parse.quote_plus(self.mysql_user), parse.quote_plus(self.mysql_password),
                self.mysql_host, int(self.mysql_port), self.mysql_db),
            pool_recycle=3600, pool_size=5)
        self.Session = sessionmaker(self.db_engine)

    @staticmethod
    def _base_type(column_type):
        """Return the bare lower-case MySQL type, e.g. ``'decimal(10,2) unsigned'`` -> ``'decimal'``."""
        if isinstance(column_type, bytes):
            column_type = column_type.decode('utf-8')
        return str(column_type or '').strip().split('(', 1)[0].split(' ', 1)[0].lower()

    def _rows_to_records(self, columns, types, rows):
        """Convert MySQL rows into JSON-ready dicts, dropping rows rejected by the skip settings.

        Args:
            columns: column names, in the same order as each row's values.
            types: MySQL column types (as reported by ``desc``), aligned with ``columns``.
            rows: iterable of row tuples.

        Returns:
            A list of ``{column: value}`` dicts. NULLs stay ``None``. Date/time and
            decimal values become strings. Other values pass through unchanged.
        """
        # The source 'done' table has rows with a zero endtime, which affects the
        # index column; the settings decide whether such rows are dropped.
        skip_zero = self.settings.get("skip_endtime_0000") == "true"
        skip_epoch = self.settings.get("skip_endtime_1970") == "true"
        base_types = [self._base_type(t) for t in types]
        records = []
        for row in rows:
            record = {}
            skip = False
            for column, base_type, item in zip(columns, base_types, row):
                if item is None:
                    record[column] = None  # keep SQL NULL as JSON null, not the string 'None'
                elif base_type in self._TEMPORAL_TYPES:
                    value = str(item)
                    if (skip_zero and value.startswith('0000-00-00')) or \
                            (skip_epoch and '1970-01-01' in value):
                        skip = True
                        break
                    record[column] = value
                elif base_type == 'decimal':
                    record[column] = str(item)  # Decimal is not JSON serialisable
                elif base_type == 'tinyint':
                    record[column] = 0 if item == 'false' else 1 if item == 'true' else item
                else:
                    record[column] = item
            if not skip:
                records.append(record)
        return records

    def _check_load_response(self, resp, doris_table):
        """Print the Doris load response on success, otherwise raise.

        Raises:
            Exception: with the raw response body, when the HTTP status is not
                200 OK or the body does not report ``"Status": "Success"``
                (including bodies that are not JSON).
        """
        try:
            body = json.loads(resp.text)
        except ValueError:
            body = {}
        load_ok = isinstance(body, dict) and body.get('Status') == 'Success'
        if resp.status_code == 200 and resp.reason == 'OK' and load_ok:
            print(f'{datetime.datetime.now()}-{doris_table} 数据导入成功')
            print(resp.text)
        else:
            print(f'{datetime.datetime.now()}-{doris_table} 数据导入失败')
            raise Exception(resp.text)

    def _get_and_write(self, session, sql, table, params=None):
        """Read rows from MySQL with ``sql``, convert them and load them into Doris.

        Args:
            session: SQLAlchemy session bound to ``self.db_engine``.
            sql: SELECT over ``table``; a ``str`` or an SQLAlchemy text clause.
            table: MySQL table name; ``doris_table_format.format(table)`` is the target.
            params: optional bind parameters for ``sql``.

        Raises:
            Exception: if Doris does not report a successful load.
        """
        doris_table = self.doris_table_format.format(table)
        print(f'{datetime.datetime.now()}-{doris_table} 数据开始同步...............')
        # Presumably regenerates the load label so each table gets its own load.
        self.doris_client.options.set_auto_uuid_label()
        df_desc = pd.read_sql(f'desc {table}', self.db_engine)
        type_by_column = dict(zip(df_desc['Field'].tolist(), df_desc['Type'].tolist()))
        if isinstance(sql, str):
            sql = text(sql)
        result = session.execute(sql, params or {})
        # Column names come from the result itself so they always line up with row values.
        columns = list(result.keys())
        rows = result.fetchall()
        types = [type_by_column.get(column, '') for column in columns]
        records = self._rows_to_records(columns, types, rows)
        if not records:
            print(f"{datetime.datetime.now()}-本次需同步的数据条数0")
            return
        print(f'{datetime.datetime.now()}-{doris_table} 数据开始导入doris...............')
        # requests drops credentials when a redirect changes host; returning False keeps
        # them. Presumably needed because the Doris FE redirects loads to a BE node.
        self.doris_client._session.should_strip_auth = lambda old_url, new_url: False
        resp = self.doris_client._session.request(
            'PUT', url=self.doris_client._build_url(self.doris_db, doris_table),
            data=json.dumps(records).encode('utf-8'),
            headers=self.doris_client.options.get_options(),
            auth=self.doris_client._auth,
        )
        self._check_load_response(resp, doris_table)

    def full_table_sync(self):
        """Copy every row of each table in ``self.tables`` into Doris (full sync)."""
        self._get_db()
        try:
            with self.Session() as session:
                for table in self.tables:
                    self._get_and_write(session, 'select * from {}'.format(table), table)
        finally:
            # Release pooled MySQL connections (they may go through a tunnel about to close).
            self.db_engine.dispose()

    def inc_table_sync(self):
        """Copy rows whose ``time_field`` lies in ``[start_time, end_time)`` (incremental sync).

        Raises:
            Exception: if ``start_time``/``end_time`` is empty or a table lacks ``time_field``.
        """
        if self.start_time == "" or self.end_time == "":
            raise Exception("输入参数错误")
        self._get_db()
        # The time window is bound as parameters instead of being spliced into the SQL.
        window = {"start_time": self.start_time, "end_time": self.end_time}
        try:
            with self.Session() as session:
                for table_dict in self.tables:
                    for table, value in table_dict.items():
                        time_field = value.get('time_field')
                        if not time_field:
                            raise Exception("输入参数错误")
                        sql = ("select * from {0} where {1} >= :start_time "
                               "and {1} < :end_time").format(table, time_field)
                        self._get_and_write(session, sql, table, window)
        finally:
            self.db_engine.dispose()

    def _sync_through_ssh(self, sync):
        """Run ``sync()`` with MySQL reached through an SSH tunnel to ``ssh_host:22``.

        ``mysql_host``/``mysql_port`` point at the local tunnel end only while
        ``sync`` runs and are restored afterwards, so the instance stays reusable.

        Raises:
            Exception: if any SSH parameter is empty.
        """
        if self.ssh_host == "" or self.ssh_name == "" or self.ssh_password == "":
            raise Exception("输入参数错误")
        remote_host, remote_port = self.mysql_host, self.mysql_port
        # The SSH endpoint goes in as a (host, port) tuple: the second positional
        # parameter of SSHTunnelForwarder is ssh_config_file, not the port.
        # ssh_config_file=None means no ~/.ssh/config is consulted.
        with SSHTunnelForwarder((self.ssh_host, 22), ssh_config_file=None,
                                ssh_username=self.ssh_name, ssh_password=self.ssh_password,
                                remote_bind_address=(remote_host, remote_port)) as tunnel:
            self.mysql_host = '127.0.0.1'
            self.mysql_port = tunnel.local_bind_port
            try:
                sync()
            finally:
                self.mysql_host, self.mysql_port = remote_host, remote_port

    def full_table_sync_with_ssh(self):
        """Run ``full_table_sync`` through an SSH tunnel; raises if SSH parameters are empty."""
        self._sync_through_ssh(self.full_table_sync)

    def inc_table_sync_with_ssh(self):
        """Run ``inc_table_sync`` through an SSH tunnel; raises if SSH parameters are empty."""
        self._sync_through_ssh(self.inc_table_sync)