deploy_soh.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. import os
  2. import time
  3. import pandas as pd
  4. from sqlalchemy import create_engine
  5. import logging.handlers
  6. import traceback
  7. import re
  8. import CBMSBatSoh
  9. import datetime
  10. import dateutil.relativedelta
  11. from urllib import parse
  12. from apscheduler.schedulers.blocking import BlockingScheduler
  13. import warnings
  14. from multiprocessing import Process
  15. def fun(host, port, db, user, password, runenv, logger, period_second):
  16. logger.info("pid is {}".format(os.getpid()))
  17. try:
  18. db_engine = create_engine(
  19. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  20. user, parse.quote_plus(password), host, port, db
  21. ))
  22. except:
  23. logger.error(u"数据库连接错误", exc_info=True)
  24. logger.error(traceback.format_exc)
  25. return
  26. # 获取配置信息
  27. df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
  28. # db_engine.dispose()
  29. now_time = datetime.datetime.now()
  30. pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second-6*3600) #两周前
  31. end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S")
  32. start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S")
  33. logger.info("time range from {} to {} ".format(start_time, end_time))
  34. for i in range(len(df_confs)):
  35. try:
  36. factory = df_confs.loc[i, 'factory']
  37. sn = df_confs.loc[i, 'device_name']
  38. if '康普盾' in factory:
  39. celltype = 1
  40. cell_volt_count = 120
  41. cell_temp_count = 40
  42. elif '华霆' in factory:
  43. celltype = 2
  44. cell_volt_count = 204
  45. cell_temp_count = 40
  46. elif '力神' in factory:
  47. celltype = 99
  48. cell_volt_count = 72
  49. cell_temp_count = 32
  50. logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
  51. sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
  52. logger.info(sql)
  53. df_bms = pd.read_sql(sql, db_engine)
  54. if df_bms.empty:
  55. continue
  56. df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt',
  57. 'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp',
  58. 'CellVolt', 'id']
  59. # 单体数据拆分
  60. cell_temp = df_bms['CellTemp']
  61. cell_volt = df_bms['CellVolt']
  62. cell_temps = []
  63. [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp]
  64. cell_volts = []
  65. [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt]
  66. del_indexes = []
  67. for i, cell_volt in enumerate(cell_volts):
  68. if len(cell_volt) != cell_volt_count:
  69. del_indexes.append(i)
  70. for i, cell_temp in enumerate(cell_temps):
  71. if len(cell_temp) != cell_temp_count:
  72. del_indexes.append(i)
  73. del_indexes = list(set(del_indexes))
  74. for i, del_index in enumerate(del_indexes):
  75. del_index = del_index - i
  76. cell_volts.pop(del_index)
  77. cell_temps.pop(del_index)
  78. df_bms = df_bms.drop(index=del_indexes, axis=1)
  79. cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)]
  80. celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)]
  81. df_bms[cellvolt_name] = cell_volts
  82. df_bms[celltemp_name] = cell_temps
  83. df_bms = df_bms.drop(columns='CellVolt', axis=0)
  84. df_bms = df_bms.drop(columns='CellTemp', axis=0)
  85. df_bms = df_bms.reset_index(drop=True)
  86. logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))
  87. # 算法执行
  88. df_soh = pd.read_sql(
  89. "select time_st,time_sp,sn,method,soh,cellsoh,packsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn),
  90. db_engine);
  91. BatSoh = CBMSBatSoh.BatSoh(sn, celltype, df_bms, df_soh)
  92. df_res = BatSoh.batsoh()
  93. # 如果以开发环境运行,则写入测试结果!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  94. if (runenv == 'dev'):
  95. logger.info("pid-{} FACTORY:{} - SN: {} 测试!".format(os.getpid(), factory, sn))
  96. df_res = pd.DataFrame({'time_st': ["2021-01-01 00:00:00"], 'time_sp': ["2021-01-02 00:00:00"],
  97. 'sn': [sn], 'method': [1], 'bmssoh': [99], 'packsoh': [100], 'soh': [99],"cellsohmin":[10],"cellsohmax":[20],"sohmin_num":[30], 'sohmax_num':[10],
  98. 'cellsoh_diff': [1], 'cellsoh': ["98,97"]})
  99. if not df_res.empty:
  100. df_res.columns = ['time_st', 'time_sp', 'sn', 'method', 'bmssoh', 'packsoh', 'soh','cellsohmin', 'cellsohmax', 'sohmin_num', 'sohmax_num', 'cellsoh_diff',
  101. 'cellsoh']
  102. df_res['factory'] = factory
  103. df_res['add_time'] = datetime.datetime.now()
  104. df_res.to_sql("soh_result", con=db_engine, if_exists="append", index=False)
  105. logger.info("pid-{} FACTORY:{} - SN: {} 结果入库成功!".format(os.getpid(), factory, sn))
  106. logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
  107. except:
  108. logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)
  109. logger.error(traceback.format_exc)
  110. db_engine.dispose()
  111. logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
  112. def heart_beat(host, port, db, user, password, log):
  113. while True:
  114. try:
  115. db_engine = create_engine(
  116. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  117. user, parse.quote_plus(password), host, port, db
  118. ))
  119. while True:
  120. now = datetime.datetime.now()
  121. db_engine.execute("update status set algo_soh='{}'".format(now))
  122. log.info("soh心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
  123. time.sleep(5)
  124. except:
  125. log.error(u"soh心跳错误", exc_info=True)
  126. log.error(traceback.format_exc)
  127. time.sleep(5)
  128. finally:
  129. db_engine.dispose()
  130. if __name__ == '__main__':
  131. warnings.filterwarnings("ignore")
  132. env_dist = os.environ
  133. # 配置信息信息
  134. # host = env_dist.get("ALI_HOST", '120.25.223.1')
  135. # port = env_dist.get("ALI_PORT", '4901')
  136. # db = env_dist.get("ALI_DB", 'ali')
  137. # user = env_dist.get("ALI_ROOT", 'root')
  138. # password = parse.quote_plus(env_dist.get("ALI_PASSWORD", '123456'))
  139. host = env_dist.get("ALI_HOST", '192.168.31.141')
  140. port = env_dist.get("ALI_PORT", '3306')
  141. db = env_dist.get("ALI_DB", 'ali')
  142. user = env_dist.get("ALI_ROOT", 'root')
  143. password = env_dist.get("ALI_PASSWORD", 'Ali@123456')
  144. runenv = env_dist.get("ALI_RUNENV", 'dev')
  145. period_second = int(env_dist.get("ALI_PERIOD_SECOND", '604800'))
  146. algo_name = 'soh'
  147. # 日志配置
  148. log_path = './log' + '/' + algo_name
  149. if not os.path.exists(log_path):
  150. os.makedirs(log_path)
  151. logger = logging.getLogger()
  152. logger.setLevel(logging.DEBUG)
  153. fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
  154. backupCount=5, encoding="utf-8", mode="a")
  155. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  156. fh.suffix = "%Y-%m-%d_%H-%M.log"
  157. fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
  158. fh.setFormatter(formatter)
  159. fh.setLevel(logging.DEBUG)
  160. logger.addHandler(fh)
  161. fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
  162. backupCount=5, encoding="utf-8", mode="a")
  163. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  164. fh.setFormatter(formatter)
  165. fh.setLevel(logging.ERROR)
  166. logger.addHandler(fh)
  167. logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
  168. # 开启定时任务
  169. p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
  170. p.start()
  171. scheduler = BlockingScheduler()
  172. # heart_beat(host, port, db, user, password, logger)
  173. # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True)
  174. fun(host, port, db, user, password, runenv, logger, period_second)
  175. scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second
  176. ,max_instances=1, coalesce=True)
  177. try:
  178. scheduler.start()
  179. except Exception as e:
  180. scheduler.shutdown()
  181. logger.error(str(e))