{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# OSS object load and Kafka consumer check\n",
    "\n",
    "This notebook does three independent things:\n",
    "\n",
    "1. Downloads every object under one Aliyun OSS prefix and parses them into a single DataFrame (`df_data`).\n",
    "2. Replicates that frame `SCALE_FACTOR` times into `df_scaled` and reports its row count. A previous run of the row-count cell reported 1,419,264 rows.\n",
    "3. Prints the messages of one Kafka topic.\n",
    "\n",
    "**Prerequisites**\n",
    "\n",
    "- Environment variables `OSS_ACCESS_KEY_ID`, `OSS_ACCESS_KEY_SECRET` and `KAFKA_SASL_PASSWORD` must be set before the kernel starts.\n",
    "- The CA bundle file `ca-cert` must be present in the working directory.\n",
    "\n",
    "**NOTE(review):** earlier revisions of this notebook embedded the OSS key pair and the Kafka password in source. Treat those values as compromised and rotate them.\n",
    "\n",
    "Outputs are cleared; use *Restart Kernel and Run All* to regenerate them."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import os\n",
    "import pdb\n",
    "import ssl\n",
    "import tempfile\n",
    "import traceback\n",
    "from itertools import islice\n",
    "\n",
    "import oss2\n",
    "import pandas as pd\n",
    "from kafka import KafkaProducer, KafkaConsumer\n",
    "from kafka.errors import kafka_errors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# --- OSS source ---\n",
    "OSS_ENDPOINT = 'https://oss-cn-hangzhou.aliyuncs.com'\n",
    "OSS_BUCKET_NAME = 'ff-hezhong'\n",
    "OSS_PREFIX = 'alg/LUZAGAAA1MA038075/202202251410/'\n",
    "# NOTE(review): the separator reads as a single space in the reviewed copy of\n",
    "# this notebook; confirm it against the real object format.\n",
    "FIELD_SEP = ' '\n",
    "\n",
    "# Column names applied to the header-less objects, in file order.\n",
    "COLUMNS = [\n",
    "    'id', 'VIN', 'VehModel', 'VehState', 'Time', 'CellVoltage', 'PackVoltage', 'CellMaxVolNum',\n",
    "    'CellMinVolNum', 'CellTemp', 'CellMaxTempNum', 'CellMinTempNum', 'PackCrnt', 'VehRmnRng',\n",
    "    'VehOdo', 'FltCode',\n",
    "    'FltLvl', 'VehSpd', 'PedalAngle', 'PackSoc', 'MotorPwr', 'EnmTemp', 'InsulationRss', 'ChrgSta',\n",
    "    'GPS_lon', 'GPS_lat', 'GPS_alt',\n",
    "    'PowerAlreadyTime', 'BMSSta', 'ACSta', 'OncCmState', 'HVILSta', 'OfcAccumTime', 'OncAccumTime',\n",
    "    'BmsOncChgCp', 'BmsOfcPileOutputVol', 'BmsOfcPileOutputCur', 'BmsOfcCc', 'BmsOfcAllowed',\n",
    "    'OncOutCurrent', 'OncOutVoltage', 'OfcStationMaxOutVolt', 'OfcStationMinOutVolt',\n",
    "    'OfcStationMaxOutCrnt', 'OfcStationMinOutCrnt', 'BmsOfcgVoltageRequest', 'BmsOfcType',\n",
    "    'BmsOfcCurrentRequest',\n",
    "    'BmsOncChgRequest', 'BmsRequestOutputCurrent', 'BmsRequestOutputVoltage', 'BmsOncCc',\n",
    "    'BmsS2State', 'HvUpState', 'HvDownState', 'CellMaxVol', 'CellMinVol', 'CellMaxTemp',\n",
    "    'CellMinTemp', 'Tripmeter',\n",
    "    'VehAccX', 'VehAccY', 'StringAngle', 'PTCSta', 'PTCPwr', 'PosRlySta', 'NegRlySta',\n",
    "    'OfcPreRlySta', 'OfcRlySta', 'OfcPosChrgerTemp', 'OfcNegChrgerTemp', 'OncPosChrgerTemp',\n",
    "    'OncNegChrgerTemp',\n",
    "    'InstPwrCmsmtion', 'AvgPwrCmsmtion', 'SubPwrCmsmtion', 'BmsCellVoltDiff', 'DriveMode',\n",
    "    'BmsBatTempDiff', 'BmsClmThermalSts', 'BmsBatteryTotalCapacity', 'BmsBatteryAvaiCapacity',\n",
    "    'BmsOncChgCpConn', 'CellVoltRecTime', 'CellVoltTotalCount', 'CellVoltFrameNo',\n",
    "    'CellVoltFrameCount', 'CellTempTotalCount', 'SN', 'BatteryModel',\n",
    "]\n",
    "\n",
    "# Number of copies of the loaded frame in df_scaled (the original doubled it ten times).\n",
    "SCALE_FACTOR = 2 ** 10\n",
    "\n",
    "# --- Kafka source ---\n",
    "KAFKA_TOPIC = 'toAlg'\n",
    "KAFKA_BOOTSTRAP_SERVERS = [\n",
    "    'alikafka-pre-cn-8ed2kw57901x-1.alikafka.aliyuncs.com:9093',\n",
    "    'alikafka-pre-cn-8ed2kw57901x-2.alikafka.aliyuncs.com:9093',\n",
    "    'alikafka-pre-cn-8ed2kw57901x-3.alikafka.aliyuncs.com:9093',\n",
    "]\n",
    "KAFKA_USERNAME = 'alikafka_pre-cn-8ed2kw57901x'\n",
    "KAFKA_CA_FILE = 'ca-cert'\n",
    "# Iteration over the consumer stops once no message has arrived for this long.\n",
    "# 100,000,000 ms is roughly 27.8 hours (an earlier comment claimed 10 seconds).\n",
    "CONSUMER_TIMEOUT_MS = 100000000\n",
    "# Upper bound on printed messages; None keeps reading until the timeout above.\n",
    "MAX_MESSAGES = None\n",
    "\n",
    "# --- Secrets: never commit these; a missing variable fails here with KeyError ---\n",
    "OSS_ACCESS_KEY_ID = os.environ['OSS_ACCESS_KEY_ID']\n",
    "OSS_ACCESS_KEY_SECRET = os.environ['OSS_ACCESS_KEY_SECRET']\n",
    "KAFKA_PASSWORD = os.environ['KAFKA_SASL_PASSWORD']"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load data from OSS\n",
    "\n",
    "Every object under `OSS_PREFIX` is downloaded to a scratch file, parsed with `COLUMNS` as header, and the pieces are concatenated once."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def load_oss_frames(bucket, prefix, columns, sep):\n",
    "    \"\"\"Download every object under `prefix` and parse them into one DataFrame.\n",
    "\n",
    "    Args:\n",
    "        bucket: `oss2.Bucket` used for listing and downloading.\n",
    "        prefix: Key prefix selecting the objects to load.\n",
    "        columns: Column names applied to the header-less files.\n",
    "        sep: Field separator handed to `pd.read_csv`.\n",
    "\n",
    "    Returns:\n",
    "        A DataFrame with a fresh 0..n-1 index containing the rows of all\n",
    "        objects in listing order, or an empty frame carrying `columns` when\n",
    "        the prefix matches nothing.\n",
    "    \"\"\"\n",
    "    frames = []\n",
    "    # A private scratch directory keeps downloads out of the working directory\n",
    "    # and is removed automatically, even if parsing fails.\n",
    "    with tempfile.TemporaryDirectory() as scratch_dir:\n",
    "        local_path = os.path.join(scratch_dir, 'object.txt')\n",
    "        for obj in oss2.ObjectIterator(bucket, prefix):\n",
    "            bucket.get_object_to_file(obj.key, local_path)\n",
    "            frames.append(pd.read_csv(local_path, names=columns, sep=sep))\n",
    "    if not frames:\n",
    "        # pd.concat([]) raises ValueError, so build the empty result explicitly.\n",
    "        return pd.DataFrame(columns=columns)\n",
    "    # Concatenate once: doing it inside the loop re-copies all earlier rows\n",
    "    # for every object.\n",
    "    return pd.concat(frames).reset_index(drop=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)\n",
    "bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)\n",
    "df_data = load_oss_frames(bucket, OSS_PREFIX, COLUMNS, FIELD_SEP)\n",
    "df_data.shape"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Scale the dataset\n",
    "\n",
    "`df_scaled` holds `SCALE_FACTOR` back-to-back copies of `df_data` (row labels repeat, as they did with the original doubling loop). It is bound to a new name so that re-running the cell cannot multiply the data again."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_scaled = pd.concat([df_data] * SCALE_FACTOR)\n",
    "assert len(df_scaled) == len(df_data) * SCALE_FACTOR"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "len(df_scaled)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Consume the Kafka topic\n",
    "\n",
    "This section comes last because the loop blocks: it ends only after `MAX_MESSAGES` messages have been printed or after `CONSUMER_TIMEOUT_MS` passes without a new message."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def build_ssl_context(ca_file):\n",
    "    \"\"\"Create the TLS context for the SASL_SSL connection.\n",
    "\n",
    "    Args:\n",
    "        ca_file: Path of the CA bundle used to verify the broker certificate.\n",
    "\n",
    "    Returns:\n",
    "        An `ssl.SSLContext` that requires a certificate signed by `ca_file`.\n",
    "    \"\"\"\n",
    "    # Newer Python versions (2.7.8+) may not be able to skip the hostname\n",
    "    # check. If the handshake fails for that reason, either set verify_mode\n",
    "    # to ssl.CERT_NONE as a workaround or switch the client to\n",
    "    # confluent-python-demo. check_hostname is deliberately left at this\n",
    "    # context's default rather than being enabled.\n",
    "    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)\n",
    "    context.verify_mode = ssl.CERT_REQUIRED\n",
    "    context.load_verify_locations(ca_file)\n",
    "    return context"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "consumer = KafkaConsumer(\n",
    "    KAFKA_TOPIC,\n",
    "    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,\n",
    "    sasl_mechanism='PLAIN',\n",
    "    ssl_context=build_ssl_context(KAFKA_CA_FILE),\n",
    "    security_protocol='SASL_SSL',\n",
    "    api_version=(2, 2),\n",
    "    sasl_plain_username=KAFKA_USERNAME,\n",
    "    sasl_plain_password=KAFKA_PASSWORD,\n",
    "    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,\n",
    "    client_id='consumer-python3',\n",
    "    # 'earliest' starts from the oldest retained message when the group has\n",
    "    # no committed offset; 'latest' would deliver only new messages.\n",
    "    auto_offset_reset='earliest',\n",
    "    # Commit the consumer offsets automatically ...\n",
    "    enable_auto_commit=True,\n",
    "    # ... at this interval, in milliseconds.\n",
    "    auto_commit_interval_ms=3000,\n",
    "    group_id='test',\n",
    ")\n",
    "try:\n",
    "    # islice(..., None) imposes no limit, so the default reads until timeout.\n",
    "    for message in islice(consumer, MAX_MESSAGES):\n",
    "        print(message.value)\n",
    "finally:\n",
    "    # Release the sockets and leave the consumer group even when interrupted.\n",
    "    consumer.close()"
   ]
  }
 ],
 "metadata": {
  "interpreter": {
   "hash": "5ac93a79b26608c768d42fbd754dd4f69161017bfc4d38cb4b18d60e5198dbca"
  },
  "kernelspec": {
   "display_name": "Python 3.6.2 ('ana_py36')",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.2"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}