lmstack 3 年之前
父节点
当前提交
376d2c46bd
共有 2 个文件被更改,包括 233 次插入71 次删除
  1. 143 0
      LIB/OTHER/test copy.ipynb
  2. 90 71
      LIB/OTHER/test.ipynb

+ 143 - 0
LIB/OTHER/test copy.ipynb

@@ -0,0 +1,143 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import oss2\n",
+    "from itertools import islice\n",
+    "import pandas as pd\n",
+    "columns = [\"id\",\"VIN\",\"VehModel\",\"VehState\",\"Time\",\"CellVoltage\",\"PackVoltage\",'CellMaxVolNum',\"CellMinVolNum\",'CellTemp','CellMaxTempNum','CellMinTempNum','PackCrnt','VehRmnRng','VehOdo','FltCode',\n",
+    "'FltLvl','VehSpd','PedalAngle','PackSoc','MotorPwr','EnmTemp','InsulationRss','ChrgSta','GPS_lon','GPS_lat','GPS_alt',\n",
+    "  'PowerAlreadyTime','BMSSta','ACSta','OncCmState','HVILSta','OfcAccumTime','OncAccumTime','BmsOncChgCp','BmsOfcPileOutputVol','BmsOfcPileOutputCur','BmsOfcCc','BmsOfcAllowed',\n",
+    "  'OncOutCurrent','OncOutVoltage','OfcStationMaxOutVolt','OfcStationMinOutVolt','OfcStationMaxOutCrnt','OfcStationMinOutCrnt','BmsOfcgVoltageRequest','BmsOfcType','BmsOfcCurrentRequest',\n",
+    "  'BmsOncChgRequest','BmsRequestOutputCurrent','BmsRequestOutputVoltage','BmsOncCc','BmsS2State','HvUpState','HvDownState','CellMaxVol','CellMinVol','CellMaxTemp','CellMinTemp','Tripmeter',\n",
+    "  'VehAccX','VehAccY','StringAngle','PTCSta','PTCPwr','PosRlySta','NegRlySta','OfcPreRlySta','OfcRlySta','OfcPosChrgerTemp','OfcNegChrgerTemp','OncPosChrgerTemp','OncNegChrgerTemp',\n",
+    "  'InstPwrCmsmtion','AvgPwrCmsmtion','SubPwrCmsmtion','BmsCellVoltDiff','DriveMode','BmsBatTempDiff','BmsClmThermalSts','BmsBatteryTotalCapacity','BmsBatteryAvaiCapacity',\n",
+    "  'BmsOncChgCpConn','CellVoltRecTime','CellVoltTotalCount','CellVoltFrameNo','CellVoltFrameCount','CellTempTotalCount','SN','BatteryModel']\n",
+    "auth = oss2.Auth('LTAI4FyoDnXMjqd78rdRDFHN', 'sdFl6vjM9l2rvWiUTzuFQb2xqjUoY5')\n",
+    "bucket = oss2.Bucket(auth, \"https://oss-cn-hangzhou.aliyuncs.com\", 'ff-hezhong')\n",
+    "df_data = pd.DataFrame(columns = columns)       \n",
+    "for b in oss2.ObjectIterator(bucket,'alg/LUZAGAAA1MA038075/202202251410/'):\n",
+    "    bucket.get_object_to_file(b.key, 'test.txt')\n",
+    "    df = pd.read_csv('test.txt', names=columns, sep=' ')\n",
+    "    df_data = pd.concat([df_data,df])\n",
+    "df_data = df_data.reset_index(drop=True)\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import ssl\n",
+    "from kafka import KafkaProducer, KafkaConsumer\n",
+    "from kafka.errors import kafka_errors\n",
+    "import traceback\n",
+    "import json\n",
+    "import pdb\n",
+    "import os\n",
+    "import pandas as pd\n",
+    "context = ssl.create_default_context()\n",
+    "context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)\n",
+    "## The new python(2.7.8+) may cannot ignore the hostname check,\n",
+    "## you could set to ssl.CERT_NONE to walk around the problem,\n",
+    "## or you can change the client to confluent-python-demo \n",
+    "\n",
+    "# context.verify_mode = ssl.CERT_NONE\n",
+    "context.verify_mode = ssl.CERT_REQUIRED\n",
+    "topic = 'toAlg'\n",
+    "# context.check_hostname = True\n",
+    "context.load_verify_locations(\"ca-cert\")\n",
+    "consumer = KafkaConsumer(topic, bootstrap_servers=['alikafka-pre-cn-8ed2kw57901x-1.alikafka.aliyuncs.com:9093',\n",
+    "                                            'alikafka-pre-cn-8ed2kw57901x-2.alikafka.aliyuncs.com:9093',\n",
+    "                                            'alikafka-pre-cn-8ed2kw57901x-3.alikafka.aliyuncs.com:9093'],\n",
+    "                         sasl_mechanism=\"PLAIN\",\n",
+    "                         ssl_context=context,\n",
+    "                         security_protocol='SASL_SSL',\n",
+    "                         api_version = (2,2),\n",
+    "                         sasl_plain_username='alikafka_pre-cn-8ed2kw57901x',\n",
+    "                         sasl_plain_password='hlQsApgUUdUxcWEr1uQoM9BeuF8t8vMF',\n",
+    "                         consumer_timeout_ms= 100000000, # 如果10秒内kafka中没有可供消费的数据,自动退出\n",
+    "                         client_id='consumer-python3',\n",
+    "                         auto_offset_reset='earliest',# 消费kafka中最近的数据,如果设置为earliest则消费最早的数据,不管这些数据是否消费\n",
+    "                         enable_auto_commit=True, # 自动提交消费者的offset\n",
+    "                         auto_commit_interval_ms=3000, ## 自动提交消费者offset的时间间隔,\n",
+    "                         group_id='test'\n",
+    "\n",
+    ")\n",
+    "i = 1\n",
+    "for message in consumer:\n",
+    "      print(message.value)\n",
+    "    \n",
+    "\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "for i in range(10):\n",
+    "    df = df_data.copy()\n",
+    "    df_data = pd.concat([df_data,df])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "1419264"
+      ]
+     },
+     "execution_count": 10,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "len(df_data)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "interpreter": {
+   "hash": "5ac93a79b26608c768d42fbd754dd4f69161017bfc4d38cb4b18d60e5198dbca"
+  },
+  "kernelspec": {
+   "display_name": "Python 3.6.2 ('ana_py36')",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.6.2"
+  },
+  "orig_nbformat": 4
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}

+ 90 - 71
LIB/OTHER/test.ipynb

@@ -2,7 +2,7 @@
  "cells": [
   {
    "cell_type": "code",
-   "execution_count": 1,
+   "execution_count": 7,
    "metadata": {},
    "outputs": [],
    "source": [
@@ -20,103 +20,122 @@
     "auth = oss2.Auth('LTAI4FyoDnXMjqd78rdRDFHN', 'sdFl6vjM9l2rvWiUTzuFQb2xqjUoY5')\n",
     "bucket = oss2.Bucket(auth, \"https://oss-cn-hangzhou.aliyuncs.com\", 'ff-hezhong')\n",
     "df_data = pd.DataFrame(columns = columns)       \n",
-    "for b in islice(oss2.ObjectIterator(bucket,'alg/LUZAGAAA1MA038075/202202251410/'), 0):\n",
+    "for b in oss2.ObjectIterator(bucket,'alg/LUZAGAAA1MA038075/202202251410/'):\n",
     "    bucket.get_object_to_file(b.key, 'test.txt')\n",
     "    df = pd.read_csv('test.txt', names=columns, sep=' ')\n",
-    "    df_data = pd.concat([df_data,df])"
+    "    df_data = pd.concat([df_data,df])\n",
+    "df_data = df_data.reset_index(drop=True)\n"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": 3,
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import ssl\n",
+    "from kafka import KafkaProducer, KafkaConsumer\n",
+    "from kafka.errors import kafka_errors\n",
+    "import traceback\n",
+    "import json\n",
+    "import pdb\n",
+    "import os\n",
+    "import pandas as pd\n",
+    "import oss2\n",
+    "from itertools import islice\n",
+    "import pandas as pd\n",
+    "context = ssl.create_default_context()\n",
+    "context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)\n",
+    "## The new python(2.7.8+) may cannot ignore the hostname check,\n",
+    "## you could set to ssl.CERT_NONE to walk around the problem,\n",
+    "## or you can change the client to confluent-python-demo \n",
+    "\n",
+    "# context.verify_mode = ssl.CERT_NONE\n",
+    "context.verify_mode = ssl.CERT_REQUIRED\n",
+    "topic = 'toAlg'\n",
+    "# context.check_hostname = True\n",
+    "context.load_verify_locations(\"ca-cert\")\n",
+    "consumer = KafkaConsumer(topic, bootstrap_servers=['alikafka-pre-cn-8ed2kw57901x-1.alikafka.aliyuncs.com:9093',\n",
+    "                                            'alikafka-pre-cn-8ed2kw57901x-2.alikafka.aliyuncs.com:9093',\n",
+    "                                            'alikafka-pre-cn-8ed2kw57901x-3.alikafka.aliyuncs.com:9093'],\n",
+    "                         sasl_mechanism=\"PLAIN\",\n",
+    "                         ssl_context=context,\n",
+    "                         security_protocol='SASL_SSL',\n",
+    "                         api_version = (2,2),\n",
+    "                         sasl_plain_username='alikafka_pre-cn-8ed2kw57901x',\n",
+    "                         sasl_plain_password='hlQsApgUUdUxcWEr1uQoM9BeuF8t8vMF',\n",
+    "                         consumer_timeout_ms= 100000000, # 如果10秒内kafka中没有可供消费的数据,自动退出\n",
+    "                         client_id='consumer-python3',\n",
+    "                         auto_offset_reset='earliest',# 消费kafka中最近的数据,如果设置为earliest则消费最早的数据,不管这些数据是否消费\n",
+    "                         enable_auto_commit=True, # 自动提交消费者的offset\n",
+    "                         auto_commit_interval_ms=3000, ## 自动提交消费者offset的时间间隔,\n",
+    "\n",
+    ")\n",
+    "\n",
+    "columns = [\"id\",\"VIN\",\"VehModel\",\"VehState\",\"Time\",\"CellVoltage\",\"PackVoltage\",'CellMaxVolNum',\"CellMinVolNum\",'CellTemp','CellMaxTempNum','CellMinTempNum','PackCrnt','VehRmnRng','VehOdo','FltCode',\n",
+    "'FltLvl','VehSpd','PedalAngle','PackSoc','MotorPwr','EnmTemp','InsulationRss','ChrgSta','GPS_lon','GPS_lat','GPS_alt',\n",
+    "  'PowerAlreadyTime','BMSSta','ACSta','OncCmState','HVILSta','OfcAccumTime','OncAccumTime','BmsOncChgCp','BmsOfcPileOutputVol','BmsOfcPileOutputCur','BmsOfcCc','BmsOfcAllowed',\n",
+    "  'OncOutCurrent','OncOutVoltage','OfcStationMaxOutVolt','OfcStationMinOutVolt','OfcStationMaxOutCrnt','OfcStationMinOutCrnt','BmsOfcgVoltageRequest','BmsOfcType','BmsOfcCurrentRequest',\n",
+    "  'BmsOncChgRequest','BmsRequestOutputCurrent','BmsRequestOutputVoltage','BmsOncCc','BmsS2State','HvUpState','HvDownState','CellMaxVol','CellMinVol','CellMaxTemp','CellMinTemp','Tripmeter',\n",
+    "  'VehAccX','VehAccY','StringAngle','PTCSta','PTCPwr','PosRlySta','NegRlySta','OfcPreRlySta','OfcRlySta','OfcPosChrgerTemp','OfcNegChrgerTemp','OncPosChrgerTemp','OncNegChrgerTemp',\n",
+    "  'InstPwrCmsmtion','AvgPwrCmsmtion','SubPwrCmsmtion','BmsCellVoltDiff','DriveMode','BmsBatTempDiff','BmsClmThermalSts','BmsBatteryTotalCapacity','BmsBatteryAvaiCapacity',\n",
+    "  'BmsOncChgCpConn','CellVoltRecTime','CellVoltTotalCount','CellVoltFrameNo','CellVoltFrameCount','CellTempTotalCount','SN','BatteryModel']\n",
+    "auth = oss2.Auth('LTAI4FyoDnXMjqd78rdRDFHN', 'sdFl6vjM9l2rvWiUTzuFQb2xqjUoY5')\n",
+    "bucket = oss2.Bucket(auth, \"https://oss-cn-hangzhou.aliyuncs.com\", 'ff-hezhong')\n",
+    "   \n",
+    "\n",
+    "for message in consumer:\n",
+    "      print(message.value)\n",
+    "      # df_data = pd.DataFrame(columns = columns)    \n",
+    "      # for b in oss2.ObjectIterator(bucket,'alg/'+str(message.value,'utf-8').replace(\"_\",\"/\")):\n",
+    "      #       bucket.get_object_to_file(b.key, 'test.txt')\n",
+    "      #       df = pd.read_csv('test.txt', names=columns, sep=' ')\n",
+    "      #       df_data = pd.concat([df_data,df])\n",
+    "      # df_data = df_data.reset_index(drop=True)\n",
+    "      # print(str(message.value) + str(len(df_data)))\n",
+    "      \n",
+    "    \n",
+    "\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 8,
    "metadata": {},
    "outputs": [
     {
-     "name": "stdout",
-     "output_type": "stream",
-     "text": [
-      "alg/LUZAGAAA1MA038075/202202251410/56311b72-3254-4dff-b6a8-33b2b2ab31c7.txt\n",
-      "alg/LUZAGAAA1MA038075/202202251410/d5173cf1-87e2-4117-a200-b7c8d118fa18.txt\n",
-      "alg/LUZAGAAA1MA038075/202202251410/eeb136bb-5170-4944-9008-a0dd3624ea8e.txt\n"
-     ]
+     "data": {
+      "text/plain": [
+       "'LUZAGAAA9MA036462/202202251746'"
+      ]
+     },
+     "execution_count": 8,
+     "metadata": {},
+     "output_type": "execute_result"
     }
    ],
    "source": [
-    "for b in oss2.ObjectIterator(bucket,'alg/LUZAGAAA1MA038075/202202251410/'):\n",
-    "      print(b.key)\n",
-    "\n"
+    "str(message.value,'utf-8').replace(\"_\",\"/\")"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": 4,
+   "execution_count": 10,
    "metadata": {},
    "outputs": [
     {
      "data": {
-      "text/html": [
-       "<div>\n",
-       "<style scoped>\n",
-       "    .dataframe tbody tr th:only-of-type {\n",
-       "        vertical-align: middle;\n",
-       "    }\n",
-       "\n",
-       "    .dataframe tbody tr th {\n",
-       "        vertical-align: top;\n",
-       "    }\n",
-       "\n",
-       "    .dataframe thead th {\n",
-       "        text-align: right;\n",
-       "    }\n",
-       "</style>\n",
-       "<table border=\"1\" class=\"dataframe\">\n",
-       "  <thead>\n",
-       "    <tr style=\"text-align: right;\">\n",
-       "      <th></th>\n",
-       "      <th>id</th>\n",
-       "      <th>VIN</th>\n",
-       "      <th>VehModel</th>\n",
-       "      <th>VehState</th>\n",
-       "      <th>Time</th>\n",
-       "      <th>CellVoltage</th>\n",
-       "      <th>PackVoltage</th>\n",
-       "      <th>CellMaxVolNum</th>\n",
-       "      <th>CellMinVolNum</th>\n",
-       "      <th>CellTemp</th>\n",
-       "      <th>...</th>\n",
-       "      <th>BmsBatteryTotalCapacity</th>\n",
-       "      <th>BmsBatteryAvaiCapacity</th>\n",
-       "      <th>BmsOncChgCpConn</th>\n",
-       "      <th>CellVoltRecTime</th>\n",
-       "      <th>CellVoltTotalCount</th>\n",
-       "      <th>CellVoltFrameNo</th>\n",
-       "      <th>CellVoltFrameCount</th>\n",
-       "      <th>CellTempTotalCount</th>\n",
-       "      <th>SN</th>\n",
-       "      <th>BatteryModel</th>\n",
-       "    </tr>\n",
-       "  </thead>\n",
-       "  <tbody>\n",
-       "  </tbody>\n",
-       "</table>\n",
-       "<p>0 rows × 90 columns</p>\n",
-       "</div>"
-      ],
       "text/plain": [
-       "Empty DataFrame\n",
-       "Columns: [id, VIN, VehModel, VehState, Time, CellVoltage, PackVoltage, CellMaxVolNum, CellMinVolNum, CellTemp, CellMaxTempNum, CellMinTempNum, PackCrnt, VehRmnRng, VehOdo, FltCode, FltLvl, VehSpd, PedalAngle, PackSoc, MotorPwr, EnmTemp, InsulationRss, ChrgSta, GPS_lon, GPS_lat, GPS_alt, PowerAlreadyTime, BMSSta, ACSta, OncCmState, HVILSta, OfcAccumTime, OncAccumTime, BmsOncChgCp, BmsOfcPileOutputVol, BmsOfcPileOutputCur, BmsOfcCc, BmsOfcAllowed, OncOutCurrent, OncOutVoltage, OfcStationMaxOutVolt, OfcStationMinOutVolt, OfcStationMaxOutCrnt, OfcStationMinOutCrnt, BmsOfcgVoltageRequest, BmsOfcType, BmsOfcCurrentRequest, BmsOncChgRequest, BmsRequestOutputCurrent, BmsRequestOutputVoltage, BmsOncCc, BmsS2State, HvUpState, HvDownState, CellMaxVol, CellMinVol, CellMaxTemp, CellMinTemp, Tripmeter, VehAccX, VehAccY, StringAngle, PTCSta, PTCPwr, PosRlySta, NegRlySta, OfcPreRlySta, OfcRlySta, OfcPosChrgerTemp, OfcNegChrgerTemp, OncPosChrgerTemp, OncNegChrgerTemp, InstPwrCmsmtion, AvgPwrCmsmtion, SubPwrCmsmtion, BmsCellVoltDiff, DriveMode, BmsBatTempDiff, BmsClmThermalSts, BmsBatteryTotalCapacity, BmsBatteryAvaiCapacity, BmsOncChgCpConn, CellVoltRecTime, CellVoltTotalCount, CellVoltFrameNo, CellVoltFrameCount, CellTempTotalCount, SN, BatteryModel]\n",
-       "Index: []\n",
-       "\n",
-       "[0 rows x 90 columns]"
+       "1419264"
       ]
      },
-     "execution_count": 4,
+     "execution_count": 10,
      "metadata": {},
      "output_type": "execute_result"
     }
    ],
    "source": [
-    "df_data"
+    "len(df_data)"
    ]
   },
   {
@@ -146,7 +165,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.12"
+   "version": "3.6.2"
   },
   "orig_nbformat": 4
  },