{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" 27存储成功"
]
}
],
"source": [
"import ssl\n",
"from kafka import KafkaProducer, KafkaConsumer\n",
"from kafka.errors import kafka_errors\n",
"import traceback\n",
"import json\n",
"import pdb\n",
"import os\n",
"import pandas as pd\n",
"context = ssl.create_default_context()\n",
"context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)\n",
"## The new python(2.7.8+) may cannot ignore the hostname check,\n",
"## you could set to ssl.CERT_NONE to walk around the problem,\n",
"## or you can change the client to confluent-python-demo \n",
"\n",
"# context.verify_mode = ssl.CERT_NONE\n",
"context.verify_mode = ssl.CERT_REQUIRED\n",
"topic = 'sys_battery_module_model_topic_temp'\n",
"# context.check_hostname = True\n",
"context.load_verify_locations(\"ca-cert\")\n",
"consumer = KafkaConsumer(topic, bootstrap_servers=['alikafka-pre-cn-8ed2kw57901x-1.alikafka.aliyuncs.com:9093',\n",
" 'alikafka-pre-cn-8ed2kw57901x-2.alikafka.aliyuncs.com:9093',\n",
" 'alikafka-pre-cn-8ed2kw57901x-3.alikafka.aliyuncs.com:9093'],\n",
" sasl_mechanism=\"PLAIN\",\n",
" ssl_context=context,\n",
" security_protocol='SASL_SSL',\n",
" api_version = (2,2),\n",
" sasl_plain_username='alikafka_pre-cn-8ed2kw57901x',\n",
" sasl_plain_password='hlQsApgUUdUxcWEr1uQoM9BeuF8t8vMF',\n",
" consumer_timeout_ms= 100000000, # 如果10秒内kafka中没有可供消费的数据,自动退出\n",
" client_id='consumer-python3',\n",
" auto_offset_reset='earliest',# 消费kafka中最近的数据,如果设置为earliest则消费最早的数据,不管这些数据是否消费\n",
" enable_auto_commit=True, # 自动提交消费者的offset\n",
" auto_commit_interval_ms=3000, ## 自动提交消费者offset的时间间隔,\n",
"\n",
")\n",
"i = 0\n",
"\n",
"for message in consumer:\n",
" # df = pd.DataFrame(columns=['code','child_code','create_time','id','model_type'])\n",
" if topic+'.csv' not in os.listdir():\n",
" pd.DataFrame(json.loads(str(message.value,'utf-8')),index=[0]).to_csv(topic+'.csv', index=False)\n",
" else:\n",
" pd.DataFrame(json.loads(str(message.value,'utf-8')),index=[0]).to_csv(topic+'.csv', mode='a', index=False, header=None)\n",
" i = i + 1\n",
" print(\"\\r\",str(i)+\"存储成功\",end=\"\",flush=True)\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" code | \n",
" child_code | \n",
" create_time | \n",
" id | \n",
" model_type | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" 2101TAB | \n",
" J9 | \n",
" 2021-01-07 10:19:28 | \n",
" 01d0d517da13492f9255cc34ef5f8d1c | \n",
" M | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" code child_code create_time id \\\n",
"0 2101TAB J9 2021-01-07 10:19:28 01d0d517da13492f9255cc34ef5f8d1c \n",
"\n",
" model_type \n",
"0 M "
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df = df.append(json.loads(str(message.value,'utf-8')),ignore_index=True)\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"df = pd.DataFrame(columns=['code','child_code','create_time','id','model_type'])"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
"\n",
"
\n",
" \n",
" \n",
" | \n",
" code | \n",
" child_code | \n",
" create_time | \n",
" id | \n",
" model_type | \n",
"
\n",
" \n",
" \n",
" \n",
" 0 | \n",
" J9 | \n",
" 2101TAB | \n",
" 2021-01-07 10:19:28 | \n",
" 01d0d517da13492f9255cc34ef5f8d1c | \n",
" M | \n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" code child_code create_time id \\\n",
"0 J9 2101TAB 2021-01-07 10:19:28 01d0d517da13492f9255cc34ef5f8d1c \n",
"\n",
" model_type \n",
"0 M "
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"interpreter": {
"hash": "5ac93a79b26608c768d42fbd754dd4f69161017bfc4d38cb4b18d60e5198dbca"
},
"kernelspec": {
"display_name": "Python 3.6.2 ('ana_py36')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.2"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}