'''
Historical data is currently fetched over HTTP.

Reserved for later: if data retrieval is switched to direct database access,
this module is also responsible for the database connection, executing SQL
statements and fetching the results.
'''

__author__ = 'lmstack'

import time
import datetime
import pandas as pd
import numpy as np
import json
import pdb

# The two drivers below are only needed by the code paths that actually talk
# to the network / database.  Importing them defensively keeps the pure
# conversion helpers usable where a driver is not installed; the ImportError
# is raised at the point of use instead.
try:
    import requests
except ImportError:
    requests = None
try:
    import pymysql
except ImportError:
    pymysql = None


class DBManager():
    '''
    Fetches battery history (bms / gps / system / accum) over HTTP and holds
    the (reserved) MySQL connection handling.

    Can be used as a context manager: entering connects to the database,
    leaving closes the connection.
    '''

    # Timestamp layout used for request parameters and rendered time columns.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Name of the timestamp column shared by every data group.
    _TIME_COLUMN = '时间戳'

    # Value of the ``dataType`` query parameter for each data group.
    _DATA_TYPE_CODES = {'bms': 12, 'gps': 16, 'system': 13, 'accum': 23}

    # Fixed (non per-cell) column names, keyed by mode.
    _BMS_COLUMNS = {
        0: ['时间戳', 'GSM信号', '故障等级', '故障代码', '总电流[A]', '总电压[V]',
            '外电压', '总输出状态', '上锁状态', '充电状态', '加热状态', '单体压差',
            'SOC[%]', 'SOH[%]', '单体均衡状态'],
        1: ['时间戳', 'GSM信号', '故障等级', '故障代码', '开关状态', '总电流[A]',
            '总电压[V]', '充电状态', '单体压差', 'SOC[%]', 'SOH[%]'],
    }
    _GPS_COLUMNS = {
        0: ['时间戳', '定位类型', '卫星数', '纬度', '经度', '速度[km/h]', '海拔', '航向'],
        1: ['时间戳', '定位类型', '纬度', '经度', '速度[km/h]', '有效位'],
    }
    _SYSTEM_COLUMNS = {
        0: ['时间戳', '加热目标温度', '加热超时', '租赁开始时间', '租赁天数',
            'bms上传周期', 'gps上传周期'],
        1: [],  # mode 1 carries no system data
    }
    _ACCUM_COLUMNS = {
        0: ['时间戳', 'SOH未标定时间', '累计充电电量', '累计充电能量', '累计放电电量',
            '累计放电能量', '累计高温充电电量', '累计高温充电能量'],
        1: ['时间戳', '累计充电电量', '累计充电能量', '累计放电电量', '累计放电能量',
            '累积里程'],
    }

    def __init__(self, host='', port='', auth='', db='', username='', password=''):
        '''Store the connection settings; no connection is opened here.'''
        self.host = host
        self.port = port
        self.auth = auth
        self.db = db
        self.username = username
        self.password = password

    def __enter__(self):
        '''Connect to the database and return this manager.'''
        self.connect()
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        '''
        Close the connection when the ``with`` block ends.

        The three exception arguments are required by the context-manager
        protocol.  Returns False so an exception raised inside the block is
        never suppressed.
        '''
        self.close()
        return False

    def connect(self):
        '''
        Open the MySQL connection, retrying every 5 seconds until it succeeds,
        then create ``self.cursor``.

        Only driver / socket errors are retried; any other error (for example
        a badly typed argument) propagates instead of looping forever.

        Raises:
            ImportError: pymysql is not installed.
        '''
        if pymysql is None:
            raise ImportError('pymysql is required for database access')
        while True:
            try:
                self.conn = pymysql.connect(host=self.host, port=self.port,
                                            user=self.username, password=self.password,
                                            database=self.db)
            except (pymysql.MySQLError, OSError) as e:
                print("数据库连接失败 :{}".format(e))
                time.sleep(5)
            else:
                break
        self.cursor = self.conn.cursor()

    def close(self):
        '''Close the connection; a failure is printed rather than raised.'''
        try:
            self.conn.close()
        except Exception as e:
            print(e)
        else:
            print('数据库已断开连接')

    def add(self, table, keyvalue):
        '''
        Insert one row into ``table`` and commit.

        Args:
            table: table name.
            keyvalue: mapping of column name -> value.

        Values are sent as query parameters.  The table and column names are
        interpolated into the statement, so they must come from trusted code,
        never from user input.

        Raises:
            ValueError: ``keyvalue`` is empty.
        '''
        if not keyvalue:
            raise ValueError('keyvalue must contain at least one column')
        fields = list(keyvalue)
        fields_str = ', '.join(fields)
        values_str = ', '.join(['%s'] * len(fields))
        sql = 'insert into {} ({}) values ({})'.format(table, fields_str, values_str)
        self.cursor.execute(sql, [keyvalue[field] for field in fields])
        self.conn.commit()

    # The functions below fetch the data over HTTP.

    @staticmethod
    def _check_mode(mode):
        '''Raise ValueError unless ``mode`` is one of the two supported modes.'''
        if mode not in (0, 1):
            raise ValueError('unsupported mode: {!r}'.format(mode))

    @staticmethod
    def _format_ms(value):
        '''Render a millisecond epoch timestamp as a local-time string.'''
        return time.strftime(DBManager._TIME_FORMAT, time.localtime(int(value) / 1000))

    @staticmethod
    def _empty_frame(columns):
        '''Return an empty frame with ``columns`` (a bare frame when there are none).'''
        return pd.DataFrame(columns=columns) if columns else pd.DataFrame()

    @staticmethod
    def _single_row_frame(columns, values):
        '''Build a one-row frame from ``values`` labelled with ``columns``.'''
        return pd.DataFrame(columns=columns,
                            data=np.array(values).reshape(1, len(columns)))

    @staticmethod
    def _concat_frames(frames):
        '''
        Concatenate ``frames`` with a fresh index.

        Empty frames are left out so they cannot influence the result dtypes;
        if nothing has rows, the first frame is returned so its columns are
        kept, or a bare frame when ``frames`` is empty.
        '''
        filled = [frame for frame in frames if not frame.empty]
        if filled:
            return pd.concat(filled, ignore_index=True)
        return frames[0] if frames else pd.DataFrame()

    @staticmethod
    def _get_var_name(cellnum, Tempnum, Othernum):
        '''Return the per-cell column names: voltages, cell temperatures, other temperatures.'''
        names = ['单体电压' + str(i + 1) for i in range(cellnum)]
        names.extend('单体温度' + str(i + 1) for i in range(Tempnum))
        names.extend('其他温度' + str(i + 1) for i in range(Othernum))
        return names

    @staticmethod
    def _download_json_data(url):
        '''
        Generator yielding the decoded JSON object of each non-empty response line.

        The request is retried every 5 seconds for as long as it fails with a
        requests error.  The streamed response is closed when the generator
        finishes or is discarded.

        Raises:
            ImportError: requests is not installed.
        '''
        if requests is None:
            raise ImportError('requests is required to download data over HTTP')
        attempt = 0
        while True:
            try:
                r = requests.get(url, stream=True, timeout=100, headers={'Connection': 'close'})
                break
            except requests.exceptions.RequestException:
                if attempt == 0:
                    print()
                print('\r' + 'Server Error, retry {}......'.format(str(attempt)), end=" ")
                time.sleep(5)
                attempt += 1
        try:
            for line in r.iter_lines():
                if line:
                    yield json.loads(line)
        finally:
            r.close()

    @staticmethod
    def _convert_to_dataframe_bms(data, mode=0):
        '''
        Flatten one bms record into a single-row array.

        Row layout: the fixed fields of ``mode``, then the cell voltages
        (multiplied by 1000), the cell temperatures and the other temperatures.

        Returns:
            (row, cell voltage count, cell temperature count, other temperature
            count) where ``row`` has shape (1, total number of values).

        Raises:
            ValueError: unsupported ``mode``.
            KeyError: a required field is missing from the record.
        '''
        status = data['ffBatteryStatus']
        cell_u = [value * 1000 for value in status['cellVoltageList']]
        cell_t = list(status['cellTempList'])
        try:
            other_t = list(status['otherTempList'])
        except (KeyError, TypeError):
            # The list is optional: absent or null means no extra sensors.
            other_t = []

        if mode == 0:
            fixed = [data['info']['obdTime'], status['rssi'], status['errorLevel'],
                     status['errorCode'], status['current'], status['voltageInner'],
                     status['voltageOutter'], status['totalOutputState'],
                     status['lockedState'], status['chargeState'], status['heatState'],
                     status['cellVoltageDiff'], status['soc'], status['soh'],
                     status['cellVolBalance']]
        elif mode == 1:
            fixed = [data['info']['obdTime'], status['rssi'], status.get('errorLevel'),
                     status.get('errorCode'), status['switchState'], status['current'],
                     status['voltageInner'], status['chargeState'],
                     status['cellVoltageDiff'], status['soc'], status['soh']]
        else:
            raise ValueError('unsupported mode: {!r}'.format(mode))

        data_block = np.array(fixed).reshape(1, len(fixed))
        for group in (cell_u, cell_t, other_t):
            data_block = np.append(data_block, group)
        data_block = data_block.reshape(1, len(data_block))
        return data_block, len(cell_u), len(cell_t), len(other_t)

    @staticmethod
    def _convert_to_dataframe_gps(data, mode=0):
        '''
        Convert one gps record into a frame.

        In mode 0 a record with ``subType`` 1 gives one row and ``subType`` 2
        gives an empty frame with the same columns; mode 1 always gives one row.

        Raises:
            ValueError: unsupported ``mode`` or, in mode 0, unsupported ``subType``.
        '''
        DBManager._check_mode(mode)
        columns = DBManager._GPS_COLUMNS[mode]
        if mode == 0:
            sub_type = data['info']['subType']
            if sub_type == 2:
                return pd.DataFrame(columns=columns)
            if sub_type != 1:
                raise ValueError('unsupported gps subType: {!r}'.format(sub_type))
            gps = data['ffGps']
            values = [data['info']['obdTime'], gps['locationType'], gps['satellites'],
                      gps['latitude'], gps['longitude'], gps['speed'], gps['altitude'],
                      gps['direction']]
        else:
            gps = data['ffGps']
            values = [data['info']['obdTime'], gps['locationType'], gps['latitude'],
                      gps['longitude'], gps['speed'], gps['isValid']]
        return DBManager._single_row_frame(columns, values)

    @staticmethod
    def _convert_to_dataframe_system(data, mode=0):
        '''
        Convert one system record into a one-row frame (mode 0); mode 1 has no
        system data and gives a bare empty frame.

        Raises:
            ValueError: unsupported ``mode``.
        '''
        DBManager._check_mode(mode)
        if mode == 1:
            return pd.DataFrame()
        info = data['ffSystemInfo']
        values = [data['info']['obdTime'], info['heatTargetTemp'], info['heatTimeout'],
                  DBManager._format_ms(info['rentalStartTime']),
                  info['rentalPeriodDays'], info['bmsInterval'], info['gpsInterval']]
        return DBManager._single_row_frame(DBManager._SYSTEM_COLUMNS[0], values)

    @staticmethod
    def _convert_to_dataframe_accum(data, mode=0):
        '''
        Convert one accumulated-statistics record into a one-row frame.

        Raises:
            ValueError: unsupported ``mode``.
        '''
        DBManager._check_mode(mode)
        accum = data['ffBatteryAccum']
        if mode == 0:
            values = [data['info']['obdTime'], accum['SOH_AlgUnexTime'],
                      accum['CHG_AHaccum'], accum['CHG_PHaccum'], accum['DSG_AHaccum'],
                      accum['DSG_PHaccum'], accum['OverTemp_CHG_AHaccum'],
                      accum['OverTemp_CHG_PHaccum']]
        else:
            values = [data['info']['obdTime'], accum['CHG_AHaccum'], accum['CHG_PHaccum'],
                      accum['DSG_AHaccum'], accum['DSG_PHaccum'], accum['totalMileage']]
        return DBManager._single_row_frame(DBManager._ACCUM_COLUMNS[mode], values)

    @staticmethod
    def _get_bms_data(url, mode):
        '''Download the bms records behind ``url`` and stack them into one frame.'''
        columns = list(DBManager._BMS_COLUMNS[mode])
        rows = []
        counts = (0, 0, 0)
        for record in DBManager._download_json_data(url):
            try:
                block, n_u, n_t, n_o = DBManager._convert_to_dataframe_bms(record, mode)
            except Exception:
                # Best effort: a record that cannot be parsed is skipped.
                continue
            if not rows:
                # The first usable record fixes the layout; the header is built
                # from these counts so it always matches the stacked rows.
                counts = (n_u, n_t, n_o)
            elif block.shape[1] != rows[0].shape[1]:
                # A record with a different number of values cannot be stacked.
                continue
            rows.append(block)

        columns.extend(DBManager._get_var_name(*counts))
        data_blocks = np.concatenate(rows, axis=0) if rows else []
        df_all = pd.DataFrame(columns=columns, data=data_blocks)
        if not df_all.empty:
            df_all[DBManager._TIME_COLUMN] = df_all[DBManager._TIME_COLUMN].apply(DBManager._format_ms)
        return df_all

    @staticmethod
    def _get_data(urls, type_name, mode=0):
        '''
        Download every record behind ``urls`` and return them as one frame with
        the timestamp column rendered as local-time strings.

        Args:
            urls: request url.
            type_name: 'bms', 'gps', 'system' or 'accum'; any other value
                returns None.
            mode: 0 or 1, selects the record layout.

        Raises:
            ValueError: unsupported ``mode``.
        '''
        if type_name not in DBManager._DATA_TYPE_CODES:
            return None
        DBManager._check_mode(mode)
        if type_name == 'bms':
            return DBManager._get_bms_data(urls, mode)

        if type_name == 'gps':
            columns, convert = DBManager._GPS_COLUMNS[mode], DBManager._convert_to_dataframe_gps
        elif type_name == 'system':
            columns, convert = DBManager._SYSTEM_COLUMNS[mode], DBManager._convert_to_dataframe_system
        else:
            columns, convert = DBManager._ACCUM_COLUMNS[mode], DBManager._convert_to_dataframe_accum

        # The leading empty frame keeps the column layout when nothing is returned.
        frames = [DBManager._empty_frame(columns)]
        for record in DBManager._download_json_data(urls):
            frames.append(convert(record, mode))
        df_all = DBManager._concat_frames(frames)
        if not df_all.empty:
            df_all[DBManager._TIME_COLUMN] = df_all[DBManager._TIME_COLUMN].apply(DBManager._format_ms)
        return df_all

    def get_data(self, url='http://172.16.126.13/store/load?dataType={}&limit=0&sn={}', sn='',
                 start_time='', end_time='', data_groups=('bms', 'gps')):
        '''
        Fetch the data of one battery between two points in time.
        A request whose connection breaks is retried.

        -------------- input ------------
        url: data url template, the default can be used
        sn: str, battery serial number
        start_time: str, start time, '%Y-%m-%d %H:%M:%S'
        end_time: str, end time, '%Y-%m-%d %H:%M:%S'
        data_groups: data groups to fetch, any of (default: bms and gps only)
                     bms: bms data
                     gps: gps data
                     system: system data
                     accum: accum data

        -------------- output ------------
        df_data: {'bms': dataframe, 'gps': dataframe, 'system': dataframe, 'accum': dataframe}
                 groups that were not requested are empty frames

        Raises:
            Exception: ``data_groups`` contains an unknown group.
        '''
        codes = DBManager._DATA_TYPE_CODES
        if set(data_groups) - set(codes):
            raise Exception("data_groups 参数错误")

        # mode: 0: normal records; 1: 7255 records
        mode = 1 if sn.startswith(('UD', 'MG')) else 0

        fmt = DBManager._TIME_FORMAT
        start_dt = datetime.datetime.strptime(start_time, fmt)
        end_dt = datetime.datetime.strptime(end_time, fmt)
        maxnum = (end_dt - start_dt).days + 1
        collected = {group: [] for group in codes}
        print("### start to get data {} from {} to {}".format(sn, start_time, end_time))

        # To avoid chunkEncodingError the data is fetched one day at a time and
        # the daily results are merged at the end.
        for j in range(int(maxnum)):
            timefrom = start_dt + datetime.timedelta(days=j)
            # Compare datetimes, not strings: strptime also accepts values
            # without zero padding, for which a string comparison is wrong.
            if timefrom >= end_dt:
                break
            timeto = min(timefrom + datetime.timedelta(days=1), end_dt)
            window = "&from=" + timefrom.strftime(fmt) + "&to=" + timeto.strftime(fmt)

            while True:
                try:
                    print('\r' + "# get data from {} to {}.........".format(str(timefrom), str(timeto)), end=" ")
                    day_frames = {}
                    for data_group in data_groups:
                        file_url = url.format(codes[data_group], sn) + window
                        day_frames[data_group] = DBManager._get_data(file_url, data_group, mode)
                except Exception as e:
                    if 'Connection broken' in str(e):
                        continue
                    # Re-raise the original error so the caller sees what failed.
                    raise
                else:
                    for data_group, frame in day_frames.items():
                        collected[data_group].append(frame)
                    break

        bms_all_data = DBManager._concat_frames(collected['bms'])
        gps_all_data = DBManager._concat_frames(collected['gps'])
        system_all_data = DBManager._concat_frames(collected['system'])
        accum_all_data = DBManager._concat_frames(collected['accum'])

        print('\nall data-getting done, bms_count is {}, gps_count is {}, system_count is {}, accum_count is {} \n'.format(
            str(len(bms_all_data)), str(len(gps_all_data)), str(len(system_all_data)), str(len(accum_all_data))))
        return {'bms': bms_all_data, 'gps': gps_all_data, 'system': system_all_data, 'accum': accum_all_data}