__author__ = 'lmstack' #coding=utf-8 import os import datetime import pandas as pd from LIB.BACKEND import DBManager from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker import datetime import dateutil.relativedelta import traceback from pandas.core.frame import DataFrame from urllib import parse import pymysql from LIB.MIDDLE.SaftyCenter.Common import DBDownload as DBDw from LIB.MIDDLE.SaftyCenter.Common import QX_BatteryParam from LIB.MIDDLE.SaftyCenter.Low_Soc_Alarm.low_soc_alarm import Low_soc_alarm import logging import logging.handlers import re if __name__ == "__main__": # 时间设置 now_time = datetime.datetime.now() pre_time = now_time + dateutil.relativedelta.relativedelta(days=-1)# 前一日 end_time=datetime.datetime.strftime(now_time,"%Y-%m-%d 07:00:00") start_time=datetime.datetime.strftime(pre_time,"%Y-%m-%d 07:00:00") history_run_flag = False # 历史数据运行标志 # 更新sn列表 host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com' port=3306 db='qixiang_oss' user='qixiang_oss' password='Qixiang2021' conn = pymysql.connect(host=host, port=port, user=user, password=password, database=db) cursor = conn.cursor() cursor.execute("select sn, imei, add_time from app_device") res = cursor.fetchall() df_sn = pd.DataFrame(res, columns=['sn', 'imei', 'add_time']) df_sn = df_sn.reset_index(drop=True) conn.close(); # 数据库配置 host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com' port = 3306 user = 'qx_cas' password = parse.quote_plus('Qx@123456') database = 'qx_cas' db_engine = create_engine( "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format( user, password, host, port, database )) DbSession = sessionmaker(bind=db_engine) # 运行历史数据配置 df_first_data_time = pd.read_sql("select * from bat_first_data_time", db_engine) db_engine.dispose() # 日志 log_path = 'log/' if not os.path.exists(log_path): os.makedirs(log_path) logger = logging.getLogger("main") logger.setLevel(logging.DEBUG) # 根据日期滚动(每天产生1个文件) fh = logging.handlers.TimedRotatingFileHandler(filename='{}/main_info.log'.format(log_path), when="D", interval=1, backupCount=30, encoding="utf-8") formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s") fh.suffix = "%Y-%m-%d_%H-%M-%S" fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}") fh.setFormatter(formatter) fh.setLevel(logging.INFO) logger.addHandler(fh) fh = logging.handlers.TimedRotatingFileHandler(filename='{}/main_error.log'.format(log_path), when="D", interval=1, backupCount=30, encoding="utf-8") formatter = logging.Formatter("%(asctime)s - %(name)s-%(levelname)s %(message)s") fh.suffix = "%Y-%m-%d_%H-%M-%S" fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}") fh.setFormatter(formatter) fh.setLevel(logging.ERROR) logger.addHandler(fh) logger.info("pid is {}".format(os.getpid())) # 算法参数 host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com' port=3306 db='qixiang_manage' user='qx_query' password='@Qx_query' mode=4 tablename1='py_battery' DBRead=DBDw.DBDownload(host, port, db, user, password,mode) dbManager = DBManager.DBManager() low_soc_bat_list=DataFrame(columns=['sn','time','level']) with DBRead as DBRead: for i in range(0, len(df_sn)): try: if df_sn.loc[i, 'imei'][5:9] == 'N640': celltype=1 #6040三元电芯 elif df_sn.loc[i, 'imei'][5:9] == 'N440': celltype=2 #4840三元电芯 elif df_sn.loc[i, 'imei'][5:9] == 'L660': celltype=99 # 6060锂电芯 elif df_sn.loc[i, 'imei'][3:5] == 'LX' and df_sn.loc[i, 'imei'][5:9] == 'N750': celltype=3 #力信 50ah三元电芯 elif df_sn.loc[i, 'imei'][3:9] == 'CLL128': celltype=100 # 重卡 elif df_sn.loc[i, 'imei'][3:5] == 'CL' and df_sn.loc[i, 'imei'][5:9] == 'N750': celltype=4 #CATL 50ah三元电芯 elif df_sn.loc[i, 'imei'][5:9] == 'L420': celltype=101 #20ah磷酸铁锂电芯 elif df_sn.loc[i, 'imei'][5:9] == 'L264': celltype=102 #120ah磷酸铁锂电芯 # else: # logger.info("pid-{} celltype-{} SN: {} SKIP!".format(os.getpid(), "未知", sn)) # continue sn = df_sn.loc[i, 'sn'] param=QX_BatteryParam.BatteryInfo(celltype) logger.info("pid-{} celltype-{} SN: {} START!".format(os.getpid(), celltype, sn)) df_data = dbManager.get_data(sn=sn, start_time=start_time, end_time=end_time, data_groups=['bms']) df_bms = df_data['bms'] # 处理运行历史数据 if (history_run_flag): this_sn = df_first_data_time[df_first_data_time['sn']==sn] if (len(this_sn) == 0): start_time = pd.to_datetime(str(df_sn.loc[df_sn[df_sn['sn']==sn].index, 'add_time'].values[0])).strftime("%Y-%m-%d 00:00:00") else: first_data_time = df_first_data_time.loc[df_first_data_time[df_first_data_time['sn']==sn].index, 'first_data_time'].values[0] if pd.isnull(first_data_time): start_time = "2018-01-01 00:00:00" else: start_time = pd.to_datetime(str(df_first_data_time.loc[df_first_data_time[df_first_data_time['sn']==sn].index, 'first_data_time'].values[0])).strftime("%Y-%m-%d 00:00:00") df_OprtnSta=DBRead.getdata('qrcode','status', tablename=tablename1, sn=sn, timename='', st='', sp='',factory='')#取最后一条运营状态 low_soc_bat_list=Low_soc_alarm.low_soc_alarm(param,df_bms,low_soc_bat_list,sn,df_OprtnSta) logger.info("pid-{} celltype-{} SN: {} DONE!".format(os.getpid(), celltype, sn)) except: logger.error(traceback.format_exc) logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True) try: if not low_soc_bat_list.empty: low_soc_bat_list['add_time'] = datetime.datetime.now() low_soc_bat_list.to_sql("lowsoc_info",con=db_engine, if_exists="append",index=False) logger.info("入库成功!") except: logger.error(traceback.format_exc) logger.error(u"{} :{},{} 任务运行错误\n".format(sn,start_time,end_time), exc_info=True) finally: db_engine.dispose()