Browse Source

算法模板测试

lmstack 3 years ago
parent
commit
6122df47d8

+ 2 - 2
LIB/BACKEND/Log.py

@@ -20,8 +20,8 @@ class Mylog:
     def get_logger(self):
         return self.logger
     
-    def set_file_hl(self, file_name='all.log', log_level='info', size=1):
-        fh = handlers.RotatingFileHandler(file_name, maxBytes=size, backupCount=10)
+    def set_file_hl(self, file_name='all.log', log_level='info', size=1, backupCount=10):
+        fh = handlers.RotatingFileHandler(file_name, maxBytes=size, backupCount=backupCount)
         fh_formatter = logging.Formatter('%(asctime)s:%(created)f:%(name)s:%(module)s:%(funcName)s:%(levelname)s:%(message)s')
         fh.setFormatter(fh_formatter)
         if len(log_level) > 0:

+ 0 - 1
LIB/FRONTEND/AlgoTest/Algo1/Dockerfile

@@ -7,7 +7,6 @@ LABEL version="1.0.0"
 
 # 环境变量参数
 ENV TZ="Asia/Shanghai"
-ENV HOST="127.0.0.1"
 
 # 安装用到的python 第三方库
 RUN pip install -i https://pypi.tuna.tsinghua.edu.cn/simple \

+ 2 - 3
LIB/MIDDLE/AlgoTest/Algo1/V_1_0_0/core.py

@@ -8,7 +8,6 @@ class CalSor:
     def __init__(self):
         pass
     
-    def calSor(self, df_bms, df_param):
+    def calSor(self, sn,df_bms, celltype):
         i = len(df_bms)
-        j = len(df_param)
-        return pd.DataFrame({'i':[i]}), pd.DataFrame({'j':[j]})
+        return pd.DataFrame({'data':[i],'sn':[sn]})

+ 109 - 19
LIB/MIDDLE/AlgoTest/Algo1/main.py

@@ -1,19 +1,34 @@
+# -*- coding: UTF-8 -*-
+
 import pandas as pd
 import time
+from sqlalchemy import create_engine
+import os
+import pymysql
+import traceback
+import datetime
 from LIB.BACKEND import DBManager,Log # 以相对路径的方式引入!!!!!
 from V_1_0_0 import core # 以相对路径的方式引入!!!!!
-import os
+from LIB.BACKEND.OPENAPI import OpenApi
+
 if __name__ == '__main__':
     
-    # 获取环境变量
+    
+    # 环境变量配置(通过环境变量接收数据库等相关配置参数)
     env_dist = os.environ
-    host = env_dist.get("HOST", '120.25.223.1')
-    port = env_dist.get("PORT", '4901')
-    db = env_dist.get("DB", 'ali')
-    user = env_dist.get("ROOT", 'root')
-    password = env_dist.get("PASSWORD", '123456')
+    host1 = env_dist.get("HOST1", '127.0.0.1')
+    port1 = int(env_dist.get("PORT1", '3306'))
+    db1 = env_dist.get("DB1", 'test')
+    user1 = env_dist.get("ROOT1", 'root')
+    password1 = env_dist.get("PASSWORD1", 'Qx123456')
+    
+    host2 = env_dist.get("HOST2", '127.0.0.1')
+    port2 = int(env_dist.get("PORT2", '3306'))
+    db2 = env_dist.get("DB2", 'test')
+    user2 = env_dist.get("ROOT2", 'root')
+    password2 = env_dist.get("PASSWORD2", 'Qx123456')
     
-    # 日志配置
+    # 日志配置(按照该配置,每次运行时可自动生成运行日期的文件夹, 会在与main同级的)
     now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":","_")
     log_path = 'log/' + now_str
     if not os.path.exists(log_path):
@@ -22,15 +37,90 @@ if __name__ == '__main__':
     log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024* 1024 * 100)
     log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024* 1024 * 100)
     logger = log.get_logger()
+    logger.info("{}, 算法开始".format(str(os.getpid())))
     
-    # 准备算法数据
-    sn = "PK10001A326000123" 
-    st = '2021-07-06 00:00:00'
-    et = '2021-07-07 20:00:00'
-    dbManager = DBManager.DBManager()
-    df_data = dbManager.get_data(sn=sn, start_time=st, end_time=et, data_groups=['bms', 'gps', 'accum', 'system'])
-    df_bms = df_data['bms']
-    # 调用核心算法
-    calSor = core.CalSor();
-    # 接收算法结果
-    print(calSor.calSor(df_bms,df_bms))
+    try:    
+        # 连接数据库的两种方式
+        # 方式一:新dataframe写入数据库时,采用该方式可以不需要写sql语句;
+        #    该方式无法对数据库进行修改;
+        
+        db_engine_1 = create_engine(
+            "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
+                user1, password1, host1, port1, db1
+            ))
+        
+        # 方式二:该方式可以通过写update SQL语句,对数据库中的数据进行修改
+        conn = pymysql.connect(host=host2, port=port2, user=user2, password=password2, database=db2)
+        cursor = conn.cursor()
+
+        # 准备算法数据
+        
+        # 原始数据时间设置
+        end_time=datetime.datetime.now()
+        start_time=end_time-datetime.timedelta(seconds=300)
+        start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
+        end_time=end_time.strftime('%Y-%m-%d %H:%M:%S')
+        
+        # 从开放平台获取资产列表及相关信息
+        openApi = OpenApi.OpenApi()
+        df_all_sn = openApi.get_asset()
+        calSor = core.CalSor(); # 算法初始化
+        
+        # 遍历资产列表,获取数据和参数,输入算法中
+        for index in range(0, df_all_sn.index[-1]):
+            
+            try:# 解析得到厂家、sn、协议类型、电芯参数等输入数据
+                factory = df_all_sn.loc[index,'factory']
+                sn = df_all_sn.loc[index,'sn']
+                imei = df_all_sn.loc[index,'imei']
+                protocolType = df_all_sn.loc[index,'protocolType'] # 协议类型(3:32960,10:科易,13:优旦)
+                if imei[5:9] == 'N640':
+                    celltype=1 #6040三元电芯
+                elif imei[5:9] == 'N440':
+                    celltype=2 #4840三元电芯
+                elif imei[5:9] == 'L660':
+                    celltype=99 # 6060锂电芯
+                elif imei[3:5] == 'LX' and imei[5:9] == 'N750':    
+                    celltype=3 #力信 50ah三元电芯
+                elif imei[3:5] == 'CL' and imei[5:9] == 'N750': 
+                    celltype=4 #CATL 50ah三元电芯
+                elif imei[1:3] == 'JM': # 金茂
+                    celltype=100 #CATL 50ah三元电芯
+
+                # 获取电池历史数据
+
+                dbManager = DBManager.DBManager()
+                df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms', 'gps', 'accum', 'system'])
+                df_bms = df_data['bms']
+                
+                # 获取数据库原始数据
+                df_ram = pd.read_sql("select * from test_tb where sn ='{}'".format(sn), db_engine_1)
+                
+                # 调用核心算法 
+                df_res = calSor.calSor(sn, df_bms,celltype);
+                
+                # 算法结果入库
+                # 新增
+                if (len(df_ram) == 0):
+                    df_res.to_sql("test_tb",con=db_engine_1, if_exists="append",index=False)
+                    
+                else:   
+                # 修改
+                    sql = '''update test_tb set data={} where sn='{}' '''.format(df_res['data'].values[0], (df_res['sn'].values[0]))
+                    cursor.execute(sql)
+                    conn.commit()
+            except Exception as e :
+                logger.error(traceback.format_exc)
+                logger.error(str(e))
+                logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
+            
+    except Exception as e :
+        logger.error(traceback.format_exc)
+        logger.error(str(e))
+        logger.error(u"任务运行错误2\n", exc_info=True)
+        
+    # 释放数据库资源
+    cursor.close()
+    conn.close()
+    db_engine_1.dispose()
+