main_V0.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. from LIB.MIDDLE.InfoChrgDrive.Charge.V1_0_0.coreV0 import *
  2. import pymysql
  3. import datetime
  4. import pandas as pd
  5. from LIB.BACKEND import DBManager
  6. dbManager = DBManager.DBManager()
  7. from sqlalchemy import create_engine
  8. from urllib import parse
  9. import datetime, time
  10. from apscheduler.schedulers.blocking import BlockingScheduler
  11. import os
  12. import traceback
  13. import logging
  14. import logging.handlers
  15. #...................................充电技术指标统计函数......................................................................................................................
  16. def diag_cal():
  17. global SNnums
  18. global kmeans1,kmeans2,kmeans3,kmeans4,kmeans5
  19. start=time.time()
  20. now_time=datetime.datetime.now()
  21. start_time=now_time-datetime.timedelta(hours=24)
  22. start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
  23. end_time=now_time.strftime('%Y-%m-%d %H:%M:%S')
  24. #数据库配置
  25. host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  26. port=3306
  27. db='safety_platform'
  28. user='qx_algo_rw'
  29. password='qx@123456'
  30. #读取结果库数据......................................................
  31. param='sn,time_st,time_end,status,delta_time,soc_st,soc_end,volt_st,volt_end,diffvolt_st,diffvolt_end, \
  32. temp_max,temp_min,temp_incr,temp_mean,temp_st_mean,temp_end_mean,difftem_max,meancrnt,max_meancrnt, \
  33. sts_flg,full_chrg_flg,ovchrg_flg,ovchrg_prop,gps_lon,gps_lat,standtime_f,standtime_b,city,airtemp_st,airtemp_end,charge_env'
  34. tablename='algo_charge_info'
  35. mysql = pymysql.connect (host=host, user=user, password=password, port=port, database=db)
  36. cursor = mysql.cursor()
  37. sql = "select %s from %s where time_end='0000-00-00 00:00:00'" %(param,tablename)
  38. cursor.execute(sql)
  39. res = cursor.fetchall()
  40. df_diag_ram= pd.DataFrame(res,columns=param.split(','))
  41. db_res_engine = create_engine(
  42. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  43. user, parse.quote_plus(password), host, port, db
  44. ))
  45. for sn in SNnums:
  46. try:
  47. #读取原始数据库数据........................................................................................................................................................
  48. dbManager = DBManager.DBManager()
  49. df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms','gps'])
  50. df_bms = df_data['bms']
  51. df_gps = df_data['gps']
  52. #读取城市天气数据........................................................................................................................................................
  53. gpscity=pd.read_csv('LIB/MIDDLE/InfoChrgDrive/Charge/gps.csv')
  54. #调用主函数................................................................................................................................................................
  55. if not df_bms.empty: #BMS数据非空
  56. df_merge=pd.merge(df_bms,df_gps,how='outer',sort=True)
  57. df_merge['sn']=sn
  58. chrg_last=pd.Series()
  59. if not df_diag_ram.empty: #结果库非空
  60. df_diag_ram_sn=df_diag_ram[df_diag_ram['sn']==sn]
  61. if not df_diag_ram_sn.empty:
  62. df_diag_ram_sn['time_end']=list(map(lambda x: datetime.datetime.strptime(str(x),'%Y-%m-%d %H:%M:%S'),list(df_diag_ram_sn['time_end'])))
  63. df_diag_ram_sn = df_diag_ram_sn.sort_values(by = 'time_end')
  64. df_diag_ram_sn.reset_index(inplace=True,drop=True)
  65. chrg_last=df_diag_ram_sn.iloc[-1]
  66. time_end=chrg_last['time_end']
  67. df_diag_new,df_diag_change=pro_output(df_merge,sn,gpscity,chrg_last)
  68. df_diag_new=prediction(df_diag_new,kmeans1,kmeans2,kmeans3,kmeans4,kmeans5)
  69. df_diag_change=prediction(df_diag_change,kmeans1,kmeans2,kmeans3,kmeans4,kmeans5)
  70. if not df_diag_change.empty: #需变更的结果非空
  71. cursor.execute("DELETE FROM algo_charge_info WHERE time_end = '{}' and sn='{}'".format(time_end,sn))
  72. mysql.commit()
  73. df_diag_change.to_sql("algo_charge_info",con=db_res_engine, if_exists="append",index=False)
  74. #新增结果存入结果库.....................................................................
  75. if not df_diag_new.empty: #需新增的结果非空
  76. df_diag_new.to_sql("algo_charge_info",con=db_res_engine, if_exists="append",index=False)
  77. end=time.time()
  78. print(end-start)
  79. except Exception as e:
  80. logger.error(str(e))
  81. logger.error(traceback.format_exc())
  82. cursor.close()
  83. mysql.close()
  84. #...............................................主函数起定时作用.......................................................................................................................
  85. if __name__ == "__main__":
  86. # 日志
  87. log_path = 'log/'
  88. if not os.path.exists(log_path):
  89. os.makedirs(log_path)
  90. logger = logging.getLogger("main")
  91. logger.setLevel(logging.DEBUG)
  92. # 根据日期滚动(每天产生1个文件)
  93. fh = logging.handlers.TimedRotatingFileHandler(filename='{}/main_info.log'.format(log_path), when="D", interval=1, backupCount=30,
  94. encoding="utf-8")
  95. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  96. fh.suffix = "%Y-%m-%d_%H-%M-%S"
  97. fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}")
  98. fh.setFormatter(formatter)
  99. fh.setLevel(logging.INFO)
  100. logger.addHandler(fh)
  101. fh = logging.handlers.TimedRotatingFileHandler(filename='{}/main_error.log'.format(log_path), when="D", interval=1, backupCount=30,
  102. encoding="utf-8")
  103. formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s")
  104. fh.suffix = "%Y-%m-%d_%H-%M-%S"
  105. fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}")
  106. fh.setFormatter(formatter)
  107. fh.setLevel(logging.ERROR)
  108. logger.addHandler(fh)
  109. logger.info("pid is {}".format(os.getpid()))
  110. # # 更新sn列表
  111. host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
  112. port=3306
  113. db='qixiang_oss'
  114. user='qixiang_oss'
  115. password='Qixiang2021'
  116. conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db)
  117. cursor = conn.cursor()
  118. cursor.execute("select sn, imei, add_time from app_device where status in (1,2,3)")
  119. res = cursor.fetchall()
  120. df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time'])
  121. df_sn = df_sn.reset_index(drop=True)
  122. conn.close();
  123. SNnums = list(df_sn['sn'])
  124. kmeans1 = joblib.load('LIB/MIDDLE/InfoChrgDrive/Charge/kmeans1.pkl')
  125. kmeans2 = joblib.load('LIB/MIDDLE/InfoChrgDrive/Charge/kmeans2.pkl')
  126. kmeans3 = joblib.load('LIB/MIDDLE/InfoChrgDrive/Charge/kmeans3.pkl')
  127. kmeans4 = joblib.load('LIB/MIDDLE/InfoChrgDrive/Charge/kmeans4.pkl')
  128. kmeans5 = joblib.load('LIB/MIDDLE/InfoChrgDrive/Charge/kmeans5.pkl')
  129. diag_cal()
  130. #定时任务.......................................................................................................................................................................
  131. scheduler = BlockingScheduler()
  132. scheduler.add_job(diag_cal, 'interval', hours=24)
  133. try:
  134. scheduler.start()
  135. except Exception as e:
  136. scheduler.shutdown()
  137. logger.error(str(e))
  138. logger.error(traceback.format_exc())