123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
# Automated training pipeline
- import pymysql
- import datetime
- import pandas as pd
- import numpy as np
- from sklearn.preprocessing import StandardScaler
- from random import shuffle
- import tensorflow.keras as keras
- import random
- import pickle
# Feature engineering
def features_total(dataset2, capacity):
    """Feature engineering for raw BMS telemetry.

    Collapses the per-cell voltage / temperature columns into aggregate
    features (extremes, spreads, dispersion, adjacency of the min/max cells)
    and normalises pack current by ``capacity``.

    dataset2 -- raw telemetry dataframe, one row per timestamp
    capacity -- pack rated capacity; PackCrnt is divided by it (C-rate)
    Returns the dataframe with per-cell columns replaced by aggregates.
    """
    # Drop bookkeeping / accumulator columns that are not model features.
    dataset2=dataset2.drop(['PackSoh','InsulationRssPos','InsulationRssNeg','AccumChrgWh','AccumChrgAh','AccumDsChgAh','imei','InsulationRss','cellvoltmax','cellvoltmin','celltempmax','celltempmin'],axis=1,errors='ignore')
    cellvolt_list = [s for s in list(dataset2) if 'CellVoltage' in s]  # per-cell voltage columns
    celltemp_name = [s for s in list(dataset2) if 'CellTemp' in s]  # per-cell temperature columns
    celltemp_name2 = [s for s in list(dataset2) if '温度' in s]  # other temperature probes (names contain "温度" = temperature)
    dataset2['PackCrnt']=list(map(lambda x: x/float(capacity),list(dataset2['PackCrnt'])))  # current scaled to C-rate
    dataset2['volt_diff']=list(np.array(dataset2[cellvolt_list].max(axis=1))-np.array(dataset2[cellvolt_list].min(axis=1)))  # cell-voltage spread
    dataset2=dataset2.reindex(columns=['Time','sn','PackCrnt','PackVolt','BMSSta','volt_diff','PackSoc']+cellvolt_list+celltemp_name+celltemp_name2)
    dataset2[cellvolt_list]=dataset2[cellvolt_list]*1000  # presumably V -> mV; confirm input unit
    dataset2['volt_last']=list(dataset2[cellvolt_list[-1]])  # last cell's voltage
    dataset2['volt_first']=list(dataset2[cellvolt_list[0]])  # first cell's voltage
    dataset2['volt_last2']=list(dataset2[cellvolt_list[-2]])  # second-to-last cell's voltage
    dataset2['volt_first2']=list(dataset2[cellvolt_list[1]])  # second cell's voltage
    dataset2['volt_max']=dataset2[cellvolt_list].max(axis=1)  # max cell voltage
    dataset2['volt_min']=dataset2[cellvolt_list].min(axis=1)  # min cell voltage
    dataset2['volt_mean'] = round(dataset2[cellvolt_list].mean(axis=1),3)  # row-wise mean cell voltage
    dataset2['volt_sigma'] =list(dataset2[cellvolt_list].apply(lambda x: np.std(x.values),axis=1))  # voltage dispersion (std per row)
    cell_volt_max =list(dataset2[cellvolt_list].apply(lambda x: np.argmax(x.values)+1,axis=1))  # 1-based index of the max-voltage cell
    cell_volt_min =list(dataset2[cellvolt_list].apply(lambda x: np.argmin(x.values)+1,axis=1))  # 1-based index of the min-voltage cell
    volt_max2= dataset2[cellvolt_list].apply(lambda x: sorted(x)[-2], axis=1)  # second-largest voltage per row
    volt_min2=dataset2[cellvolt_list].apply(lambda x: sorted(x)[1], axis=1)  # second-smallest voltage per row
    dataset2['volt_max2']=volt_max2  # second-largest voltage
    dataset2['volt_min2']=volt_min2  # second-smallest voltage
    dataset2['volt_max_mean']=list(np.array(dataset2[cellvolt_list].max(axis=1))-np.array(round(dataset2[cellvolt_list].mean(axis=1),3) ))  # max minus mean voltage
    dataset2['volt_min_mean']=list(np.array(round(dataset2[cellvolt_list].mean(axis=1),3))-np.array(dataset2[cellvolt_list].min(axis=1)))  # mean minus min voltage
    dataset2['volt_min_diff']= list(np.array(volt_min2)-np.array(dataset2[cellvolt_list].min(axis=1)))  # gap between the two smallest voltages
    # Gap between the two largest voltages plus gap between the two smallest.
    # NOTE: list + ndarray is coerced by numpy into elementwise addition here.
    dataset2['volt_mm_diff']=list(np.array(dataset2[cellvolt_list].max(axis=1))-np.array(volt_max2))+np.array(volt_min2)-np.array(dataset2[cellvolt_list].min(axis=1))
    dataset2['mm_volt_cont'] = list(np.array(cell_volt_max) - np.array(cell_volt_min))
    # 1 when the max- and min-voltage cells are adjacent (including the
    # first/last wrap-around pair), 0 otherwise.
    dataset2['mm_volt_cont']=list(map(lambda x : 1 if (abs(x)==1) | (abs(x)==len(cellvolt_list)-1) else 0, list(dataset2['mm_volt_cont'])))
    dataset2['temp_max']=dataset2[celltemp_name].max(axis=1)  # max cell temperature
    dataset2['temp_min']=dataset2[celltemp_name].min(axis=1)  # min cell temperature
    dataset2['temp_mean'] = round(dataset2[celltemp_name].mean(axis=1),3)  # row-wise mean cell temperature
    dataset2['temp_diff']=list(np.array(dataset2['temp_max'])-np.array(dataset2['temp_min']))  # cell temperature spread
    dataset2['temp2_max']=dataset2[celltemp_name2].max(axis=1)  # max of other temperatures
    dataset2['temp2_min']=dataset2[celltemp_name2].min(axis=1)  # min of other temperatures
    dataset2['temp2_mean'] = round(dataset2[celltemp_name2].mean(axis=1),3)  # row-wise mean of other temperatures
    dataset2['temp2_diff']=list(np.array(dataset2['temp2_max'])-np.array(dataset2['temp2_min']))  # other-temperature spread
    # The raw per-cell columns are no longer needed once aggregated.
    dataset2=dataset2.drop(celltemp_name+cellvolt_list+celltemp_name2,axis=1)
    return dataset2
# Build fault time series on a regular grid
def makedataset(dataset2, freq):
    """Resample each fault segment (grouped by 'split') onto a regular time grid.

    For every split value, builds a complete timestamp index from the
    segment's first to last 'Time' at frequency ``freq``, left-joins the raw
    rows onto it (on the string form of the timestamps) and forward- then
    backward-fills the gaps.

    dataset2 -- dataframe with at least 'Time' and 'split' columns
    freq     -- pandas offset alias for the grid step (e.g. 'T', '10S')
    Returns the concatenated, re-indexed dataframe.
    """
    frames = []
    for sp in set(dataset2['split']):
        set2 = dataset2[dataset2['split'] == sp].copy()  # copy: avoid SettingWithCopy on 'Time' below
        set2.reset_index(drop=True, inplace=True)
        start = set2.loc[0, 'Time']
        end = set2.loc[len(set2) - 1, 'Time']
        grid = pd.DataFrame()
        grid['Time'] = pd.date_range(start=start, end=end, freq=freq)  # one record per `freq`
        # Join on the string form so mixed timestamp/str inputs still match.
        grid['Time'] = grid['Time'].astype(str)
        set2['Time'] = set2['Time'].astype(str)
        dfbms = pd.merge(grid, set2, on='Time', how='left')
        # pandas 2.x removed fillna(method=...) and DataFrame.append:
        # use ffill()/bfill() and a single concat at the end instead.
        dfbms = dfbms.ffill().bfill()
        frames.append(dfbms)
    df_bms = pd.concat(frames) if frames else pd.DataFrame()
    df_bms.reset_index(drop=True, inplace=True)
    return df_bms
# Shuffle and split into train / test sets
def shuffle_data(dataset_faults):
    """Randomly split the fault dataset 80/20 by battery serial number.

    All rows of a given 'sn' land in the same partition, so no device leaks
    across the train/test boundary.
    Returns (train_df, test_df), both with a fresh RangeIndex.
    """
    serials = list(set(dataset_faults['sn']))
    shuffle(serials)
    cut = int(0.8 * len(serials))
    train_part = dataset_faults[dataset_faults['sn'].isin(serials[:cut])]
    test_part = dataset_faults[dataset_faults['sn'].isin(serials[cut:])]
    train_part.reset_index(drop=True, inplace=True)
    test_part.reset_index(drop=True, inplace=True)
    return train_part, test_part
# Standardise the training data
def scaler_train(train):
    """Fit a StandardScaler on the training features.

    'Time', 'sn' and 'split' are excluded from scaling; 'split' is
    re-attached unscaled so later windowing can still group by it.
    Returns (scaled_df, fitted_scaler).
    """
    features = train.drop(['Time', 'sn', 'split'], axis=1)
    scaler = StandardScaler()
    scaler.fit(features)  # remembers per-column mean and std
    scaled = pd.DataFrame(scaler.transform(np.array(features)),
                          columns=list(features.columns))
    scaled['split'] = train['split'].values
    return scaled, scaler
# Standardise the test data with the training scaler
def scaler_test_train(test, scaler):
    """Apply an already-fitted scaler to the test features.

    Mirrors scaler_train: drops 'Time'/'sn'/'split', transforms the rest
    with ``scaler`` and re-attaches the raw 'split' column.
    Returns the scaled dataframe.
    """
    features = test.drop(['Time', 'sn', 'split'], axis=1)
    scaled = pd.DataFrame(scaler.transform(np.array(features)),
                          columns=list(features.columns))
    scaled['split'] = test['split'].values
    return scaled
# Slice into sliding time windows
def create_dataset(data_set, data_train, time_steps=6):
    """Slide a window of ``time_steps`` rows over each 'split' segment.

    data_set   -- scaled feature dataframe (feature columns + 'split')
    data_train -- matching unscaled dataframe carrying 'Time'/'sn' metadata
    time_steps -- window length in rows

    Returns (windows, index):
      windows -- ndarray of shape (num_windows, time_steps, num_features)
      index   -- rows of data_train aligned with each window's start row
    Segments with <= time_steps rows are skipped; when nothing qualifies an
    empty (0, 3) array is returned (historical behaviour, kept for callers).
    """
    window_arrays = []
    aa = np.empty(shape=[0, 3])
    index_frames = []
    for k in sorted(set(data_set['split'])):
        segment = data_set[data_set['split'] == k]
        seg_train = data_train[data_train['split'] == k]
        if len(segment) > time_steps:
            features = segment.drop(['split'], axis=1)
            n_windows = len(features) - time_steps
            windows = [features.iloc[i:i + time_steps].values
                       for i in range(n_windows)]
            window_arrays.append(np.array(windows))
            # One metadata row per window, aligned with the window start.
            index_frames.append(seg_train[:n_windows])
    if window_arrays:
        aa = np.vstack(window_arrays)
    # pandas 2.x removed DataFrame.append: collect frames and concat once.
    index = pd.concat(index_frames) if index_frames else pd.DataFrame()
    index.reset_index(drop=True, inplace=True)
    return aa, index
# Model training
def model_train(X, units=60, batch_size=128, epochs=15):
    """Train an LSTM autoencoder that reconstructs its own input windows.

    X          -- ndarray of shape (N, time_steps, n_features)
    units      -- LSTM hidden size
    batch_size -- training batch size
    epochs     -- maximum epochs (early stopping may end training sooner)
    Returns the fitted keras model.
    """
    optimizer = 'adam'  # gradient-descent optimiser; learning rate left at keras default
    loss = 'mae'  # mean absolute error reconstruction loss
    dropout=0.30  # dropout rate (NOT an iteration count as the old comment claimed)
    reg=0.001  # L1/L2 regularisation strength
    callback = keras.callbacks.EarlyStopping(monitor='loss', patience=2)  # stop after 2 epochs without loss improvement
    model = keras.Sequential()
    # Input layer: input shape (N, X.shape[1], X.shape[2])
    model.add(keras.layers.LSTM(units=units, input_shape =(X.shape[1],X.shape[2]), return_sequences=True,kernel_regularizer=keras.regularizers.l2(reg),activity_regularizer=keras.regularizers.l1(reg)))
    model.add(keras.layers.Dropout(rate=dropout))
    # return_sequences=True emits every timestep (many-to-many): output (N, X.shape[1], units)
    # return_sequences=False would instead give shape (N, units)

    # Output layer: input shape (N, X.shape[1], units)
    model.add(keras.layers.TimeDistributed(keras.layers.Dense(X.shape[2])))
    # Dense emits (N, X.shape[2]) per step;
    # TimeDistributed applies it at every timestep -> output (N, X.shape[1], X.shape[2])
    model.compile(loss= loss, optimizer=optimizer)
    # Trained as an autoencoder: input == target.
    #model.fit(X, X, epochs=epochs, batch_size=batch_size, validation_data=(x_test,x_test), shuffle=False,callbacks=[callback])
    model.fit(X, X, epochs=epochs, batch_size=batch_size, validation_split=0.1, shuffle=False,callbacks=[callback])
    return model
# Model prediction / loss scoring
def prediction(model, xtest, Xtsc, conftest):
    """Score windows with the autoencoder and summarise reconstruction loss.

    model    -- fitted model exposing .predict
    xtest    -- window array of shape (N, time_steps, n_features)
    Xtsc     -- scaled dataframe whose non-'split' columns name the features
    conftest -- metadata dataframe; gains 'loss_sum'/'loss_max' columns (side effect)

    Returns (mean, max, min, q90, q05-of-nonnull) of the two loss columns,
    plus per-feature q95 ('maxcol') and q02 ('mincol') loss quantiles.
    """
    reconstructed = model.predict(xtest)
    # Mean absolute error over the time axis -> one loss per window & feature.
    per_window_loss = np.mean(np.abs(reconstructed - xtest), axis=1)
    feature_names = [c for c in Xtsc.columns if c != 'split']
    loss_df = pd.DataFrame(data=per_window_loss[0:, 0:], columns=feature_names)
    conftest['loss_sum'] = loss_df.sum(axis=1)
    conftest['loss_max'] = loss_df.max(axis=1)
    maxcol = loss_df.quantile(q=0.95, axis=0, numeric_only=True, interpolation='linear')
    mincol = loss_df.quantile(q=0.02, axis=0, numeric_only=True, interpolation='linear')
    summary = conftest[['loss_max', 'loss_sum']]
    test_loss_test = summary.mean(axis=0)
    test_loss_test2 = summary.max(axis=0)
    test_loss_test3 = summary.min(axis=0)
    test_loss_test4 = summary.quantile(q=0.9, axis=0, numeric_only=True, interpolation='linear')
    nonnull = conftest[conftest['loss_max'].notnull()]
    test_loss_test5 = nonnull[['loss_max', 'loss_sum']].quantile(q=0.05, axis=0, numeric_only=True, interpolation='linear')
    return test_loss_test, test_loss_test2, test_loss_test3, test_loss_test4, test_loss_test5, maxcol, mincol
# Cross-validate model stability
def cross_val(data_bms5, time_steps, units, batch_size, df_nor):
    """One cross-validation round: train on a random 80/20 fault split and
    compare per-feature reconstruction loss against normal data.

    data_bms5 -- resampled fault dataframe
    df_nor    -- resampled normal (healthy) dataframe
    Returns (key_col, model, scaler, test_loss_fault, delta_loss_serie_sort)
    where key_col is the feature whose loss best separates normal from fault.
    """
    # Shuffle the fault data into train/test by serial number.
    train, test = shuffle_data(data_bms5)
    Xsc, scaler = scaler_train(train)
    Xtsc = scaler_test_train(test, scaler)
    # Sliding time windows.
    xtrain, conftrain = create_dataset(Xsc, train, time_steps=time_steps)
    xtest, conftest = create_dataset(Xtsc, test, time_steps=time_steps)
    # Train the autoencoder on fault windows.
    model = model_train(xtrain, units=units, batch_size=batch_size, epochs=50)
    # Per-column loss summaries on the fault validation set.
    test_loss_fault = prediction(model, xtest, Xtsc, conftest)
    Xtsc_nor = scaler_test_train(df_nor, scaler)
    xtest_nor, conftest_nor = create_dataset(Xtsc_nor, df_nor, time_steps=time_steps)
    test_loss_nor = prediction(model, xtest_nor, Xtsc_nor, conftest_nor)
    # Separation margin per feature: normal-data q02 loss minus fault-data
    # q95 loss ([6] is the q02 'mincol' series, [5] the q95 'maxcol' series).
    fault_q95 = test_loss_fault[5]
    nor_q02 = test_loss_nor[6]
    cols = list(nor_q02.index)
    # Use .iloc: plain integer indexing on a string-indexed Series was
    # deprecated positional indexing (removed in pandas 3.x).
    list_delta_loss = [nor_q02.iloc[k] - fault_q95.iloc[k]
                       for k in range(len(fault_q95))]
    list_delta_loss_serie = pd.Series(list_delta_loss, index=cols)
    key_col = list_delta_loss_serie.idxmax()  # best-separating feature
    delta_loss_serie_sort = list_delta_loss_serie.sort_values()
    return key_col, model, scaler, test_loss_fault, delta_loss_serie_sort
# Automated training workflow
def _median_split_duration(datatest):
    """Median duration in seconds of the fault segments (grouped by 'split').

    'Time' values must parse as '%Y-%m-%d %H:%M:%S'.
    """
    durations = []
    for sp in list(datatest['split'].drop_duplicates()):
        fault = datatest[datatest['split'] == sp]
        fault.reset_index(drop=True, inplace=True)
        t0 = datetime.datetime.strptime(str(fault.loc[0, 'Time']), '%Y-%m-%d %H:%M:%S')
        t1 = datetime.datetime.strptime(str(fault.loc[len(fault) - 1, 'Time']), '%Y-%m-%d %H:%M:%S')
        durations.append((t1 - t0).total_seconds())
    return np.median(durations)


def _pick_batch_size(n_rows):
    """Batch size grows with the amount of resampled fault data.

    The original conditional chain used strict < and > on both sides, so the
    exact boundary values (20000/50000/100000/200000) accidentally fell
    through to 128; this closes those gaps.
    """
    if n_rows < 20000:
        return 16
    if n_rows < 50000:
        return 32
    if n_rows < 100000:
        return 64
    if n_rows < 200000:
        return 96
    return 128


def train(datatest, dataset_nor):
    """Automated training pipeline for a fault-detection autoencoder.

    datatest    -- feature-engineered fault dataframe ('Time','sn','split',features)
    dataset_nor -- feature-engineered normal (healthy) dataframe
    Returns (model, scaler, loss_th_max, time_steps, maxlabel, freq);
    model/scaler/loss_th_max are '' when the model is judged unstable
    (historical contract kept for callers).
    """
    datatest.reset_index(drop=True, inplace=True)
    dataset_nor.reset_index(drop=True, inplace=True)
    # Pick resampling frequency and window length from the typical fault duration.
    time_delta = _median_split_duration(datatest)
    if time_delta > 259200:  # segments longer than 3 days
        freq = 'T'  # 1-minute grid
        time_steps = 10  # 10-minute window
    else:
        freq = '10S'  # 10-second grid
        # 1 / 5 / 10 minute windows depending on the typical duration.
        time_steps = 6 if time_delta < 180 else 30 if time_delta < 600 else 60
    df_fault = makedataset(datatest, freq)
    df_nor = makedataset(dataset_nor, freq)
    # Model hyper-parameters.
    units = 30
    batch_size = _pick_batch_size(len(df_fault))
    # 10 rounds of cross-validation.
    list_model, list_scaler, list_key_col = [], [], []
    list_loss_fault, list_deltaloss_sort = [], []
    for _ in range(10):
        key_col, model, scaler, test_loss_fault, delta_loss_serie_sort = cross_val(
            df_fault, time_steps, units, batch_size, df_nor)
        list_model.append(model)
        list_scaler.append(scaler)
        list_key_col.append(key_col)
        list_loss_fault.append(test_loss_fault)
        list_deltaloss_sort.append(delta_loss_serie_sort)
    # Stability: the same key feature must win in more than 6 of 10 rounds
    # and its loss margin must be strictly positive in every round.
    maxlabel = max(list_key_col, key=list_key_col.count)
    list_key_deltaloss = [list_deltaloss_sort[i][maxlabel] for i in range(10)]
    # BUG FIX: the original used `np.array(x).all() > 0`, which only checks
    # that every value is non-zero, so negative margins slipped through; the
    # intent is that every margin is > 0.
    if (list_key_col.count(maxlabel) > 6) and (np.array(list_key_deltaloss) > 0).all():
        # Among rounds where maxlabel actually ranked highest, keep the model
        # with the largest margin; its q95 fault loss becomes the threshold.
        margins = []
        for j in range(10):
            if list(list_deltaloss_sort[j].index)[-1] == maxlabel:
                margins.append(list_deltaloss_sort[j][maxlabel])
        index_delmax = list_key_deltaloss.index(max(margins))
        test_loss_fault = list_loss_fault[index_delmax]
        model = list_model[index_delmax]
        scaler = list_scaler[index_delmax]
        loss_th_max = round(test_loss_fault[5][maxlabel], 2)
    else:
        # Unstable model: signal with empty strings (historical contract).
        model, scaler, loss_th_max = '', '', ''
    return model, scaler, loss_th_max, time_steps, maxlabel, freq
-
|