hbase_to_doris.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from sshtunnel import SSHTunnelForwarder
  2. import pandas as pd
  3. from ZlwlAlgosCommon.service.hz.HzAlgoService import HzAlgoService
  4. from urllib import parse
  5. from pydoris.doris_client import DorisClient
  6. import json
  7. import datetime
  8. import phoenixdb
  9. # 数仓相关库函数
  10. class DwService():
  11. def __init__(self, tables, doris_host, doris_port, doris_user, doris_password, doris_db,
  12. doris_table_format, hbase_host, hbase_port, hbase_user, hbase_password, hbase_db, start_time="", end_time="",
  13. settings={}):
  14. self.tables = tables
  15. self.doris_host = doris_host
  16. self.doris_port = doris_port
  17. self.doris_user = doris_user
  18. self.doris_password = doris_password
  19. self.doris_db = doris_db
  20. self.doris_table_format = doris_table_format
  21. self.hbase_host = hbase_host
  22. self.hbase_port = hbase_port
  23. self.hbase_user = hbase_user
  24. self.hbase_password = hbase_password
  25. self.hbase_db = hbase_db
  26. self.start_time = start_time
  27. self.end_time = end_time
  28. settings_default = {"write_threshold":7, "process_id":1, "doris_write_max_everytime":100000}
  29. for k,v in settings.items():
  30. settings_default.update({k:v})
  31. self.settings = settings_default
  32. def _get_db(self):
  33. self.doris_client = DorisClient(self.doris_host, self.doris_port, self.doris_user, self.doris_password)
  34. self.doris_client.options\
  35. .set_json_format()\
  36. .set_auto_uuid_label()\
  37. .set_option('strip_outer_array', 'true')
  38. hbase_params = {
  39. "host":self.hbase_host,
  40. "port":self.hbase_port,
  41. "username":self.hbase_user,
  42. "password":self.hbase_password,
  43. "db":self.hbase_db
  44. }
  45. self.hz_service = HzAlgoService(hbase_params=hbase_params)
  46. def _get_hzalgo(self, table, sn="", vin="", start_time="", end_time=""):
  47. df_columns, data_rows = self.hz_service.get_original_hz_algo_data(table, sn=sn, vin=vin, start_time=start_time, end_time=end_time)
  48. columns = df_columns['COLUMN_NAME'].tolist()
  49. types = df_columns['TYPE'].tolist()
  50. res_dict = []
  51. # 字典列表组装
  52. for r in data_rows:
  53. mp = {}
  54. i = 0
  55. skip_flag = False
  56. for item in r:
  57. # 数据类型特殊处理
  58. if (str(item) == '0000-00-00 00:00:00'): # done表中存在endtime=0000的数据,影响索引字段
  59. if self.settings.get("skip_endtime_0000") == "true": # 跳过endtime0000的数据
  60. skip_flag = True
  61. if ('1970-01-01' in str(item) ): # done表中存在endtime=0000的数据,影响索引字段
  62. if self.settings.get("skip_endtime_1970") == "true": # 跳过endtime0000的数据
  63. skip_flag = True
  64. if types[i] == 'DATETIME' or types[i] == 'DATE' or types[i] == 'TIMESTAMP':
  65. mp.update({columns[i]: str(item)})
  66. # elif types[i] == 'tinyint':
  67. # mp.update({columns[i]: 0 if item == 'false' else 1 if item == 'true' else item})
  68. # elif 'decimal' in types[i] :
  69. # mp.update({columns[i]: str(item)})
  70. else:
  71. mp.update({columns[i]:item})
  72. i = i + 1
  73. if not skip_flag:
  74. res_dict.append(mp)
  75. return res_dict
  76. def _get_hzdwads(self, table, dt_start, dt_end):
  77. # 获取合众mc中dw层和ads层同步到lindorm中的数据
  78. df_columns, data_rows = self.hz_service.get_original_hz_dwads_data(table, dt_start, dt_end)
  79. columns = df_columns['COLUMN_NAME'].tolist()
  80. types = df_columns['TYPE'].tolist()
  81. res_dict = []
  82. # 字典列表组装
  83. for r in data_rows:
  84. mp = {}
  85. i = 0
  86. skip_flag = False
  87. for item in r:
  88. # 数据类型特殊处理
  89. if (str(item) == '0000-00-00 00:00:00'): # done表中存在endtime=0000的数据,影响索引字段
  90. if self.settings.get("skip_endtime_0000") == "true": # 跳过endtime0000的数据
  91. skip_flag = True
  92. if ('1970-01-01' in str(item) ): # done表中存在endtime=0000的数据,影响索引字段
  93. if self.settings.get("skip_endtime_1970") == "true": # 跳过endtime0000的数据
  94. skip_flag = True
  95. if types[i] == 'DATETIME' or types[i] == 'DATE' or types[i] == 'TIMESTAMP':
  96. mp.update({columns[i]: str(item)})
  97. # elif types[i] == 'tinyint':
  98. # mp.update({columns[i]: 0 if item == 'false' else 1 if item == 'true' else item})
  99. # elif 'decimal' in types[i] :
  100. # mp.update({columns[i]: str(item)})
  101. else:
  102. mp.update({columns[i]:item})
  103. i = i + 1
  104. if not skip_flag:
  105. res_dict.append(mp)
  106. return res_dict
  107. def _write_to_doris(self, res_dict, table):
  108. print(f'{datetime.datetime.now()}-{self.doris_table_format.format(table)} 数据开始导入doris...............')
  109. grp = int(self.settings.get("doris_write_max_everytime")) # 每次写入的数据行数上限
  110. for i in range(len(res_dict)):
  111. self.doris_client.options.set_auto_uuid_label()
  112. self.doris_client._session.should_strip_auth = lambda old_url, new_url: False
  113. resp = self.doris_client._session.request(
  114. 'PUT', url=self.doris_client._build_url(self.doris_db, self.doris_table_format.format(table)),
  115. data=json.dumps(res_dict[i*grp:(i+1)*grp]).encode('utf-8'),
  116. headers=self.doris_client.options.get_options(),
  117. auth=self.doris_client._auth
  118. )
  119. load_status = json.loads(resp.text)['Status'] == 'Success'
  120. if resp.status_code == 200 and resp.reason == 'OK' and load_status:
  121. print(f'{datetime.datetime.now()}-{self.doris_table_format.format(table)} 数据导入成功')
  122. print(resp.text)
  123. else:
  124. print(f'{datetime.datetime.now()}-{self.doris_table_format.format(table)} 数据导入失败')
  125. raise Exception(resp.text)
  126. if (i+1) * grp > len(res_dict)-1:
  127. break
  128. def hz_algo_from_lindorm_to_doris(self, table, sn_list=[], vin_list=[]):
  129. # 获取合众lindorm中算法结果的数据,同步到doris中
  130. self._get_db()
  131. for table in self.tables:
  132. print(f'{datetime.datetime.now()}-{self.doris_table_format.format(table)} 数据开始同步...............')
  133. res_dict = []
  134. count = 0
  135. if len(sn_list) > 0:
  136. sn_or_vin_list = sn_list
  137. elif len(vin_list) > 0:
  138. sn_or_vin_list = vin_list
  139. all_count = len(sn_or_vin_list)
  140. for sn_or_vin in sn_or_vin_list:
  141. count = count + 1
  142. if len(sn_list) > 0:
  143. res_dict.extend(self._get_hzalgo(table, sn=sn_or_vin, start_time=self.start_time, end_time=self.end_time))
  144. elif len(vin_list) > 0:
  145. res_dict.extend(self._get_hzalgo(table, vin=sn_or_vin, start_time=self.start_time, end_time=self.end_time))
  146. if len(res_dict) >= self.settings.get("write_threshold"):
  147. print(f"进程{self.settings.get('process_id')}-{table}已同步{count}/{all_count}个, 本次同步的最后一个id为{sn_or_vin}")
  148. self._write_to_doris(res_dict, table)
  149. res_dict = []
  150. if len(res_dict) > 0:
  151. print(f"进程{self.settings.get('process_id')}-{table}已同步{count}/{all_count}个, 本次同步的最后一个id为{sn_or_vin}")
  152. self._write_to_doris(res_dict, table)
  153. print(f'{datetime.datetime.now()}-{self.doris_table_format.format(table)} 数据同步完成')
  154. def hz_dwads_from_lindorm_to_doris(self, dt_start, dt_end):
  155. # 获取合众lindorm中从MC同步过来的dw和ads层的数据
  156. self._get_db()
  157. for table in self.tables:
  158. print(f'进程{self.settings.get("process_id")}-{datetime.datetime.now()}-{self.doris_table_format.format(table)} 数据开始同步...............')
  159. res_dict = []
  160. res_dict.extend(self._get_hzdwads(table, dt_start, dt_end))
  161. print(f'进程{self.settings.get("process_id")}-{datetime.datetime.now()}-共获取到{len(res_dict)}行数据...............')
  162. if len(res_dict) > 0:
  163. self._write_to_doris(res_dict, table)
  164. print(f'进程{self.settings.get("process_id")}-{datetime.datetime.now()}-{self.doris_table_format.format(table)} 数据同步完成')