123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- #coding=utf-8
- # Build per-battery user/agent holding periods from rental and battery-change records
- from math import radians, cos, sin, asin, sqrt
- import pandas as pd
- import numpy as np
- from datetime import timedelta
- from sqlalchemy import create_engine
- from sqlalchemy.orm import sessionmaker
- from LIB.BACKEND import DBManager, Log
- import pdb
- from urllib import parse
- import os
- import time, datetime
- import pymysql
- import traceback
- import dateutil.relativedelta
- from sqlalchemy import create_engine
- import pandas as pd
- from create_table import BaseTableBatUserAgent
# Shift applied to epoch-second columns before formatting (+8 hours;
# presumably converts UTC epochs to UTC+8 wall-clock time -- confirm).
UTC8_OFFSET_SECONDS = 3600 * 8
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# Rent columns where an epoch of 0 means "not set" and that get the +8h shift.
RENT_EVENT_COLUMNS = ('pay_time', 'get_time', 'end_time', 'return_time')
# Column layout written to the result table.
RESULT_COLUMNS = ['sn', 'start_time', 'end_time', 'user_id', 'agent_id']


def _none_if_null(value):
    """Return None for NaN/NaT/None, otherwise the value unchanged."""
    return None if pd.isnull(value) else value


def epoch_to_time_str(series, offset_seconds=0, zero_is_null=False):
    """Format a Series of epoch seconds as ``YYYY-mm-dd HH:MM:SS`` strings.

    Args:
        series: Series of epoch seconds (non-numeric entries become null).
        offset_seconds: seconds added to every value before formatting.
        zero_is_null: when True, a value of 0 is treated as missing.

    Returns:
        Series aligned with ``series``; missing inputs stay null instead of
        raising (``NaT.strftime`` raises ValueError).
    """
    seconds = pd.to_numeric(series, errors="coerce")
    if zero_is_null:
        seconds = seconds.where(seconds != 0)
    stamps = pd.to_datetime(seconds + offset_seconds, unit="s")
    return stamps.dt.strftime(TIME_FORMAT)


def normalize_rent(df_rent):
    """Clean the raw ``py_battery_rent`` rows.

    Drops rows without ``user_id``/``qrcode`` and rows with ``pay_stat == 3``,
    turns ``id`` into a string key and converts the epoch columns to strings.
    The input frame is not modified.
    """
    rent = df_rent.dropna(axis=0, how='any', subset=['user_id', 'qrcode'])
    rent = rent[rent['pay_stat'] != 3].copy()
    rent['id'] = rent['id'].apply(
        lambda x: str(int(x)) if not pd.isnull(x) else None)
    # NOTE(review): addtime is formatted without the +8h shift and without
    # the 0 -> null rule, exactly as before; it is not used for ordering.
    rent['addtime'] = epoch_to_time_str(rent['addtime'])
    for column in RENT_EVENT_COLUMNS:
        rent[column] = epoch_to_time_str(
            rent[column], offset_seconds=UTC8_OFFSET_SECONDS, zero_is_null=True)
    return rent.reset_index(drop=True)


def normalize_rent_change(df_rent_change):
    """Clean the raw ``py_battery_rent_change`` rows.

    Drops rows without ``qrcode``/``new_qrcode`` and converts ``create_time``
    to a string. Rows whose ``create_time`` is 0/NULL are dropped as well:
    they cannot be placed on the timeline (previously they aborted the run).
    """
    change = df_rent_change.dropna(
        axis=0, how='any', subset=['new_qrcode', 'qrcode']).copy()
    change['create_time'] = epoch_to_time_str(
        change['create_time'], offset_seconds=UTC8_OFFSET_SECONDS,
        zero_is_null=True)
    change = change.dropna(axis=0, how='any', subset=['create_time'])
    return change.reset_index(drop=True)


def merge_battery_changes(df_rent, df_rent_change, logger=None):
    """Fold battery-change records into the rent records.

    For every change (processed per rent, in ``create_time`` order) the old
    battery receives two synthetic rows: a return row at the change time and
    a rent row whose ``pay_time`` is the rent's ``pay_time`` at that moment.
    The rent's own ``pay_time`` is then moved to the change time.

    Change records whose rent is not present in ``df_rent`` are skipped
    (previously they raised IndexError). Neither input frame is modified.

    Returns:
        A new frame: the updated rent rows followed by the synthetic rows.
    """
    merged = df_rent.reset_index(drop=True).copy()
    extra_rows = []
    skipped = 0
    for rent_id, changes in df_rent_change.groupby("rent_id"):
        rent_mask = merged['id'] == str(int(rent_id))
        if not rent_mask.any():
            skipped += len(changes)
            continue
        held_since = merged.loc[rent_mask, 'pay_time'].iloc[0]
        # Stable sort keeps the stored order for identical timestamps.
        ordered = changes.sort_values("create_time", kind="mergesort")
        for old_sn, changed_at, user_id, agent_id in zip(
                ordered['qrcode'], ordered['create_time'],
                ordered['user_id'], ordered['f_id']):
            extra_rows.append({
                'addtime': changed_at, 'qrcode': old_sn,
                'return_time': changed_at,
                'user_id': user_id, 'f_id': agent_id})
            extra_rows.append({
                'addtime': held_since, 'qrcode': old_sn,
                'pay_time': held_since,
                'user_id': user_id, 'f_id': agent_id})
            held_since = changed_at
        merged.loc[rent_mask, 'pay_time'] = held_since
    if skipped and logger is not None:
        logger.info(
            "skipped {} battery change record(s) without a matching rent".format(skipped))
    if extra_rows:
        # One concat instead of DataFrame.append per row (removed in pandas 2.0).
        merged = pd.concat(
            [merged, pd.DataFrame(extra_rows)], ignore_index=True, sort=False)
    return merged


def build_holding_periods(df_rent):
    """Derive per-battery holding periods from the merged rent records.

    Records are grouped by ``qrcode`` and ordered by ``pay_time`` (falling
    back to ``return_time`` when ``pay_time`` is null). A new period starts
    whenever ``user_id`` differs from the user that opened the current one.
    Each period takes its start from the ``pay_time`` of its first record and
    its end and agent from the ``return_time``/``f_id`` of its last record.

    Returns:
        DataFrame with columns sn, start_time, end_time, user_id, agent_id.
    """
    records = df_rent.reset_index(drop=True).copy()
    sort_key = records['pay_time'].where(
        records['pay_time'].notna(), records['return_time'])
    records['sort_time'] = pd.to_datetime(sort_key)

    periods = []

    def close_period(sn, start, end, user_id, agent_id):
        periods.append({
            'sn': sn,
            'start_time': _none_if_null(start),
            'end_time': _none_if_null(end),
            'user_id': user_id,
            'agent_id': agent_id})

    for sn, group in records.groupby("qrcode"):
        group = group.sort_values("sort_time", kind="mergesort")
        users = group['user_id'].tolist()
        starts = group['pay_time'].tolist()
        ends = group['return_time'].tolist()
        agents = group['f_id'].tolist()
        first = 0  # position of the record that opened the current period
        for i in range(1, len(users)):
            if users[i] == users[first]:
                continue
            close_period(sn, starts[first], ends[i - 1], users[first], agents[i - 1])
            first = i
        close_period(sn, starts[first], ends[-1], users[first], agents[-1])
    return pd.DataFrame(periods, columns=RESULT_COLUMNS)


def main():
    """Rebuild ``base_table_bat_user_agent`` from the rent/rent-change tables."""
    # Time settings: the start of the previous day is only used in log messages.
    now_time = datetime.datetime.now()
    pre_time = now_time + dateutil.relativedelta.relativedelta(days=-1)
    start_time = datetime.datetime.strftime(pre_time, "%Y-%m-%d 00:00:00")

    # Result database configuration.
    # NOTE(review): credentials are hard-coded in source; consider moving
    # them to configuration or environment variables.
    result_host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
    result_port = 3306
    result_user = 'qx_cas'
    result_password = parse.quote_plus('Qx@123456')
    result_database = 'qx_cas'
    db_engine = create_engine(
        "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
            result_user, result_password, result_host, result_port,
            result_database
        ))
    DbSession = sessionmaker(bind=db_engine)

    # Logging configuration: one directory per run, named after the start time.
    now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":", "_")
    log_path = 'log/' + now_str
    if not os.path.exists(log_path):
        os.makedirs(log_path)
    log = Log.Mylog(log_name='bat_user', log_level='info')
    log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024 * 1024 * 100)
    log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024 * 1024 * 100)
    logger = log.get_logger()
    logger.info("pid is + {}".format(os.getpid()))

    source_engine = None
    try:
        logger.info("pid-{} --{} START!".format(os.getpid(), str(start_time)))
        # Connect to the source database and load both tables in full.
        source_host = 'rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
        source_port = 3306
        source_db = 'qixiang_manage'
        source_user = 'qx_query'
        source_password = parse.quote_plus('@Qx_query')
        source_engine = create_engine(
            'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(
                source_user, source_password, source_host, str(source_port),
                source_db))
        df_rent = pd.read_sql_query("select * from py_battery_rent", source_engine)
        df_rent_change = pd.read_sql_query(
            "select * from py_battery_rent_change", source_engine)

        df_rent = normalize_rent(df_rent)
        print(len(df_rent))
        print(len(df_rent_change))
        df_rent_change = normalize_rent_change(df_rent_change)
        print(len(df_rent_change))

        # Add the battery-change information to the rent records: the old
        # battery gets a rent row and a return row, and the order's pay_time
        # becomes the battery change time.
        df_rent = merge_battery_changes(df_rent, df_rent_change, logger=logger)
        df_res = build_holding_periods(df_rent)

        # Replace the table contents inside ONE transaction: the DELETE and
        # the INSERTs share the session's connection, so a failure rolls
        # both back and readers never see a half-replaced table.
        session = DbSession()
        try:
            session.query(BaseTableBatUserAgent).filter().delete()
            df_res.to_sql("base_table_bat_user_agent", con=session.connection(),
                          if_exists="append", index=False)
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()
        logger.info("{}DONE!".format(str(start_time)))
    except Exception:
        # Top-level boundary: log and finish (exit status stays 0, as before).
        logger.error(traceback.format_exc())
        logger.error(u" 任务运行错误", exc_info=True)
    finally:
        if source_engine is not None:
            source_engine.dispose()
        db_engine.dispose()


if __name__ == "__main__":
    main()
-
|