#coding=utf-8
"""Rebuild the ``operation_sn`` table in the result database.

The script collects device serial numbers (SNs) from three MySQL sources:

1. every distinct ``sn`` in ``app_device`` (database ``qixiang_oss``);
2. every ``qrcode`` in ``py_battery`` whose ``status`` is not 0, 1 or 2
   (database ``qixiang_manage``) -- these are removed from list 1;
3. every row of ``sn_list`` (database ``qx_cas``).

The remaining SNs of list 1 are labelled with one factory name, the SNs of
list 3 with another, and the combined frame replaces the ``operation_sn``
table in the result database.
"""
import pymysql
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import time, datetime
import dateutil.relativedelta
import os
import traceback
from LIB.BACKEND import Log


def _fetch_all(sql, **conn_kwargs):
    """Run ``sql`` on a fresh pymysql connection and return all rows.

    The cursor and the connection are closed before returning, whether or
    not the query succeeds, so no connection outlives the call.

    Args:
        sql: The SQL statement to execute.
        **conn_kwargs: Keyword arguments forwarded to ``pymysql.connect``.

    Returns:
        Whatever ``cursor.fetchall()`` returns for the statement.

    Raises:
        Any exception raised by ``pymysql`` while connecting or querying.
    """
    conn = pymysql.connect(**conn_kwargs)
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()


def _build_operation_sn(all_sns, filt_sns, swap_rows):
    """Assemble the frame that is written to ``operation_sn``.

    Args:
        all_sns: Iterable of SNs read from ``app_device``.
        filt_sns: Iterable of SNs to remove from ``all_sns``.
        swap_rows: Iterable of ``(sn, factory)`` rows read from ``sn_list``.

    Returns:
        A DataFrame with columns ``sn`` and ``factory`` and a fresh
        0..n-1 index.
    """
    # Set difference also de-duplicates; the resulting order is arbitrary.
    kept_sns = list(set(all_sns) - set(filt_sns))
    df_own = pd.DataFrame({'sn': kept_sns})
    df_own['factory'] = '骑享'

    df_swap = pd.DataFrame(list(swap_rows), columns=['sn', 'factory'])
    # The factory value stored in sn_list is overwritten with a fixed label.
    df_swap['factory'] = "金茂换电"

    merged = pd.concat([df_own, df_swap], axis=0)
    return merged.reset_index(drop=True)


def main():
    """Collect the SN lists, merge them and replace ``operation_sn``.

    Any exception raised while reading or writing is logged with its
    traceback and swallowed; the result-database engine is always disposed.
    """
    # Time settings: midnight of the previous day, used only in log messages.
    now_time = datetime.datetime.now()
    pre_time = now_time + dateutil.relativedelta.relativedelta(days=-1)
    start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d 00:00:00")

    # Result database configuration.
    # NOTE(review): credentials are hard-coded throughout this script;
    # consider moving them to configuration or environment variables.
    host = '172.16.121.236'
    port = 3306
    user = 'root'
    password = 'FastFun1234,'
    database = 'fastfun'
    db_engine = create_engine(
        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
            user, password, host, port, database
        ))

    # Logging configuration: one directory per run, named after the start
    # timestamp with ':' replaced by '_'.
    now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":", "_")
    log_path = 'log/' + now_str
    if not os.path.exists(log_path):
        os.makedirs(log_path)
    log = Log.Mylog(log_name='oss_to_iotp', log_level='info')
    log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024 * 1024 * 100)
    log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024 * 1024 * 100)
    logger = log.get_logger()
    logger.info("pid is + {}".format(os.getpid()))

    try:
        logger.info("pid-{} --{} START!".format(os.getpid(), str(start_time)))

        # All distinct device SNs.
        # Earlier, narrower query kept for reference:
        # select * from (select distinct(t.sn) from (select * from app_device where status = 3)t inner join (select * from app_device_log where type=3 and out_type=1) t2 on t.sn=t2.sn)t3
        device_rows = _fetch_all(
            "select distinct sn from app_device",
            host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com',
            port=3306,
            user='qx_read',
            password='Qx@123456',
            database='qixiang_oss',
        )
        all_sns = [row[0] for row in device_rows]

        # SNs to exclude: batteries whose status is not 0, 1 or 2.
        battery_rows = _fetch_all(
            "select qrcode from py_battery where py_battery.status not in (0,1,2)",
            host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com',
            port=3306,
            user='qx_query',
            password='@Qx_query',
            database='qixiang_manage',
        )
        filt_sns = [row[0] for row in battery_rows]

        # Jinmao battery-swap SN list.
        swap_rows = _fetch_all(
            "select sn, factory from sn_list",
            host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com',
            port=3306,
            user='qx_algo_rw',
            password='qx@123456',
            database='qx_cas',
        )

        # Update the SN list: the table is replaced wholesale on every run.
        df_res = _build_operation_sn(all_sns, filt_sns, swap_rows)
        df_res.to_sql("operation_sn", con=db_engine, if_exists="replace", index=False)
        logger.info("{}DONE!".format(str(start_time)))
    except Exception:
        # exc_info=True attaches the active traceback to the log record.
        logger.error(u" 任务运行错误", exc_info=True)
    finally:
        # Source connections are closed inside _fetch_all; only the result
        # engine is left to release here.
        db_engine.dispose()


if __name__ == "__main__":
    main()