import os import time import pandas as pd from sqlalchemy import create_engine import logging import logging.handlers import traceback import re import CBMSBatDiag import datetime import dateutil.relativedelta from urllib import parse from apscheduler.schedulers.blocking import BlockingScheduler import warnings import pymysql from urllib import parse from multiprocessing import Process def diag_cal(host, port, db, user, password, runenv, logger, period_second): logger.info("pid is {}".format(os.getpid())) try: db_engine = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user, parse.quote_plus(password), host, port, db )) conn = pymysql.connect(host=host, port=int(port), user=user, password=password, database=db) cursor = conn.cursor() except: logger.error(u"数据库连接错误", exc_info=True) logger.error(traceback.format_exc) return # 获取历史报警数据 df_diag_ram = pd.read_sql("select time_st,time_sp,sn,faultcode,faultlv,faultinfo,faultadvice from fault_result where time_sp='0000-00-00 00:00:00'",db_engine); # 获取配置信息 df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine) # db_engine.dispose() now_time = datetime.datetime.now() pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second-5) # end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S") start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S") logger.info("time range from {} to {} ".format(start_time, end_time)) for i in range(len(df_confs)): try: factory = df_confs.loc[i, 'factory'] sn = df_confs.loc[i, 'device_name'] if '康普盾' in factory: celltype = 1 cell_volt_count = 120 cell_temp_count = 40 elif '华霆' in factory: celltype = 2 cell_volt_count = 68 cell_temp_count = 40 elif '力神' in factory: celltype = 99 cell_volt_count = 72 cell_temp_count = 32 logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn)) sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time) logger.info(sql) df_bms = pd.read_sql(sql, db_engine) if df_bms.empty: continue df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt', 'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp', 'CellVolt', 'id'] # 单体数据拆分 cell_temp = df_bms['CellTemp'] cell_volt = df_bms['CellVolt'] cell_temps = [] [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp] cell_volts = [] [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt] del_indexes = [] for i, cell_volt in enumerate(cell_volts): if len(cell_volt) != cell_volt_count: del_indexes.append(i) for i, cell_temp in enumerate(cell_temps): if len(cell_temp) != cell_temp_count: del_indexes.append(i) for i, del_index in enumerate(del_indexes): del_index = del_index - i cell_volts.pop(del_index) cell_temps.pop(del_index) df_bms = df_bms.drop(index=del_indexes, axis=1) cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)] celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)] df_bms[cellvolt_name] = cell_volts df_bms[celltemp_name] = cell_temps df_bms = df_bms.drop(columns='CellVolt', axis=0) df_bms = df_bms.drop(columns='CellTemp', axis=0) df_bms = df_bms.reset_index(drop=True) logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes))) # 算法执行 df_soh = pd.read_sql( "select time_st,sn,soh,cellsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn), db_engine); df_uniform = pd.read_sql( "select time,sn,cellsoc_diff,cellmin_num,cellmax_num from uniform_result where factory ='{}' and sn = '{}' order by time desc limit 1".format(factory, sn), db_engine); df_soc = pd.read_sql( "select time,sn,packsoc from soc_result where factory ='{}' and sn = '{}' order by add_time desc limit 1".format(factory, sn), db_engine); #电池诊断................................................................................................................................................................ if not df_bms.empty: df_diag_ram_sn=df_diag_ram[df_diag_ram['sn']==sn] df_diag_ram_sn.reset_index(inplace=True,drop=True) batdiag=CBMSBatDiag.BatDiag(sn,celltype,df_bms,df_soh,df_soc,df_uniform,df_diag_ram_sn) df_diag_res, df_health_res=batdiag.diag() #获取电池故障结果和电池评分结果 #电池评分写入数据库 if not df_health_res.empty: #变为历史故障更改数据库 df_health_res['add_time'] = datetime.datetime.now() df_health_res['factory'] = factory df_health_res.to_sql("health_result",con=db_engine, if_exists="append",index=False) logger.info(u"{} health_result 写入成功!!!\n".format(sn), exc_info=True) #历史故障筛选并更改数据库故障结束时间......................................................... if not df_diag_res.empty: df_diag_now=df_diag_res[df_diag_res['time_sp'] == '0000-00-00 00:00:00'] #去除历史故障 df_diag_new = pd.concat([df_diag_res,df_diag_ram_sn,df_diag_ram_sn]).drop_duplicates(subset=['time_st','faultcode'],keep=False)#此次判断中新增故障 df_diag_end=pd.concat([df_diag_res,df_diag_new,df_diag_new]).drop_duplicates(subset=['time_st','faultcode'],keep=False)#此次判断中新增故障 df_diag_end=df_diag_end[df_diag_end['time_sp'] != '0000-00-00 00:00:00'] df_diag_end.reset_index(inplace=True,drop=True) #重置索引 if not df_diag_end.empty: #变为历史故障更改数据库 try: for i in range(0,len(df_diag_end)): sql = '''update fault_result set update_time='{}', time_sp='{}' where sn='{}' and faultcode={} and time_st='{}' '''.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), df_diag_end.loc[i,'time_sp'], sn, df_diag_end.loc[i,'faultcode'], df_diag_end.loc[i,'time_st']) cursor.execute(sql) conn.commit() logger.info(u"{} fault_result更新成功\n".format(sn), exc_info=True) except: logger.error(traceback.format_exc) logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True) #新增故障筛选并存入数据库..................................................................... if not df_diag_new.empty: df_diag_new['add_time'] = datetime.datetime.now() df_diag_new['factory'] = factory df_diag_new.to_sql("fault_result",con=db_engine, if_exists="append",index=False) logger.info(u"{} fault_result 写入成功!!!\n".format(sn), exc_info=True) #更新diag的Ram数据 df_diag_ram=df_diag_ram.drop(df_diag_ram[df_diag_ram.sn==sn].index) df_diag_ram=df_diag_ram.append(df_diag_now, ignore_index=True) df_diag_ram.reset_index(inplace=True,drop=True) #重置索引 logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn)) except: logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True) logger.error(traceback.format_exc) db_engine.dispose() cursor.close() conn.close() logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time)) def heart_beat(host, port, db, user, password, log): while True: try: db_engine = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user, parse.quote_plus(password), host, port, db )) while True: now = datetime.datetime.now() db_engine.execute("update status set algo_diag='{}'".format(now)) log.info("diag心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S")) time.sleep(5) except: log.error(u"diag心跳错误", exc_info=True) log.error(traceback.format_exc) time.sleep(5) finally: db_engine.dispose() # 开启定时任务 if __name__ == '__main__': warnings.filterwarnings("ignore") env_dist = os.environ # 配置信息信息 # host = env_dist.get("ALI_HOST", '120.25.223.1') # port = env_dist.get("ALI_PORT", '4901') # db = env_dist.get("ALI_DB", 'ali') # user = env_dist.get("ALI_ROOT", 'root') # password = host = env_dist.get("ALI_HOST", '192.168.31.141') port = env_dist.get("ALI_PORT", '3306') db = env_dist.get("ALI_DB", 'ali') user = env_dist.get("ALI_ROOT", 'root') password = env_dist.get("ALI_PASSWORD", 'Ali@123456') runenv = env_dist.get("ALI_RUNENV", 'pro') period_second = int(env_dist.get("ALI_PERIOD_SECOND", '60')) algo_name = 'diag' # 日志配置 log_path = './log' + '/' + algo_name if not os.path.exists(log_path): os.makedirs(log_path) logger = logging.getLogger() logger.setLevel(logging.DEBUG) fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a") formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s") fh.suffix = "%Y-%m-%d_%H-%M.log" fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$") fh.setFormatter(formatter) fh.setLevel(logging.DEBUG) logger.addHandler(fh) fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024, backupCount=5, encoding="utf-8", mode="a") formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s") fh.setFormatter(formatter) fh.setLevel(logging.ERROR) logger.addHandler(fh) logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!") # 调度 p = Process(target=heart_beat, args=(host, port, db, user, password, logger,)) p.start() scheduler = BlockingScheduler() # heart_beat(host, port, db, user, password, logger) # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True) diag_cal(host, port, db, user, password, runenv, logger, period_second) scheduler.add_job(func=diag_cal, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second ,max_instances=1, coalesce=True) try: scheduler.start() except Exception as e: scheduler.shutdown() logger.error(str(e))