123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- from ctypes import Structure
- from re import M
- import pandas as pd
- import numpy as np
- import datetime
- import time, datetime
- import matplotlib.pyplot as plt
- from pylab import*
- from scipy.signal import savgol_filter
- from scipy import interpolate
- import os
- from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam
class chrgr_statistic:
    """Charging-behaviour statistics derived from one pack's BMS log.

    The BMS frame is split into charge/discharge segments from the direction
    in which ``SOC[%]`` moves; for every charging segment the per-cell state
    at the end of charge is reported by :meth:`chrgr_soc_sta`.
    """

    def __init__(self, sn, celltype, df_bms):
        """Cache the BMS frame and the battery parameters for ``celltype``.

        Args:
            sn: Pack identifier; copied verbatim into the ``sn`` result column.
            celltype: Key passed to ``BatParam.BatParam`` to load parameters.
            df_bms: BMS records. The columns ``总电流[A]``, ``总电压[V]``,
                ``SOC[%]``, ``时间戳`` and ``充电状态`` are read here.
        """
        self.sn = sn
        self.celltype = celltype
        # One parameter package exposes this as BatParam, another as BatteryInfo.
        self.param = BatParam.BatParam(celltype)
        self.df_bms = pd.DataFrame(df_bms)
        self.packcrnt = df_bms['总电流[A]'] * self.param.PackCrntDec
        self.packvolt = df_bms['总电压[V]']
        self.bms_soc = df_bms['SOC[%]']
        self.bmstime = pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
        self.LookTab_OCV = self.param.LookTab_OCV
        self.LookTab_SOC = self.param.LookTab_SOC
        # Sign flag for the current direction: usually discharge is positive
        # and charge is negative.
        self.packcrnt_flg = self.param.PackCrntDec
        self.cellvolt_list = ['单体电压' + str(x) for x in range(1, self.param.CellVoltNums + 1)]
        self.celltemp_name = ['单体温度' + str(x) for x in range(1, self.param.CellTempNums + 1)]
        self.bmssta = df_bms['充电状态']

    @staticmethod
    def moving_average(interval, windowsize):
        """Return ``interval`` smoothed by a uniform window of ``windowsize``.

        Uses ``np.convolve(..., 'same')``, so the output has the same length
        as the input and the edges are averaged against implicit zeros.
        Declared static so it can be called on the class or on an instance.
        """
        window = np.ones(int(windowsize)) / float(windowsize)
        return np.convolve(interval, window, 'same')

    @staticmethod
    def _label_soc_segments(soc):
        """Label every sample of ``soc`` with the index of its SOC-trend segment.

        A new segment starts where the SOC direction flips between rising and
        falling. Returns a float array with one label per sample; all zeros
        when no flip is found.
        """
        n = len(soc)
        # Pad with two trailing samples before differencing, as the original
        # algorithm does.
        padded = pd.concat([soc, soc.iloc[n - 3:n - 1]])
        # +1 where SOC rises, -1 where it falls, 0 where it is unchanged.
        trend = np.sign(np.diff(np.asarray(padded, dtype=float)))

        change_pos = np.flatnonzero(trend)
        change_val = trend[change_pos]

        # Drop SOC changes that follow the previous change by fewer than 10
        # samples; the first change is always kept.
        too_close = np.where(np.diff(change_pos) < 10)[0] + 1
        change_val = np.delete(change_val, too_close)
        change_pos = np.delete(change_pos, too_close)

        # A step of +/-2 between consecutive kept directions is a flip.
        turn = np.diff(change_val)
        if len(turn) > 0:
            turn = np.insert(turn, 0, turn[0])
        turn_idx = np.where(np.abs(turn) == 2)[0]
        if len(turn_idx) == 0:
            return np.zeros(n)

        bounds = np.concatenate(([0], change_pos[turn_idx], [n - 1]))
        seg_ids = np.arange(len(bounds) - 1, dtype=float)
        labels = np.repeat(seg_ids, np.diff(bounds))
        # The boundaries span n - 1 samples; insert one more label of the last
        # segment so that every row of the frame gets a label.
        return np.insert(labels, n - 2, seg_ids[-1])

    def chrgr_soc_sta(self):
        """Summarise the end-of-charge state of every detected charging segment.

        Side effects: NaNs in ``self.df_bms`` are replaced by 0 in place and a
        ``stat`` column holding the segment label is added to it.

        A segment counts as a charge when it has more than 10 rows and its SOC
        rises by more than 10 from first to last row. The last segment is
        never evaluated (presumably because it may be incomplete - confirm).

        Returns:
            A DataFrame sorted by ``time`` with the columns ``sn``, ``time``,
            ``chrgr_soc``, ``chrgr_volt``, ``temp`` and ``full_chrgr_num``
            (running count of charges in which any cell exceeded 98 % SOC).
            The list-valued columns are stored as strings. An empty,
            column-less DataFrame is returned when no charge is found.
        """
        data = self.df_bms
        data.fillna(0, inplace=True)
        crnt_flg = self.packcrnt_flg

        # ---- split the log into charge / discharge segments by SOC trend ----
        labels = self._label_soc_segments(data['SOC[%]'])
        data['stat'] = labels

        # ---- keep the segments whose SOC rises by more than 10 --------------
        charge_segments = []
        for seg_id in range(len(np.unique(labels)) - 1):
            segment = data.loc[data['stat'] == seg_id]
            seg_soc = segment['SOC[%]']
            if len(seg_soc) > 10 and (seg_soc.iloc[-1] - seg_soc.iloc[0]) > 10:
                charge_segments.append(segment)

        if not charge_segments:
            return pd.DataFrame()

        # ---- end-of-charge voltage / SOC / temperature per segment ----------
        cell_name = self.cellvolt_list
        temp_name = self.celltemp_name
        soc_cal = interpolate.interp1d(self.LookTab_OCV, self.LookTab_SOC, kind='linear')
        max_volt = max(self.LookTab_OCV)
        min_volt = min(self.LookTab_OCV)

        full_chrgr_num = 0
        records = []
        for segment in charge_segments:
            # Rows with a real charging current (negative after applying the
            # sign flag).
            charging = segment.loc[crnt_flg * segment['总电流[A]'] < 0]
            if len(charging) <= 1:
                continue

            # Divided by 1000 - presumably mV -> V; confirm against the data.
            cell_volt = charging[cell_name].iloc[-1] / 1000
            # Pull out-of-table voltages just inside the OCV table so the
            # interpolator does not raise.
            cell_volt = cell_volt.mask(cell_volt >= max_volt, max_volt - 0.005)
            cell_volt = cell_volt.mask(cell_volt <= min_volt, min_volt + 0.005)
            cell_soc = soc_cal(cell_volt.to_numpy())

            if (cell_soc > 98).any():
                full_chrgr_num += 1

            # .tolist() yields plain Python numbers, so the stored strings do
            # not depend on the NumPy scalar repr.
            records.append({
                "sn": self.sn,
                "time": pd.to_datetime(charging['时间戳'].iloc[-1]),
                "chrgr_soc": str(cell_soc.tolist()),
                "chrgr_volt": str(cell_volt.tolist()),
                "temp": str(charging[temp_name].iloc[-1].tolist()),
                "full_chrgr_num": full_chrgr_num,
            })

        if not records:
            return pd.DataFrame()

        df_chrgr_soc = pd.DataFrame(
            records,
            columns=['sn', 'time', 'chrgr_soc', 'chrgr_volt', 'temp', 'full_chrgr_num'],
        )
        # Order the records chronologically.
        df_chrgr_soc.sort_values(by=['time'], axis=0, ascending=True, inplace=True)
        return df_chrgr_soc
|