3 Commits 0af1705e96 ... f59f338298

Author SHA1 Message Date
  qingfeng f59f338298 Merge branch 'dev' into pro 2 years ago
  zhuxi b40d633012 faultclass train 2 years ago
  zhuxi bfdeee1374 fautclass pred 2 years ago

+ 444 - 0
LIB/MIDDLE/FaultClass/V1_0_0/faultclass.py

@@ -0,0 +1,444 @@
+import pandas as pd
+import numpy as np
+import datetime
+from random import shuffle
+from sklearn.preprocessing import StandardScaler
+from keras.layers import Activation,Dense,Input
+from keras.layers.recurrent import GRU
+from keras.models import Model
+from keras.optimizers import adam_v2
+from keras.layers import Dropout
+import random
+
+#### Process1 - Prediction - Model1+Model2 ###
+
+# Step1 Features
+
+# Model1 
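+# Voltage model features: drop status and temperature columns, keep the per-record max/min cell voltage.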
+def features1(dataset2):
+    dataset2=dataset2.drop(['GSM信号','故障等级','故障代码','开关状态','绝缘电阻','外电压','总输出状态','上锁状态','加热状态','单体均衡状态','充电状态','SOH[%]','SOC[%]','总电流[A]'],axis=1,errors='ignore')
+    cellvolt_list = [s for s in list(dataset2) if '单体电压' in s] 
+    celltemp_name = [s for s in list(dataset2) if '温度' in s] 
+    dataset2=dataset2.drop(celltemp_name,axis=1)
+    dataset2['volt_max']=dataset2[cellvolt_list].max(axis=1)
+    dataset2['volt_min']=dataset2[cellvolt_list].min(axis=1) 
+    dataset2=dataset2.drop(cellvolt_list,axis=1)
+    dataset2.reset_index(drop=True,inplace=True)
+    return dataset2
+# Model2
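+# Temperature model features: drop status and voltage columns, keep the per-record max/min cell temperature and their difference.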
+def features2(dataset2):
+    dataset2=dataset2.drop(['GSM信号','故障等级','故障代码','开关状态','绝缘电阻','外电压','总输出状态','上锁状态','加热状态','单体均衡状态','充电状态','SOH[%]','SOC[%]','单体压差','总电压[V]'],axis=1,errors='ignore')
+    cellvolt_list = [s for s in list(dataset2) if '单体电压' in s] 
+    celltemp_name = [s for s in list(dataset2) if '单体温度' in s] 
+    celltemp_name2 = [s for s in list(dataset2) if '其他温度' in s]
+    dataset2=dataset2.drop(cellvolt_list+celltemp_name2,axis=1)
+    dataset2['temp_max']=dataset2[celltemp_name].max(axis=1)
+    dataset2['temp_min']=dataset2[celltemp_name].min(axis=1) 
+    dataset2['temp_diff']=list(np.array(dataset2['temp_max'])-np.array(dataset2['temp_min']))
+    dataset2=dataset2.drop(celltemp_name,axis=1)
+    dataset2.reset_index(drop=True,inplace=True)
+    return dataset2
+
+# Step2 Splits
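+# Assign a segment id: a new segment starts whenever the gap between consecutive rows exceeds 600 s or the sn changes.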
+def split(df_bms_tot):
+    df_bms_tot['split']=0
+    for k in range(1,len(df_bms_tot)):
+        timek=df_bms_tot.loc[k,'时间戳']
+        timek1=df_bms_tot.loc[k-1,'时间戳']
+        timek=datetime.datetime.strptime(timek,'%Y-%m-%d %H:%M:%S')     #type: datetime
+        timek1=datetime.datetime.strptime(timek1,'%Y-%m-%d %H:%M:%S')
+        deltatime=(timek-timek1).total_seconds()
+        if (deltatime>600) | (df_bms_tot.loc[k,'sn']!=df_bms_tot.loc[k-1,'sn']):
+            df_bms_tot.loc[k,'split']=df_bms_tot.loc[k-1,'split']+1
+        else:
+            df_bms_tot.loc[k,'split']=df_bms_tot.loc[k-1,'split']
+    return df_bms_tot
+
+# Step3 MakeDataset: TimeSeries
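+# Resample each segment onto a regular time grid (built at 1 s, deduplicated to 10 s steps) using forward/backward fill.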
+def makedataset(dataset):
+    df_bms=pd.DataFrame()
+    for split in list(set(dataset['split'])):
+        set2=dataset[dataset['split']==split]
+        set2.reset_index(drop=True,inplace=True)
+        data_set=pd.DataFrame()
+        start=set2.loc[0,'时间戳']
+        end=set2.loc[len(set2)-1,'时间戳']
+        data_set['Time']=pd.date_range(start=start, end=end, freq='S')  # one record per second
+        data_set['Time']=list(map(lambda x:str(x),list(data_set['Time'])))
+        dfbms=pd.merge(data_set,set2,left_on='Time',right_on='时间戳',how='left')
+        dfbms=dfbms.fillna(method='ffill')
+        dfbms=dfbms.fillna(method='bfill')  
+        dfbms=dfbms.drop(['时间戳'],axis=1)
+        dfbms['Time']=list(map(lambda x:x[:18]+'0',list(dfbms['Time'])))
+        dfbms.drop_duplicates(subset='Time',keep='last',inplace=True)
+        df_bms=df_bms.append(dfbms)
+        df_bms.reset_index(drop=True,inplace=True)
+    return df_bms
+
+# Step4 Scaler
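+# Apply the scaler fitted during training to the prediction features (Time/sn/split columns excluded).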
+def scaler_pred(df_bms,scaler):
+    Xtest=df_bms.drop(['Time','sn','split'],axis=1)
+    Xsc_colnames=list(Xtest.columns)
+    Xtsc=scaler.transform(np.array(Xtest))
+    Xtsc=pd.DataFrame(Xtsc)
+    Xtsc.columns=Xsc_colnames
+    return Xtsc
+
+# Step5 MakeIndex
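+# Collect the start index of every segment (plus the total length) so that windows never cross segment boundaries.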
+def make_index(train):
+    indextr=[]
+    for i in list(set(train['split'])):
+        tr=train[train['split'] == i].index.tolist()
+        indextr.append(min(tr))
+    indextr=sorted(indextr)
+    indextr.append(len(train))
+    return indextr
+
+# Step6 CreateWindows
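+# Build sliding windows of length time_steps within each segment; also return the matching raw rows for reporting.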
+def create_win_pred(X2,Xtest,index,time_steps=12): 
+    conf=pd.DataFrame() 
+    a=[]
+    for k in range(1,len(index)):
+        dataset=X2[index[k-1]:index[k]]
+        dataset=dataset.reset_index(drop=True)
+        dataset2=Xtest[index[k-1]:index[k]]
+        dataset2=dataset2.reset_index(drop=True)
+        if len(dataset)>time_steps:
+            dataX = []
+            win_step=[]
+            for i in range(len(dataset)-time_steps): 
+                win_step.append(i)
+                #v1 = np.array(dataset.iloc[i:(i+time_steps)],dtype='float32')
+                v1 = dataset.iloc[i:(i+time_steps)].values
+                dataX.append(v1)
+            test=dataset2.iloc[:len(dataset)-time_steps]
+            dataX2=np.array(dataX,dtype='float32')
+            conf=conf.append(test)
+            a.append(dataX2)
+    if len(a)>0:
+        aa=np.vstack(a)
+    else:
+        aa=[]
+    conf.reset_index(drop=True,inplace=True)
+    return aa,conf
+
+# Step7 Prediction
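+# Predict class probabilities for each window and keep the most likely class label in a 'pred' column.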
+def prediction(model,cc,conf,col):
+    predict_dd = model.predict(cc)  
+    df_pred=pd.DataFrame(predict_dd)
+    df_pred.columns=col
+    df_pred2 = df_pred.idxmax(axis=1)
+    conf['pred']=df_pred2
+    return conf
+
+# Step8 Output
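+# Summarise the non-normal predictions into one row per fault class; an end_time of '0000-00-00 00:00:00' marks a fault that is still open.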
+def makeres(res,end_time):  
+    df_res=pd.DataFrame(columns=['product_id','start_time','end_time','fault_class','update_time'])
+    result_faults=res[res['pred']!='正常']
+    list_faults=list(set(list(result_faults['pred'])))
+    for fault in list_faults:
+        res_faults=result_faults[result_faults['pred']==fault]
+        res_faults.reset_index(drop=True,inplace=True)
+        update_time=str(res_faults.loc[len(res_faults)-1,'Time'])
+        end=datetime.datetime.strptime(str(res_faults.loc[len(res_faults)-1,'Time']),'%Y-%m-%d %H:%M:%S')
+        end_time=datetime.datetime.strptime(str(end_time),'%Y-%m-%d %H:%M:%S')
+        if (end_time-end).total_seconds()<900:
+            res_faults.loc[len(res_faults)-1,'Time']='0000-00-00 00:00:00'
+        df_res=df_res.append(pd.DataFrame({'product_id':[res_faults.loc[0,'sn']],'start_time':[str(res_faults.loc[0,'Time'])],
+                        'end_time':[str(res_faults.loc[len(res_faults)-1,'Time'])],'fault_class':[res_faults.loc[0,'pred']],
+                        'update_time':[update_time]}))
+    return df_res
+
+# Step9 Merge
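+# Reconcile the new detections with faults still open in the result table: extend or close existing records, append the rest as new.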
+def arrange(result,result_final,start_time):
+    result.reset_index(drop=True,inplace=True)
+    result_final.reset_index(drop=True,inplace=True)
+    list_faults=list(set(list(result_final['fault_class'])))
+    res_update=pd.DataFrame()
+    res_new=result.copy()
+    for fault in list_faults:
+        result0=result_final[result_final['fault_class']==fault]
+        result1=result[result['fault_class']==fault]
+        st=datetime.datetime.strptime(str(result.loc[0,'start_time']),'%Y-%m-%d %H:%M:%S')
+        start_time=datetime.datetime.strptime(str(start_time),'%Y-%m-%d %H:%M:%S')
+        if len(result1)>0:
+            if (start_time-st).total_seconds()<900:
+                result0['end_time']=result1['end_time']
+                result0['update_time']=result1['update_time']
+                res_update=res_update.append(result0)
+                res_new.drop(result1.index,inplace=True)
+            else:
+                result0['end_time']=result0['update_time']
+                res_update=res_update.append(result0)
+                res_new.drop(result1.index,inplace=True)
+        else:
+            result0['end_time']=result0['update_time']
+            res_update=res_update.append(result0)
+    return res_new,res_update
+
+# Step10 Process
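+# End-to-end prediction pipeline: segment, resample, scale, window, predict and format the result table.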
+def pred(data_fea,model,scaler,col,end_time,time_steps):
+    df_res=pd.DataFrame()
+    fea=split(data_fea)
+    f=makedataset(fea)
+    sc=scaler_pred(f,scaler)
+    index=make_index(f)
+    dataX,pred=create_win_pred(sc,f,index,time_steps=time_steps)
+    if len(dataX)>0:
+        res=prediction(model,dataX,pred,col)
+        df_res=makeres(res,end_time)
+    return df_res
+
+
+
+#################################################################################################################################
+
+#### Process2 - New Model (Training) ###
+
+# Step1 Feature Filter
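+# Build the full feature set (voltage max/min/mean/sigma, adjacency of the extreme cells, temperature max/min/diff) and keep only the requested cols.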
+def features_filtre(dataset2,cols):
+    dataset2=dataset2.drop(['GSM信号','故障等级','故障代码','开关状态','绝缘电阻','外电压','总输出状态','上锁状态','加热状态','单体均衡状态','充电状态','SOH[%]'],axis=1,errors='ignore')
+    cellvolt_list = [s for s in list(dataset2) if '单体电压' in s] 
+    celltemp_name = [s for s in list(dataset2) if '单体温度' in s] 
+    celltemp_name2 = [s for s in list(dataset2) if '其他温度' in s]
+    dataset2['volt_max']=dataset2[cellvolt_list].max(axis=1)
+    dataset2['volt_min']=dataset2[cellvolt_list].min(axis=1)
+    dataset2['volt_mean'] = round(dataset2[cellvolt_list].mean(axis=1),3)  # row-wise mean
+    dataset2['volt_sigma'] =list(dataset2[cellvolt_list].apply(lambda x: np.std(x.values),axis=1))
+    cell_volt_max =list(dataset2[cellvolt_list].apply(lambda x: np.argmax(x.values)+1,axis=1))
+    cell_volt_min =list(dataset2[cellvolt_list].apply(lambda x: np.argmin(x.values)+1,axis=1))
+    dataset2['mm_volt_cont'] = list(np.array(cell_volt_max) - np.array(cell_volt_min)) 
+    dataset2['mm_volt_cont']=list(map(lambda x : 1 if (abs(x)==1) | (abs(x)==len(cellvolt_list)-1) else 0, list(dataset2['mm_volt_cont'])))
+    #for k in range(len(dataset2)):
+        #dataset2.loc[k,'mm_volt_cont']=1 if (abs(list(dataset2['mm_volt_cont'])[k])==1) | (abs(list(dataset2['mm_volt_cont'])[k])==len(cellvolt_list)-1) else 0 
+    dataset2=dataset2.drop(cellvolt_list+celltemp_name2,axis=1)
+    dataset2['temp_max']=dataset2[celltemp_name].max(axis=1)
+    dataset2['temp_min']=dataset2[celltemp_name].min(axis=1) 
+    dataset2['temp_diff']=list(np.array(dataset2['temp_max'])-np.array(dataset2['temp_min']))
+    dataset2=dataset2.drop(celltemp_name,axis=1)
+    datatest3=dataset2[cols]
+    datatest3.reset_index(drop=True,inplace=True)
+    return datatest3
+    
+# Step2 Data Filter
+def data_filtre(datatest3,col_key,compare,threshold):
+    if compare==0:
+        datatest4=datatest3[datatest3[col_key]==threshold]
+    elif compare==1:
+        datatest4=datatest3[datatest3[col_key]>threshold]
+    else:
+        datatest4=datatest3[datatest3[col_key]<threshold]
+    datatest4.reset_index(drop=True,inplace=True)
+    return datatest4
+
+# Step3 Faults Pre-processing
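+# Fault samples: compute the features, keep rows matching the filter rule, segment and resample, then label them with fault_name.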
+def make_fault_set(dataset,cols,col_key,compare,threshold_filtre,fault_name):
+    datatest3=features_filtre(dataset,cols)
+    datatest4=data_filtre(datatest3,col_key,compare,threshold_filtre)
+    df_tot=split(datatest4)
+    df_bms=makedataset(df_tot)
+    df_bms['fault_class']=fault_name
+    return df_bms
+
+# Step4 Normal Pre-processing
+def normalset(df_bms,cols):
+    df_bms.drop(['Unnamed: 0'],axis=1,inplace=True)
+    nor_fea1=features_filtre(df_bms,cols)
+    norfea1=split(nor_fea1)
+    normalf1=makedataset(norfea1)
+    normalf1['fault_class']='正常'
+    return normalf1
+
+def normalset2(df_bms1,df_bms2,df_bms3,df_bms4,df_bms5,df_bms6,cols):
+    normalf1=normalset(df_bms1,cols)
+    normalf2=normalset(df_bms2,cols)
+    normalf3=normalset(df_bms3,cols)
+    normalf4=normalset(df_bms4,cols)
+    normalf5=normalset(df_bms5,cols)
+    normalf6=normalset(df_bms6,cols)
+    nor=pd.concat([normalf1,normalf2,normalf3,normalf4,normalf5,normalf6])
+    nor.reset_index(drop=True,inplace=True)
+    return nor
+
+# Step5 Resample
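+# Roughly balance the classes: randomly drop whole segments from whichever side is more than twice the size of the other.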
+def resample(nor,df_bms):
+    if len(nor)>2*len(df_bms):
+        sp=list(set(list(nor['split'])))
+        sp_ran=random.sample(sp, k=int(len(sp)*(len(df_bms)/len(nor))))
+        nor=nor[nor['split'].isin(sp_ran)]
+        nor.reset_index(drop=True,inplace=True)
+    if 2*len(nor)<len(df_bms):
+        sp=list(set(list(df_bms['split'])))
+        sp_ran=random.sample(sp, k=int(len(sp)*(len(nor)/len(df_bms))))
+        df_bms=df_bms[df_bms['split'].isin(sp_ran)]
+        df_bms.reset_index(drop=True,inplace=True)
+    return nor,df_bms
+
+# Step6 Shuffle Data
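+# Split by device sn: 80% of the sn values go to the training set and 20% to the test set, for normal and fault data separately.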
+def shuffle_data(nor,dataset_faults):
+    sn_nor=list(set(nor['sn']))
+    sn_fau=list(set(dataset_faults['sn']))
+    shuffle(sn_nor)
+    shuffle(sn_fau)
+    newtrain=pd.DataFrame()
+    newtest=pd.DataFrame()
+    for s1 in sn_nor[:int(0.8*len(sn_nor))]:
+        nortrain=nor[nor['sn']==s1]
+        nortrain.reset_index(drop=True,inplace=True)
+        newtrain=newtrain.append(nortrain)
+    for s2 in sn_nor[int(0.8*len(sn_nor)):]:
+        nortest=nor[nor['sn']==s2]
+        nortest.reset_index(drop=True,inplace=True)
+        newtest=newtest.append(nortest)
+    for s3 in sn_fau[:int(0.8*len(sn_fau))]:
+        fautrain=dataset_faults[dataset_faults['sn']==s3]
+        fautrain.reset_index(drop=True,inplace=True)
+        newtrain=newtrain.append(fautrain)
+    for s4 in sn_fau[int(0.8*len(sn_fau)):]:
+        fautest=dataset_faults[dataset_faults['sn']==s4]
+        fautest.reset_index(drop=True,inplace=True)
+        newtest=newtest.append(fautest)
+    newtrain.reset_index(drop=True,inplace=True)
+    newtest.reset_index(drop=True,inplace=True)
+    return newtrain,newtest
+
+def shuffle_data2(dftrain):
+    sp=list(set(dftrain['sn']))
+    shuffle(sp)
+    newtrain=pd.DataFrame()
+    for s in sp:
+        ntr=dftrain[dftrain['sn']==s]
+        newtrain=newtrain.append(ntr)
+    newtrain.reset_index(drop=True,inplace=True)
+    return newtrain
+
+# Step7 X & Y
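+# Separate features from labels and one-hot encode the fault class.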
+def xy(train):
+    Xtrain=train.drop(['fault_class','Time','sn','split'],axis=1)
+    Ytrain=train[['fault_class']]          
+    Ytrain2=pd.get_dummies(Ytrain,columns=['fault_class'],prefix_sep='_')
+    return Xtrain,Ytrain,Ytrain2
+
+# Step8 Scaler 
+def scaler_train(Xtrain):
+    Xsc_colnames=list(Xtrain.columns)
+    scaler=StandardScaler()
+    scaler.fit(Xtrain)  # store the mean and std of the training features
+    Xsc=scaler.transform(np.array(Xtrain))
+    Xsc=pd.DataFrame(Xsc)
+    Xsc.columns=Xsc_colnames
+    return Xsc,scaler
+
+def scaler_test(Xtest,scaler):
+    Xsc_colnames=list(Xtest.columns)
+    Xtsc=scaler.transform(np.array(Xtest))
+    Xtsc=pd.DataFrame(Xtsc)
+    Xtsc.columns=Xsc_colnames
+    return Xtsc
+
+# Step9 Create windows 
+def create_win_train(X2,Y2,index,time_steps=6):  
+    a,b=[],[] 
+    for k in range(1,len(index)):
+        dataset=X2[index[k-1]:index[k]]
+        dataset=dataset.reset_index(drop=True)
+        datay=Y2[index[k-1]:index[k]]
+        datay=datay.reset_index(drop=True)
+        if len(dataset)>time_steps:
+            dataX, dataY = [], []
+            for i in range(len(dataset)-time_steps): 
+                v1 = dataset.iloc[i:(i+time_steps)].values
+                v2 = datay.iloc[i].values
+                dataX.append(v1)
+                dataY.append(v2)
+            dataX2=np.array(dataX,dtype='float32')
+            dataY2=np.array(dataY)
+        else:
+            continue
+        a.append(dataX2)             
+        b.append(dataY2)
+    aa=np.vstack(a)
+    bb=np.vstack(b)  
+    return aa,bb
+
+def create_win_test(X2,Y2,Xtest,index,time_steps=12):  
+    a,b=[],[] 
+    conf=pd.DataFrame()
+    for k in range(1,len(index)):
+        dataset=X2[index[k-1]:index[k]]
+        dataset=dataset.reset_index(drop=True)
+        datay=Y2[index[k-1]:index[k]]
+        datay=datay.reset_index(drop=True)
+        dataset2=Xtest[index[k-1]:index[k]]
+        dataset2=dataset2.reset_index(drop=True)
+        if len(dataset)>time_steps:
+            dataX, dataY = [], []
+            win_step=[]
+            for i in range(len(dataset)-time_steps): 
+                win_step.append(i)
+                v1 = dataset.iloc[i:(i+time_steps)].values
+                v2 = datay.iloc[i].values
+                dataX.append(v1)
+                dataY.append(v2)
+            test=dataset2.iloc[:len(dataset)-time_steps]
+            test['win']=win_step
+            test=pd.merge(test,datay,left_index=True,right_index=True)
+            dataX2=np.array(dataX,dtype='float32')
+            dataY2=np.array(dataY)
+        else:
+            continue
+        a.append(dataX2)             
+        b.append(dataY2)
+        conf=conf.append(test)
+    aa=np.vstack(a)
+    bb=np.vstack(b)
+    conf.reset_index(drop=True,inplace=True)
+    return aa,bb,conf 
+
+# Step10 Create Model
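+# GRU classifier: GRU -> Dropout -> Dense(nbr_class) -> Dropout -> Activation, compiled with Adam and trained on the windowed data.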
+def modelGRU(time_steps,nbr_features,nbr_neurons,nbr_class,Xwin,Ywin,Xtwin,Ytwin,batch_size,epochs,dropout,lr,activation,loss,metrics):
+    time_steps=time_steps
+    inputs = Input(shape=[time_steps,nbr_features])
+    x = GRU(nbr_neurons, input_shape = (time_steps,nbr_features),return_sequences=False, return_state=False)(inputs)
+    x = Dropout(dropout)(x)
+    x = Dense(nbr_class)(x)
+    x = Dropout(dropout)(x)
+    x = Activation(activation)(x)
+    LR = lr
+    model = Model(inputs,x)
+    adam = adam_v2.Adam(LR)
+    model.compile(loss = loss,optimizer = adam,metrics = [metrics])
+    model.fit(Xwin,Ywin,epochs=epochs,validation_data=(Xtwin,Ytwin),batch_size=batch_size,verbose=1,shuffle=True)
+    return model
+
+# Step11 Process
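+# Full training pipeline: balance the classes, split by sn, scale, window, train the GRU and report test-set accuracy.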
+def pre_model(nor,df_bms,time_steps,nbr_features,nbr_neurons,nbr_class,batch_size,epochs,dropout,lr,activation,loss):
+    nor,df_bms=resample(nor,df_bms)
+    newtrain,newtest=shuffle_data(nor,df_bms)
+    train_sh=shuffle_data2(newtrain)
+    test_sh=shuffle_data2(newtest)
+    Xtrain,Ytrain,Ytrain2=xy(train_sh)
+    Xtest,Ytest,Ytest2=xy(test_sh)                           
+    Xsc,scaler=scaler_train(Xtrain)
+    Xtsc=scaler_test(Xtest,scaler)
+    indextr=make_index(train_sh)
+    indexte=make_index(test_sh)
+    Xwin,Ywin=create_win_train(Xsc,Ytrain2,indextr,time_steps=time_steps)
+    Xtwin,Ytwin,conf=create_win_test(Xtsc,Ytest2,test_sh,indexte,time_steps=time_steps)
+    model=modelGRU(time_steps=time_steps,nbr_features=nbr_features,nbr_neurons=nbr_neurons,nbr_class=nbr_class,Xwin=Xwin,Ywin=Ywin,
+                    Xtwin=Xtwin,Ytwin=Ytwin,batch_size=batch_size,epochs=epochs,dropout=dropout,lr=lr,activation=activation,
+                    loss=loss,metrics='accuracy')
+    loss,acc=model.evaluate(Xtwin,Ytwin)
+    return scaler,model,acc
+
+
+
+
+
+
+
+
+
+
+
+

+ 42 - 0
LIB/MIDDLE/FaultClass/V1_0_0/main_input.py

@@ -0,0 +1,42 @@
+from sqlalchemy import create_engine
+from urllib import parse
+import pandas as pd
+import pymysql
+
+# User input parameters
+fault_name='电压采样断线'
+cols=str(['时间戳','sn','单体压差','volt_max','volt_min','volt_mean','volt_sigma','mm_volt_cont'])
+col_key='mm_volt_cont'
+compare=0
+threshold_filtre=1
+
+time_steps=12
+nbr_features=6
+nbr_neurons=5
+nbr_class=2
+batch_size=100
+epochs=5
+dropout=0.5
+lr=1e-3
+activation='softmax'
+loss='categorical_crossentropy'
+
+threshold_accuracy=0.95
+
+# Database configuration
+host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
+port=3306
+db='qx_cas'
+user='qx_algo_rw'
+password='qx@123456'
+
+db_res_engine = create_engine(
+    "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+        user, parse.quote_plus(password), host, port, db
+    ))
+#mysql = pymysql.connect (host=host, user=user, password=password, port=port, database=db)
+input_param=pd.DataFrame({'fault_name':[fault_name],'cols':[cols],'col_key':[col_key],'compare':[compare],'threshold_filtre':[threshold_filtre],
+                        'time_steps':[time_steps],'nbr_features':[nbr_features],'nbr_neurons':[nbr_neurons],'nbr_class':[nbr_class],'batch_size':[batch_size],
+                        'epochs':[epochs],'dropout':[dropout],'lr':[lr],'activation':[activation],'loss':[loss],'threshold_accuracy':[threshold_accuracy]})
+input_param.to_sql("faultclass_input",con=db_res_engine, if_exists="append",index=False)
+#mysql.close()

+ 164 - 0
LIB/MIDDLE/FaultClass/V1_0_0/main_pred.py

@@ -0,0 +1,164 @@
+
+from LIB.MIDDLE.FaultClass.V1_0_0.faultclass import *
+import pymysql
+import datetime
+import pandas as pd
+from LIB.BACKEND import DBManager
+dbManager = DBManager.DBManager()
+from sqlalchemy import create_engine
+from urllib import parse
+import datetime, time
+from apscheduler.schedulers.blocking import BlockingScheduler
+import traceback
+import pickle
+from keras.models import load_model
+import logging
+import logging.handlers
+import os
+import re
+
+
+#................................... Fault detection function ...................................
+def diag_cal():
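+    # Pull the last hour of BMS data for every sn, run both fault models and write the results to the fault_class table.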
+    global SNnums
+    global scaler1,scaler2,model1,model2,col1,col2,time_steps1,time_steps2
+
+    start=time.time()
+    now_time=datetime.datetime.now()
+    start_time=now_time-datetime.timedelta(hours=1)
+    start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
+    end_time=now_time.strftime('%Y-%m-%d %H:%M:%S')
+
+    # Database configuration
+    host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
+    port=3306
+    db='safety_platform'
+    user='qx_read'
+    password='Qx@123456'
+
+    # Read data from the result database ......................................................
+    param='product_id,start_time,end_time,diff_min,SOC,AnoScoreV_sum_max,AnoScoreV_max_max,AnoScoreT_sum_max,AnoScoreT_max_max'
+    tablename='fault_detection'
+    mysql = pymysql.connect (host=host, user=user, password=password, port=port, database=db)
+    cursor = mysql.cursor()
+    sql =  "select {} from {} where end_time='0000-00-00 00:00:00'".format(param,tablename)
+    cursor.execute(sql)
+    res = cursor.fetchall()
+    df_diag_ram= pd.DataFrame(res,columns=param.split(','))
+    
+
+    db_res_engine = create_engine(
+        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+            user, parse.quote_plus(password), host, port, db
+        ))
+
+
+    # Call the main routine ................................................................
+    for sn in SNnums:
+        try:
+            df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms'])
+            data_bms = df_data['bms']
+            data_bms['sn']=sn
+            if len(data_bms)>0:
+                logger.info("SN: {} starting data preprocessing".format(sn))
+                data_fea1=features1(data_bms)
+                data_fea2=features2(data_bms)
+                logger.info("SN: {} starting model prediction".format(sn))
+                df_res1=pred(data_fea1,model1,scaler1,col1,end_time,time_steps1)
+                df_res2=pred(data_fea2,model2,scaler2,col2,end_time,time_steps2)
+                df_diag_ram_sn=df_diag_ram[df_diag_ram['product_id']==sn]
+                res_new1,res_update1=arrange2(df_diag_ram_sn,df_res1,start_time,'B板采样失效')
+                res_new2,res_update2=arrange2(df_diag_ram_sn,df_res2,start_time,'传感器_电芯NTC漂移')
+               
+                if len(res_update1)>0:
+                    cursor.execute("DELETE FROM fault_class WHERE end_time = '0000-00-00 00:00:00' and product_id='{}' and fault_class='{}'".format(sn,'B板采样失效'))
+                    mysql.commit()
+                    res_update1.to_sql("fault_class",con=db_res_engine, if_exists="append",index=False)
+                res_new1.to_sql("fault_class",con=db_res_engine, if_exists="append",index=False)
+                if len(res_update2)>0:
+                    cursor.execute("DELETE FROM fault_class WHERE end_time = '0000-00-00 00:00:00' and product_id='{}' and fault_class='{}'".format(sn,'传感器_电芯NTC漂移'))
+                    mysql.commit()
+                    res_update2.to_sql("fault_class",con=db_res_engine, if_exists="append",index=False)
+                res_new2.to_sql("fault_class",con=db_res_engine, if_exists="append",index=False)
+            
+                # Newly detected results are stored in the result database ......
+
+
+            # end=time.time()
+            # print(end-start)  
+                
+        except Exception as e:
+            logger.error(str(e))
+            logger.error(traceback.format_exc())
+
+    cursor.close()
+    mysql.close()
+
+#............................................... Main entry: runs the detection job on a schedule ...............................................
+if __name__ == "__main__":
+    
+    # Logging
+    log_path = 'log/'
+    if not os.path.exists(log_path):
+        os.makedirs(log_path)
+    logger = logging.getLogger("main")
+    logger.setLevel(logging.DEBUG)
+    
+    # Roll over by date (one file per day)
+    fh = logging.handlers.TimedRotatingFileHandler(filename='{}/main_info.log'.format(log_path), when="D", interval=1, backupCount=30,
+                                                    encoding="utf-8")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.suffix = "%Y-%m-%d_%H-%M-%S"
+    fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.INFO)
+    logger.addHandler(fh)
+
+    fh = logging.handlers.TimedRotatingFileHandler(filename='{}/main_error.log'.format(log_path), when="D", interval=1, backupCount=30,
+                                                    encoding="utf-8")
+    formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
+    fh.suffix = "%Y-%m-%d_%H-%M-%S"
+    fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}")
+    fh.setFormatter(formatter)
+    fh.setLevel(logging.ERROR)
+    logger.addHandler(fh)
+
+    logger.info("pid is {}".format(os.getpid()))
+    
+    # Refresh the sn list
+    host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
+    port=3306
+    db='qixiang_oss'
+    user='qixiang_oss'
+    password='Qixiang2021'
+    conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db)
+    cursor = conn.cursor()
+    cursor.execute("select sn, imei, add_time from app_device where status in (1,2,3)")
+    res = cursor.fetchall()
+    df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time'])
+    df_sn = df_sn.reset_index(drop=True)
+    conn.close()
+    
+    SNnums = list(df_sn['sn'])
+    
+    scaler1=pickle.load(open('LIB/MIDDLE/FaultClass/V1_0_0/models/scaler_B板采样失效.pkl','rb'))
+    scaler2=pickle.load(open('LIB/MIDDLE/FaultClass/V1_0_0/models/scaler_传感器_电芯NTC漂移.pkl','rb'))
+    model1=load_model('LIB/MIDDLE/FaultClass/V1_0_0/models/model_B板采样失效.h5')
+    model2=load_model('LIB/MIDDLE/FaultClass/V1_0_0/models/model_传感器_电芯NTC漂移.h5')
+    col1=['B板采样失效','正常']
+    col2=['传感器_电芯NTC漂移','正常']
+    time_steps1=60
+    time_steps2=60
+    logger.info("Models loaded")
+
+    diag_cal()
+    # Scheduled task: re-run diag_cal every hour ..................................................................
+    scheduler = BlockingScheduler()
+    scheduler.add_job(diag_cal, 'interval', hours=1)
+
+    try:  
+        scheduler.start()
+    except Exception as e:
+        scheduler.shutdown()
+        logger.error(str(e))
+        logger.error(traceback.format_exc())

+ 158 - 0
LIB/MIDDLE/FaultClass/V1_0_0/main_train.py

@@ -0,0 +1,158 @@
+from faultclass import *
+import pymysql
+import datetime
+import pandas as pd
+import datetime
+import pickle
+from LIB.BACKEND import DBManager
+dbManager = DBManager.DBManager()
+from LIB.MIDDLE.CellStateEstimation.Common import log
+mylog=log.Mylog('log.txt','error')
+mylog.logcfg()
+from sqlalchemy import create_engine
+from urllib import parse
+
+# Fault name
+fault_name='电压采样断线'
+
+# Read files: normal data
+
+df_bms1=pd.read_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/dataset2.csv')
+df_bms2=pd.read_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/dataset3.csv')
+df_bms3=pd.read_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/dataset4.csv')
+df_bms4=pd.read_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/dataset5.csv')
+df_bms5=pd.read_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/dataset6.csv')
+df_bms6=pd.read_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/dataset7.csv')
+
+# Database configuration
+host0='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
+port0=3306
+db0='qx_cas'
+user0='qx_algo_rw'
+password0='qx@123456'
+
+# Read the training parameters from the result database ......................................................
+param='fault_name,cols,col_key,compare,threshold_filtre,time_steps,nbr_features,nbr_neurons,nbr_class,batch_size,epochs,dropout,lr,activation,loss,threshold_accuracy'
+tablename='faultclass_input'
+mysql = pymysql.connect (host=host0, user=user0, password=password0, port=port0, database=db0)
+cursor = mysql.cursor()
+sql =  "select {} from {} where fault_name='{}'".format(param,tablename,fault_name)
+cursor.execute(sql)
+res = cursor.fetchall()
+list_param= pd.DataFrame(res,columns=param.split(','))
+list_param.reset_index(drop=True,inplace=True)
+
+db_res_engine = create_engine(
+    "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+        user0, parse.quote_plus(password0), host0, port0, db0
+    ))
+
+# User input parameters
+cols=eval(list_param.loc[0,'cols'])
+col_key=list_param.loc[0,'col_key']
+compare=list_param.loc[0,'compare']
+threshold_filtre=list_param.loc[0,'threshold_filtre']
+
+time_steps=list_param.loc[0,'time_steps']
+nbr_features=list_param.loc[0,'nbr_features']
+nbr_neurons=list_param.loc[0,'nbr_neurons']
+nbr_class=list_param.loc[0,'nbr_class']
+batch_size=list_param.loc[0,'batch_size']
+epochs=list_param.loc[0,'epochs']
+dropout=list_param.loc[0,'dropout']
+lr=list_param.loc[0,'lr']
+activation=list_param.loc[0,'activation']
+loss=list_param.loc[0,'loss']
+
+threshold_accuracy=list_param.loc[0,'threshold_accuracy']
+
+# Database configuration
+host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
+port=3306
+db='safety_platform'
+user='qx_read'
+password='Qx@123456'
+
+# Read current faults from the fault result database ......................................................
+param='start_time,end_time,product_id,code,info'
+tablename='all_fault_info'
+mysql = pymysql.connect (host=host, user=user, password=password, port=port, database=db)
+cursor = mysql.cursor()
+#sql =  "select %s from %s where end_time='0000-00-00 00:00:00'" %(param,tablename)
+sql =  "select %s from %s" %(param,tablename)
+cursor.execute(sql)
+res = cursor.fetchall()
+df_diag_ram= pd.DataFrame(res,columns=param.split(','))
+
+df_diag_ram.dropna(inplace=True)
+df_diag_ram.reset_index(drop=True,inplace=True)
+
+# Database configuration
+host2='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
+port=3306
+db2='zhl_omp_v2'
+user2='zhl_omp'
+password='Qx@123456'
+
+# Read current faults from the fault result database ......................................................
+param='fault_time,fault_code,sn,child_tag,tag_type,update_time'
+tablename='t_cloud_control'
+mysql = pymysql.connect (host=host2, user=user2, password=password, port=port, database=db2)
+cursor = mysql.cursor()
+#sql =  "select %s from %s where end_time='0000-00-00 00:00:00'" %(param,tablename)
+sql =  "select %s from %s" %(param,tablename)
+cursor.execute(sql)
+res = cursor.fetchall()
+df_diag_ram2= pd.DataFrame(res,columns=param.split(','))
+
+df_diag_ram2.dropna(inplace=True)
+df_diag_ram2.reset_index(drop=True,inplace=True)
+
+# Read the fault type definitions (t_child_problem) ......................................................
+param='id,parent_id,name'
+tablename='t_child_problem'
+mysql = pymysql.connect (host=host2, user=user2, password=password, port=port, database=db2)
+cursor = mysql.cursor()
+#sql =  "select %s from %s where end_time='0000-00-00 00:00:00'" %(param,tablename)
+sql =  "select %s from %s" %(param,tablename)
+cursor.execute(sql)
+res = cursor.fetchall()
+df_diag_ram3= pd.DataFrame(res,columns=param.split(','))
+
+df_diag_ram3['id']=list(map(lambda x:str(x),list(df_diag_ram3['id'])))
+df_diag=pd.merge(df_diag_ram2,df_diag_ram3,how='left',left_on=['child_tag','tag_type'],right_on=['id','parent_id'])
+
+df_diag['fault_time']=list(map(lambda x:str(x),list(df_diag['fault_time'])))
+df_diag2=pd.merge(df_diag,df_diag_ram,how='left',left_on=['fault_time','sn','fault_code'],right_on=['start_time','product_id','code'])
+df_diag3=df_diag2.sort_values(by='update_time',ascending=False)
+
+df=df_diag2[df_diag2['name']==fault_name]
+df.reset_index(drop=True,inplace=True)
+
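+# Pull the raw BMS data for every recorded occurrence of the target fault and cache it to a CSV file.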
+dataset=pd.DataFrame()
+for k in range(len(df)):
+    try: 
+        sn =df.loc[k,'product_id']
+        start_time=str(df.loc[k,'start_time'])
+        end_time=df.loc[k,'end_time']
+        if end_time=='0000-00-00 00:00:00':
+            end_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')   #type: str
+        df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms'])
+        data_bms = df_data['bms']
+        data_bms['sn']=sn   
+        dataset=dataset.append(data_bms)
+        dataset.to_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/fault_'+fault_name+'.csv')
+    except Exception as e:
+        print(repr(e))
+        mylog.logopt(sn,e)
+        pass 
+
+df_bms=make_fault_set(dataset,cols,col_key,compare,threshold_filtre,fault_name)
+nor=normalset2(df_bms1,df_bms2,df_bms3,df_bms4,df_bms5,df_bms6,cols)
+scaler,model,acc=pre_model(nor,df_bms,time_steps,nbr_features,nbr_neurons,nbr_class,batch_size,epochs,dropout,lr,activation,loss)
+df_acc=pd.DataFrame({'fault_name':[fault_name],'accuracy':[acc]})
+df_acc.to_sql("faultclass_output",con=db_res_engine, if_exists="append",index=False)
+
+if acc>threshold_accuracy:
+    model.save('models/model_'+fault_name+'.h5')
+    pickle.dump(scaler,open('models/scaler_'+fault_name+'.pkl','wb'))

BIN
LIB/MIDDLE/FaultClass/V1_0_0/models/model_B板采样失效.h5


BIN
LIB/MIDDLE/FaultClass/V1_0_0/models/model_传感器_电芯NTC漂移.h5


BIN
LIB/MIDDLE/FaultClass/V1_0_0/models/model_电压采样断线.h5


BIN
LIB/MIDDLE/FaultClass/V1_0_0/models/scaler_B板采样失效.pkl


BIN
LIB/MIDDLE/FaultClass/V1_0_0/models/scaler_传感器_电芯NTC漂移.pkl


BIN
LIB/MIDDLE/FaultClass/V1_0_0/models/scaler_电压采样断线.pkl