{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Connect to HBase over Thrift (HTTP transport).\n",
    "import os\n",
    "\n",
    "# The next two modules are installed with `pip install thrift`.\n",
    "from thrift.protocol import TBinaryProtocol\n",
    "from thrift.transport import THttpClient\n",
    "# The modules below are generated with `thrift --gen py hbase.thrift`.\n",
    "from hbase import THBaseService\n",
    "from hbase.ttypes import TColumnValue, TColumn, TTableName, TTableDescriptor, TColumnFamilyDescriptor, TNamespaceDescriptor, TGet, TPut, TScan\n",
    "\n",
    "# Server address.\n",
    "url = \"http://host:9190\"\n",
    "transport = THttpClient.THttpClient(url)\n",
    "# Credentials are sent as custom HTTP headers. They can be supplied through\n",
    "# the environment so real secrets stay out of the notebook; the fallbacks\n",
    "# are the original placeholder values.\n",
    "headers = {\n",
    "    \"ACCESSKEYID\": os.environ.get(\"HBASE_ACCESS_KEY_ID\", \"root\"),  # user name\n",
    "    \"ACCESSSIGNATURE\": os.environ.get(\"HBASE_ACCESS_SIGNATURE\", \"root\"),  # password\n",
    "}\n",
    "transport.setCustomHeaders(headers)\n",
    "protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)\n",
    "client = THBaseService.Client(protocol)\n",
    "transport.open()\n",
    "try:\n",
    "    # Run the actual operations with `client` here.\n",
    "    pass\n",
    "finally:\n",
    "    # Close the transport even when an operation above raises.\n",
    "    transport.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 67,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "desc ods_signal\n",
      "select distinct VIN as vin from ods_signal\n",
      "select * from ods_signal where id > '1808931144_LUZAGAAA7MA038095_9223370390704375807' and id < '1808931144_LUZAGAAA7MA038095_9223370390790775807'\n",
      "select * from ods_signal where id > '1873439984_LUZBGAFBXMA042955_9223370390704375807' and id < '1873439984_LUZBGAFBXMA042955_9223370390790775807'\n",
      "select * from ods_signal where id > '2033890755_LUZAGBDA8LA005283_9223370390704375807' and id < '2033890755_LUZAGBDA8LA005283_9223370390790775807'\n",
      "select * from ods_signal where id > '680280554_LUZAGAAA8MA053138_9223370390704375807' and id < '680280554_LUZAGAAA8MA053138_9223370390790775807'\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "import time\n",
    "from datetime import datetime, timedelta, timezone\n",
    "\n",
    "import pandas as pd\n",
    "import phoenixdb\n",
    "\n",
    "\n",
    "class GetHashCode:\n",
    "    \"\"\"Signed 32-bit polynomial string hash (multiplier 31).\n",
    "\n",
    "    The result is used below to build the numeric prefix of a row key.\n",
    "    NOTE(review): this is the same formula as Java's String.hashCode;\n",
    "    presumably the row keys are written by a JVM producer - confirm.\n",
    "    \"\"\"\n",
    "\n",
    "    def convert_n_bytes(self, n, b):\n",
    "        \"\"\"Wrap integer `n` into the signed two's-complement range of `b` bytes.\"\"\"\n",
    "        bits = b * 8\n",
    "        return (n + 2 ** (bits - 1)) % 2 ** bits - 2 ** (bits - 1)\n",
    "\n",
    "    def convert_4_bytes(self, n):\n",
    "        \"\"\"Wrap integer `n` into the signed 32-bit range.\"\"\"\n",
    "        return self.convert_n_bytes(n, 4)\n",
    "\n",
    "    @classmethod\n",
    "    def getHashCode(cls, s):\n",
    "        \"\"\"Return sum(ord(c) * 31**(len(s)-1-i)) wrapped to a signed 32-bit int.\n",
    "\n",
    "        The sum is evaluated with Horner's rule and reduced modulo 2**32 at\n",
    "        every step. The result equals the value of the full sum after\n",
    "        wrapping, but intermediate values no longer grow with len(s).\n",
    "        An empty string hashes to 0.\n",
    "        \"\"\"\n",
    "        h = 0\n",
    "        for c in s:\n",
    "            h = (h * 31 + ord(c)) & 0xFFFFFFFF\n",
    "        return cls().convert_4_bytes(h)\n",
    "\n",
    "\n",
    "class PhoenixConnectionTest:\n",
    "    \"\"\"Small helper that opens a phoenixdb connection.\"\"\"\n",
    "\n",
    "    def _connect(self, connect_kw_args, url=None):\n",
    "        \"\"\"Open an autocommit connection.\n",
    "\n",
    "        connect_kw_args: extra keyword arguments forwarded to phoenixdb.connect.\n",
    "        url: server address; when omitted the module-level `database_url` is used.\n",
    "        Returns the connection, or None (after printing a message) when\n",
    "        phoenixdb.connect raises AttributeError.\n",
    "        \"\"\"\n",
    "        target = database_url if url is None else url\n",
    "        try:\n",
    "            return phoenixdb.connect(target, autocommit=True, **connect_kw_args)\n",
    "        except AttributeError:\n",
    "            print(\"Failed to connect\")\n",
    "            return None\n",
    "\n",
    "\n",
    "def to_epoch_ms(text):\n",
    "    \"\"\"Convert 'YYYY-mm-dd HH:MM:SS' wall-clock text in QUERY_TZ to epoch milliseconds.\"\"\"\n",
    "    moment = datetime.strptime(text, '%Y-%m-%d %H:%M:%S').replace(tzinfo=QUERY_TZ)\n",
    "    return int(moment.timestamp() * 1000)\n",
    "\n",
    "\n",
    "def row_key_prefix(vin):\n",
    "    \"\"\"Return the '<abs(hash(vin))>_<vin>_' prefix shared by all row keys of one VIN.\"\"\"\n",
    "    return str(abs(GetHashCode.getHashCode(vin))) + \"_\" + vin + \"_\"\n",
    "\n",
    "\n",
    "# Connection settings. Each value can be overridden through the environment\n",
    "# so real credentials do not have to be stored in the notebook; the defaults\n",
    "# are the values this notebook originally used.\n",
    "database_url = os.environ.get(\"LINDORM_URL\", \"ld-bp16866o8uwo0a590-proxy-lindorm-pub.lindorm.rds.aliyuncs.com:30060\")\n",
    "connect_kw_args = {\n",
    "    'lindorm_user': os.environ.get(\"LINDORM_USER\", \"root\"),\n",
    "    'lindorm_password': os.environ.get(\"LINDORM_PASSWORD\", \"root\"),\n",
    "    'database': 'test',\n",
    "}\n",
    "\n",
    "# Query parameters: time window (start, end) applied to every VIN.\n",
    "MAX_LONG = 9223372036854775807  # 2**63 - 1; row keys store MAX_LONG - time_ms\n",
    "# Zone in which start_time / end_time are expressed. The original code relied\n",
    "# on the kernel host's local zone (time.mktime); the saved run of this cell\n",
    "# corresponds to UTC+8, so that zone is now stated explicitly.\n",
    "QUERY_TZ = timezone(timedelta(hours=8))\n",
    "start_time = '2022-03-01 00:00:00'\n",
    "end_time = '2022-03-02 00:00:00'\n",
    "st = to_epoch_ms(start_time)\n",
    "et = to_epoch_ms(end_time)\n",
    "\n",
    "db = PhoenixConnectionTest()\n",
    "connection = db._connect(connect_kw_args)\n",
    "if connection is None:\n",
    "    # _connect already printed its message; stop here instead of failing\n",
    "    # later with an unrelated AttributeError on None.\n",
    "    raise RuntimeError(\"Could not connect to \" + database_url)\n",
    "\n",
    "try:\n",
    "    with connection.cursor() as statement:\n",
    "        # Other statements that can be run through the same cursor (kept for reference):\n",
    "        #   create table if not exists test_python(c1 integer, c2 integer, primary key(c1))\n",
    "        #   upsert into test_python(c1, c2) values(1,1)      -- insert one row\n",
    "        #   upsert into test_python(c1, c2) values(?,?)      -- several rows: executemany(sql, [(2, 2), (3, 3)])\n",
    "        #   delete from test_python where c1=2               -- delete\n",
    "        #   upsert into test_python(c1, c2) values(1,10)     -- update\n",
    "        #   offline table test_python                        -- a table must be disabled before it is dropped\n",
    "        #   drop table if exists test_python\n",
    "\n",
    "        # Column names: the third field of every `desc` row.\n",
    "        sql_select = \"desc ods_signal\"\n",
    "        print(sql_select)\n",
    "        statement.execute(sql_select)\n",
    "        rows = statement.fetchall()\n",
    "        columns = [r[2] for r in rows]\n",
    "\n",
    "        # Distinct VINs. NULL values are skipped because no row key can be built from them.\n",
    "        sql_select = \"select distinct VIN as vin from ods_signal\"\n",
    "        print(sql_select)\n",
    "        statement.execute(sql_select)\n",
    "        rows = statement.fetchall()\n",
    "        # Ordered by row-key prefix so that every run scans the VINs in the same order.\n",
    "        vins = sorted({r[0] for r in rows if r[0] is not None}, key=row_key_prefix)\n",
    "\n",
    "        # Fetch the rows of every VIN inside the time window. Because the key\n",
    "        # stores MAX_LONG - time_ms, the later timestamp gives the smaller key,\n",
    "        # so the end of the window is the lower bound of the key range.\n",
    "        dfs = []\n",
    "        for vin in vins:\n",
    "            prefix = row_key_prefix(vin)\n",
    "            start_rows = prefix + str(MAX_LONG - et)\n",
    "            end_rows = prefix + str(MAX_LONG - st)\n",
    "            sql_select = \"select * from ods_signal where id > '{}' and id < '{}'\".format(start_rows, end_rows)\n",
    "            print(sql_select)\n",
    "            statement.execute(sql_select)\n",
    "            rows = statement.fetchall()\n",
    "            dfs.append(pd.DataFrame(rows, columns=columns))\n",
    "finally:\n",
    "    # Release the connection even when a query fails.\n",
    "    connection.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 69,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Total number of rows fetched across all VINs.\n",
    "total_rows = sum(len(df) for df in dfs)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 70,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "25671"
      ]
     },
     "execution_count": 70,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Show the total row count.\n",
    "total_rows"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "9223370390727590241"
      ]
     },
     "execution_count": 43,
     "metadata": {},
"output_type": "execute_result" } ], "source": [ "MAX_LONG - 1646127185566" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "1646064000000" ] }, "execution_count": 50, "metadata": {}, "output_type": "execute_result" } ], "source": [] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[['1808931144_LUZAGAAA7MA038095_9223370390727590241',\n", " '2105094752_0CZPE007DN0111B800000059_9223370390727590241',\n", " 'LUZAGAAA7MA038095',\n", " 'EP12',\n", " 2,\n", " 1646127185566,\n", " '4.006,4.009,4.009,4.011,4.008,4.009,4.010,4.009,4.010,4.010,4.009,4.010,4.005,4.009,4.009,4.009,4.010,4.009,4.010,4.009,4.009,4.008,4.008,4.010,4.006,4.010,4.010,4.009,4.009,4.010,4.009,4.008,4.008,4.006,4.009,4.009,4.008,4.009,4.010,4.009,4.009,4.010,4.009,4.010,4.009,4.009,4.010,4.011,4.009,4.011,4.012,4.012,4.011,4.011,4.011,4.011,4.011,4.010,4.010,4.010,4.006,4.010,4.009,4.010,4.009,4.009,4.009,4.006,4.008,4.009,4.010,4.009,4.006,4.009,4.008,4.010,4.010,4.011,4.008,4.010,4.011,4.009,4.009,4.009,4.011,4.005,4.009,4.011,4.010,4.009,4.010,4.010,4.009,4.009,4.009,4.010',\n", " Decimal('384.600006'),\n", " 66,\n", " 99,\n", " '12,12,12,12,12,12,12,12,12,12,12,12,12,12,12,12,11,12,12,12,12,12,12,12,12,12,12,12,12,12,12,12',\n", " 1,\n", " 29,\n", " Decimal('0.000000'),\n", " Decimal('324.500000'),\n", " Decimal('3927.900000'),\n", " None,\n", " 0,\n", " Decimal('0.000000'),\n", " 0,\n", " Decimal('77.000000'),\n", " Decimal('0.000000'),\n", " Decimal('255.000000'),\n", " Decimal('4000.000000'),\n", " 3,\n", " Decimal('115.956260'),\n", " Decimal('30.075305'),\n", " Decimal('14.500000'),\n", " 65535,\n", " 4,\n", " 0,\n", " -999999999,\n", " -999999999,\n", " -999999999,\n", " Decimal('-999999999.000000'),\n", " Decimal('-999999999.000000'),\n", " 0,\n", " -999999999,\n", " 255,\n", " Decimal('65535.000000'),\n", " Decimal('65535.000000'),\n", " -999999999,\n", " -999999999,\n", " 
-999999999,\n", " -999999999,\n", " 0,\n", " -999999999,\n", " 0,\n", " -999999999,\n", " Decimal('-999999999.000000'),\n", " Decimal('-999999999.000000'),\n", " 0,\n", " 255,\n", " 0,\n", " 0,\n", " Decimal('4012.000000'),\n", " Decimal('4006.000000'),\n", " Decimal('12.000000'),\n", " Decimal('11.000000'),\n", " Decimal('94.000000'),\n", " Decimal('-0.300000'),\n", " Decimal('0.189999'),\n", " Decimal('65535.000000'),\n", " 255,\n", " Decimal('65535.000000'),\n", " 0,\n", " 0,\n", " 0,\n", " 0,\n", " Decimal('65535.000000'),\n", " Decimal('65535.000000'),\n", " Decimal('65535.000000'),\n", " Decimal('65535.000000'),\n", " Decimal('0.000000'),\n", " Decimal('0.000000'),\n", " Decimal('65535.000000'),\n", " 65535,\n", " 255,\n", " 255,\n", " 0,\n", " Decimal('4294967295.000000'),\n", " Decimal('4294967295.000000'),\n", " 0,\n", " 1640988682000,\n", " 96,\n", " 1,\n", " 96,\n", " 32,\n", " '0CZPE007DN0111B800000059',\n", " 'E',\n", " -999999999,\n", " '2101TAB',\n", " 122,\n", " '2P96S',\n", " 'J9',\n", " '华鼎国联动力电池有限公司',\n", " 3,\n", " '华鼎国联动力电池有限公司']]" ] }, "execution_count": 37, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rows" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['id',\n", " 'SnKey',\n", " 'VIN',\n", " 'VehModel',\n", " 'VehState',\n", " 'Time',\n", " 'CellVoltage',\n", " 'PackVoltage',\n", " 'CellMaxVolNum',\n", " 'CellMinVolNum',\n", " 'CellTemp',\n", " 'CellMaxTempNum',\n", " 'CellMinTempNum',\n", " 'PackCrnt',\n", " 'VehRmnRng',\n", " 'VehOdo',\n", " 'FltCode',\n", " 'FltLvl',\n", " 'VehSpd',\n", " 'PedalAngle',\n", " 'PackSoc',\n", " 'MotorPwr',\n", " 'EnmTemp',\n", " 'InsulationRss',\n", " 'ChrgSta',\n", " 'GPS_lon',\n", " 'GPS_lat',\n", " 'GPS_alt',\n", " 'PowerAlreadyTime',\n", " 'BMSSta',\n", " 'ACSta',\n", " 'HVILSta',\n", " 'OfcAccumTime',\n", " 'BmsOncChgCp',\n", " 'BmsOfcPileOutputVol',\n", " 'BmsOfcPileOutputCur',\n", " 'BmsOfcCc',\n", " 
'BmsOfcAllowed',\n", " 'OncCmState',\n", " 'OncOutCurrent',\n", " 'OncOutVoltage',\n", " 'OfcStationMaxOutVolt',\n", " 'OfcStationMinOutVolt',\n", " 'OfcStationMaxOutCrnt',\n", " 'OfcStationMinOutCrnt',\n", " 'BmsOfcgVoltageRequest',\n", " 'BmsOfcType',\n", " 'BmsOfcCurrentRequest',\n", " 'BmsOncChgRequest',\n", " 'BmsRequestOutputCurrent',\n", " 'BmsRequestOutputVoltage',\n", " 'BmsOncCc',\n", " 'BmsS2State',\n", " 'HvUpState',\n", " 'HvDownState',\n", " 'CellMaxVol',\n", " 'CellMinVol',\n", " 'CellMaxTemp',\n", " 'CellMinTemp',\n", " 'Tripmeter',\n", " 'VehAccX',\n", " 'VehAccY',\n", " 'StringAngle',\n", " 'PTCSta',\n", " 'PTCPwr',\n", " 'PosRlySta',\n", " 'NegRlySta',\n", " 'OfcPreRlySta',\n", " 'OfcRlySta',\n", " 'OfcPosChrgerTemp',\n", " 'OfcNegChrgerTemp',\n", " 'OncPosChrgerTemp',\n", " 'OncNegChrgerTemp',\n", " 'InstPwrCmsmtion',\n", " 'AvgPwrCmsmtion',\n", " 'SubPwrCmsmtion',\n", " 'BmsCellVoltDiff',\n", " 'DriveMode',\n", " 'BmsBatTempDiff',\n", " 'BmsClmThermalSts',\n", " 'BmsBatteryTotalCapacity',\n", " 'BmsBatteryAvaiCapacity',\n", " 'BmsOncChgCpConn',\n", " 'CellVoltRecTime',\n", " 'CellVoltTotalCount',\n", " 'CellVoltFrameNo',\n", " 'CellVoltFrameCount',\n", " 'CellTempTotalCount',\n", " 'SN',\n", " 'CellBatteryType',\n", " 'OncAccumTime',\n", " 'PackModelCode',\n", " 'PackCapacity',\n", " 'PackSeriesParallerl',\n", " 'ModuleModelCode',\n", " 'ModuleManufactory',\n", " 'CellDictShape',\n", " 'CellManufactory']" ] }, "execution_count": 39, "metadata": {}, "output_type": "execute_result" } ], "source": [ "columns" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "interpreter": { "hash": "ac7555030399793291671316948dfc1cf0035da70681b2458d9e2645bf33bfe4" }, "kernelspec": { "display_name": "Python 3.8.12 ('algo_dev_env')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": 
"text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.8" }, "orig_nbformat": 4 }, "nbformat": 4, "nbformat_minor": 2 }