"""Estimate the cell-to-cell SOC spread of a battery pack from BMS charging data.

For every usable charging segment the dV/dQ peak of each cell is located, the
ampere-hour-integrated SOC at which each cell reaches that peak is recorded,
and the spread (max - min) across cells is averaged over all segments.
"""
# Data access
import datetime
import os

import numpy as np
import pandas as pd
# import matplotlib.pyplot as plt

from LIB.BACKEND import DBManager

# Parameter initialisation
Capacity = 53.6  # divisor used to turn integrated current [A*s] into SOC percent
PackFullChrgVolt = 69.99
CellFullChrgVolt = 3.6
CellVoltNums = 20  # number of '单体电压<N>' columns read (N = 1..CellVoltNums)
CellTempNums = 4  # number of '单体温度<N>' columns read (N = 1..CellTempNums)
FullChrgSoc = 98
PeakSoc = 57

# Value of the '充电状态' column that is treated as "charging".
_CHARGING_STATE = 2
# Segment filter: SOC at start must be below / SOC at end must be above these.
_MAX_START_SOC = 46
_MIN_END_SOC = 80
# Segment filter: the coldest cell temperature at the segment end must exceed this.
# NOTE(review): the original comment said 15 degC, the code has always used 10.
_MIN_CELL_TEMP = 10
# Segment filter: mean seconds per sample must stay below this.
_MAX_MEAN_SAMPLE_PERIOD_S = 30
# Moving-average window (in samples) applied to the cell voltage.
_SMOOTH_WINDOW = 5
# Minimum rise of the smoothed cell voltage before a new dV/dQ point is taken.
# NOTE(review): unit is that of the '单体电压<N>' columns (presumably mV) - confirm.
_MIN_VOLT_STEP = 0.9
# Only dV/dQ points whose BMS SOC lies strictly inside this window are searched.
# NOTE(review): the original comment said 50..80, the code has always used 50..75.
_PEAK_SOC_MIN = 50
_PEAK_SOC_MAX = 75
# Half-width (in SOC percent) of the neighbourhood used to confirm a peak.
_PEAK_NEIGHBOUR_SOC = 0.5


def _moving_average(values, window, mode="same"):
    """Return the moving average of *values* over *window* samples."""
    return np.convolve(values, np.ones((window,)) / window, mode=mode)


def _find_charge_boundaries(state):
    """Return (starts, ends) index lists of charging segments in *state*.

    The scan covers indices 3 .. len(state) - 4.  A start is an index that is
    charging after two non-charging samples (or index 3 when it and the next
    two samples are charging).  An end is the first of two consecutive
    non-charging samples following a charging one (or the last index when
    the final two samples are still charging).
    """
    n = len(state)
    starts = []
    ends = []
    for i in range(3, n - 3):
        charging_now = state[i] == _CHARGING_STATE
        if (i == 3 and charging_now
                and state[i + 1] == _CHARGING_STATE
                and state[i + 2] == _CHARGING_STATE):
            starts.append(i)
        elif (state[i - 2] != _CHARGING_STATE
                and state[i - 1] != _CHARGING_STATE
                and charging_now):
            starts.append(i)
        elif (state[i - 1] == _CHARGING_STATE
                and not charging_now
                and state[i + 1] != _CHARGING_STATE):
            ends.append(i)
        elif (i == n - 4
                and state[n - 1] == _CHARGING_STATE
                and state[n - 2] == _CHARGING_STATE):
            ends.append(n - 1)
    return starts, ends


def _pair_segments(starts, ends):
    """Pair every start index with the first end index that follows it.

    Pairing by list position alone goes wrong when the data opens with an end
    that has no start (a charge finishing within the first few samples): every
    later pair would be shifted and end up with end < start.  Ends that do not
    follow the current start are therefore skipped.
    """
    pairs = []
    e = 0
    for start in starts:
        while e < len(ends) and ends[e] <= start:
            e += 1
        if e == len(ends):
            break
        pairs.append((start, ends[e]))
        e += 1
    return pairs


def _is_usable_segment(df_bms, soc, timestamps, start, end):
    """Return True when the charging segment [start, end] passes all filters."""
    # Coldest cell temperature at the end of the segment.
    min_temp = min(
        df_bms['单体温度' + str(j)].iloc[end]
        for j in range(1, CellTempNums + 1)
    )
    if not (soc.iloc[start] < _MAX_START_SOC
            and soc.iloc[end] > _MIN_END_SOC
            and min_temp > _MIN_CELL_TEMP):
        return False
    # Reject sparsely sampled segments: mean seconds per sample must be small.
    elapsed = (timestamps.iloc[end] - timestamps.iloc[start]).total_seconds()
    return elapsed / (end - start) < _MAX_MEAN_SAMPLE_PERIOD_S


def _peak_ah_soc(t, soc, cell_volt, pack_crnt):
    """Return the Ah-integrated SOC at the dV/dQ peak of one cell, or None.

    Args:
        t: list of timestamps of the segment.
        soc: list of BMS SOC values, same length as *t*.
        cell_volt: list of voltages of one cell, same length as *t*.
        pack_crnt: list of pack currents, same length as *t*; the sign is
            inverted when integrating, so negative current increases charge.

    Returns:
        The cumulative SOC (percent of ``Capacity``, counted from the segment
        start) at the selected dV/dQ peak, or None when fewer than three
        dV/dQ points fall inside the SOC search window.
    """
    smoothed = _moving_average(cell_volt, _SMOOTH_WINDOW, mode="same")
    ah_soc = 0  # cumulative SOC since segment start, never reset
    ah = 0  # charge accumulated since the previous dV/dQ point
    last_volt = smoothed[0]
    dvdq = []
    ah_soc_points = []
    bms_soc_points = []

    for m in range(1, len(t)):
        step = (t[m] - t[m - 1]).total_seconds()
        ah_soc = ah_soc - pack_crnt[m] * step * 100 / (3600 * Capacity)
        ah = ah - pack_crnt[m] * step / 3600
        delta_v = smoothed[m] - last_volt
        # Take a point only after the voltage has risen enough and net
        # charge has gone in, so the quotient is well defined and positive.
        if delta_v > _MIN_VOLT_STEP and ah > 0:
            dvdq.append(delta_v / ah)
            ah_soc_points.append(ah_soc)
            bms_soc_points.append(soc[m])
            last_volt = smoothed[m]
            ah = 0

    points = pd.DataFrame(
        {'SOC': bms_soc_points, 'DVDQ': dvdq, 'AhSOC': ah_soc_points}
    )
    points = points[
        (points['SOC'] > _PEAK_SOC_MIN) & (points['SOC'] < _PEAK_SOC_MAX)
    ]
    # At least three candidate points are required to look for a peak.
    if len(points) <= 2:
        return None

    peak = points['DVDQ'].idxmax()
    peak_soc = points['SOC'][peak]
    neighbours = points[
        (points['SOC'] > peak_soc - _PEAK_NEIGHBOUR_SOC)
        & (points['SOC'] < peak_soc + _PEAK_NEIGHBOUR_SOC)
    ]
    if len(neighbours) > 3:
        return points['AhSOC'][peak]
    # Too few points around the maximum: discard it and fall back to the
    # next-highest dV/dQ point.
    points = points.drop([peak])
    peak = points['DVDQ'].idxmax()
    return points['AhSOC'][peak]


# Data time range
def cal_deltsoc(sn, end_time, start_time):
    """Compute the mean cell SOC spread for one pack over a time range.

    BMS data for *sn* between *start_time* and *end_time* is fetched through
    ``DBManager``.  Charging segments are detected from the '充电状态' column,
    filtered by start/end SOC, minimum cell temperature and sampling density,
    and for each remaining segment the spread (max - min over cells) of the
    Ah-integrated SOC at the dV/dQ peak is computed.

    Args:
        sn: pack serial number, passed to ``DBManager.get_data`` unchanged.
        end_time: end of the query range, passed through unchanged.
        start_time: start of the query range, passed through unchanged.

    Returns:
        A one-row DataFrame with columns 'time' (start timestamp of the last
        usable segment), 'SN号' (``str(sn)``) and 'Soc差' (mean spread over all
        segments that produced a result).  An empty DataFrame is returned when
        no usable charging segment exists or no cell produced a peak.
    """
    db_manager = DBManager.DBManager()
    df_data = db_manager.get_data(
        sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms']
    )
    df_bms = df_data['bms']

    pack_crnt = df_bms['总电流[A]']
    soc = df_bms['SOC[%]']
    charge_state = df_bms['充电状态'].astype(int).to_numpy()
    timestamps = pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')

    # Locate charging segments and keep the ones that pass the filters.
    # All row access below is positional, so the frame's index labels do
    # not matter.
    starts, ends = _find_charge_boundaries(charge_state)
    segments = [
        (start, end)
        for start, end in _pair_segments(starts, ends)
        if _is_usable_segment(df_bms, soc, timestamps, start, end)
    ]
    if not segments:
        return pd.DataFrame()

    # Spread of the Ah-integrated SOC at the dV/dQ peak, one per segment.
    spreads = []
    for start, end in segments:
        # Shared by every cell of the segment, so sliced only once.
        seg_time = list(timestamps.iloc[start:end])
        seg_crnt = list(pack_crnt.iloc[start:end])
        seg_soc = list(soc.iloc[start:end])

        peaks = []
        for j in range(1, CellVoltNums + 1):
            cell_volt = list(df_bms['单体电压' + str(j)].iloc[start:end])
            peak = _peak_ah_soc(seg_time, seg_soc, cell_volt, seg_crnt)
            if peak is not None:
                peaks.append(peak)
        if peaks:
            spreads.append(max(peaks) - min(peaks))

    # Without any spread the mean would be NaN; report "no result" instead.
    if not spreads:
        return pd.DataFrame()

    return pd.DataFrame({
        'time': [timestamps.iloc[segments[-1][0]]],
        'SN号': [str(sn)],
        'Soc差': [np.mean(spreads)],
    })