HzAlgoService.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import time
  2. import pandas as pd
  3. from ...base.AlgoService import AlgoService
  4. from ...impl.DBHbase import DBHbase
  5. # 计算hash值
  class GetHashCode:
      """Compute a signed 32-bit, base-31 polynomial hash of a string.

      The hash is ``s[0]*31**(n-1) + s[1]*31**(n-2) + ... + s[n-1]`` wrapped
      into the signed 32-bit range, which is the same formula as Java's
      ``String.hashCode``.

      NOTE(review): ``ord`` yields Unicode code points while Java hashes
      UTF-16 code units, so the two differ for characters outside the Basic
      Multilingual Plane -- confirm that inputs are ASCII/BMP only.
      """

      def convert_n_bytes(self, n, b):
          """Wrap integer *n* into the signed two's-complement range of *b* bytes."""
          bits = b * 8
          return (n + 2 ** (bits - 1)) % 2 ** bits - 2 ** (bits - 1)

      def convert_4_bytes(self, n):
          """Wrap integer *n* into the signed 32-bit range."""
          return self.convert_n_bytes(n, 4)

      @classmethod
      def getHashCode(cls, s):
          """Return the signed 32-bit base-31 hash of string *s* (0 for ``""``)."""
          h = 0
          for c in s:
              # Horner's rule. Masking to 32 bits on every step keeps ``h``
              # small instead of building an arbitrarily large integer; the
              # final value modulo 2**32 is unchanged.
              h = (h * 31 + ord(c)) & 0xFFFFFFFF
          return cls().convert_4_bytes(h)
  class HzAlgoService():
      """Read helper for HZ algorithm tables reached through ``DBHbase``.

      Instances can be used as context managers; leaving the ``with`` block
      calls :meth:`close`.
      """

      # Column labels applied to the rows returned by ``desc <table>``.
      _DESC_COLUMNS = ['TABLE_SCHEMA', 'TABLE_NAME', 'COLUMN_NAME', 'TYPE',
                       'IS_PRIMARY_KEY', 'SORT_ORDER']
      # Largest signed 64-bit integer. The id range is built from
      # ``_MAX_LONG - timestamp_ms``, so a later time gives a smaller key.
      _MAX_LONG = 9223372036854775807
      _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

      def __init__(self, hbase_params: dict, mysql_params: dict = None, type=1):
          """Create the service.

          Args:
              hbase_params: Connection settings handed to ``DBHbase``; must
                  contain the key ``'db'`` when ``type == 1``.
              mysql_params: Stored on the instance as ``mysql_params``.
              type: ``1`` for in-project use (connects immediately). Any other
                  value is for external callers that only use part of the
                  functionality; no connection is opened. (The name shadows
                  the builtin but is kept for caller compatibility.)
          """
          # Always define the attributes so close()/__exit__ are safe even
          # when no connection was opened.
          self.dbhbase = None
          self.mysql_params = mysql_params
          if type == 1:
              self._init__(hbase_params, mysql_params)

      def _init__(self, hbase_params: dict, mysql_params: dict = None):
          """Open the Lindorm connection described by *hbase_params*."""
          # Work on a copy so the caller's dict is not modified.
          params = dict(hbase_params)
          params.update({"is_lindorm": 1, "pagesize": 300000})
          self.dbhbase = DBHbase(params)
          self.dbhbase.connect_lindorm(params['db'])
          self.mysql_params = mysql_params

      def __enter__(self):
          return self

      def __exit__(self, *exc_info):
          # Returns None, so an exception raised inside the block propagates.
          self.close()

      def _check_param(self, sn_list, vin_list, start_time, end_time, columns=None):
          """Validate query arguments and convert the time range to epoch seconds.

          Args:
              sn_list: List of serial numbers (mutually exclusive with *vin_list*).
              vin_list: List of VINs (mutually exclusive with *sn_list*).
              start_time: ``'%Y-%m-%d %H:%M:%S'`` string, parsed as local time.
              end_time: ``'%Y-%m-%d %H:%M:%S'`` string, parsed as local time.
              columns: Optional list of column names; ``None`` means empty.

          Returns:
              ``(start_seconds, end_seconds)`` as integers.

          Raises:
              Exception: If any argument fails validation.
              ValueError: If a time string does not match the format.
          """
          if columns is None:
              columns = []
          if sn_list and vin_list:
              raise Exception("不能同时指定sn和vin进行查询")
          if not isinstance(sn_list, list) or not isinstance(vin_list, list) or not isinstance(columns, list):
              raise Exception("sn 和 vin 和columns 必须为列表")
          if not sn_list and not vin_list:
              raise Exception("未指定sn或vin")
          if not start_time or not end_time:
              raise Exception("未指定时间段")
          st = int(time.mktime(time.strptime(start_time, self._TIME_FORMAT)))
          et = int(time.mktime(time.strptime(end_time, self._TIME_FORMAT)))
          now = int(time.time())
          if st > now or et > now:
              raise Exception("查询时间超出现实时间")
          if len(sn_list) > 5000 or len(vin_list) > 5000:
              raise Exception("单次查询的sn或vin码过多")
          return st, et

      def _describe_table(self, table):
          """Run ``desc <table>``; return (description DataFrame, column names)."""
          self.dbhbase.cursor.execute("desc {}".format(table))
          column_rows = self.dbhbase.cursor.fetchall()
          df_column_rows = pd.DataFrame(columns=self._DESC_COLUMNS, data=column_rows)
          # Index 2 of each description row is the COLUMN_NAME field.
          column_names = [row[2] for row in column_rows]
          return df_column_rows, column_names

      def get_original_hz_algo_data(self, table, sn="", vin="", start_time: str = "", end_time: str = "") -> tuple:
          """Fetch all columns of *table* for one device within a time range.

          The device is identified by *vin* when it is non-empty, otherwise by
          *sn*. The ``id`` range queried is
          ``<abs(hash)>_<key>_<MAX_LONG - time_ms>``, exclusive at the end-time
          bound and inclusive at the start-time bound.

          Args:
              table: Table name, interpolated into the SQL text as-is.
              sn: Serial number, used when *vin* is empty.
              vin: VIN; takes precedence over *sn*.
              start_time: ``'%Y-%m-%d %H:%M:%S'`` string, parsed as local time.
              end_time: ``'%Y-%m-%d %H:%M:%S'`` string, parsed as local time.

          Returns:
              ``(df_column_rows, data_rows)``: the table description as a
              DataFrame and whatever ``dbhbase.page_query`` returns.

          Raises:
              ValueError: If neither *sn* nor *vin* is given, or a time string
                  does not match the format.
          """
          key = vin if vin else sn
          if not key:
              # Without an identifier the prefix would degenerate to "0__".
              raise ValueError("未指定sn或vin")

          df_column_rows, column_names = self._describe_table(table)

          st = int(time.mktime(time.strptime(start_time, self._TIME_FORMAT)) * 1000)
          et = int(time.mktime(time.strptime(end_time, self._TIME_FORMAT)) * 1000)

          prefix = str(abs(GetHashCode.getHashCode(key))) + "_" + key + "_"
          # Subtracting from _MAX_LONG reverses the order: the end time gives
          # the lower id bound and the start time the upper one.
          start_rows = prefix + str(self._MAX_LONG - et)
          end_rows = prefix + str(self._MAX_LONG - st)
          id_field = "id"

          # NOTE(review): table and key are interpolated into the SQL text;
          # callers must not pass untrusted values here.
          sql_select = "select {} from {} where {} > '{}' and {} <= '{}'".format(
              ",".join(column_names), table, id_field, start_rows, id_field, end_rows)
          data_rows = self.dbhbase.page_query(sql_select)
          return df_column_rows, data_rows

      def get_original_hz_dwads_data(self, table, dt_start: str = "", dt_end: str = "") -> tuple:
          """Fetch all columns of *table* whose ``dt`` lies in [dt_start, dt_end].

          Args:
              table: Table name, interpolated into the SQL text as-is.
              dt_start: Inclusive lower bound compared against ``dt``.
              dt_end: Inclusive upper bound compared against ``dt``.

          Returns:
              ``(df_column_rows, data_rows)``: the table description as a
              DataFrame and the rows from ``cursor.fetchall()``.
          """
          df_column_rows, column_names = self._describe_table(table)
          id_field = "dt"

          # NOTE(review): values are interpolated into the SQL text; callers
          # must not pass untrusted values here.
          sql_select = "select {} from {} where {} >= '{}' and {} <= '{}'".format(
              ",".join(column_names), table, id_field, dt_start, id_field, dt_end)
          self.dbhbase.cursor.execute(sql_select)
          data_rows = self.dbhbase.cursor.fetchall()
          return df_column_rows, data_rows

      def close(self):
          """Close the underlying connection, if one was opened."""
          dbhbase = getattr(self, "dbhbase", None)
          if dbhbase is not None:
              dbhbase.close()