"""Periodic SOC job.

Every ``ALI_PERIOD_SECOND`` seconds the job reads the enabled devices from the
``conf`` table, loads their recent ``bms_data`` rows, runs
``CBMSBatSoc.BatSoc(...).batsoc()`` on them and appends the result to
``soc_result``.  A separate process writes a heartbeat timestamp to
``status.algo_soc`` every 5 seconds.
"""
import datetime
import logging
import logging.handlers
import os
import re
import time
import traceback
import warnings
from multiprocessing import Process
from urllib import parse

import dateutil.relativedelta
import pandas as pd
from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import create_engine, text

import CBMSBatSoc

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Names assigned positionally to the frame read from ``bms_data``.
# NOTE(review): this assumes the table's column order matches this list -- confirm.
BMS_COLUMNS = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat',
               'PackVolt', 'PackCrnt', 'PackSOC', 'PackSOH', 'alarm1', 'alarm2',
               'alarm3', 'CellTemp', 'CellVolt', 'id']

# Names assigned positionally to the algorithm result before it is stored.
RESULT_COLUMNS = ['time', 'sn', 'bms_soc', 'packsoc', 'socdsp', 'cellsocmin',
                  'cellsocmax', 'socmin_num', 'socmax_num', 'cellsoc_diff',
                  'cellsoc', 'ocvweight', 'socstep']

# Columns of the in-memory state frame handed to / returned by the algorithm.
RAM_COLUMNS = ['time', 'sn', 'bms_soc', 'soc', 'cellsoc', 'standingtime',
               'rampackcrnt', 'ramcellvolt', 'kocellvoltmin', 'kocellvoltmax',
               'ocvweight', 'as_accum', 'socstep']

# (substring looked up in the factory name, celltype, cell voltage count,
#  cell temperature count); checked in this order, first match wins.
CELL_LAYOUTS = (
    ('康普盾', 1, 120, 40),
    ('华霆', 2, 204, 40),
    ('力神', 99, 72, 32),
)

# Per-device algorithm state carried over between scheduler runs.  It lives at
# module level so that ``fun`` also works when this module is imported.
df_ram = pd.DataFrame(columns=RAM_COLUMNS)


def _build_engine(host, port, db, user, password):
    """Return a SQLAlchemy engine for the MySQL database (password URL-quoted)."""
    return create_engine(
        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
            user, parse.quote_plus(password), host, port, db
        ))


def _cell_layout(factory):
    """Return ``(celltype, cell_volt_count, cell_temp_count)`` for *factory*.

    Raises:
        ValueError: if the factory name contains none of the known vendor
            names, so a device is never processed with another vendor's layout.
    """
    for keyword, celltype, volt_count, temp_count in CELL_LAYOUTS:
        if keyword in factory:
            return celltype, volt_count, temp_count
    raise ValueError("unsupported factory: {!r}".format(factory))


def _expand_cells(df_bms, cell_volt_count, cell_temp_count):
    """Split the comma-separated cell columns into one float column per cell.

    Rows whose ``CellVolt`` / ``CellTemp`` value does not contain exactly the
    expected number of entries are dropped.

    Returns:
        ``(frame, dropped)``: *frame* has ``CellVolt``/``CellTemp`` replaced by
        ``CellVolt1..N`` followed by ``CellTemp1..M`` and a fresh 0..n-1 index;
        *dropped* is the ascending list of dropped row positions.
    """
    volts = [list(map(float, raw.split(','))) for raw in df_bms['CellVolt']]
    temps = [list(map(float, raw.split(','))) for raw in df_bms['CellTemp']]

    dropped = [
        pos for pos, (volt, temp) in enumerate(zip(volts, temps))
        if len(volt) != cell_volt_count or len(temp) != cell_temp_count
    ]
    dropped_set = set(dropped)
    kept = [pos for pos in range(len(volts)) if pos not in dropped_set]

    volt_names = ['CellVolt' + str(n) for n in range(1, cell_volt_count + 1)]
    temp_names = ['CellTemp' + str(n) for n in range(1, cell_temp_count + 1)]

    base = (df_bms.iloc[kept]
            .drop(columns=['CellVolt', 'CellTemp'])
            .reset_index(drop=True))
    volt_df = pd.DataFrame([volts[pos] for pos in kept], columns=volt_names)
    temp_df = pd.DataFrame([temps[pos] for pos in kept], columns=temp_names)
    return pd.concat([base, volt_df, temp_df], axis=1), dropped


def _process_device(db_engine, factory, sn, start_time, end_time, runenv, logger):
    """Run the SOC algorithm for one device and store its result.

    Any exception is left to the caller, which logs it per device.
    """
    global df_ram

    tag = "pid-{} FACTORY:{} - SN: {}".format(os.getpid(), factory, sn)
    celltype, cell_volt_count, cell_temp_count = _cell_layout(factory)
    logger.info("%s START!", tag)

    # Bound parameters instead of string-built SQL.  Values are passed as str
    # to keep the original "compare against a quoted string" semantics.
    device_params = {"factory": str(factory), "sn": str(sn)}
    bms_sql = ("select * from bms_data where factory = :factory and sn = :sn "
               "and time > :start_time and time <= :end_time")
    bms_params = dict(device_params, start_time=start_time, end_time=end_time)
    logger.info("%s %s", bms_sql, bms_params)
    df_bms = pd.read_sql(text(bms_sql), db_engine, params=bms_params)
    if df_bms.empty:
        return
    df_bms.columns = BMS_COLUMNS

    # Split the per-cell data into individual columns.
    df_bms, dropped = _expand_cells(df_bms, cell_volt_count, cell_temp_count)
    logger.info("%s 去除单体异常行%s数据!", tag, dropped)
    if df_bms.empty:
        logger.warning("%s no valid BMS rows left after cell-count check, skipped", tag)
        return

    # Inputs of the algorithm: latest SOH result and, for celltype > 50 only,
    # the latest uniformity result.
    df_soh = pd.read_sql(
        text("select time_st,time_sp,sn,method,soh,cellsoh from soh_result "
             "where factory = :factory and sn = :sn "
             "order by time_st desc limit 1"),
        db_engine, params=device_params)
    if celltype > 50:
        df_socdiff = pd.read_sql(
            text("select time,sn,cellsoc_diff from uniform_result "
                 "where factory = :factory and sn = :sn "
                 "order by time desc limit 1"),
            db_engine, params=device_params)
    else:
        df_socdiff = pd.DataFrame()

    # Run the algorithm with this device's previous state.
    df_ram_sn = df_ram[df_ram['sn'] == sn]
    bat_soc = CBMSBatSoc.BatSoc(sn, celltype, df_bms, df_soh, df_ram_sn, df_socdiff)
    df_res, df_ram_sn = bat_soc.batsoc()
    if not df_ram_sn.empty:
        # Replace this device's state rows with the ones just returned.
        df_ram = pd.concat([df_ram[df_ram['sn'] != sn], df_ram_sn],
                           ignore_index=True)

    # In the development environment a fixed test result is written instead.
    if runenv == 'dev':
        logger.info("%s 测试!", tag)
        df_res = pd.DataFrame({
            'time': ["2021-01-01 00:00:00"], 'sn': [sn], 'bms_soc': [10],
            'packsoc': [1], 'socdsp': [100], 'cellsocmin': [99],
            'cellsocmax': [100], 'socmin_num': [10], 'socmax_num': [20],
            'cellsoc_diff': ['12,22'],
            # One row like every other column; a two-element list here made
            # the DataFrame constructor fail.
            'cellsoc': ['10,20'],
            'ocvweight': [1], 'socstep': [2],
        })

    if not df_res.empty:
        df_res.columns = RESULT_COLUMNS
        df_res['factory'] = factory
        df_res['add_time'] = datetime.datetime.now()
        df_res.to_sql("soc_result", con=db_engine, if_exists="append", index=False)
        logger.info("%s 结果入库成功!", tag)
    logger.info("%s DONE!", tag)


def _run_cycle(db_engine, runenv, logger, period_second):
    """Process every enabled device for the window ending now."""
    # Load the device configuration.  This is the first statement that really
    # connects to the database, so connection failures surface here.
    try:
        df_confs = pd.read_sql(
            'select * from conf where status=1 order by factory,device_name',
            db_engine)
    except Exception:
        logger.error(u"数据库连接错误", exc_info=True)
        return

    logger.info("pid is {}".format(os.getpid()))
    now_time = datetime.datetime.now()
    # The window reaches one extra second back beyond the scheduling period.
    pre_time = now_time + dateutil.relativedelta.relativedelta(
        seconds=-period_second - 1)
    end_time = now_time.strftime(TIME_FORMAT)
    start_time = pre_time.strftime(TIME_FORMAT)
    logger.info("time range from {} to {} ".format(start_time, end_time))

    for factory, sn in zip(df_confs['factory'], df_confs['device_name']):
        try:
            _process_device(db_engine, factory, sn, start_time, end_time,
                            runenv, logger)
        except Exception:
            # One failing device must not stop the remaining ones.
            logger.error(
                u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(
                    os.getpid(), factory, sn),
                exc_info=True)

    logger.info("time range from {} to {} done!!!!!!!!! \n".format(
        start_time, end_time))


def fun(host, port, db, user, password, runenv, logger, period_second):
    """Run one SOC calculation cycle for all enabled devices.

    Args:
        host, port, db, user, password: MySQL connection settings.
        runenv: ``'dev'`` stores a fixed test result instead of the algorithm
            output; any other value stores the real output.
        logger: logger that receives progress and error messages.
        period_second: length of the data window in seconds (one extra second
            is added to the window).

    Database errors are logged, not raised.  The module-level ``df_ram`` frame
    is updated with the state returned by the algorithm.
    """
    try:
        db_engine = _build_engine(host, port, db, user, password)
    except Exception:
        logger.error(u"数据库连接错误", exc_info=True)
        return
    try:
        _run_cycle(db_engine, runenv, logger, period_second)
    finally:
        db_engine.dispose()


# NOTE: a multi-process runner (a pool of ALI_PROCESS workers calling ``fun``)
# used to be sketched here as commented-out code; it referenced names that do
# not exist in this file and has been removed.


def heart_beat(host, port, db, user, password, log):
    """Write the current time to ``status.algo_soc`` every 5 seconds, forever.

    On any error the error is logged, the engine is disposed and, after a
    5 second pause, a new engine is created.  The function does not return.
    """
    while True:
        db_engine = None
        try:
            db_engine = _build_engine(host, port, db, user, password)
            while True:
                now = datetime.datetime.now()
                # ``begin()`` commits on success.
                with db_engine.begin() as conn:
                    conn.execute(text("update status set algo_soc=:now"),
                                 {"now": now})
                log.info("soc心跳信号" + now.strftime(TIME_FORMAT))
                time.sleep(5)
        except Exception:
            log.error(u"soc心跳错误", exc_info=True)
            time.sleep(5)
        finally:
            # The engine may not exist if its creation was what failed.
            if db_engine is not None:
                db_engine.dispose()


def _build_logger(log_path):
    """Configure the root logger with ``info.log`` (DEBUG+) and ``error.log`` (ERROR+).

    Both files live in *log_path* (created if missing) and rotate at 1 GiB
    with 5 backups.
    """
    os.makedirs(log_path, exist_ok=True)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    for file_name, level in (('info.log', logging.DEBUG),
                             ('error.log', logging.ERROR)):
        handler = logging.handlers.RotatingFileHandler(
            filename='{}/{}'.format(log_path, file_name),
            maxBytes=1024 * 1024 * 1024, backupCount=5,
            encoding="utf-8", mode="a")
        handler.setFormatter(logging.Formatter(
            "%(asctime)s - %(name)s-%(levelname)s %(message)s"))
        handler.setLevel(level)
        logger.addHandler(handler)
    return logger


def main():
    """Read settings from the environment and start heartbeat and scheduler."""
    warnings.filterwarnings("ignore")
    env_dist = os.environ

    # Connection / runtime settings.
    # NOTE(review): the fallback values hard-code a host and credentials in
    # source; consider requiring the environment variables instead.
    host = env_dist.get("ALI_HOST", '120.25.223.1')
    port = env_dist.get("ALI_PORT", '4901')
    db = env_dist.get("ALI_DB", 'ali')
    user = env_dist.get("ALI_ROOT", 'root')
    password = env_dist.get("ALI_PASSWORD", '123456')
    runenv = env_dist.get("ALI_RUNENV", 'pro')
    period_second = int(env_dist.get("ALI_PERIOD_SECOND", '10'))
    algo_name = 'soc'

    # Logging.
    logger = _build_logger('./log' + '/' + algo_name)
    logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")

    # Daemon process: the heartbeat must stop when the scheduler process
    # exits, otherwise it would keep reporting a dead job as alive.
    heartbeat = Process(target=heart_beat,
                        args=(host, port, db, user, password, logger),
                        daemon=True)
    heartbeat.start()

    scheduler = BlockingScheduler()
    # Run once immediately, then on a fixed interval.
    fun(host, port, db, user, password, runenv, logger, period_second)
    scheduler.add_job(
        func=fun,
        args=(host, port, db, user, password, runenv, logger, period_second),
        trigger='interval', seconds=period_second,
        max_instances=1, coalesce=True)
    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    except Exception as e:
        logger.error(str(e), exc_info=True)
    finally:
        # shutdown() raises if the scheduler never got running.
        if scheduler.running:
            scheduler.shutdown()


# Start the scheduled job.
if __name__ == '__main__':
    main()