import pandas as pd
import numpy as np


class BatDiag:
    """Cell-voltage validity diagnostics over a frame of BMS samples.

    The frame passed to the constructor must provide the columns ``VIN``,
    ``VehModel`` and ``SN``; :meth:`cellvoltvalid` additionally reads ``Time``
    and the numbered ``CellVolt<n>`` columns.
    """

    # Values shared by every fault record written by cellvoltvalid().
    _FAULT_CODE = 'C309'
    _FAULT_ADVICE = '召回电池包,进行检修'

    def __init__(self, df_bms, df_soh, df_sor):
        """Store the BMS frame and work out the cell column names.

        Args:
            df_bms: DataFrame of BMS samples (see the class docstring).
            df_soh: Accepted for interface compatibility; not used here.
            df_sor: Accepted for interface compatibility; not used here.
        """
        # Parameter initialisation.
        self.df_bms = df_bms
        self.vin = df_bms['VIN']
        self.model = df_bms['VehModel']
        self.sn = df_bms['SN']
        # Placeholder end time written into records for faults still open.
        self.end_time = '0000-00-00 00:00:00'

        # Nothing in this constructor assigns ``param``. Honour it when a
        # subclass or class attribute supplies it; otherwise fall back to
        # the numbered columns actually present in ``df_bms``.
        param = getattr(self, 'param', None)
        if param is not None:
            self.cellvolt_name = ['CellVolt' + str(x) for x in range(1, param.CellVoltNums + 1)]
            self.celltemp_name = ['CellTemp' + str(x) for x in range(1, param.CellTempNums + 1)]
        else:
            self.cellvolt_name = self._numbered_columns(df_bms, 'CellVolt')
            self.celltemp_name = self._numbered_columns(df_bms, 'CellTemp')

    @staticmethod
    def _numbered_columns(df, prefix):
        """Return the columns named ``<prefix><integer>``, ordered by the integer."""
        numbered = []
        for col in df.columns:
            name = str(col)
            suffix = name[len(prefix):]
            if name.startswith(prefix) and suffix.isdecimal():
                numbered.append((int(suffix), col))
        return [col for _, col in sorted(numbered)]

    @staticmethod
    def _first_value(value):
        """Collapse a Series to its first element; pass other values through."""
        if isinstance(value, pd.Series):
            return value.iloc[0] if len(value) else None
        return value

    @staticmethod
    def _longest_run(times, mask, min_duration):
        """Return row positions of the longest consecutive flagged run.

        Only the longest run (by number of rows, first one on ties) is
        considered. It is returned when the time between its first and last
        sample exceeds ``min_duration``; otherwise ``None`` is returned.
        Rows are "consecutive" when they are adjacent in the frame, whatever
        the index labels are.
        """
        hits = np.flatnonzero(mask.to_numpy())
        if hits.size == 0:
            return None
        runs = np.split(hits, np.flatnonzero(np.diff(hits) != 1) + 1)
        longest = max(runs, key=len)
        duration = times.iloc[longest[-1]] - times.iloc[longest[0]]
        return longest if duration > min_duration else None

    @staticmethod
    def _outlier_cells(values, lower, upper):
        """Return 1-based cell numbers below ``lower``, then those above ``upper``."""
        low = np.flatnonzero(values < lower) + 1
        high = np.flatnonzero(values > upper) + 1
        return low.tolist() + high.tolist()

    def _record_fault(self, df_ram, start_time, reason, location):
        """Write one fault record into row 0 of ``df_ram`` (in place).

        Field order: start_time, end_time, vin, sn, model, fault_code,
        fault_reason, fault_advice, fault_location.
        """
        # NOTE(review): self.vin / self.sn / self.model hold whole columns of
        # df_bms; the record stores their first value rather than nesting a
        # Series inside a single cell.
        df_ram.loc[0] = [
            start_time,
            self.end_time,
            self._first_value(self.vin),
            self._first_value(self.sn),
            self._first_value(self.model),
            self._FAULT_CODE,
            reason,
            self._FAULT_ADVICE,
            location,
        ]

    # Cell-voltage-invalid fault.
    def cellvoltvalid(self, df_ram, df_param):
        """Detect an invalid cell-voltage fault and record it in ``df_ram``.

        Three checks run in order and stop at the first one that records a
        fault. Each looks only at the longest run of consecutive flagged
        samples and requires that run to last longer than ``df_param``:

        1. Broken sampling wire: a sample has a cell below 2.5 and a cell
           above 4.5, and the lowest and highest cells are neighbours.
        2. Out-of-range voltage: a sample has a cell below 1 or above 5.
        3. Loose sampling wire: a sample has a cell whose z-score (against
           the other cells of that sample) is below -4 and one above 4.

        Args:
            df_ram: DataFrame of already-recorded faults with nine columns
                (start_time, end_time, vin, sn, model, fault_code,
                fault_reason, fault_advice, fault_location). When it is not
                empty, nothing is checked.
            df_param: Minimum duration; must be comparable with the
                difference of two ``Time`` values.

        Returns:
            ``df_ram`` itself, with row 0 filled in when a fault was found.
        """
        if not df_ram.empty:
            return df_ram

        df_volt = self.df_bms[['Time'] + self.cellvolt_name]
        times = df_volt['Time']
        volts = df_volt[self.cellvolt_name]

        # Broken voltage-sampling wire.
        broken = (volts < 2.5).any(axis=1) & (volts > 4.5).any(axis=1)
        run = self._longest_run(times, broken, df_param)
        if run is not None:
            first = run[0]
            cellvolt = volts.iloc[first].to_numpy(dtype=float)
            cellvoltmin_index = int(np.nanargmin(cellvolt)) + 1
            cellvoltmax_index = int(np.nanargmax(cellvolt)) + 1
            # A broken wire shifts the readings of the two cells sharing it.
            if abs(cellvoltmax_index - cellvoltmin_index) == 1:
                self._record_fault(
                    df_ram,
                    times.iloc[first],
                    '电芯电压采样线断线',
                    '电芯{}'.format([cellvoltmin_index, cellvoltmax_index]),
                )
                return df_ram

        # Voltage out of range.
        over_limit = (volts < 1).any(axis=1) | (volts > 5).any(axis=1)
        run = self._longest_run(times, over_limit, df_param)
        if run is not None:
            first = run[0]
            cellvolt = volts.iloc[first].to_numpy(dtype=float)
            self._record_fault(
                df_ram,
                times.iloc[first],
                '电芯电压采样电路异常',
                '电芯{}'.format(self._outlier_cells(cellvolt, 1, 5)),
            )
            return df_ram

        # Loose voltage-sampling connection: per-sample z-score of each cell.
        volt_mean = volts.mean(axis=1)
        # Avoid division by zero when every cell reads the same value.
        volt_std = volts.std(axis=1).replace(0, 0.000001)
        df_stray = volts.sub(volt_mean, axis=0).div(volt_std, axis=0)
        # NOTE(review): requires a low AND a high outlier in the same sample,
        # as in the original condition - confirm this is intended.
        loose = (df_stray < -4).any(axis=1) & (df_stray > 4).any(axis=1)
        run = self._longest_run(times, loose, df_param)
        if run is not None:
            first = run[0]
            stray = df_stray.iloc[first].to_numpy(dtype=float)
            self._record_fault(
                df_ram,
                times.iloc[first],
                '电芯电压采样线松动',
                '电芯{}'.format(self._outlier_cells(stray, -4, 4)),
            )

        return df_ram