'''
暂时采用http方式获取历史数据。
预留:后期若改用通过访问数据库的形式进行数据的获取,则本文件负责数据库的连接,sql指令的执行,数据获取等功能。
'''
- __author__ = 'lmstack'
- import time
- import datetime
- import time
- import pandas as pd
- import numpy as np
- import json
- import requests
- import pymysql
- import pdb
- from collections import Counter
class DBManager():
    """Fetch historical battery telemetry, currently over HTTP.

    The class also wraps a ``pymysql`` connection (``connect`` / ``close`` and
    the context-manager protocol). The HTTP path (``get_data``) does not use
    that connection; it downloads line-delimited JSON and turns each record
    into a row of a pandas DataFrame.
    """

    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Data group name -> value substituted for ``dataType`` in the request URL.
    _DATA_TYPE_CODES = {'bms': 12, 'gps': 16, 'system': 13, 'accum': 23}

    # Per-mode lookup tables. ``*_KEYS`` are the JSON keys read from a record
    # (after the leading ``obdTime``); ``*_COLUMNS`` are the matching output
    # column names, including the leading timestamp column.
    _BMS_KEYS = {
        0: ('rssi', 'errorLevel', 'errorCode', 'current', 'voltageInner',
            'voltageOutter', 'totalOutputState', 'lockedState', 'chargeState',
            'heatState', 'cellVoltageDiff', 'soc', 'soh', 'cellVolBalance',
            'insResis'),
        1: ('rssi', 'errorLevel', 'errorCode', 'switchState', 'current',
            'voltageInner', 'chargeState', 'cellVoltageDiff', 'soc', 'soh',
            'insResis'),
    }
    _BMS_COLUMNS = {
        0: ('时间戳', 'GSM信号', '故障等级', '故障代码', '总电流[A]', '总电压[V]',
            '外电压', '总输出状态', '上锁状态', '充电状态', '加热状态', '单体压差',
            'SOC[%]', 'SOH[%]', '单体均衡状态', '绝缘电阻'),
        1: ('时间戳', 'GSM信号', '故障等级', '故障代码', '开关状态', '总电流[A]',
            '总电压[V]', '充电状态', '单体压差', 'SOC[%]', 'SOH[%]', '绝缘电阻'),
    }
    _GPS_KEYS = {
        0: ('locationType', 'satellites', 'latitude', 'longitude', 'speed',
            'altitude', 'direction', 'mileage'),
        1: ('locationType', 'latitude', 'longitude', 'speed', 'isValid'),
    }
    _GPS_COLUMNS = {
        0: ('时间戳', '定位类型', '卫星数', '纬度', '经度', '速度[km/h]', '海拔',
            '航向', '里程/m'),
        1: ('时间戳', '定位类型', '纬度', '经度', '速度[km/h]', '有效位'),
    }
    _SYSTEM_COLUMNS = {
        0: ('时间戳', '加热目标温度', '加热超时', '租赁开始时间', '租赁天数',
            'bms上传周期', 'gps上传周期'),
        1: (),
    }
    _ACCUM_KEYS = {
        0: ('SOH_AlgUnexTime', 'CHG_AHaccum', 'CHG_PHaccum', 'DSG_AHaccum',
            'DSG_PHaccum', 'OverTemp_CHG_AHaccum', 'OverTemp_CHG_PHaccum'),
        1: ('CHG_AHaccum', 'CHG_PHaccum', 'DSG_AHaccum', 'DSG_PHaccum',
            'totalMileage'),
    }
    _ACCUM_COLUMNS = {
        0: ('时间戳', 'SOH未标定时间', '累计充电电量', '累计充电能量', '累计放电电量',
            '累计放电能量', '累计高温充电电量', '累计高温充电能量'),
        1: ('时间戳', '累计充电电量', '累计充电能量', '累计放电电量', '累计放电能量',
            '累积里程'),
    }

    def __init__(self, host='', port='', auth='', db='', username='', password=''):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.port = port
        self.auth = auth
        self.db = db
        self.username = username
        self.password = password

    def __enter__(self):
        """Open the database connection and return this manager."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the database connection; exceptions are not suppressed."""
        self.close()

    def connect(self):
        """Connect to MySQL, retrying every 5 seconds until it succeeds.

        Sets ``self.conn`` and ``self.cursor``. Only database/network errors
        are retried; configuration mistakes (e.g. a non-numeric port) raise
        immediately instead of looping forever.
        """
        params = {
            'host': self.host,
            # The original read ``self.user``, which is never assigned.
            'user': self.username,
            'password': self.password,
            'database': self.db,
        }
        if self.port:
            # Leave the port out when unset so pymysql applies its default.
            params['port'] = int(self.port)

        while True:
            try:
                self.conn = pymysql.connect(**params)
            except (pymysql.MySQLError, OSError) as e:
                print("数据库连接失败 :{}".format(e))
                time.sleep(5)
            else:
                break
        self.cursor = self.conn.cursor()

    def close(self):
        """Close the connection; failures are printed rather than raised."""
        try:
            self.conn.close()
        except Exception as e:
            print(e)
        else:
            print('数据库已断开连接')

    @staticmethod
    def add(table, keyvalue):
        """Build a parameterized INSERT statement for *table*.

        The statement is only built, not executed.

        Args:
            table: table name, interpolated into the SQL text.
            keyvalue: mapping of column name -> value.

        Returns:
            ``(sql, values)`` where *sql* uses ``%s`` placeholders and
            *values* is the list of values in matching order.
        """
        # NOTE(review): table and column names are interpolated verbatim, so
        # they must come from trusted code; only the values are parameterized.
        fields = list(keyvalue)
        fields_str = ', '.join(fields)
        values_str = ', '.join(['%s'] * len(fields))
        sql = 'insert into {} ({}) values ({})'.format(table, fields_str, values_str)
        return sql, [keyvalue[field] for field in fields]

    # ---- The functions below fetch data over HTTP ----

    @staticmethod
    def _get_var_name(cellnum, Tempnum, Othernum):
        """Return the per-cell column names, numbered from 1.

        Order: cell voltages, then cell temperatures, then other temperatures.
        """
        names = ['单体电压' + str(i + 1) for i in range(cellnum)]
        names.extend('单体温度' + str(i + 1) for i in range(Tempnum))
        names.extend('其他温度' + str(i + 1) for i in range(Othernum))
        return names

    @staticmethod
    def _download_json_data(url):
        """Download *url* and return a list with one parsed JSON value per line.

        The request is retried every 0.1 s, without limit, until it can be
        issued. Errors raised while reading the body propagate to the caller.
        """
        while True:
            try:
                r = requests.get(url, stream=True, timeout=(0.1, 1000),
                                 headers={'Connection': 'keep-alive', 'Accept': '*/*',
                                          'Accept-Encoding': 'gzip,deflate,br'})
                break
            except requests.exceptions.RequestException:
                time.sleep(0.1)
        try:
            return [json.loads(line) for line in r.iter_lines() if line]
        finally:
            # A streamed response holds its connection until closed.
            r.close()

    @staticmethod
    def _convert_to_dataframe_bms(data, mode=0, CellUNum=0, CellTNum=0, OtherTNum=0):
        """Convert one BMS record into a single-row 2-D numpy array.

        Args:
            data: one decoded record with ``info`` and ``ffBatteryStatus``.
            mode: 0 or 1; selects which status fields are extracted.
            CellUNum, CellTNum, OtherTNum: expected lengths of the cell
                voltage, cell temperature and other temperature lists.

        Returns:
            An array of shape ``(1, n)``: the fixed fields, then the cell
            voltages multiplied by 1000 (presumably V -> mV; confirm), then
            both temperature lists. An empty ``pd.DataFrame`` is returned when
            any list length differs from the expected one, so callers can
            test ``len(result) > 0``.

        Raises:
            ValueError: if *mode* is not 0 or 1.
        """
        status = data['ffBatteryStatus']
        cell_volts = np.array(status.get('cellVoltageList', []))
        cell_temps = status.get('cellTempList', [])
        other_temps = status.get('otherTempList', [])
        if (len(cell_volts) != CellUNum or len(cell_temps) != CellTNum
                or len(other_temps) != OtherTNum):
            return pd.DataFrame()
        if mode not in DBManager._BMS_KEYS:
            raise ValueError('unsupported bms mode: {!r}'.format(mode))

        fixed = [data['info']['obdTime']]
        fixed.extend(status.get(key, None) for key in DBManager._BMS_KEYS[mode])
        row = np.array(fixed).reshape(1, len(fixed))
        # np.append flattens its inputs, so restore the row shape at the end.
        row = np.append(row, cell_volts * 1000)
        row = np.append(row, cell_temps)
        row = np.append(row, other_temps)
        return row.reshape(1, len(row))

    @staticmethod
    def _single_row_frame(data, section, keys_by_mode, columns_by_mode, mode):
        """Build a one-row DataFrame from ``data['info']`` and ``data[section]``.

        Missing keys become ``None``. Raises ValueError for an unknown mode.
        """
        if mode not in keys_by_mode:
            raise ValueError('unsupported mode for {}: {!r}'.format(section, mode))
        payload = data[section]
        values = [data['info'].get('obdTime', None)]
        values.extend(payload.get(key, None) for key in keys_by_mode[mode])
        block = np.array(values).reshape(1, len(values))
        return pd.DataFrame(columns=list(columns_by_mode[mode]), data=block)

    @staticmethod
    def _convert_to_dataframe_gps(data, mode=0):
        """Convert one GPS record (``info`` + ``ffGps``) into a one-row DataFrame."""
        return DBManager._single_row_frame(
            data, 'ffGps', DBManager._GPS_KEYS, DBManager._GPS_COLUMNS, mode)

    @staticmethod
    def _convert_to_dataframe_system(data, mode=0):
        """Convert one system record into a one-row DataFrame.

        In mode 0, ``rentalStartTime`` is formatted as a local-time string
        after dividing by 1000; a missing value yields ``None`` instead of
        raising. Mode 1 returns an empty DataFrame.
        """
        if mode == 1:
            return pd.DataFrame()
        if mode != 0:
            raise ValueError('unsupported system mode: {!r}'.format(mode))

        sys_info = data['ffSystemInfo']
        rental_start = sys_info.get('rentalStartTime')
        if rental_start is not None:
            rental_start = time.strftime(
                DBManager._TIME_FORMAT, time.localtime(int(rental_start) / 1000))
        values = [
            data['info'].get('obdTime', None),
            sys_info.get('heatTargetTemp', None),
            sys_info.get('heatTimeout', None),
            rental_start,
            sys_info.get('rentalPeriodDays', None),
            sys_info.get('bmsInterval', None),
            sys_info.get('gpsInterval', None),
        ]
        block = np.array(values).reshape(1, len(values))
        return pd.DataFrame(columns=list(DBManager._SYSTEM_COLUMNS[0]), data=block)

    @staticmethod
    def _convert_to_dataframe_accum(data, mode=0):
        """Convert one accumulation record (``info`` + ``ffBatteryAccum``) into a one-row DataFrame."""
        return DBManager._single_row_frame(
            data, 'ffBatteryAccum', DBManager._ACCUM_KEYS, DBManager._ACCUM_COLUMNS, mode)

    @staticmethod
    def _format_timestamps(df):
        """Rewrite the '时间戳' column as local-time strings; return *df*.

        Each value is divided by 1000 before conversion. Empty frames are
        returned unchanged.
        """
        if not df.empty:
            df['时间戳'] = df['时间戳'].apply(
                lambda ms: time.strftime(DBManager._TIME_FORMAT, time.localtime(int(ms) / 1000)))
        return df

    @staticmethod
    def _bms_records_to_dataframe(data, mode):
        """Assemble all BMS records into one DataFrame (timestamps not yet formatted).

        The most frequent list lengths across the records define the expected
        cell layout. Records with a different layout, or records that cannot
        be converted, are skipped.
        """
        columns = list(DBManager._BMS_COLUMNS[mode])
        statuses = [line['ffBatteryStatus'] for line in data if 'ffBatteryStatus' in line]

        def most_common_len(key):
            counts = Counter(len(status.get(key, [])) for status in statuses)
            # On ties the length seen first wins.
            return counts.most_common(1)[0][0] if counts else 0

        cell_u_num = most_common_len('cellVoltageList')
        cell_t_num = most_common_len('cellTempList')
        other_t_num = most_common_len('otherTempList')
        expected_width = len(columns) + cell_u_num + cell_t_num + other_t_num

        rows = []
        for line in data:
            try:
                row = DBManager._convert_to_dataframe_bms(
                    line, mode, cell_u_num, cell_t_num, other_t_num)
            except (KeyError, TypeError, ValueError, AttributeError):
                # Best effort: a malformed record must not abort the batch.
                continue
            if len(row) > 0 and row.shape[1] == expected_width:
                rows.append(row)

        columns.extend(DBManager._get_var_name(cell_u_num, cell_t_num, other_t_num))
        # Concatenate once instead of growing the array row by row.
        table = np.concatenate(rows, axis=0) if rows else []
        return pd.DataFrame(columns=columns, data=table)

    @staticmethod
    def _records_to_dataframe(data, type_name, mode=0):
        """Turn a list of decoded records into a DataFrame for *type_name*.

        Args:
            data: list of decoded JSON records.
            type_name: one of 'bms', 'gps', 'system', 'accum'.
            mode: 0 or 1; selects the field layout.

        Returns:
            A DataFrame whose '时间戳' column holds formatted local-time
            strings. With no usable records, an empty frame carrying the
            expected columns is returned.

        Raises:
            ValueError: if *type_name* is not recognised.
        """
        if type_name == 'bms':
            return DBManager._format_timestamps(
                DBManager._bms_records_to_dataframe(data, mode))

        converters = {
            'gps': (DBManager._convert_to_dataframe_gps, DBManager._GPS_COLUMNS),
            'system': (DBManager._convert_to_dataframe_system, DBManager._SYSTEM_COLUMNS),
            'accum': (DBManager._convert_to_dataframe_accum, DBManager._ACCUM_COLUMNS),
        }
        if type_name not in converters:
            raise ValueError('unknown data type: {!r}'.format(type_name))
        convert, columns_by_mode = converters[type_name]

        frames = [convert(line, mode) for line in data]
        frames = [frame for frame in frames if not frame.empty]
        if frames:
            # DataFrame.append was removed in pandas 2.0; concatenate once.
            df_all = pd.concat(frames, ignore_index=True)
        else:
            columns = list(columns_by_mode.get(mode, ()))
            df_all = pd.DataFrame(columns=columns) if columns else pd.DataFrame()
        return DBManager._format_timestamps(df_all)

    @staticmethod
    def _get_data(urls, type_name, mode=0):
        """Download *urls* and return its records as a DataFrame.

        See ``_records_to_dataframe`` for the meaning of *type_name* and *mode*.
        """
        data = DBManager._download_json_data(urls)
        return DBManager._records_to_dataframe(data, type_name, mode)

    def get_data(self, url='http://172.16.126.13/store/load?dataType={}&limit=0&sn={}', sn='', start_time='', end_time='',
                 data_groups=('bms', 'gps')):
        """Fetch the requested data groups for one battery over a time range.

        The range is fetched in windows of at most one day and the per-window
        results are merged. A window is retried when the error message
        contains 'Connection broken'.

        Args:
            url: URL template with two ``{}`` slots (data type code, sn).
            sn: str, battery serial number. A serial starting with 'UD' or
                'MG' selects mode 1; anything else selects mode 0.
            start_time: str, range start as 'YYYY-MM-DD HH:MM:SS'.
            end_time: str, range end as 'YYYY-MM-DD HH:MM:SS'.
            data_groups: iterable of group names to fetch, any of 'bms',
                'gps', 'system', 'accum' (default: bms and gps).

        Returns:
            ``{'bms': df, 'gps': df, 'system': df, 'accum': df}``; groups
            that were not requested map to an empty DataFrame.

        Raises:
            ValueError: if *data_groups* contains an unknown name.
        """
        unknown_groups = set(data_groups) - set(self._DATA_TYPE_CODES)
        if unknown_groups:
            raise ValueError("data_groups 参数错误")

        # mode 0: normal retrieval; mode 1: "7255" retrieval (original author's note).
        mode = 1 if sn[0:2] in ('UD', 'MG') else 0

        fmt = self._TIME_FORMAT
        start_dt = datetime.datetime.strptime(start_time, fmt)
        end_dt = datetime.datetime.strptime(end_time, fmt)
        day_count = (end_dt - start_dt).days + 1
        collected = {group: [] for group in self._DATA_TYPE_CODES}

        print("### start to get data {} from {} to {}".format(sn, start_time, end_time))
        # To avoid ChunkedEncodingError, fetch one day at a time and merge
        # the daily results afterwards.
        for offset in range(int(day_count)):
            timefrom = start_dt + datetime.timedelta(days=offset)
            # Compare datetimes, not formatted strings, so inputs that are
            # not zero-padded are still ordered correctly.
            if timefrom >= end_dt:
                break
            timeto = min(timefrom + datetime.timedelta(days=1), end_dt)
            window = "&from=" + timefrom.strftime(fmt) + "&to=" + timeto.strftime(fmt)

            while True:
                day_frames = {}
                try:
                    print('\r' + "# get data from {} to {}.........".format(str(timefrom), str(timeto)), end=" ")
                    for data_group in data_groups:
                        file_url = url.format(self._DATA_TYPE_CODES[data_group], sn) + window
                        day_frames[data_group] = DBManager._get_data(file_url, data_group, mode)
                except Exception as e:
                    if 'Connection broken' in str(e):
                        # The stream was cut mid-transfer: retry this window.
                        continue
                    elif 'Number of manager items must equal union of block items' in str(e):
                        # The frame could not be built for this window: skip it.
                        break
                    else:
                        # Re-raise the original error rather than a bare Exception.
                        raise
                else:
                    for group, frame in day_frames.items():
                        collected[group].append(frame)
                    break

        result = {}
        for group in ('bms', 'gps', 'system', 'accum'):
            frames = collected[group]
            merged = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
            result[group] = merged.reset_index(drop=True)

        print('\nall data-getting done, bms_count is {}, gps_count is {}, system_count is {}, accum_count is {} \n'.format(
            str(len(result['bms'])), str(len(result['gps'])), str(len(result['system'])), str(len(result['accum']))))
        return result
|