from re import X

import numpy as np
import pandas as pd
from pandas.core.frame import DataFrame

from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam


# Charging-process computation (not implemented).
def preprocess(df):
    """Placeholder; currently does nothing and returns None.

    The original note describes the intent: filter out samples where the
    voltage before and after shows one rise and one fall (sampling anomaly).
    """
    pass


# Deviation of every cell from the trimmed row mean.
def cal_volt_change(dfin, volt_column):
    """Return each column's distance from the row-wise trimmed mean.

    For every row, the largest and the smallest value among ``volt_column``
    are removed from the row sum, the remainder is divided by
    ``len(volt_column) - 2`` and that mean is subtracted from every column
    of the row.

    Args:
        dfin: DataFrame containing at least the columns in ``volt_column``.
            It is not modified.
        volt_column: list of column names to use.

    Returns:
        A new DataFrame with the same columns as ``volt_column`` and a fresh
        0..n-1 index.

    Note:
        With exactly two columns the divisor is zero, so the result is not
        meaningful; no check is performed here.
    """
    # Selecting a list of columns already yields a new frame, so no explicit
    # copy of the whole input is needed.
    cells = dfin[volt_column]
    trimmed_sum = cells.sum(axis=1) - cells.max(axis=1)
    trimmed_sum = trimmed_sum - cells.min(axis=1)
    trimmed_mean = trimmed_sum / (len(volt_column) - 2)
    deviation = cells.sub(trimmed_mean, axis=0)  # distance from the mean
    return deviation.reset_index(drop=True)


# Voltage outlier score.
def cal_volt_sigma(dfin, volt_column):
    """Return the row-wise z-score of every column in ``volt_column``.

    Each row is centred on its own mean and divided by its own standard
    deviation (pandas default, ``ddof=1``).

    Args:
        dfin: DataFrame containing at least the columns in ``volt_column``.
            It is not modified.
        volt_column: list of column names to use.

    Returns:
        A new DataFrame with the same columns as ``volt_column`` and a fresh
        0..n-1 index.
    """
    cells = dfin[volt_column]
    row_mean = cells.mean(axis=1)
    # A zero spread would divide by zero; substitute a tiny value instead.
    row_std = cells.std(axis=1).replace(0, 0.000001)
    scores = cells.sub(row_mean, axis=0).div(row_std, axis=0)
    return scores.reset_index(drop=True)


# # Compute the deviation of the voltage change
# def cal_voltdiff_uniform(dfin, volt_column, window=10, step=5, window2=10, step2=5,threshold=3):
#     df = dfin.copy()
#     time_list = dfin['time'].tolist()
#     # Voltage filtering
#     df_volt = df[volt_column]
#     df_volt_rolling = df_volt.rolling(window).mean()[window-1::step]  # rolling mean
#     time_list = time_list[window-1::step]
#     # Absolute value of the voltage change (absolute difference of consecutive rows; time column shortened by 1)
#     df_volt_diff = abs(df_volt_rolling.diff()[1:])
#     df_volt_diff = df_volt_diff.reset_index(drop=True)
#     time_list = time_list[1:]
#     # Normalise the voltage difference (deviation)
#     # mean = df_volt_diff.mean(axis=1)
#     # std = df_volt_diff.std(axis=1)
#     # df_voltdiff_norm = df_volt_diff.sub(mean, axis=0).div(std,axis=0)
#     df_voltdiff_norm = df_volt_diff.copy()
#     # Rolling-mean filter of the voltage-difference deviation
#     df_voltdiff_rolling = df_voltdiff_norm.rolling(window2).mean()[window2-1::step2]  # rolling mean
#     time_list = time_list[window2-1::step2]
#     df_voltdiff_rolling_sum=df_voltdiff_rolling.sum(1)-df_voltdiff_rolling.max(1)
#     df_voltdiff_rolling_sum=df_voltdiff_rolling_sum-df_voltdiff_rolling.min(1)
#     mean = df_voltdiff_rolling_sum/(len(volt_column)-2)
#     std = df_voltdiff_rolling.std(axis=1)
#     # mean = [np.array(sorted(x)[1:-1]).mean() for x in df_voltdiff_rolling.values]
#     # std = [np.array(sorted(x)[1:-1]).std() for x in df_voltdiff_rolling.values]
#     df_voltdiff_rolling_norm = df_voltdiff_rolling.sub(mean, axis=0)#.div(std,axis=0)
#     df_voltdiff_rolling_norm = df_voltdiff_rolling_norm.reset_index(drop=True)
#     return df_voltdiff_rolling_norm, time_list


# Shared preprocessing for both cell-type branches of main().
def _hourly_cell_voltage(df_bms, columns, volt_column, lower, upper):
    """Return hourly means of the cell-voltage columns.

    Rows with a duplicated 'time' are dropped, readings outside the open
    interval (lower, upper) are replaced by NaN, the frame is resampled to
    one-hour means and every hour that still contains a NaN is discarded.
    """
    df = df_bms[columns].drop_duplicates(subset=['time'])  # one row per timestamp
    df = df.set_index('time')
    in_range = (df[volt_column] > lower) & (df[volt_column] < upper)
    df = df[in_range]  # out-of-range readings become NaN
    # `np.float` was removed in NumPy 1.24; it was an alias of builtin float.
    df[volt_column] = df[volt_column].astype(float)
    # An explicit offset object avoids the deprecated 'H' alias.
    df = df.resample(pd.offsets.Hour()).mean()  # one-hour average
    return df.dropna(how='any')


def main(sn, df_bms, df_soh, celltype, df_last):
    """Compute per-hour voltage outlier and voltage/SOC deviation strings.

    Args:
        sn: identifier copied into the 'sn' column of both outputs.
        df_bms: DataFrame with the columns '总电流[A]', '时间戳', 'SOC[%]' and
            '单体电压1'..'单体电压N' (N = ``param.CellVoltNums``). It is
            modified in place: the current column is scaled by
            ``param.PackCrntDec`` and renamed to 'PackCrnt', and a 'time'
            column is added.
        df_soh: DataFrame; when not empty, ``df_soh.loc[0, 'cellsoh']`` is
            read as the text of a per-cell sequence (percent values).
        celltype: numeric cell type; values below 50 take the OCV->SOC path
            with window 2000..4500, others use raw voltages with window
            3200..3400 (units presumably mV -- TODO confirm).
        df_last: previously returned ``df_ram`` (may be empty); row 0 supplies
            'time4' and 'cellsoc'.

    Returns:
        (OutLineVol, df_ram):
        OutLineVol has columns ['time', 'sn', 'VolOl_Uni', 'VolChng_Uni'],
        one row per kept hour; the last two columns hold whitespace-free
        string renderings of lists rounded to 2 decimals. It is empty when
        there are 5 or fewer hourly rows.
        df_ram has columns ['sn', 'time4', 'cellsoc'] with the last corrected
        SOC row, or is ``df_last`` when no SOH correction was applied on the
        ``celltype < 50`` path, or is empty on the other path.
    """
    df_ram = pd.DataFrame(columns=['sn', 'time4', 'cellsoc'])
    param = BatParam.BatParam(celltype)

    df_bms['总电流[A]'] = df_bms['总电流[A]'] * param.PackCrntDec
    df_bms.rename(columns={'总电流[A]': 'PackCrnt'}, inplace=True)
    df_bms['time'] = pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
    volt_column = ['单体电压' + str(i) for i in range(1, param.CellVoltNums + 1)]
    columns = ['time'] + volt_column
    df_bms = df_bms[df_bms['SOC[%]'] > 10]
    # df_bms=df_bms[(df_bms['PackCrnt']<1)]
    # df_chrg=df_bms[(df_bms['PackCrnt']<-1)]

    # Voltage / SOC deviation
    if celltype < 50:
        df = _hourly_cell_voltage(df_bms, columns, volt_column, 2000, 4500)
        time_list1 = df.index.tolist()
        # Vectorised table lookup (same values as the former element-wise
        # applymap); the voltage is divided by 1000 before the OCV lookup.
        soc_values = np.interp(df.to_numpy(dtype=float) / 1000,
                               param.LookTab_OCV, param.LookTab_SOC)
        df_soc = pd.DataFrame(soc_values, index=df.index, columns=df.columns)

        if (not df_soh.empty) and len(df_soc) > 1:
            # Use the stored SOC as baseline only if it is less than 12 h
            # older than the first hourly sample; otherwise use the first row.
            use_last = (not df_last.empty) and \
                (time_list1[0] - df_last.loc[0, 'time4']).total_seconds() < 12 * 3600
            baseline = df_last.loc[0, 'cellsoc'] if use_last else df_soc.iloc[0]
            df_delt_soc1 = df_soc - baseline
            # NOTE(security): eval() executes arbitrary text from df_soh.
            # Kept for compatibility with the stored format -- confirm the
            # source is trusted, or switch to ast.literal_eval.
            cellsoh = eval(df_soh.loc[0, 'cellsoh'])
            df_delt_soc2 = df_delt_soc1 * np.array(cellsoh) / 100
            df_soc = df_soc + (df_delt_soc2 - df_delt_soc1)
            df_ram.loc[0] = [sn, df_soc.index[-1], list(df_soc.iloc[-1])]
        else:
            df_ram = df_last
        VolChng = cal_volt_change(df_soc, volt_column)
    else:
        # df_bms=df_bms[(df_bms['PackCrnt']>-0.1) & (df_bms['PackCrnt']<0.1)]
        df = _hourly_cell_voltage(df_bms, columns, volt_column, 3200, 3400)
        time_list1 = df.index.tolist()
        VolChng = cal_volt_change(df, volt_column)
    VolSigma = cal_volt_sigma(df, volt_column)

    out_columns = ['time', 'sn', 'VolOl_Uni', 'VolChng_Uni']
    OutLineVol = DataFrame(columns=out_columns)

    # Deviation and outlier strings, one row per kept hour
    if len(VolChng) > 5 and len(VolSigma) > 5:
        records = []
        for stamp, sigma_row, change_row in zip(
                time_list1, VolSigma.to_numpy(), VolChng.to_numpy()):
            # Rows with both a >3 and a <-3 score are skipped.
            if sigma_row.max() > 3 and sigma_row.min() < -3:
                continue
            sigma_rounded = np.around(sigma_row.astype(float), decimals=2)
            change_rounded = np.around(change_row.astype(float), decimals=2)
            if np.isnan(sigma_rounded).any() or np.isnan(change_rounded).any():
                continue
            # 'time' and 'sn' are written per kept row so they stay aligned
            # when rows are skipped. tolist() yields builtin floats, so the
            # text is "[0.12,-0.3]" regardless of the NumPy scalar repr.
            records.append({
                'time': stamp,
                'sn': sn,
                'VolOl_Uni': ''.join(str(sigma_rounded.tolist()).split()),
                'VolChng_Uni': ''.join(str(change_rounded.tolist()).split()),
            })
        OutLineVol = DataFrame(records, columns=out_columns)
    return (OutLineVol, df_ram)

# this_alarm = {}
# df_alarm = df_voltdiff_rolling_norm[abs(df_voltdiff_rolling_norm)>threshold].dropna(how='all')
# for index in df_alarm.index:
#     df_temp = df_alarm.loc[index, :].dropna(how='all', axis=0)
#     this_alarm.update({df_cell_volt.loc[index+1, 'date']:[str(df_temp.keys().tolist()), str([round(x, 2) for x in
#         df_temp.values.tolist()])]})  # continuation of the commented-out statement that ends the previous line
# df_alarm1 = pd.DataFrame(this_alarm)
# return pd.DataFrame(df_alarm1.values.T, index=df_alarm1.columns, columns=df_alarm1.index), pd.DataFrame(df_alarm2.values.T, index=df_alarm2.columns, columns=df_alarm2.index)

# # Isolation forest algorithm
# def iso_froest(df):
#     #1. Generate training data
#     rng = np.random.RandomState(42)
#     X = 0.3 * rng.randn(100, 2)  # generate 100 rows, 2 columns
#     X_train = np.r_[X + 2, X - 2]
#     print(X_train)
#     # Generate some abnormal data
#     X_outliers = rng.uniform(low=-4, high=4, size=(20, 2))
#     iForest= IsolationForest(contamination=0.1)
#     iForest = iForest.fit(X_train)
#     # Predict
#     pred = iForest.predict(X_outliers)
#     print(pred)
#     # [-1 1 -1 -1 -1 -1 -1 1 -

# # Compute correlation coefficients
# def cal_coff(df):
#     cc_mean = np.mean(df, axis=0)  # axis=0 means the mean is taken per column, i.e. along the first dimension
#     cc_std = np.std(df, axis=0)
#     cc_zscore = (df-cc_mean)/cc_std  # standardise
#     cc_zscore = cc_zscore.dropna(axis=0, how='any')
#     cc_zscore_corr = cc_zscore.corr(method='spearman')
#     result = []
#     for i in range(len(cc_zscore_corr)):
#         v = abs(np.array((sorted(cc_zscore_corr.iloc[i]))[2:-1])).mean()  # mean after dropping the 1 and the two smallest values
#         result.append(v)
#     return cc_zscore_corr, result

# def instorage(sn, df_voltdiff_result, df_volt_result):
#     df_all_result = pd.DataFrame(columns=['sn', 'time', 'cellnum', 'value', 'type'])
#     value_list = []
#     cellnum_list = []
#     time_list = []
#     type_list = []
#     df_result = df_voltdiff_result.copy().drop(columns='time')
#     time_list_1 = df_voltdiff_result['time']
#     df_result = df_result[(df_result>3) | (df_result<-3)].dropna(axis=0, how='all').dropna(axis=1, how='all')
#     for column in df_result.columns:
#         df = df_result[[column]].dropna(axis=0, how='all')
#         value_list.extend(df[column].tolist())
#         cellnum_list.extend([column]*len(df))
#         time_list.extend([time_list_1[x] for x in df.index])
#     length_1 = len(value_list)
#     df_result = df_volt_result.copy().drop(columns='time')
#     time_list_2 = df_volt_result['time']
#     df_result = df_result[(df_result>3) | (df_result<-3)].dropna(axis=0, how='all').dropna(axis=1, how='all')
#     for column in df_result.columns:
#         df = df_result[[column]].dropna(axis=0, how='all')
#         value_list.extend(df[column].tolist())
#         cellnum_list.extend([column]*len(df))
#         time_list.extend([time_list_2[x] for x in df.index])
#     length_2 = len(value_list) - length_1
#     type_list.extend(['电压变化量离群'] * length_1)
#     type_list.extend(['电压离群'] * length_2)
#     df_all_result['sn'] = [sn] * len(value_list)
#     df_all_result['cellnum'] = cellnum_list
#     df_all_result['value'] = value_list
#     df_all_result['time'] = time_list
#     df_all_result['type'] = type_list
#     return df_all_result

# # Alarm: if, within a window, more than `ratio` of the points deviate beyond `threshold`, raise an alarm
# def alarm(dfin, volt_column, alarm_window=10, alarm_ratio=0.8, alarm_threshold=2.5):
#     time_list = dfin['time'].tolist()
#     df_result = dfin[volt_column].copy()
#     alarm_result = pd.DataFrame(columns=['type', 'num', 'alarm_time'])
#     df_result_1 = df_result.copy()
#     # NOTE(review): the next two lines were garbled in this file as
#     # "df_result_1[df_result_1alarm_threshold] = 1"; reconstructed by analogy
#     # with the df_result_2 lines below -- confirm against history.
#     df_result_1[df_result_1<alarm_threshold] = 0
#     df_result_1[df_result_1>alarm_threshold] = 1
#     df_result_1 = df_result_1.rolling(alarm_window).sum()
#     for column in volt_column:
#         if len(df_result_1[df_result_1[column]>alarm_window * alarm_ratio])>0:
#             alarm_result = alarm_result.append({'type':'1', 'num':column, 'alarm_time':time_list[df_result_1[df_result_1[column]>alarm_window * alarm_ratio].index[0]]}, ignore_index=True)
#     # time_list = time_list[window-1::step]
#     df_result_2 = df_result.copy()
#     df_result_2[df_result_2>-alarm_threshold] = 0
#     df_result_2[df_result_2<-alarm_threshold] = 1
#     df_result_2 = df_result_2.rolling(alarm_window).sum()
#     for column in volt_column:
#         if len(df_result_2[df_result_2[column]>alarm_window * alarm_ratio])>0:
#             alarm_result = alarm_result.append({'type':'2', 'num':column, 'alarm_time':time_list[df_result_2[df_result_2[column]>alarm_window * alarm_ratio].index[0]]}, ignore_index=True)
#     return alarm_result