123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- import pandas as pd
- import numpy as np
- import datetime
- import matplotlib as plt
- from scipy.signal import savgol_filter
- from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam
class Liplated_test:
    """Lithium-plating check based on cell voltages recorded while resting after a charge.

    The detector selects rows of the BMS table where the pack is at rest
    (charge state 0 or 2, SOC above 98 %, zero pack current), splits them into
    rest periods, computes a smoothed dV/dt curve for every cell and then
    looks for a "valley followed by a peak" pattern in the slope of that curve.

    Attributes read by :meth:`liplated_detect`: ``sn``, ``df_bms`` and
    ``cellvolt_name``.
    """

    def __init__(self, sn, celltype, df_bms):
        """Store the pack identity and cache the BMS columns used by the algorithm.

        Args:
            sn: Serial number; copied into the ``sn`` column of every result row.
            celltype: Key passed to ``BatParam.BatParam`` to load the pack parameters.
            df_bms: BMS records. The columns read here are '总电流[A]',
                '总电压[V]', 'SOC[%]', '时间戳' and '充电状态'.
        """
        self.sn = sn
        self.celltype = celltype
        self.param = BatParam.BatParam(celltype)
        self.df_bms = pd.DataFrame(df_bms)
        # Pack current scaled by the parameter set's PackCrntDec factor.
        self.packcrnt = df_bms['总电流[A]'] * self.param.PackCrntDec
        self.packvolt = df_bms['总电压[V]']
        self.bms_soc = df_bms['SOC[%]']
        self.bmstime = pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
        # Column names of the per-cell voltage / temperature channels (1-based).
        self.cellvolt_name = ['单体电压' + str(x) for x in range(1, self.param.CellVoltNums + 1)]
        self.celltemp_name = ['单体温度' + str(x) for x in range(1, self.param.CellTempNums + 1)]
        self.bmssta = df_bms['充电状态']

    @staticmethod
    def moving_average(interval, windowsize):
        """Return the moving average of *interval* using a flat window.

        Declared as a static method so that it can be called both on the class
        and on an instance (the function takes no ``self``).

        Args:
            interval: 1-D sequence of numbers.
            windowsize: Window length; truncated to ``int`` for the window size.

        Returns:
            ``numpy.ndarray`` with the same length as *interval*
            (``numpy.convolve`` in ``'same'`` mode).
        """
        window = np.ones(int(windowsize)) / float(windowsize)
        return np.convolve(interval, window, 'same')

    @staticmethod
    def _gap_minutes(times):
        """Return the gaps between consecutive entries of a datetime Series, in minutes."""
        return times.diff().dt.total_seconds().to_numpy()[1:] / 60.0

    def liplated_detect(self):
        """Run the lithium-plating check on every rest period found in ``self.df_bms``.

        Returns:
            ``pandas.DataFrame`` with the columns ``sn``, ``time`` (first
            timestamp of the rest period), ``liplated`` (``str`` of the
            per-cell 0/1 flag list) and ``liplated_amount`` (``str`` of the
            per-cell list of minutes from the start of the rest period to the
            detected valley), sorted by ``time``. An empty, column-less
            ``DataFrame`` is returned when nothing is detected.
        """
        df_bms = self.df_bms
        # Rest rows: charge state 0 or 2, SOC above 98 % and zero pack current.
        resting = (
            df_bms['充电状态'].isin([0, 2])
            & (df_bms['SOC[%]'] > 98)
            & (df_bms['总电流[A]'] == 0)
        )
        rest_rows = df_bms.loc[resting]
        if rest_rows.empty:
            return pd.DataFrame()
        rest_rows = rest_rows.reset_index(drop=True)

        # A gap of more than 30 min between two rest rows starts a new rest
        # period.  Numbering the periods with a cumulative sum also covers the
        # case of a single rest period, which the previous bookkeeping
        # (pos_ful == [0]) silently skipped.
        rest_gaps = self._gap_minutes(pd.to_datetime(rest_rows['时间戳']))
        rest_rows['chrgr_rest'] = np.concatenate(([0], np.cumsum(rest_gaps > 30)))
        segment_count = int(rest_rows['chrgr_rest'].iloc[-1]) + 1

        # Pack-level columns are not needed below; tolerate tables that do not
        # carry every one of them.
        # NOTE(review): the original called fillna(0) here and discarded the
        # result, so NaN values were never actually replaced; that effective
        # behaviour is kept.
        rest_rows = rest_rows.drop(
            columns=['GSM信号', '故障等级', '故障代码', '绝缘电阻', '总电流[A]',
                     '总电压[V]', '充电状态', '单体压差', 'SOC[%]'],
            errors='ignore',
        )

        cell_cols = self.cellvolt_name
        findings = []
        for segment_id in range(segment_count):
            segment = rest_rows.loc[rest_rows['chrgr_rest'] == segment_id]
            dvdt_frame = self._segment_dvdt(segment, cell_cols)
            if dvdt_frame is None:
                continue
            finding = self._judge_segment(dvdt_frame, cell_cols)
            if finding is not None:
                findings.append(finding)

        if not findings:
            return pd.DataFrame()
        result = pd.concat(findings, ignore_index=True)
        result.sort_values(by=['time'], axis=0, ascending=True, inplace=True)
        return result

    def _segment_dvdt(self, segment, cell_cols):
        """Return the smoothed dV/dt curves of one rest period, or ``None`` if it is too short.

        The returned frame has one column per name in *cell_cols* plus a
        '时间戳' column holding the timestamps of the (possibly thinned) rows.
        """
        time_format = '%Y-%m-%d %H:%M:%S'
        seg_time = pd.to_datetime(segment['时间戳'], format=time_format).reset_index(drop=True)
        # Guard clauses instead of a bitwise "&": the duration is only
        # computed once the row count is known to be large enough.
        if len(seg_time) <= 30:
            return None
        duration_min = (seg_time.iloc[-1] - seg_time.iloc[0]) / pd.Timedelta(1, 'min')
        if not duration_min > 40:  # rest must last longer than 40 min
            return None

        # When more than 100 rows precede the first sampling gap above 3 min,
        # keep only every 15th of those rows and append the rows after the gap
        # (the slice stops one row before the end, as in the original).
        jumps = np.where(self._gap_minutes(seg_time) > 3)[0]
        if len(jumps) > 0 and jumps[0] > 100:
            first_jump = int(jumps[0])
            head = segment.iloc[np.arange(0, first_jump, 15)]
            tail = segment.iloc[first_jump + 1:len(segment) - 1]
            segment = pd.concat([head, tail]).reset_index(drop=True)

        times = pd.to_datetime(segment['时间戳'], format=time_format).reset_index(drop=True)
        step_min = self._gap_minutes(times)

        # Savitzky-Golay window: about a third of the rows, forced to be odd.
        window = int(len(segment) / 3)
        if window % 2 == 0:
            window -= 1
        polyorder = min(int(window / 3), 5)
        smoothed = np.column_stack([
            savgol_filter(segment[name].to_numpy(dtype=float), window, polyorder)
            for name in cell_cols
        ])

        # dV/dt per cell.  Plain arrays are used so that the time step is
        # broadcast over all cells; dividing two DataFrames aligns on column
        # labels and leaves every cell but the first as NaN.
        with np.errstate(divide='ignore', invalid='ignore'):
            dvdt = np.diff(smoothed, axis=0) / step_min[:, None]
        # Repeat the last row so the curve has one value per timestamp.
        dvdt = np.vstack([dvdt, dvdt[-1:]])

        # NOTE(review): the indentation of the original was lost; rest periods
        # with 25 rows or fewer are assumed to be skipped entirely - confirm.
        if len(dvdt) <= 25:
            return None
        frame = pd.DataFrame({
            name: savgol_filter(dvdt[:, col], 13, 3)
            for col, name in enumerate(cell_cols)
        })
        frame['时间戳'] = list(times)
        return frame

    def _judge_segment(self, dvdt_frame, cell_cols):
        """Return a one-row result frame for a rest period, or ``None`` if no plating is flagged."""
        times = dvdt_frame['时间戳']
        curve = dvdt_frame[cell_cols].to_numpy(dtype=float)
        row_count = len(curve)

        # Pad with three of the last rows so that three successive differences
        # give back exactly one value per timestamp.
        padded = np.vstack([curve, curve[row_count - 4:row_count - 1]])
        first_diff = np.diff(padded, axis=0)       # slope of the dV/dt curve
        second_diff = np.diff(first_diff, axis=0)  # whether that slope rises or falls
        with np.errstate(invalid='ignore'):
            trend = second_diff.copy()
            trend[second_diff >= 0] = 1
            trend[second_diff < 0] = -1
        # -2: the slope stops rising (peak); +2: the slope stops falling (valley).
        turning = np.diff(trend, axis=0)

        # Only the first 90 minutes of the rest period are evaluated.
        in_window = (times < times.iloc[0] + pd.Timedelta(minutes=90)).to_numpy()

        lipltd_confirm = []  # 1 = plating flagged for the cell, 0 = not flagged
        lipltd_amount = []   # minutes from rest start to the valley, 0 if not flagged
        for col in range(len(cell_cols)):
            marks = turning[in_window, col]
            peaks = np.flatnonzero(marks == -2)
            valleys = np.flatnonzero(marks == 2)
            # Logical "and": a bitwise "&" of the two counts is 0 for e.g.
            # 2 & 1 and would reject cells that have both a peak and a valley.
            flagged = (
                len(peaks) > 0
                and len(valleys) > 0
                and peaks[0] > valleys[0]
                and first_diff[peaks[0] + 1, col] < 0
            )
            if flagged:
                # Clamp so that "+2" cannot run past the last timestamp.
                onset = min(int(valleys[0]) + 2, row_count - 1)
                lipltd_confirm.append(1)
                lipltd_amount.append(
                    (times.iloc[onset] - times.iloc[0]) / pd.Timedelta(1, 'min')
                )
            else:
                lipltd_confirm.append(0)
                lipltd_amount.append(0)

        if any(lipltd_confirm) and max(lipltd_amount) > 5:
            return pd.DataFrame({
                "sn": [self.sn],
                "time": [times.iloc[0]],
                "liplated": [str(lipltd_confirm)],
                "liplated_amount": [str(lipltd_amount)],
            })
        return None
|