{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Kafka topic sampler\n",
    "\n",
    "Reads one record from the Kafka topic named in `TOPIC` over SASL_SSL, appends it to `result.txt`, and displays it for inspection.\n",
    "\n",
    "Prerequisites:\n",
    "\n",
    "- A CA certificate file named `ca-cert` in the working directory.\n",
    "- Environment variables `KAFKA_SASL_USERNAME` and `KAFKA_SASL_PASSWORD`, set before the kernel starts.\n",
    "\n",
    "Records on this topic can carry vehicle VINs and GPS coordinates, so clear the cell outputs before sharing or committing the notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import os\n",
    "import pdb\n",
    "import ssl\n",
    "import traceback\n",
    "\n",
    "import pandas as pd\n",
    "from kafka import KafkaConsumer, KafkaProducer\n",
    "from kafka.errors import kafka_errors"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Connection settings. Credentials are read from the environment so they are\n",
    "# never stored in the notebook; a missing variable raises KeyError right here.\n",
    "TOPIC = 's_kafka_tbox_analysis_hh_delta_ext_topic'\n",
    "BOOTSTRAP_SERVERS = [\n",
    "    'alikafka-pre-cn-8ed2kw57901x-1.alikafka.aliyuncs.com:9093',\n",
    "    'alikafka-pre-cn-8ed2kw57901x-2.alikafka.aliyuncs.com:9093',\n",
    "    'alikafka-pre-cn-8ed2kw57901x-3.alikafka.aliyuncs.com:9093',\n",
    "]\n",
    "SASL_USERNAME = os.environ['KAFKA_SASL_USERNAME']\n",
    "SASL_PASSWORD = os.environ['KAFKA_SASL_PASSWORD']\n",
    "\n",
    "CA_CERT_PATH = 'ca-cert'\n",
    "RESULT_PATH = 'result.txt'\n",
    "\n",
    "# Iteration over the consumer stops once no record has arrived for this long.\n",
    "# 100,000,000 ms is roughly 27.8 hours, i.e. effectively wait until data shows up.\n",
    "CONSUMER_TIMEOUT_MS = 100_000_000"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Read one record"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# TLS context: the broker certificate must chain to the CA in CA_CERT_PATH.\n",
    "# Hostname verification is left off, matching the previous setup, whose notes\n",
    "# say the hostname check can fail with this client.\n",
    "# TODO: confirm and set check_hostname = True once the certificate matches.\n",
    "context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)\n",
    "context.check_hostname = False\n",
    "context.verify_mode = ssl.CERT_REQUIRED\n",
    "context.load_verify_locations(CA_CERT_PATH)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The consumer is created, used and closed in this one cell so that the cell\n",
    "# can be re-run on its own without leaving broker connections open.\n",
    "consumer = KafkaConsumer(\n",
    "    TOPIC,\n",
    "    bootstrap_servers=BOOTSTRAP_SERVERS,\n",
    "    security_protocol='SASL_SSL',\n",
    "    sasl_mechanism='PLAIN',\n",
    "    sasl_plain_username=SASL_USERNAME,\n",
    "    sasl_plain_password=SASL_PASSWORD,\n",
    "    ssl_context=context,\n",
    "    api_version=(2, 2),\n",
    "    client_id='consumer-python3',\n",
    "    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,\n",
    "    # When there is no committed offset, start from the oldest retained record\n",
    "    # ('latest' would start from newly arriving records instead).\n",
    "    auto_offset_reset='earliest',\n",
    "    # Commit consumed offsets automatically, every 3 seconds.\n",
    "    # NOTE(review): no group_id is passed; kafka-python documents that offset\n",
    "    # commits are disabled when group_id is None -- confirm whether a consumer\n",
    "    # group is intended here.\n",
    "    enable_auto_commit=True,\n",
    "    auto_commit_interval_ms=3000,\n",
    ")\n",
    "\n",
    "# Take the first record only; message stays None if the timeout elapses first.\n",
    "message = None\n",
    "try:\n",
    "    for message in consumer:\n",
    "        break\n",
    "finally:\n",
    "    consumer.close()\n",
    "\n",
    "if message is None:\n",
    "    raise RuntimeError(\n",
    "        'no record received from %s within %d ms' % (TOPIC, CONSUMER_TIMEOUT_MS)\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Append the record as one UTF-8 line so repeated runs stay separable and the\n",
    "# result does not depend on the platform default encoding.\n",
    "with open(RESULT_PATH, 'a', encoding='utf-8') as file_handle:\n",
    "    file_handle.write(message.value.decode('utf-8') + '\\n')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Inspect the record\n",
    "\n",
    "The decoded payload first, then the full record with its topic, partition, offset and timestamp."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "str(message.value, 'utf-8')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "message"
   ]
  }
 ],
 "metadata": {
  "interpreter": {
   "hash": "5ac93a79b26608c768d42fbd754dd4f69161017bfc4d38cb4b18d60e5198dbca"
  },
  "kernelspec": {
   "display_name": "Python 3.6.2 ('ana_py36')",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.2"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}