__author__ = 'lmstack' #coding=utf-8 import os import datetime import pandas as pd from LIB.BACKEND import DBManager, Log from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker import time, datetime import traceback from LIB.MIDDLE.CellStateEstimation.Common import log import CBMSSafetyAlarm from LIB.MIDDLE.CellStateEstimation.Common import DBDownload from urllib import parse import pymysql import pdb from apscheduler.schedulers.blocking import BlockingScheduler import datacompy import logging import multiprocessing #...................................电池包电芯安全诊断函数...................................................................................................................... def diag_cal(df_sn, df_bms_ram, log_name): # 日志 logger = logging.getLogger() fh = logging.FileHandler(log_name + ".log", encoding="utf-8",mode="a") formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s") fh.setFormatter(formatter) logger.addHandler(fh) logger.setLevel(logging.INFO) logger.info("pid is {}".format(os.getpid())) # 读取结果数据库 host2='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com' port2=3306 db2='safety_platform' user2='qx_algo_rw' password2='qx@123456' db_res_engine = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user2, parse.quote_plus(password2), host2, port2, db2 )) conn = pymysql.connect(host=host2, port=port2, user=user2, password=password2, database=db2) cursor = conn.cursor() result=pd.read_sql("select start_time, end_time, product_id, code, level, info, advice from all_fault_info where factory = '{}'".format('骑享'), db_res_engine) result = result[['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice']] df_diag_ram=result[(result['end_time']=='0000-00-00 00:00:00') & (result['code']==119)] df_bms_ram=pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp']) start=time.time() now_time=datetime.datetime.now() start_time=now_time-datetime.timedelta(seconds=70) start_time=start_time.strftime('%Y-%m-%d %H:%M:%S') end_time=now_time.strftime('%Y-%m-%d %H:%M:%S') for i in range(0, len(df_sn)): try: sn = df_sn.loc[i, 'sn'] if 'PK500' in sn: celltype=1 #6040三元电芯 elif 'PK502' in sn: celltype=2 #4840三元电芯 elif 'K504B' in sn: celltype=99 #60ah林磷酸铁锂电芯 elif 'MGMLXN750' in sn: celltype=3 #力信50ah三元电芯 elif 'MGMCLN750' or 'UD' in sn: celltype=4 #CATL 50ah三元电芯 else: print('SN:{},未找到对应电池类型!!!'.format(sn)) continue logger.info("pid-{} celltype-{} SN: {} START!".format(os.getpid(), celltype, sn)) #读取原始数据库数据........................................................................................................................................................ dbManager = DBManager.DBManager() df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms']) df_bms = df_data['bms'] # print(df_bms) #电池诊断................................................................................................................................................................ df_diag_ram_sn=df_diag_ram[df_diag_ram['product_id']==sn] df_bms_ram_sn=df_bms_ram[df_bms_ram['sn']==sn] if df_diag_ram_sn.empty: if not df_bms.empty: SafetyAlarm=CBMSSafetyAlarm.SafetyAlarm(sn,celltype,df_bms, df_bms_ram_sn) df_diag_res, df_bms_res=SafetyAlarm.diag() #更新bms的ram数据 sn_index=df_bms_ram.loc[df_bms_ram['sn']==sn].index df_bms_ram=df_bms_ram.drop(index=sn_index) df_bms_ram=df_bms_ram.append(df_bms_res) # sn_index=df_diag_ram.loc[df_diag_ram['product_id']==sn].index # df_diag_ram=df_diag_ram.drop(index=sn_index) # df_diag_ram=df_diag_ram.append(df_diag_res) # df_diag_ram.reset_index(inplace=True,drop=True) #重置索引 #当前热失控故障写入数据库 if not df_diag_res.empty: df_diag_res.columns = ['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice'] df_diag_res['add_time'] = datetime.datetime.now() df_diag_res['factory'] = '骑享' df_diag_res.to_sql("all_fault_info",con=db_res_engine, if_exists="append",index=False) #当前热失控已超过1天变为历史故障并写入数据库,并删除原有数据库中的当前故障和ram中的当前故障 else: fault_time=datetime.datetime.strptime(df_diag_ram_sn.iloc[-1]['start_time'], '%Y-%m-%d %H:%M:%S') if (now_time-fault_time).total_seconds()>24*3600: df_diag_ram_sn['end_time']=end_time df_diag_ram_sn['Batpos']=1 try: cursor.execute(''' update all_fault_info set update_time='{}',end_time='{}', Batpos={} where product_id='{}' and end_time='0000-00-00 00:00:00' and code={} and factory='骑享' '''.format(datetime.datetime.now(), end_time, 1,sn, 119)) conn.commit() conn.close(); except: logger.error(traceback.format_exc) logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True) conn.close(); except: logger.error(traceback.format_exc) logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True) cursor.close() conn.close() db_res_engine.dispose() logger.info("pid-{} Done!".format(os.getpid())) #...................................................主进程........................................................................................................... def mainprocess(): global df_bms_ram global log_path # 更新sn列表 # host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com' # port=3306 # db='qixiang_oss' # user='qixiang_oss' # password='Qixiang2021' # conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db) # cursor = conn.cursor() # cursor.execute("select sn, imei, add_time from app_device") # res = cursor.fetchall() # df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time']) # df_sn = df_sn.reset_index(drop=True) # cursor.close() # conn.close(); df_sn=pd.DataFrame(columns=['sn']) df_sn['sn']=['PK504B10100004434', 'MGMCLN750N215N205', 'PK504B10100004447', 'PK504B10100004379', 'PK504B10100004483'] process = 2 pool = multiprocessing.Pool(processes = process) for i in range(process): sn_list = df_sn[int(len(df_sn)*i/process):int(len(df_sn)*(i+1)/process)] sn_list = sn_list.reset_index(drop=True) log_name = log_path + '/log_' + str(i) pool.apply_async(diag_cal, (sn_list,df_bms_ram,log_name)) pool.close() pool.join() if __name__ == "__main__": # 时间设置 # now_time = datetime.datetime.now() # pre_time = now_time + dateutil.relativedelta.relativedelta(days=-1)# 前一日 # end_time=datetime.datetime.strftime(now_time,"%Y-%m-%d 00:00:00") # start_time=datetime.datetime.strftime(pre_time,"%Y-%m-%d 00:00:00") history_run_flag = False # 历史数据运行标志 # # 更新sn列表 # host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com' # port=3306 # db='qixiang_oss' # user='qixiang_oss' # password='Qixiang2021' # conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db) # cursor = conn.cursor() # cursor.execute("select sn, imei, add_time from app_device") # res = cursor.fetchall() # df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time']) # df_sn = df_sn.reset_index(drop=True) # conn.close(); # 数据库配置 host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com' port = 3306 user = 'qx_cas' password = parse.quote_plus('Qx@123456') database = 'qx_cas' db_engine = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user, password, host, port, database )) DbSession = sessionmaker(bind=db_engine) # 运行历史数据配置 df_first_data_time = pd.read_sql("select * from bat_first_data_time", db_engine) # 日志配置 now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":","_") log_path = 'log/' + now_str if not os.path.exists(log_path): os.makedirs(log_path) log = Log.Mylog(log_name='batsafetyAlarm', log_level = 'info') log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024* 1024 * 100) log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024* 1024 * 100) logger = log.get_logger() logger.info("pid is {}".format(os.getpid())) # 算法参数 host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com' port=3306 db='safety_platform' user='qx_read' password=parse.quote_plus('Qx@123456') tablename='all_fault_info' db_res_engine = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user, password, host, port, db )) #ram参数初始化........................................................................................................ df_bms_ram=pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp']) #定时任务....................................................................................................................................................................... scheduler = BlockingScheduler() scheduler.add_job(mainprocess, 'interval', seconds=60, id='diag_job') try: scheduler.start() except Exception as e: scheduler.shutdown() logger.error(str(e))