# main.py

import time
import json
import traceback
import os

import minio
import zipfile
import pandas as pd
# Assumption: mysqlUtils hands back a SQLAlchemy engine/Session pair (see
# get_mysql_engine below), so raw SQL strings are wrapped in text() further down.
from sqlalchemy import text

from utils.HbaseService import HbaseService
from utils.ProUtils import *
from utils.DataConvert import DataConvert


def data_insert(logger_main, data_convert, f, df, factory_type, user):
    logger_main.info(f"Reading data file {f}......")
    # sn, st, et, tag, tag_child = f.split(".")[0].split('_')
    logger_main.info(f"Storing {f}......")
    if factory_type == 1:  # Qixiang data needs no storage
        logger_main.info("Qixiang data needs no storage, skipping")
        return
    if factory_type == 2:  # convert Hezhong data
        df = data_convert._convert_hezhong(df, factory_type)
    df['user'] = user
    hbaseService.write_data(df)  # hbaseService is created in the __main__ block below
    logger_main.info(f"Finished storing {f}.")
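
# Hypothetical usage sketch (not part of the original flow): data_insert expects a
# pandas DataFrame plus the factory_type routing flag. The column names below are
# illustrative only; the real schema comes from the uploaded file, e.g.:
#
#   df = pd.DataFrame({"sn": ["demo-sn"], "volt": [3.7]})
#   data_insert(logger_main, DataConvert(), "demo.csv", df, factory_type=2, user="demo")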


def main(redis_params):
    global logger_main, hbaseService, minio_params, mysql_algo_params
    mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
    minioClient = minio.Minio(endpoint=minio_params['endpoint'],
                              access_key=minio_params['access_key'],
                              secret_key=minio_params['secret_key'],
                              secure=minio_params['secure'].lower() == 'true')  # parse the flag without eval()
    bucket_name = minio_params['bucket_name']
    data_base_dir = "./data"
    os.makedirs(data_base_dir, exist_ok=True)
    log_id = redis_params.get("log_id")  # renamed from `id` to avoid shadowing the builtin
    user = redis_params.get("user")
    file_url = redis_params.get("file_url")
    factory_type = redis_params.get("factory_type")
    file_name = file_url.split("/")[-1]
    file_save_path = os.path.join(data_base_dir, file_url.replace("/", "_"))
    file_obj = minioClient.get_object(bucket_name, file_url)
    data_convert = DataConvert()
    logger_main.info("Reading file from minio")
    session = None  # so the finally block can close it safely
    try:
        if file_url.endswith(".csv"):
            df = pd.read_csv(file_obj)
            data_insert(logger_main, data_convert, file_name, df, factory_type, user)
        elif file_url.endswith(".xlsx") or file_url.endswith(".xls"):
            with open(file_save_path, 'wb') as f:
                for d in file_obj:  # the minio response is iterable in byte chunks
                    f.write(d)
            df = pd.read_excel(file_save_path)
            data_insert(logger_main, data_convert, file_name, df, factory_type, user)
            os.remove(file_save_path)
        # handle compressed files
        elif file_url.endswith(".zip"):
            with open(file_save_path, 'wb') as f:
                for d in file_obj:
                    f.write(d)
            with zipfile.ZipFile(file_save_path, 'r') as zfile:
                for fn in zfile.namelist():
                    with zfile.open(fn) as f:
                        if fn.endswith(".csv"):
                            df = pd.read_csv(f)
                        elif fn.endswith(".xlsx") or fn.endswith(".xls"):
                            df = pd.read_excel(f)
                        else:
                            continue  # skip unsupported entries instead of re-inserting a stale df
                        data_insert(logger_main, data_convert, fn, df, factory_type, user)  # log the entry name, not the archive name
            os.remove(file_save_path)
        session = mysql_algo_Session()
        # mark the task as successful (status=0); text() + bound params instead of an f-string
        session.execute(text('update t_offline_log set status=0 where id = :id'), {"id": log_id})
        session.commit()
    except Exception as e:
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())
        session = mysql_algo_Session()
        # mark the task as failed (status=2)
        session.execute(text('update t_offline_log set status=2 where id = :id'), {"id": log_id})
        session.commit()
    finally:
        if session is not None:
            session.close()
        mysql_algo_engine.dispose()
        file_obj.close()          # release the minio/urllib3 connection
        file_obj.release_conn()
        logger_main.info("Task finished")
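

# Hedged sketch (not in the original): the success/failure updates above could be
# folded into one helper. The table name and status codes (0=success, 2=failed)
# come from the code above; the helper name update_log_status is hypothetical.
def update_log_status(session_factory, log_id, status):
    """Write a task outcome back to t_offline_log."""
    session = session_factory()
    try:
        session.execute(text('update t_offline_log set status=:s where id=:i'),
                        {"s": status, "i": log_id})
        session.commit()
    finally:
        session.close()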


if __name__ == '__main__':
    app_path = r"/home/wangliming/project/zlwl-algos/ALGOS/NONPERIODIC/dataFactory"
    cur_env = 'test'
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory path
    sysUtils = SysUtils(cur_env, app_path)
    hbase_params = sysUtils.get_cf_param('hbase')
    hbaseService = HbaseService(hbase_params)
    redis_params = sysUtils.get_cf_param('redis')
    redisUtils = RedisUtils()
    redis_conn = redisUtils.get_redis_conncect(redis_params)
    minio_params = sysUtils.get_cf_param('minio')
    mysql_algo_params = sysUtils.get_cf_param('mysql')
    mysqlUtils = MysqlUtils()
    logger_main = sysUtils.get_logger("dataset", log_base_path)
    logger_main.info("Start waiting for task scheduling")
    while True:
        try:
            redis_ram_data = redis_conn.lpop("DataFactoryDataSetListWaitForParse")
            if not redis_ram_data:
                continue  # the finally clause still sleeps, so this polls every 5s
            logger_main.info("Received task {}".format(redis_ram_data))
            # Payloads arrive JSON-encoded once or twice (a JSON object, or a JSON
            # string that itself contains JSON); a leading quote marks the latter.
            if redis_ram_data[0] != '"':
                redis_ram_data = json.loads(redis_ram_data)
            else:
                redis_ram_data = json.loads(json.loads(redis_ram_data))
            main(redis_ram_data)
        except Exception:
            logger_main.error("Error while waiting for tasks")
            logger_main.error(traceback.format_exc())
            logger_main.error("Reconnecting to redis")
            redis_conn = redisUtils.get_redis_conncect(redis_params)
        finally:
            time.sleep(5)
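
# Hedged example (not in the original): a producer would enqueue a task roughly
# like this. The queue name and field names match the consumer loop above; the
# values are illustrative only.
#
#   task = {"log_id": 1, "user": "demo", "file_url": "uploads/demo.csv", "factory_type": 2}
#   redis_conn.rpush("DataFactoryDataSetListWaitForParse", json.dumps(task))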