from ctypes import Structure
from re import M
import pandas as pd
import numpy as np
import datetime
import time
import matplotlib.pyplot as plt
from pylab import *
from scipy.signal import savgol_filter
from scipy import interpolate
import os
from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam


class chrgr_statistic:
    """Charging-behaviour statistics for one battery pack.

    The BMS log is split into segments at the points where the SOC trend
    reverses.  Segments in which the SOC rises enough are treated as
    charging sessions, and the state at the end of each session (per-cell
    voltage, OCV-table SOC, temperatures) is reported.
    """

    # Thresholds used by the segmentation / classification logic.
    _MIN_CHANGE_GAP = 10      # SOC changes closer than this many rows to the previous one are ignored
    _MIN_SEGMENT_ROWS = 10    # a charging segment must contain more rows than this
    _MIN_SOC_RISE = 10        # ... and its SOC must rise by more than this (SOC[%] units)
    _FULL_CHARGE_SOC = 98     # a cell above this looked-up SOC marks the session as a full charge
    _OCV_CLAMP_MARGIN = 0.005  # keeps clamped voltages strictly inside the OCV table range

    _RESULT_COLUMNS = ['sn', 'time', 'chrgr_soc', 'chrgr_volt', 'temp', 'full_chrgr_num']

    def __init__(self, sn, celltype, df_bms):
        """Store the pack identity, its parameter set and the BMS data.

        Args:
            sn: Pack serial number; copied into the ``sn`` column of the result.
            celltype: Cell type identifier passed to ``BatParam.BatParam``.
            df_bms: BMS log.  The columns '总电流[A]', '总电压[V]', 'SOC[%]',
                '时间戳' and '充电状态' are read here; the per-cell voltage and
                temperature columns are read by ``chrgr_soc_sta``.
        """
        # Parameter initialisation.
        self.sn = sn
        self.celltype = celltype
        # In Pengfei's param module the class is BatParam; in Xueqi's it is BatteryInfo.
        self.param = BatParam.BatParam(celltype)
        self.df_bms = pd.DataFrame(df_bms)
        self.packcrnt = df_bms['总电流[A]'] * self.param.PackCrntDec
        self.packvolt = df_bms['总电压[V]']
        self.bms_soc = df_bms['SOC[%]']
        self.bmstime = pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
        self.LookTab_OCV = self.param.LookTab_OCV
        self.LookTab_SOC = self.param.LookTab_SOC
        # Current-direction flag: after multiplying by it, discharge is
        # normally positive and charge negative.
        self.packcrnt_flg = self.param.PackCrntDec
        self.cellvolt_list = ['单体电压' + str(x) for x in range(1, self.param.CellVoltNums + 1)]
        self.celltemp_name = ['单体温度' + str(x) for x in range(1, self.param.CellTempNums + 1)]
        self.bmssta = df_bms['充电状态']

    @staticmethod
    def moving_average(interval, windowsize):
        """Smooth ``interval`` with a uniform moving-average window.

        Args:
            interval: 1-D sequence of samples.
            windowsize: Window length; truncated to ``int`` for the window
                size, while the weights are ``1 / windowsize``.

        Returns:
            ``np.ndarray`` from ``np.convolve(..., 'same')``.
        """
        window = np.ones(int(windowsize)) / float(windowsize)
        return np.convolve(interval, window, 'same')

    def _segment_labels(self, soc):
        """Label every row with the index of the SOC-trend segment it belongs to.

        A new segment starts at each position where the sign of the SOC
        change flips (rise -> fall or fall -> rise).  SOC changes that occur
        fewer than ``_MIN_CHANGE_GAP`` rows after the previous change are
        ignored when looking for those flips.

        Args:
            soc: SOC values, one per row of the BMS log.

        Returns:
            Float ``np.ndarray`` with one label per row.  All zeros when no
            trend reversal is found (or when there are fewer than 3 rows).
        """
        soc_arr = np.asarray(soc, dtype=float)
        n_rows = soc_arr.size
        single_segment = np.zeros(n_rows)
        if n_rows < 3:
            return single_segment

        # Two rows are appended before differencing, as the original
        # implementation did, so positions may run up to n_rows.
        padded = np.concatenate([soc_arr, soc_arr[n_rows - 3:n_rows - 1]])
        direction = np.sign(np.diff(padded))  # +1 rising, 0 flat, -1 falling

        change_pos = np.nonzero(direction)[0]
        change_dir = direction[change_pos]

        # Drop changes that follow the previous change too closely.
        too_close = np.where(np.diff(change_pos) < self._MIN_CHANGE_GAP)[0] + 1
        kept_pos = np.delete(change_pos, too_close)
        kept_dir = np.delete(change_dir, too_close)
        if kept_dir.size < 2:
            # Nothing to compare against: the whole log is one segment.
            return single_segment

        # A step of +/-2 between consecutive kept directions is a reversal.
        # The first step is duplicated so the array lines up with kept_pos.
        dir_step = np.diff(kept_dir)
        dir_step = np.concatenate([dir_step[:1], dir_step])
        turning_pos = kept_pos[np.abs(dir_step) == 2]
        if turning_pos.size == 0:
            return single_segment

        # Positions coming from the padded rows can exceed the last real row.
        turning_pos = np.clip(turning_pos, 0, n_rows - 1)
        # Label of a row = number of turning points at or before it.
        labels = np.searchsorted(turning_pos, np.arange(n_rows), side='right')
        return labels.astype(float)

    def _is_charge_segment(self, seg_soc):
        """Return True when a segment is long enough and its SOC rises enough."""
        if len(seg_soc) <= self._MIN_SEGMENT_ROWS:
            return False
        return (seg_soc.iloc[-1] - seg_soc.iloc[0]) > self._MIN_SOC_RISE

    def chrgr_soc_sta(self):
        """Collect end-of-charge statistics for every charging session.

        Side effects:
            ``self.df_bms`` is modified in place: NaN values are replaced by
            0 and a ``stat`` column holding the segment label of each row is
            added.

        Returns:
            ``pd.DataFrame`` sorted by ``time`` with the columns
            ``sn``, ``time`` (timestamp of the last charging row),
            ``chrgr_soc`` (stringified list of per-cell SOC looked up from
            the OCV table), ``chrgr_volt`` (stringified list of per-cell
            voltages), ``temp`` (stringified list of cell temperatures) and
            ``full_chrgr_num`` (running count of sessions in which at least
            one cell exceeded ``_FULL_CHARGE_SOC``).  An empty
            ``pd.DataFrame()`` without columns is returned when no charging
            session is found.
        """
        data = self.df_bms
        data.fillna(0, inplace=True)
        crnt_flg = self.packcrnt_flg

        # ---- split the log into charge / discharge segments by SOC trend ----
        data['stat'] = self._segment_labels(data['SOC[%]'])

        # ---- keep the segments whose SOC rises: each is one charging session ----
        charge_segments = [
            segment
            for _, segment in data.groupby('stat', sort=True)
            if self._is_charge_segment(segment['SOC[%]'])
        ]
        if not charge_segments:
            return pd.DataFrame()

        # ---- end-of-charge voltage and SOC for every session ----
        cell_name = self.cellvolt_list  # cell-voltage column names
        temp_name = self.celltemp_name
        soc_from_ocv = interpolate.interp1d(self.LookTab_OCV, self.LookTab_SOC, kind='linear')
        max_volt = np.max(self.LookTab_OCV)
        min_volt = np.min(self.LookTab_OCV)

        records = []
        full_chrgr_num = 0
        for segment in charge_segments:
            # Rows with charging current (negative once the direction flag is applied).
            charging_rows = segment.loc[crnt_flg * segment['总电流[A]'] < 0]
            if len(charging_rows) <= 1:
                continue

            # Divided by 1000 -- presumably mV -> V to match the OCV table; confirm.
            cell_volt = charging_rows[cell_name].iloc[-1] / 1000
            # interp1d rejects values outside the table, so clamp just inside it.
            cell_volt[cell_volt >= max_volt] = max_volt - self._OCV_CLAMP_MARGIN
            cell_volt[cell_volt <= min_volt] = min_volt + self._OCV_CLAMP_MARGIN
            cell_soc = np.asarray(soc_from_ocv(list(cell_volt)))

            if np.any(cell_soc > self._FULL_CHARGE_SOC):
                full_chrgr_num += 1

            # NOTE(review): the source indentation was lost; one row is
            # recorded per charging session (not only for full charges) --
            # confirm against the intended behaviour.
            end_time = pd.to_datetime(charging_rows['时间戳']).iloc[-1]
            records.append({
                "sn": self.sn,
                "time": end_time,
                "chrgr_soc": str(cell_soc.tolist()),
                "chrgr_volt": str(list(cell_volt)),
                "temp": str(list(charging_rows[temp_name].iloc[-1])),
                "full_chrgr_num": full_chrgr_num,
            })

        if not records:
            return pd.DataFrame()

        df_chrgr_soc = pd.DataFrame(records, columns=self._RESULT_COLUMNS)
        # Order the sessions chronologically.
        df_chrgr_soc = df_chrgr_soc.sort_values(by=['time'], axis=0, ascending=True)
        return df_chrgr_soc.reset_index(drop=True)