import time
import json
import traceback
import os
import zipfile

import minio
import pandas as pd
from sqlalchemy import text

from utils.HbaseService import HbaseService
from utils.ProUtils import *
from utils.DataConvert import DataConvert


def data_insert(logger_main, data_convert, f, df, factory_type, user):
    """Convert one DataFrame according to its factory type and write it to HBase."""
    logger_main.info(f"{f}: reading data file......")
    # sn, st, et, tag, tag_child = f.split(".")[0].split('_')
    logger_main.info(f"{f}: ingesting data......")
    if factory_type == 1:
        # Qixiang data needs no ingestion
        logger_main.info("Qixiang data needs no ingestion, skipping")
        return
    if factory_type == 2:
        # convert Hezhong data to the common schema
        df = data_convert._convert_hezhong(df, factory_type)
    df['user'] = user
    hbaseService.write_data(df)  # hbaseService is created in __main__
    logger_main.info(f"{f}: data ingestion finished.")


def main(task_params):
    global logger_main, hbaseService, minio_params, mysql_algo_params
    mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
    minioClient = minio.Minio(endpoint=minio_params['endpoint'],
                              access_key=minio_params['access_key'],
                              secret_key=minio_params['secret_key'],
                              # parse the config string instead of calling eval() on it
                              secure=str(minio_params['secure']).lower() == 'true')
    bucket_name = minio_params['bucket_name']
    data_base_dir = "./data"
    if not os.path.exists(data_base_dir):
        os.mkdir(data_base_dir)
    log_id = task_params.get("log_id")
    user = task_params.get("user")
    file_url = task_params.get("file_url")
    factory_type = task_params.get("factory_type")
    file_name = file_url.split("/")[-1]
    file_save_path = os.path.join(data_base_dir, file_url.replace("/", "_"))
    file_obj = minioClient.get_object(bucket_name, file_url)
    data_convert = DataConvert()
    logger_main.info("Reading file from MinIO")
    session = mysql_algo_Session()
    try:
        if file_url.endswith(".csv"):
            # the MinIO response object is file-like, so pandas can read it directly
            df = pd.read_csv(file_obj)
            data_insert(logger_main, data_convert, file_name, df, factory_type, user)
        elif file_url.endswith(".xlsx") or file_url.endswith(".xls"):
            with open(file_save_path, 'wb') as f:
                for d in file_obj.stream(32 * 1024):
                    f.write(d)
            df = pd.read_excel(file_save_path)
            data_insert(logger_main, data_convert, file_name, df, factory_type, user)
            os.remove(file_save_path)
        # handle compressed archives
        elif file_url.endswith(".zip"):
            with open(file_save_path, 'wb') as f:
                for d in file_obj.stream(32 * 1024):
                    f.write(d)
            with zipfile.ZipFile(file_save_path, 'r') as zfile:
                for fn in zfile.namelist():
                    with zfile.open(fn) as zf:
                        if fn.endswith(".csv"):
                            df = pd.read_csv(zf)
                        elif fn.endswith(".xlsx") or fn.endswith(".xls"):
                            df = pd.read_excel(zf)
                        else:
                            # skip unsupported archive members instead of
                            # re-ingesting the previous DataFrame
                            continue
                        data_insert(logger_main, data_convert, file_name, df, factory_type, user)
            os.remove(file_save_path)
        # status=0 marks the task as successful; bind parameters to avoid SQL injection
        session.execute(text("update t_offline_log set status=:status where id=:id"),
                        {"status": 0, "id": log_id})
        session.commit()
    except Exception as e:
        logger_main.error(str(e))
        logger_main.error(traceback.format_exc())
        session.rollback()
        # status=2 marks the task as failed
        session.execute(text("update t_offline_log set status=:status where id=:id"),
                        {"status": 2, "id": log_id})
        session.commit()
    finally:
        file_obj.close()
        file_obj.release_conn()
        session.close()
        mysql_algo_engine.dispose()
        logger_main.info("Task finished")


if __name__ == '__main__':
    app_path = r"/home/wangliming/project/zlwl-algos/ALGOS/NONPERIODIC/dataFactory"
    cur_env = 'test'
    log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log directory
    sysUtils = SysUtils(cur_env, app_path)
    hbase_params = sysUtils.get_cf_param('hbase')
    hbaseService = HbaseService(hbase_params)
    redis_params = sysUtils.get_cf_param('redis')
    redisUtils = RedisUtils()
    redis_conn = redisUtils.get_redis_conncect(redis_params)
    minio_params = sysUtils.get_cf_param('minio')
    mysql_algo_params = sysUtils.get_cf_param('mysql')
    mysqlUtils = MysqlUtils()
    logger_main = sysUtils.get_logger("dataset", log_base_path)
    logger_main.info("Waiting for task scheduling")
    while True:
        try:
            redis_ram_data = redis_conn.lpop("DataFactoryDataSetListWaitForParse")
            if not redis_ram_data:
                continue
            logger_main.info("Received task {}".format(redis_ram_data))
            # the payload may be JSON-encoded twice (a JSON string containing JSON)
            if redis_ram_data[0] != '"':
                redis_ram_data = json.loads(redis_ram_data)
            else:
                redis_ram_data = json.loads(json.loads(redis_ram_data))
            main(redis_ram_data)
        except Exception:
            logger_main.error("Error while waiting for tasks")
            logger_main.error(traceback.format_exc())
            logger_main.error("Reconnecting to Redis")
            redis_conn = redisUtils.get_redis_conncect(redis_params)
        finally:
            time.sleep(5)  # poll interval