# DBManager.py
# Fetch data from HBase (Lindorm).
import gc
import logging
import time
import traceback
from decimal import Decimal

import numpy as np
import pandas as pd
import phoenixdb
# Compute hash codes.
  11. class GetHashCode:
  12. def convert_n_bytes(self, n, b):
  13. bits = b * 8
  14. return (n + 2 ** (bits - 1)) % 2 ** bits - 2 ** (bits - 1)
  15. def convert_4_bytes(self, n):
  16. return self.convert_n_bytes(n, 4)
  17. @classmethod
  18. def getHashCode(cls, s):
  19. h = 0
  20. n = len(s)
  21. for i, c in enumerate(s):
  22. h = h + ord(c) * 31 ** (n - 1 - i)
  23. return cls().convert_4_bytes(h)
  24. class LindormConnection:
  25. # ods_signal: 清洗后的数据
  26. # stage_signal: 原始数据
  27. def __init__(self, database_url='ld-uf69n0762fjx616l6-proxy-hbaseue.hbaseue.rds.aliyuncs.com:30060',
  28. username='root', password='root',
  29. database='test', table='ods_signal'):
  30. self.database_url = database_url;
  31. self.username = username;
  32. self.password = password;
  33. self.database = database
  34. self.table = table
  35. self.conn = None
  36. return
  37. def __enter__(self):
  38. self._connect();
  39. return self
  40. def __exit__(self, exc_type, exc_value, traceback):
  41. if self.conn and not self.conn._closed:
  42. self.conn.close()
  43. def _connect(self):
  44. try:
  45. connect_kw_args = {'lindorm_user': self.username, 'lindorm_password': self.password,
  46. 'database': self.database}
  47. self.conn = phoenixdb.connect(self.database_url, autocommit=True, **connect_kw_args)
  48. print("lindorm 连接成功")
  49. except AttributeError:
  50. print("Failed to connect")
  51. def get_data(self, vin='', sn='', start_time='', end_time='', sel_columns=[]):
  52. while True:
  53. try:
  54. statement = None
  55. with self.conn.cursor() as statement:
  56. df = self._get_data(vin, sn, start_time, end_time, sel_columns, statement)
  57. return df
  58. except Exception as e:
  59. if statement and not statement._closed:
  60. statement.close()
  61. if self.conn and not self.conn._closed:
  62. self.conn.close()
  63. traceback.print_exc()
  64. print(str(e))
  65. if ("vin 和 sn 只能填写一个" in str(e)):
  66. raise Exception(str(e))
  67. break
  68. self._connect()
  69. time.sleep(1)
  70. def _get_data(self, vin='', sn='', start_time='', end_time='', sel_columns=[], statement=None):
  71. # 连接url
  72. # 创建表
  73. # sql_create_table = "create table if not exists test_python(c1 integer, c2 integer, primary key(c1))"
  74. # print(sql_create_table)
  75. # statement.execute(sql_create_table)
  76. # # 插入一行数据
  77. # sql_upsert = "upsert into test_python(c1, c2) values(1,1)"
  78. # print(sql_upsert)
  79. # statement.execute(sql_upsert)
  80. # # 插入多行数据
  81. # with connection.cursor() as stat:
  82. # sql_upsert = "upsert into test_python(c1, c2) values(?,?)"
  83. # print(sql_upsert)
  84. # stat.executemany(sql_upsert, [(2, 2), (3, 3)])
  85. # # 删除数据
  86. # sql_delete = "delete from test_python where c1=2"
  87. # print(sql_delete)
  88. # statement.execute(sql_delete)
  89. # # 修改数据
  90. # sql_update = "upsert into test_python(c1, c2) values(1,10)"
  91. # print(sql_update)
  92. # statement.execute(sql_update)
  93. # 获取数据
  94. # sql_select = "select * from ods_signal where VIN='LUZAGBDA8LA011827' limit 1"
  95. # print(sql_select)
  96. # statement.execute(sql_select)
  97. # rows = statement.fetchall()
  98. # 获取列名
  99. sql_select = "desc {}".format(self.table)
  100. # print(sql_select)
  101. statement.execute(sql_select)
  102. rows = statement.fetchall()
  103. columns = []
  104. for r in rows:
  105. columns.append(r[2])
  106. # 调整表结构
  107. if len(sel_columns) == 0:
  108. raise Exception("没有指定查询参数")
  109. # 判断所取的列是否都存在
  110. # if len(sel_columns) > 0:
  111. # if len((set(sel_columns)-(set(sel_columns)&set(columns))))>0:
  112. # raise Exception('{}列名不存在,请检查列名'.format(str(set(sel_columns)-(set(sel_columns)&set(columns)))))
  113. # else:
  114. # sel_columns = columns
  115. # 获取vin列表
  116. # sql_select = "select distinct VIN as vin from ods_signal"
  117. # print(sql_select)
  118. # statement.execute(sql_select)
  119. # rows = statement.fetchall()
  120. # vins = []
  121. # for r in rows:
  122. # vins.append(r[0])
  123. # vins = list(set(vins))
  124. # 获取数据
  125. MAX_LONG = 9223372036854775807
  126. # 参数起始时间,结束时间,vin
  127. # 判断基于vin还是sn
  128. if self.table != 'dwd_batt_persona_standing_proc_di':
  129. st = int(time.mktime(time.strptime(start_time, '%Y-%m-%d %H:%M:%S')) * 1000)
  130. et = int(time.mktime(time.strptime(end_time, '%Y-%m-%d %H:%M:%S')) * 1000)
  131. if len(vin) > 0:
  132. prefix = str(abs(GetHashCode.getHashCode(vin))) + "_" + vin + "_"
  133. id = 'id'
  134. elif len(sn) > 0:
  135. prefix = str(abs(GetHashCode.getHashCode(sn))) + "_" + sn + "_"
  136. id = 'SnKey'
  137. else:
  138. raise Exception("vin 和 sn 只能填写一个")
  139. start_rows = prefix + str(MAX_LONG - et)
  140. end_rows = prefix + str(MAX_LONG - st)
  141. else:
  142. if len(vin) > 0:
  143. prefix = vin + "_"
  144. id = 'id'
  145. else:
  146. raise Exception("vin 和 sn 只能填写一个")
  147. st = time.strftime('%Y%m%d',time.strptime(start_time, '%Y-%m-%d %H:%M:%S'))
  148. et = time.strftime('%Y%m%d',time.strptime(end_time, '%Y-%m-%d %H:%M:%S'))
  149. st_time = int(time.mktime(time.strptime(start_time, '%Y-%m-%d %H:%M:%S')))
  150. et_time = int(time.mktime(time.strptime(end_time, '%Y-%m-%d %H:%M:%S')))
  151. start_rows = prefix + str(st_time) + '_' + st
  152. end_rows = prefix + str(et_time) + '_' + et
  153. # 列名拼接
  154. # s0 = time.time() * 1000
  155. columns_str = ""
  156. for column in sel_columns:
  157. columns_str = columns_str + column + ','
  158. columns_str = columns_str[0:-1]
  159. sql_select = "select {} from {} where {} > '{}' and {} <= '{}'".format(columns_str, self.table, id, start_rows,
  160. id, end_rows)
  161. # print(sql_select)
  162. rows = []
  163. page_size = 2000
  164. last_row_length = 2000
  165. page_index = 1
  166. while last_row_length == page_size:
  167. sql = sql_select + ' LIMIT %s OFFSET %s' % (page_size, (page_index - 1) * page_size)
  168. statement.execute(sql)
  169. last_rows = statement.fetchall()
  170. last_row_length = len(last_rows)
  171. rows.extend(last_rows)
  172. page_index += 1
  173. # statement.execute(sql_select)
  174. # rows = statement.fetchall()
  175. # print('sql_select: %s' % (time.time() * 1000 - s0))
  176. df = pd.DataFrame(rows, columns=sel_columns)
  177. #数据类型转换
  178. # for colunms_name in sel_columns:
  179. # if colunms_name in self.infodatatype.keys():
  180. # df[colunms_name]=df[colunms_name].astype(self.infodatatype[colunms_name])
  181. # else:
  182. # pass
  183. # 空直接返回
  184. if df.empty:
  185. return df
  186. stage_signal_tables = ['stage_signal', 'stage_signal_new']
  187. ods_signal_tables = ['ods_signal', 'ods_signal_new', 'ods_signal_base']
  188. if self.table in stage_signal_tables:
  189. df = df.sort_values(['time_value'])
  190. # df['time_value'] = df['time_value'].apply(lambda x:x.strftime("%Y-%m-%d %H:%M:%S"))
  191. elif self.table in ods_signal_tables:
  192. df = df.sort_values(['Time'])
  193. df['Time'] = pd.to_datetime(list(df['Time']), utc=True, unit='ms').tz_convert('Asia/Shanghai')
  194. df['Time'] = df['Time'].apply(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))
  195. df = df.reset_index(drop=True)
  196. # 处理数据类型
  197. if self.table in ods_signal_tables:
  198. for column in sel_columns:
  199. if type(df.loc[0, column]) == Decimal:
  200. df[column] = df[column].apply(lambda x: x if pd.isnull(x) else np.float64(x))
  201. return df
  202. # # 禁用表,删除表之前需要先禁用表
  203. # sql_offline_table = "offline table test_python"
  204. # print(sql_offline_table)
  205. # statement.execute(sql_offline_table)
  206. # # 删除表
  207. # sql_drop_table = "drop table if exists test_python"
  208. # print(sql_drop_table)
  209. # statement.execute(sql_drop_table)
  210. # 获取用车表单数据
  211. def get_algo_data(self, vin='', start_time='', end_time='', sel_columns=[]):
  212. for i in range(10):
  213. try:
  214. statement = None
  215. with self.conn.cursor() as statement:
  216. df = self._get_algo_data(vin, start_time, end_time, sel_columns, statement)
  217. return df
  218. except Exception as e:
  219. if statement and not statement._closed:
  220. statement.close()
  221. if self.conn and not self.conn._closed:
  222. self.conn.close()
  223. self._connect()
  224. time.sleep(1)
  225. raise Exception(str(e))
  226. return pd.DataFrame(columns=sel_columns)
  227. def _get_algo_data(self, vin='', start_time='', end_time='', sel_columns=[], statement=None):
  228. # 获取列名
  229. sql_select = "desc {}".format(self.table)
  230. # print(sql_select)
  231. statement.execute(sql_select)
  232. rows = statement.fetchall()
  233. columns = []
  234. for r in rows:
  235. columns.append(r[2])
  236. # # 判断所取的列是否都存在
  237. if len(sel_columns) > 0:
  238. if len((set(sel_columns) - (set(sel_columns) & set(columns)))) > 0:
  239. raise Exception('{}列名不存在,请检查列名'.format(str(set(sel_columns) - (set(sel_columns) & set(columns)))))
  240. else:
  241. sel_columns = columns
  242. # 获取数据
  243. MAX_LONG = 9223372036854775807
  244. # 参数起始时间,结束时间,vin
  245. st = int(time.mktime(time.strptime(start_time, '%Y-%m-%d %H:%M:%S')) * 1000)
  246. et = int(time.mktime(time.strptime(end_time, '%Y-%m-%d %H:%M:%S')) * 1000)
  247. # 判断基于vin还是sn
  248. if len(vin) > 0:
  249. prefix = str(abs(GetHashCode.getHashCode(vin)))+"_"+vin+"_"
  250. id = 'id'
  251. else:
  252. raise Exception("vin不能为空")
  253. start_rows = prefix + str(MAX_LONG-et)
  254. end_rows = prefix + str(MAX_LONG-st)
  255. # time_field = "time_st"
  256. # 列名拼接
  257. columns_str = ""
  258. for column in sel_columns:
  259. columns_str = columns_str + column + ','
  260. columns_str = columns_str[0:-1]
  261. sql_select = "select {} from {} where {} > '{}' and {} <= '{}'".format(columns_str, self.table, id,
  262. start_rows, id, end_rows)
  263. # print(sql_select)
  264. statement.execute(sql_select)
  265. rows = statement.fetchall()
  266. df = pd.DataFrame(rows, columns=sel_columns)
  267. # 空直接返回
  268. if df.empty:
  269. return df
  270. # stage_signal_tables = ['stage_signal', 'stage_signal_new']
  271. # ods_signal_tables = ['ods_signal', 'ods_signal_new']
  272. # if self.table in stage_signal_tables:
  273. # df = df.sort_values(['time_value'])
  274. # # df['time_value'] = df['time_value'].apply(lambda x:x.strftime("%Y-%m-%d %H:%M:%S"))
  275. # elif self.table in ods_signal_tables:
  276. # df = df.sort_values(['Time'])
  277. # df['Time'] = pd.to_datetime(list(df['Time']),utc=True, unit='ms').tz_convert('Asia/Shanghai')
  278. # df['Time'] = df['Time'].apply(lambda x:x.strftime("%Y-%m-%d %H:%M:%S"))
  279. # df = df.reset_index(drop=True)
  280. # 处理数据类型
  281. # if self.table in ods_signal_tables:
  282. # for column in sel_columns:
  283. # if type(df.loc[0,column]) == Decimal:
  284. # df[column] = df[column].apply(lambda x: x if pd.isnull(x) else np.float64(x))
  285. return df
  286. # # 禁用表,删除表之前需要先禁用表
  287. # sql_offline_table = "offline table test_python"
  288. # print(sql_offline_table)
  289. # statement.execute(sql_offline_table)
  290. # # 删除表
  291. # sql_drop_table = "drop table if exists test_python"
  292. # print(sql_drop_table)
  293. # statement.execute(sql_drop_table)
  294. # 获取离线表单数据
  295. def get_offline_data(self, result_id, column_name):
  296. i = 1
  297. while True:
  298. try:
  299. i = i + 1
  300. statement = None
  301. with self.conn.cursor() as statement:
  302. # 获取列名
  303. sql_select = "desc {}".format(self.table)
  304. statement.execute(sql_select)
  305. rows = statement.fetchall()
  306. sel_columns = column_name.split(',')
  307. sql_select = "select {} from {} where result_id = '{}' order by row_id".format(column_name,
  308. self.table,
  309. result_id)
  310. statement.execute(sql_select)
  311. rows = statement.fetchall()
  312. df = pd.DataFrame(rows, columns=sel_columns)
  313. return df
  314. except Exception as e:
  315. if statement and not statement._closed:
  316. statement.close()
  317. if self.conn and not self.conn._closed:
  318. self.conn.close()
  319. if i > 9:
  320. return pd.DataFrame()
  321. break
  322. self._connect()
  323. time.sleep(1)
  324. # 获得新表结构的列
  325. def _get_select_param(self, sel_columns):
  326. columns_str = ""
  327. has_info = False
  328. # 新表结构
  329. if self.table == 'ods_signal_base':
  330. for c in sel_columns:
  331. if c not in self.signalCol:
  332. has_info = True
  333. continue
  334. columns_str = columns_str + c + ','
  335. if has_info:
  336. columns_str += 'Info,'
  337. else:
  338. for column in sel_columns:
  339. columns_str = columns_str + column + ','
  340. return columns_str[0:-1], has_info
  341. # 结构转换
  342. def _transform_result(self, sel_columns, rows):
  343. new_rows = []
  344. for r in rows:
  345. info_dict = self._info_to_dict(r[len(r) - 1])
  346. new_row = []
  347. col_last = 0
  348. for i in range(len(sel_columns)):
  349. if sel_columns[i] not in self.signalCol:
  350. new_row.append(info_dict[sel_columns[i]])
  351. else:
  352. new_row.append(r[col_last])
  353. col_last += 1
  354. new_rows.append(new_row)
  355. return new_rows
  356. # info转换为
  357. def _info_to_dict(self, info):
  358. info_array = info.split('@@')
  359. info_dict = {}
  360. if len(info_array) < len(self.infoRelation):
  361. raise Exception("数据异常: %s - %s ->%s" %(len(info_array), len(self.infoRelation), info))
  362. for k, v in self.infoRelation.items():
  363. info_dict[k] = info_array[v]
  364. return info_dict
# if __name__ == '__main__':
#
#     sel_columns = ["VIN", "SN", "Time", "VehState", "CellMaxVolNum", "CellMinVolNum", "CellMinVolNum", "CellMaxTempNum"]
#
#     s = time.time() * 1000
#     with LindormConnection(table='ods_signal_new') as dbm:
#         df_data = dbm.get_data(sn='0CZPE007DN0111B700000067', start_time='2022-07-13 15:00:00',
#                                end_time='2022-07-21 03:00:00', sel_columns=sel_columns)
#         print(len(df_data))
#     print(time.time() * 1000 - s)
#
#     s = time.time() * 1000
#     with LindormConnection(table='ods_signal_base') as dbm:
#         df_data = dbm.get_data(sn='0CZPE007DN0111B700000067', start_time='2022-07-13 15:00:00',
#                                end_time='2022-07-21 03:00:00', sel_columns=sel_columns)
#         print(len(df_data))
#     print(time.time() * 1000 - s)
#