zhuxi 2 years ago
parent
commit
d1602d61d8

+ 279 - 0
LIB/MIDDLE/ThermoRunaway/V1_0_2/Trunaway.py

@@ -0,0 +1,279 @@
+from sklearn.preprocessing import StandardScaler 
+import keras
+import os
+import pandas as pd
+import numpy as np
+from LIB.BACKEND import DataPreProcess
+import datetime
+
+#数据预处理
+
+#删除采样异常点
+def delete(data_bms):
+    listV=[s for s in list(data_bms) if '单体电压' in s]
+    listT=[s for s in list(data_bms) if '单体温度' in s]
+    listT2=[s for s in list(data_bms) if '其他温度' in s]
+    #data_bms2=data_bms.copy()
+    for i in range(1,len(listV)+1):
+        data_bms=data_bms[(data_bms['单体电压'+str(i)]>1000) & (data_bms['单体电压'+str(i)]<6000)]
+    for i in range(1,len(listT)+1):
+        data_bms=data_bms[(data_bms['单体温度'+str(i)]>-20) & (data_bms['单体温度'+str(i)]<100)]
+    #for i in range(1,len(listT2)+1):
+        #data_bms=data_bms[(data_bms['其他温度'+str(1)]>-20) & (data_bms['其他温度'+str(1)]<100)]
+    #data_outliers=data_bms2.iloc[list(set(list(data_bms2.index)).difference(set(list(data_bms.index))))]
+    data_bms=data_bms.reset_index(drop=True)
+    return data_bms
+ 
+#构建时间序列&选取静置状态
+def data_groups(data_bms,sn,start_time,end_time):
+    data_bms=data_bms.drop(['GSM信号','外电压','开关状态','故障等级','故障代码','绝缘电阻','上锁状态','加热状态','单体均衡状态','总输出状态'],axis=1,errors='ignore')
+    data_set=pd.DataFrame()
+    start_time=start_time[:17]+'00'
+    end_time=end_time[:17]+'00'
+    data_set['时间戳'] = pd.date_range(start=start_time, end=end_time, freq='T')  #每分钟一条记录
+    #给数据重建新特征:充放电状态,序列
+    if len(data_bms['总电流[A]']==0)>0:
+        if sn[:4] in ['MGMC','UD02']:
+            #data_bms=rest_stscs_v1.cell_statistic.rest_sta(data_bms)
+            data_bms=DataPreProcess.DataPreProcess.data_split_by_status_forMGMCUD02(DataPreProcess, data_bms, drive_interval_threshold=120, charge_interval_threshold=300,drive_stand_threshold=120, charge_stand_threshold=300)
+        else:
+            data_bms=DataPreProcess.DataPreProcess.data_split_by_status(DataPreProcess, data_bms, drive_interval_threshold=120, charge_interval_threshold=300,drive_stand_threshold=120, charge_stand_threshold=300)
+    else:
+        data_bms['data_split_by_status']=1
+        data_bms['data_status']='work'     
+    #构建等差时间序列
+    data_bms['时间戳']=pd.to_datetime(data_bms['时间戳'])
+    for i in range(len(data_bms)):
+        data_bms.loc[i,'时间戳'] = data_bms.loc[i,'时间戳'].replace(second=0) 
+    data_bms.drop_duplicates(subset='时间戳',keep='last',inplace=False)
+    data_bms2=pd.merge(data_set,data_bms,on='时间戳',how='left')
+    data_bms2=data_bms2.fillna(method='ffill')
+    data_bms2=data_bms2.fillna(method='bfill')   
+    data_bms2.drop_duplicates(subset='时间戳',keep='last',inplace=True)
+    data_bms2=data_bms2.reset_index()
+    #删除无用特征
+    data_bms2=data_bms2.drop(['Unnamed: 0','level_0','index','Unnamed: 0.1','充电状态','data_split_by_crnt'],axis=1,errors='ignore')
+    #按状态分表
+    data_stand=data_bms2[data_bms2['data_status']=='stand']
+    return data_stand
+
+#标记时段
+def split(data0):
+    data0=data0.reset_index(drop=True)
+    data0=data0.drop(['Unnamed: 0','Unnamed: 0.1'],axis=1,errors='ignore')
+    data0['n_split']=np.nan
+    data1=data0.copy()
+    data1.drop_duplicates(subset=['data_split_by_status'],keep='first',inplace=True)
+    data1['n_split']=range(1,len(data1)+1)
+    data0.loc[data1.index,'n_split']=list(data1['n_split'])
+    data0['n_split']=list(data0['n_split'].fillna(method='ffill'))
+    time=list(map(lambda x: str(x),list(data0['时间戳'])))
+    data0['时间戳']=time
+    return data0
+
+
+
+####################################################################################################################
+
+#每10min一条记录:平均
+def create_dataset(data_set):   #X为dataframe,y为serie
+    data_set=data_set.drop(['总电流[A]','SOH[%]','data_status','data_split_by_status'],axis=1,errors='ignore')
+    time=list(map(lambda x: x[:15]+'0'+x[16:],list(data_set['时间戳'])))
+    data_set['时间戳']=time
+    List_n_split=sorted(list(set(data_set['n_split'])))
+    data_set2=pd.DataFrame()
+    for k in List_n_split:
+        dataset=data_set[data_set['n_split']==k]
+        if len(dataset)>10:
+            dataset=dataset.reset_index(drop=True)
+            sn=list(dataset['sn'].values)[0]
+            dataset=dataset.drop(['sn','n_split'],axis=1)
+            dataset2=dataset.groupby(dataset['时间戳']).mean()
+            dataset2=dataset2.reset_index()
+            dataset2['sn']=sn
+            dataset2['n_split']=k
+            data_set2=data_set2.append(dataset2)
+    return data_set2
+
+# 计算各单体电压下降量
+def cal_dataset(df_stand):   #X为dataframe,y为serie
+    List_n_split=sorted(list(set(df_stand['n_split'])))
+    listV=[s for s in list(df_stand) if '单体电压' in s]
+    listT=[s for s in list(df_stand) if '温度' in s]
+    newdataset=pd.DataFrame()
+    for k in List_n_split:
+        dataset=df_stand[df_stand['n_split']==k]
+        dataset=dataset.reset_index(drop=True)
+        dataset2=dataset[listV]
+        dataset3=dataset2.diff()   #periods=1, axis=0  
+        dataset3['最大电压下降']=dataset3[listV].min(axis=1)
+        dataset3['平均电压下降']=dataset3[listV].mean(axis=1)
+        dataset3['电压下降低偏']=dataset3[listV].mean(axis=1)-dataset3[listV].min(axis=1)
+        dataset3=dataset3.drop(listV+['平均电压下降'],axis=1)
+        dataset4=dataset.drop(listT+listV+['总电压[V]'],axis=1)
+        dataset5=pd.merge(dataset4,dataset3,left_index=True,right_index=True)
+        dataset5=dataset5.dropna(axis=0) 
+        newdataset=newdataset.append(dataset5) 
+    return newdataset 
+
+#每1hour一条记录:总和
+def timeserie(data_set):   #X为dataframe,y为serie
+    List_n_split=sorted(list(set(data_set['n_split'])))
+    time=list(map(lambda x: x[:14]+'00'+x[16:],list(data_set['时间戳'])))
+    data_set['时间戳']=time
+    data_set2=pd.DataFrame()
+    for k in List_n_split:
+        dataset=data_set[data_set['n_split']==k]
+        if len(dataset)>10:
+            dataset=dataset.reset_index(drop=True)
+            sn=list(dataset['sn'].values)[0]
+            soc=list(dataset['SOC[%]'].values)[0]
+            dataset=dataset.drop(['sn','n_split'],axis=1)
+            dataset2=dataset.groupby(dataset['时间戳']).sum()
+            dataset2=dataset2.reset_index()
+            dataset2['sn']=sn
+            dataset2['n_split']=k
+            dataset2['SOC[%]']=soc
+            data_set2=data_set2.append(dataset2)
+    return data_set2
+
+def makescaler_test(scaler,data_test):
+    data_test=data_test.reset_index(drop=True)
+    data_test_pro=data_test.drop(['n_split','时间戳','sn','SOC[%]'],axis=1)
+    test_sc=scaler.transform(np.array(data_test_pro))
+    test_sc=pd.DataFrame(test_sc)
+    test_sc['n_split']=data_test['n_split'].values
+    return test_sc
+
+#滑窗
+def create_win(data_set,data_train,time_steps=5):   #X为dataframe,y为serie
+    a,b=[],[] 
+    index=pd.DataFrame()
+    List_n_split=sorted(list(set(data_set['n_split'])))
+    for k in List_n_split:
+        dataset=data_set[data_set['n_split']==k]
+        datatrain=data_train[data_train['n_split']==k]
+        if len(dataset)>time_steps:
+            dataset2=dataset.reset_index(drop=True)
+            dataset=dataset.drop(['n_split'],axis=1)
+            dataX, dataY = [], []
+            index_step=[]
+            for i in range(len(dataset)-time_steps):      
+                v1 = dataset.iloc[i:(i+time_steps)].values
+                v2 = dataset.iloc[i+time_steps]
+                dataX.append(v1)
+                dataY.append(v2)
+                index_step.append(i)
+            dataset3=dataset2.iloc[:len(dataset2)-time_steps]
+            newdatatrain=datatrain[:len(dataset3)]
+            newdatatrain2=newdatatrain.copy()
+            newdatatrain2['window_step']=index_step
+            dataX2=np.array(dataX)
+            dataY2=np.array(dataY)
+            a.append(dataX2)             
+            b.append(dataY2)
+            index=index.append(newdatatrain2)
+    aa=np.vstack(a)
+    bb=np.vstack(b) 
+    return aa,bb,index
+
+def pred(Test,model):
+    test_pred = model.predict(Test)
+    test_loss = np.mean(np.abs(test_pred - Test), axis=1)
+    return test_loss
+
+def ref(test_loss,new_test):
+    test_loss_sum=test_loss.sum(axis=1)
+    test_loss_max=test_loss.max(axis=1)
+    ref_test=new_test.reset_index(drop=True)
+    ref_test['test_loss_sum']=test_loss_sum
+    ref_test['test_loss_max']=test_loss_max
+    ref_test['test_loss压差']=test_loss[:,0]
+    ref_test['test_loss降幅']=test_loss[:,1]
+    ref_test['test_loss降差']=test_loss[:,2]
+    return ref_test
+
+def difftime(delta):
+    seconds = delta.total_seconds() 
+    minutes = seconds/60 
+    return minutes
+
+def diffmin(res):
+    start=list(res['start_time'])
+    end=list(res['end_time'])
+    start=list(map(lambda x: datetime.datetime.strptime(str(x),'%Y-%m-%d %H:%M:%S'),start))
+    end=list(map(lambda x: datetime.datetime.strptime(str(x),'%Y-%m-%d %H:%M:%S'),end))
+    diff=np.array(end)-np.array(start)
+    diff_min=list(map(lambda x: difftime(x),diff))
+    return diff_min
+
+def res_output(TestOrg,scaler,model,group,end_time):
+    df_res=pd.DataFrame(columns=['product_id', 'start_time', 'end_time', 'diff_min','soc','loss_sum','loss_max','diffV','downV','diffdownV','window_step'])
+    test2=create_dataset(TestOrg)
+    test3=cal_dataset(test2)
+    newtest=timeserie(test3)
+    if len(newtest)>0:
+        test_sc=makescaler_test(scaler,newtest)
+        Test,y_test,win_test=create_win(test_sc,newtest,time_steps=3)
+        test_loss=pred(Test,model)
+        ref_test=ref(test_loss,win_test)
+        ref_test['test_loss_diff']=list(map(lambda x: x[0]-x[1], zip(list(ref_test['test_loss_sum']), list(ref_test['test_loss_max']))))
+
+        if group=='MGMCL':
+            res=ref_test[(ref_test['test_loss_max']>0.04) & (ref_test['SOC[%]']>15) & (ref_test['test_loss_sum']>0.06) & (ref_test['window_step']>0) &  (ref_test['最大电压下降']<-3)]
+        elif group=='PK504':
+            res=ref_test[(ref_test['test_loss_diff']>0.03) & (ref_test['test_loss_max']>0.03) & (ref_test['SOC[%]']>15) & (ref_test['window_step']>0) &  (ref_test['最大电压下降']<-3) &((ref_test['test_loss_sum']>3) | (ref_test['SOC[%]']<90))]
+        else:
+            res=ref_test[(ref_test['test_loss_diff']>0.6) & (ref_test['test_loss_max']>0.6) & (ref_test['SOC[%]']>15) & (ref_test['window_step']>0) &  (ref_test['电压下降低偏']>3.5) &((ref_test['test_loss_sum']>3) | (ref_test['SOC[%]']<90))]
+        res=res.reset_index()
+        for k in range(len(res)):
+            if res.loc[k,'最大电压下降']<-130:
+                sn=res.loc[k,'sn']
+                win=res.loc[k,'window_step']
+                index = res[(res["sn"]== sn)&(res["window_step"]== win)].index.tolist()[0]
+                res=res.drop([index-2,index-1,index])
+        
+        maxsum=list(res['test_loss_sum'].groupby(res['n_split']).max())
+        maxmax=list(res['test_loss_max'].groupby(res['n_split']).max())
+        res_start=res.drop_duplicates(subset=['n_split'],keep='first',inplace=False)
+        res_end=res.drop_duplicates(subset=['n_split'],keep='last',inplace=False)
+        start=list(map(lambda x:str(x),list(res_start['时间戳'].values)))
+        end=list(map(lambda x:str(x),list(res_end['时间戳'].values)))
+        product_id=list(res_start['sn'].values)
+        df_res['product_id']=product_id
+        df_res['start_time']=start
+        df_res['end_time']=end
+        df_res['loss_sum']=list(map(lambda x:round(x,3),maxsum))
+        df_res['loss_max']=list(map(lambda x:round(x,3),maxmax))
+        soc=list(res_start['SOC[%]'].values)
+        df_res['SOC']=soc
+        df_res['diffV']=list(res_start['单体压差'].values)
+        df_res['downV']=list(res_start['最大电压下降'].values)
+        df_res['diffdownV']=list(res_start['电压下降低偏'].values)
+        #df_res['window_step']=list(res_start['window_step'].values)
+        diff_min=diffmin(df_res)
+        df_res['diff_min']=diff_min
+        df_res.reset_index(drop=True,inplace=True)
+        end=datetime.datetime.strptime(str(df_res.loc[len(df_res)-1,'end_time']),'%Y-%m-%d %H:%M:%S')
+        end_time=datetime.datetime.strptime(str(end_time),'%Y-%m-%d %H:%M:%S')
+        diff=(end_time-end).total_seconds()
+        if diff<600:
+            df_res.loc[len(df_res)-1,'end_time']='0000-00-00 00:00:00'
+    return df_res,diff
+
+##################################################################################################################
+
+
+def arrange(result,result_final,start_time,diff):
+    result=result.reset_index(drop=True)
+    start=datetime.datetime.strptime(str(result.loc[0,'start_time']),'%Y-%m-%d %H:%M:%S')
+    start_time=datetime.datetime.strptime(str(start_time),'%Y-%m-%d %H:%M:%S')
+    diff_time=(start-start_time).total_seconds()
+    if diff_time<600:
+        result_final['end_time']=result.loc[0,'end_time']
+        diff_min_org=result_final['diff_min']
+        diff_min_new=result.loc[0,'diff_min']
+        result_final['diff_min']=diff_min_org+(diff_time+diff)/60+diff_min_new
+        result=result.drop(0)
+    return result,result_final
+

+ 167 - 0
LIB/MIDDLE/ThermoRunaway/V1_0_2/main_pred.py

@@ -0,0 +1,167 @@
+
+from LIB.MIDDLE.ThermoRunaway.V1_0_2.Trunaway import *
+import pymysql
+import datetime
+import pandas as pd
+from LIB.BACKEND import DBManager
+dbManager = DBManager.DBManager()
+from sqlalchemy import create_engine
+from urllib import parse
+import datetime, time
+from apscheduler.schedulers.blocking import BlockingScheduler
+import traceback
+import pickle
+from keras.models import load_model
+import logging
+import logging.handlers
+import os
+import re
+
+#...................................故障检测函数......................................................................................................................
+def diag_cal():
+    global SNnums
+
+    start=time.time()
+    now_time=datetime.datetime.now()
+    start_time=now_time-datetime.timedelta(hours=6)
+    start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
+    end_time=now_time.strftime('%Y-%m-%d %H:%M:%S')
+
+    #数据库配置
+    host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
+    port=3306
+    db='safety_platform'
+    user='qx_read'
+    password='Qx@123456'
+
+    #读取结果库数据......................................................
+    param='product_id,start_time,end_time,diff_min,SOC,loss_sum,loss_max,diffV,downV,diffdownV'
+    tablename='thermo_runaway'
+    mysql = pymysql.connect (host=host, user=user, password=password, port=port, database=db)
+    cursor = mysql.cursor()
+    sql =  "select {} from {} where end_time='0000-00-00 00:00:00'".format(param,tablename)
+    cursor.execute(sql)
+    res = cursor.fetchall()
+    df_diag_ram= pd.DataFrame(res,columns=param.split(','))
+    
+
+    db_res_engine = create_engine(
+        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+            user, parse.quote_plus(password), host, port, db
+        ))
+    
+
+    
+
+    #调用主函数................................................................................................................................................................
+    for sn in SNnums:
+        try:
+            group=sn[:5]
+            df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms'])
+            data_bms = df_data['bms']
+            data_bms['sn']=sn
+            if len(data_bms)>0:
+                logger.info("SN: {} 数据开始预处理".format(sn))
+                data_bms=delete(data_bms)
+                data_stand=data_groups(data_bms,sn,start_time,end_time)
+                df_stand=split(data_stand)   
+                res=pd.DataFrame()
+                if len(df_stand)>0:
+                    #读取训练产出的缩放指标:均值&方差
+                    logger.info("SN: {} 数据开始模型预测".format(sn))
+                    scaler = scaler_dict[group]
+                    #读取训练产出的模型状态空间:电压模型&温度模型
+                    model = model_dict[group]
+                    res,diff=res_output(df_stand,scaler,model,group,end_time)
+
+                    df_diag_ram_sn=df_diag_ram[df_diag_ram['product_id']==sn]
+                    if not df_diag_ram_sn.empty:   #该sn相关结果非空
+                        new_res,update_res=arrange(res,df_diag_ram_sn,start_time,diff)
+                        if len(update_res)>0:
+                            cursor.execute("DELETE FROM thermo_runaway WHERE end_time = '0000-00-00 00:00:00' and product_id='{}'".format(sn))
+                            mysql.commit()
+                            update_res.to_sql("thermo_runaway",con=db_res_engine, if_exists="append",index=False)
+                        #新增结果存入结果库................................................................
+                        if len(new_res)>0:
+                            new_res.to_sql("thermo_runaway",con=db_res_engine, if_exists="append",index=False)
+                    else:
+                        res.to_sql("thermo_runaway",con=db_res_engine, if_exists="append",index=False)
+
+            # end=time.time()
+            # print(end-start)  
+                
+        except Exception as e:
+            logger.error(str(e))
+            logger.error(traceback.format_exc())
+
+    cursor.close()
+    mysql.close()
+
+#...............................................主函数起定时作用.......................................................................................................................
+if __name__ == "__main__":
+    
+    # 日志
+    log_path = 'log/'
+    if not os.path.exists(log_path):
+        os.makedirs(log_path)
+    logger = logging.getLogger("main")
+    logger.setLevel(logging.DEBUG)
+    
+     # 根据日期滚动(每天产生1个文件)
+    fh = logging.handlers.TimedRotatingFileHandler(filename='{}/main_info.log'.format(log_path), when="D", interval=1, backupCount=30,
+                                                    encoding="utf-8")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.suffix = "%Y-%m-%d_%H-%M-%S"
+    fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.INFO)
+    logger.addHandler(fh)
+
+    fh = logging.handlers.TimedRotatingFileHandler(filename='{}/main_error.log'.format(log_path), when="D", interval=1, backupCount=30,
+                                                    encoding="utf-8")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.suffix = "%Y-%m-%d_%H-%M-%S"
+    fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.ERROR)
+    logger.addHandler(fh)
+
+    logger.info("pid is {}".format(os.getpid()))
+    
+     # # 更新sn列表
+    host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
+    port=3306
+    db='qixiang_oss'
+    user='qixiang_oss'
+    password='Qixiang2021'
+    conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db)
+    cursor = conn.cursor()
+    cursor.execute("select sn, imei, add_time from app_device where status in (1,2,3)")
+    res = cursor.fetchall()
+    df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time'])
+    df_sn = df_sn.reset_index(drop=True)
+    conn.close();
+    
+    SNnums = list(df_sn['sn'])
+    
+    scaler_list=[]
+    model_list=[]
+    for group in ['PK504','MGMCL','PK500']:
+        scaler=pickle.load(open('LIB/MIDDLE/ThermoRunaway/V1_0_2/train_out/scaler_'+group+'_05.pkl', 'rb'))
+        model=load_model('LIB/MIDDLE/ThermoRunaway/V1_0_2/train_out/model_'+group+'_05.h5')
+        scaler_list.append(scaler)
+        model_list.append(model)
+    scaler_dict={'PK504':scaler_list[0],'MGMCL':scaler_list[1],'PK500':scaler_list[2]}
+    model_dict={'PK504':model_list[0],'MGMCL':model_list[1],'PK500':model_list[2]}
+
+    diag_cal()
+    #定时任务.......................................................................................................................................................................
+    scheduler = BlockingScheduler()
+    scheduler.add_job(diag_cal, 'interval', hours=6)
+
+    try:  
+        scheduler.start()
+    except Exception as e:
+        scheduler.shutdown()
+        logger.error(str(e))
+        logger.error(traceback.format_exc())

BIN
LIB/MIDDLE/ThermoRunaway/V1_0_2/train_out/model_MGMCL_05.h5


BIN
LIB/MIDDLE/ThermoRunaway/V1_0_2/train_out/model_PK500_05.h5


BIN
LIB/MIDDLE/ThermoRunaway/V1_0_2/train_out/model_PK504_05.h5


BIN
LIB/MIDDLE/ThermoRunaway/V1_0_2/train_out/scaler_MGMCL_05.pkl


BIN
LIB/MIDDLE/ThermoRunaway/V1_0_2/train_out/scaler_PK500_05.pkl


BIN
LIB/MIDDLE/ThermoRunaway/V1_0_2/train_out/scaler_PK504_05.pkl