# 获取数据 from LIB.BACKEND import DBManager import os import pandas as pd import numpy as np import datetime # import matplotlib.pyplot as plt #参数输入 Capacity = 54 PackFullChrgVolt=69.99 CellFullChrgVolt=3.5 CellVoltNums=20 CellTempNums=4 FullChrgSoc=98 PeakSoc=57 # #40Ah-OCV # LookTab_SOC = [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100] # LookTab_OCV = [3.3159, 3.4502, 3.4904, 3.5277, 3.5590, 3.5888, 3.6146, 3.6312, 3.6467, 3.6642, 3.6865, 3.7171, 3.7617, # 3.8031, 3.8440, 3.8888, 3.9376, 3.9891, 4.0451, 4.1068, 4.1830] #55Ah-OCV LookTab_SOC = [0.00, 2.40, 6.38, 10.37, 14.35, 18.33, 22.32, 26.30, 30.28, 35.26, 40.24, 45.22, 50.20, 54.19, 58.17, 60.16, 65.14, 70.12, 75.10, 80.08, 84.06, 88.05, 92.03, 96.02, 100.00] LookTab_OCV = [2.7151, 3.0298, 3.1935, 3.2009, 3.2167, 3.2393, 3.2561, 3.2703, 3.2843, 3.2871, 3.2874, 3.2868, 3.2896, 3.2917, 3.2967, 3.3128, 3.3283, 3.3286, 3.3287, 3.3288, 3.3289, 3.3296, 3.3302, 3.3314, 3.3429] #定义滑动滤波函数 def np_move_avg(a, n, mode="same"): return (np.convolve(a, np.ones((n,)) / n, mode=mode)) #参数初始化 dvdq_soh=[] dvdq_soh_err=[] bms_soh=[] dvdq_time=[] dvdq_sohcfd=[] sn_list=[] #获取数据时间段 now_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') now_time=datetime.datetime.strptime(now_time,'%Y-%m-%d %H:%M:%S') start_time=now_time-datetime.timedelta(days=30) end_time=str(now_time) strat_time=str(start_time) #输入一个含有‘SN号’的xlsx SNdata = pd.read_excel('骑享资产梳理-20210621.xlsx', sheet_name='6060') SNnums=SNdata['SN号'] for k in range(15): SNnum=str(SNnums[k]) sn = SNnum st = '2021-07-06 00:00:00' et = '2021-07-07 20:00:00' dbManager = DBManager.DBManager() df_data = dbManager.get_data(sn=sn, start_time=st, end_time=et, data_groups=['bms']) data = df_data['bms'] packcrnt=data['总电流[A]'] packvolt=data['总电压[V]'] SOC=data['SOC[%]'] SOH=data['SOH[%]'] bmsstat=data['充电状态'] time= pd.to_datetime(data['时间戳'], format='%Y-%m-%d %H:%M:%S') #第一步:筛选充电数据 ChgStart=[] ChgEnd=[] for i in range(3, len(time) - 3): if i==3 and bmsstat[i]==2 and bmsstat[i+1]==2 and bmsstat[i+2]==2: ChgStart.append(i) elif bmsstat[i-2]!=2 and bmsstat[i-1]!=2 and bmsstat[i]==2: ChgStart.append(i) elif bmsstat[i-1]==2 and bmsstat[i]!=2 and bmsstat[i+1]!=2: ChgEnd.append(i) elif i == (len(time) - 4) and bmsstat[len(bmsstat)-1] == 2 and bmsstat[len(bmsstat)-2] == 2: ChgEnd.append(len(time)-1) #第二步:筛选充电起始Soc<48%,电芯温度>15℃,且满充的数据 ChgStartValid1=[] ChgEndValid1=[] ChgStartValid2=[] ChgEndValid2=[] for i in range(min(len(ChgStart),len(ChgEnd))): #获取最小温度值 celltemp = [] for j in range(1, CellTempNums+1): s = str(j) temp = data['单体温度' + s] celltemp.append(temp[ChgEnd[i]]) #寻找最大电压值 cellvolt = [] for j in range(1, CellVoltNums+1): s = str(j) volt = max(data['单体电压' + s][ChgStart[i]:ChgEnd[i]]/1000) cellvolt.append(volt) #筛选满足2点法计算的数据 StandingTime=0 if max(cellvolt)>CellFullChrgVolt and SOC[ChgStart[i]]<30 and min(celltemp)>5: for k in reversed(range(ChgStart[i])): if abs(packcrnt[k - 2]) < 0.01: StandingTime = StandingTime + (time[k] - time[k-1]).total_seconds() if StandingTime > 600: # 筛选静置时间>10min ChgStartValid1.append(ChgStart[i]) ChgEndValid1.append(ChgEnd[i]) break else: break #筛选满足DV/DQ方法的数据 if max(cellvolt)>CellFullChrgVolt and SOC[ChgStart[i]]<45 and min(celltemp)>5: if ((time[ChgEnd[i]]-time[ChgStart[i]]).total_seconds())/(ChgEnd[i]-ChgStart[i])<60: ChgStartValid2.append(ChgStart[i]) ChgEndValid2.append(ChgEnd[i]) #第三步:计算充电Soc和Soh # 两点法计算soh Soc=[] Time=[] Soc_Err=[] Bms_Soc=[] Soh1=[] Time1=[] Bms_Soh1=[] Soh_Err1=[] for i in range(len(ChgStartValid1)): #寻找最大电压值 cellvolt = [] for j in range(1, CellVoltNums+1): s = str(j) volt = max(data['单体电压' + s]) cellvolt.append(volt) voltmax_index = cellvolt.index(max(cellvolt)) + 1 cellvolt = data['单体电压' + str(voltmax_index)] / 1000 #soc Soc.append(np.interp(cellvolt[ChgStartValid1[i]-3],LookTab_OCV,LookTab_SOC)) Time.append(time[ChgStartValid1[i]-3]) Bms_Soc.append(SOC[ChgStartValid1[i]-3]) Soc_Err.append(Bms_Soc[-1]-Soc[-1]) #soh Ocv_Soc=np.interp(cellvolt[ChgStartValid1[i]-3],LookTab_OCV,LookTab_SOC) Ah=0 for j in range(ChgStartValid1[i],ChgEndValid1[i]): #计算soc Step=(time[j]-time[j-1]).total_seconds() Time.append(time[j]) Bms_Soc.append(SOC[j]) if Soc[-1]-(packcrnt[j]*Step*100)/(3600*Capacity)<100: Soc.append(Soc[-1]-(packcrnt[j]*Step*100)/(3600*Capacity)) else: Soc.append(100) Soc_Err.append(Bms_Soc[-1] - Soc[-1]) #两点法计算soh Ah=Ah-packcrnt[j]*Step/3600 Soh1.append(Ah*100/((FullChrgSoc-Ocv_Soc)*0.01*Capacity)) Bms_Soh1.append(SOH[i]) Soh_Err1.append(Bms_Soh1[-1]-Soh1[-1]) Time1.append(time[ChgStartValid1[i]]) # DV/DQ法计算soh Soh2=[] Time2=[] Bms_Soh2=[] Soh_Err2=[] SohCfd = [] sn_list=[] for i in range(len(ChgStartValid2)): #寻找最大电压值 cellvolt1 = [] cellvolt=[] for j in range(1, CellVoltNums+1): s = str(j) volt = data['单体电压' + s] cellvolt1.append(volt[ChgEndValid2[i]]) voltmax1_index = cellvolt1.index(max(cellvolt1)) + 1 cellvolt1 = data['单体电压' + str(voltmax1_index)] / 1000 #电压采用滑动平均滤波 cellvolt=np_move_avg(cellvolt1, 3, mode="same") #参数赋初始值 Ah = 0 Volt = cellvolt[ChgStartValid2[i]] DV_Volt=[] DQ_Ah = [] DVDQ = [] time2 = [] soc2 = [] Ah_tatal=[0] xvolt=[] #计算DV和DQ值 for j in range(ChgStartValid2[i],ChgEndValid2[i]): Step=(time[j+1]-time[j]).total_seconds() Ah=Ah-packcrnt[j]*Step/3600 if (cellvolt[j]-Volt)>0.0009 and Ah>0: Ah_tatal.append(Ah_tatal[-1]+Ah) DQ_Ah.append(Ah) DV_Volt.append(cellvolt[j]-Volt) DVDQ.append((DV_Volt[-1])/DQ_Ah[-1]) xvolt.append(cellvolt[j]) Volt=cellvolt[j] Ah = 0 time2.append(time[j]) soc2.append(SOC[j]) #切片Soc>50且Soc<80 Data1 = pd.DataFrame({'SOC': soc2, 'DVDQ': DVDQ, 'Ah_tatal': Ah_tatal[:-1], 'DQ_Ah':DQ_Ah, 'DV_Volt':DV_Volt, 'XVOLT':xvolt}) Data1=Data1[(Data1['SOC']>50) & (Data1['SOC']<80)] #寻找峰值并计算Soh和置信度 # 获取最小温度值 celltemp = [] for j in range(1, CellTempNums+1): s = str(j) temp = data['单体温度' + s] celltemp.append(temp[ChgStartValid2[i]]) if len(Data1['DVDQ'])>1: PeakIndex=Data1['DVDQ'].idxmax() #筛选峰值点附近±0.5%SOC内的数据 Data2=Data1[(Data1['SOC']>(Data1['SOC'][PeakIndex]-0.5)) & (Data1['SOC']<(Data1['SOC'][PeakIndex]+0.5))] if len(Data2)>2: Ah_tatal1 = Data1['Ah_tatal'] DVDQ = Data1['DVDQ'] soc2 = Data1['SOC'] xvolt = Data1['XVOLT'] if soc2[PeakIndex]>50 and soc2[PeakIndex]<80: DVDQ_SOH=(Ah_tatal[-1]-Ah_tatal1[PeakIndex]) * 100 / ((FullChrgSoc - PeakSoc) * 0.01 * Capacity) if DVDQ_SOH<95: DVDQ_SOH=DVDQ_SOH*0.3926+58.14 if DVDQ_SOH>70 and DVDQ_SOH<120: Soh2.append(DVDQ_SOH) Bms_Soh2.append(SOH[ChgStartValid2[i]]) Soh_Err2.append(Bms_Soh2[-1] - Soh2[-1]) Time2.append(time[ChgStartValid2[i]]) sn_list.append(SNnum) #计算置信度 if min(celltemp)<10: SohCfd.append(50) elif min(celltemp)<20: SohCfd.append(80) else: SohCfd.append(100) else: Data1=Data1.drop([PeakIndex]) PeakIndex = Data1['DVDQ'].idxmax() Data2 = Data1[(Data1['SOC'] > (Data1['SOC'][PeakIndex] - 0.5)) & (Data1['SOC'] < (Data1['SOC'][PeakIndex] + 0.5))] if len(Data2) > 3: Ah_tatal1 = Data1['Ah_tatal'] DVDQ = Data1['DVDQ'] soc2 = Data1['SOC'] xvolt = Data1['XVOLT'] if soc2[PeakIndex]>50 and soc2[PeakIndex]<80: DVDQ_SOH=(Ah_tatal[-1]-Ah_tatal1[PeakIndex]) * 100 / ((FullChrgSoc - PeakSoc) * 0.01 * Capacity) if DVDQ_SOH<95: DVDQ_SOH=DVDQ_SOH*0.3926+58.14 if DVDQ_SOH>70 and DVDQ_SOH<120: Soh2.append(DVDQ_SOH) Bms_Soh2.append(SOH[ChgStartValid2[i]]) Soh_Err2.append(Bms_Soh2[-1] - Soh2[-1]) Time2.append(time[ChgStartValid2[i]]) sn_list.append(SNnum) #计算置信度 if min(celltemp)<10: SohCfd.append(50) elif min(celltemp)<20: SohCfd.append(80) else: SohCfd.append(100) #处理数据 if len(Soh2)>5: Soh2=np_move_avg(Soh2,5,mode="valid") result_soh2={'时间': Time2[4::], 'SN号':sn_list[4::], 'BMS_SOH': Bms_Soh2[4::], 'SOH': Soh2, 'SOH误差': Soh_Err2[4::]} else: result_soh2={'时间': Time2, 'SN号':sn_list, 'BMS_SOH': Bms_Soh2, 'SOH': Soh2, 'SOH误差': Soh_Err2} #第四步:将数据存入Excel Result_Soh2=pd.DataFrame(result_soh2) Result_Soh2.to_csv('BMS_SOH_'+SNnum+'.csv',encoding='GB18030')