from ..base.DBInterface import DBInterface import phoenixdb import traceback class DBLindorm(DBInterface): """phoenixdb 连接hbase Args: DBInterface (_type_): _description_ """ def __init__(self, params:dict): self.host = params['host'] self.port = params['port'] self.username = params.get("username", None) self.password = params.get("password", None) self.database = params.get("database", None) self.url = f"http://{self.host}:{self.port}" self.cursor = None self.conn = None def connect(self): try: connect_kw_args = {'lindorm_user': self.username, 'lindorm_password': self.password, 'database': self.database} self.conn = phoenixdb.connect(self.url, autocommit=True, **connect_kw_args) print("lindorm 连接成功") except AttributeError: print("Failed to connect") self.cursor = self.conn.cursor() def page_query(self, sql:str, tag=False): """分页查询 Args: sql (str): _description_ Raises: Exception: _description_ """ retry_count = 0 while True: try: if not self.conn or not self.cursor: print("连接hbase数据库") self.connect() print("连接hbase数据库 成功") if self.cursor: rows = [] if not tag: # 非标签取数方式,采用分页方式 page_size = 10000 last_row_length = page_size page_index = 1 while last_row_length == page_size: new_sql = sql + " LIMIT %s OFFSET %s" % (page_size, (page_index - 1) * page_size) print(sql) self.cursor.execute(new_sql) last_rows = self.cursor.fetchall() last_row_length = len(last_rows) rows.extend(last_rows) page_index += 1 else: # 取标签数据,取消分页,不然有些时间段取数会报错 new_sql = sql self.cursor.execute(new_sql) last_rows = self.cursor.fetchall() last_row_length = len(last_rows) rows.extend(last_rows) page_index += 1 return rows except Exception as e: self.close() print(traceback.format_exc()) if "Cache of region boundaries are out of date" in str(e): print("出现 region out of date 错误,跳过该次取数") return [] retry_count += 1 print(f"---------WARNING: HBASE 获取数据出现异常, 开始第{retry_count}次重试....") if retry_count > 5: print("---------ERROR: 超过重试次数,取数失败") raise Exception(e) def close(self): if self.cursor: self.cursor.close() self.cursor = None if self.conn: self.conn.close() self.conn = None