deploy_soc.py 10 KB


  1. import os
  2. import time
  3. import pandas as pd
  4. from sqlalchemy import create_engine
  5. import logging
  6. import logging.handlers
  7. import traceback
  8. import re
  9. import CBMSBatSoc
  10. import datetime
  11. import dateutil.relativedelta
  12. from urllib import parse
  13. from apscheduler.schedulers.blocking import BlockingScheduler
  14. import warnings
  15. from multiprocessing import Process
  16. def fun(host, port, db, user, password, runenv, logger, period_second):
  17. global df_ram
  18. try:
  19. db_engine = create_engine(
  20. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  21. user, parse.quote_plus(password), host, port, db
  22. ))
  23. except:
  24. logger.error(u"数据库连接错误", exc_info=True)
  25. logger.error(traceback.format_exc)
  26. return
  27. # 获取配置信息
  28. df_confs = pd.read_sql('select * from conf where status=1 order by factory,device_name', db_engine)
  29. # db_engine.dispose()
  30. logger.info("pid is {}".format(os.getpid()))
  31. now_time = datetime.datetime.now()
  32. pre_time = now_time + dateutil.relativedelta.relativedelta(seconds=-period_second-1) #
  33. end_time = datetime.datetime.strftime(now_time, "%Y-%m-%d %H:%M:%S")
  34. start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d %H:%M:%S")
  35. logger.info("time range from {} to {} ".format(start_time, end_time))
  36. for i in range(len(df_confs)):
  37. try:
  38. factory = df_confs.loc[i, 'factory']
  39. sn = df_confs.loc[i, 'device_name']
  40. if '康普盾' in factory:
  41. celltype = 1
  42. cell_volt_count = 120
  43. cell_temp_count = 40
  44. elif '华霆' in factory:
  45. celltype = 2
  46. cell_volt_count = 204
  47. cell_temp_count = 40
  48. elif '力神' in factory:
  49. celltype = 99
  50. cell_volt_count = 72
  51. cell_temp_count = 32
  52. logger.info("pid-{} FACTORY:{} - SN: {} START!".format(os.getpid(), factory, sn))
  53. sql = "select * from bms_data where factory ='{}' and sn = '{}' and time > '{}' and time <= '{}'".format(factory, sn, start_time, end_time)
  54. logger.info(sql)
  55. df_bms = pd.read_sql(sql, db_engine)
  56. if df_bms.empty:
  57. continue
  58. df_bms.columns = ['sn', 'factory', 'add_time', 'update_time', 'time', 'BMSStat', 'PackVolt', 'PackCrnt',
  59. 'PackSOC', 'PackSOH', 'alarm1', 'alarm2', 'alarm3', 'CellTemp',
  60. 'CellVolt', 'id']
  61. # 单体数据拆分
  62. cell_temp = df_bms['CellTemp']
  63. cell_volt = df_bms['CellVolt']
  64. cell_temps = []
  65. [cell_temps.append(list(map(float,x.split(',')))) for x in cell_temp]
  66. cell_volts = []
  67. [cell_volts.append(list(map(float,x.split(',')))) for x in cell_volt]
  68. del_indexes = []
  69. for i, cell_volt in enumerate(cell_volts):
  70. if len(cell_volt) != cell_volt_count:
  71. del_indexes.append(i)
  72. for i, cell_temp in enumerate(cell_temps):
  73. if len(cell_temp) != cell_temp_count:
  74. del_indexes.append(i)
  75. del_indexes = list(set(del_indexes))
  76. for i, del_index in enumerate(del_indexes):
  77. del_index = del_index - i
  78. cell_volts.pop(del_index)
  79. cell_temps.pop(del_index)
  80. df_bms = df_bms.drop(index=del_indexes, axis=1)
  81. cellvolt_name = ['CellVolt' + str(x) for x in range(1, cell_volt_count + 1)]
  82. celltemp_name = ['CellTemp' + str(x) for x in range(1, cell_temp_count + 1)]
  83. df_bms[cellvolt_name] = cell_volts
  84. df_bms[celltemp_name] = cell_temps
  85. df_bms = df_bms.drop(columns='CellVolt', axis=0)
  86. df_bms = df_bms.drop(columns='CellTemp', axis=0)
  87. df_bms = df_bms.reset_index(drop=True)
  88. logger.info("pid-{} FACTORY:{} - SN: {} 去除单体异常行{}数据!".format(os.getpid(), factory, sn, str(del_indexes)))
  89. # 算法执行
  90. df_soh = pd.read_sql(
  91. "select time_st,time_sp,sn,method,soh,cellsoh from soh_result where factory ='{}' and sn = '{}' order by time_st desc limit 1".format(factory, sn),
  92. db_engine);
  93. if celltype>50:
  94. df_socdiff = pd.read_sql(
  95. "select time,sn,cellsoc_diff from uniform_result where factory ='{}' and sn = '{}' order by time desc limit 1".format(factory, sn),
  96. db_engine);
  97. else:
  98. df_socdiff = pd.DataFrame()
  99. df_ram_sn=df_ram[df_ram['sn']==sn]
  100. BatSoc= CBMSBatSoc.BatSoc(sn, celltype, df_bms, df_soh, df_ram_sn, df_socdiff)
  101. df_res, df_ram_sn=BatSoc.batsoc()
  102. if not df_ram_sn.empty:
  103. sn_index=df_ram.loc[df_ram['sn']==sn].index
  104. df_ram=df_ram.drop(index=sn_index)
  105. df_ram=df_ram.append(df_ram_sn)
  106. df_ram.reset_index(inplace=True,drop=True)
  107. # 如果以开发环境运行,则写入测试结果!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
  108. if (runenv == 'dev'):
  109. logger.info("pid-{} FACTORY:{} - SN: {} 测试!".format(os.getpid(), factory, sn))
  110. df_res = pd.DataFrame({'time': ["2021-01-01 00:00:00"],
  111. 'sn': [sn], 'bms_soc': [10],'packsoc': [1], 'socdsp':[100], 'cellsocmin': [99], 'cellsocmax': [100], 'socmin_num':[10], 'socmax_num':[20],
  112. 'cellsoc_diff':['12,22'], 'cellsoc':[10,20], 'ocvweight':[1], 'socstep':[2]})
  113. if not df_res.empty:
  114. df_res.columns = ['time', 'sn', 'bms_soc', 'packsoc', 'socdsp', 'cellsocmin', 'cellsocmax', 'socmin_num', 'socmax_num', 'cellsoc_diff', 'cellsoc', 'ocvweight', 'socstep']
  115. df_res['factory'] = factory
  116. df_res['add_time'] = datetime.datetime.now()
  117. df_res.to_sql("soc_result", con=db_engine, if_exists="append", index=False)
  118. logger.info("pid-{} FACTORY:{} - SN: {} 结果入库成功!".format(os.getpid(), factory, sn))
  119. logger.info("pid-{} FACTORY:{} - SN: {} DONE!".format(os.getpid(), factory, sn))
  120. except:
  121. logger.error(u"pid-{} FACTORY:{} - SN: {} ERROR!\n".format(os.getpid(), factory, sn), exc_info=True)
  122. logger.error(traceback.format_exc)
  123. db_engine.dispose()
  124. logger.info("time range from {} to {} done!!!!!!!!! ".format(start_time, end_time))
  125. # 多进程运行
  126. # def mainprocess(host, port, db, user, password, runenv, logger, period_second, process):
  127. #
  128. # process = int(process)
  129. # pool = multiprocessing.Pool(processes = process)
  130. #
  131. # for i in range(process):
  132. # sn_list = SNnums[i]
  133. # pool.apply_async(fun, (host, port, db, user, password, runenv, logger, period_second, ))
  134. #
  135. # pool.close()
  136. # pool.join()
  137. def heart_beat(host, port, db, user, password, log):
  138. while True:
  139. try:
  140. db_engine = create_engine(
  141. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  142. user, parse.quote_plus(password), host, port, db
  143. ))
  144. while True:
  145. now = datetime.datetime.now()
  146. db_engine.execute("update status set algo_soc='{}'".format(now))
  147. log.info("soc心跳信号" + now.strftime("%Y-%m-%d %H:%M:%S"))
  148. time.sleep(5)
  149. except:
  150. log.error(u"soc心跳错误", exc_info=True)
  151. log.error(traceback.format_exc)
  152. time.sleep(5)
  153. finally:
  154. db_engine.dispose()
  155. # 开启定时任务
  156. if __name__ == '__main__':
  157. warnings.filterwarnings("ignore")
  158. env_dist = os.environ
  159. # 配置信息信息
  160. host = env_dist.get("ALI_HOST", '120.25.223.1')
  161. port = env_dist.get("ALI_PORT", '4901')
  162. db = env_dist.get("ALI_DB", 'ali')
  163. user = env_dist.get("ALI_ROOT", 'root')
  164. password = env_dist.get("ALI_PASSWORD", '123456')
  165. runenv = env_dist.get("ALI_RUNENV", 'pro')
  166. process = env_dist.get("ALI_PROCESS", '10')
  167. period_second = int(env_dist.get("ALI_PERIOD_SECOND", '10'))
  168. algo_name = 'soc'
  169. # 日志配置
  170. log_path = './log' + '/' + algo_name
  171. if not os.path.exists(log_path):
  172. os.makedirs(log_path)
  173. logger = logging.getLogger()
  174. logger.setLevel(logging.DEBUG)
  175. fh = logging.handlers.RotatingFileHandler(filename='{}/info.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
  176. backupCount=5, encoding="utf-8", mode="a")
  177. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  178. fh.suffix = "%Y-%m-%d_%H-%M.log"
  179. fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}.log$")
  180. fh.setFormatter(formatter)
  181. fh.setLevel(logging.DEBUG)
  182. logger.addHandler(fh)
  183. fh = logging.handlers.RotatingFileHandler(filename='{}/error.log'.format(log_path), maxBytes=1024 * 1024 * 1024,
  184. backupCount=5, encoding="utf-8", mode="a")
  185. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  186. fh.setFormatter(formatter)
  187. fh.setLevel(logging.ERROR)
  188. logger.addHandler(fh)
  189. logger.info("算法初始化完毕,开始周期运行!!!!!!!!!!!!!!!!!!!!!")
  190. # 算法参数初始化
  191. column_name=['time', 'sn', 'bms_soc', 'soc','cellsoc','standingtime','rampackcrnt','ramcellvolt','kocellvoltmin','kocellvoltmax','ocvweight','as_accum','socstep']
  192. df_ram=pd.DataFrame(columns=column_name)
  193. p = Process(target=heart_beat, args=(host, port, db, user, password, logger,))
  194. p.start()
  195. scheduler = BlockingScheduler()
  196. # heart_beat(host, port, db, user, password, logger)
  197. # scheduler.add_job(func=heart_beat, args=(host, port, db, user, password, logger), trigger='interval', seconds=5,max_instances=1, coalesce=True)
  198. fun(host, port, db, user, password, runenv, logger, period_second)
  199. scheduler.add_job(func=fun, args=(host, port, db, user, password, runenv, logger, period_second), trigger='interval', seconds=period_second
  200. ,max_instances=1, coalesce=True)
  201. try:
  202. scheduler.start()
  203. except Exception as e:
  204. scheduler.shutdown()
  205. logger.error(str(e))