123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- from ..base.DBInterface import DBInterface
- import phoenixdb
- import traceback
class DBHbase(DBInterface):
    """HBase (or Lindorm) data source accessed through phoenixdb.

    Keys read from ``params`` by ``__init__``:
        host, port (required): server address; the connection URL is
            ``http://{host}:{port}``.
        username, password (optional): credentials; sent only when both are set.
        is_lindorm (optional, default 0): when truthy, ``page_query`` (re)connects
            via ``connect_lindorm`` instead of ``connect``.
        database (optional, default None): database used by ``page_query`` when it
            has to connect to Lindorm before ``connect_lindorm`` was ever called.
        pagesize (optional, default 10000): rows per page for paginated queries;
            must be a positive integer.
        max_retries (optional, default 5): how many times ``page_query`` retries
            after its first failed attempt.
    """

    def __init__(self, params: dict):
        self.host = params['host']
        self.port = params['port']
        self.username = params.get("username", None)
        self.password = params.get("password", None)
        self.url = f"http://{self.host}:{self.port}"
        self.cursor = None
        self.conn = None
        self.islindorm = params.get("is_lindorm", 0)
        # Previously only assigned inside connect_lindorm(); without this,
        # page_query() on a Lindorm source raised AttributeError when it had to
        # connect before connect_lindorm() had been called.
        self.database = params.get("database", None)
        self.pagesize = int(params.get("pagesize", 10000))
        if self.pagesize <= 0:
            # LIMIT 0 returns 0 rows, which equals a page size of 0, so the
            # pagination loop would never terminate.
            raise ValueError(f"pagesize must be a positive integer, got {self.pagesize}")
        self.max_retries = int(params.get("max_retries", 5))

    def connect(self):
        """Open an autocommit phoenixdb connection to ``self.url`` plus a cursor.

        Credentials are passed only when both username and password are set.
        """
        if self.username and self.password:
            self.conn = phoenixdb.connect(self.url, autocommit=True, user=self.username, password=self.password)
        else:
            self.conn = phoenixdb.connect(self.url, autocommit=True)
        self.cursor = self.conn.cursor()

    def connect_lindorm(self, database):
        """Open an autocommit Lindorm connection (phoenixdb with Lindorm kwargs).

        Args:
            database: database to connect to. It is remembered on
                ``self.database`` so ``page_query`` can reconnect to it later.
        """
        self.database = database
        connect_kw_args = {'database': database}
        if self.username and self.password:
            connect_kw_args['lindorm_user'] = self.username
            connect_kw_args['lindorm_password'] = self.password
        self.conn = phoenixdb.connect(self.url, autocommit=True, **connect_kw_args)
        self.cursor = self.conn.cursor()

    def page_query(self, sql: str, tag=False):
        """Run ``sql`` and return all of its rows, reconnecting and retrying on errors.

        Args:
            sql (str): SELECT statement. Unless ``tag`` is true it must not already
                contain LIMIT/OFFSET, because those clauses are appended here.
            tag (bool): when false (default) rows are fetched page by page with
                ``LIMIT pagesize OFFSET k * pagesize``; when true (tag data) the
                statement runs once without paging, because paging made some time
                ranges fail.

        Returns:
            list: all rows from ``cursor.fetchall()``. An empty list is returned,
            without retrying, when the error message contains
            "Cache of region boundaries are out of date".

        Raises:
            Exception: wrapping (and chained from) the last error once the first
                attempt plus ``max_retries`` retries have all failed.
        """
        retry_count = 0
        while True:
            try:
                self._ensure_connected()
                return self._fetch_rows(sql, tag)
            except Exception as e:
                print(traceback.format_exc())
                # Closing a broken connection can itself fail (e.g. network
                # error); that must not abort the retry loop.
                self._close_quietly()
                if "Cache of region boundaries are out of date" in str(e):
                    print("出现 region out of date 错误,跳过该次取数")
                    return []
                retry_count += 1
                if retry_count > self.max_retries:
                    print("---------ERROR: 超过重试次数,取数失败")
                    raise Exception(e) from e
                print(f"---------WARNING: HBASE 获取数据出现异常, 开始第{retry_count}次重试....")

    def _ensure_connected(self):
        """Connect (Lindorm or plain phoenixdb) when the connection or cursor is missing."""
        if self.conn is None or self.cursor is None:
            print("连接hbase数据库")
            if self.islindorm:
                self.connect_lindorm(self.database)
            else:
                self.connect()
            print("连接hbase数据库 成功")

    def _fetch_rows(self, sql, tag):
        """Execute ``sql`` on the open cursor and collect rows, paging unless ``tag``."""
        if tag:
            # Tag data: a single unpaginated query.
            self.cursor.execute(sql)
            return list(self.cursor.fetchall())

        # NOTE(review): LIMIT/OFFSET paging is only stable when ``sql`` has a
        # deterministic ORDER BY; confirm callers' queries provide one.
        base_sql = sql.rstrip().rstrip(";")  # a trailing ';' would precede LIMIT
        page_size = self.pagesize
        rows = []
        offset = 0
        while True:
            self.cursor.execute(f"{base_sql} LIMIT {page_size} OFFSET {offset}")
            page = self.cursor.fetchall()
            rows.extend(page)
            if len(page) < page_size:  # a short (or empty) page is the last one
                return rows
            offset += page_size

    def _close_quietly(self):
        """Call close(), printing instead of raising any error it hits."""
        try:
            self.close()
        except Exception:
            print(traceback.format_exc())

    def close(self):
        """Close the cursor and the connection, if open, and forget both.

        Both attributes are cleared before closing, and the connection is closed
        even if closing the cursor raises, so no half-closed handle is reused.
        """
        cursor, self.cursor = self.cursor, None
        conn, self.conn = self.conn, None
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if conn is not None:
                conn.close()
|