{ "cells": [ { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " 27存储成功" ] } ], "source": [
 "import ssl\n",
 "from kafka import KafkaProducer, KafkaConsumer\n",
 "from kafka.errors import kafka_errors\n",
 "import traceback\n",
 "import json\n",
 "import pdb\n",
 "import os\n",
 "import pandas as pd\n",
 "\n",
 "# Topic to drain; each consumed message is written as one row of <topic>.csv\n",
 "# in the current working directory.\n",
 "topic = 'sys_battery_module_model_topic_temp'\n",
 "csv_path = topic + '.csv'\n",
 "\n",
 "# TLS context that requires a broker certificate verifiable against the local\n",
 "# 'ca-cert' file. check_hostname is not set here (left at the context default).\n",
 "# Original note: newer Pythons (2.7.8+) may be unable to skip the hostname check;\n",
 "# the workarounds it lists are ssl.CERT_NONE or switching to the confluent-python client.\n",
 "context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)\n",
 "context.verify_mode = ssl.CERT_REQUIRED\n",
 "context.load_verify_locations(\"ca-cert\")\n",
 "\n",
 "# Credentials are read from the environment so they are never stored in the notebook.\n",
 "# KAFKA_SASL_PASSWORD must be set (KeyError otherwise); the password that used to be\n",
 "# hardcoded in this cell should be treated as leaked and rotated.\n",
 "# NOTE(review): no group_id is passed; per the kafka-python docs offset commits are\n",
 "# disabled when group_id is None, so the auto-commit settings below may have no effect - confirm.\n",
 "consumer = KafkaConsumer(\n",
 "    topic,\n",
 "    bootstrap_servers=[\n",
 "        'alikafka-pre-cn-8ed2kw57901x-1.alikafka.aliyuncs.com:9093',\n",
 "        'alikafka-pre-cn-8ed2kw57901x-2.alikafka.aliyuncs.com:9093',\n",
 "        'alikafka-pre-cn-8ed2kw57901x-3.alikafka.aliyuncs.com:9093',\n",
 "    ],\n",
 "    sasl_mechanism=\"PLAIN\",\n",
 "    ssl_context=context,\n",
 "    security_protocol='SASL_SSL',\n",
 "    api_version=(2, 2),\n",
 "    sasl_plain_username=os.environ.get('KAFKA_SASL_USERNAME', 'alikafka_pre-cn-8ed2kw57901x'),\n",
 "    sasl_plain_password=os.environ['KAFKA_SASL_PASSWORD'],\n",
 "    consumer_timeout_ms=100000000,  # iteration stops after 100,000,000 ms (~27.8 h) without a message\n",
 "    client_id='consumer-python3',\n",
 "    auto_offset_reset='earliest',  # start from the oldest available message ('latest' would read only new ones)\n",
 "    enable_auto_commit=True,  # commit consumer offsets automatically ...\n",
 "    auto_commit_interval_ms=3000,  # ... every 3000 ms\n",
 ")\n",
 "\n",
 "stored = 0  # number of messages written so far\n",
 "try:\n",
 "    for stored, message in enumerate(consumer, start=1):\n",
 "        record = json.loads(str(message.value, 'utf-8'))\n",
 "        # The header is written only when the file is first created; later rows are\n",
 "        # appended without one, so this assumes every message carries the same keys\n",
 "        # in the same order.\n",
 "        write_header = not os.path.exists(csv_path)\n",
 "        pd.DataFrame(record, index=[0]).to_csv(csv_path, mode='a', index=False, header=write_header)\n",
 "        print(\"\\r\", str(stored) + \"存储成功\", end=\"\", flush=True)\n",
 "finally:\n",
 "    # Release the broker connections even if the loop is interrupted or a message is malformed.\n",
 "    consumer.close()"
 ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
codechild_codecreate_timeidmodel_type
02101TABJ92021-01-07 10:19:2801d0d517da13492f9255cc34ef5f8d1cM
\n", "
" ], "text/plain": [ " code child_code create_time id \\\n", "0 2101TAB J9 2021-01-07 10:19:28 01d0d517da13492f9255cc34ef5f8d1c \n", "\n", " model_type \n", "0 M " ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [
 "# Build a one-row frame from the last message consumed by the loop cell above.\n",
 "# `message` is only bound if that loop received at least one message.\n",
 "# The empty frame is created here (not in a later cell) so the notebook runs top to\n",
 "# bottom on a fresh kernel and re-running this cell does not keep growing `df`.\n",
 "df = pd.DataFrame(columns=['code','child_code','create_time','id','model_type'])\n",
 "record = json.loads(str(message.value,'utf-8'))\n",
 "# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.\n",
 "df = pd.concat([df, pd.DataFrame([record])], ignore_index=True, sort=False)\n",
 "df"
 ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
codechild_codecreate_timeidmodel_type
0J92101TAB2021-01-07 10:19:2801d0d517da13492f9255cc34ef5f8d1cM
\n", "
" ], "text/plain": [ " code child_code create_time id \\\n", "0 J9 2101TAB 2021-01-07 10:19:28 01d0d517da13492f9255cc34ef5f8d1c \n", "\n", " model_type \n", "0 M " ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "interpreter": { "hash": "5ac93a79b26608c768d42fbd754dd4f69161017bfc4d38cb4b18d60e5198dbca" }, "kernelspec": { "display_name": "Python 3.6.2 ('ana_py36')", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.2" }, "orig_nbformat": 4 }, "nbformat": 4, "nbformat_minor": 2 }