import datetime
import json
from urllib import parse

import pandas as pd
import phoenixdb
from pydoris.doris_client import DorisClient
from sshtunnel import SSHTunnelForwarder

from ZlwlAlgosCommon.service.hz.HzAlgoService import HzAlgoService


class DwService:
    """Data-warehouse helpers: read rows through ``HzAlgoService`` and load them into Doris.

    The two public entry points are :meth:`hz_algo_from_lindorm_to_doris`
    (algorithm-result tables, queried per sn/vin) and
    :meth:`hz_dwads_from_lindorm_to_doris` (dw/ads-layer tables, queried by a
    ``dt`` range).  Both iterate over ``self.tables`` and push the fetched rows
    to the Doris table named ``doris_table_format.format(table)``.

    Recognised ``settings`` keys (all optional):

    * ``write_threshold`` (default 7): buffered row count at which
      :meth:`hz_algo_from_lindorm_to_doris` flushes to Doris.
    * ``process_id`` (default 1): only used to tag progress messages.
    * ``doris_write_max_everytime`` (default 100000): maximum number of rows
      sent in a single load request.
    * ``skip_endtime_0000`` / ``skip_endtime_1970``: when equal to the string
      ``"true"``, rows containing a ``0000-00-00 00:00:00`` value / a value
      containing ``1970-01-01`` are dropped.
    """

    # Column types whose values are converted with ``str()`` so that
    # ``json.dumps`` can serialise them.
    _TEMPORAL_TYPES = ('DATETIME', 'DATE', 'TIMESTAMP')

    def __init__(self, tables, doris_host, doris_port, doris_user, doris_password, doris_db, doris_table_format,
                 hbase_host, hbase_port, hbase_user, hbase_password, hbase_db,
                 start_time="", end_time="", settings=None):
        """Store connection parameters and merge ``settings`` over the defaults.

        No connection is opened here; that happens in :meth:`_get_db`.

        Args:
            tables: iterable of source table names to synchronise.
            doris_host, doris_port, doris_user, doris_password: passed to
                ``DorisClient``.
            doris_db: target Doris database.
            doris_table_format: ``str.format`` template that maps a source
                table name to the Doris table name.
            hbase_host, hbase_port, hbase_user, hbase_password, hbase_db:
                passed to ``HzAlgoService`` as ``hbase_params``.
            start_time, end_time: time window forwarded to the algorithm-data
                query.
            settings: optional dict overriding the default settings.
        """
        self.tables = tables
        self.doris_host = doris_host
        self.doris_port = doris_port
        self.doris_user = doris_user
        self.doris_password = doris_password
        self.doris_db = doris_db
        self.doris_table_format = doris_table_format
        self.hbase_host = hbase_host
        self.hbase_port = hbase_port
        self.hbase_user = hbase_user
        self.hbase_password = hbase_password
        self.hbase_db = hbase_db
        self.start_time = start_time
        self.end_time = end_time
        merged_settings = {"write_threshold": 7, "process_id": 1, "doris_write_max_everytime": 100000}
        if settings:
            # Caller-supplied values win over the defaults.
            merged_settings.update(settings)
        self.settings = merged_settings

    def _get_db(self):
        """Create ``self.doris_client`` and ``self.hz_service``."""
        self.doris_client = DorisClient(self.doris_host, self.doris_port, self.doris_user, self.doris_password)
        (
            self.doris_client.options
            .set_json_format()
            .set_auto_uuid_label()
            .set_option('strip_outer_array', 'true')
        )
        hbase_params = {
            "host": self.hbase_host,
            "port": self.hbase_port,
            "username": self.hbase_user,
            "password": self.hbase_password,
            "db": self.hbase_db,
        }
        self.hz_service = HzAlgoService(hbase_params=hbase_params)

    def _rows_to_dicts(self, df_columns, data_rows):
        """Turn positional rows into a list of ``{column_name: value}`` dicts.

        Args:
            df_columns: table metadata exposing ``COLUMN_NAME`` and ``TYPE``
                columns (presumably a pandas DataFrame -- it is only accessed
                via ``df_columns[...].tolist()``).
            data_rows: iterable of rows; each row is an iterable of values in
                the same order as the metadata.

        Returns:
            list[dict]: one dict per kept row.  Values of DATETIME/DATE/
            TIMESTAMP columns are converted with ``str()``; other values are
            passed through unchanged.  Rows are dropped according to the
            ``skip_endtime_0000`` / ``skip_endtime_1970`` settings.
        """
        columns = df_columns['COLUMN_NAME'].tolist()
        types = df_columns['TYPE'].tolist()
        # The settings do not change while converting, so read them once.
        skip_0000 = self.settings.get("skip_endtime_0000") == "true"
        skip_1970 = self.settings.get("skip_endtime_1970") == "true"

        records = []
        for row in data_rows:
            record = {}
            skip_row = False
            for idx, item in enumerate(row):
                text = str(item)
                # The "done" tables contain rows whose end time is
                # 0000-00-00 00:00:00, which disturbs the index column.
                if skip_0000 and text == '0000-00-00 00:00:00':
                    skip_row = True
                # Same idea for epoch (1970-01-01) placeholder timestamps.
                if skip_1970 and '1970-01-01' in text:
                    skip_row = True
                if types[idx] in self._TEMPORAL_TYPES:
                    record[columns[idx]] = text
                else:
                    record[columns[idx]] = item
            if not skip_row:
                records.append(record)
        return records

    def _get_hzalgo(self, table, sn="", vin="", start_time="", end_time=""):
        """Fetch algorithm-result rows for one sn/vin and return them as dicts.

        All arguments are forwarded to
        ``self.hz_service.get_original_hz_algo_data``; the result is converted
        by :meth:`_rows_to_dicts`.
        """
        df_columns, data_rows = self.hz_service.get_original_hz_algo_data(
            table, sn=sn, vin=vin, start_time=start_time, end_time=end_time)
        return self._rows_to_dicts(df_columns, data_rows)

    def _get_hzdwads(self, table, dt_start, dt_end):
        """Fetch dw/ads-layer rows (data synced from MC into Lindorm) as dicts.

        All arguments are forwarded to
        ``self.hz_service.get_original_hz_dwads_data``; the result is converted
        by :meth:`_rows_to_dicts`.
        """
        df_columns, data_rows = self.hz_service.get_original_hz_dwads_data(table, dt_start, dt_end)
        return self._rows_to_dicts(df_columns, data_rows)

    @staticmethod
    def _load_succeeded(resp):
        """Return True when ``resp`` is HTTP 200/OK and its JSON body has ``Status == 'Success'``.

        A body that is not JSON, or not a JSON object, counts as a failure
        instead of raising, so the caller can report the raw response text.
        """
        if resp.status_code != 200 or resp.reason != 'OK':
            return False
        try:
            payload = json.loads(resp.text)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return False
        return isinstance(payload, dict) and payload.get('Status') == 'Success'

    def _write_to_doris(self, res_dict, table):
        """Send ``res_dict`` to Doris in batches via HTTP ``PUT``.

        Args:
            res_dict: list of row dicts (JSON-serialisable).
            table: source table name; the Doris table is
                ``self.doris_table_format.format(table)``.

        Raises:
            ValueError: if the ``doris_write_max_everytime`` setting is < 1.
            RuntimeError: if a batch is not acknowledged as successful; the
                message is the raw response text.  Batches already sent are
                not rolled back by this method.
        """
        doris_table = self.doris_table_format.format(table)
        print(f'{datetime.datetime.now()}-{doris_table} 数据开始导入doris...............')
        batch_size = int(self.settings.get("doris_write_max_everytime"))  # max rows per request
        if batch_size < 1:
            raise ValueError(f"doris_write_max_everytime must be >= 1, got {batch_size}")

        session = self.doris_client._session
        # Keep the auth header when the request is redirected to another URL.
        session.should_strip_auth = lambda old_url, new_url: False

        for start in range(0, len(res_dict), batch_size):
            # Re-applied before every request, as the original per-batch code did.
            self.doris_client.options.set_auto_uuid_label()
            resp = session.request(
                'PUT',
                url=self.doris_client._build_url(self.doris_db, doris_table),
                data=json.dumps(res_dict[start:start + batch_size]).encode('utf-8'),
                headers=self.doris_client.options.get_options(),
                auth=self.doris_client._auth,
            )
            if not self._load_succeeded(resp):
                print(f'{datetime.datetime.now()}-{doris_table} 数据导入失败')
                raise RuntimeError(resp.text)
            print(f'{datetime.datetime.now()}-{doris_table} 数据导入成功')
            print(resp.text)

    def hz_algo_from_lindorm_to_doris(self, table, sn_list=None, vin_list=None):
        """Copy algorithm-result data from Lindorm into Doris for every table in ``self.tables``.

        Rows are fetched one sn/vin at a time and buffered; the buffer is
        written to Doris whenever it reaches the ``write_threshold`` setting
        and once more at the end of each table.

        Args:
            table: NOTE(review): accepted for backward compatibility but not
                used -- the method has always iterated over ``self.tables``.
                Confirm whether a single-table mode was intended.
            sn_list: serial numbers to query.  Takes precedence over
                ``vin_list`` when non-empty.
            vin_list: VINs to query; used only when ``sn_list`` is empty.

        Raises:
            ValueError: if both ``sn_list`` and ``vin_list`` are empty.
        """
        sn_list = [] if sn_list is None else sn_list
        vin_list = [] if vin_list is None else vin_list
        if len(sn_list) > 0:
            id_kind, ids = "sn", sn_list
        elif len(vin_list) > 0:
            id_kind, ids = "vin", vin_list
        else:
            # Fail early with a clear message, before opening any connection.
            raise ValueError("hz_algo_from_lindorm_to_doris requires a non-empty sn_list or vin_list")

        write_threshold = int(self.settings.get("write_threshold"))
        process_id = self.settings.get('process_id')
        total = len(ids)

        self._get_db()
        for table_name in self.tables:
            doris_table = self.doris_table_format.format(table_name)
            print(f'{datetime.datetime.now()}-{doris_table} 数据开始同步...............')
            pending = []
            for count, current_id in enumerate(ids, start=1):
                pending.extend(self._get_hzalgo(
                    table_name,
                    start_time=self.start_time,
                    end_time=self.end_time,
                    **{id_kind: current_id},
                ))
                if len(pending) >= write_threshold:
                    print(f"进程{process_id}-{table_name}已同步{count}/{total}个, 本次同步的最后一个id为{current_id}")
                    self._write_to_doris(pending, table_name)
                    pending = []
            if len(pending) > 0:
                # Flush whatever is left below the threshold.
                print(f"进程{process_id}-{table_name}已同步{count}/{total}个, 本次同步的最后一个id为{current_id}")
                self._write_to_doris(pending, table_name)
            print(f'{datetime.datetime.now()}-{doris_table} 数据同步完成')

    def hz_dwads_from_lindorm_to_doris(self, dt_start, dt_end):
        """Copy dw/ads-layer data (synced from MC into Lindorm) into Doris.

        For every table in ``self.tables`` the rows between ``dt_start`` and
        ``dt_end`` are fetched in one call and, if any were returned, written
        to Doris.
        """
        self._get_db()
        process_id = self.settings.get("process_id")
        for table in self.tables:
            doris_table = self.doris_table_format.format(table)
            print(f'进程{process_id}-{datetime.datetime.now()}-{doris_table} 数据开始同步...............')
            rows = list(self._get_hzdwads(table, dt_start, dt_end))
            print(f'进程{process_id}-{datetime.datetime.now()}-共获取到{len(rows)}行数据...............')
            if len(rows) > 0:
                self._write_to_doris(rows, table)
            print(f'进程{process_id}-{datetime.datetime.now()}-{doris_table} 数据同步完成')