|
@@ -0,0 +1,234 @@
|
|
|
+{
|
|
|
+ "cells": [
|
|
|
+ {
|
|
|
+ "cell_type": "code",
|
|
|
+ "execution_count": 2,
|
|
|
+ "metadata": {},
|
|
|
+ "outputs": [
|
|
|
+ {
|
|
|
+ "name": "stdout",
|
|
|
+ "output_type": "stream",
|
|
|
+ "text": [
|
|
|
+ " 27存储成功"
|
|
|
+ ]
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ "source": [
|
|
|
+ "import ssl\n",
|
|
|
+ "from kafka import KafkaProducer, KafkaConsumer\n",
|
|
|
+ "from kafka.errors import kafka_errors\n",
|
|
|
+ "import traceback\n",
|
|
|
+ "import json\n",
|
|
|
+ "import pdb\n",
|
|
|
+ "import os\n",
|
|
|
+ "import pandas as pd\n",
|
|
|
+ "context = ssl.create_default_context()\n",
|
|
|
+ "context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)\n",
|
|
|
+ "## The new python(2.7.8+) may cannot ignore the hostname check,\n",
|
|
|
+ "## you could set to ssl.CERT_NONE to walk around the problem,\n",
|
|
|
+ "## or you can change the client to confluent-python-demo \n",
|
|
|
+ "\n",
|
|
|
+ "# context.verify_mode = ssl.CERT_NONE\n",
|
|
|
+ "context.verify_mode = ssl.CERT_REQUIRED\n",
|
|
|
+ "topic = 'sys_battery_module_model_topic_temp'\n",
|
|
|
+ "# context.check_hostname = True\n",
|
|
|
+ "context.load_verify_locations(\"ca-cert\")\n",
|
|
|
+ "consumer = KafkaConsumer(topic, bootstrap_servers=['alikafka-pre-cn-8ed2kw57901x-1.alikafka.aliyuncs.com:9093',\n",
|
|
|
+ " 'alikafka-pre-cn-8ed2kw57901x-2.alikafka.aliyuncs.com:9093',\n",
|
|
|
+ " 'alikafka-pre-cn-8ed2kw57901x-3.alikafka.aliyuncs.com:9093'],\n",
|
|
|
+ " sasl_mechanism=\"PLAIN\",\n",
|
|
|
+ " ssl_context=context,\n",
|
|
|
+ " security_protocol='SASL_SSL',\n",
|
|
|
+ " api_version = (2,2),\n",
|
|
|
+ " sasl_plain_username='alikafka_pre-cn-8ed2kw57901x',\n",
|
|
|
+ " sasl_plain_password='hlQsApgUUdUxcWEr1uQoM9BeuF8t8vMF',\n",
|
|
|
+ " consumer_timeout_ms= 100000000, # 如果10秒内kafka中没有可供消费的数据,自动退出\n",
|
|
|
+ " client_id='consumer-python3',\n",
|
|
|
+ " auto_offset_reset='earliest',# 消费kafka中最近的数据,如果设置为earliest则消费最早的数据,不管这些数据是否消费\n",
|
|
|
+ " enable_auto_commit=True, # 自动提交消费者的offset\n",
|
|
|
+ " auto_commit_interval_ms=3000, ## 自动提交消费者offset的时间间隔,\n",
|
|
|
+ "\n",
|
|
|
+ ")\n",
|
|
|
+ "i = 0\n",
|
|
|
+ "\n",
|
|
|
+ "for message in consumer:\n",
|
|
|
+ " # df = pd.DataFrame(columns=['code','child_code','create_time','id','model_type'])\n",
|
|
|
+ " if topic+'.csv' not in os.listdir():\n",
|
|
|
+ " pd.DataFrame(json.loads(str(message.value,'utf-8')),index=[0]).to_csv(topic+'.csv', index=False)\n",
|
|
|
+ " else:\n",
|
|
|
+ " pd.DataFrame(json.loads(str(message.value,'utf-8')),index=[0]).to_csv(topic+'.csv', mode='a', index=False, header=None)\n",
|
|
|
+ " i = i + 1\n",
|
|
|
+ " print(\"\\r\",str(i)+\"存储成功\",end=\"\",flush=True)\n",
|
|
|
+ " "
|
|
|
+ ]
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "cell_type": "code",
|
|
|
+ "execution_count": 15,
|
|
|
+ "metadata": {},
|
|
|
+ "outputs": [
|
|
|
+ {
|
|
|
+ "data": {
|
|
|
+ "text/html": [
|
|
|
+ "<div>\n",
|
|
|
+ "<style scoped>\n",
|
|
|
+ " .dataframe tbody tr th:only-of-type {\n",
|
|
|
+ " vertical-align: middle;\n",
|
|
|
+ " }\n",
|
|
|
+ "\n",
|
|
|
+ " .dataframe tbody tr th {\n",
|
|
|
+ " vertical-align: top;\n",
|
|
|
+ " }\n",
|
|
|
+ "\n",
|
|
|
+ " .dataframe thead th {\n",
|
|
|
+ " text-align: right;\n",
|
|
|
+ " }\n",
|
|
|
+ "</style>\n",
|
|
|
+ "<table border=\"1\" class=\"dataframe\">\n",
|
|
|
+ " <thead>\n",
|
|
|
+ " <tr style=\"text-align: right;\">\n",
|
|
|
+ " <th></th>\n",
|
|
|
+ " <th>code</th>\n",
|
|
|
+ " <th>child_code</th>\n",
|
|
|
+ " <th>create_time</th>\n",
|
|
|
+ " <th>id</th>\n",
|
|
|
+ " <th>model_type</th>\n",
|
|
|
+ " </tr>\n",
|
|
|
+ " </thead>\n",
|
|
|
+ " <tbody>\n",
|
|
|
+ " <tr>\n",
|
|
|
+ " <th>0</th>\n",
|
|
|
+ " <td>2101TAB</td>\n",
|
|
|
+ " <td>J9</td>\n",
|
|
|
+ " <td>2021-01-07 10:19:28</td>\n",
|
|
|
+ " <td>01d0d517da13492f9255cc34ef5f8d1c</td>\n",
|
|
|
+ " <td>M</td>\n",
|
|
|
+ " </tr>\n",
|
|
|
+ " </tbody>\n",
|
|
|
+ "</table>\n",
|
|
|
+ "</div>"
|
|
|
+ ],
|
|
|
+ "text/plain": [
|
|
|
+ " code child_code create_time id \\\n",
|
|
|
+ "0 2101TAB J9 2021-01-07 10:19:28 01d0d517da13492f9255cc34ef5f8d1c \n",
|
|
|
+ "\n",
|
|
|
+ " model_type \n",
|
|
|
+ "0 M "
|
|
|
+ ]
|
|
|
+ },
|
|
|
+ "execution_count": 15,
|
|
|
+ "metadata": {},
|
|
|
+ "output_type": "execute_result"
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ "source": [
|
|
|
+ "df = df.append(json.loads(str(message.value,'utf-8')),ignore_index=True)\n",
|
|
|
+ "df"
|
|
|
+ ]
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "cell_type": "code",
|
|
|
+ "execution_count": 12,
|
|
|
+ "metadata": {},
|
|
|
+ "outputs": [],
|
|
|
+ "source": [
|
|
|
+ "df = pd.DataFrame(columns=['code','child_code','create_time','id','model_type'])"
|
|
|
+ ]
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "cell_type": "code",
|
|
|
+ "execution_count": 11,
|
|
|
+ "metadata": {},
|
|
|
+ "outputs": [
|
|
|
+ {
|
|
|
+ "data": {
|
|
|
+ "text/html": [
|
|
|
+ "<div>\n",
|
|
|
+ "<style scoped>\n",
|
|
|
+ " .dataframe tbody tr th:only-of-type {\n",
|
|
|
+ " vertical-align: middle;\n",
|
|
|
+ " }\n",
|
|
|
+ "\n",
|
|
|
+ " .dataframe tbody tr th {\n",
|
|
|
+ " vertical-align: top;\n",
|
|
|
+ " }\n",
|
|
|
+ "\n",
|
|
|
+ " .dataframe thead th {\n",
|
|
|
+ " text-align: right;\n",
|
|
|
+ " }\n",
|
|
|
+ "</style>\n",
|
|
|
+ "<table border=\"1\" class=\"dataframe\">\n",
|
|
|
+ " <thead>\n",
|
|
|
+ " <tr style=\"text-align: right;\">\n",
|
|
|
+ " <th></th>\n",
|
|
|
+ " <th>code</th>\n",
|
|
|
+ " <th>child_code</th>\n",
|
|
|
+ " <th>create_time</th>\n",
|
|
|
+ " <th>id</th>\n",
|
|
|
+ " <th>model_type</th>\n",
|
|
|
+ " </tr>\n",
|
|
|
+ " </thead>\n",
|
|
|
+ " <tbody>\n",
|
|
|
+ " <tr>\n",
|
|
|
+ " <th>0</th>\n",
|
|
|
+ " <td>J9</td>\n",
|
|
|
+ " <td>2101TAB</td>\n",
|
|
|
+ " <td>2021-01-07 10:19:28</td>\n",
|
|
|
+ " <td>01d0d517da13492f9255cc34ef5f8d1c</td>\n",
|
|
|
+ " <td>M</td>\n",
|
|
|
+ " </tr>\n",
|
|
|
+ " </tbody>\n",
|
|
|
+ "</table>\n",
|
|
|
+ "</div>"
|
|
|
+ ],
|
|
|
+ "text/plain": [
|
|
|
+ " code child_code create_time id \\\n",
|
|
|
+ "0 J9 2101TAB 2021-01-07 10:19:28 01d0d517da13492f9255cc34ef5f8d1c \n",
|
|
|
+ "\n",
|
|
|
+ " model_type \n",
|
|
|
+ "0 M "
|
|
|
+ ]
|
|
|
+ },
|
|
|
+ "execution_count": 11,
|
|
|
+ "metadata": {},
|
|
|
+ "output_type": "execute_result"
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ "source": [
|
|
|
+ "df"
|
|
|
+ ]
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "cell_type": "code",
|
|
|
+ "execution_count": null,
|
|
|
+ "metadata": {},
|
|
|
+ "outputs": [],
|
|
|
+ "source": []
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ "metadata": {
|
|
|
+ "interpreter": {
|
|
|
+ "hash": "5ac93a79b26608c768d42fbd754dd4f69161017bfc4d38cb4b18d60e5198dbca"
|
|
|
+ },
|
|
|
+ "kernelspec": {
|
|
|
+ "display_name": "Python 3.6.2 ('ana_py36')",
|
|
|
+ "language": "python",
|
|
|
+ "name": "python3"
|
|
|
+ },
|
|
|
+ "language_info": {
|
|
|
+ "codemirror_mode": {
|
|
|
+ "name": "ipython",
|
|
|
+ "version": 3
|
|
|
+ },
|
|
|
+ "file_extension": ".py",
|
|
|
+ "mimetype": "text/x-python",
|
|
|
+ "name": "python",
|
|
|
+ "nbconvert_exporter": "python",
|
|
|
+ "pygments_lexer": "ipython3",
|
|
|
+ "version": "3.6.2"
|
|
|
+ },
|
|
|
+ "orig_nbformat": 4
|
|
|
+ },
|
|
|
+ "nbformat": 4,
|
|
|
+ "nbformat_minor": 2
|
|
|
+}
|