''' 暂时采用http方式获取历史数据。 预留:后期若改用通过访问数据库的形式进行数据的获取,则本文件负责数据库的连接,sql指令的执行,数据获取等功能。 ''' __author__ = 'lmstack' import time import datetime import time import pandas as pd import numpy as np import json import requests import pymysql import pdb from collections import Counter class DBManager(): def __init__(self, host='', port='', auth='', db='', username='', password=''): self.host = host self.port = port self.auth = auth self.db = db self.username = username self.password = password pass def __enter__(self): self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() def connect(self): conn_success_flag = 0 while not conn_success_flag: try: self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password, database=self.db) except Exception as e: conn_success_flag = 0 print("数据库连接失败 :{}".format(e)) time.sleep(5) else: conn_success_flag = 1 self.cursor = self.conn.cursor() def close(self): try: self.conn.close() except Exception as e: print(e) else: print('数据库已断开连接') def add(table, keyvalue): fields_str = '' values_str = '' for k,v in keyvalue.items(): fields_str += k+' ' sql = 'insert into table {} ({}) values ({})'.format(table, fields_str, values_str) # 以下各个函数实现 通过http方式获取数据 @staticmethod def _get_var_name(cellnum,Tempnum,Othernum): temp = [] for i in range(cellnum): temp.append('单体电压'+str(i+1)) for i in range(Tempnum): temp.append('单体温度'+str(i+1)) for i in range(Othernum): temp.append('其他温度'+str(i+1)) return temp @staticmethod def _download_json_data(url): ''' 返回json数据的生成器,一次一行 ''' i = 0 while 1: try: r = requests.get(url, stream=True, timeout=(0.1, 1000), headers={'Connection':'keep-alive', 'Accept':'*/*', 'Accept-Encoding':'gzip,deflate,br'}) break except requests.exceptions.RequestException as e: # if (i == 0): # print() # print('\r' + 'Server Error, retry {}......'.format(str(i)), end=" ") time.sleep(0.1) i+=1 # print(r.content) # pdb.set_trace() data = [] for line in r.iter_lines(): if line: data.append(json.loads(line)) # yield json.loads(line) return data @staticmethod def _convert_to_dataframe_bms(data, mode=0, CellUNum=0, CellTNum=0, OtherTNum=0): CellU = [] CellT = [] OtherT = [] CellU_Num = CellUNum CellT_Num = CellTNum OtherT_Num = OtherTNum # try: # CellU_Num = len(data['ffBatteryStatus']['cellVoltageList']) # CellU = np.array(data['ffBatteryStatus']['cellVoltageList']) * 1000 # CellU = CellU.tolist() # except: # CellU_Num = 0 # try: # CellT_Num = len(data['ffBatteryStatus']['cellTempList']) # CellU.extend(data['ffBatteryStatus']['cellTempList']) # except: # CellT_Num = 0 # try: # OtherT_Num = len(data['ffBatteryStatus']['otherTempList']) # CellU.extend(data['ffBatteryStatus']['otherTempList']) # except: # OtherT_Num = 0 CellU = np.array(data['ffBatteryStatus'].get('cellVoltageList',[])) CellT = data['ffBatteryStatus'].get('cellTempList',[]) OtherT = data['ffBatteryStatus'].get('otherTempList',[]) if (len(CellU) != CellU_Num or len(CellT) != CellT_Num or len(OtherT) != OtherT_Num): return pd.DataFrame() CellU = CellU * 1000 # for i in range(CellU_Num): # CellU.append(data['ffBatteryStatus']['cellVoltageList'][i]*1000) # for i in range(CellT_Num): # CellU.append(data['ffBatteryStatus']['cellTempList'][i]) # for i in range(OtherT_Num): # CellU.append(data['ffBatteryStatus']['otherTempList'][i]) if mode == 0: data_len = 16 # data_block = np.array([data['info']['obdTime'],data['ffBatteryStatus']['rssi'],data['ffBatteryStatus']['errorLevel'],data['ffBatteryStatus']['errorCode'] # ,data['ffBatteryStatus']['current'],data['ffBatteryStatus']['voltageInner'],data['ffBatteryStatus']['voltageOutter'], # data['ffBatteryStatus']['totalOutputState'],data['ffBatteryStatus']['lockedState'], # data['ffBatteryStatus']['chargeState'],data['ffBatteryStatus']['heatState'],data['ffBatteryStatus']['cellVoltageDiff'] # ,data['ffBatteryStatus']['soc'],data['ffBatteryStatus']['soh'],data['ffBatteryStatus']['cellVolBalance']]).reshape(1,data_len) data_block = np.array([data['info']['obdTime'],data['ffBatteryStatus'].get('rssi',None),data['ffBatteryStatus'].get('errorLevel', None),data['ffBatteryStatus'].get('errorCode', None), data['ffBatteryStatus'].get('current',None),data['ffBatteryStatus'].get('voltageInner', None),data['ffBatteryStatus'].get('voltageOutter', None), data['ffBatteryStatus'].get('totalOutputState', None),data['ffBatteryStatus'].get('lockedState', None), data['ffBatteryStatus'].get('chargeState', None),data['ffBatteryStatus'].get('heatState', None),data['ffBatteryStatus'].get('cellVoltageDiff', None) ,data['ffBatteryStatus'].get('soc', None),data['ffBatteryStatus'].get('soh', None),data['ffBatteryStatus'].get('cellVolBalance', None),data['ffBatteryStatus'].get('insResis', None)]).reshape(1,data_len) elif mode == 1: data_len = 12 data_block = np.array([data['info']['obdTime'],data['ffBatteryStatus'].get('rssi',None) ,data['ffBatteryStatus'].get('errorLevel', None),data['ffBatteryStatus'].get('errorCode', None),data['ffBatteryStatus'].get('switchState', None) ,data['ffBatteryStatus'].get('current',None),data['ffBatteryStatus'].get('voltageInner', None),data['ffBatteryStatus'].get('chargeState', None), data['ffBatteryStatus'].get('cellVoltageDiff', None),data['ffBatteryStatus'].get('soc', None),data['ffBatteryStatus'].get('soh', None),data['ffBatteryStatus'].get('insResis', None)]).reshape(1,data_len) data_block = np.append(data_block,CellU) data_block = np.append(data_block,CellT) data_block = np.append(data_block,OtherT) data_block = data_block.reshape(1,len(data_block)) return data_block @staticmethod def _convert_to_dataframe_gps(data, mode=0): if mode == 0: # if data['info']['subType'] == 1: data_block = np.array([data['info'].get('obdTime',None),data['ffGps'].get('locationType',None), data['ffGps'].get('satellites',None), data['ffGps'].get('latitude',None),data['ffGps'].get('longitude',None),data['ffGps'].get('speed',None), data['ffGps'].get('altitude',None), data['ffGps'].get('direction', None), data['ffGps'].get('mileage',None)]).reshape(1,9) df = pd.DataFrame( columns=['时间戳','定位类型', '卫星数','纬度','经度','速度[km/h]','海拔','航向', '里程/m'],data=data_block) # elif data['info']['subType'] == 2: # df = pd.DataFrame( # columns=['时间戳','定位类型', '卫星数','纬度','经度','速度[km/h]','海拔','航向']) if mode == 1: data_block = np.array([data['info']['obdTime'],data['ffGps']['locationType'],data['ffGps']['latitude'],data['ffGps']['longitude'] ,data['ffGps']['speed'], data['ffGps']['isValid']]).reshape(1,6) df = pd.DataFrame( columns=['时间戳','定位类型', '纬度','经度','速度[km/h]','有效位'],data=data_block) return df @staticmethod def _convert_to_dataframe_system(data, mode=0): if mode == 0: data_block = np.array([data['info'].get('obdTime', None),data['ffSystemInfo'].get('heatTargetTemp', None), data['ffSystemInfo'].get('heatTimeout',None), time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(data['ffSystemInfo'].get('rentalStartTime'))/1000)), data['ffSystemInfo'].get('rentalPeriodDays',None),data['ffSystemInfo'].get('bmsInterval',None), data['ffSystemInfo'].get('gpsInterval', None)]).reshape(1,7) df = pd.DataFrame( columns=['时间戳','加热目标温度', '加热超时','租赁开始时间','租赁天数','bms上传周期','gps上传周期'],data=data_block) if mode == 1: df = pd.DataFrame() return df @staticmethod def _convert_to_dataframe_accum(data, mode=0): if mode == 0: data_block = np.array([data['info'].get('obdTime',None),data['ffBatteryAccum'].get('SOH_AlgUnexTime',None), data['ffBatteryAccum'].get('CHG_AHaccum',None), data['ffBatteryAccum'].get('CHG_PHaccum',None), data['ffBatteryAccum'].get('DSG_AHaccum',None), data['ffBatteryAccum'].get('DSG_PHaccum',None),data['ffBatteryAccum'].get('OverTemp_CHG_AHaccum',None), data['ffBatteryAccum'].get('OverTemp_CHG_PHaccum',None)]).reshape(1,8) df = pd.DataFrame( columns=['时间戳','SOH未标定时间', '累计充电电量','累计充电能量','累计放电电量','累计放电能量', '累计高温充电电量', '累计高温充电能量'],data=data_block) if mode == 1: data_block = np.array([data['info'].get('obdTime',None), data['ffBatteryAccum'].get('CHG_AHaccum',None), data['ffBatteryAccum'].get('CHG_PHaccum',None), data['ffBatteryAccum'].get('DSG_AHaccum',None), data['ffBatteryAccum'].get('DSG_PHaccum',None),data['ffBatteryAccum'].get('totalMileage',None)]).reshape(1,6) df = pd.DataFrame( columns=['时间戳','累计充电电量','累计充电能量','累计放电电量','累计放电能量', '累积里程'],data=data_block) return df @staticmethod def _convert_to_dataframe_storage_accum(data, mode=0): if mode == 0: data_block = np.array([data['info'].get('obdTime',None),data['ffEnergyStoreStat'].get('usedEnergy',None), data['ffEnergyStoreStat'].get('inputEnergy',None), data['ffEnergyStoreStat'].get('outputEnergy',None), data['ffEnergyStoreStat'].get('usedEnergy2',None), data['ffEnergyStoreStat'].get('inputEnergy2',None), data['ffEnergyStoreStat'].get('outputEnergy2',None)]).reshape(1,7) df = pd.DataFrame( columns=['时间戳','累计用电kWh数1', '累计正向输入kWh1','累计反向输出kWh数1', '累计用电kWh数2', '累计正向输入kWh2','累计反向输出kWh数2'],data=data_block) if mode == 1: data_block = np.array([data['info'].get('obdTime',None),data['ffEnergyStoreStat'].get('usedEnergy',None), data['ffEnergyStoreStat'].get('inputEnergy',None), data['ffEnergyStoreStat'].get('outputEnergy',None), data['ffEnergyStoreStat'].get('usedEnergy2',None), data['ffEnergyStoreStat'].get('inputEnergy2',None), data['ffEnergyStoreStat'].get('outputEnergy2',None)]).reshape(1,7) df = pd.DataFrame( columns=['时间戳','累计用电kWh数1', '累计正向输入kWh1','累计反向输出kWh数1', '累计用电kWh数2', '累计正向输入kWh2','累计反向输出kWh数2'],data=data_block) return df @staticmethod def _get_data(urls,type_name,mode=0): data = DBManager._download_json_data(urls) if type_name == 'bms': if mode == 0: name_const = ['时间戳','GSM信号','故障等级','故障代码','总电流[A]','总电压[V]', '外电压', '总输出状态', '上锁状态', '充电状态','加热状态', '单体压差', 'SOC[%]','SOH[%]','单体均衡状态', '绝缘电阻'] elif mode == 1: name_const = ['时间戳','GSM信号','故障等级', '故障代码','开关状态', '总电流[A]','总电压[V]','充电状态', '单体压差', 'SOC[%]','SOH[%]', '绝缘电阻'] i=0 st = time.time() # 计算本次最大电芯数量 CellUNum = 0 CellTNum = 0 OtherTNum = 0 cellUNumList = [] cellTNumList = [] otherTNumList = [] for line in data: temp = len(line['ffBatteryStatus'].get('cellVoltageList', [])) cellUNumList.append(temp) # if (temp > CellUNum): # CellUNum = temp temp = len(line['ffBatteryStatus'].get('cellTempList', [])) cellTNumList.append(temp) # if (temp > CellTNum): # CellTNum = temp temp = len(line['ffBatteryStatus'].get('otherTempList', [])) otherTNumList.append(temp) # if (temp > OtherTNum): # OtherTNum = temp if (len(data)>0): result = Counter(cellUNumList) CellUNum = max(result, key=result.get) result = Counter(cellTNumList) CellTNum = max(result, key=result.get) result = Counter(otherTNumList) OtherTNum = max(result, key=result.get) data_blocks = pd.DataFrame() for line in data: et = time.time() try: if i==0: data_blocks = DBManager._convert_to_dataframe_bms(line, mode, CellUNum, CellTNum, OtherTNum) if (len(data_blocks)>0): i+=1 continue except Exception as e: # print(e) continue try: data_block = DBManager._convert_to_dataframe_bms(line, mode, CellUNum, CellTNum, OtherTNum) except Exception as e: # print(e) continue try: if (len(data_block)>0): data_blocks = np.concatenate((data_blocks,data_block),axis=0) except Exception as e: print(e) if 'all the input array dimensions for the concatenation axis must match exactly' in str(e) or \ 'all the input array dimensions except for the concatenation axis must match exactly' in str(e): pass else: raise e i+=1 name_var = DBManager._get_var_name(CellUNum,CellTNum,OtherTNum) name_const.extend(name_var) columns_name = name_const if i==0: data_blocks = [] df_all = pd.DataFrame(columns=columns_name,data=data_blocks) if not df_all.empty: df_all.loc[:,'时间戳'] = df_all.loc[:,'时间戳'].apply(lambda x:time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(x)/1000))) return df_all elif type_name =='gps': if mode == 0: df_all = pd.DataFrame(columns=['时间戳','定位类型', '卫星数','纬度','经度','速度[km/h]','海拔','航向']) elif mode == 1: df_all = pd.DataFrame(columns=['时间戳','定位类型', '纬度','经度','速度[km/h]','有效位']) for line in data: df_add = DBManager._convert_to_dataframe_gps(line, mode) df_all = df_all.append(df_add,ignore_index=True) if not df_all.empty: df_all.loc[:,'时间戳'] = df_all.loc[:,'时间戳'].apply(lambda x:time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(x)/1000))) return df_all elif type_name =='system': if mode == 0: df_all = pd.DataFrame(columns=['时间戳','加热目标温度', '加热超时','租赁开始时间','租赁天数','bms上传周期','gps上传周期']) elif mode == 1: df_all = pd.DataFrame() for line in data: df_add = DBManager._convert_to_dataframe_system(line, mode) df_all = df_all.append(df_add,ignore_index=True) if not df_all.empty: df_all.loc[:,'时间戳'] = df_all.loc[:,'时间戳'].apply(lambda x:time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(x)/1000))) return df_all elif type_name =='accum': if mode == 0: df_all = pd.DataFrame(columns=['时间戳','SOH未标定时间', '累计充电电量','累计充电能量','累计放电电量','累计放电能量', '累计高温充电电量', '累计高温充电能量']) elif mode == 1: df_all = pd.DataFrame(columns=['时间戳','累计充电电量','累计充电能量','累计放电电量','累计放电能量', '累积里程']) for line in data: df_add = DBManager._convert_to_dataframe_accum(line, mode) df_all = df_all.append(df_add,ignore_index=True) if not df_all.empty: df_all.loc[:,'时间戳'] = df_all.loc[:,'时间戳'].apply(lambda x:time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(x)/1000))) return df_all elif type_name =='storage_accum': if mode == 0: df_all = pd.DataFrame(columns=['时间戳','累计用电kWh数1', '累计正向输入kWh1','累计反向输出kWh数1', '累计用电kWh数2', '累计正向输入kWh2','累计反向输出kWh数2']) elif mode == 1: df_all = pd.DataFrame() for line in data: df_add = DBManager._convert_to_dataframe_storage_accum(line, mode) df_all = df_all.append(df_add,ignore_index=True) if not df_all.empty: df_all.loc[:,'时间戳'] = df_all.loc[:,'时间戳'].apply(lambda x:time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(x)/1000))) return df_all def get_data(self, url='http://172.16.126.13/store/load?dataType={}&limit=0&sn={}', sn='', start_time='', end_time='', data_groups=['bms', 'gps']): ''' 获取指定 sn 和起止日期的bms和gps数据. 添加了重试机制。 --------------输入参数------------ url:数据获取url, 可采用默认值 sn: str, 电池sn号 start_time: str, 开始时间 end_time: str, 结束时间 data_groups: 选择需要获取的数据组,可填入多个字符串(默认只获取bms和gps数据) bms: bms数据 gps:gps数据 system:system数据 accum:accum数据 --------------输出参数------------ df_data: {'bms':dataframe, 'gps':dataframe, 'system':dataframe, ;accum':dataframe} ''' if len(set(data_groups) - (set(data_groups) and set(['bms', 'gps', 'system', 'accum', 'storage_accum']))) > 0: raise Exception("data_groups 参数错误") # mode: 0:正常取数; 1:7255 取数 if sn[0:2] == 'UD' or sn[0:2] == 'MG': mode = 1 else: mode = 0 bms_all_data = pd.DataFrame() gps_all_data = pd.DataFrame() system_all_data = pd.DataFrame() accum_all_data = pd.DataFrame() storage_accum_all_data = pd.DataFrame() maxnum = (datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")).days +1 print("### start to get data {} from {} to {}".format(sn, start_time, end_time)) # 为避免chunkEncodingError错误,数据每天获取一次,然后将每天的数据合并,得到最终的数据 for j in range(int(maxnum)): timefrom = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")+ datetime.timedelta(days=j) timeto = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")+ datetime.timedelta(days=j+1) #滴滴的数据sub=0 if timefrom.strftime('%Y-%m-%d %H:%M:%S') >= end_time: break elif timeto.strftime('%Y-%m-%d %H:%M:%S') > end_time: timeto = datetime.datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S') #print('{}_{}_----getting data----'.format(sn, timefrom)) bms_data = pd.DataFrame() gps_data = pd.DataFrame() system_data = pd.DataFrame() accum_data = pd.DataFrame() storage_accum_data = pd.DataFrame() while True: try: print('\r' + "# get data from {} to {}.........".format(str(timefrom), str(timeto)), end=" ") for data_group in data_groups: if data_group == 'bms': file_url = url.format(12, sn) + "&from="+timefrom.strftime('%Y-%m-%d %H:%M:%S')+"&to="+timeto.strftime('%Y-%m-%d %H:%M:%S') bms_data = DBManager._get_data(file_url,'bms',mode) if data_group == 'gps': file_url = url.format(16, sn) + "&from="+timefrom.strftime('%Y-%m-%d %H:%M:%S')+"&to="+timeto.strftime('%Y-%m-%d %H:%M:%S') gps_data = DBManager._get_data(file_url,'gps',mode) if data_group == 'system': file_url = url.format(13, sn) + "&from="+timefrom.strftime('%Y-%m-%d %H:%M:%S')+"&to="+timeto.strftime('%Y-%m-%d %H:%M:%S') system_data = DBManager._get_data(file_url,'system',mode) if data_group == 'accum': file_url = url.format(23, sn) + "&from="+timefrom.strftime('%Y-%m-%d %H:%M:%S')+"&to="+timeto.strftime('%Y-%m-%d %H:%M:%S') accum_data = DBManager._get_data(file_url,'accum',mode) if data_group == 'storage_accum': file_url = url.format(29, sn) + "&from="+timefrom.strftime('%Y-%m-%d %H:%M:%S')+"&to="+timeto.strftime('%Y-%m-%d %H:%M:%S') storage_accum_data = DBManager._get_data(file_url,'storage_accum',mode) except Exception as e: if 'Connection broken' in str(e): continue elif 'Number of manager items must equal union of block items' in str(e): break else: raise Exception else: bms_all_data = pd.concat([bms_all_data, bms_data], ignore_index=True) gps_all_data = pd.concat([gps_all_data, gps_data], ignore_index=True) system_all_data = pd.concat([system_all_data, system_data], ignore_index=True) accum_all_data = pd.concat([accum_all_data, accum_data], ignore_index=True) storage_accum_all_data = pd.concat([storage_accum_all_data, storage_accum_data], ignore_index=True) break bms_all_data = bms_all_data.reset_index(drop=True) gps_all_data = gps_all_data.reset_index(drop=True) system_all_data = system_all_data.reset_index(drop=True) accum_all_data = accum_all_data.reset_index(drop=True) storage_accum_all_data = storage_accum_all_data.reset_index(drop=True) print('\nall data-getting done, bms_count is {}, gps_count is {}, system_count is {}, accum_count is {} ,storage_accum_count is {}\n'.format( str(len(bms_all_data)), str(len(gps_all_data)), str(len(system_all_data)), str(len(accum_all_data)), str(len(storage_accum_all_data)))) return {'bms':bms_all_data, 'gps':gps_all_data, 'system':system_all_data, 'accum':accum_all_data,'storage_accum':storage_accum_all_data}