deploy.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. #coding=utf-8
  2. # 计算里程
  3. from math import radians, cos, sin, asin, sqrt
  4. import pandas as pd
  5. import numpy as np
  6. from datetime import timedelta
  7. from sqlalchemy import create_engine
  8. from sqlalchemy.orm import sessionmaker
  9. from LIB.BACKEND import DBManager, Log
  10. import pdb
  11. from urllib import parse
  12. import os
  13. import time, datetime
  14. import pymysql
  15. import traceback
  16. import dateutil.relativedelta
  17. from sqlalchemy import create_engine
  18. import pandas as pd
  19. from create_table import BaseTableBatUserAgent
  20. if __name__ == "__main__":
  21. # 时间设置
  22. now_time = datetime.datetime.now()
  23. pre_time = now_time + dateutil.relativedelta.relativedelta(days=-1)
  24. end_time=datetime.datetime.strftime(now_time,"%Y-%m-%d 00:00:00")
  25. start_time=datetime.datetime.strftime(pre_time,"%Y-%m-%d 00:00:00")
  26. # 结果数据库配置
  27. host = 'rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com'
  28. port = 3306
  29. user = 'qx_cas'
  30. password = parse.quote_plus('Qx@123456')
  31. database = 'qx_cas'
  32. db_engine = create_engine(
  33. "mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8".format(
  34. user, password, host, port, database
  35. ))
  36. DbSession = sessionmaker(bind=db_engine)
  37. # 日志配置
  38. now_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()).replace(":","_")
  39. log_path = 'log/' + now_str
  40. if not os.path.exists(log_path):
  41. os.makedirs(log_path)
  42. log = Log.Mylog(log_name='bat_user', log_level = 'info')
  43. log.set_file_hl(file_name='{}/info.log'.format(log_path), log_level='info', size=1024* 1024 * 100)
  44. log.set_file_hl(file_name='{}/error.log'.format(log_path), log_level='error', size=1024* 1024 * 100)
  45. logger = log.get_logger()
  46. logger.info("pid is + {}".format(os.getpid()))
  47. try:
  48. logger.info("pid-{} --{} START!".format(os.getpid(), str(start_time)))
  49. # 连接数据库
  50. host='rm-bp10j10qy42bzy0q7.mysql.rds.aliyuncs.com'
  51. port=3306
  52. db='qixiang_manage'
  53. user='qx_query'
  54. password=parse.quote_plus('@Qx_query')
  55. engine = create_engine('mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'.format(user,password, host, str(port),db))
  56. sql = "select * from py_battery_rent"
  57. df_rent = pd.read_sql_query(sql, engine)
  58. sql = "select * from py_battery_rent_change"
  59. df_rent_change = pd.read_sql_query(sql, engine)
  60. df_rent = df_rent.dropna(axis=0, how='any', subset=['user_id', 'qrcode'], inplace=False)
  61. df_rent = df_rent[~(df_rent['pay_stat']==3)]
  62. df_rent['id'] = df_rent['id'].apply(lambda x:str(int(x)) if not pd.isnull(x) else None)
  63. df_rent['return_time'] = df_rent['return_time'].apply(lambda x:x+3600*8 if x!=0 else None)
  64. df_rent['pay_time'] = df_rent['pay_time'].apply(lambda x:x+3600*8 if x!=0 else None)
  65. df_rent['get_time'] = df_rent['get_time'].apply(lambda x:x+3600*8 if x!=0 else None)
  66. df_rent['end_time'] = df_rent['end_time'].apply(lambda x:x+3600*8 if x!=0 else None)
  67. df_rent['addtime'] = pd.to_datetime(df_rent['addtime'].values,unit='s')
  68. df_rent['pay_time'] = pd.to_datetime(df_rent['pay_time'].values,unit='s')
  69. df_rent['get_time'] = pd.to_datetime(df_rent['get_time'].values,unit='s')
  70. df_rent['end_time'] = pd.to_datetime(df_rent['end_time'].values,unit='s')
  71. df_rent['return_time'] = pd.to_datetime(df_rent['return_time'].values,unit='s')
  72. df_rent['addtime'] = df_rent['addtime'].apply(lambda x:x.strftime("%Y-%m-%d %H:%M:%S"))
  73. df_rent['pay_time'] = df_rent['pay_time'].apply(lambda x:x.strftime("%Y-%m-%d %H:%M:%S") if not pd.isna(x) else x)
  74. df_rent['get_time'] = df_rent['get_time'].apply(lambda x:x.strftime("%Y-%m-%d %H:%M:%S") if not pd.isna(x) else x)
  75. df_rent['end_time'] = df_rent['end_time'].apply(lambda x:x.strftime("%Y-%m-%d %H:%M:%S") if not pd.isna(x) else x)
  76. df_rent['return_time'] = df_rent['return_time'].apply(lambda x:x.strftime("%Y-%m-%d %H:%M:%S") if not pd.isna(x) else x)
  77. df_rent = df_rent.reset_index(drop=True)
  78. print(len(df_rent))
  79. # df_rent_change = pd.read_csv("data_rent_change.csv",sep=',',encoding="ANSI")
  80. print(len(df_rent_change))
  81. df_rent_change = df_rent_change.dropna(axis=0, how='any', subset=['new_qrcode', 'qrcode'], inplace=False)
  82. print(len(df_rent_change))
  83. df_rent_change = df_rent_change.reset_index(drop=True)
  84. df_rent_change['create_time'] = df_rent_change['create_time'].apply(lambda x:x+3600*8 if x!=0 else None)
  85. df_rent_change['create_time'] = pd.to_datetime(df_rent_change['create_time'].values,unit='s')
  86. df_rent_change['create_time'] = df_rent_change['create_time'].apply(lambda x:x.strftime("%Y-%m-%d %H:%M:%S"))
  87. # 将更换电池的信息,补充至rent中, 旧电池添加一条租用记录和归还记录, 并将订单的pay_time 改为电池更换时间,
  88. df_groups = df_rent_change.groupby("rent_id")
  89. for name, df_group in df_groups:
  90. df_group = df_group.sort_values("create_time")
  91. df_group = df_group.reset_index(drop=True)
  92. for i in range(0, len(df_group)):
  93. df_rent = df_rent.append(pd.DataFrame({'addtime':[df_group.loc[i,'create_time']],'qrcode':[df_group.loc[i,'qrcode']],
  94. 'return_time':[df_group.loc[i,'create_time']],'user_id':[df_group.loc[i,'user_id']], 'f_id':[df_group.loc[i,'f_id']]}), ignore_index=True)
  95. df_rent = df_rent.append(pd.DataFrame({'addtime':[df_rent.loc[df_rent[(df_rent['id']==str(int(df_group.loc[i,'rent_id'])))].index,'pay_time'].values[0]],
  96. 'qrcode':[df_group.loc[i,'qrcode']], 'pay_time':[df_rent.loc[df_rent[(df_rent['id']==str(int(df_group.loc[i,'rent_id'])))].index,'pay_time'].values[0]],
  97. 'user_id':[df_group.loc[i,'user_id']], 'f_id':[df_group.loc[i,'f_id']]}), ignore_index=True)
  98. df_rent.loc[df_rent[(df_rent['id']==str(int(df_group.loc[i,'rent_id'])))].index,'pay_time'] = df_group.loc[i,'create_time']
  99. # 生成用来排序的时间列
  100. df_rent = df_rent.reset_index(drop=True)
  101. df_rent['sort_time'] = [None] * len(df_rent)
  102. for i in range(0, len(df_rent)):
  103. df_rent.loc[i, 'sort_time'] = df_rent.loc[i, 'pay_time'] if not pd.isnull(df_rent.loc[i, 'pay_time']) else df_rent.loc[i, 'return_time']
  104. df_rent['sort_time'] = pd.to_datetime(df_rent['sort_time'])
  105. # df_rent.to_csv('ttt.csv')
  106. df = df_rent.copy()
  107. df_res = pd.DataFrame(columns=['sn', 'st', 'et', 'user_id', 'agent_id'])
  108. df_groups = df.groupby("qrcode")
  109. for name, df_group in df_groups:
  110. # 根据sn分组后的电池,首先按照记录时间排序,然后判断用户id是否发生变化,
  111. df_group = df_group.sort_values("sort_time") # 按照本条记录的生成时间排序
  112. df_group = df_group.reset_index(drop=True)
  113. sn = name
  114. user_id = df_group.loc[0, 'user_id']
  115. st = df_group.loc[0, 'pay_time']
  116. et = None
  117. for i in range(1,len(df_group)):
  118. if df_group.loc[i, 'user_id'] == user_id:
  119. continue
  120. else:
  121. et = df_group.loc[i-1, 'return_time'] if not pd.isnull(df_group.loc[i-1, 'return_time']) else None
  122. df_res = df_res.append(pd.DataFrame({'sn':[sn], 'st':[st], 'et':[et], 'user_id':[user_id], 'agent_id':[df_group.loc[i-1, 'f_id']]}), ignore_index=True)
  123. user_id = df_group.loc[i, 'user_id']
  124. st = df_group.loc[i, 'pay_time']
  125. et = None
  126. et = df_group.loc[len(df_group)-1, 'return_time'] if not pd.isnull(df_group.loc[len(df_group)-1, 'return_time']) else None
  127. df_res = df_res.append(pd.DataFrame({'sn':[sn], 'st':[st], 'et':[et], 'user_id':[user_id], 'agent_id':[df_group.loc[len(df_group)-1, 'f_id']]}), ignore_index=True)
  128. df_res.columns = ['sn', 'start_time', 'end_time', 'user_id', 'agent_id']
  129. session = DbSession()
  130. session.query(BaseTableBatUserAgent).filter().delete()
  131. df_res.to_sql("base_table_bat_user_agent",con=db_engine, if_exists="append",index=False)
  132. session.commit()
  133. session.close()
  134. logger.info("{}DONE!".format(str(start_time)))
  135. except Exception as e:
  136. logger.error(traceback.format_exc)
  137. logger.error(u" 任务运行错误", exc_info=True)
  138. finally:
  139. db_engine.dispose()