import os import time import pandas as pd from sqlalchemy import create_engine import logging.handlers import traceback import re import CBMSBatSoh import datetime import dateutil.relativedelta from urllib import parse from apscheduler.schedulers.blocking import BlockingScheduler import warnings from multiprocessing import Process def fun(host, port, db, user, password, runenv, logger, period_second): logger.info("pid is {}".format(os.getpid())) try: db_engine = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user, parse.quote_plus(password), host, port, db )) except: logger.error(u"数据库连接错误", exc_info=True) logger.error(traceback.format_exc) return # 获取配置信息 df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine) # db_engine.dispose() now_time = datetime.datetime.now() pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second-6*3600) #两周前 end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S") start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S") logger.info("time range from {} to {} ".format(start_time, end_time)) for i in range(len(df_confs)): try: factory = df_confs.loc[i, 'factory'] sn = df_confs.loc[i, 'device_name'] if '康普盾' in factory: celltype = 1 cell_volt_count = 120 cell_temp_count = 40 elif '华霆' in factory: celltype = 2 cell_volt_count = 204 cell_temp_count = 40 elif '力神' in factory: celltype = 99 cell_volt_count = 72 cell_temp_count = 32 logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn)) sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time) logger.info(sql) df_bms = pd.read_sql(sql, db_engine) if df_bms.empty: continue df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt', 'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp', 'CellVolt', 'id'] # 单体数据拆分 cell_temp = df_bms['CellTemp'] cell_volt = df_bms['CellVolt'] cell_temps = [] [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp] cell_volts = [] [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt] del_indexes = [] for i, cell_volt in enumerate(cell_volts): if len(cell_volt) != cell_volt_count: del_indexes.append(i) for i, cell_temp in enumerate(cell_temps): if len(cell_temp) != cell_temp_count: del_indexes.append(i) del_indexes = list(set(del_indexes)) for i, del_index in enumerate(del_indexes): del_index = del_index - i cell_volts.pop(del_index) cell_temps.pop(del_index) df_bms = df_bms.drop(index=del_indexes, axis=1) cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)] celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)] df_bms[cellvolt_name] = cell_volts df_bms[celltemp_name] = cell_temps df_bms = df_bms.drop(columns='CellVolt', axis=0) df_bms = df_bms.drop(columns='CellTemp', axis=0) df_bms = df_bms.reset_index(drop=True) logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes))) # 算法执行 df_soh = pd.read_sql( "select time_st,time_sp,sn,method,soh,cellsoh,packsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn), db_engine); BatSoh = CBMSBatSoh.BatSoh(sn, celltype, df_bms, df_soh) df_res = BatSoh.batsoh() # 如果以开发环境运行,则写入测试结果!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! if (runenv == 'dev'): logger.info("pid-{} FACTORY:{} - SN: {} 测试!".format(os.getpid(), factory, sn)) df_res = pd.DataFrame({'time_st': ["2021-01-01 00:00:00"], 'time_sp': ["2021-01-02 00:00:00"], 'sn': [sn], 'method': [1], 'bmssoh': [99], 'packsoh': [100], 'soh': [99],"cellsohmin":[10],"cellsohmax":[20],"sohmin_num":[30], 'sohmax_num':[10], 'cellsoh_diff': [1], 'cellsoh': ["98,97"]}) if not df_res.empty: df_res.columns = ['time_st', 'time_sp', 'sn', 'method', 'bmssoh', 'packsoh', 'soh','cellsohmin', 'cellsohmax', 'sohmin_num', 'sohmax_num', 'cellsoh_diff', 'cellsoh'] df_res['factory'] = factory df_res['add_time'] = datetime.datetime.now() df_res.to_sql("soh_result", con=db_engine, if_exists="append", index=False) logger.info("pid-{} FACTORY:{} - SN: {} 结果入库成功!".format(os.getpid(), factory, sn)) logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn)) except: logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True) logger.error(traceback.format_exc) db_engine.dispose() logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time)) def heart_beat(host, port, db, user, password, log): while True: try: db_engine = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user, parse.quote_plus(password), host, port, db )) while True: now = datetime.datetime.now() db_engine.execute("update status set algo_soh='{}'".format(now)) log.info("soh心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S")) time.sleep(5) except: log.error(u"soh心跳错误", exc_info=True) log.error(traceback.format_exc) time.sleep(5) finally: db_engine.dispose() if __name__ == '__main__': warnings.filterwarnings("ignore") env_dist = os.environ # 配置信息信息 # host = env_dist.get("ALI_HOST", '120.25.223.1') # port = env_dist.get("ALI_PORT", '4901') # db = env_dist.get("ALI_DB", 'ali') # user = env_dist.get("ALI_ROOT", 'root') # password = parse.quote_plus(env_dist.get("ALI_PASSWORD", '123456')) host = env_dist.get("ALI_HOST", '192.168.31.141') port = env_dist.get("ALI_PORT", '3306') db = env_dist.get("ALI_DB", 'ali') user = env_dist.get("ALI_ROOT", 'root') password = env_dist.get("ALI_PASSWORD", 'Ali@123456') runenv = env_dist.get("ALI_RUNENV", 'dev') period_second = int(env_dist.get("ALI_PERIOD_SECOND", '604800')) algo_name = 'soh' # 日志配置 log_path = './log' + '/' + algo_name if not os.path.exists(log_path): os.makedirs(log_path) logger = logging.getLogger() logger.setLevel(logging.DEBUG) fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a") formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s") fh.suffix = "%Y-%m-%d_%H-%M.log" fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$") fh.setFormatter(formatter) fh.setLevel(logging.DEBUG) logger.addHandler(fh) fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a") formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s") fh.setFormatter(formatter) fh.setLevel(logging.ERROR) logger.addHandler(fh) logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!") # 开启定时任务 p = Process(target=heart_beat, args=(host, port, db, user, password, logger,)) p.start() scheduler = BlockingScheduler() # heart_beat(host, port, db, user, password, logger) # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True) fun(host, port, db, user, password, runenv, logger, period_second) scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second ,max_instances=1, coalesce=True) try: scheduler.start() except Exception as e: scheduler.shutdown() logger.error(str(e))