# -*- coding: UTF-8 -*- import pandas as pd import time from sqlalchemy import create_engine import os import configparser import pymysql import traceback import datetime from LIB.BACKEND import DBManager,Log # 以相对路径的方式从LIB开始引入!!!!! from LIB.MIDDLE.AlgoDemo.Demo.V_1_0_0.CoreAlgo import core # 以相对路径的方式从LIB开始引入!!!!! from LIB.BACKEND.OPENAPI import OpenApi if __name__ == '__main__': # 环境变量配置(通过环境变量确定当前程序运行在开发、测试、生产环境) env_dist = os.environ cur_env = env_dist.get("CURENV", 'dev') # 默认为开发环境 # 读取配置文件 (该部分请不要修改) cf = configparser.ConfigParser() if cur_env == 'dev': cf.read("config-dev.ini") elif cur_env == 'test': cf.read("config-test.ini") elif cur_env == 'pro': cf.read("config-pro.ini") # 解析配置文件 (该部分请按照实际需求修改) host1 = cf.get("Mysql-1", 'host') port1 = int(cf.get("Mysql-1", 'port')) db1 = cf.get("Mysql-1", 'db') user1 = cf.get("Mysql-1", 'user') password1 = cf.get("Mysql-1", 'password') host2 = cf.get("Mysql-2", 'host') port2 = int(cf.get("Mysql-2", 'port')) db2 = cf.get("Mysql-2", 'db') user2 = cf.get("Mysql-2", 'user') password2 = cf.get("Mysql-2", 'password') # 日志配置(按照该配置,每次运行时可自动生成运行日期的文件夹, 会在与main.py同级的, 该部分可按照实际需求修改) now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":","_") log_path = 'log/' + now_str if not os.path.exists(log_path): os.makedirs(log_path) log = Log.Mylog(log_name='cal_sor', log_level = 'info') log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024* 1024 * 100) log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024* 1024 * 100) log.set_stream_hl("error") # 打印错误日志 logger = log.get_logger() logger.info("{}, 算法开始".format(str(os.getpid()))) try: # 连接数据库的两种方式 # 方式一:新dataframe写入数据库时,采用该方式可以不需要写sql语句; # 该方式无法对数据库进行修改; db_engine_1 = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user1, password1, host1, port1, db1 )) # 方式二:该方式可以通过写update SQL语句,对数据库中的数据进行修改 conn = pymysql.connect(host=host2, port=port2, user=user2, password=password2, database=db2) cursor = conn.cursor() # 准备算法数据 # 原始数据时间设置 end_time=datetime.datetime.now() start_time=end_time-datetime.timedelta(seconds=300) start_time=start_time.strftime('%Y-%m-%d %H:%M:%S') end_time=end_time.strftime('%Y-%m-%d %H:%M:%S') # 从开放平台获取资产列表及相关信息 openApi = OpenApi.OpenApi() df_all_sn = openApi.get_asset() calSor = core.CalSor(); # 算法初始化 # 遍历资产列表,获取数据和参数,输入算法中 for index in range(0, df_all_sn.index[-1]): try:# 解析得到厂家、sn、协议类型、电芯参数等输入数据 factory = df_all_sn.loc[index,'factory'] sn = df_all_sn.loc[index,'sn'] imei = df_all_sn.loc[index,'imei'] protocolType = df_all_sn.loc[index,'protocolType'] # 协议类型(3:32960,10:科易,13:优旦) if imei[5:9] == 'N640': celltype=1 #6040三元电芯 elif imei[5:9] == 'N440': celltype=2 #4840三元电芯 elif imei[5:9] == 'L660': celltype=99 # 6060锂电芯 elif imei[3:5] == 'LX' and imei[5:9] == 'N750': celltype=3 #力信 50ah三元电芯 elif imei[3:5] == 'CL' and imei[5:9] == 'N750': celltype=4 #CATL 50ah三元电芯 elif imei[1:3] == 'JM': # 金茂 celltype=100 #CATL 50ah三元电芯 # 获取电池历史数据 logger.info("{} start".format(sn)) dbManager = DBManager.DBManager() df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms', 'gps', 'accum', 'system']) df_bms = df_data['bms'] # 获取数据库原始数据 df_ram = pd.read_sql("select * from test_tb where sn ='{}'".format(sn), db_engine_1) # 调用核心算法 df_res = calSor.calSor(sn, df_bms,celltype); # 算法结果入库 # 新增 if (len(df_ram) == 0): df_res.to_sql("test_tb",con=db_engine_1, if_exists="append",index=False) else: # 修改 sql = '''update test_tb set data={} where sn='{}' '''.format(df_res['data'].values[0], (df_res['sn'].values[0])) cursor.execute(sql) conn.commit() logger.info("{} done".format(sn)) except Exception as e : logger.error(traceback.format_exc) logger.error(str(e)) logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True) except Exception as e : logger.error(traceback.format_exc) logger.error(str(e)) logger.error(u"任务运行错误2\n", exc_info=True) # 释放数据库资源 cursor.close() conn.close() db_engine_1.dispose()