123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- from sshtunnel import SSHTunnelForwarder
- import pandas as pd
- from ZlwlAlgosCommon.service.hz.HzAlgoService import HzAlgoService
- from urllib import parse
- from pydoris.doris_client import DorisClient
- import json
- import datetime
- import phoenixdb
-
# Data-warehouse helper library.
class DwService():
    """Copy rows fetched through ``HzAlgoService`` into Doris.

    Rows are read per table (and, for algorithm results, per SN/VIN), turned
    into a list of ``{column_name: value}`` dicts and sent to Doris as JSON
    batches with HTTP ``PUT`` requests issued through the ``DorisClient``
    session.

    Recognised ``settings`` keys (all optional):

    * ``write_threshold`` (default ``7``): in the per-id sync, buffered rows
      are flushed to Doris once at least this many have accumulated.
    * ``process_id`` (default ``1``): only used in progress messages.
    * ``doris_write_max_everytime`` (default ``100000``): maximum number of
      rows sent in a single request.
    * ``skip_endtime_0000`` / ``skip_endtime_1970``: when set to the string
      ``"true"``, rows containing a ``0000-00-00 00:00:00`` value /
      a value containing ``1970-01-01`` are dropped.
    """

    # Column types whose values are converted with ``str()`` before upload.
    _TEMPORAL_TYPES = ('DATETIME', 'DATE', 'TIMESTAMP')

    def __init__(self, tables, doris_host, doris_port, doris_user, doris_password, doris_db,
                 doris_table_format, hbase_host, hbase_port, hbase_user, hbase_password, hbase_db,
                 start_time="", end_time="", settings=None):
        """Store connection parameters and merge ``settings`` over the defaults.

        Args:
            tables: Iterable of source table names to synchronise.
            doris_host, doris_port, doris_user, doris_password: Passed to
                ``DorisClient`` when connecting.
            doris_db: Target Doris database name.
            doris_table_format: ``str.format`` template that maps a source
                table name to the Doris table name.
            hbase_host, hbase_port, hbase_user, hbase_password, hbase_db:
                Passed to ``HzAlgoService`` as ``hbase_params``.
            start_time, end_time: Forwarded unchanged to the algorithm-result
                query.
            settings: Optional dict of overrides, see the class docstring.
                ``None`` (the default) means "no overrides".
        """
        self.tables = tables
        self.doris_host = doris_host
        self.doris_port = doris_port
        self.doris_user = doris_user
        self.doris_password = doris_password
        self.doris_db = doris_db
        self.doris_table_format = doris_table_format
        self.hbase_host = hbase_host
        self.hbase_port = hbase_port
        self.hbase_user = hbase_user
        self.hbase_password = hbase_password
        self.hbase_db = hbase_db
        self.start_time = start_time
        self.end_time = end_time
        merged_settings = {"write_threshold": 7, "process_id": 1, "doris_write_max_everytime": 100000}
        if settings:
            merged_settings.update(settings)
        self.settings = merged_settings

    def _get_db(self):
        """Create the Doris client and the ``HzAlgoService`` used by the sync methods."""
        self.doris_client = DorisClient(self.doris_host, self.doris_port, self.doris_user, self.doris_password)
        self.doris_client.options\
            .set_json_format()\
            .set_auto_uuid_label()\
            .set_option('strip_outer_array', 'true')
        hbase_params = {
            "host": self.hbase_host,
            "port": self.hbase_port,
            "username": self.hbase_user,
            "password": self.hbase_password,
            "db": self.hbase_db
        }
        self.hz_service = HzAlgoService(hbase_params=hbase_params)

    def _rows_to_dicts(self, df_columns, data_rows):
        """Convert raw result rows into a list of ``{column_name: value}`` dicts.

        Args:
            df_columns: Column metadata; ``df_columns['COLUMN_NAME']`` and
                ``df_columns['TYPE']`` must each provide ``tolist()``.
            data_rows: Iterable of rows, each an iterable of values in
                column order.

        Returns:
            One dict per kept row. Values of DATETIME/DATE/TIMESTAMP columns
            are converted with ``str()``; all other values are passed through.
            Rows matching an enabled skip setting are omitted.
        """
        columns = df_columns['COLUMN_NAME'].tolist()
        types = df_columns['TYPE'].tolist()
        skip_0000 = self.settings.get("skip_endtime_0000") == "true"
        skip_1970 = self.settings.get("skip_endtime_1970") == "true"

        records = []
        for row in data_rows:
            record = {}
            skip_row = False
            for idx, item in enumerate(row):
                text = str(item)
                # Some "done" tables contain endtime=0000-00-00 values, which
                # interfere with the index column; optionally drop such rows.
                if skip_0000 and text == '0000-00-00 00:00:00':
                    skip_row = True
                    break
                # Same idea for epoch placeholder dates (any value containing
                # 1970-01-01).
                if skip_1970 and '1970-01-01' in text:
                    skip_row = True
                    break
                if types[idx] in self._TEMPORAL_TYPES:
                    # NOTE(review): a None value in a temporal column becomes
                    # the string "None" here - confirm that is intended.
                    record[columns[idx]] = text
                else:
                    record[columns[idx]] = item
            if not skip_row:
                records.append(record)
        return records

    def _get_hzalgo(self, table, sn="", vin="", start_time="", end_time=""):
        """Fetch algorithm-result rows for one SN or VIN as a list of dicts."""
        df_columns, data_rows = self.hz_service.get_original_hz_algo_data(
            table, sn=sn, vin=vin, start_time=start_time, end_time=end_time)
        return self._rows_to_dicts(df_columns, data_rows)

    def _get_hzdwads(self, table, dt_start, dt_end):
        """Fetch DW/ADS-layer rows (synced from MC into Lindorm) as a list of dicts."""
        df_columns, data_rows = self.hz_service.get_original_hz_dwads_data(table, dt_start, dt_end)
        return self._rows_to_dicts(df_columns, data_rows)

    @staticmethod
    def _load_succeeded(resp):
        """Return True only for a 200/OK response whose JSON body has Status == 'Success'.

        A body that is not JSON, or has no ``Status`` field, counts as a
        failure instead of raising a parsing error that would hide the
        server's actual response text.
        """
        if resp.status_code != 200 or resp.reason != 'OK':
            return False
        try:
            return json.loads(resp.text)['Status'] == 'Success'
        except (ValueError, KeyError, TypeError):
            return False

    def _write_to_doris(self, res_dict, table):
        """Upload ``res_dict`` to the Doris table for ``table`` in batches.

        Args:
            res_dict: List of row dicts to upload.
            table: Source table name; the Doris table name is derived from it
                with ``doris_table_format``.

        Raises:
            ValueError: If ``doris_write_max_everytime`` is not positive.
            Exception: Carrying the response text, if any batch is rejected.
        """
        doris_table = self.doris_table_format.format(table)
        print(f'{datetime.datetime.now()}-{doris_table} 数据开始导入doris...............')
        grp = int(self.settings.get("doris_write_max_everytime"))  # max rows per request
        if grp <= 0:
            raise ValueError(f"doris_write_max_everytime must be a positive integer, got {grp}")
        if not res_dict:
            return

        session = self.doris_client._session
        # Never drop the auth header when the request URL changes.
        # NOTE(review): presumably required because the load endpoint
        # redirects to a different host - confirm.
        session.should_strip_auth = lambda old_url, new_url: False
        url = self.doris_client._build_url(self.doris_db, doris_table)

        for start in range(0, len(res_dict), grp):
            # Refresh the label option before each request, as the original
            # per-batch call did.
            self.doris_client.options.set_auto_uuid_label()
            resp = session.request(
                'PUT', url=url,
                data=json.dumps(res_dict[start:start + grp]).encode('utf-8'),
                headers=self.doris_client.options.get_options(),
                auth=self.doris_client._auth
            )
            if self._load_succeeded(resp):
                print(f'{datetime.datetime.now()}-{doris_table} 数据导入成功')
                print(resp.text)
            else:
                print(f'{datetime.datetime.now()}-{doris_table} 数据导入失败')
                raise Exception(resp.text)

    def hz_algo_from_lindorm_to_doris(self, table, sn_list=None, vin_list=None):
        """Sync algorithm results from Lindorm into Doris for every table in ``self.tables``.

        Args:
            table: Ignored; kept only for backward compatibility. The tables
                that are synchronised always come from ``self.tables``.
            sn_list: SNs to query. Takes precedence over ``vin_list`` when
                non-empty.
            vin_list: VINs to query, used when ``sn_list`` is empty.

        When both id lists are empty nothing is fetched or written.
        """
        sn_list = [] if sn_list is None else sn_list
        vin_list = [] if vin_list is None else vin_list
        # len() rather than truthiness so array-like id containers keep working.
        if len(sn_list) > 0:
            id_kwarg, id_list = "sn", sn_list
        else:
            id_kwarg, id_list = "vin", vin_list
        all_count = len(id_list)
        threshold = self.settings.get("write_threshold")
        process_id = self.settings.get('process_id')

        self._get_db()
        for tbl in self.tables:
            doris_table = self.doris_table_format.format(tbl)
            print(f'{datetime.datetime.now()}-{doris_table} 数据开始同步...............')
            pending = []
            for count, cur_id in enumerate(id_list, start=1):
                pending.extend(self._get_hzalgo(
                    tbl, start_time=self.start_time, end_time=self.end_time, **{id_kwarg: cur_id}))
                # Flush as soon as enough rows are buffered to bound memory use.
                if len(pending) >= threshold:
                    print(f"进程{process_id}-{tbl}已同步{count}/{all_count}个, 本次同步的最后一个id为{cur_id}")
                    self._write_to_doris(pending, tbl)
                    pending = []
            # Flush whatever is left below the threshold.
            if len(pending) > 0:
                print(f"进程{process_id}-{tbl}已同步{count}/{all_count}个, 本次同步的最后一个id为{cur_id}")
                self._write_to_doris(pending, tbl)
            print(f'{datetime.datetime.now()}-{doris_table} 数据同步完成')

    def hz_dwads_from_lindorm_to_doris(self, dt_start, dt_end):
        """Sync DW/ADS-layer data (synced from MC into Lindorm) into Doris.

        Args:
            dt_start, dt_end: Range bounds forwarded unchanged to the
                DW/ADS query for every table in ``self.tables``.
        """
        process_id = self.settings.get("process_id")
        self._get_db()
        for tbl in self.tables:
            doris_table = self.doris_table_format.format(tbl)
            print(f'进程{process_id}-{datetime.datetime.now()}-{doris_table} 数据开始同步...............')
            res_dict = self._get_hzdwads(tbl, dt_start, dt_end)
            print(f'进程{process_id}-{datetime.datetime.now()}-共获取到{len(res_dict)}行数据...............')
            if len(res_dict) > 0:
                self._write_to_doris(res_dict, tbl)
            print(f'进程{process_id}-{datetime.datetime.now()}-{doris_table} 数据同步完成')
-
-
|