# DBHbase.py
import traceback

import phoenixdb

from ..base.DBInterface import DBInterface
  4. class DBHbase(DBInterface):
  5. """phoenixdb 连接hbase
  6. Args:
  7. DBInterface (_type_): _description_
  8. """
  9. def __init__(self, params:dict):
  10. self.host = params['host']
  11. self.port = params['port']
  12. self.username = params.get("username", None)
  13. self.password = params.get("password", None)
  14. self.url = f"http://{self.host}:{self.port}"
  15. self.cursor = None
  16. self.conn = None
  17. self.islindorm = params.get("is_lindorm", 0)
  18. self.pagesize = int(params.get("pagesize", 10000))
  19. def connect(self):
  20. if self.username and self.password:
  21. self.conn = phoenixdb.connect(self.url, autocommit=True, user=self.username, password=self.password)
  22. else:
  23. self.conn = phoenixdb.connect(self.url, autocommit=True)
  24. self.cursor = self.conn.cursor()
  25. def connect_lindorm(self,database):
  26. self.database = database
  27. if self.username and self.password:
  28. connect_kw_args = {'lindorm_user': self.username, 'lindorm_password': self.password,'database': database}
  29. else:
  30. connect_kw_args = {'database': self.database}
  31. self.conn = phoenixdb.connect(self.url, autocommit=True, **connect_kw_args)
  32. self.cursor = self.conn.cursor()
  33. def page_query(self, sql:str, tag=False):
  34. """分页查询
  35. Args:
  36. sql (str): _description_
  37. Raises:
  38. Exception: _description_
  39. """
  40. retry_count = 0
  41. while True:
  42. try:
  43. if not self.conn or not self.cursor:
  44. print("连接hbase数据库")
  45. if self.islindorm:
  46. self.connect_lindorm(self.database)
  47. else:
  48. self.connect()
  49. print("连接hbase数据库 成功")
  50. if self.cursor:
  51. rows = []
  52. if not tag: # 非标签取数方式,采用分页方式
  53. page_size = self.pagesize
  54. last_row_length = page_size
  55. page_index = 1
  56. while last_row_length == page_size:
  57. new_sql = sql + " LIMIT %s OFFSET %s" % (page_size, (page_index - 1) * page_size)
  58. self.cursor.execute(new_sql)
  59. last_rows = self.cursor.fetchall()
  60. last_row_length = len(last_rows)
  61. rows.extend(last_rows)
  62. page_index += 1
  63. else: # 取标签数据,取消分页,不然有些时间段取数会报错
  64. new_sql = sql
  65. self.cursor.execute(new_sql)
  66. last_rows = self.cursor.fetchall()
  67. last_row_length = len(last_rows)
  68. rows.extend(last_rows)
  69. page_index += 1
  70. return rows
  71. except Exception as e:
  72. self.close()
  73. print(traceback.format_exc())
  74. if "Cache of region boundaries are out of date" in str(e):
  75. print("出现 region out of date 错误,跳过该次取数")
  76. return []
  77. retry_count += 1
  78. print(f"---------WARNING: HBASE 获取数据出现异常, 开始第{retry_count}次重试....")
  79. if retry_count > 5:
  80. print("---------ERROR: 超过重试次数,取数失败")
  81. raise Exception(e)
  82. def close(self):
  83. if self.cursor:
  84. self.cursor.close()
  85. self.cursor = None
  86. if self.conn:
  87. self.conn.close()
  88. self.conn = None