Sfoglia il codice sorgente

添加电池信号跟踪模块

lmstack 3 anni fa
parent
commit
7ef9e046e5

+ 0 - 1
1

@@ -1 +0,0 @@
-ttt

+ 20 - 10
LIB/BACKEND/DBManager.py

@@ -5,26 +5,26 @@
 '''
 __author__ = 'Wang Liming'
 
-
-from re import S
 import time
 import datetime
-import os
-import urllib.request
 import time
 import pandas as pd
 import numpy as np
 import json
 import requests
+import pymysql
 import pdb
 
 
-# import http.client
-# http.client.HTTPConnection._http_vsn = 10
-# http.client.HTTPConnection._http_vsn_str = 'HTTP/1.1'
 class DBManager():
 
     def __init__(self, host='', port='', auth='', db='', username='', password=''):
+        self.host = host
+        self.port = port
+        self.auth = auth
+        self.db = db
+        self.username = username
+        self.password = password
         pass
 
     def __enter__(self):
@@ -38,21 +38,31 @@ class DBManager():
         conn_success_flag = 0
         while not conn_success_flag:
             try:
-                pass # 连接数据库
+                self.conn = pymysql.connect(host=self.host, port=self.port, user=self.user, password=self.password, database=self.db)
             except Exception as e:
                 conn_success_flag = 0
+                print("数据库连接失败 :{}".format(e))
                 time.sleep(5)
             else:
                 conn_success_flag = 1
-                pass # 连接成功, 获取cursor
+                self.cursor = self.conn.cursor()
     def close(self):
         try:
-            pass # 断开数据库
+            self.conn.close()
         except Exception as e:
             print(e)
         else:
             print('数据库已断开连接')
 
+    def add(table, keyvalue):
+        fields_str = ''
+        values_str = ''
+        for k,v in keyvalue.items():
+
+            fields_str += k+' '
+        sql = 'insert into table {} ({}) values ({})'.format(table, fields_str, values_str)
+
+
     # 以下各个函数实现 通过http方式获取数据
     @staticmethod
     def _get_var_name(cellnum,Tempnum,Othernum):

BIN
LIB/BACKEND/__pycache__/DBManager.cpython-38.pyc


BIN
LIB/BACKEND/__pycache__/Log.cpython-38.pyc


+ 102 - 0
LIB/FRONTEND/SignalMonitor/create_table.py

@@ -0,0 +1,102 @@
+'''
+定义表的结构,并在数据库中创建对应的数据表
+'''
+__author__ = 'Wang Liming'
+
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy import Column, String, create_engine, Integer, DateTime, BigInteger, FLOAT
+
+Base = declarative_base()
+
+
+class BmsLastDataDay(Base):
+    __tablename__ = "bms_last_data_day"
+    __table_args__ = ({'comment': '电池信号监控---每天最后一条bms数据'})  # 添加索引和表注释
+
+    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
+    sn = Column(String(64), comment="sn")
+    current = Column(FLOAT, comment="电流")
+    time_stamp = Column(Integer, comment="时间戳")
+    pack_state = Column(Integer, comment="电池状态")
+    line_state = Column(Integer, comment="信号状态")
+
+
+    def __init__(self, sn, current, time_stamp, pack_state, line_state):
+        self.sn = sn
+        self.current = current
+        self.time_stamp = time_stamp
+        self.pack_state = pack_state
+        self.line_state = line_state
+
+class GpsLastDataDay(Base):
+    __tablename__ = "gps_last_data_day"
+    __table_args__ = ({'comment': '电池信号监控---每天最后一条gps数据'})  # 添加索引和表注释
+
+    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
+    sn = Column(String(64), comment="sn")
+    time_stamp = Column(Integer, comment="时间戳")
+    pack_state = Column(Integer, comment="电池状态")
+    line_state = Column(Integer, comment="信号状态")
+
+
+    def __init__(self, sn, time_stamp, pack_state, line_state):
+        self.sn = sn
+        self.time_stamp = time_stamp
+        self.pack_state = pack_state
+        self.line_state = line_state
+
+class GpsSignalMonitor(Base):
+    __tablename__ = "gps_signal_monitor"
+    __table_args__ = ({'comment': 'gps信号监控'})  # 添加索引和表注释
+
+    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
+    sn = Column(String(64), comment="sn")
+    start_time = Column(DateTime, comment="开始时间")
+    end_time = Column(DateTime, comment="结束时间")
+    offline_time = Column(DateTime, comment="离线时间")
+    pack_state = Column(Integer, comment="电池状态")
+    line_state = Column(Integer, comment="信号状态")
+
+
+    def __init__(self, sn, start_time, end_time, off_line_time, pack_state, line_state):
+        self.sn = sn
+        self.start_time = start_time
+        self.end_time = end_time
+        self.off_line_time = off_line_time
+        self.pack_state = pack_state
+        self.line_state = line_state
+
+class BmsSignalMonitor(Base):
+    __tablename__ = "bms_signal_monitor"
+    __table_args__ = ({'comment': 'bms信号监控'})  # 添加索引和表注释
+
+    id = Column(Integer, primary_key=True, autoincrement=True, comment="主键")
+    sn = Column(String(64), comment="sn")
+    start_time = Column(DateTime, comment="开始时间")
+    end_time = Column(DateTime, comment="结束时间")
+    offline_time = Column(DateTime, comment="离线时间")
+    pack_state = Column(Integer, comment="电池状态")
+    line_state = Column(Integer, comment="信号状态")
+
+
+    def __init__(self, sn, start_time, end_time, off_line_time, pack_state, line_state):
+        self.sn = sn
+        self.start_time = start_time
+        self.end_time = end_time
+        self.off_line_time = off_line_time
+        self.pack_state = pack_state
+        self.line_state = line_state
+
+# 执行该文件,创建表格到对应的数据库中
+if __name__ == "__main__":
+    host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
+    port = 3306
+    user = 'qx_cas'
+    password = 'Qx@123456'
+    database = 'qx_cas'
+
+    db_engine = create_engine(
+        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+            user, password, host, port, database
+        ))
+    Base.metadata.create_all(db_engine)

+ 95 - 0
LIB/FRONTEND/SignalMonitor/main.py

@@ -0,0 +1,95 @@
+#coding=utf-8
+import os
+import datetime
+import pandas as pd
+from LIB.BACKEND import DBManager, Log
+from LIB.MIDDLE import SignalMonitor
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+import time, datetime
+import traceback
+
+dbManager = DBManager.DBManager()
+if __name__ == "__main__":
+    try:
+        # 数据库配置
+        host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
+        port = 3306
+        user = 'qx_cas'
+        password = 'Qx@123456'
+        database = 'qx_cas'
+
+        db_engine = create_engine(
+            "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                user, password, host, port, database
+            ))
+
+        db_engine = create_engine("mysql+pymysql://qx_cas:Qx@123456@rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com/qx_cas?charset=utf8")
+        DbSession = sessionmaker(bind=db_engine)
+        
+        # 日志配置
+        log = Log.Mylog(log_name='signal_monitor', log_level = 'info')
+        log.set_file_hl(file_name='info.log', log_level='info')
+        log.set_file_hl(file_name='error.log', log_level='error')
+        logger = log.get_logger()
+
+        logger.info("pid is + {}".format(os.getpid))
+
+        # 读取sn列表
+        df_sn = pd.read_csv('sn_list.csv')
+        df_sn = df_sn.reset_index(drop=True)
+        df_sn['StartTime'] = pd.to_datetime(df_sn['StartTime'])
+        df_sn['EndTime'] = pd.to_datetime(df_sn['EndTime'])
+        df_sn['ExitTime'] = pd.to_datetime(df_sn['ExitTime'])
+        signalMonitor = SignalMonitor.SignalMonitor()
+        
+        cal_period = 24    # 计算间隔,单位h
+        for i in range(0, len(df_sn['sn'])):    # 遍历SN号
+            sn = [df_sn.loc[i,'sn']]
+            if not (sn[0][0:2] == 'PK' or sn[0][0:2] == 'MG' or sn[0][0:2] == 'UD'):
+                continue
+            st = df_sn.loc[i, 'StartTime']
+            if df_sn.loc[i, 'Service'] == 0:
+                et = df_sn.loc[i, 'ExitTime']
+            elif pd.isnull(df_sn.loc[i, 'EndTime']):
+                otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
+                et = pd.to_datetime(otherStyleTime)
+            else:
+                et = df_sn.loc[i, 'EndTime']
+            df_last_state = pd.DataFrame(
+                columns=['sn', 'current', 'Timestamp', 'PackState', 'LineState'])    # 每日最后BMS数据
+            df_last_state_gps = pd.DataFrame(
+                columns=['sn', 'Timestamp', 'PackState', 'LineState'])    # 每日最后GPS数据
+            while st < et:
+                df_res = pd.DataFrame(columns=[
+                                    'sn', 'PackState', 'LineState', 'StartTime', 'EndTime', 'OfflineTime'])    # 初始化BMS信号统计数据
+                df_res_gps = pd.DataFrame(columns=[
+                                        'sn', 'PackState', 'LineState', 'StartTime', 'EndTime', 'OfflineTime'])    # 初始化GPS信号统计数据
+                df_res, df_state, df_last_state = signalMonitor.get_bms_offline_stat(
+                    sn, st, et, df_res, df_last_state, cal_period)    # 计算每日BMS信号统计数据
+                df_res_gps, df_last_state_gps = signalMonitor.get_gps_offline_stat(
+                    sn, st, et, df_state, df_res_gps, df_last_state_gps, cal_period)    # 计算每日GPS信号统计数据
+            
+                # 数据入库
+                df_tosql = df_res_gps.copy()
+                df_tosql.columns = ['sn', 'pack_state', 'line_state', 'start_time', 'end_time', 'offline_time']
+                df_tosql.loc[df_tosql.index[-1:]].to_sql("gps_signal_monitor",con=db_engine, if_exists="append",index=False)
+
+                df_tosql = df_res.copy()
+                df_tosql.columns = ['sn', 'pack_state', 'line_state', 'start_time', 'end_time', 'offline_time']
+                df_tosql.loc[df_res.index[-1:]].to_sql("bms_signal_monitor",con=db_engine, if_exists="append",index=False)
+                st = st + datetime.timedelta(hours=cal_period)
+
+            # 数据入库
+            df_tosql = df_last_state.copy()
+            df_tosql.columns = ['sn', 'current', 'time_stamp', 'pack_state', 'line_state']
+            df_tosql.to_sql("bms_last_data_day",con=db_engine, if_exists="append",index=False)
+
+            df_tosql = df_last_state.copy()
+            df_tosql.columns = ['sn', 'time_stamp', 'pack_state', 'line_state']
+            df_tosql.to_sql("gps_last_data_day",con=db_engine, if_exists="append",index=False)
+
+            logger.info("{} {} Success!".format(sn, str(st)))
+    except Exception as e:
+        logger.error(traceback.format_exc)
+        logger.error(u"任务运行错误", exc_info=True)

+ 235 - 0
LIB/MIDDLE/SignalMonitor.py

@@ -0,0 +1,235 @@
+import datetime
+import os
+import pandas as pd
+import Tools
+import sys
+import xlutils
+from xlrd import open_workbook
+from xlutils.copy import copy
+
+import CONFIGURE.PathSetting as PathSetting
+sys.path.append(PathSetting.backend_path)
+import DBManager
+dbManager = DBManager.DBManager()
+
+class SignalMonitor():
+    def __init__(self):
+        pass
+
+    @staticmethod
+    def _set_working_states(df_state):
+        for i in range(0, len(df_state)):
+            if abs(df_state.loc[i, 'current']) >= 0.45:
+                df_state.loc[i, 'PackState'] = 0
+            else:
+                df_state.loc[i, 'PackState'] = 1
+        # path = r'D:\daily_data_analysis_01.csv'
+        # df_state.to_csv(path, index=True, encoding='GB2312')
+        return df_state
+    
+    @staticmethod
+    def _set_standby_states(df_state):
+        index = 0
+        set = 0
+        while index < len(df_state)-1:
+            index = index + 1
+            if set == 0:
+                if df_state.loc[index, 'PackState'] == 1 and df_state.loc[index-1, 'PackState'] == 0:
+                    set = 1
+                    start_time = df_state.loc[index-1, 'Timestamp']
+                    timeDelta = datetime.timedelta(minutes=10)
+                    end_time = start_time + timeDelta
+                    df_state.loc[index, 'PackState'] = 0
+            else:
+                if df_state.loc[index, 'Timestamp'] <= end_time:
+                    df_state.loc[index, 'PackState'] = 0
+                    if abs(df_state.loc[index, 'current']) >= 0.45:
+                        start_time = df_state.loc[index-1, 'Timestamp']
+                        timeDelta = datetime.timedelta(minutes=10)
+                        end_time = start_time + timeDelta
+                else:
+                    set = 0
+        # path = r'D:\daily_data_analysis_02.csv'
+        # df_state.to_csv(path, index=True, encoding='GB2312')
+        return df_state
+    
+    @staticmethod
+    def _set_lowpwr_states(df_state):
+        index = 0
+        set = 0
+        while index < len(df_state)-1:
+            index = index + 1
+            if df_state.loc[index,'PackState'] == 1:
+                if set ==0:
+                    start_time = df_state.loc[index,'Timestamp']
+                    timeDelta = datetime.timedelta(minutes=300)
+                    end_time = start_time + timeDelta
+                    set = 1
+                else:
+                    if df_state.loc[index,'Timestamp'] > end_time:
+                        df_state.loc[index,'PackState'] = 2
+            else:
+                set = 0
+        # path = r'D:\daily_data_analysis_03.csv'
+        # df_state.to_csv(path, index=True, encoding='GB2312')
+        return df_state
+    
+    @staticmethod
+    def _judge_offline_state_between_messages(sn, PackState_new, PackState_old, Timestamp_new, Timestamp_old, df_res, mode):
+        delta_time = (Timestamp_new - Timestamp_old).total_seconds()
+        max_state = max(PackState_new, PackState_old)
+        if max_state == 0:
+            if mode == 'BMS':
+                thres1 = 60
+                thres2 = 300
+            elif mode == 'GPS':
+                thres1 = 120
+                thres2 = 600
+        elif max_state == 1:
+            if mode == 'BMS':
+                thres1 = 1200
+                thres2 = 2400
+            elif mode == 'GPS':
+                thres1 = 2400
+                thres2 = 4800
+        else:
+            if mode == 'BMS':
+                thres1 = 3600
+                thres2 = 7200
+            elif mode == 'GPS':
+                thres1 = 7200
+                thres2 = 14400
+        
+        if delta_time <= thres1:
+            LineState = 0
+        elif delta_time <= thres2:
+            LineState = 1
+        else:
+            LineState = 2
+        
+        if LineState > 0:
+            df_res = df_res.append({'sn':sn[0], 'PackState':PackState_new*16+PackState_old, 'LineState':LineState, 'StartTime':Timestamp_old, 
+                        'EndTime':Timestamp_new, 'OfflineTime':delta_time}, ignore_index=True)
+        return LineState, df_res
+
+    @staticmethod
+    def _get_offline_info(sn, df_state, df_last_state, df_res, mode):
+        index = 0
+        if len(df_last_state) == 0:
+            df_state.loc[0,'LineState'] = 0
+            while index < len(df_state)-1:
+                index = index + 1
+                LineState, df_res = SignalMonitor._judge_offline_state_between_messages(sn, df_state.loc[index, 'PackState'], df_state.loc[index-1, 'PackState'], 
+                df_state.loc[index, 'Timestamp'], df_state.loc[index-1, 'Timestamp'], df_res, mode=mode)
+                df_state.loc[index, 'LineState'] = LineState
+        else:
+            df_last_info = df_last_state.loc[len(df_last_state) - 1]
+            df_state.loc[0,'LineState'], df_res = SignalMonitor._judge_offline_state_between_messages(sn, df_state.loc[0, 'PackState'], df_last_info['PackState'], 
+                df_state.loc[0, 'Timestamp'], df_last_info['Timestamp'], df_res, mode=mode)
+            while index < len(df_state)-1:
+                index = index + 1
+                LineState, df_res = SignalMonitor._judge_offline_state_between_messages(sn, df_state.loc[index, 'PackState'], df_state.loc[index-1, 'PackState'], 
+                df_state.loc[index, 'Timestamp'], df_state.loc[index-1, 'Timestamp'], df_res, mode=mode)
+                df_state.loc[index, 'LineState'] = LineState
+        # SignalMonitor._file_write(r'D:\result_03.xls', df_state)
+        return df_res
+    
+    @staticmethod
+    def _set_gps_working_states(df_state, df_state_gps):
+        for i in range(0, len(df_state_gps)):
+                if df_state_gps.loc[i, 'Timestamp'] <= df_state.loc[0, 'Timestamp']:
+                    df_state_gps.loc[i, 'PackState'] = df_state.loc[0, 'PackState']
+                elif df_state_gps.loc[i, 'Timestamp'] >= df_state.loc[len(df_state)-1, 'Timestamp']:
+                    df_state_gps.loc[i:len(df_state_gps)-1, 'PackState'] = df_state.loc[len(df_state)-1, 'PackState']
+                    break
+                else:
+                    index0 = max(df_state[df_state['Timestamp'] <= df_state_gps.loc[i, 'Timestamp']].index)
+                    index1 = min(df_state[df_state['Timestamp'] >= df_state_gps.loc[i, 'Timestamp']].index)
+                    front = (df_state_gps.loc[i, 'Timestamp'] - df_state.loc[index0, 'Timestamp']).total_seconds()
+                    back = (df_state.loc[index1, 'Timestamp'] - df_state_gps.loc[i, 'Timestamp']).total_seconds()
+                    if front > back:
+                        df_state_gps.loc[i, 'PackState'] = df_state.loc[index1, 'PackState']
+                    elif front == back:
+                        df_state_gps.loc[i, 'PackState'] = max(df_state.loc[index1, 'PackState'], df_state.loc[index0, 'PackState'])
+                    else:
+                        df_state_gps.loc[i, 'PackState'] = df_state.loc[index0, 'PackState']
+        return df_state_gps
+    
+    @staticmethod
+    def _file_write(path, df_res):
+        r_xls = open_workbook(path) # 读取excel文件
+        sheet = len(r_xls.sheets())
+        row = r_xls.sheets()[sheet-1].nrows # 获取已有的行数
+        excel = copy(r_xls) # 将xlrd的对象转化为xlwt的对象
+        table = excel.get_sheet(sheet-1) # 获取要操作的sheet
+        #对excel表追加一行内容
+        # print(row)
+        current_row = row
+        num = len(df_res.columns)
+        for i in range(0, len(df_res)):
+            for j in range(0, num):
+                table.write(current_row, j, df_res.iloc[i][j]) #括号内分别为行数、列数、内容
+            current_row = current_row + 1
+            if current_row == 65500:
+                table = r_xls.add_sheet('sheet'+ str(sheet))
+                sheet = sheet + 1
+                current_row = 0
+        excel.save(path) # 保存并覆盖文件
+    
+    def get_bms_offline_stat(self, sn, st, et, df_res, df_last_state, cal_Period=24):    # 计算一段时间内BMS信号统计数据
+        df_state = pd.DataFrame(columns=['sn', 'current', 'Timestamp', 'PackState', 'LineState'])
+        # print("start_time is {}, limit_time is {}".format(st, limit_time))
+        relative_delta_time = datetime.timedelta(hours=6)    # 将时间往前推nh,以便更准确计算BMS状态
+        end_time = st + datetime.timedelta(hours=cal_Period)    # 结束时间
+        relative_time = st - relative_delta_time
+        relative_time = relative_time.strftime('%Y-%m-%d %H:%M:%S')
+        end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
+        df_data = dbManager.get_data(sn=sn[0], start_time=relative_time, end_time=end_time_str, data_groups=['bms'])
+        df_bms = df_data['bms']
+        df_bms = df_bms.drop_duplicates(['时间戳'])
+        df_bms = df_bms.reset_index(drop=True)
+        df_bms['时间戳'] = pd.to_datetime(df_bms['时间戳'])
+        print('{} BMS data read Well Done!'.format(sn[0]))
+
+        df_state['current'] = df_bms['总电流[A]']
+        df_state['Timestamp'] = df_bms['时间戳']
+        df_state['sn'] = sn[0]
+
+        if len(df_state[df_state['Timestamp'] >= st]) > 0:     # 无数据则不计算
+            df_state = SignalMonitor._set_working_states(df_state)    # 根据电流初步定义BMS状态
+            if len(df_state) > 1:
+                df_state = SignalMonitor._set_standby_states(df_state)    # 根据静置时间修正standby状态
+                df_state = SignalMonitor._set_lowpwr_states(df_state)    # 根据静置持续时间设置lowpwr状态
+            df_state_spec = df_state[df_state['Timestamp'] >= st]
+            df_state_spec = df_state_spec.reset_index(drop=True)    # 去除为准确计算BMS状态而多获取的数据
+            df_res = SignalMonitor._get_offline_info(sn, df_state_spec, df_last_state, df_res, 'BMS')    # 计算设定时间段内信号质量数据
+            df_last_info = df_state_spec.loc[len(df_state_spec) - 1]
+            df_last_state = df_last_state.append(df_last_info)    # 记录该段时间内最后数据
+            df_last_state = df_last_state.reset_index(drop=True)
+        return df_res,df_state, df_last_state
+    
+    def get_gps_offline_stat(self,sn, st, et, df_state, df_res_gps, df_last_state_gps, cal_Period=24):    # 计算一段时间内GPS信号统计数据
+        df_state_gps = pd.DataFrame(columns=['sn', 'Timestamp', 'PackState', 'LineState'])
+        # print("start_time is {}, limit_time is {}".format(st, limit_time))
+        end_time = st + datetime.timedelta(hours=cal_Period)    # 结束时间
+        start_time_str = st.strftime('%Y-%m-%d %H:%M:%S')
+        end_time_str = end_time.strftime('%Y-%m-%d %H:%M:%S')
+        df_data = dbManager.get_data(sn=sn[0], start_time=start_time_str, end_time=end_time_str, data_groups=['gps'])
+        df_gps = df_data['gps']
+        df_gps = df_gps.drop_duplicates(['时间戳'])
+        df_gps = df_gps.reset_index(drop=True)
+        df_gps['时间戳'] = pd.to_datetime(df_gps['时间戳'])
+        print('{} GPS data read Well Done!'.format(sn[0]))
+
+        df_state_gps['Timestamp'] = df_gps['时间戳']
+        df_state_gps['sn'] = sn[0]
+
+        if len(df_state_gps) > 0:    # 无数据则不计算    
+            df_state_gps = SignalMonitor._set_gps_working_states(df_state, df_state_gps)    # 根据同时间段内BMS状态计算GPS数据对应的BMS状态
+            df_res_gps = SignalMonitor._get_offline_info(sn, df_state_gps, df_last_state_gps, df_res_gps, 'GPS')    # 计算设定时间段内信号质量数据
+            df_last_info = df_state_gps.loc[len(df_state_gps) - 1]
+            df_last_state_gps = df_last_state_gps.append(df_last_info)    # 记录该段时间内最后数据
+            df_last_state_gps = df_last_state_gps.reset_index(drop=True)
+        return df_res_gps, df_last_state_gps
+
+

File diff suppressed because it is too large
+ 18 - 46
demo.ipynb


+ 0 - 0
demo.md


Some files were not shown because too many files changed in this diff