import time
from typing import Any, Tuple

import pandas as pd

from ...base.AlgoService import AlgoService
from ...impl.DBHbase import DBHbase


# Compute a hash value for a string.
class GetHashCode:
    """Signed 32-bit polynomial string hash (base 31).

    The formula, sum(ord(c) * 31 ** (n - 1 - i)) wrapped to int32, is the one
    Java's ``String.hashCode()`` uses. Java hashes UTF-16 code units, while
    this uses ``ord()``, so results match only for characters in the BMP.
    """

    def convert_n_bytes(self, n, b):
        """Wrap integer ``n`` into the signed two's-complement range of ``b`` bytes."""
        bits = b * 8
        return (n + 2 ** (bits - 1)) % 2 ** bits - 2 ** (bits - 1)

    def convert_4_bytes(self, n):
        """Wrap integer ``n`` into the signed 32-bit range."""
        return self.convert_n_bytes(n, 4)

    @classmethod
    def getHashCode(cls, s):
        """Return the signed 32-bit hash of string ``s``.

        Evaluated with Horner's rule, masking to 32 bits at every step. The
        result is congruent mod 2**32 to the exact polynomial, so after
        ``convert_4_bytes`` it is identical. This avoids building the huge
        integer ``31 ** (n - 1 - i)`` for every character.
        """
        h = 0
        for c in s:
            h = (h * 31 + ord(c)) & 0xFFFFFFFF
        return cls().convert_4_bytes(h)


class HzAlgoService:
    """Read rows and column metadata from Lindorm tables through ``DBHbase``."""

    # Labels for the rows returned by ``desc <table>``.
    _DESC_COLUMNS = ['TABLE_SCHEMA', 'TABLE_NAME', 'COLUMN_NAME', 'TYPE',
                     'IS_PRIMARY_KEY', 'SORT_ORDER']
    # Largest signed 64-bit integer; row keys store ``MAX_LONG - timestamp_ms``.
    _MAX_LONG = 9223372036854775807

    def __init__(self, hbase_params: dict, mysql_params: dict = None, type=1):
        """Create the service.

        Args:
            hbase_params: connection settings forwarded to ``DBHbase``; must
                contain a ``'db'`` key when ``type == 1``.
            mysql_params: stored on the instance; not used by this class.
            type: ``1`` for in-project use, which opens a Lindorm connection.
                Any other value skips the connection (external callers that
                only use part of the functionality).
        """
        # Defaults so close()/__exit__ work even when no connection is opened.
        self.dbhbase = None
        self.mysql_params = mysql_params
        if type == 1:
            # Called from inside the project: open the Lindorm connection.
            self._init__(hbase_params, mysql_params)

    def _init__(self, hbase_params: dict, mysql_params: dict = None):
        """Open the Lindorm connection with paging enabled."""
        # Copy first so the caller's dict is not mutated as a side effect.
        params = dict(hbase_params)
        params.update({"is_lindorm": 1, "pagesize": 300000})
        self.dbhbase = DBHbase(params)
        self.dbhbase.connect_lindorm(params['db'])
        self.mysql_params = mysql_params

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def _check_param(self, sn_list, vin_list, start_time, end_time, columns=None):
        """Validate query parameters.

        Args:
            sn_list / vin_list: lists of identifiers; exactly one may be
                non-empty, and each may hold at most 5000 entries.
            start_time / end_time: ``'%Y-%m-%d %H:%M:%S'`` strings,
                interpreted as local time by ``time.mktime``.
            columns: optional list of column names (``None`` means empty).

        Returns:
            ``(st, et)``: the start and end times as integer Unix seconds.

        Raises:
            Exception: on any validation failure.
            ValueError: if a time string does not match the format.
        """
        if columns is None:
            columns = []
        if sn_list and vin_list:
            raise Exception("不能同时指定sn和vin进行查询")
        if not isinstance(sn_list, list) or not isinstance(vin_list, list) or not isinstance(columns, list):
            raise Exception("sn 和 vin 和columns 必须为列表")
        if not sn_list and not vin_list:
            raise Exception("未指定sn或vin")
        if not start_time or not end_time:
            raise Exception("未指定时间段")
        st = int(time.mktime(time.strptime(start_time, '%Y-%m-%d %H:%M:%S')))
        et = int(time.mktime(time.strptime(end_time, '%Y-%m-%d %H:%M:%S')))
        now = int(time.time())
        if st > now or et > now:
            raise Exception("查询时间超出现实时间")
        if len(sn_list) > 5000 or len(vin_list) > 5000:
            raise Exception("单次查询的sn或vin码过多")
        return st, et

    def _describe_table(self, table):
        """Run ``desc <table>``; return (metadata DataFrame, list of column names).

        Column names are taken from index 2 of each ``desc`` row.
        """
        self.dbhbase.cursor.execute("desc {}".format(table))
        column_rows = self.dbhbase.cursor.fetchall()
        df_column_rows = pd.DataFrame(columns=self._DESC_COLUMNS, data=column_rows)
        columns = [r[2] for r in column_rows]
        return df_column_rows, columns

    def get_original_hz_algo_data(self, table, sn="", vin="", start_time: str = "",
                                  end_time: str = "") -> Tuple[pd.DataFrame, Any]:
        """Fetch every column of ``table`` for one vin (or sn) within a time window.

        The row-key prefix is ``abs(hash(key)) + "_" + key + "_"``, where key
        is ``vin`` when it is non-empty and ``sn`` otherwise. It is followed
        by ``MAX_LONG - timestamp_ms``, so later timestamps sort first. The
        query selects ``id > prefix+str(MAX_LONG-et)`` and
        ``id <= prefix+str(MAX_LONG-st)``.

        NOTE(review): whether the endpoints are inclusive in time depends on
        whether stored row keys carry a suffix after the reversed timestamp;
        confirm against the writer.

        Args:
            table: table name.
            sn / vin: identifiers; ``vin`` takes precedence if non-empty.
            start_time / end_time: ``'%Y-%m-%d %H:%M:%S'`` local-time strings.

        Returns:
            ``(df_column_rows, data_rows)``. The first item is the ``desc``
            metadata as a DataFrame; the second is whatever
            ``DBHbase.page_query`` returns.
        """
        df_column_rows, columns = self._describe_table(table)
        # Query window in milliseconds (local time via time.mktime).
        st = int(time.mktime(time.strptime(start_time, '%Y-%m-%d %H:%M:%S')) * 1000)
        et = int(time.mktime(time.strptime(end_time, '%Y-%m-%d %H:%M:%S')) * 1000)
        # Query by vin when given, otherwise by sn.
        key = vin if len(vin) > 0 else sn
        prefix = str(abs(GetHashCode.getHashCode(key))) + "_" + key + "_"
        # Timestamps are reversed in the key, so the later bound (et) gives the
        # lexicographically smaller key.
        start_rows = prefix + str(self._MAX_LONG - et)
        end_rows = prefix + str(self._MAX_LONG - st)
        id_field = "id"
        # NOTE(review): table/sn/vin are interpolated directly into SQL; if they
        # can come from untrusted input, switch to parameter binding.
        sql_select = "select {} from {} where {} > '{}' and {} <= '{}'".format(
            ",".join(columns), table, id_field, start_rows, id_field, end_rows)
        data_rows = self.dbhbase.page_query(sql_select)
        return df_column_rows, data_rows

    def get_original_hz_dwads_data(self, table, dt_start: str = "",
                                   dt_end: str = "") -> Tuple[pd.DataFrame, Any]:
        """Fetch every column of ``table`` with ``dt_start <= dt <= dt_end``.

        The ``dt`` values are compared as quoted SQL string literals.

        Returns:
            ``(df_column_rows, data_rows)``. The first item is the ``desc``
            metadata as a DataFrame; the second is the ``cursor.fetchall()``
            result.
        """
        df_column_rows, columns = self._describe_table(table)
        id_field = "dt"
        # NOTE(review): values are interpolated directly into SQL; parameterize
        # if they can come from untrusted input.
        sql_select = "select {} from {} where {} >= '{}' and {} <= '{}'".format(
            ",".join(columns), table, id_field, dt_start, id_field, dt_end)
        self.dbhbase.cursor.execute(sql_select)
        data_rows = self.dbhbase.cursor.fetchall()
        return df_column_rows, data_rows

    def close(self):
        """Close the Lindorm connection if one was opened (no-op otherwise)."""
        dbhbase = getattr(self, "dbhbase", None)
        if dbhbase is not None:
            dbhbase.close()