import pandas as pd
import numpy as np
import datetime
import matplotlib as plt
from scipy.signal import savgol_filter
from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam


class Liplated_test:
    """Lithium-plating detector working on the voltage relaxation after a full charge.

    The detector looks at rest periods that follow charging (SOC above 98 %,
    zero pack current), computes a smoothed dV/dt curve for every cell and
    flags a cell when that curve shows a valley followed by a peak within
    the first 90 minutes of rest.
    """

    # Thresholds used by ``liplated_detect`` (values taken from the original code).
    _SOC_FULL = 98              # SOC[%] above which the pack counts as fully charged
    _SEGMENT_GAP_MIN = 30       # gap (min) that separates two rest periods
    _MIN_SEGMENT_ROWS = 30      # a rest period needs more rows than this
    _MIN_SEGMENT_MINUTES = 40   # ... and must last longer than this
    _DENSE_GAP_MIN = 3          # sampling gap (min) marking the end of dense sampling
    _DENSE_MIN_ROWS = 100       # thin the dense head only if it has more rows than this
    _DENSE_STRIDE = 15          # keep every 15th row of the dense head
    _MIN_DVDT_ROWS = 25         # dV/dt curve needs more rows than this
    _DETECT_WINDOW_MIN = 90     # only the first 90 min of rest are inspected
    _MIN_AMOUNT_MIN = 5         # report only if the valley occurs later than this

    def __init__(self, sn, celltype, df_bms):
        """Store the BMS data and the pack parameters for ``celltype``.

        Args:
            sn: Serial number written to the ``sn`` column of the result.
            celltype: Cell type identifier passed to ``BatParam.BatParam``.
            df_bms: BMS records; indexed by the column names used below.
        """
        self.sn = sn
        self.celltype = celltype
        self.param = BatParam.BatParam(celltype)
        self.df_bms = pd.DataFrame(df_bms)
        self.packcrnt = df_bms['总电流[A]'] * self.param.PackCrntDec
        self.packvolt = df_bms['总电压[V]']
        self.bms_soc = df_bms['SOC[%]']
        self.bmstime = pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
        self.cellvolt_name = ['单体电压' + str(x) for x in range(1, self.param.CellVoltNums + 1)]
        self.celltemp_name = ['单体温度' + str(x) for x in range(1, self.param.CellTempNums + 1)]
        self.bmssta = df_bms['充电状态']

    @staticmethod
    def moving_average(interval, windowsize):
        """Return the moving average of ``interval`` over ``windowsize`` samples.

        Declared static because the function never used instance state; the
        original definition had no ``self`` and therefore could not be called
        on an instance.
        """
        window = np.ones(int(windowsize)) / float(windowsize)
        return np.convolve(interval, window, 'same')

    @staticmethod
    def _minutes_between(times):
        """Return the gaps between consecutive timestamps of a Series, in minutes."""
        return times.diff().dt.total_seconds().to_numpy()[1:] / 60.0

    def _thin_dense_head(self, segment):
        """Down-sample the densely sampled start of a rest segment.

        If the first sampling gap larger than ``_DENSE_GAP_MIN`` occurs after
        more than ``_DENSE_MIN_ROWS`` rows, only every ``_DENSE_STRIDE``-th row
        before that gap is kept. As in the original code, the row at the gap
        itself and the very last row of the segment are dropped in that case.
        """
        gaps = self._minutes_between(segment['时间戳'])
        jumps = np.flatnonzero(gaps > self._DENSE_GAP_MIN)
        if jumps.size == 0 or jumps[0] <= self._DENSE_MIN_ROWS:
            return segment
        first_jump = int(jumps[0])
        head = segment.iloc[0:first_jump:self._DENSE_STRIDE]
        tail = segment.iloc[first_jump + 1:len(segment) - 1]
        return pd.concat([head, tail]).reset_index(drop=True)

    def _segment_dvdt(self, segment, cell_cols):
        """Return the smoothed dV/dt frame of one rest segment, or ``None``.

        ``segment`` must have a fresh 0..n-1 index. ``None`` is returned when
        the segment is too short (rows or duration) to be analysed.
        """
        times = segment['时间戳']
        if len(segment) <= self._MIN_SEGMENT_ROWS:
            return None
        duration_min = (times.iloc[-1] - times.iloc[0]) / pd.Timedelta(1, 'min')
        if duration_min <= self._MIN_SEGMENT_MINUTES:
            return None

        segment = self._thin_dense_head(segment)
        times = segment['时间戳'].reset_index(drop=True)
        dt_min = self._minutes_between(times)

        # NOTE(review): the original called ``fillna(value=0)`` here without
        # using the result, so missing voltages were never filled. That
        # effective behaviour is kept; confirm whether filling was intended.
        volts = segment[cell_cols].to_numpy(dtype=float)

        # The Savitzky-Golay window must be odd: one third of the segment length.
        window = len(volts) // 3
        if window % 2 == 0:
            window -= 1
        polyorder = min(window // 3, 5)
        smoothed = savgol_filter(volts, window, polyorder, axis=0)

        # Plain ndarray broadcasting: dividing two DataFrames would align on
        # column labels and leave every cell but the first as NaN.
        with np.errstate(divide='ignore', invalid='ignore'):
            dvdt = np.diff(smoothed, axis=0) / dt_min[:, None]
        # Repeat the last row so the curve has one value per timestamp.
        dvdt = np.vstack([dvdt, dvdt[-1:]])
        if len(dvdt) <= self._MIN_DVDT_ROWS:
            return None

        dvdt = savgol_filter(dvdt, 13, 3, axis=0)
        frame = pd.DataFrame(dvdt, columns=cell_cols)
        frame['时间戳'] = times
        return frame

    def _check_segment(self, dvdt_frame, cell_cols):
        """Classify every cell of one dV/dt frame.

        Returns:
            ``(confirm, amount)``: two lists with one entry per cell.
            ``confirm`` holds 1 (plating) or 0; ``amount`` holds the minutes
            from the start of rest to two samples after the first valley,
            or 0 for cells that are not flagged.
        """
        times = dvdt_frame['时间戳']
        dvdt = dvdt_frame[cell_cols].to_numpy(dtype=float)
        n_rows = len(dvdt)

        # Pad with three of the last rows so the third difference has n_rows rows.
        padded = np.vstack([dvdt, dvdt[n_rows - 4:n_rows - 1]])
        first_diff = np.diff(padded, axis=0)
        second_diff = np.diff(first_diff, axis=0)
        # Reduce the second difference to its sign (+1 / -1; NaN stays NaN).
        sign = second_diff.copy()
        sign[second_diff >= 0] = 1
        sign[second_diff < 0] = -1
        # A step of -2 marks a peak of first_diff, +2 marks a valley.
        turning = np.diff(sign, axis=0)

        window_end = times.iloc[0] + datetime.timedelta(minutes=self._DETECT_WINDOW_MIN)
        in_window = (times < window_end).to_numpy()

        confirm = []
        amount = []
        for col in range(len(cell_cols)):
            peaks = np.flatnonzero((turning[:, col] == -2) & in_window)
            valleys = np.flatnonzero((turning[:, col] == 2) & in_window)
            plated = False
            if peaks.size and valleys.size:
                first_peak, first_valley = peaks[0], valleys[0]
                plated = (first_peak > first_valley
                          and first_diff[first_peak + 1, col] < 0)
            if plated:
                # Clamp: the valley may sit on the second-to-last row.
                stamp_pos = min(first_valley + 2, n_rows - 1)
                confirm.append(1)  # 1 = plating, 0 = no plating
                amount.append((times.iloc[stamp_pos] - times.iloc[0]) / pd.Timedelta(1, 'min'))
            else:
                confirm.append(0)
                amount.append(0)
        return confirm, amount

    def liplated_detect(self):
        """Detect lithium plating in the rest periods that follow charging.

        Returns:
            A DataFrame with the columns ``sn``, ``time`` (start of the rest
            period), ``liplated`` and ``liplated_amount`` (both the ``str`` of
            a per-cell list), sorted by ``time``. An empty DataFrame is
            returned when nothing is detected.
        """
        df = self.df_bms
        # Rest after (nearly) full charge: state 0 or 2, SOC > 98 %, zero current.
        rest_mask = (
            df['充电状态'].isin([0, 2])
            & (df['SOC[%]'] > self._SOC_FULL)
            & (df['总电流[A]'] == 0)
        )
        rest_data = df.loc[rest_mask].reset_index(drop=True)
        if rest_data.empty:
            return pd.DataFrame()

        rest_data['时间戳'] = pd.to_datetime(rest_data['时间戳'])
        gap_min = self._minutes_between(rest_data['时间戳'])
        # A gap above 30 min starts a new rest period. Labelling by cumulative
        # gap count also covers the single-period case, which the original
        # loop skipped entirely.
        rest_data['chrgr_rest'] = np.concatenate(
            ([0], np.cumsum(gap_min > self._SEGMENT_GAP_MIN))
        )

        cell_cols = list(self.cellvolt_name)
        rest_check = rest_data[['时间戳', 'chrgr_rest'] + cell_cols]

        rows = []
        for _, segment in rest_check.groupby('chrgr_rest', sort=True):
            dvdt_frame = self._segment_dvdt(segment.reset_index(drop=True), cell_cols)
            if dvdt_frame is None:
                continue
            confirm, amount = self._check_segment(dvdt_frame, cell_cols)
            if any(confirm) and max(amount) > self._MIN_AMOUNT_MIN:
                rows.append({
                    "sn": self.sn,
                    "time": dvdt_frame['时间戳'].iloc[0],
                    "liplated": str(confirm),
                    "liplated_amount": str(amount),
                })

        if not rows:
            return pd.DataFrame()
        result = pd.DataFrame(rows, columns=['sn', 'time', 'liplated', 'liplated_amount'])
        # Order the findings chronologically.
        result.sort_values(by=['time'], axis=0, ascending=True, inplace=True)
        return result