123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- from ctypes import Structure
- from re import M
- import pandas as pd
- import numpy as np
- import datetime
- import time, datetime
- import matplotlib.pyplot as plt
- from pylab import*
- from scipy.signal import savgol_filter
- from scipy import interpolate
- import os
- from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam
- class cell_statistic:
- def __init__(self,sn,celltype,df_bms): #参数初始化
- self.sn=sn
- self.celltype=celltype
- self.param=BatParam.BatParam(celltype)#鹏飞param中为BatParam,学琦为BatteryInfo
- self.df_bms=pd.DataFrame(df_bms)
- self.packcrnt=df_bms['总电流[A]']*self.param.PackCrntDec
- self.packvolt=df_bms['总电压[V]']
- self.bms_soc=df_bms['SOC[%]']
- self.bmstime= pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
- self.LookTab_OCV = self.param.LookTab_OCV
- self.LookTab_SOC = self.param.LookTab_SOC
- self.packcrnt_flg = self.param.PackCrntDec#判断充放电电流方向,一般放电为正,充电为负
- self.cellvolt_list=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)]
- self.celltemp_name=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)]
- self.bmssta = df_bms['充电状态']
- #定义加权滤波函数..................................................................................................
- def moving_average(interval, windowsize):
- window = np.ones(int(windowsize)) / float(windowsize)
- re = np.convolve(interval, window, 'same')
- return re
- #.............................................充电行为统计............................................................................
- def rest_sta(self):
- start_time = time.time()
- df_rest_soc = pd.DataFrame(columns = ['sn', 'time_st', 'time_end', 'soc_st', 'soc_sp', 'delta_soc', 'tempure', 'meancrnt'])
- data_temp = self.df_bms
- data = data_temp.dropna(axis = 0, how = 'any', subset = ['总电流[A]', 'SOC[%]'])#删除电流与SOC中的nan值
- #----------------------------------------区分充放电数据----------------------------------------------------------------------
- df_soc = data['SOC[%]']
- # df_soc[df_soc > 98] = 100#满SOC
- #-------------------------------------------------------利用电流连续为0判断电池静置---------------------------------------------------------
- rest_crnt = data.loc[data['总电流[A]'] == 0]#电流为0,判断电池静置
- if not rest_crnt.empty:
- rest_crnt_data = rest_crnt.reset_index(drop=True)
- temp_rest_time = rest_crnt_data['时间戳']
- rest_time = pd.to_datetime(temp_rest_time)
- delta_time = (np.diff(rest_time)/pd.Timedelta(1, 'min'))#计算时间差的分钟数
- pos = np.where(delta_time > 60)#静置数据分段,大于60min时,认为是两个静置过程
- splice_num = []
- if len(pos[0]) >= 1:
- pos_ful_tem = np.insert(pos, 0, 0)
- pos_len = len(pos_ful_tem)
- data_len = len(rest_time)
- pos_ful = np.insert(pos_ful_tem, pos_len, data_len-1)
- for item in range(0,len(pos_ful)-1):
- splice_num.extend(item*np.ones(pos_ful[item +1]-pos_ful[item]))
- splice_num = np.insert(splice_num, 0, 0)
- else:
- splice_num = np.zeros(len(temp_rest_time))
- pos_ful = np.array([0])
- rest_crnt_data['rest_rest'] = splice_num
- rest_splice_num = np.unique(rest_crnt_data['rest_rest'])#判断有几段充电数据
- if len(rest_splice_num) > 0:
- for item_rest in rest_splice_num:
- celltemp_name = self.celltemp_name#电芯温度数量
- df_rest_splice_data = rest_crnt_data.loc[rest_crnt_data['rest_rest'] == item_rest]
- df_time_temp = pd.to_datetime(df_rest_splice_data['时间戳'])
- df_soc_temp = df_rest_splice_data['SOC[%]']
- delta_soc = abs(round((df_soc_temp.iloc[-1] - df_soc_temp.iloc[0]), 3))
- delta_time = (df_time_temp.iloc[-1] - df_time_temp.iloc[0])/pd.Timedelta(1, 'hours')
- if (delta_soc < 2) & (delta_time > 1):
- chrg_rate = round(delta_soc/(100*delta_time), 2)
- df_rest_soc_temp = pd.DataFrame({"sn":[self.sn], "time_st":[df_time_temp.iloc[0]], "time_end":[df_time_temp.iloc[-1]],
- "soc_st":[df_soc_temp.iloc[0]], "soc_sp":[df_soc_temp.iloc[-1]], "delta_soc":[delta_soc],
- "tempure":[str(list(df_rest_splice_data[celltemp_name].iloc[0]))], "meancrnt":[chrg_rate]})
- df_rest_soc = df_rest_soc.append(df_rest_soc_temp)
- df_rest_soc.reset_index(drop = True, inplace = True)
- # df_rest_soc.sort_values(by = ['splicestrt_time'], axis = 0, ascending=True,inplace=True)#对故障信息按照时间进行排序
- if not df_rest_soc.empty:
- return df_rest_soc
- else:
- return pd.DataFrame()
|