#coding=utf-8
# Compute mileage
# NOTE(review): the header above is translated from the original file; the
# code below does not compute mileage, it rebuilds the battery/user/agent
# usage-period table.
"""Rebuild the ``base_table_bat_user_agent`` table.

The job reads ``py_battery_rent`` and ``py_battery_rent_change`` from the
source database, folds battery replacements into the rent history, derives
for every battery (``qrcode``) the consecutive periods during which it was
held by the same user, and replaces the contents of
``base_table_bat_user_agent`` in the result database with those periods.
"""
from math import radians, cos, sin, asin, sqrt
import pandas as pd
import numpy as np
from datetime import timedelta
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from LIB.BACKEND import DBManager, Log
import pdb
from urllib import parse
import os
import time, datetime
import pymysql
import traceback
import dateutil.relativedelta
from sqlalchemy import create_engine
import pandas as pd
from create_table import BaseTableBatUserAgent

# Offset added to the raw epoch seconds before formatting.
# NOTE(review): presumably converts UTC to UTC+8 local time -- confirm.
_EPOCH_OFFSET_SECONDS = 3600 * 8
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def _format_timestamp(ts):
    """Format a timestamp as ``%Y-%m-%d %H:%M:%S``; pass NaT/NaN through."""
    return ts.strftime(_TIME_FORMAT) if not pd.isna(ts) else ts


def _epoch_to_local_str(column):
    """Convert a column of epoch seconds to formatted time strings.

    A value of ``0`` is treated as "no time" and becomes NaT; every other
    value is shifted by ``_EPOCH_OFFSET_SECONDS`` before conversion.

    Args:
        column: pandas Series of epoch seconds.

    Returns:
        A Series with the same index holding formatted strings (or NaT).
    """
    shifted = column.apply(
        lambda ts: ts + _EPOCH_OFFSET_SECONDS if ts != 0 else None
    )
    stamps = pd.Series(
        pd.to_datetime(shifted.values, unit='s'), index=column.index
    )
    return stamps.apply(_format_timestamp)


def _prepare_rent(df_rent):
    """Clean the raw ``py_battery_rent`` rows and normalise their time columns.

    Rows without ``user_id`` or ``qrcode`` are dropped, as are rows whose
    ``pay_stat`` equals 3 (the meaning of status 3 is not visible here).
    ``id`` is converted to a string so it can be matched against ``rent_id``.
    """
    df_rent = df_rent.dropna(axis=0, how='any', subset=['user_id', 'qrcode'])
    # .copy() so the column assignments below act on an independent frame.
    df_rent = df_rent[~(df_rent['pay_stat'] == 3)].copy()
    df_rent['id'] = df_rent['id'].apply(
        lambda x: str(int(x)) if not pd.isnull(x) else None
    )
    for name in ('return_time', 'pay_time', 'get_time', 'end_time'):
        df_rent[name] = _epoch_to_local_str(df_rent[name])
    # NOTE(review): unlike the other time columns, addtime is converted
    # without the 8-hour shift and without mapping 0 to "missing"; kept as in
    # the original -- confirm whether that is intentional.
    added = pd.Series(
        pd.to_datetime(df_rent['addtime'].values, unit='s'),
        index=df_rent.index,
    )
    df_rent['addtime'] = added.apply(_format_timestamp)
    return df_rent.reset_index(drop=True)


def _prepare_rent_change(df_rent_change):
    """Reset the index of the change rows and normalise ``create_time``."""
    df_rent_change = df_rent_change.reset_index(drop=True)
    df_rent_change['create_time'] = _epoch_to_local_str(
        df_rent_change['create_time']
    )
    return df_rent_change


def _merge_battery_changes(df_rent, df_rent_change):
    """Fold battery replacements into the rent history.

    For every replacement, the old battery gets a "returned" record dated at
    the replacement time and a "rented" record dated at the order's current
    ``pay_time``; the order's ``pay_time`` is then moved to the replacement
    time so the next replacement of the same order starts from there.

    Args:
        df_rent: prepared rent rows (not modified; a copy is used).
        df_rent_change: prepared replacement rows.

    Returns:
        A new DataFrame: the rent rows with updated ``pay_time`` followed by
        the synthetic records, in the order the replacements were processed.

    Raises:
        IndexError: if a replacement refers to a ``rent_id`` that has no
            matching row in ``df_rent``.
    """
    df_rent = df_rent.copy()
    new_rows = []
    for rent_id, changes in df_rent_change.groupby("rent_id"):
        changes = changes.sort_values("create_time").reset_index(drop=True)
        # Only original rows carry an id, and rows are appended after the
        # loop, so the matching labels can be resolved once per order.
        order_rows = df_rent.index[df_rent['id'] == str(int(rent_id))]
        for changed_at, old_sn, user_id, agent_id in zip(
            changes['create_time'],
            changes['qrcode'],
            changes['user_id'],
            changes['f_id'],
        ):
            rented_at = df_rent.loc[order_rows, 'pay_time'].values[0]
            # Return record of the old battery.
            new_rows.append({
                'addtime': changed_at,
                'qrcode': old_sn,
                'return_time': changed_at,
                'user_id': user_id,
                'f_id': agent_id,
            })
            # Rent record of the old battery.
            new_rows.append({
                'addtime': rented_at,
                'qrcode': old_sn,
                'pay_time': rented_at,
                'user_id': user_id,
                'f_id': agent_id,
            })
            df_rent.loc[order_rows, 'pay_time'] = changed_at
    if not new_rows:
        return df_rent
    # One concat instead of DataFrame.append per row (removed in pandas 2.0).
    return pd.concat([df_rent, pd.DataFrame(new_rows)], ignore_index=True)


def _build_usage_periods(df_rent):
    """Derive per-battery usage periods from the merged rent history.

    Records are grouped by ``qrcode`` and ordered by ``sort_time``. A period
    is closed whenever ``user_id`` changes: it starts at the ``pay_time`` of
    the first record of the run and ends at the ``return_time`` of the last
    one (``None`` when missing); ``agent_id`` is that last record's ``f_id``.

    Returns:
        DataFrame with columns ``sn, start_time, end_time, user_id, agent_id``.
    """
    periods = []

    def close_period(sn, start, returned_at, user_id, agent_id):
        periods.append({
            'sn': sn,
            'start_time': start,
            'end_time': returned_at if not pd.isnull(returned_at) else None,
            'user_id': user_id,
            'agent_id': agent_id,
        })

    for sn, records in df_rent.groupby("qrcode"):
        records = records.sort_values("sort_time").reset_index(drop=True)
        user_ids = records['user_id'].tolist()
        pay_times = records['pay_time'].tolist()
        return_times = records['return_time'].tolist()
        agent_ids = records['f_id'].tolist()

        current_user = user_ids[0]
        start = pay_times[0]
        for pos in range(1, len(user_ids)):
            if user_ids[pos] == current_user:
                continue
            close_period(sn, start, return_times[pos - 1], current_user,
                         agent_ids[pos - 1])
            current_user = user_ids[pos]
            start = pay_times[pos]
        close_period(sn, start, return_times[-1], current_user, agent_ids[-1])

    return pd.DataFrame(
        periods,
        columns=['sn', 'start_time', 'end_time', 'user_id', 'agent_id'],
    )


def main():
    """Run the job: load, transform and rewrite the result table.

    Any exception raised during processing is logged and swallowed, as in the
    original script; both database engines are always disposed.
    """
    # Time settings: start_time is only used to label the log messages.
    now_time = datetime.datetime.now()
    pre_time = now_time + dateutil.relativedelta.relativedelta(days=-1)
    start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d 00:00:00")

    # Result database configuration.
    # NOTE(review): credentials are hard-coded in source; move them to
    # environment variables or a secret store.
    host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
    port = 3306
    user = 'qx_cas'
    password = parse.quote_plus('Qx@123456')
    database = 'qx_cas'
    db_engine = create_engine(
        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
            user, password, host, port, database
        ))
    DbSession = sessionmaker(bind=db_engine)

    # Logging configuration: one directory per run, named after the start time.
    now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":", "_")
    log_path = 'log/' + now_str
    os.makedirs(log_path, exist_ok=True)
    log = Log.Mylog(log_name='bat_user', log_level='info')
    log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024 * 1024 * 100)
    log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024 * 1024 * 100)
    logger = log.get_logger()
    logger.info("pid is + {}".format(os.getpid()))

    engine = None  # source-database engine, created inside the try block
    try:
        logger.info("pid-{} --{} START!".format(os.getpid(), str(start_time)))

        # Connect to the source database.
        # NOTE(review): hard-coded credentials, see above.
        src_host = 'rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
        src_port = 3306
        src_db = 'qixiang_manage'
        src_user = 'qx_query'
        src_password = parse.quote_plus('@Qx_query')
        engine = create_engine(
            'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(
                src_user, src_password, src_host, str(src_port), src_db
            ))

        df_rent = pd.read_sql_query("select * from py_battery_rent", engine)
        df_rent_change = pd.read_sql_query(
            "select * from py_battery_rent_change", engine
        )

        df_rent = _prepare_rent(df_rent)
        print(len(df_rent))

        print(len(df_rent_change))
        df_rent_change = df_rent_change.dropna(
            axis=0, how='any', subset=['new_qrcode', 'qrcode']
        )
        print(len(df_rent_change))
        df_rent_change = _prepare_rent_change(df_rent_change)

        # Add the battery-replacement information to the rent history: the
        # old battery gets a rent record and a return record, and the
        # order's pay_time becomes the replacement time.
        df_rent = _merge_battery_changes(df_rent, df_rent_change)

        # Build the column used for ordering: pay_time when present,
        # otherwise return_time.
        df_rent = df_rent.reset_index(drop=True)
        df_rent['sort_time'] = pd.to_datetime(
            df_rent['pay_time'].where(
                df_rent['pay_time'].notna(), df_rent['return_time']
            )
        )

        df_res = _build_usage_periods(df_rent)

        # NOTE(review): the DELETE runs in the session's transaction while
        # to_sql writes through a separate connection of db_engine, so the
        # replace is not atomic -- confirm this is intended.
        session = DbSession()
        try:
            session.query(BaseTableBatUserAgent).delete()
            df_res.to_sql("base_table_bat_user_agent", con=db_engine,
                          if_exists="append", index=False)
            session.commit()
        finally:
            session.close()

        logger.info("{}DONE!".format(str(start_time)))
    except Exception:
        # Top-level boundary of the job: record the failure and fall through
        # to the cleanup below.
        logger.error(traceback.format_exc())
        logger.error(u" 任务运行错误", exc_info=True)
    finally:
        if engine is not None:
            engine.dispose()
        db_engine.dispose()


if __name__ == "__main__":
    main()