"""PCA-reconstruction anomaly scoring for battery telemetry data frames."""

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
from sklearn.decomposition import PCA
from sklearn.preprocessing import RobustScaler

# Columns removed by makedataset() before any filtering (missing ones are ignored).
_DROPPED_COLUMNS = [
    'Unnamed: 0', '总电流[A]', 'GSM信号', '外电压', '单体压差', 'SOH[%]',
    '开关状态', '充电状态', '故障等级', '故障代码', '绝缘电阻', '上锁状态',
    '加热状态', '单体均衡状态', '总输出状态',
]

# The 20 per-cell voltage columns that makedataset() range-checks.
_CELL_VOLTAGE_COLUMNS = ['单体电压' + str(i) for i in range(1, 21)]

# Column written by transform() for the model built by anomalyPCA(). The suffix
# is str(model), so it follows scikit-learn's estimator repr format.
_DEFAULT_SCORE_COLUMN = 'anomalyScores_PCA(n_components=4, random_state=2, whiten=True)'


def makedataset(df_data: pd.DataFrame) -> pd.DataFrame:
    """Clean raw rows and aggregate them by timestamp prefix.

    Steps, in order:
      1. Drop the columns in ``_DROPPED_COLUMNS`` (absent ones are ignored).
      2. Keep rows where every ``单体电压1`` .. ``单体电压20`` value is strictly
         between 2200 and 4800 and ``SOC[%]`` is greater than 12.
      3. Group rows by the first 15 characters of ``时间戳`` and average the
         numeric columns of each group.
      4. Pass every resulting column through ``savgol_filter(..., 3, 2)``.

    Args:
        df_data: Raw frame; must contain the 20 cell-voltage columns,
            ``SOC[%]`` and a string column ``时间戳``.

    Returns:
        A new frame indexed by the 15-character time key (index name ``时间``).

    Raises:
        KeyError: If a required column is missing.
        ValueError: From ``savgol_filter`` if fewer than 3 groups remain.
    """
    df_data = df_data.drop(_DROPPED_COLUMNS, axis=1, errors='ignore')

    # One combined mask instead of 21 successive re-filterings of the frame.
    cells = df_data[_CELL_VOLTAGE_COLUMNS]
    cells_in_range = ((cells > 2200) & (cells < 4800)).all(axis=1)
    df_data = df_data[cells_in_range & (df_data['SOC[%]'] > 12)]

    # NOTE(review): a 15-character prefix is presumably a ten-minute bucket if
    # timestamps look like 'YYYY-MM-DD HH:MM:SS' -- confirm against the data.
    # assign() returns a new frame, so nothing is written onto a filtered slice.
    df_data = df_data.assign(**{'时间': df_data['时间戳'].str[0:15]})

    # numeric_only=True keeps the string column 时间戳 out of the mean; pandas >= 2.0
    # raises on it instead of silently dropping it.
    df_data = df_data.groupby('时间').mean(numeric_only=True)

    # NOTE(review): with window 3 and polynomial order 2 the local fit passes
    # through all three samples, so this should leave values essentially
    # unchanged -- confirm the intended window length.
    for column in df_data.columns:
        df_data[column] = savgol_filter(df_data[column], 3, 2)
    return df_data


def process(data_set: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *data_set* with every column scaled by a fresh RobustScaler.

    The scaler is fitted on *data_set* itself and is not returned. The input
    frame is left untouched; index and column labels are preserved.
    """
    scaler = RobustScaler(copy=True)
    scaled = scaler.fit_transform(data_set)
    # Build a new frame rather than assigning floats into the existing columns,
    # which avoids incompatible-dtype setitem on integer columns.
    return pd.DataFrame(scaled, index=data_set.index, columns=data_set.columns)


def anomalyScores(originalDF: pd.DataFrame, reducedDF: pd.DataFrame) -> pd.Series:
    """Return the min-max normalised squared reconstruction error per row.

    Args:
        originalDF: Original (scaled) features; supplies the result index.
        reducedDF: Reconstructed features with the same shape as *originalDF*.

    Returns:
        Series indexed like *originalDF* with values in [0, 1]. If every row
        has the same error, all scores are 0.0 (instead of NaN from 0/0).
    """
    residual = np.asarray(originalDF) - np.asarray(reducedDF)
    loss = pd.Series(np.sum(residual ** 2, axis=1), index=originalDF.index)

    lowest = loss.min()
    spread = loss.max() - lowest
    if spread == 0:
        # Constant error: normalisation is undefined, report "no anomaly".
        return pd.Series(0.0, index=loss.index)
    return (loss - lowest) / spread


def anomalyPCA(x_train_pro):
    """Fit and return a 4-component whitened PCA (random_state=2) on *x_train_pro*."""
    pca = PCA(n_components=4, whiten=True, random_state=2)
    pca.fit(x_train_pro)
    return pca


def transform(df_data_pro: pd.DataFrame, model, df_data: pd.DataFrame) -> pd.DataFrame:
    """Attach smoothed reconstruction-error scores from *model* to a copy of *df_data*.

    Args:
        df_data_pro: Scaled features to project and reconstruct.
        model: Fitted estimator exposing ``transform`` and ``inverse_transform``.
        df_data: Frame to annotate; must have as many rows as *df_data_pro*,
            because the scores are assigned by position.

    Returns:
        Copy of *df_data* with an extra column ``'anomalyScores_' + str(model)``.

    Raises:
        ValueError: From ``savgol_filter`` if there are fewer than 15 rows.
    """
    # Dimensionality reduction.
    reduced = pd.DataFrame(data=model.transform(df_data_pro), index=df_data_pro.index)

    # Reconstruction back into the original feature space.
    restored = pd.DataFrame(data=model.inverse_transform(reduced), index=df_data_pro.index)

    # Anomaly score: normalised reconstruction error, then smoothed.
    scores = savgol_filter(anomalyScores(df_data_pro, restored), 15, 3)

    annotated = df_data.copy()
    annotated['anomalyScores_' + str(model)] = scores
    return annotated


def _rows_beyond_zscore(frame, scores, reference, threshold):
    """Return rows of *frame* whose score is more than *threshold* std-devs from the *reference* mean."""
    baseline = reference.values
    z_scores = (scores.values - np.mean(baseline)) / np.std(baseline)
    # Positional boolean mask: rows keep their original order and appear once.
    return frame[np.abs(z_scores) > threshold].copy()


def detect_outliers(data: pd.DataFrame, threshold=3, score_column=_DEFAULT_SCORE_COLUMN) -> pd.DataFrame:
    """Return the rows of *data* whose anomaly score is a z-score outlier.

    The mean and (population) standard deviation are computed from *data*'s
    own score column.

    Args:
        data: Frame produced by ``transform``.
        threshold: Absolute z-score above which a row is reported.
        score_column: Name of the score column; defaults to the column
            ``transform`` writes for the model from ``anomalyPCA``.

    Returns:
        The outlier rows in their original order, each once (even when several
        rows share the same score). With no outliers, an empty frame that
        keeps *data*'s columns.
    """
    scores = data[score_column]
    return _rows_beyond_zscore(data, scores, scores, threshold)


def detect_outliers2(data: pd.DataFrame, pred: pd.DataFrame, threshold=3,
                     score_column=_DEFAULT_SCORE_COLUMN) -> pd.DataFrame:
    """Return the rows of *pred* that are outliers relative to *data*'s score distribution.

    Args:
        data: Reference frame; its score column supplies the mean and
            (population) standard deviation.
        pred: Frame whose rows are tested against that reference.
        threshold: Absolute z-score above which a row is reported.
        score_column: Name of the score column in both frames; defaults to the
            column ``transform`` writes for the model from ``anomalyPCA``.

    Returns:
        The outlier rows of *pred* in their original order, each once. With no
        outliers, an empty frame that keeps *pred*'s columns.
    """
    return _rows_beyond_zscore(pred, pred[score_column], data[score_column], threshold)