main.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. # -*- coding: UTF-8 -*-
  2. import pandas as pd
  3. import time
  4. from sqlalchemy import create_engine
  5. import os
  6. import pymysql
  7. import traceback
  8. import datetime
  9. from LIB.BACKEND import DBManager,Log # 以相对路径的方式引入!!!!!
  10. from V_1_0_0 import core # 以相对路径的方式引入!!!!!
  11. from LIB.BACKEND.OPENAPI import OpenApi
  12. if __name__ == '__main__':
  13. # 环境变量配置(通过环境变量接收数据库等相关配置参数)
  14. env_dist = os.environ
  15. host1 = env_dist.get("HOST1", '127.0.0.1')
  16. port1 = int(env_dist.get("PORT1", '3306'))
  17. db1 = env_dist.get("DB1", 'test')
  18. user1 = env_dist.get("USER1", 'root')
  19. password1 = env_dist.get("PASSWORD1", 'Qx123456')
  20. host2 = env_dist.get("HOST2", '127.0.0.1')
  21. port2 = int(env_dist.get("PORT2", '3306'))
  22. db2 = env_dist.get("DB2", 'test')
  23. user2 = env_dist.get("USER2", 'root')
  24. password2 = env_dist.get("PASSWORD2", 'Qx123456')
  25. # 日志配置(按照该配置,每次运行时可自动生成运行日期的文件夹, 会在与main同级的)
  26. now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":","_")
  27. log_path = 'log/' + now_str
  28. if not os.path.exists(log_path):
  29. os.makedirs(log_path)
  30. log = Log.Mylog(log_name='cal_sor', log_level = 'info')
  31. log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024* 1024 * 100)
  32. log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024* 1024 * 100)
  33. log.set_stream_hl("error") # 打印错误日志
  34. logger = log.get_logger()
  35. logger.info("{}, 算法开始".format(str(os.getpid())))
  36. try:
  37. # 连接数据库的两种方式
  38. # 方式一:新dataframe写入数据库时,采用该方式可以不需要写sql语句;
  39. # 该方式无法对数据库进行修改;
  40. db_engine_1 = create_engine(
  41. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  42. user1, password1, host1, port1, db1
  43. ))
  44. # 方式二:该方式可以通过写update SQL语句,对数据库中的数据进行修改
  45. conn = pymysql.connect(host=host2, port=port2, user=user2, password=password2, database=db2)
  46. cursor = conn.cursor()
  47. # 准备算法数据
  48. # 原始数据时间设置
  49. end_time=datetime.datetime.now()
  50. start_time=end_time-datetime.timedelta(seconds=300)
  51. start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
  52. end_time=end_time.strftime('%Y-%m-%d %H:%M:%S')
  53. # 从开放平台获取资产列表及相关信息
  54. openApi = OpenApi.OpenApi()
  55. df_all_sn = openApi.get_asset()
  56. calSor = core.CalSor(); # 算法初始化
  57. # 遍历资产列表,获取数据和参数,输入算法中
  58. for index in range(0, df_all_sn.index[-1]):
  59. try:# 解析得到厂家、sn、协议类型、电芯参数等输入数据
  60. factory = df_all_sn.loc[index,'factory']
  61. sn = df_all_sn.loc[index,'sn']
  62. imei = df_all_sn.loc[index,'imei']
  63. protocolType = df_all_sn.loc[index,'protocolType'] # 协议类型(3:32960,10:科易,13:优旦)
  64. if imei[5:9] == 'N640':
  65. celltype=1 #6040三元电芯
  66. elif imei[5:9] == 'N440':
  67. celltype=2 #4840三元电芯
  68. elif imei[5:9] == 'L660':
  69. celltype=99 # 6060锂电芯
  70. elif imei[3:5] == 'LX' and imei[5:9] == 'N750':
  71. celltype=3 #力信 50ah三元电芯
  72. elif imei[3:5] == 'CL' and imei[5:9] == 'N750':
  73. celltype=4 #CATL 50ah三元电芯
  74. elif imei[1:3] == 'JM': # 金茂
  75. celltype=100 #CATL 50ah三元电芯
  76. # 获取电池历史数据
  77. logger.info("{} start".format(sn))
  78. dbManager = DBManager.DBManager()
  79. df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms', 'gps', 'accum', 'system'])
  80. df_bms = df_data['bms']
  81. # 获取数据库原始数据
  82. df_ram = pd.read_sql("select * from test_tb where sn ='{}'".format(sn), db_engine_1)
  83. # 调用核心算法
  84. df_res = calSor.calSor(sn, df_bms,celltype);
  85. # 算法结果入库
  86. # 新增
  87. if (len(df_ram) == 0):
  88. df_res.to_sql("test_tb",con=db_engine_1, if_exists="append",index=False)
  89. else:
  90. # 修改
  91. sql = '''update test_tb set data={} where sn='{}' '''.format(df_res['data'].values[0], (df_res['sn'].values[0]))
  92. cursor.execute(sql)
  93. conn.commit()
  94. logger.info("{} done".format(sn))
  95. except Exception as e :
  96. logger.error(traceback.format_exc)
  97. logger.error(str(e))
  98. logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
  99. except Exception as e :
  100. logger.error(traceback.format_exc)
  101. logger.error(str(e))
  102. logger.error(u"任务运行错误2\n", exc_info=True)
  103. # 释放数据库资源
  104. cursor.close()
  105. conn.close()
  106. db_engine_1.dispose()