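"""Offline data-import worker for the dataFactory service.

Polls a Redis list for import tasks, downloads the referenced file
(csv / xlsx / xls / zip) from MinIO, converts it according to the
factory type where needed, writes the rows to HBase, and records the
outcome in the MySQL table t_offline_log.
"""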
import time
import json
import traceback
import os
import zipfile

import minio
import pandas as pd

from utils.HbaseService import HbaseService
from utils.ProUtils import *  # SysUtils, RedisUtils, MysqlUtils are expected to come from this star import
from utils.DataConvert import DataConvert


def data_insert(logger_main, data_convert, f, df, factory_type, user):
    logger_main.info(f"{f} reading data file......")
    # sn, st, et, tag, tag_child = f.split(".")[0].split('_')
    logger_main.info(f"{f} writing data......")
    if factory_type == 1:  # Qixiang data does not need to be written
        logger_main.info("Qixiang data does not need to be written, skipping")
        return
    if factory_type == 2:  # Hezhong data needs conversion first
        df = data_convert._convert_hezhong(df, factory_type)
    df['user'] = user
    hbaseService.write_data(df)
    logger_main.info(f"{f} data written.")
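

# Illustrative only: main() expects a dict parsed from the Redis task payload
# containing at least the keys read below; the values in this example are made up.
#   {"log_id": 1, "user": "analyst01", "factory_type": 2,
#    "file_url": "uploads/example_dataset.zip"}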
def main(redis_params):
    global logger_main, hbaseService, minio_params, mysql_algo_params

    mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)

    minioClient = minio.Minio(endpoint=minio_params['endpoint'],
                              access_key=minio_params['access_key'],
                              secret_key=minio_params['secret_key'],
                              secure=eval(minio_params['secure']))
    bucket_name = minio_params['bucket_name']
    data_base_dir = "./data"
    if not os.path.exists(data_base_dir):
        os.mkdir(data_base_dir)
    id = redis_params.get("log_id")
    user = redis_params.get("user")
    file_url = redis_params.get("file_url")
    factory_type = redis_params.get("factory_type")
    file_name = file_url.split("/")[-1]
    file_save_path = os.path.join(data_base_dir, file_url.replace("/", "_"))
    file_obj = minioClient.get_object(bucket_name, file_url)
    data_convert = DataConvert()
    logger_main.info("reading file from minio")
    session = None
    try:
        if file_url.endswith(".csv"):
            df = pd.read_csv(file_obj)
            data_insert(logger_main, data_convert, file_name, df, factory_type, user)
        elif file_url.endswith(".xlsx") or file_url.endswith(".xls"):
            with open(file_save_path, 'wb') as f:
                for d in file_obj:
                    f.write(d)
            df = pd.read_excel(file_save_path)
            data_insert(logger_main, data_convert, file_name, df, factory_type, user)
            os.remove(file_save_path)
        # handle compressed archives
        elif file_url.endswith(".zip"):
            with open(file_save_path, 'wb') as f:
                for d in file_obj:
                    f.write(d)
            with zipfile.ZipFile(file_save_path, 'r') as zfile:
                for fn in zfile.namelist():
                    with zfile.open(fn) as f:
                        if fn.endswith(".csv"):
                            df = pd.read_csv(f)
                        elif fn.endswith(".xlsx") or fn.endswith(".xls"):
                            df = pd.read_excel(f)
                        else:
                            continue  # skip archive members that are neither csv nor excel
                        data_insert(logger_main, data_convert, file_name, df, factory_type, user)
            os.remove(file_save_path)
        sql = f'update t_offline_log set status=0 where id = {id}'
        session = mysql_algo_Session()
        session.execute(sql)
        session.commit()
    except Exception as e:
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())
        sql = f'update t_offline_log set status=2 where id = {id}'
        session = mysql_algo_Session()
        session.execute(sql)
        session.commit()
    finally:
        mysql_algo_engine.dispose()
        if session is not None:
            session.close()

    logger_main.info("task finished")


if __name__ == '__main__':
    app_path = r"/home/wangliming/project/zlwl-algos/ALGOS/NONPERIODIC/dataFactory"
    cur_env = 'test'
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
    sysUtils = SysUtils(cur_env, app_path)
    hbase_params = sysUtils.get_cf_param('hbase')
    hbaseService = HbaseService(hbase_params)
    redis_params = sysUtils.get_cf_param('redis')
    redisUtils = RedisUtils()
    redis_conn = redisUtils.get_redis_conncect(redis_params)
    minio_params = sysUtils.get_cf_param('minio')
    mysql_algo_params = sysUtils.get_cf_param('mysql')
    mysqlUtils = MysqlUtils()

    logger_main = sysUtils.get_logger("dataset", log_base_path)
    logger_main.info("waiting for task scheduling")
    while True:
        try:
            redis_ram_data = redis_conn.lpop("DataFactoryDataSetListWaitForParse")
            if not redis_ram_data:
                continue
            logger_main.info("received task {}".format(redis_ram_data))
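            # The payload is apparently JSON-encoded once or, when it starts with a
            # quote character, twice; decode it to a dict accordingly.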
            if redis_ram_data[0] != '"':
                redis_ram_data = json.loads(redis_ram_data)
            else:
                redis_ram_data = json.loads(json.loads(redis_ram_data))
            main(redis_ram_data)

        except Exception as e:
            logger_main.error("error while waiting for tasks")
            logger_main.error(traceback.format_exc())
            logger_main.error("reconnecting to redis")
            redis_conn = redisUtils.get_redis_conncect(redis_params)
        finally:
            time.sleep(5)