123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 |
- # 从hbase 获取数据
- import logging
- import traceback
- import phoenixdb
- import time
- import pandas as pd
- import numpy as np
- from decimal import Decimal
- import gc
class GetHashCode:
    """Compute a 31-based polynomial string hash wrapped to a signed 32-bit int.

    The formula matches Java's ``String.hashCode()`` for characters whose
    ``ord()`` equals their UTF-16 code unit (i.e. BMP characters).
    """

    def convert_n_bytes(self, n, b):
        """Wrap integer *n* into the signed two's-complement range of a *b*-byte int."""
        width = b * 8
        half = 2 ** (width - 1)
        return (n + half) % 2 ** width - half

    def convert_4_bytes(self, n):
        """Wrap *n* into the signed 32-bit range (-2**31 .. 2**31 - 1)."""
        return self.convert_n_bytes(n, 4)

    @classmethod
    def getHashCode(cls, s):
        """Return sum(ord(c) * 31**(len(s) - 1 - i)) wrapped to a signed 32-bit int.

        Evaluated with Horner's rule, reducing modulo 2**32 at each step. The
        final signed wrap depends only on the value modulo 2**32, so this gives
        the same result as summing the full-precision powers.
        """
        acc = 0
        for ch in s:
            acc = (acc * 31 + ord(ch)) & 0xFFFFFFFF
        return cls().convert_4_bytes(acc)
class LindormConnection:
    """Read signal and result tables from Lindorm through ``phoenixdb``.

    Use it as a context manager so the connection is opened on entry and
    closed on exit::

        with LindormConnection(table='ods_signal') as db:
            df = db.get_data(vin='...', start_time='2022-07-13 15:00:00',
                             end_time='2022-07-14 00:00:00', sel_columns=['Time'])

    Table notes kept from the original comments: ``ods_signal`` holds cleaned
    data and ``stage_signal`` holds raw data.
    """

    class InvalidQueryError(Exception):
        """Caller mistake (missing id, no/unknown columns) that a retry cannot fix.

        Subclasses ``Exception`` so existing ``except Exception`` handlers and
        message checks keep working.
        """

    _logger = logging.getLogger(__name__)

    # Row keys of the hashed tables embed ``_MAX_LONG - timestamp_ms``
    # (presumably a reverse-timestamp layout so newer rows sort first).
    _MAX_LONG = 9223372036854775807
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    # Table whose row-key bounds are built from the plain vin, not a hashed prefix.
    _STANDING_PROC_TABLE = 'dwd_batt_persona_standing_proc_di'
    _STAGE_SIGNAL_TABLES = ('stage_signal', 'stage_signal_new')
    _ODS_SIGNAL_TABLES = ('ods_signal', 'ods_signal_new', 'ods_signal_base')
    _PAGE_SIZE = 2000
    _RETRY_DELAY_SECONDS = 1

    def __init__(self, database_url='ld-uf69n0762fjx616l6-proxy-hbaseue.hbaseue.rds.aliyuncs.com:30060',
                 username='root', password='root',
                 database='test', table='ods_signal'):
        """Remember the connection settings; ``__enter__`` opens the connection.

        Args:
            database_url: host:port of the endpoint passed to ``phoenixdb.connect``.
            username, password: sent as ``lindorm_user`` / ``lindorm_password``.
            database: database name passed to ``phoenixdb.connect``.
            table: table that every query of this instance reads from.

        NOTE(review): the defaults hard-code an endpoint and root/root
        credentials; prefer supplying them from configuration.
        """
        self.database_url = database_url
        self.username = username
        self.password = password
        self.database = database
        self.table = table
        self.conn = None  # set by _connect()

    def __enter__(self):
        """Open the connection and return ``self``."""
        self._connect()
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Close the connection if it is open; exceptions are not suppressed."""
        # ``exc_tb`` rather than ``traceback`` so the traceback module is not shadowed.
        if self.conn and not self.conn._closed:
            self.conn.close()

    def _connect(self):
        """Open a phoenixdb connection (autocommit on) and store it in ``self.conn``.

        An ``AttributeError`` is only printed and ``self.conn`` keeps its
        previous value; any other exception propagates.
        """
        try:
            self.conn = phoenixdb.connect(self.database_url, autocommit=True,
                                          lindorm_user=self.username,
                                          lindorm_password=self.password,
                                          database=self.database)
            print("lindorm 连接成功")
        except AttributeError:
            # NOTE(review): only AttributeError is swallowed; it is unclear which
            # failure this was meant to cover -- confirm before widening it.
            print("Failed to connect")

    def _reset_connection(self):
        """Close the current connection (ignoring close errors) and try to reconnect.

        Failures are logged, not raised, so the caller's retry loop decides
        whether to try again.
        """
        if self.conn is not None and not self.conn._closed:
            try:
                self.conn.close()
            except Exception:
                self._logger.warning("closing the Lindorm connection failed", exc_info=True)
        try:
            self._connect()
        except Exception:
            self._logger.warning("reconnecting to Lindorm failed", exc_info=True)

    def _run_with_retries(self, action, max_attempts):
        """Run ``action(cursor)`` on a fresh cursor, reconnecting after each failure.

        Args:
            action: callable taking an open cursor and returning the result.
            max_attempts: total number of attempts; ``None`` retries forever.

        Returns:
            The value returned by the first successful ``action`` call.

        Raises:
            InvalidQueryError, ValueError: re-raised at once, because they come
                from the arguments and would fail the same way on every retry.
            Exception: after ``max_attempts`` failures, carrying the message of
                the last error (chained to it).
        """
        attempt = 0
        while True:
            attempt += 1
            try:
                # The ``with`` block closes the cursor on success and on error.
                with self.conn.cursor() as statement:
                    return action(statement)
            except (self.InvalidQueryError, ValueError):
                raise
            except Exception as exc:
                self._logger.warning("Lindorm query on %s failed (attempt %d): %s",
                                     self.table, attempt, exc, exc_info=True)
                # Always leave a fresh connection behind, even after the last attempt.
                self._reset_connection()
                if max_attempts is not None and attempt >= max_attempts:
                    raise Exception(str(exc)) from exc
                time.sleep(self._RETRY_DELAY_SECONDS)

    @staticmethod
    def _column_list(sel_columns):
        """Copy ``sel_columns`` into a new list; ``None`` means no columns."""
        return [] if sel_columns is None else list(sel_columns)

    @staticmethod
    def _quote(value):
        """Render *value* as a single-quoted SQL string literal, doubling embedded quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _hashed_prefix(key):
        """Return the row-key prefix ``<abs(hash(key))>_<key>_`` used by the hashed tables."""
        return str(abs(GetHashCode.getHashCode(key))) + "_" + key + "_"

    def _epoch_ms(self, text):
        """Parse a '%Y-%m-%d %H:%M:%S' string as host-local time and return epoch milliseconds."""
        # NOTE(review): time.mktime uses the host timezone, while get_data renders
        # ``Time`` in Asia/Shanghai; callers presumably pass Shanghai local time,
        # so the window shifts on hosts running in another timezone -- confirm.
        return int(time.mktime(time.strptime(text, self._TIME_FORMAT)) * 1000)

    def _row_key_range(self, vin, sn, start_time, end_time):
        """Translate an identifier and time window into row-key bounds for ``self.table``.

        Returns:
            ``(key_column, start_key, end_key)``; rows are selected with
            ``start_key < key_column <= end_key``.

        Raises:
            InvalidQueryError: the required identifier is missing. The message
                "vin 和 sn 只能填写一个" ("only one of vin and sn may be given")
                is kept verbatim because callers may match on it.
            ValueError: a time string does not match '%Y-%m-%d %H:%M:%S'.
        """
        if self.table == self._STANDING_PROC_TABLE:
            # Bounds look like <vin>_<epoch seconds>_<yyyymmdd>; only vin is supported.
            if not vin:
                raise self.InvalidQueryError("vin 和 sn 只能填写一个")
            prefix = vin + "_"
            start_struct = time.strptime(start_time, self._TIME_FORMAT)
            end_struct = time.strptime(end_time, self._TIME_FORMAT)
            start_key = "{}{}_{}".format(prefix, int(time.mktime(start_struct)),
                                         time.strftime('%Y%m%d', start_struct))
            end_key = "{}{}_{}".format(prefix, int(time.mktime(end_struct)),
                                       time.strftime('%Y%m%d', end_struct))
            return 'id', start_key, end_key

        start_ms = self._epoch_ms(start_time)
        end_ms = self._epoch_ms(end_time)
        # vin wins when both identifiers are supplied.
        if vin:
            key_column, prefix = 'id', self._hashed_prefix(vin)
        elif sn:
            key_column, prefix = 'SnKey', self._hashed_prefix(sn)
        else:
            raise self.InvalidQueryError("vin 和 sn 只能填写一个")
        # Reversed timestamps: the later end time gives the smaller (exclusive) bound.
        return (key_column,
                prefix + str(self._MAX_LONG - end_ms),
                prefix + str(self._MAX_LONG - start_ms))

    def _range_select_sql(self, columns, key_column, start_key, end_key):
        """Build ``select <columns> from <table> where key > start and key <= end``."""
        return "select {} from {} where {} > {} and {} <= {}".format(
            ','.join(columns), self.table,
            key_column, self._quote(start_key),
            key_column, self._quote(end_key))

    def _fetch_all_pages(self, statement, sql):
        """Execute ``sql`` page by page with LIMIT/OFFSET and return all rows.

        Stops at the first page shorter than the page size.
        NOTE(review): the query has no ORDER BY, so page boundaries rely on the
        server returning rows in a stable (row-key) order -- confirm for Lindorm.
        """
        rows = []
        offset = 0
        while True:
            statement.execute('%s LIMIT %s OFFSET %s' % (sql, self._PAGE_SIZE, offset))
            page = statement.fetchall()
            rows.extend(page)
            if len(page) < self._PAGE_SIZE:
                return rows
            offset += self._PAGE_SIZE

    @staticmethod
    def _decimals_to_float(df):
        """Convert every column holding ``Decimal`` values to float64, keeping nulls.

        Scans the whole column instead of only the first row, so a leading NULL
        no longer hides Decimal values. Duplicated column names are skipped
        (``df[name]`` is a DataFrame there), as before.
        """
        for name in dict.fromkeys(df.columns):
            values = df[name]
            if isinstance(values, pd.DataFrame):
                continue
            if any(isinstance(v, Decimal) for v in values):
                df[name] = values.apply(lambda v: v if pd.isnull(v) else np.float64(v))
        return df

    def _postprocess_signal_frame(self, df):
        """Sort by the table's time column and normalise values.

        - stage tables: sort by ``time_value``.
        - ods tables: sort by ``Time`` (epoch ms), render it as
          '%Y-%m-%d %H:%M:%S' in Asia/Shanghai, and turn Decimal columns into float64.

        A time column that was not selected is skipped instead of raising KeyError.
        The index is reset to 0..n-1.
        """
        if self.table in self._STAGE_SIGNAL_TABLES:
            if 'time_value' in df.columns:
                df = df.sort_values(['time_value'])
        elif self.table in self._ODS_SIGNAL_TABLES and 'Time' in df.columns:
            # Sort on the numeric epoch before turning it into text.
            df = df.sort_values(['Time'])
            local = pd.to_datetime(list(df['Time']), utc=True, unit='ms').tz_convert('Asia/Shanghai')
            df['Time'] = list(local.strftime(self._TIME_FORMAT))
        df = df.reset_index(drop=True)
        if self.table in self._ODS_SIGNAL_TABLES:
            df = self._decimals_to_float(df)
        return df

    def get_data(self, vin='', sn='', start_time='', end_time='', sel_columns=None, max_attempts=10):
        """Return ``sel_columns`` for one vin or sn within a time window.

        Args:
            vin: vehicle identifier; takes precedence over ``sn`` when both are given.
            sn: used (key column ``SnKey``) only when ``vin`` is empty; not
                supported for the standing-proc table.
            start_time, end_time: '%Y-%m-%d %H:%M:%S' strings, converted with
                ``time.mktime`` (host-local timezone).
            sel_columns: column names to select; must be non-empty.
            max_attempts: database attempts before giving up (``None`` retries
                forever). Argument errors are never retried.

        Returns:
            DataFrame with one column per ``sel_columns`` entry. For stage/ods
            tables it is sorted by time; for ods tables ``Time`` is rendered as
            Asia/Shanghai text and Decimal values become float64.

        Raises:
            InvalidQueryError: no columns, or the required vin/sn is missing.
            ValueError: a time string does not match the format.
            Exception: every attempt failed (message of the last error).
        """
        columns = self._column_list(sel_columns)
        # Validate before touching the database: these errors are permanent and
        # previously made this method retry forever.
        if not columns:
            raise self.InvalidQueryError("没有指定查询参数")
        self._row_key_range(vin, sn, start_time, end_time)
        return self._run_with_retries(
            lambda statement: self._get_data(vin, sn, start_time, end_time, columns, statement),
            max_attempts)

    def _get_data(self, vin='', sn='', start_time='', end_time='', sel_columns=None, statement=None):
        """Run one attempt of :meth:`get_data` on the open cursor ``statement``."""
        columns = self._column_list(sel_columns)
        if not columns:
            raise self.InvalidQueryError("没有指定查询参数")
        key_column, start_key, end_key = self._row_key_range(vin, sn, start_time, end_time)
        sql = self._range_select_sql(columns, key_column, start_key, end_key)
        df = pd.DataFrame(self._fetch_all_pages(statement, sql), columns=columns)
        if df.empty:
            return df
        return self._postprocess_signal_frame(df)

    def get_algo_data(self, vin='', start_time='', end_time='', sel_columns=None, max_attempts=10):
        """Fetch rows for one ``vin`` and time window from a vehicle-usage table.

        Args:
            vin: required identifier used to build the hashed row-key prefix.
            start_time, end_time: '%Y-%m-%d %H:%M:%S' strings (host-local time).
            sel_columns: columns to select; empty/None selects every column
                reported by ``desc``.
            max_attempts: database attempts before giving up (``None`` retries forever).

        Returns:
            DataFrame of the raw query result (no paging, sorting or conversion).

        Raises:
            InvalidQueryError: empty vin or unknown column names.
            ValueError: a time string does not match the format.
            Exception: every attempt failed (message of the last error).
        """
        columns = self._column_list(sel_columns)
        return self._run_with_retries(
            lambda statement: self._get_algo_data(vin, start_time, end_time, columns, statement),
            max_attempts)

    def _get_algo_data(self, vin='', start_time='', end_time='', sel_columns=None, statement=None):
        """Run one attempt of :meth:`get_algo_data` on the open cursor ``statement``."""
        statement.execute("desc {}".format(self.table))
        # The third field of each ``desc`` row is treated as the column name.
        table_columns = [row[2] for row in statement.fetchall()]
        columns = self._column_list(sel_columns)
        if columns:
            missing = set(columns) - set(table_columns)
            if missing:
                raise self.InvalidQueryError('{}列名不存在,请检查列名'.format(str(missing)))
        else:
            columns = table_columns
        start_ms = self._epoch_ms(start_time)
        end_ms = self._epoch_ms(end_time)
        if not vin:
            raise self.InvalidQueryError("vin不能为空")
        prefix = self._hashed_prefix(vin)
        sql = self._range_select_sql(columns, 'id',
                                     prefix + str(self._MAX_LONG - end_ms),
                                     prefix + str(self._MAX_LONG - start_ms))
        statement.execute(sql)
        return pd.DataFrame(statement.fetchall(), columns=columns)

    def get_offline_data(self, result_id, column_name, max_attempts=9):
        """Fetch ``column_name`` (comma-separated) for one ``result_id``, ordered by row_id.

        Best effort: returns an empty DataFrame once every attempt has failed.
        NOTE(review): ``column_name`` is inserted into the SQL unescaped (it is an
        identifier list), so it must come from trusted code.
        """
        sel_columns = [name.strip() for name in column_name.split(',')]
        sql = "select {} from {} where result_id = {} order by row_id".format(
            column_name, self.table, self._quote(result_id))

        def fetch(statement):
            statement.execute(sql)
            return pd.DataFrame(statement.fetchall(), columns=sel_columns)

        try:
            return self._run_with_retries(fetch, max_attempts)
        except Exception:
            # Deliberate best-effort contract: callers get an empty frame, not an error.
            self._logger.warning("get_offline_data gave up for result_id=%s", result_id, exc_info=True)
            return pd.DataFrame()

    def _get_select_param(self, sel_columns):
        """Build the select list, folding non-signal columns into ``Info``.

        For ``ods_signal_base``, columns not in ``self.signalCol`` are not
        selected directly; a single ``Info`` column is appended instead. Other
        tables select ``sel_columns`` as-is.
        ``self.signalCol`` is not set by ``__init__``; it must be assigned
        before this is called.

        Returns:
            ``(comma_separated_columns, has_info)``.
        """
        if self.table != 'ods_signal_base':
            return ','.join(sel_columns), False
        signal_cols = self.signalCol
        selected = [c for c in sel_columns if c in signal_cols]
        has_info = len(selected) < len(sel_columns)
        if has_info:
            selected.append('Info')
        return ','.join(selected), has_info

    def _transform_result(self, sel_columns, rows):
        """Expand rows from a ``_get_select_param`` query back to ``sel_columns`` order.

        Each row holds the signal columns in selection order, followed by the
        ``Info`` blob when any requested column lives in it. The blob is only
        parsed when such a column was requested; previously the last value was
        parsed unconditionally and crashed when no Info column was selected.
        """
        signal_cols = self.signalCol
        needs_info = any(c not in signal_cols for c in sel_columns)
        new_rows = []
        for row in rows:
            info = self._info_to_dict(row[len(row) - 1]) if needs_info else {}
            new_row = []
            pos = 0
            for name in sel_columns:
                if name in signal_cols:
                    new_row.append(row[pos])
                    pos += 1
                else:
                    new_row.append(info[name])
            new_rows.append(new_row)
        return new_rows

    def _info_to_dict(self, info):
        """Split an '@@'-separated Info string into ``{name: value}`` via ``self.infoRelation``.

        ``self.infoRelation`` maps column name -> position in the split list.
        It is not set by ``__init__`` and must be assigned before use.

        Raises:
            Exception: the blob has fewer fields than ``infoRelation`` entries.
        """
        parts = info.split('@@')
        relation = self.infoRelation
        if len(parts) < len(relation):
            raise Exception("数据异常: %s - %s ->%s" % (len(parts), len(relation), info))
        return {name: parts[idx] for name, idx in relation.items()}
- # if __name__ == '__main__':
- #
- # sel_columns = ["VIN", "SN", "Time", "VehState", "CellMaxVolNum", "CellMinVolNum", "CellMinVolNum", "CellMaxTempNum"]
- #
- # s = time.time() * 1000
- # with LindormConnection(table='ods_signal_new') as dbm:
- # df_data = dbm.get_data(sn='0CZPE007DN0111B700000067', start_time='2022-07-13 15:00:00',
- # end_time='2022-07-21 03:00:00', sel_columns=sel_columns)
- # print(len(df_data))
- # print(time.time() * 1000 - s)
- #
- # s = time.time() * 1000
- # with LindormConnection(table='ods_signal_base') as dbm:
- # df_data = dbm.get_data(sn='0CZPE007DN0111B700000067', start_time='2022-07-13 15:00:00',
- # end_time='2022-07-21 03:00:00', sel_columns=sel_columns)
- # print(len(df_data))
- # print(time.time() * 1000 - s)
- #
|