1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798 |
- from ..base.DBInterface import DBInterface
- import phoenixdb
- import traceback
class DBLindorm(DBInterface):
    """Lindorm (HBase) client built on ``phoenixdb``.

    The connection is opened lazily by ``page_query`` and is dropped and
    re-opened after any query failure.

    Args:
        params (dict): connection settings. ``host`` and ``port`` are
            required; ``username``, ``password`` and ``database`` are optional
            and default to ``None``.
    """

    def __init__(self, params: dict):
        self.host = params['host']
        self.port = params['port']
        self.username = params.get("username", None)
        self.password = params.get("password", None)
        self.database = params.get("database", None)
        # Endpoint handed to phoenixdb.connect (plain HTTP).
        self.url = f"http://{self.host}:{self.port}"
        self.cursor = None
        self.conn = None

    def connect(self):
        """Open a new autocommit connection and cursor.

        Any handles still held from an earlier call are closed first so they
        are not leaked.

        Raises:
            Exception: whatever ``phoenixdb.connect`` or ``conn.cursor()``
                raises. The error is logged and re-raised rather than
                swallowed, so the object is never left with ``conn is None``
                while the caller believes it is connected.
        """
        # Drop stale handles; they are assumed unusable at this point.
        self._close_quietly()
        connect_kw_args = {'lindorm_user': self.username,
                           'lindorm_password': self.password,
                           'database': self.database}
        try:
            self.conn = phoenixdb.connect(self.url, autocommit=True, **connect_kw_args)
            self.cursor = self.conn.cursor()
        except Exception:
            print("Failed to connect")
            # Release a half-opened connection so the next call starts clean.
            self._close_quietly()
            raise
        print("lindorm 连接成功")

    def page_query(self, sql: str, tag=False, page_size=10000, max_retries=5):
        """Run ``sql`` and return all result rows, retrying on failure.

        Args:
            sql (str): SELECT statement without its own LIMIT/OFFSET clause.
            tag (bool): when False (default), rows are fetched page by page
                by appending ``LIMIT/OFFSET`` to ``sql``. When True, the
                statement runs once without paging, because paging makes tag
                queries fail for some time ranges.
            page_size (int): rows per page in paged mode; must be >= 1.
            max_retries (int): retries allowed after the first failed attempt.

        Returns:
            list: all fetched rows. Returns ``[]`` when the error message
            contains "Cache of region boundaries are out of date"; that error
            is not retried.

        Raises:
            ValueError: if ``page_size`` < 1 (paging would never terminate).
            Exception: wrapping (and chained to) the last error once
                ``max_retries`` retries have failed.
        """
        if page_size < 1:
            raise ValueError(f"page_size must be >= 1, got {page_size}")

        retry_count = 0
        while True:
            try:
                if not self.conn or not self.cursor:
                    print("连接hbase数据库")
                    self.connect()  # raises on failure, so cursor is set below
                    print("连接hbase数据库 成功")

                if tag:
                    return self._fetch_all(sql)
                return self._fetch_paged(sql, page_size)

            except Exception as e:
                print(traceback.format_exc())
                # The connection may be broken; force a reconnect next attempt.
                self._close_quietly()
                if "Cache of region boundaries are out of date" in str(e):
                    print("出现 region out of date 错误,跳过该次取数")
                    return []
                retry_count += 1
                if retry_count > max_retries:
                    print("---------ERROR: 超过重试次数,取数失败")
                    raise Exception(e) from e
                print(f"---------WARNING: HBASE 获取数据出现异常, 开始第{retry_count}次重试....")

    def _fetch_paged(self, sql, page_size):
        """Fetch every row of ``sql`` in LIMIT/OFFSET pages of ``page_size``."""
        # NOTE(review): unless ``sql`` has an ORDER BY, the row order is not
        # guaranteed to be stable across pages, so pages could overlap or skip
        # rows. Confirm callers order their queries.
        base_sql = sql.rstrip().rstrip(";")  # a trailing ';' would break LIMIT
        rows = []
        offset = 0
        while True:
            page_sql = f"{base_sql} LIMIT {page_size} OFFSET {offset}"
            print(page_sql)
            self.cursor.execute(page_sql)
            page = self.cursor.fetchall()
            rows.extend(page)
            # A short (or empty) page means the result set is exhausted.
            if len(page) < page_size:
                return rows
            offset += page_size

    def _fetch_all(self, sql):
        """Run ``sql`` once without paging and return every row as a list."""
        self.cursor.execute(sql)
        return list(self.cursor.fetchall())

    def _close_quietly(self):
        """Close handles on an error path, logging instead of raising."""
        try:
            self.close()
        except Exception:
            print(traceback.format_exc())

    def close(self):
        """Close the cursor and the connection, if open.

        Both attributes are reset to ``None`` before closing. The connection
        close is attempted even if closing the cursor raises, so the object
        never keeps references to half-closed handles.
        """
        cursor, self.cursor = self.cursor, None
        conn, self.conn = self.conn, None
        try:
            if cursor:
                cursor.close()
        finally:
            if conn:
                conn.close()
|