# DBLindorm.py
import traceback

import phoenixdb

from ..base.DBInterface import DBInterface
  4. class DBLindorm(DBInterface):
  5. """phoenixdb 连接hbase
  6. Args:
  7. DBInterface (_type_): _description_
  8. """
  9. def __init__(self, params:dict):
  10. self.host = params['host']
  11. self.port = params['port']
  12. self.username = params.get("username", None)
  13. self.password = params.get("password", None)
  14. self.database = params.get("database", None)
  15. self.url = f"http://{self.host}:{self.port}"
  16. self.cursor = None
  17. self.conn = None
  18. def connect(self):
  19. try:
  20. connect_kw_args = {'lindorm_user': self.username, 'lindorm_password': self.password,
  21. 'database': self.database}
  22. self.conn = phoenixdb.connect(self.url, autocommit=True, **connect_kw_args)
  23. print("lindorm 连接成功")
  24. except AttributeError:
  25. print("Failed to connect")
  26. self.cursor = self.conn.cursor()
  27. def page_query(self, sql:str, tag=False):
  28. """分页查询
  29. Args:
  30. sql (str): _description_
  31. Raises:
  32. Exception: _description_
  33. """
  34. retry_count = 0
  35. while True:
  36. try:
  37. if not self.conn or not self.cursor:
  38. print("连接hbase数据库")
  39. self.connect()
  40. print("连接hbase数据库 成功")
  41. if self.cursor:
  42. rows = []
  43. if not tag: # 非标签取数方式,采用分页方式
  44. page_size = 10000
  45. last_row_length = page_size
  46. page_index = 1
  47. while last_row_length == page_size:
  48. new_sql = sql + " LIMIT %s OFFSET %s" % (page_size, (page_index - 1) * page_size)
  49. print(sql)
  50. self.cursor.execute(new_sql)
  51. last_rows = self.cursor.fetchall()
  52. last_row_length = len(last_rows)
  53. rows.extend(last_rows)
  54. page_index += 1
  55. else: # 取标签数据,取消分页,不然有些时间段取数会报错
  56. new_sql = sql
  57. self.cursor.execute(new_sql)
  58. last_rows = self.cursor.fetchall()
  59. last_row_length = len(last_rows)
  60. rows.extend(last_rows)
  61. page_index += 1
  62. return rows
  63. except Exception as e:
  64. self.close()
  65. print(traceback.format_exc())
  66. if "Cache of region boundaries are out of date" in str(e):
  67. print("出现 region out of date 错误,跳过该次取数")
  68. return []
  69. retry_count += 1
  70. print(f"---------WARNING: HBASE 获取数据出现异常, 开始第{retry_count}次重试....")
  71. if retry_count > 5:
  72. print("---------ERROR: 超过重试次数,取数失败")
  73. raise Exception(e)
  74. def close(self):
  75. if self.cursor:
  76. self.cursor.close()
  77. self.cursor = None
  78. if self.conn:
  79. self.conn.close()
  80. self.conn = None