Kaynağa Gözat

Merge branch 'dev' of http://git.fast-fun.cn:92/lmstack/data_analyze_platform into dev

lmstack 2 yıl önce
ebeveyn
işleme
61d210b366
1 değiştirilmiş dosya ile 138 ekleme ve 0 silme
  1. 138 0
      LIB/BACKEND/DataPreProcess.py

+ 138 - 0
LIB/BACKEND/DataPreProcess.py

@@ -174,6 +174,142 @@ class DataPreProcess:
         df['data_split_by_status'] = status_id
         df['data_status'] = status_list
         return df
+    def data_split_by_status_forMGMCUD02(self, dfin, drive_interval_threshold=120, charge_interval_threshold=300, 
+                                    drive_stand_threshold=120, charge_stand_threshold=300):
+        '''
+        # 数据预处理分段, 将原始数据段分为 charge、drive、stand、none段
+        # 状态判断
+        # 1、drive:(状态为2或3 且 存在电流>0 ) 或 (电流持续为0 且 持续时间<阈值 且 上一段数据为行车)
+        # 2、charge:(状态为2或3 且 不存在电流>0 ) 或 (电流持续为0 且 持续时间<阈值 且 上一段数据为充电)
+        # 3、stand:(电流持续为0 且 是数据段的第一段) 或 (电流持续为0 且 持续时间>阈值)
+        # 4、none: 其他
+
+        --------------输入参数-------------:
+        drive_interval_threshold: 行车段拼接阈值,如果两段行车的间隔时间小于该值,则两段行车合并。
+        charge_interval_threshold: 充电段拼接阈值,如果两段充电的间隔时间小于该值,则两段充电合并。
+        drive_stand_threshold: 静置段合并至行车段阈值,如果静置时间小于该值,则合并到上一段的行车中。
+        charge_stand_threshold: 静置段合并至充电段阈值,如果静置时间小于该值,则合并到上一段的充电中。
+
+        --------------输出-----------------:
+        在原始数据后面,增加data_split_by_crnt, data_split_by_status, data_status 三列
+        data_split_by_crnt: 按电流分段的序号
+        data_split_by_status:按电流和状态分段的序号
+        data_status: 状态标识
+        '''
+        # 首先根据电流是否为0 ,将数据分段
+        df = dfin.copy()
+        df['时间戳'] = pd.to_datetime(df['时间戳'])
+        
+        crnt_zero_or_not = df['总电流[A]']==0
+        last_crnt_flag = crnt_zero_or_not[0]
+        temp = 1
+        group_id = [temp]
+        for cur_crnt_flag in crnt_zero_or_not[1:]:
+            if last_crnt_flag ^ cur_crnt_flag:
+                temp = temp + 1
+            last_crnt_flag = cur_crnt_flag
+            group_id.append(temp)
+        df['data_split_by_crnt'] = group_id
+
+        # 然后判断每个段内的 充电状态及电流=0持续时长,决定当前状态
+        temp = 1
+        last_status = ""
+        status_id = []
+        status_list = []
+        data_number_list = sorted(list(set(df['data_split_by_crnt'])))
+
+        for data_number in data_number_list:
+            df_sel = df[df['data_split_by_crnt'] == data_number]
+            origin_index = list(df_sel.index)
+            df_sel = df_sel.reset_index(drop=True)
+            temp_2 = 0
+            # 如果当前数据段的电流非0,则可能分为charge、drive或none段
+            if df_sel.loc[0,'总电流[A]'] != 0:
+                # 电流 分段中可能存在状态变化的时刻, 内部根据状态进行分段.
+                # 该数据段内部,根据bms状态信号进行二次分段
+                status_drive_or_not = df_sel['充电状态']==3
+                last_status_flag = status_drive_or_not[0]
+                temp_2 = 0
+                group_id_2 = [temp_2]
+                for cur_status_flag in status_drive_or_not[1:]:
+                    if last_status_flag ^ cur_status_flag:
+                        temp_2 = temp_2 + 1
+                    last_status_flag = cur_status_flag
+                    group_id_2.append(temp_2)
+                
+                # 遍历二次状态分段
+                temp_2 = 0
+                last_status_2 = last_status
+                df_sel['index'] = group_id_2
+                data_number_list_2 = sorted(list(set(group_id_2)))
+                for data_number_2 in data_number_list_2:
+                    
+                    df_sel_2 = df_sel[df_sel['index'] == data_number_2]
+                    df_sel_2 = df_sel_2.reset_index(drop=True)
+                    
+                    # 根据bms状态 及 电流符号决定是charge还是drive
+                    # 如果状态为2或3, 且电流均<=0 则记为充电
+                    if df_sel_2.loc[0, '充电状态'] in [2, 3] and len(df_sel_2[df_sel_2['总电流[A]'] > 0]) == 0: 
+                        cur_status = 'charge'
+                    # 如果状态为2或3,且存在电流>0 则记为行车
+                    elif df_sel_2.loc[0, '充电状态'] in [2, 3] and len(df_sel_2[df_sel_2['总电流[A]'] > 0]) > 0: 
+                        cur_status = 'drive'
+                    # 否则 记为none
+                    else:
+                        cur_status = 'none'
+                    status_list.extend([cur_status] * len(df_sel_2))
+                    
+                    # 状态id号与前面电流为0的相同状态进行合并, 均判断应不应该与上一段合并    
+                    if origin_index[0] == 0: # 如果是所有数据的起始段数据,则直接赋值id号
+                        status_id.extend([temp + temp_2]*len(df_sel_2))
+                    
+                    else: # 判断是否与上一段数据合并
+                        deltaT = (df.loc[origin_index[0], '时间戳'] - df.loc[origin_index[0]-1, '时间戳']).total_seconds()
+                        # 如果 状态一致, 且 间隔时间小于阈值,则合并
+                        if last_status_2 == 'drive' and cur_status == last_status_2 and deltaT < drive_interval_threshold: 
+                            temp_2 = temp_2 - 1  
+                            status_id.extend([temp + temp_2]*len(df_sel_2)) 
+                        # 如果状态一致, 且 间隔时间小于阈值,则合并
+                        elif last_status_2 == 'charge' and cur_status == last_status_2 and deltaT < charge_interval_threshold: 
+                            temp_2 = temp_2 - 1  
+                            status_id.extend([temp + temp_2]*len(df_sel_2)) 
+                        else:
+                            status_id.extend([temp + temp_2]*len(df_sel_2))
+                    temp_2 = temp_2 + 1
+                    last_status_2 = status_list[-1]
+                temp_2 = temp_2 - 1
+            else:
+                # 如果当前数据段的电流为0,则可能分为stand,charge、drive或none段
+                if origin_index[0] == 0: # 如果是数据的起始,则无论长短,都认为是stand
+                    status_id.extend([temp]*len(df_sel))
+                    status_list.extend(['stand'] * len(df_sel))
+                else: # 不是数据的起始
+                    cur_deltaT = (df.loc[origin_index[-1], '时间戳'] - df.loc[origin_index[0], '时间戳']).total_seconds()
+                    if last_status == 'charge': # 如果上一个状态为充电
+                        if cur_deltaT < charge_stand_threshold: # 如果本次电流为0的持续时间小于 阈值,则合并
+                            status_list.extend(['charge'] * len(df_sel))
+                            temp = temp - 1  
+                            status_id.extend([temp]*len(df_sel))
+                        else: # 否则超过了阈值,记为stand
+                            status_id.extend([temp]*len(df_sel))
+                            status_list.extend(['stand'] * len(df_sel))
+                    elif last_status == 'drive': # 如果上一个状态为行车
+                        if cur_deltaT < drive_stand_threshold: # 如果本次电流为0的持续时间小于 阈值,则合并
+                            status_list.extend(['drive'] * len(df_sel))
+                            temp = temp - 1  
+                            status_id.extend([temp]*len(df_sel))
+                        else: # 否则超过了阈值,记为stand
+                            status_id.extend([temp]*len(df_sel))
+                            status_list.extend(['stand'] * len(df_sel))
+                    elif last_status == 'none': # 如果上一个状态未知
+                        status_id.extend([temp] * len(df_sel))
+                        status_list.extend(['stand'] * len(df_sel))
+            temp = temp + temp_2 + 1
+            last_status = status_list[-1] # 上一组状态
+        df['data_split_by_status'] = status_id
+        df['data_status'] = status_list
+        return df    
+    
     def data_split_by_time(self, dfin, default_time_threshold = 300, drive_time_threshold=300, charge_time_threshold=300, 
                                         stand_time_threshold = 1800):
         '''
@@ -488,6 +624,8 @@ class DataPreProcess:
             res_record['charge'] = (res_record['charge'])/len(set(df_bms[df_bms['data_status_after_combine']=='charge']['data_split_by_status_after_combine']))
         return df_bms, df_gps, res_record
     
+    
+    
     '''
     为故障数据打标签
     sn: 电池编码