2 次代碼提交 5f8cc9bf0f ... 7b4bc3310e

作者 SHA1 備註 提交日期
  zhuxi 7b4bc3310e Merge branch 'dev' of http://git.fast-fun.cn:92/lmstack/data_analyze_platform into dev 1 年之前
  zhuxi 417b7837b7 faultclass 1 年之前

+ 54 - 42
LIB/MIDDLE/FaultClass/V1_0_0/faultclass.py

@@ -63,7 +63,7 @@ def makedataset(dataset):
         data_set=pd.DataFrame()
         start=set2.loc[0,'时间戳']
         end=set2.loc[len(set2)-1,'时间戳']
-        data_set['Time']=pd.date_range(start=start, end=end, freq='S')  #每分钟一条记录
+        data_set['Time']=pd.date_range(start=start, end=end, freq='S')  #每一条记录
         data_set['Time']=list(map(lambda x:str(x),list(data_set['Time'])))
         dfbms=pd.merge(data_set,set2,left_on='Time',right_on='时间戳',how='left')
         dfbms=dfbms.fillna(method='ffill')
@@ -149,42 +149,7 @@ def makeres(res,end_time):
                         'update_time':[update_time]}))
     return df_res
 
-# Step7 Merge
-def arrange(result,result_final,start_time):
-    result.reset_index(drop=True,inplace=True)
-    result_final.reset_index(drop=True,inplace=True)
-    list_faults=list(set(list(result_final['fault_class'])))
-    res_update=pd.DataFrame()
-    res_new=result.copy()
-    for fault in list_faults:
-        result0=result_final[result_final['fault_class']==fault]
-        result1=result[result['fault_class']==fault]
-        st=datetime.datetime.strptime(str(result.loc[0,'start_time']),'%Y-%m-%d %H:%M:%S')
-        start_time=datetime.datetime.strptime(str(start_time),'%Y-%m-%d %H:%M:%S')
-        if len(result1)>0:
-            if (start_time-st).total_seconds()<900:
-                result0['end_time']=result1['end_time']
-                result0['update_time']=result1['update_time']
-                res_update=res_update.append(result0)
-                res_new.drop(result1.index,inplace=True)
-            else:
-                result0['end_time']=result0['update_time']
-                res_update=res_update.append(result0)
-                res_new.drop(result1.index,inplace=True)
-        else:
-            result0['end_time']=result0['update_time']
-            res_update=res_update.append(result0)
-    return res_new,res_update
-
-def arrange2(dataorg,df_res,start_time,fault_name):
-    res_new=df_res.copy()
-    res_update=pd.DataFrame()
-    if len(dataorg)>0:
-        dataorg=dataorg[dataorg['fault_class']==fault_name]
-        res_new,res_update=arrange(df_res,dataorg,start_time)
-    return res_new,res_update
-
-# Step8 Process
+# Step7 Process
 def pred(data_fea,model,scaler,col,end_time,time_steps):
     df_res=pd.DataFrame()
     fea=split(data_fea)
@@ -197,8 +162,54 @@ def pred(data_fea,model,scaler,col,end_time,time_steps):
         df_res=makeres(res,end_time)
     return df_res
 
+# Step8 Merge
+def arrange(result,result_final):
+    result.reset_index(drop=True,inplace=True)
+    res_update=pd.DataFrame()
+    res_new=result.copy()
+    if len(result)>0:
+        st=datetime.datetime.strptime(str(result.loc[0,'start_time']),'%Y-%m-%d %H:%M:%S')
+        end=datetime.datetime.strptime(str(result_final['update_time']),'%Y-%m-%d %H:%M:%S')
+        if (st-end).total_seconds()<3600:
+            result_final['end_time']=result.loc[0,'end_time']
+            result_final['update_time']=result.loc[0,'update_time']
+            res_update=result_final.copy()
+            res_new.drop(result.index,inplace=True)
+        else:
+            result_final['end_time']=result_final['update_time']
+            res_update=result_final.copy()
+            res_new.drop(result.index,inplace=True)
+    else:
+        result_final['end_time']=result_final['update_time']
+        res_update=result_final.copy()
+    return res_new,res_update
 
-
+def arrange2(dataorg,df_res,time_stepsi):
+    res_new=df_res.copy()
+    res_update=pd.DataFrame()
+    if len(dataorg)>0:
+        res_new,res_update=arrange(df_res,dataorg)
+    if len(res_new)>0:
+        for i in range(len(res_new)):
+            if res_new.loc[i,'end_time'] != '0000-00-00 00:00:00':
+                st1=datetime.datetime.strptime(str(res_new.loc[i,'start_time']),'%Y-%m-%d %H:%M:%S')
+                end1=datetime.datetime.strptime(str(res_new.loc[i,'end_time']),'%Y-%m-%d %H:%M:%S')
+                if (end1-st1).total_seconds()<time_stepsi:
+                    res_new.drop([i],axis=0,inplace=True)
+    if len(res_update)>0:
+        if res_update['end_time']!= '0000-00-00 00:00:00':
+            st2=datetime.datetime.strptime(str(res_update['start_time']),'%Y-%m-%d %H:%M:%S')
+            end2=datetime.datetime.strptime(str(res_update['end_time']),'%Y-%m-%d %H:%M:%S')
+            res_update=pd.DataFrame(pd.DataFrame({'product_id':[res_update['product_id']],'start_time':[str(res_update['start_time'])],
+                        'end_time':[str(res_update['end_time'])],'fault_class':[res_update['fault_class']],
+                        'update_time':[res_update['update_time']]}))
+            if (end2-st2).total_seconds()<time_stepsi:
+                res_update=pd.DataFrame()
+        else:
+            res_update=pd.DataFrame(pd.DataFrame({'product_id':[res_update['product_id']],'start_time':[str(res_update['start_time'])],
+                        'end_time':[str(res_update['end_time'])],'fault_class':[res_update['fault_class']],
+                        'update_time':[res_update['update_time']]}))
+    return res_new,res_update
 #################################################################################################################################
 
 #### Process1 - New Model ###
@@ -325,7 +336,8 @@ def xy(train):
     Xtrain=train.drop(['fault_class','Time','sn','split'],axis=1)
     Ytrain=train[['fault_class']]          
     Ytrain2=pd.get_dummies(Ytrain,columns=['fault_class'],prefix_sep='_')
-    return Xtrain,Ytrain,Ytrain2
+    cols=list(map(lambda x:x[12:],list(Ytrain2.columns)))
+    return Xtrain,Ytrain,Ytrain2,cols
 
 # Step8 Scaler 
 def scaler_train(Xtrain):
@@ -425,8 +437,8 @@ def pre_model(nor,df_bms,time_steps,nbr_features,nbr_neurons,nbr_class,batch_siz
     newtrain,newtest=shuffle_data(nor,df_bms)
     train_sh=shuffle_data2(newtrain)
     test_sh=shuffle_data2(newtest)
-    Xtrain,Ytrain,Ytrain2=xy(train_sh)
-    Xtest,Ytest,Ytest2=xy(test_sh)                           
+    Xtrain,Ytrain,Ytrain2,cols_train=xy(train_sh)
+    Xtest,Ytest,Ytest2,cols_test=xy(test_sh)                           
     Xsc,scaler=scaler_train(Xtrain)
     Xtsc=scaler_test(Xtest,scaler)
     indextr=make_index(train_sh)
@@ -437,7 +449,7 @@ def pre_model(nor,df_bms,time_steps,nbr_features,nbr_neurons,nbr_class,batch_siz
                     Xtwin=Xtwin,Ytwin=Ytwin,batch_size=batch_size,epochs=epochs,dropout=dropout,lr=lr,activation=activation,
                     loss=loss,metrics='accuracy')
     loss,acc=model.evaluate(Xtwin,Ytwin)
-    return scaler,model,acc
+    return scaler,model,acc,cols_train
 
 
 

+ 100 - 42
LIB/MIDDLE/FaultClass/V1_0_0/main_pred.py

@@ -21,7 +21,7 @@ import re
 #...................................故障检测函数......................................................................................................................
 def diag_cal():
     global SNnums
-    global scaler1,scaler2,model1,model2,col1,col2,time_steps1,time_steps2
+    global list_faults,list_scalers,list_models,list_cols,list_time_steps,list_features
 
     start=time.time()
     now_time=datetime.datetime.now()
@@ -37,53 +37,60 @@ def diag_cal():
     password='Qx@123456'
 
     #读取结果库数据......................................................
-    param='product_id,start_time,end_time,diff_min,SOC,AnoScoreV_sum_max,AnoScoreV_max_max,AnoScoreT_sum_max,AnoScoreT_max_max'
-    tablename='fault_detection'
+    param='product_id,start_time,end_time,fault_class,update_time'
+    tablename='fault_class'
     mysql = pymysql.connect (host=host, user=user, password=password, port=port, database=db)
     cursor = mysql.cursor()
     sql =  "select {} from {} where end_time='0000-00-00 00:00:00'".format(param,tablename)
     cursor.execute(sql)
     res = cursor.fetchall()
     df_diag_ram= pd.DataFrame(res,columns=param.split(','))
-    
 
     db_res_engine = create_engine(
         "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
             user, parse.quote_plus(password), host, port, db
         ))
 
-
     #调用主函数................................................................................................................................................................
     for sn in SNnums:
         try:
-            df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms'])
-            data_bms = df_data['bms']
-            data_bms['sn']=sn
-            if len(data_bms)>0:
-                logger.info("SN: {} 数据开始预处理".format(sn))
-                data_fea1=features1(data_bms)
-                data_fea2=features2(data_bms)
-                logger.info("SN: {} 数据开始模型预测".format(sn))
-                df_res1=pred(data_fea1,model1,scaler1,col1,end_time,time_steps1)
-                df_res2=pred(data_fea2,model2,scaler2,col2,end_time,time_steps2)
-                df_diag_ram_sn=df_diag_ram[df_diag_ram['product_id']==sn]
-                res_new1,res_update1=arrange2(df_diag_ram_sn,df_res1,start_time,'B板采样失效')
-                res_new2,res_update2=arrange2(df_diag_ram_sn,df_res2,start_time,'传感器_电芯NTC漂移')
-               
-                if len(res_update1)>0:
-                    cursor.execute("DELETE FROM fault_class WHERE end_time = '0000-00-00 00:00:00' and product_id='{}' and fault_class='{}'".format(sn,'B板采样失效'))
-                    mysql.commit()
-                    res_update1.to_sql("fault_class",con=db_res_engine, if_exists="append",index=False)
-                res_new1.to_sql("fault_class",con=db_res_engine, if_exists="append",index=False)
-                if len(res_update2)>0:
-                    cursor.execute("DELETE FROM fault_class WHERE end_time = '0000-00-00 00:00:00' and product_id='{}' and fault_class='{}'".format(sn,'传感器_电芯NTC漂移'))
-                    mysql.commit()
-                    res_update2.to_sql("fault_class",con=db_res_engine, if_exists="append",index=False)
-                res_new2.to_sql("fault_class",con=db_res_engine, if_exists="append",index=False)
-            
-                        #新增结果存入结果库................................................................
-
-
+            #sn='PK50001A100000248'
+            #start_time='2022-11-18 00:00:00'
+            #end_time='2022-11-18 03:00:00'
+            group=sn[:2]
+            if group in ['PK','MG','UD']:
+                df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms'])
+                data_bms = df_data['bms']
+                data_bms['sn']=sn
+                if len(data_bms)>0:
+                    df_diag_ram_sn=df_diag_ram[df_diag_ram['product_id']==sn]
+                    for i in range(len(list_faults)):
+                        fault=list_faults[i]
+                        df_diag_ram_sn_fault=df_diag_ram_sn[df_diag_ram_sn['fault_class']==fault]
+                        if len(df_diag_ram_sn_fault)>0:
+                            df_diag_ram_sn_fault.reset_index(drop=True,inplace=True)
+                            df_diag_ram_sn_fault=df_diag_ram_sn_fault.iloc[0]
+                        logger.info("SN: {} 数据开始预处理".format(sn))
+                        if fault=='B板采样失效':
+                            data_fea=features1(data_bms)
+                        elif fault=='传感器_电芯NTC漂移':
+                            data_fea=features2(data_bms)
+                        else:
+                            feas=list_features[i]
+                            data_fea=features_filtre(data_bms,feas)
+                        logger.info("SN: {} 数据开始模型预测".format(sn))
+                        modeli=list_models[i]
+                        scaleri=list_scalers[i]
+                        coli=list_cols[i]
+                        time_stepsi=list_time_steps[i]
+                        df_res=pred(data_fea,modeli,scaleri,coli,end_time,time_stepsi)
+                        res_new,res_update=arrange2(df_diag_ram_sn_fault,df_res,time_stepsi)
+                        if len(res_update)>0:
+                            cursor.execute("DELETE FROM fault_class WHERE end_time = '0000-00-00 00:00:00' and product_id='{}' and fault_class='{}'".format(sn,fault))
+                            mysql.commit()
+                            res_update.to_sql("fault_class",con=db_res_engine, if_exists="append",index=False)
+                        res_new.to_sql("fault_class",con=db_res_engine, if_exists="append",index=False)
+    
             # end=time.time()
             # print(end-start)  
                 
@@ -125,7 +132,7 @@ if __name__ == "__main__":
 
     logger.info("pid is {}".format(os.getpid()))
     
-     # # 更新sn列表
+    # # 更新sn列表
     host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
     port=3306
     db='qixiang_oss'
@@ -138,17 +145,68 @@ if __name__ == "__main__":
     df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time'])
     df_sn = df_sn.reset_index(drop=True)
     conn.close();
+
+    #用户输出参数数据库配置
+    host0='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
+    port0=3306
+    db0='qx_cas'
+    user0='qx_algo_rw'
+    password0='qx@123456'
+
+    #读取用户输出参数......................................................
+    param='fault_name,class,time_steps,accuracy'
+    tablename='faultclass_output1'
+    mysql = pymysql.connect (host=host0, user=user0, password=password0, port=port0, database=db0)
+    cursor = mysql.cursor()
+    sql =  "select {} from {}".format(param,tablename)
+    cursor.execute(sql)
+    res = cursor.fetchall()
+    list_param= pd.DataFrame(res,columns=param.split(','))
+    list_param.reset_index(drop=True,inplace=True)
+
+    #读取用户输入参数......................................................
+    param0='fault_name,cols,col_key,compare,threshold_filtre,time_steps,nbr_features,nbr_neurons,nbr_class,batch_size,epochs,dropout,lr,activation,loss,threshold_accuracy'
+    tablename0='faultclass_input'
+    mysql0 = pymysql.connect (host=host0, user=user0, password=password0, port=port0, database=db0)
+    cursor0 = mysql0.cursor()
+    sql0 =  "select {} from {} ".format(param0,tablename0)
+    cursor0.execute(sql0)
+    res0 = cursor0.fetchall()
+    list_param0= pd.DataFrame(res0,columns=param0.split(','))
+    list_param0.reset_index(drop=True,inplace=True)
     
     SNnums = list(df_sn['sn'])
     
-    scaler1=pickle.load(open('LIB/MIDDLE/FaultClass/V1_0_0/models/scaler_B板采样失效.pkl','rb'))
-    scaler2=pickle.load(open('LIB/MIDDLE/FaultClass/V1_0_0/models/scaler_传感器_电芯NTC漂移.pkl','rb'))
-    model1=load_model('LIB/MIDDLE/FaultClass/V1_0_0/models/model_B板采样失效.h5')
-    model2=load_model('LIB/MIDDLE/FaultClass/V1_0_0/models/model_传感器_电芯NTC漂移.h5')
-    col1=['B板采样失效','正常']
-    col2=['传感器_电芯NTC漂移','正常']
-    time_steps1=60
-    time_steps2=60
+    # # 加载模型
+    list_model=os.listdir('LIB/MIDDLE/FaultClass/V1_0_0/models')
+    list_faults=[]
+    list_scalers=[]
+    list_models=[]
+    list_cols=[]
+    list_time_steps=[]
+    list_features=[]
+    for k in range(len(list_model)):
+        fault_name=list_model[k][6:-3]
+        list_faults.append(fault_name)
+        scaler=pickle.load(open('LIB/MIDDLE/FaultClass/V1_0_0/scalers/scaler_'+fault_name+'.pkl','rb'))
+        list_scalers.append(scaler)
+        model=load_model('LIB/MIDDLE/FaultClass/V1_0_0/models/'+list_model[k])
+        list_models.append(model)
+        if fault_name=='B板采样失效':
+            col=['B板采样失效','正常']
+            time_steps=60
+            feature=['feature1']
+        elif fault_name=='传感器_电芯NTC漂移':
+            col=['传感器_电芯NTC漂移','正常']
+            time_steps=60
+            feature=['feature2']
+        else:
+            col=eval(list_param.loc[list_param[list_param['fault_name']==fault_name].index,'class'].values[0])
+            time_steps=list_param.loc[list_param[list_param['fault_name']==fault_name].index,'time_steps'].values[0]
+            feature=eval(list_param0.loc[list_param0[list_param0['fault_name']==fault_name].index,'cols'].values[0])
+        list_cols.append(col)
+        list_time_steps.append(time_steps)
+        list_features.append(feature)
     logger.info("模型加载完成")
 
     diag_cal()

+ 25 - 26
LIB/MIDDLE/FaultClass/V1_0_0/main_train.py

@@ -12,9 +12,6 @@ mylog.logcfg()
 from sqlalchemy import create_engine
 from urllib import parse
 
-#故障
-fault_name='电压采样断线'
-
 #读取文件:正常数据
 
 df_bms1=pd.read_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/dataset2.csv')
@@ -24,14 +21,14 @@ df_bms4=pd.read_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/dataset5.csv')
 df_bms5=pd.read_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/dataset6.csv')
 df_bms6=pd.read_csv('LIB/MIDDLE/FaultClass/V1_0_0/data/dataset7.csv')
 
-#数据库配置
+#用户输入参数数据库配置
 host0='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
 port0=3306
 db0='qx_cas'
 user0='qx_algo_rw'
 password0='qx@123456'
 
-#读取结果库数据......................................................
+#读取用户输入参数......................................................
 param='fault_name,cols,col_key,compare,threshold_filtre,time_steps,nbr_features,nbr_neurons,nbr_class,batch_size,epochs,dropout,lr,activation,loss,threshold_accuracy'
 tablename='faultclass_input'
 mysql = pymysql.connect (host=host0, user=user0, password=password0, port=port0, database=db0)
@@ -48,25 +45,27 @@ db_res_engine = create_engine(
     ))
 
 #用户输入参数
-cols=eval(list_param.loc[0,'cols'])
-col_key=list_param.loc[0,'col_key']
-compare=list_param.loc[0,'compare']
-threshold_filtre=list_param.loc[0,'threshold_filtre']
-
-time_steps=list_param.loc[0,'time_steps']
-nbr_features=list_param.loc[0,'nbr_features']
-nbr_neurons=list_param.loc[0,'nbr_neurons']
-nbr_class=list_param.loc[0,'nbr_class']
-batch_size=list_param.loc[0,'batch_size']
-epochs=list_param.loc[0,'epochs']
-dropout=list_param.loc[0,'dropout']
-lr=list_param.loc[0,'lr']
-activation=list_param.loc[0,'activation']
-loss=list_param.loc[0,'loss']
-
-threshold_accuracy=list_param.loc[0,'threshold_accuracy']
-
-#数据库配置
+id=0
+fault_name=list_param.loc[id,'fault_name']
+cols=eval(list_param.loc[id,'cols'])
+col_key=list_param.loc[id,'col_key']
+compare=list_param.loc[id,'compare']
+threshold_filtre=list_param.loc[id,'threshold_filtre']
+
+time_steps=list_param.loc[id,'time_steps']
+nbr_features=list_param.loc[id,'nbr_features']
+nbr_neurons=list_param.loc[id,'nbr_neurons']
+nbr_class=list_param.loc[id,'nbr_class']
+batch_size=list_param.loc[id,'batch_size']
+epochs=list_param.loc[id,'epochs']
+dropout=list_param.loc[id,'dropout']
+lr=list_param.loc[id,'lr']
+activation=list_param.loc[id,'activation']
+loss=list_param.loc[id,'loss']
+
+threshold_accuracy=list_param.loc[id,'threshold_accuracy']
+
+#故障结果数据库配置
 host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
 port=3306
 db='safety_platform'
@@ -149,8 +148,8 @@ for k in range(len(df)):
 
 df_bms=make_fault_set(dataset,cols,col_key,compare,threshold_filtre,fault_name)
 nor=normalset2(df_bms1,df_bms2,df_bms3,df_bms4,df_bms5,df_bms6,cols)
-scaler,model,acc=pre_model(nor,df_bms,time_steps,nbr_features,nbr_neurons,nbr_class,batch_size,epochs,dropout,lr,activation,loss)
-df_acc=pd.DataFrame({'fault_name':[fault_name],'accuracy':[acc]})
+scaler,model,acc,cols_train=pre_model(nor,df_bms,time_steps,nbr_features,nbr_neurons,nbr_class,batch_size,epochs,dropout,lr,activation,loss)
+df_acc=pd.DataFrame({'fault_name':[fault_name],'class':[str(cols_train)],'time_steps':[time_steps],'accuracy':[acc]})
 df_acc.to_sql("faultclass_output",con=db_res_engine, if_exists="append",index=False)
 
 if acc>threshold_accuracy: