DBManager.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. '''
  2. 暂时采用http方式获取历史数据。
  3. 预留:后期若改用通过访问数据库的形式进行数据的获取,则本文件负责数据库的连接,sql指令的执行,数据获取等功能。
  4. '''
  5. __author__ = 'wlm'
  6. import time
  7. import datetime
  8. import os
  9. import urllib.request
  10. import time
  11. import pandas as pd
  12. import numpy as np
  13. import json
  14. import requests
  15. import pdb
  16. # import http.client
  17. # http.client.HTTPConnection._http_vsn = 10
  18. # http.client.HTTPConnection._http_vsn_str = 'HTTP/1.1'
  19. class DBManager():
  20. def __init__(self, host='', port='', auth='', db='', username='', password=''):
  21. pass
  22. def __enter__(self):
  23. self.connect()
  24. return self
  25. def __exit__(self):
  26. self.close()
  27. def connect(self):
  28. conn_success_flag = 0
  29. while not conn_success_flag:
  30. try:
  31. pass # 连接数据库
  32. except Exception as e:
  33. conn_success_flag = 0
  34. time.sleep(5)
  35. else:
  36. conn_success_flag = 1
  37. pass # 连接成功, 获取cursor
  38. def close(self):
  39. try:
  40. pass # 断开数据库
  41. except Exception as e:
  42. print(e)
  43. else:
  44. print('数据库已断开连接')
  45. # 以下各个函数实现 通过http方式获取数据
  46. @staticmethod
  47. def _get_var_name(cellnum,Tempnum,Othernum):
  48. temp = []
  49. for i in range(cellnum):
  50. temp.append('单体电压'+str(i+1))
  51. for i in range(Tempnum):
  52. temp.append('单体温度'+str(i+1))
  53. for i in range(Othernum):
  54. temp.append('其他温度'+str(i+1))
  55. return temp
  56. @staticmethod
  57. def _download_json_data(url):
  58. '''
  59. 返回json数据的生成器,一次一行
  60. '''
  61. i = 0
  62. while i<5:
  63. try:
  64. r = requests.get(url,stream=True, timeout=50)
  65. break
  66. except requests.exceptions.RequestException:
  67. print('Server Error')
  68. time.sleep(5)
  69. i+=1
  70. # print(r.content)
  71. # pdb.set_trace()
  72. for line in r.iter_lines():
  73. if line:
  74. yield json.loads(line)
  75. @staticmethod
  76. def _convert_to_dataframe(data, mode=0):
  77. CellU = []
  78. CellT = []
  79. OtherT = []
  80. CellU_Num = 0
  81. CellT_Num = 0
  82. OtherT_Num = 0
  83. CellU_Num = len(data['ffBatteryStatus']['cellVoltageList'])
  84. CellT_Num = len(data['ffBatteryStatus']['cellTempList'])
  85. try:
  86. OtherT_Num = len(data['ffBatteryStatus']['otherTempList'])
  87. except:
  88. OtherT_Num = 0
  89. for i in range(CellU_Num):
  90. CellU.append(data['ffBatteryStatus']['cellVoltageList'][i]*1000)
  91. for i in range(CellT_Num):
  92. CellU.append(data['ffBatteryStatus']['cellTempList'][i])
  93. for i in range(OtherT_Num):
  94. CellU.append(data['ffBatteryStatus']['otherTempList'][i])
  95. if mode == 0:
  96. data_len = 11
  97. data_block = np.array([data['info']['obdTime'],data['ffBatteryStatus']['rssi'],data['ffBatteryStatus']['errorLevel'],data['ffBatteryStatus']['errorCode']
  98. ,data['ffBatteryStatus']['current'],data['ffBatteryStatus']['voltageInner'],data['ffBatteryStatus']['chargeState'],data['ffBatteryStatus']['heatState']
  99. ,data['ffBatteryStatus']['soc'],data['ffBatteryStatus']['soh'],data['ffBatteryStatus']['cellVolBalance']]).reshape(1,data_len)
  100. elif mode == 1:
  101. data_len = 7
  102. data_block = np.array([data['info']['obdTime'],data['ffBatteryStatus']['rssi']
  103. ,data['ffBatteryStatus']['current'],data['ffBatteryStatus']['voltageInner'],data['ffBatteryStatus']['chargeState']
  104. ,data['ffBatteryStatus']['soc'],data['ffBatteryStatus']['soh']]).reshape(1,data_len)
  105. data_block = np.append(data_block,CellU)
  106. data_block = np.append(data_block,CellT)
  107. data_block = np.append(data_block,OtherT)
  108. data_block = data_block.reshape(1,len(data_block))
  109. return data_block,CellU_Num,CellT_Num,OtherT_Num
  110. @staticmethod
  111. def _convert_to_dataframe_Gps(data, mode=0):
  112. if mode == 0:
  113. if data['info']['subType'] == 1:
  114. data_block = np.array([data['info']['obdTime'],data['ffGps']['satellites'],data['ffGps']['latitude'],data['ffGps']['longitude']
  115. ,data['ffGps']['altitude'],data['ffGps']['speed']]).reshape(1,6)
  116. df = pd.DataFrame(
  117. columns=['时间戳','卫星数','纬度','经度','海拔m','速度[km/h]'],data=data_block)
  118. elif data['info']['subType'] == 2:
  119. df = pd.DataFrame(
  120. columns=['时间戳','卫星数','纬度','经度','海拔m','速度[km/h]'])
  121. if mode == 1:
  122. data_block = np.array([data['info']['obdTime'],data['ffGps']['latitude'],data['ffGps']['longitude']
  123. ,data['ffGps']['speed'], data['ffGps']['isValid']]).reshape(1,5)
  124. df = pd.DataFrame(
  125. columns=['时间戳','纬度','经度','速度[km/h]','有效位'],data=data_block)
  126. return df
  127. @staticmethod
  128. def _get_data(urls,type_name,mode=0):
  129. if type_name == 'BMS':
  130. if mode == 0:
  131. name_const = ['时间戳','GSM信号','故障等级','故障代码','总电流[A]','总电压[V]','充电状态','加热','SOC[%]','SOH[%]','单体均衡状态']
  132. elif mode == 1:
  133. name_const = ['时间戳','GSM信号','总电流[A]','总电压[V]','充电状态','SOC[%]','SOH[%]']
  134. i=0
  135. CellUNum = 0
  136. CellTNum = 0
  137. OtherTNumm = 0
  138. st = time.time()
  139. for line in DBManager._download_json_data(urls):
  140. et = time.time()
  141. if i==0:
  142. data_blocks,CellUNum,CellTNum,OtherTNumm = DBManager._convert_to_dataframe(line, mode)
  143. i+=1
  144. continue
  145. try:
  146. data_block,CellUNum,CellTNum,OtherTNumm = DBManager._convert_to_dataframe(line, mode)
  147. except:
  148. continue
  149. data_blocks = np.concatenate((data_blocks,data_block),axis=0)
  150. print('\r'+str(i),end=" ")
  151. # print(data_block)
  152. # print(urls)
  153. # print(time.time()-et)
  154. i+=1
  155. name_var = DBManager._get_var_name(CellUNum,CellTNum,OtherTNumm)
  156. name_const.extend(name_var)
  157. columns_name = name_const
  158. if i==0:
  159. data_blocks = []
  160. df_all = pd.DataFrame(columns=columns_name,data=data_blocks)
  161. df_all.loc[:,'时间戳'] = df_all.loc[:,'时间戳'].apply(lambda x:time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(x)/1000)))
  162. return df_all
  163. elif type_name =='GPS':
  164. df_all = pd.DataFrame(columns=['时间戳','纬度','经度','速度[km/h]','有效位'])
  165. for line in DBManager._download_json_data(urls):
  166. df_add = DBManager._convert_to_dataframe_Gps(line, mode)
  167. df_all = df_all.append(df_add,ignore_index=True)
  168. df_all.loc[:,'时间戳'] = df_all.loc[:,'时间戳'].apply(lambda x:time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(int(x)/1000)))
  169. return df_all
  170. def get_data(self, bms_url='http://172.16.126.13/store/load?dataType=12&limit=0&sn=', gps_url='http://172.16.126.13/store/load?dataType=16&limit=0&sn=',
  171. sn='', start_time='', end_time='', gps_switch=True, mode=0):
  172. '''
  173. 获取指定 sn 和起止日期的bms和gps数据.
  174. 添加了重试机制。
  175. --------------输入参数------------
  176. bms_url:bms 数据url, 可采用默认值
  177. gps_url: gps 数据url, 可采用默认值
  178. sn: str, 电池sn号
  179. start_time: str, 开始时间
  180. end_time: str, 结束时间
  181. gps_switch: True:获取gps数据; False:不获取gps数据
  182. mode: 0:正常取数; 1:7255 取数
  183. --------------输出参数------------
  184. bms_data: 获取到的bms数据
  185. gps_data: 获取到的gps数据
  186. '''
  187. bms_all_data = pd.DataFrame()
  188. gps_all_data = pd.DataFrame()
  189. maxnum = (datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")).days +1
  190. print("get data {} from {} to {}".format(sn, start_time, end_time))
  191. # 为避免chunkEncodingError错误,数据每天获取一次,然后将每天的数据合并,得到最终的数据
  192. for j in range(maxnum):
  193. timefrom = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")+ datetime.timedelta(days=j)
  194. timeto = datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S")+ datetime.timedelta(days=j+1)
  195. #滴滴的数据sub=0
  196. if timefrom.strftime('%Y-%m-%d %H:%M:%S') > end_time:
  197. break
  198. elif timeto.strftime('%Y-%m-%d %H:%M:%S') > end_time:
  199. timeto = datetime.datetime.strptime(end_time, '%Y-%m-%d %H:%M:%S')
  200. File_url_bms = bms_url + sn + "&from="+timefrom.strftime('%Y-%m-%d %H:%M:%S')+"&to="+timeto.strftime('%Y-%m-%d %H:%M:%S')
  201. File_url_gps = gps_url + sn + "&from="+timefrom.strftime('%Y-%m-%d %H:%M:%S')+"&to="+timeto.strftime('%Y-%m-%d %H:%M:%S')
  202. #print('{}_{}_----getting data----'.format(sn, timefrom))
  203. while True:
  204. try:
  205. bms_data = DBManager._get_data(File_url_bms,'BMS',mode)
  206. if gps_switch:
  207. gps_data = DBManager._get_data(File_url_gps,'GPS', mode)
  208. except Exception as e:
  209. if 'Connection broken' in str(e):
  210. continue
  211. else:
  212. raise Exception
  213. else:
  214. bms_all_data = pd.concat([bms_all_data, bms_data], ignore_index=True)
  215. if gps_switch:
  216. gps_all_data = pd.concat([gps_all_data, gps_data], ignore_index=True)
  217. break
  218. if not bms_all_data.empty:
  219. bms_all_data = bms_all_data.reset_index(drop=True)
  220. if not gps_all_data.empty:
  221. gps_all_data = gps_all_data.reset_index(drop=True)
  222. print('all data-getting done, total_count is {} \n'.format(str(len(bms_all_data))))
  223. return bms_all_data, gps_all_data