'''
Historical data is currently fetched over HTTP.
Reserved for later: if data retrieval is switched to direct database access,
this file will be responsible for the database connection, SQL execution,
data fetching and related functionality.
'''
- __author__ = 'wlm'
- from re import S
- import time
- import datetime
- import os
- import urllib.request
- import time
- import pandas as pd
- import numpy as np
- import json
- import requests
- import pdb
- # import http.client
- # http.client.HTTPConnection._http_vsn = 10
- # http.client.HTTPConnection._http_vsn_str = 'HTTP/1.1'
class DBManager():
    """Fetch historical battery (BMS) and GPS records.

    Records are currently downloaded over HTTP as newline-delimited JSON.
    ``connect``/``close`` are placeholders kept for a future switch to direct
    database access; today they perform no real work.
    """

    # Name of the timestamp column shared by the BMS and GPS frames.
    _TIME_COLUMN = '时间戳'

    # Keys read from record['ffBatteryStatus'] per mode, in output order.
    # record['info']['obdTime'] is always placed in front of them.
    _BMS_STATUS_FIELDS = {
        0: ('rssi', 'errorLevel', 'errorCode', 'current', 'voltageInner',
            'chargeState', 'heatState', 'soc', 'soh', 'cellVolBalance'),
        1: ('rssi', 'current', 'voltageInner', 'chargeState', 'soc', 'soh'),
    }

    # Fixed (non per-cell) column names of the BMS frame per mode.
    _BMS_COLUMNS = {
        0: ('时间戳', 'GSM信号', '故障等级', '故障代码', '总电流[A]', '总电压[V]',
            '充电状态', '加热', 'SOC[%]', 'SOH[%]', '单体均衡状态'),
        1: ('时间戳', 'GSM信号', '总电流[A]', '总电压[V]', '充电状态',
            'SOC[%]', 'SOH[%]'),
    }

    # Columns every GPS result starts with, and the per-record columns
    # produced for mode 0 and mode 1 respectively.
    _GPS_BASE_COLUMNS = ('时间戳', '纬度', '经度', '速度[km/h]', '有效位')
    _GPS_MODE0_COLUMNS = ('时间戳', '卫星数', '纬度', '经度', '海拔m', '速度[km/h]')
    _GPS_MODE1_COLUMNS = ('时间戳', '纬度', '经度', '速度[km/h]', '有效位')

    def __init__(self, host='', port='', auth='', db='', username='', password=''):
        """Accept database connection settings; they are currently unused."""
        pass

    def __enter__(self):
        """Connect and return the manager for use in a ``with`` block."""
        self.connect()
        return self

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        """Close the connection when leaving a ``with`` block.

        The context-manager protocol always passes the three exception
        arguments; they default to ``None`` so a direct ``__exit__()`` call
        still works. Returns ``None``, so exceptions raised inside the block
        are never suppressed.
        """
        self.close()

    def connect(self):
        """Placeholder connect loop: retry every 5 seconds until success."""
        connected = False
        while not connected:
            try:
                pass  # TODO: open the database connection
            except Exception:
                time.sleep(5)
            else:
                connected = True
                # TODO: connection established; obtain a cursor here

    def close(self):
        """Placeholder disconnect; prints the error or a confirmation."""
        try:
            pass  # TODO: close the database connection
        except Exception as e:
            print(e)
        else:
            print('数据库已断开连接')

    # The functions below implement data retrieval over HTTP.

    @staticmethod
    def _get_var_name(cellnum, Tempnum, Othernum):
        """Return the per-cell column names.

        Produces ``cellnum`` cell-voltage names, then ``Tempnum``
        cell-temperature names, then ``Othernum`` other-temperature names,
        each numbered from 1.
        """
        names = ['单体电压' + str(i + 1) for i in range(cellnum)]
        names.extend('单体温度' + str(i + 1) for i in range(Tempnum))
        names.extend('其他温度' + str(i + 1) for i in range(Othernum))
        return names

    @staticmethod
    def _download_json_data(url, max_retries=None, retry_delay=5, timeout=100):
        """Yield the JSON object decoded from each non-empty response line.

        Args:
            url: Address to request.
            max_retries: Number of retries after a failed request before the
                last ``requests`` exception is re-raised. ``None`` (default)
                retries forever, which is the historical behaviour.
            retry_delay: Seconds to wait between attempts.
            timeout: Timeout in seconds passed to ``requests.get``.
        """
        attempt = 0
        while True:
            try:
                response = requests.get(url, stream=True, timeout=timeout,
                                        headers={'Connection': 'close'})
                break
            except requests.exceptions.RequestException:
                if max_retries is not None and attempt >= max_retries:
                    raise
                if attempt == 0:
                    print()
                print('\r' + 'Server Error, retry {}......'.format(str(attempt)), end=" ")
                time.sleep(retry_delay)
                attempt += 1
        try:
            for line in response.iter_lines():
                if line:
                    yield json.loads(line)
        finally:
            # Release the streamed connection even if the consumer stops
            # early or the stream breaks part-way through.
            response.close()

    @staticmethod
    def _convert_to_dataframe(data, mode=0):
        """Flatten one BMS record into a ``(1, n)`` numpy row.

        The row holds ``obdTime``, the status fields selected by ``mode``,
        the cell voltages multiplied by 1000, the cell temperatures and the
        other temperatures, in that order.

        Returns:
            ``(row, cell_voltage_count, cell_temp_count, other_temp_count)``.

        Raises:
            ValueError: if ``mode`` is not 0 or 1.
            KeyError: if a required key is missing from ``data``.
        """
        if mode not in DBManager._BMS_STATUS_FIELDS:
            raise ValueError('unsupported BMS mode: {!r}'.format(mode))
        status = data['ffBatteryStatus']
        cell_volts = [volt * 1000 for volt in status['cellVoltageList']]
        cell_temps = list(status['cellTempList'])
        try:
            other_temps = list(status['otherTempList'])
        except (KeyError, TypeError):
            # The list is optional: absent or null means no extra sensors.
            other_temps = []

        header = [data['info']['obdTime']]
        header.extend(status[key] for key in DBManager._BMS_STATUS_FIELDS[mode])
        # The explicit reshape rejects records whose header fields are not scalars.
        row = np.array(header).reshape(1, len(header))
        row = np.append(row, cell_volts + cell_temps + other_temps)
        # Appending an empty list promotes integer-only records to float64,
        # which keeps the dtype this method has always returned.
        row = np.append(row, [])
        row = row.reshape(1, len(row))
        return row, len(cell_volts), len(cell_temps), len(other_temps)

    @staticmethod
    def _convert_to_dataframe_Gps(data, mode=0):
        """Turn one GPS record into a DataFrame.

        Mode 0 yields a one-row frame for ``subType`` 1 and an empty frame
        (columns only) for ``subType`` 2. Mode 1 always yields a one-row
        frame that includes the ``isValid`` field.

        Raises:
            ValueError: for any other ``mode`` or mode-0 ``subType``.
        """
        if mode == 0:
            columns = list(DBManager._GPS_MODE0_COLUMNS)
            sub_type = data['info']['subType']
            if sub_type == 1:
                gps = data['ffGps']
                values = np.array([
                    data['info']['obdTime'], gps['satellites'], gps['latitude'],
                    gps['longitude'], gps['altitude'], gps['speed'],
                ]).reshape(1, 6)
                return pd.DataFrame(columns=columns, data=values)
            if sub_type == 2:
                return pd.DataFrame(columns=columns)
            raise ValueError('unsupported GPS subType: {!r}'.format(sub_type))
        if mode == 1:
            gps = data['ffGps']
            values = np.array([
                data['info']['obdTime'], gps['latitude'], gps['longitude'],
                gps['speed'], gps['isValid'],
            ]).reshape(1, 5)
            return pd.DataFrame(columns=list(DBManager._GPS_MODE1_COLUMNS), data=values)
        raise ValueError('unsupported GPS mode: {!r}'.format(mode))

    @staticmethod
    def _format_timestamps(frame):
        """Replace epoch-millisecond timestamps with local-time strings.

        The column is replaced as a whole rather than written in place, so
        storing strings where numbers used to be does not depend on pandas'
        in-place upcasting.
        """
        column = DBManager._TIME_COLUMN
        frame[column] = frame[column].apply(
            lambda ms: time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(int(ms) / 1000)))
        return frame

    @staticmethod
    def _build_bms_frame(records, mode=0):
        """Build the BMS DataFrame from an iterable of decoded records.

        Records that cannot be converted are skipped, including the first
        one. Per-cell column names use the counts reported by the last
        record that converted successfully.

        Raises:
            ValueError: if ``mode`` is not 0 or 1.
        """
        if mode not in DBManager._BMS_COLUMNS:
            raise ValueError('unsupported BMS mode: {!r}'.format(mode))
        rows = []
        cell_count = temp_count = other_count = 0
        for record in records:
            try:
                row, cell_count, temp_count, other_count = \
                    DBManager._convert_to_dataframe(record, mode)
            except Exception:
                # Best effort: one malformed record must not abort the fetch.
                continue
            rows.append(row)
        columns = list(DBManager._BMS_COLUMNS[mode])
        columns.extend(DBManager._get_var_name(cell_count, temp_count, other_count))
        # Join all rows once instead of re-copying the array per record.
        data = np.concatenate(rows, axis=0) if rows else []
        frame = pd.DataFrame(columns=columns, data=data)
        return DBManager._format_timestamps(frame)

    @staticmethod
    def _build_gps_frame(records, mode=0):
        """Build the GPS DataFrame from an iterable of decoded records.

        The result starts with the base GPS columns; any extra columns
        produced by the per-record frames are added after them in order of
        first appearance.
        """
        frames = [DBManager._convert_to_dataframe_Gps(record, mode) for record in records]
        columns = list(DBManager._GPS_BASE_COLUMNS)
        for frame in frames:
            columns.extend(name for name in frame.columns if name not in columns)
        # Empty frames add columns but no rows; leaving them out of the
        # concat keeps them from influencing the resulting dtypes.
        populated = [frame for frame in frames if not frame.empty]
        if populated:
            combined = pd.concat(populated, ignore_index=True).reindex(columns=columns)
        else:
            combined = pd.DataFrame(columns=columns)
        return DBManager._format_timestamps(combined)

    @staticmethod
    def _get_data(urls, type_name, mode=0):
        """Download ``urls`` and return the parsed DataFrame.

        Args:
            urls: Address of the newline-delimited JSON resource.
            type_name: ``'BMS'`` or ``'GPS'``.
            mode: Field layout selector passed to the record converters.

        Raises:
            ValueError: if ``type_name`` is neither ``'BMS'`` nor ``'GPS'``.
        """
        if type_name == 'BMS':
            return DBManager._build_bms_frame(DBManager._download_json_data(urls), mode)
        if type_name == 'GPS':
            return DBManager._build_gps_frame(DBManager._download_json_data(urls), mode)
        raise ValueError('unsupported data type: {!r}'.format(type_name))

    def get_data(self, bms_url='http://172.16.126.13/store/load?dataType=12&limit=0&sn=', gps_url='http://172.16.126.13/store/load?dataType=16&limit=0&sn=',
                 sn='', start_time='', end_time='', gps_switch=True, mode=0):
        """Fetch the BMS and GPS data of one battery for a time range.

        A window is retried whenever the stream fails with a
        'Connection broken' error; any other error is re-raised unchanged.

        Args:
            bms_url: BMS data URL prefix; the default can be used.
            gps_url: GPS data URL prefix; the default can be used.
            sn: str, battery serial number.
            start_time: str, start time formatted ``%Y-%m-%d %H:%M:%S``.
            end_time: str, end time formatted ``%Y-%m-%d %H:%M:%S``.
            gps_switch: True to fetch GPS data, False to skip it.
            mode: 0 for the normal field set; 1 for the reduced field set
                (labelled "7255" in the original notes).

        Returns:
            ``(bms_data, gps_data)`` as DataFrames. ``gps_data`` is an empty
            frame when ``gps_switch`` is False.
        """
        time_format = "%Y-%m-%d %H:%M:%S"
        start_dt = datetime.datetime.strptime(start_time, time_format)
        end_dt = datetime.datetime.strptime(end_time, time_format)
        one_day = datetime.timedelta(days=1)
        bms_frames = []
        gps_frames = []
        print("### start to get data {} from {} to {}".format(sn, start_time, end_time))
        # To avoid ChunkedEncodingError the data is fetched one day at a
        # time and the daily results are merged at the end.
        window_start = start_dt
        # Bounds are compared as datetimes, so the check does not depend on
        # end_time being zero-padded.
        while window_start <= end_dt:
            window_end = min(window_start + one_day, end_dt)
            # NOTE(review): the original carried a remark that Didi data uses sub=0.
            query = ("&from=" + window_start.strftime(time_format)
                     + "&to=" + window_end.strftime(time_format))
            File_url_bms = bms_url + sn + query
            File_url_gps = gps_url + sn + query
            while True:
                print('\r' + "# get data from {} to {}.........".format(str(window_start), str(window_end)), end=" ")
                try:
                    bms_data = DBManager._get_data(File_url_bms, 'BMS', mode)
                    gps_data = DBManager._get_data(File_url_gps, 'GPS', mode) if gps_switch else None
                except Exception as e:
                    if 'Connection broken' in str(e):
                        continue
                    # Re-raise the original error so its type and traceback survive.
                    raise
                break
            bms_frames.append(bms_data)
            if gps_switch:
                gps_frames.append(gps_data)
            window_start += one_day
        bms_all_data = pd.concat(bms_frames, ignore_index=True) if bms_frames else pd.DataFrame()
        gps_all_data = pd.concat(gps_frames, ignore_index=True) if gps_frames else pd.DataFrame()
        print('all data-getting done, total_count is {} \n'.format(str(len(bms_all_data))))
        return bms_all_data, gps_all_data
|