123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- import time
- import pandas as pd
- from ...base.AlgoService import AlgoService
- from ...impl.DBHbase import DBHbase
# Compute a 32-bit signed string hash.
class GetHashCode:
    """Compute the polynomial (base 31) hash of a string as a signed 32-bit int.

    The value is ``s[0]*31**(n-1) + s[1]*31**(n-2) + ... + s[n-1]`` wrapped
    into the signed 32-bit range, i.e. the same formula Java documents for
    ``String.hashCode``.

    NOTE(review): ``ord`` yields Unicode code points, whereas Java hashes
    UTF-16 code units, so results would differ from Java for characters
    outside the Basic Multilingual Plane -- confirm whether that matters.
    """

    def convert_n_bytes(self, n, b):
        """Wrap integer ``n`` into the signed two's-complement range of ``b`` bytes."""
        bits = b * 8
        return (n + 2 ** (bits - 1)) % 2 ** bits - 2 ** (bits - 1)

    def convert_4_bytes(self, n):
        """Wrap integer ``n`` into the signed 32-bit range."""
        return self.convert_n_bytes(n, 4)

    @classmethod
    def getHashCode(cls, s):
        """Return the signed 32-bit base-31 polynomial hash of string ``s``.

        An empty string hashes to 0.
        """
        h = 0
        for c in s:
            # Horner's rule, masked to 32 bits on every step so the running
            # value never grows into a huge integer; the result is identical
            # modulo 2**32 to summing ord(c) * 31 ** (n - 1 - i).
            h = (31 * h + ord(c)) & 0xFFFFFFFF
        return cls().convert_4_bytes(h)
-
class HzAlgoService():
    """Query helper that reads table data through a ``DBHbase`` connection.

    Can be used as a context manager; leaving the ``with`` block calls
    :meth:`close`.
    """

    def __init__(self, hbase_params: dict, mysql_params: dict = None, type=1):
        """Create the service.

        Args:
            hbase_params: Connection settings handed to ``DBHbase``; must
                contain the key ``'db'`` when ``type == 1``.
            mysql_params: Stored on the instance as ``mysql_params`` when
                ``type == 1``; not otherwise used by this class.
            type: ``1`` opens the database connection (in-project use). Any
                other value skips the connection so that only the helpers
                that need no connection are usable (external use).
        """
        # Always defined, so close()/__exit__ are safe when no connection
        # was opened (type != 1).
        self.dbhbase = None
        if type == 1:
            # In-project call: open the connection.
            self._init__(hbase_params, mysql_params)
        # Otherwise: external call that only uses part of the functionality.

    def _init__(self, hbase_params: dict, mysql_params: dict = None):
        """Open the Lindorm connection and remember ``mysql_params``."""
        # Work on a copy so the caller's dict is not modified by the
        # forced settings below.
        params = dict(hbase_params)
        params.update({"is_lindorm": 1, "pagesize": 300000})
        self.dbhbase = DBHbase(params)
        self.dbhbase.connect_lindorm(params['db'])
        self.mysql_params = mysql_params

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def _check_param(self, sn_list, vin_list, start_time, end_time, columns=None):
        """Validate query arguments and return the time range in epoch seconds.

        Args:
            sn_list: List of SNs; mutually exclusive with ``vin_list``.
            vin_list: List of VINs; mutually exclusive with ``sn_list``.
            start_time: ``'%Y-%m-%d %H:%M:%S'`` string.
            end_time: ``'%Y-%m-%d %H:%M:%S'`` string.
            columns: Optional list of column names (only type-checked here).

        Returns:
            ``(st, et)``: start and end as integer epoch seconds, converted
            with ``time.mktime`` (i.e. interpreted in the process's local
            time zone).

        Raises:
            Exception: If any argument fails validation.
            ValueError: If a time string does not match the expected format.
        """
        if columns is None:
            columns = []
        if sn_list and vin_list:
            raise Exception("不能同时指定sn和vin进行查询")
        if not isinstance(sn_list, list) or not isinstance(vin_list, list) or not isinstance(columns, list):
            raise Exception("sn 和 vin 和columns 必须为列表")
        if not sn_list and not vin_list:
            raise Exception("未指定sn或vin")
        if not start_time or not end_time:
            raise Exception("未指定时间段")

        st = int(time.mktime(time.strptime(start_time, '%Y-%m-%d %H:%M:%S')))
        et = int(time.mktime(time.strptime(end_time, '%Y-%m-%d %H:%M:%S')))
        now = int(time.time())
        if st > now or et > now:
            raise Exception("查询时间超出现实时间")

        if len(sn_list) > 5000 or len(vin_list) > 5000:
            raise Exception("单次查询的sn或vin码过多")
        return st, et

    def _describe_table(self, table):
        """Run ``desc <table>`` and return ``(description DataFrame, column names)``.

        The column name is taken from index 2 of each description row
        (the ``COLUMN_NAME`` field).
        """
        # NOTE(review): the table name is interpolated directly into the
        # SQL text; callers must pass a trusted identifier.
        self.dbhbase.cursor.execute("desc {}".format(table))
        column_rows = self.dbhbase.cursor.fetchall()
        df_column_rows = pd.DataFrame(
            columns=['TABLE_SCHEMA', 'TABLE_NAME', 'COLUMN_NAME', 'TYPE', 'IS_PRIMARY_KEY', 'SORT_ORDER'],
            data=column_rows)
        column_names = [row[2] for row in column_rows]
        return df_column_rows, column_names

    def get_original_hz_algo_data(self, table, sn="", vin="", start_time: str = "", end_time: str = "") -> tuple:
        """Fetch all columns of ``table`` for one device within a time window.

        The rows are selected by a range on the ``id`` field. The bounds are
        built as ``<abs(hash)>_<vin or sn>_<MAX_LONG - timestamp_ms>``, so
        the later timestamp (``end_time``) produces the lower bound.

        Args:
            table: Table name.
            sn: Device SN; used only when ``vin`` is empty.
            vin: Vehicle VIN; takes precedence over ``sn`` when non-empty.
            start_time: ``'%Y-%m-%d %H:%M:%S'`` string (local time zone).
            end_time: ``'%Y-%m-%d %H:%M:%S'`` string (local time zone).

        Returns:
            ``(df_column_rows, data_rows)``: the table description as a
            DataFrame, and whatever ``DBHbase.page_query`` returns for the
            data query.
        """
        # Fetch the column names.
        df_column_rows, column_names = self._describe_table(table)

        max_long = 9223372036854775807
        # Window bounds in epoch milliseconds.
        st = int(time.mktime(time.strptime(start_time, '%Y-%m-%d %H:%M:%S')) * 1000)
        et = int(time.mktime(time.strptime(end_time, '%Y-%m-%d %H:%M:%S')) * 1000)

        # Key on vin when given, otherwise on sn.
        key = vin if len(vin) > 0 else sn
        prefix = str(abs(GetHashCode.getHashCode(key))) + "_" + key + "_"
        start_rows = prefix + str(max_long - et)
        end_rows = prefix + str(max_long - st)
        id_field = "id"

        # NOTE(review): values are interpolated directly into the SQL text;
        # callers must pass trusted table/sn/vin values.
        sql_select = "select {} from {} where {} > '{}' and {} <= '{}'".format(
            ",".join(column_names), table, id_field, start_rows, id_field, end_rows)
        data_rows = self.dbhbase.page_query(sql_select)
        return df_column_rows, data_rows

    def get_original_hz_dwads_data(self, table, dt_start: str = "", dt_end: str = "") -> tuple:
        """Fetch all columns of ``table`` whose ``dt`` lies in ``[dt_start, dt_end]``.

        Args:
            table: Table name.
            dt_start: Inclusive lower bound, compared against ``dt`` as a
                quoted string literal.
            dt_end: Inclusive upper bound, compared the same way.

        Returns:
            ``(df_column_rows, data_rows)``: the table description as a
            DataFrame, and the rows returned by the cursor's ``fetchall``.
        """
        # Fetch the column names.
        df_column_rows, column_names = self._describe_table(table)
        id_field = "dt"

        # NOTE(review): values are interpolated directly into the SQL text;
        # callers must pass trusted table/dt values.
        sql_select = "select {} from {} where {} >= '{}' and {} <= '{}'".format(
            ",".join(column_names), table, id_field, dt_start, id_field, dt_end)
        # Executed in a single fetch rather than through page_query.
        self.dbhbase.cursor.execute(sql_select)
        data_rows = self.dbhbase.cursor.fetchall()
        return df_column_rows, data_rows

    def close(self):
        """Close the database connection if one was opened."""
        if self.dbhbase is not None:
            self.dbhbase.close()
-
-
|