from ctypes import Structure from re import M import pandas as pd import numpy as np import datetime import time, datetime import matplotlib.pyplot as plt from pylab import* from scipy.signal import savgol_filter from scipy import interpolate import os from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam class cell_statistic: def __init__(self,sn,celltype,df_bms): #参数初始化 self.sn=sn self.celltype=celltype self.param=BatParam.BatParam(celltype)#鹏飞param中为BatParam,学琦为BatteryInfo self.df_bms=pd.DataFrame(df_bms) self.packcrnt=df_bms['总电流[A]']*self.param.PackCrntDec self.packvolt=df_bms['总电压[V]'] self.bms_soc=df_bms['SOC[%]'] self.bmstime= pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S') self.LookTab_OCV = self.param.LookTab_OCV self.LookTab_SOC = self.param.LookTab_SOC self.packcrnt_flg = self.param.PackCrntDec#判断充放电电流方向,一般放电为正,充电为负 self.cellvolt_list=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)] self.celltemp_name=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)] self.bmssta = df_bms['充电状态'] #定义加权滤波函数.................................................................................................. def moving_average(interval, windowsize): window = np.ones(int(windowsize)) / float(windowsize) re = np.convolve(interval, window, 'same') return re #.............................................充电行为统计............................................................................ def rest_sta(self): start_time = time.time() df_rest_soc = pd.DataFrame(columns = ['sn', 'time_st', 'time_end', 'soc_st', 'soc_sp', 'delta_soc', 'tempure', 'meancrnt']) data_temp = self.df_bms data = data_temp.dropna(axis = 0, how = 'any', subset = ['总电流[A]', 'SOC[%]'])#删除电流与SOC中的nan值 #----------------------------------------区分充放电数据---------------------------------------------------------------------- df_soc = data['SOC[%]'] # df_soc[df_soc > 98] = 100#满SOC #-------------------------------------------------------利用电流连续为0判断电池静置--------------------------------------------------------- rest_crnt = data.loc[data['总电流[A]'] == 0]#电流为0,判断电池静置 if not rest_crnt.empty: rest_crnt_data = rest_crnt.reset_index(drop=True) temp_rest_time = rest_crnt_data['时间戳'] rest_time = pd.to_datetime(temp_rest_time) delta_time = (np.diff(rest_time)/pd.Timedelta(1, 'min'))#计算时间差的分钟数 pos = np.where(delta_time > 60)#静置数据分段,大于60min时,认为是两个静置过程 splice_num = [] if len(pos[0]) >= 1: pos_ful_tem = np.insert(pos, 0, 0) pos_len = len(pos_ful_tem) data_len = len(rest_time) pos_ful = np.insert(pos_ful_tem, pos_len, data_len-1) for item in range(0,len(pos_ful)-1): splice_num.extend(item*np.ones(pos_ful[item +1]-pos_ful[item])) splice_num = np.insert(splice_num, 0, 0) else: splice_num = np.zeros(len(temp_rest_time)) pos_ful = np.array([0]) rest_crnt_data['rest_rest'] = splice_num rest_splice_num = np.unique(rest_crnt_data['rest_rest'])#判断有几段充电数据 if len(rest_splice_num) > 0: for item_rest in rest_splice_num: celltemp_name = self.celltemp_name#电芯温度数量 df_rest_splice_data = rest_crnt_data.loc[rest_crnt_data['rest_rest'] == item_rest] df_time_temp = pd.to_datetime(df_rest_splice_data['时间戳']) df_soc_temp = df_rest_splice_data['SOC[%]'] delta_soc = abs(round((df_soc_temp.iloc[-1] - df_soc_temp.iloc[0]), 3)) delta_time = (df_time_temp.iloc[-1] - df_time_temp.iloc[0])/pd.Timedelta(1, 'hours') if (delta_soc < 2) & (delta_time > 1): chrg_rate = round(delta_soc/(100*delta_time), 2) df_rest_soc_temp = pd.DataFrame({"sn":[self.sn], "time_st":[df_time_temp.iloc[0]], "time_end":[df_time_temp.iloc[-1]], "soc_st":[df_soc_temp.iloc[0]], "soc_sp":[df_soc_temp.iloc[-1]], "delta_soc":[delta_soc], "tempure":[str(list(df_rest_splice_data[celltemp_name].iloc[0]))], "meancrnt":[chrg_rate]}) df_rest_soc = df_rest_soc.append(df_rest_soc_temp) df_rest_soc.reset_index(drop = True, inplace = True) # df_rest_soc.sort_values(by = ['splicestrt_time'], axis = 0, ascending=True,inplace=True)#对故障信息按照时间进行排序 if not df_rest_soc.empty: return df_rest_soc else: return pd.DataFrame()