import os import time import pandas as pd from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker import logging import logging.handlers import traceback import re import CBMSSafetyAlarm import datetime import dateutil.relativedelta from urllib import parse from apscheduler.schedulers.blocking import BlockingScheduler import warnings import pymysql from multiprocessing import Process def fun(host, port, db, user, password, runenv, logger, period_second): logger.info("pid is {}".format(os.getpid())) global df_bms_ram, df_alarm_ram try: db_engine = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user, parse.quote_plus(password), host, port, db )) conn = pymysql.connect(host=host, port=int(port), user=user, password=password, database=db) cursor = conn.cursor() # 获取配置信息 df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine) # db_engine.dispose() except: logger.error(u"数据库连接错误", exc_info=True) logger.error(traceback.format_exc) return # 获取历史报警数据 df_diag_ram = pd.read_sql("select time_st,time_sp,sn,faultcode,faultlv,faultinfo,faultadvice from fault_result where faultcode = 119 and time_sp='0000-00-00 00:00:00'",db_engine); now_time = datetime.datetime.now() pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second) #1min end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S") start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S") logger.info("time range from {} to {} ".format(start_time, end_time)) for i in range(len(df_confs)): try: factory = df_confs.loc[i, 'factory'] sn = df_confs.loc[i, 'device_name'] if '康普盾' in factory: celltype = 1 cell_volt_count = 120 cell_temp_count = 40 elif '华霆' in factory: celltype = 2 cell_volt_count = 68 cell_temp_count = 40 elif '力神' in factory: celltype = 99 cell_volt_count = 72 cell_temp_count = 32 logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn)) sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time) logger.info(sql) df_bms = pd.read_sql(sql, db_engine) if df_bms.empty: continue df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt', 'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp', 'CellVolt', 'id'] # 单体数据拆分 cell_temp = df_bms['CellTemp'] cell_volt = df_bms['CellVolt'] cell_temps = [] [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp] cell_volts = [] [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt] del_indexes = [] for i, cell_volt in enumerate(cell_volts): if len(cell_volt) != cell_volt_count: del_indexes.append(i) for i, cell_temp in enumerate(cell_temps): if len(cell_temp) != cell_temp_count: del_indexes.append(i) del_indexes = list(set(del_indexes)) for i, del_index in enumerate(del_indexes): del_index = del_index - i cell_volts.pop(del_index) cell_temps.pop(del_index) df_bms = df_bms.drop(index=del_indexes, axis=1) cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)] celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)] df_bms[cellvolt_name] = cell_volts df_bms[celltemp_name] = cell_temps df_bms = df_bms.drop(columns='CellVolt', axis=0) df_bms = df_bms.drop(columns='CellTemp', axis=0) df_bms = df_bms.reset_index(drop=True) logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes))) # 算法执行 #电池诊断................................................................................................................................................................ df_diag_ram_sn=df_diag_ram[df_diag_ram['sn']==sn] df_bms_ram_sn=df_bms_ram[df_bms_ram['sn']==sn] df_alarm_ram_sn=df_alarm_ram[df_alarm_ram['sn']==sn] if df_diag_ram_sn.empty: SafetyAlarm=CBMSSafetyAlarm.SafetyAlarm(sn, celltype, df_bms, df_diag_ram_sn, df_bms_ram_sn, df_alarm_ram_sn) df_diag_res, df_bms_res, df_ram_res=SafetyAlarm.safety_alarm_diag() #更新bms的ram数据 sn_index=df_bms_ram.loc[df_bms_ram['sn']==sn].index df_bms_ram=df_bms_ram.drop(index=sn_index) df_bms_ram=df_bms_ram.append(df_bms_res) sn_index=df_alarm_ram.loc[df_alarm_ram['sn']==sn].index df_alarm_ram=df_alarm_ram.drop(index=sn_index) df_alarm_ram=df_alarm_ram.append(df_ram_res) #当前热失控故障写入数据库 if not df_diag_res.empty: df_diag_res['add_time'] = datetime.datetime.now() df_diag_res['factory'] = factory df_diag_res.to_sql("fault_result",con=db_engine, if_exists="append",index=False) logger.info(u"{} 写入成功!!!\n".format(sn), exc_info=True) #当前热失控已超过一天变为历史故障并更改数据库 else: fault_time=df_diag_ram_sn.iloc[-1]['time_st'] if (now_time-fault_time).total_seconds()>24 * 3600: df_diag_ram_sn['time_sp']=end_time try: cursor.execute(''' update fault_result set update_time='{}',time_sp='{}' where sn='{}' and time_sp='0000-00-00 00:00:00' and faultcode={} '''.format(datetime.datetime.now(), end_time,sn, 119)) conn.commit() logger.info(u"{} 更新成功\n".format(sn), exc_info=True) except: logger.error(traceback.format_exc) logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True) logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn)) except: logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True) logger.error(traceback.format_exc) db_engine.dispose() logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time)) def heart_beat(host, port, db, user, password, log): while True: try: db_engine = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user, parse.quote_plus(password), host, port, db )) while True: now = datetime.datetime.now() db_engine.execute("update status set algo_safetyalarm='{}'".format(now)) log.info("safetyalarm心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S")) time.sleep(5) except: log.error(u"safetyalarm心跳错误", exc_info=True) log.error(traceback.format_exc) time.sleep(5) finally: db_engine.dispose() if __name__ == '__main__': warnings.filterwarnings("ignore") env_dist = os.environ # 配置信息信息 host = env_dist.get("ALI_HOST", '120.25.223.1') port = env_dist.get("ALI_PORT", '4901') db = env_dist.get("ALI_DB", 'ali') user = env_dist.get("ALI_ROOT", 'root') password = env_dist.get("ALI_PASSWORD", '123456') runenv = env_dist.get("ALI_RUNENV", 'dev') period_second = int(env_dist.get("ALI_PERIOD_SECOND", '60')) algo_name = 'safetyalarm' # 日志配置 log_path = './log' + '/' + algo_name if not os.path.exists(log_path): os.makedirs(log_path) logger = logging.getLogger() logger.setLevel(logging.DEBUG) fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a") formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s") fh.suffix = "%Y-%m-%d_%H-%M.log" fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$") fh.setFormatter(formatter) fh.setLevel(logging.DEBUG) logger.addHandler(fh) fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a") formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s") fh.setFormatter(formatter) fh.setLevel(logging.ERROR) logger.addHandler(fh) #参数初始化 df_bms_ram=pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp', 'packsoc']) df_alarm_ram=pd.DataFrame(columns=['sn','time','safetywarning1','safetywarning2']) logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!") # 开启定时任务 p = Process(target=heart_beat, args=(host, port, db, user, password, logger,)) p.start() scheduler = BlockingScheduler() # heart_beat(host, port, db, user, password, logger) # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True) fun(host, port, db, user, password, runenv, logger, period_second) scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second ,max_instances=1, coalesce=True) try: scheduler.start() except Exception as e: scheduler.shutdown() logger.error(str(e))