2 Коміти 7de769ca35 ... 64be6cdebb

Автор SHA1 Опис Дата
  shangguanlie23 64be6cdebb Merge branch 'dev' of http://git.fast-fun.cn:92/lmstack/data_analyze_platform into dev 2 роки тому
  shangguanlie23 ab2e2eaef1 增加内阻估计代码的主函数、计算程序及保存结果。先计算各项目近3个月的结果,之后计算周期有一周,与析锂程序合并,拉取一次数据即可。 2 роки тому

+ 108 - 0
LIB/MIDDLE/SaftyCenter/Sorest/main.py

@@ -0,0 +1,108 @@
+import datetime
+import pandas as pd
+from LIB.BACKEND import DBManager, Log
+import time, datetime
+from apscheduler.schedulers.blocking import BlockingScheduler
+from pandas.core.frame import DataFrame
+import matplotlib as plt
+from pylab import*
+import sor_est
+from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam
+import log
+
+#...................................电池包电芯安全诊断函数......................................................................................................................
+def cell_sor_test():
+    global SNnums
+    global df_diag_sor
+    start=time.time()
+    now_time=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+    now_time=datetime.datetime.strptime(now_time,'%Y-%m-%d %H:%M:%S')
+    start_time=now_time-datetime.timedelta(days=3)
+    end_time=str(now_time)
+    start_time=str(start_time)
+    k = 1
+    for sn in SNnums:
+        if 'PK500' in sn:
+            celltype=1 #6040三元电芯
+        elif 'PK502' in sn:
+            celltype=2 #4840三元电芯
+        elif 'K504B' in sn:
+            celltype=99    #60ah林磷酸铁锂电芯
+        elif 'MGMLXN750' in sn:
+            celltype=3 #力信50ah三元电芯
+        elif 'MGMCLN750' in sn: 
+            celltype=4 #CATL 50ah三元电芯
+        elif 'UD' in sn:
+            celltype=4 #CATL 50ah三元电芯
+        elif 'TJMCL' in sn: 
+            celltype=100 #金茂电芯
+        else:
+            print('SN:{},未找到对应电池类型!!!'.format(sn))
+            continue
+            # sys.exit()
+        print('计算的第' + str(k) + '个:' + sn)
+        k = k + 1
+        #读取原始数据库数据........................................................................................................................................................
+        start_time = '2021-10-01 00:00:00'
+        end_time = '2021-11-01 00:00:00'
+        dbManager = DBManager.DBManager()
+        df_data = dbManager.get_data(sn = sn, start_time = start_time, end_time = end_time, data_groups = ['bms'])
+        df_bms = df_data['bms']
+        # df_sor_add = pd.DataFrame()
+        param = BatParam.BatParam(celltype)#鹏飞param中为BatParam,学琦为BatteryInfo
+        #........................................................电压内阻估计.....................................................................
+        if not df_bms.empty:
+            Diag_sorvol_temp = sor_est.sor_est(sn,celltype,df_bms)#电压内阻估计
+            df_sor_add = Diag_sorvol_temp.sor_cal()        
+            if not df_sor_add.empty:
+                df_diag_sor = df_diag_sor.append(df_sor_add)
+                df_diag_sor = df_diag_sor.drop_duplicates(subset = ['sn','time'], keep = 'first', inplace = False)
+                df_diag_sor.to_csv(r'D:\Work\Code_write\data_analyze_platform\USER\lzx\01算法开发\05内阻及电压估计\01算法开发\内阻估计\测试结果\MGMCLN750N215N049\计算结果\内阻估计.csv',index=False,encoding='GB18030')
+        end=time.time()
+        print(end-start)
+        
+
+#...............................................主函数...............................................................................................
+if __name__ == "__main__":
+    global SNnums
+    global df_diag_sor
+    
+    excelpath=r'D:\Work\Code_write\data_analyze_platform\USER\lzx\01算法开发\00项目sn号\sn-20210903.xlsx'
+    SNdata_6060 = pd.read_excel(excelpath, sheet_name='科易6060')
+    SNdata_6040 = pd.read_excel(excelpath, sheet_name='科易6040')
+    SNdata_4840 = pd.read_excel(excelpath, sheet_name='科易4840')
+    SNdata_L7255 = pd.read_excel(excelpath, sheet_name='格林美-力信7255')
+    SNdata_C7255 = pd.read_excel(excelpath, sheet_name='格林美-CATL7255')
+    SNdata_U7255 = pd.read_excel(excelpath, sheet_name='优旦7255')
+    SNnums_6060=SNdata_6060['SN号'].tolist()
+    SNnums_6040=SNdata_6040['SN号'].tolist()
+    SNnums_4840=SNdata_4840['SN号'].tolist()
+    SNnums_L7255=SNdata_L7255['SN号'].tolist()
+    SNnums_C7255=SNdata_C7255['SN号'].tolist()
+    SNnums_U7255=SNdata_U7255['SN号'].tolist()
+    # df_diag_sor = pd.read_csv(r'D:\Work\Code_write\data_analyze_platform\USER\lzx\01算法开发\05内阻及电压估计\01算法开发\内阻估计\测试结果\内阻估计.csv', encoding='GB18030')
+    # SNums_finish = list(df_diag_sor['sn'])
+    #SNnums=SNnums_L7255 + SNnums_C7255 + SNnums_6040 + SNnums_4840 + SNnums_U7255+ SNnums_6060
+    #SNnums=['TJMCL120502305010','TJMCL120502305012','TJMCL120502305048','TJMCL120502305044','TJMCL120502305026','TJMCL120502305022','TJMCL120502305032','TJMCL120502305038']
+    # SNnums = SNnums_U7255#['MGMCLN750N215N049'] #SNnums_6040 #SNnums_C7255 #SNnums_6040['MGMCLN750N215N049'] PK504B00100004003
+    # SNnums = pd.read_csv(r'D:\Work\Code_write\data_analyze_platform\USER\lzx\01算法开发\02析锂检测\liplated\疑似析锂电池sn.csv',encoding='GB18030')
+    # SNnums = list(set(SNnums_U7255).difference(set(SNums_finish)))
+    SNnums = ['MGMCLN750N215N049']
+    
+    mylog=log.Mylog('log_diag.txt','error')
+    mylog.logcfg()
+    #............................模块运行前,先读取数据库中所有结束时间为0的数据,需要从数据库中读取................
+    df_diag_sor = pd.read_csv(r'D:\Work\Code_write\data_analyze_platform\USER\lzx\01算法开发\05内阻及电压估计\01算法开发\内阻估计\测试结果\MGMCLN750N215N049\计算结果\内阻估计.csv',encoding='GB18030')
+    
+    print('----------------输入--------')
+    print('-------计算中-----------')
+    #定时任务.......................................................................................................................................................................
+    scheduler = BlockingScheduler()
+    scheduler.add_job(cell_sor_test, 'interval', seconds=10, id='diag_job')
+
+    try:  
+        scheduler.start()
+    except Exception as e:
+        scheduler.shutdown()
+        print(repr(e))
+        mylog.logopt(e)

+ 122 - 0
LIB/MIDDLE/SaftyCenter/Sorest/sor_est.py

@@ -0,0 +1,122 @@
+import pandas as pd
+import numpy as np
+import datetime
+import time, datetime
+import scipy
+import math
+import itertools
+import log
+from scipy import interpolate
+from sklearn.linear_model import LinearRegression
+from LIB.MIDDLE.CellStateEstimation.Common.V1_0_1 import BatParam
+
+class sor_est:
+    def __init__(self,sn,celltype,df_bms):  #参数初始化
+
+        self.sn=sn
+        self.celltype=celltype
+        self.param=BatParam.BatParam(celltype)#鹏飞param中为BatParam,学琦为BatteryInfo
+        self.df_bms=pd.DataFrame(df_bms)
+        self.packcrnt=df_bms['总电流[A]']*self.param.PackCrntDec
+        self.packvolt=df_bms['总电压[V]']
+        self.bms_soc=df_bms['SOC[%]']
+        self.bmstime= pd.to_datetime(df_bms['时间戳'], format='%Y-%m-%d %H:%M:%S')
+        self.LookTab_OCV = self.param.LookTab_OCV
+        self.LookTab_SOC = self.param.LookTab_SOC
+        
+        self.cellvolt_list=['单体电压'+str(x) for x in range(1,self.param.CellVoltNums+1)]
+        self.celltemp_name=['单体温度'+str(x) for x in range(1,self.param.CellTempNums+1)]
+        self.bmssta = df_bms['充电状态']
+    #定义加权滤波函数..................................................................................................
+    def moving_average(interval, windowsize):
+        window = np.ones(int(windowsize)) / float(windowsize)
+        re = np.convolve(interval, window, 'same')
+        return re
+#.............................................内阻及电压估计............................................................................
+    def sor_cal(self):
+        start_time = time.time()
+        data = self.df_bms
+        data.fillna(0, inplace=True)
+        LookTab_OCV = self.LookTab_OCV
+        LookTab_SOC = self.LookTab_SOC
+        max_volt = max(LookTab_OCV)
+        min_volt = min(LookTab_OCV)
+        crnt = data['总电流[A]']
+        zero_group = itertools.groupby(crnt, lambda x : x == 0)#判断电流是否为0,并根据0和非0分组
+        zero_flg = []
+        zero_value = []
+        for k, v in zero_group:
+            zero_flg.append(k)#0和非0状态
+            zero_value.append(len(list(v)))#各个连续0和非0值的个数
+        zero_flg_new = zero_flg.copy()
+        for item in range(0, len(zero_value)):
+            if (zero_flg[item] == True) & (zero_value[item] < 6):#筛选出充电或放电过程中跳零的点,0点数据少于等于3个采样的时刻
+                zero_flg_new[item] = False
+        def get_index1(lst=None, item=''):
+            return [index for (index,value) in enumerate(lst) if value == item]#获取某个元素的位置
+        true_num = (get_index1(zero_flg_new, True))#获取电流为0的位置
+        zero_value_fit = []
+        true_num_flg = []
+        for item in range(len(true_num) - 1):
+            if ((true_num[item + 1] - true_num[item]) == 2):#重新赋值零点和非零点并计算各数据段个数
+                true_num_flg.append(True)
+                true_num_flg.append(False)
+                zero_value_fit.append(zero_value[true_num[item]])#[2*item]
+                zero_value_fit.append(zero_value[true_num[item] + 1])#[2*item + 1]
+            else:
+                true_num_flg.append(True)
+                true_num_flg.append(False)
+                zero_value_fit.append(zero_value[true_num[item]])
+                zero_value_fit.append(sum(zero_value[(true_num[item] + 1):(true_num[item + 1])]))
+        true_num_flg_fit = []
+        for item in range(1,len(zero_value_fit) + 1):
+            true_num_flg_fit.append(sum(zero_value_fit[0:item]) - 1)
+        true_num_flg_fit.insert(0,0)
+        soc_interplt = interpolate.interp1d(LookTab_OCV, LookTab_SOC, kind = 'linear')#OCV-SOC插值函数
+        cellvolt_name = self.cellvolt_list#电芯数量
+        celltemp_name = self.celltemp_name#电芯温度数量
+        df_sor_result = pd.DataFrame(columns = ["sn", "time", "sor", "sor_sigma", "soc", "temp", "delta_time"])
+        for range_num in range(0, len(true_num_flg) - 1):#
+            sor_temp = []
+            df_time = pd.to_datetime(data['时间戳'])
+            delta_time = (df_time.iloc[true_num_flg_fit[range_num + 1]] - df_time.iloc[true_num_flg_fit[range_num] + 1])/pd.Timedelta(1, 'min')#电流为0阶段的时间长度
+            if (true_num_flg[range_num] == True) & (delta_time > 10):#电流为0时间大于30 min时,利用下一段数据计算阻值
+                df_sor_soc_data = data.iloc[(true_num_flg_fit[range_num] + 1):(true_num_flg_fit[range_num + 1] + 1)]#获取电流为零数据段,计算SOC
+                df_sor_cal_temp = data.iloc[(true_num_flg_fit[range_num + 1] + 1):(true_num_flg_fit[range_num + 2] + 1)]#获取电流非零数据,计算内阻
+                df_sor_cal_temp.reset_index(drop = True, inplace = True)
+                df_sor_cal_len = len(df_sor_cal_temp)
+                cal_range_num = true_num_flg_fit[range_num + 1] + 1
+                if df_sor_cal_len > 20:#放电数据
+                    df_sor_cal = df_sor_cal_temp.iloc[:30]#筛选前30行放电数据
+                    df_sor_time = pd.to_datetime(df_sor_cal['时间戳'])
+                    delta_time_sor = (df_sor_time.iloc[-1] - df_sor_time.iloc[0])/pd.Timedelta(1, 'min')#计算内阻使用数据段的时长
+                    df_soc_volt = df_sor_soc_data[cellvolt_name].iloc[-1]/1000
+                    df_soc_volt[df_soc_volt > max_volt] = max_volt - 0.005
+                    df_soc_volt[df_soc_volt < min_volt] = min_volt + 0.005
+                    df_soc = soc_interplt(list(df_soc_volt))#插值计算电芯SOC
+                    df_sor_volt = df_sor_cal[cellvolt_name]/1000
+                    df_sor_crnt = df_sor_cal['总电流[A]']
+                    df_sorvolt_dif = np.diff(df_sor_volt, axis = 0)
+                    df_sorcrnt_dif = np.diff(df_sor_crnt, axis = 0)
+                    df_sorvolt_dif_csv = pd.DataFrame(df_sorvolt_dif)
+                    df_sorvolt_dif_csv.columns = cellvolt_name
+                    if np.sum(df_sorcrnt_dif != 0) > 0.5*len(df_sorcrnt_dif):
+                        for item in cellvolt_name:
+                            lineModel = LinearRegression()
+                            lineModel.fit(np.array(df_sorcrnt_dif).reshape(-1, 1), df_sorvolt_dif_csv[item])
+                            sor_temp.append(lineModel.coef_[0])
+                        sor_sort_temp = np.sort(sor_temp)
+                        sor_sort_del = sor_sort_temp[1:-2]
+                        sor_sort_del_mean = np.mean(sor_sort_del)#去除最值的均值
+                        sor_sort_del_std = np.std(sor_sort_del)#去除最值的均方差
+                        sor_sigma = list((sor_temp - sor_sort_del_mean)/sor_sort_del_std)
+                        df_sor_result_temp = pd.DataFrame({"sn":[self.sn], "time":[df_sor_cal_temp['时间戳'][0]], "sor":[str(sor_temp)], "sor_sigma":[str(sor_sigma)],
+                                                           "soc":[str(list(df_soc))], "temp":[str(list(df_sor_cal_temp[celltemp_name].iloc[0]))], "delta_time":[delta_time_sor]})
+                        df_sor_result = df_sor_result.append(df_sor_result_temp)
+                        df_sor_result.reset_index(drop = True, inplace = True)
+        end_time = time.time()
+        print(end_time - start_time)
+        if not df_sor_result.empty:
+            return df_sor_result
+        else:
+            return pd.DataFrame()

BIN
LIB/MIDDLE/SaftyCenter/Sorest/内阻估计.xlsx