main.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. __author__ = 'ERIC'
  2. #coding=utf-8
  3. import os
  4. import datetime
  5. import pandas as pd
  6. from LIB.BACKEND import DBManager, Log
  7. from sqlalchemy import create_engine
  8. from sqlalchemy.orm import sessionmaker
  9. import soc
  10. import time, datetime
  11. import traceback
  12. from LIB.MIDDLE.CellStateEstimation.Common import log
  13. from urllib import parse
  14. import pymysql
  15. import pdb
  16. from apscheduler.schedulers.blocking import BlockingScheduler
  17. import datacompy
  18. import logging
  19. import multiprocessing
# ................................... Battery pack SOC calculation ...................................
  21. def SOC_cal():
  22. start=time.time()
  23. now_time=datetime.datetime.now()
  24. start_time=now_time-datetime.timedelta(seconds=300)
  25. start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
  26. end_time=now_time.strftime('%Y-%m-%d %H:%M:%S')
  27. #数据库配置
  28. host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  29. port=3306
  30. db='safety_platform'
  31. user='qx_read'
  32. password='Qx@123456'
  33. db_res_engine = create_engine(
  34. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  35. user, parse.quote_plus(password), host, port, db
  36. ))
  37. for i in range(0, len(df_sn)):
  38. factory = "骑享"
  39. sn = df_sn.loc[i, 'sn']
  40. try:
  41. if df_sn.loc[i, 'imei'][5:9] == 'N640':
  42. celltype=1 #6040三元电芯
  43. elif df_sn.loc[i, 'imei'][5:9] == 'N440':
  44. celltype=2 #4840三元电芯
  45. elif df_sn.loc[i, 'imei'][5:9] == 'L660':
  46. celltype=99 # 6060锂电芯
  47. elif df_sn.loc[i, 'imei'][3:5] == 'LX' and df_sn.loc[i, 'imei'][5:9] == 'N750':
  48. celltype=3 #力信 50ah三元电芯
  49. elif df_sn.loc[i, 'imei'][3:5] == 'CL' and df_sn.loc[i, 'imei'][5:9] == 'N750':
  50. celltype=4 #CATL 50ah三元电芯
  51. elif df_sn.loc[i, 'imei'][3:9] == 'CLL128':
  52. celltype=100 # 重卡
  53. factory = "金茂换电"
  54. elif df_sn.loc[i, 'imei'][0:6] == 'SPFPFL':
  55. celltype=200#储能电站
  56. factory = "平峰"
  57. else:
  58. logger.info("pid-{} celltype-{} SN: {} SKIP!".format(os.getpid(), "未知", sn))
  59. continue
  60. logger.info("pid-{} celltype-{} SN: {} START!".format(os.getpid(), celltype, sn))
  61. #读取原始数据库数据........................................................................................................................................................
  62. dbManager = DBManager.DBManager()
  63. df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms'])
  64. df_bms = df_data['bms']
  65. #电池诊断................................................................................................................................................................
  66. if not df_bms.empty:
  67. SOC_result=soc.soc_test(sn,df_bms)
  68. SOC_result.columns=['sn','soc','time']
  69. SOC_result.reset_index(inplace=True,drop=True)
  70. print(SOC_result)
  71. #新增故障筛选并存入数据库.....................................................................
  72. if not SOC_result.empty: #新增写入数据库
  73. SOC_result.to_sql('pingfeng_soc_cal',con=db_res_engine, if_exists='append',index=False)
  74. end=time.time()
  75. print(end-start)
  76. except Exception as e:
  77. logger.warning(e)
  78. logger.error(traceback.format_exc)
  79. logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True)
  80. if __name__ == "__main__":
  81. # 时间设置
  82. # now_time = datetime.datetime.now()
  83. # pre_time = now_time + dateutil.relativedelta.relativedelta(days=-1)# 前一日
  84. # end_time=datetime.datetime.strftime(now_time,"%Y-%m-%d 00:00:00")
  85. # start_time=datetime.datetime.strftime(pre_time,"%Y-%m-%d 00:00:00")
  86. history_run_flag = False # 历史数据运行标志
  87. def get_sn():
  88. global df_sn
  89. # # 更新sn列表
  90. host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
  91. port=3306
  92. db='qixiang_oss'
  93. user='qixiang_oss'
  94. password='Qixiang2021'
  95. conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db)
  96. cursor = conn.cursor()
  97. cursor.execute("select sn, imei, add_time from app_device where status in (1,2,3)")
  98. res = cursor.fetchall()
  99. df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time'])
  100. df_sn = df_sn[df_sn['sn'].str.contains('SPFPFL')]
  101. #df_sn = df_sn[df_sn['sn'].str.contains('PK500A20100000941')]
  102. df_sn = df_sn.reset_index(drop=True)
  103. #conn.close();
  104. # # 数据库配置
  105. # host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  106. # port = 3306
  107. # user = 'qx_cas'
  108. # password = parse.quote_plus('Qx@123456')
  109. # database = 'qx_cas'
  110. # db_engine = create_engine(
  111. # "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  112. # user, password, host, port, database
  113. # ))
  114. # DbSession = sessionmaker(bind=db_engine)
  115. # # 运行历史数据配置
  116. # df_first_data_time = pd.read_sql("select * from bat_first_data_time", db_engine)
  117. # 日志配置
  118. now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":","_")
  119. log_path = 'log/' + now_str
  120. if not os.path.exists(log_path):
  121. os.makedirs(log_path)
  122. log = Log.Mylog(log_name='batDiag', log_level = 'info')
  123. log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024* 1024 * 100)
  124. log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024* 1024 * 100)
  125. logger = log.get_logger()
  126. logger.info("pid is {}".format(os.getpid()))
  127. # # 算法参数
  128. # host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  129. # port=3306
  130. # db='safety_platform'
  131. # user='qx_read'
  132. # password=parse.quote_plus('Qx@123456')
  133. # tablename='all_fault_info'
  134. # db_res_engine = create_engine(
  135. # "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  136. # user, password, host, port, db
  137. # ))
  138. #............................模块运行前,先读取数据库中所有结束时间为0的数据,需要从数据库中读取................
  139. # print("select start_time, end_time, product_id, code, level, info, advice, factory from {}".format(tablename))
  140. # result=pd.read_sql("select start_time, end_time, product_id, code, level, info, advice from all_fault_info where factory = '{}'".format('骑享'), db_res_engine)
  141. # result = result[['start_time', 'end_time', 'product_id', 'code', 'level', 'info', 'advice']]
  142. # df_diag_ram=result[(result['end_time']=='0000-00-00 00:00:00') & (result['code']==119)]
  143. df_bms_ram=pd.DataFrame(columns=['time', 'sn', 'packvolt', 'cellvolt', 'celltemp'])
  144. get_sn()
  145. SOC_cal()
  146. #定时任务.......................................................................................................................................................................
  147. scheduler = BlockingScheduler()
  148. scheduler.add_job(get_sn, 'interval', days=1, id='get_sn')
  149. scheduler.add_job(SOC_cal, 'interval', seconds=300, id='SOC_cal')
  150. try:
  151. scheduler.start()
  152. except Exception as e:
  153. scheduler.shutdown()
  154. logger.error(str(e))