''' 暂时采用http方式获取历史数据。 预留:后期若改用通过访问数据库的形式进行数据的获取,则本文件负责数据库的连接,sql指令的执行,数据获取等功能。 ''' __author__ = 'wlm' from re import S import time import datetime import os import urllib.request import time import pandas as pd import numpy as np import json import requests import pdb # import http.client # http.client.HTTPConnection._http_vsn = 10 # http.client.HTTPConnection._http_vsn_str = 'HTTP/1.1' class DBManager(): def __init__(self, host='', port='', auth='', db='', username='', password=''): pass def __enter__(self): self.connect() return self def __exit__(self): self.close() def connect(self): conn_success_flag = 0 while not conn_success_flag: try: pass # 连接数据库 except Exception as e: conn_success_flag = 0 time.sleep(5) else: conn_success_flag = 1 pass # 连接成功, 获取cursor def close(self): try: pass # 断开数据库 except Exception as e: print(e) else: print('数据库已断开连接') # 以下各个函数实现 通过http方式获取数据 @staticmethod def _get_var_name(cellnum,Tempnum,Othernum): temp = [] for i in range(cellnum): temp.append('单体电压'+str(i+1)) for i in range(Tempnum): temp.append('单体温度'+str(i+1)) for i in range(Othernum): temp.append('其他温度'+str(i+1)) return temp @staticmethod def _download_json_data(url): ''' 返回json数据的生成器,一次一行 ''' i = 0 while 1: try: r = requests.get(url, stream=True, timeout=100, headers={'Connection':'close'}) break except requests.exceptions.RequestException as e: if (i == 0): print() print('\r' + 'Server Error, retry {}......'.format(str(i)), end=" ") time.sleep(5) i+=1 # print(r.content) # pdb.set_trace() for line in r.iter_lines(): if line: yield json.loads(line) @staticmethod def _convert_to_dataframe(data, mode=0): CellU = [] CellT = [] OtherT = [] CellU_Num = 0 CellT_Num = 0 OtherT_Num = 0 CellU_Num = len(data['ffBatteryStatus']['cellVoltageList']) CellT_Num = len(data['ffBatteryStatus']['cellTempList']) try: OtherT_Num = len(data['ffBatteryStatus']['otherTempList']) except: OtherT_Num = 0 for i in range(CellU_Num): CellU.append(data['ffBatteryStatus']['cellVoltageList'][i]*1000) for i in range(CellT_Num): CellU.append(data['ffBatteryStatus']['cellTempList'][i]) for i in range(OtherT_Num): CellU.append(data['ffBatteryStatus']['otherTempList'][i]) if mode == 0: data_len = 11 data_block = np.array([data['info']['obdTime'],data['ffBatteryStatus']['rssi'],data['ffBatteryStatus']['errorLevel'],data['ffBatteryStatus']['errorCode'] ,data['ffBatteryStatus']['current'],data['ffBatteryStatus']['voltageInner'],data['ffBatteryStatus']['chargeState'],data['ffBatteryStatus']['heatState'] ,data['ffBatteryStatus']['soc'],data['ffBatteryStatus']['soh'],data['ffBatteryStatus']['cellVolBalance']]).reshape(1,data_len) elif mode == 1: data_len = 7 data_block = np.array([data['info']['obdTime'],data['ffBatteryStatus']['rssi'] ,data['ffBatteryStatus']['current'],data['ffBatteryStatus']['voltageInner'],data['ffBatteryStatus']['chargeState'] ,data['ffBatteryStatus']['soc'],data['ffBatteryStatus']['soh']]).reshape(1,data_len) data_block = np.append(data_block,CellU) data_block = np.append(data_block,CellT) data_block = np.append(data_block,OtherT) data_block = data_block.reshape(1,len(data_block)) return data_block,CellU_Num,CellT_Num,OtherT_Num @staticmethod def _convert_to_dataframe_Gps(data, mode=0): if mode == 0: if data['info']['subType'] == 1: data_block = np.array([data['info']['obdTime'],data['ffGps']['satellites'],data['ffGps']['latitude'],data['ffGps']['longitude'] ,data['ffGps']['altitude'],data['ffGps']['speed']]).reshape(1,6) df = pd.DataFrame( columns=['时间戳','卫星数','纬度','经度','海拔m','速度[km/h]'],data=data_block) elif data['info']['subType'] == 2: df = pd.DataFrame( columns=['时间戳','卫星数','纬度','经度','海拔m','速度[km/h]']) if mode == 1: data_block = np.array([data['info']['obdTime'],data['ffGps']['latitude'],data['ffGps']['longitude'] ,data['ffGps']['speed'], data['ffGps']['isValid']]).reshape(1,5) df = pd.DataFrame( columns=['时间戳','纬度','经度','速度[km/h]','有效位'],data=data_block) return df @staticmethod def _get_data(urls,type_name,mode=0): if type_name == 'BMS': if mode == 0: name_const = ['时间戳','GSM信号','故障等级','故障代码','总电流[A]','总电压[V]','充电状态','加热','SOC[%]','SOH[%]','单体均衡状态'] elif mode == 1: name_const = ['时间戳','GSM信号','总电流[A]','总电压[V]','充电状态','SOC[%]','SOH[%]'] i=0 CellUNum = 0 CellTNum = 0 OtherTNumm = 0 st = time.time() for line in DBManager._download_json_data(urls): et = time.time() if i==0: data_blocks,CellUNum,CellTNum,OtherTNumm = DBManager._convert_to_dataframe(line, mode) i+=1 continue try: data_block,CellUNum,CellTNum,OtherTNumm = DBManager._convert_to_dataframe(line, mode) except: continue data_blocks = np.concatenate((data_blocks,data_block),axis=0) # print('\r'+str(i),end=" ") # print(data_block) # print(urls) # print(time.time()-et) i+=1 name_var = DBManager._get_var_name(CellUNum,CellTNum,OtherTNumm) name_const.extend(name_var) columns_name = name_const if i==0: data_blocks = [] df_all = pd.DataFrame(columns=columns_name,data=data_blocks) df_all.loc[:,'时间戳'] = df_all.loc[:,'时间戳'].apply(lambda x:time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(x)/1000))) return df_all elif type_name =='GPS': df_all = pd.DataFrame(columns=['时间戳','纬度','经度','速度[km/h]','有效位']) for line in DBManager._download_json_data(urls): df_add = DBManager._convert_to_dataframe_Gps(line, mode) df_all = df_all.append(df_add,ignore_index=True) df_all.loc[:,'时间戳'] = df_all.loc[:,'时间戳'].apply(lambda x:time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(x)/1000))) return df_all def get_data(self, bms_url='http://172.16.126.13/store/load?dataType=12&limit=0&sn=', gps_url='http://172.16.126.13/store/load?dataType=16&limit=0&sn=', sn='', start_time='', end_time='', gps_switch=True, mode=0): ''' 获取指定 sn 和起止日期的bms和gps数据. 添加了重试机制。 --------------输入参数------------ bms_url:bms 数据url, 可采用默认值 gps_url: gps 数据url, 可采用默认值 sn: str, 电池sn号 start_time: str, 开始时间 end_time: str, 结束时间 gps_switch: True:获取gps数据; False:不获取gps数据 mode: 0:正常取数; 1:7255 取数 --------------输出参数------------ bms_data: 获取到的bms数据 gps_data: 获取到的gps数据 ''' bms_all_data = pd.DataFrame() gps_all_data = pd.DataFrame() maxnum = (datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")).days +1 print("### start to get data {} from {} to {}".format(sn, start_time, end_time)) # 为避免chunkEncodingError错误,数据每天获取一次,然后将每天的数据合并,得到最终的数据 for j in range(int(maxnum)): timefrom = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")+ datetime.timedelta(days=j) timeto = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")+ datetime.timedelta(days=j+1) #滴滴的数据sub=0 if timefrom.strftime('%Y-%m-%d %H:%M:%S') > end_time: break elif timeto.strftime('%Y-%m-%d %H:%M:%S') > end_time: timeto = datetime.datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S') File_url_bms = bms_url + sn + "&from="+timefrom.strftime('%Y-%m-%d %H:%M:%S')+"&to="+timeto.strftime('%Y-%m-%d %H:%M:%S') File_url_gps = gps_url + sn + "&from="+timefrom.strftime('%Y-%m-%d %H:%M:%S')+"&to="+timeto.strftime('%Y-%m-%d %H:%M:%S') #print('{}_{}_----getting data----'.format(sn, timefrom)) while True: try: print('\r' + "# get data from {} to {}.........".format(str(timefrom), str(timeto)), end=" ") bms_data = DBManager._get_data(File_url_bms,'BMS',mode) if gps_switch: gps_data = DBManager._get_data(File_url_gps,'GPS', mode) except Exception as e: if 'Connection broken' in str(e): continue else: raise Exception else: bms_all_data = pd.concat([bms_all_data, bms_data], ignore_index=True) if gps_switch: gps_all_data = pd.concat([gps_all_data, gps_data], ignore_index=True) break if not bms_all_data.empty: bms_all_data = bms_all_data.reset_index(drop=True) if not gps_all_data.empty: gps_all_data = gps_all_data.reset_index(drop=True) print('all data-getting done, total_count is {} \n'.format(str(len(bms_all_data)))) return bms_all_data, gps_all_data