|
@@ -4,24 +4,24 @@ from sqlalchemy import create_engine
|
|
|
import datetime
|
|
|
import pdb
|
|
|
|
|
|
-#建立引擎
|
|
|
-engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s/%s") % ('root', 'pengmin', 'localhost', 'qixiangdb'))
|
|
|
-#连接到qx数据库
|
|
|
-conn_qx = pymysql.connect(
|
|
|
- host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com',
|
|
|
- user='qx_cas',
|
|
|
- password='Qx@123456',#Qx@123456
|
|
|
- database='qx_cas',
|
|
|
- charset='utf8'
|
|
|
- )
|
|
|
-#连接到本地数据库,输出物
|
|
|
-conn_local = pymysql.connect(
|
|
|
- host='localhost',
|
|
|
- user='root',
|
|
|
- password='pengmin',
|
|
|
- database='qixiangdb',
|
|
|
- charset='utf8'
|
|
|
- )
|
|
|
+# #建立引擎
|
|
|
+# engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s/%s") % ('root', 'pengmin', 'localhost', 'qixiangdb'))
|
|
|
+# #连接到qx数据库
|
|
|
+# conn_qx = pymysql.connect(
|
|
|
+# host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com',
|
|
|
+# user='qx_cas',
|
|
|
+# password='Qx@123456',#Qx@123456
|
|
|
+# database='qx_cas',
|
|
|
+# charset='utf8'
|
|
|
+# )
|
|
|
+# #连接到本地数据库,输出物
|
|
|
+# conn_local = pymysql.connect(
|
|
|
+# host='localhost',
|
|
|
+# user='root',
|
|
|
+# password='pengmin',
|
|
|
+# database='qixiangdb',
|
|
|
+# charset='utf8'
|
|
|
+# )
|
|
|
|
|
|
#计算下一个soc
|
|
|
def getNextSoc(start_soc):
|
|
@@ -133,7 +133,7 @@ def snDayDfPreProcess(sn_day_df):
|
|
|
|
|
|
|
|
|
#更新所有sn,连读多天的的factor
|
|
|
-def updtAllSnFct(start_date,end_date):
|
|
|
+def updtAllSnFct(start_date,end_date, db_engine, db_local, db_qx, sn_table_name='tb_sn_factor'):
|
|
|
'''计算开始时间到结束时间的,所有sn的factor'''
|
|
|
start_date_datetime=datetime.datetime.strptime(start_date,'%Y-%m-%d')#开始时间
|
|
|
end_date_datetime=datetime.datetime.strptime(end_date,'%Y-%m-%d')#开始时间
|
|
@@ -141,25 +141,26 @@ def updtAllSnFct(start_date,end_date):
|
|
|
i=1
|
|
|
while i<=delta_day:
|
|
|
end_date=(start_date_datetime+datetime.timedelta(days=i)).strftime("%Y-%m-%d")
|
|
|
- updtAllSnTodayFct(start_date,end_date)#调用函数
|
|
|
+ updtAllSnTodayFct(start_date,end_date, db_engine, db_local, db_qx, sn_table_name)#调用函数
|
|
|
# print('update all sn factor from '+start_date+" to "+end_date)
|
|
|
start_date=end_date
|
|
|
i+=1#自加
|
|
|
+
|
|
|
#更新所有sn,一天的factor
|
|
|
-def updtAllSnTodayFct(start_date,end_date):
|
|
|
+def updtAllSnTodayFct(start_date,end_date, db_engine, db_local, db_qx, sn_table_name):
|
|
|
''''更新今天所有sn的factorx信息,start_date和end_date相隔一天。此处还可优化'''
|
|
|
- conn_local = pymysql.connect(
|
|
|
- host='localhost',
|
|
|
- user='root',
|
|
|
- password='pengmin',
|
|
|
- database='qixiangdb',
|
|
|
- charset='utf8'
|
|
|
- )
|
|
|
+ # conn_local = pymysql.connect(
|
|
|
+ # host='localhost',
|
|
|
+ # user='root',
|
|
|
+ # password='pengmin',
|
|
|
+ # database='qixiangdb',
|
|
|
+ # charset='utf8'
|
|
|
+ # )
|
|
|
|
|
|
start_date_str="'"+start_date+"'"
|
|
|
end_date_str="'"+end_date+"'"
|
|
|
sql_cmd="select * from drive_info where time between "+start_date_str+" and "+end_date_str+" and distance!=0;"
|
|
|
- range_soc_df = pd.read_sql(sql_cmd, conn_qx)#使用read_sql方法查询qx数据库
|
|
|
+ range_soc_df = pd.read_sql(sql_cmd, db_qx)#使用read_sql方法查询qx数据库
|
|
|
|
|
|
#筛选出所有当日数据之后,筛选当日有更新的sn
|
|
|
today_sn_list=range_soc_df['name'].unique().tolist()#[:100]#先一次更新5个
|
|
@@ -170,15 +171,15 @@ def updtAllSnTodayFct(start_date,end_date):
|
|
|
#寻找factor_df,里面是否有sn号,如果没有sn对应信息,则新增信息。
|
|
|
sn_str="'"+sn+"'"
|
|
|
update_today_factor_flg=True
|
|
|
- sql_cmd3="select sn,date,a0,a1,a2,a3,a4 from tb_sn_factor where date="+start_date_str+" and sn="+sn_str
|
|
|
- factor_today_df=pd.read_sql(sql_cmd3, conn_local)#使用read_sql方法查询local数据库
|
|
|
+ sql_cmd3="select sn,date,a0,a1,a2,a3,a4 from {} where date=".format(sn_table_name)+start_date_str+" and sn="+sn_str
|
|
|
+ factor_today_df=pd.read_sql(sql_cmd3, db_local)#使用read_sql方法查询local数据库
|
|
|
if len(factor_today_df)>=1:
|
|
|
- print(sn+' '+start_date_str+' factor exist in table! Factor not update.')
|
|
|
+ # print(sn+' '+start_date_str+' factor exist in table! Factor not update.')
|
|
|
update_today_factor_flg=False
|
|
|
|
|
|
- sql_cmd2="select sn,date,a0,a1,a2,a3,a4 from tb_sn_factor where date<"+start_date_str+" and sn="+sn_str
|
|
|
+ sql_cmd2="select sn,date,a0,a1,a2,a3,a4 from {} where date<".format(sn_table_name)+start_date_str+" and sn="+sn_str
|
|
|
#此处可以限定每次查询的数量,例如不高于5行
|
|
|
- factor_df=pd.read_sql(sql_cmd2, conn_local)#使用read_sql方法查询local数据库
|
|
|
+ factor_df=pd.read_sql(sql_cmd2, db_local)#使用read_sql方法查询local数据库
|
|
|
|
|
|
#按照sn号和日期进行去重,避免运行时重复产生factor数据,保留第一次出现的行。
|
|
|
factor_df=factor_df.drop_duplicates(subset=['sn','date'],keep='first')
|
|
@@ -218,10 +219,10 @@ def updtAllSnTodayFct(start_date,end_date):
|
|
|
|
|
|
#将today_sn_fct_df写入到数据库中,今天所有factor更新的系数,一次写入。
|
|
|
if len(today_sn_fct_df)>=1:
|
|
|
- today_sn_fct_df.to_sql('tb_sn_factor',con=engine,chunksize=10000,if_exists='append',index=False)
|
|
|
+ today_sn_fct_df.to_sql(sn_table_name,con=db_engine,chunksize=10000,if_exists='append',index=False)
|
|
|
|
|
|
#更新一个sn,连续多天的factor
|
|
|
-def updtOneSnFct(sn,start_date,end_date):
|
|
|
+def updtOneSnFct(sn,start_date,end_date,db_engine, db_local, db_qx, sn_table_name='tb_sn_factor'):
|
|
|
'''计算开始时间到结束时间的,一个sn的所有factor。
|
|
|
重复多次调用,updtOneSnTodayFct。
|
|
|
'''
|
|
@@ -232,27 +233,27 @@ def updtOneSnFct(sn,start_date,end_date):
|
|
|
while i<=delta_day:
|
|
|
end_date=(start_date_datetime+datetime.timedelta(days=i)).strftime("%Y-%m-%d")
|
|
|
# print('update one '+sn+'factor from '+start_date+" to "+end_date)
|
|
|
- updtOneSnTodayFct(sn,start_date,end_date)#调用函数,更新当日的factor。
|
|
|
+ updtOneSnTodayFct(sn,start_date,end_date,db_engine, db_local, db_qx, sn_table_name)#调用函数,更新当日的factor。
|
|
|
start_date=end_date
|
|
|
i+=1#自加
|
|
|
#更新一个sn,一天的factor
|
|
|
-def updtOneSnTodayFct(sn,start_date,end_date):
|
|
|
+def updtOneSnTodayFct(sn,start_date,end_date,db_engine, db_local, db_qx, sn_table_name):
|
|
|
'''更新一个sn,一天的factor。'''
|
|
|
#重新建立连接,更新数据库
|
|
|
- conn_local = pymysql.connect(
|
|
|
- host='localhost',
|
|
|
- user='root',
|
|
|
- password='pengmin',
|
|
|
- database='qixiangdb',
|
|
|
- charset='utf8'
|
|
|
- )
|
|
|
+ # conn_local = pymysql.connect(
|
|
|
+ # host='localhost',
|
|
|
+ # user='root',
|
|
|
+ # password='pengmin',
|
|
|
+ # database='qixiangdb',
|
|
|
+ # charset='utf8'
|
|
|
+ # )
|
|
|
|
|
|
start_date_str="'"+start_date+"'"
|
|
|
end_date_str="'"+end_date+"'"
|
|
|
sn_str="'"+sn+"'"
|
|
|
sql_cmd="select * from drive_info where time between "+start_date_str+" and "+end_date_str+\
|
|
|
" and distance!=0 and name="+sn_str
|
|
|
- range_soc_df = pd.read_sql(sql_cmd, conn_qx)#使用read_sql方法查询qx数据库
|
|
|
+ range_soc_df = pd.read_sql(sql_cmd, db_qx)#使用read_sql方法查询qx数据库
|
|
|
|
|
|
if len(range_soc_df)>0:
|
|
|
#筛选出所有当日数据之后,筛选当日有更新的sn
|
|
@@ -265,14 +266,14 @@ def updtOneSnTodayFct(sn,start_date,end_date):
|
|
|
sn_str="'"+sn+"'"
|
|
|
|
|
|
update_today_factor_flg=True
|
|
|
- sql_cmd3="select sn,date,a0,a1,a2,a3,a4 from tb_sn_factor where date="+start_date_str+" and sn="+sn_str
|
|
|
- factor_today_df=pd.read_sql(sql_cmd3, conn_local)#使用read_sql方法查询local数据库
|
|
|
+ sql_cmd3="select sn,date,a0,a1,a2,a3,a4 from {} where date=".format(sn_table_name)+start_date_str+" and sn="+sn_str
|
|
|
+ factor_today_df=pd.read_sql(sql_cmd3, db_local)#使用read_sql方法查询local数据库
|
|
|
if len(factor_today_df)>=1:
|
|
|
- print(sn+' '+start_date_str+' factor exist in table! Factor not update.')
|
|
|
+ # print(sn+' '+start_date_str+' factor exist in table! Factor not update.')
|
|
|
update_today_factor_flg=False
|
|
|
|
|
|
- sql_cmd2="select sn,date,a0,a1,a2,a3,a4 from tb_sn_factor where date<="+start_date_str+" and sn="+sn_str
|
|
|
- factor_df=pd.read_sql(sql_cmd2, conn_local)#使用read_sql方法查询local数据库
|
|
|
+ sql_cmd2="select sn,date,a0,a1,a2,a3,a4 from {} where date<=".format(sn_table_name)+start_date_str+" and sn="+sn_str
|
|
|
+ factor_df=pd.read_sql(sql_cmd2, db_local)#使用read_sql方法查询local数据库
|
|
|
#按照sn号和日期进行去重,避免运行时重复产生factor数据,保留第一次出现的行。
|
|
|
factor_df=factor_df.drop_duplicates(subset=['sn','date'],keep='first')
|
|
|
# pdb.set_trace()
|
|
@@ -313,13 +314,13 @@ def updtOneSnTodayFct(sn,start_date,end_date):
|
|
|
|
|
|
# #将today_sn_fct_df写入到数据库中
|
|
|
if len(today_sn_fct_df)>=1:
|
|
|
- today_sn_fct_df.to_sql('tb_sn_factor',con=engine,chunksize=10000,if_exists='append',index=False)
|
|
|
+ today_sn_fct_df.to_sql(sn_table_name,con=db_engine,chunksize=10000,if_exists='append',index=False)
|
|
|
# print(sn+' factor will be update in table tb_sn_factor!')
|
|
|
return sn_factor_df_new
|
|
|
|
|
|
|
|
|
#更新最新的factor,一天调用一次。
|
|
|
-def updtNewestFctTb():
|
|
|
+def updtNewestFctTb(db_local, db_engine, sn_table_name='tb_sn_factor', sn_newest_table_name='tb_sn_factor_newest'):
|
|
|
|
|
|
'''更新tb_sn_factor_newest,只保留最新日期的factor。
|
|
|
从tb_sn_factor中,筛选最新的日期。
|
|
@@ -329,8 +330,8 @@ def updtNewestFctTb():
|
|
|
current_time_str=current_time.strftime('%Y-%m-%d %H:%M:%S')#时间格式化为字符串,年-月-日 时-分-秒
|
|
|
current_time_str="'"+current_time_str+"'"
|
|
|
|
|
|
- sql_cmd_4="select sn,date,a0,a1,a2,a3,a4 from tb_sn_factor where date<"+current_time_str
|
|
|
- factor_all_df = pd.read_sql(sql_cmd_4, conn_local)#使用read_sql方法查询qx数据库
|
|
|
+ sql_cmd_4="select sn,date,a0,a1,a2,a3,a4 from {} where date<".format(sn_table_name)+current_time_str
|
|
|
+ factor_all_df = pd.read_sql(sql_cmd_4, db_local)#使用read_sql方法查询qx数据库
|
|
|
#筛选今天之前的所有factor,只保留最近的一天。
|
|
|
sn_list=factor_all_df['sn'].unique().tolist()#筛选sn序列
|
|
|
newest_sn_fct_df=pd.DataFrame([],columns=['sn','date','a0','a1','a2','a3','a4'])#声明空df
|
|
@@ -344,7 +345,7 @@ def updtNewestFctTb():
|
|
|
|
|
|
#按照日期排序,只保留最近的一天,输出factor_unique_df,方法为replace。
|
|
|
#本函数,每天需要运行一次,用于更新factor。
|
|
|
- newest_sn_fct_df.to_sql('tb_sn_factor_newest',con=engine,chunksize=10000,\
|
|
|
+ newest_sn_fct_df.to_sql(sn_newest_table_name,con=db_engine,chunksize=10000,\
|
|
|
if_exists='replace',index=False)
|
|
|
#使用factor和soc推荐剩余续驶里程
|
|
|
def calDistFromFct(input_df):
|
|
@@ -372,7 +373,7 @@ def calDistFromFct(input_df):
|
|
|
row_df['vehelecrng']=range#给VehElecRng列赋值
|
|
|
return row_df
|
|
|
#更新当前时间对应的里程,每5min调用一次
|
|
|
-def updtVehElecRng(input_time='2021-07-29 12:01:00'):
|
|
|
+def updtVehElecRng(db_qx, db_local, db_engine, range_table_name='tb_sn_factor_soc_range', sn_newest_table_name='tb_sn_factor_newest', input_time='2021-07-29 12:01:00'):
|
|
|
'''更新续驶里程,到tb_sn_factor_soc_range。
|
|
|
部署时设置每5min更新一次。
|
|
|
'''
|
|
@@ -389,14 +390,14 @@ def updtVehElecRng(input_time='2021-07-29 12:01:00'):
|
|
|
|
|
|
#从drive_info里面读取,该时间段内的name,time,soc三列
|
|
|
sql_cmd="select name,time,soc from drive_info where time between "+before6min_time_str+" and "+current_time_str
|
|
|
- print(sql_cmd)
|
|
|
- range_soc_df = pd.read_sql(sql_cmd, conn_qx)#使用read_sql方法查询qx数据库
|
|
|
+ # print(sql_cmd)
|
|
|
+ range_soc_df = pd.read_sql(sql_cmd, db_qx)#使用read_sql方法查询qx数据库
|
|
|
range_soc_df.rename(columns={'name':'sn'},inplace=True)#将name列重命名为sn列
|
|
|
|
|
|
#任务2,从tb_sn_factor_newest里面读取最新的factor,获取距离今天最近的一个factor list
|
|
|
- sql_cmd_1="select sn,a0,a1,a2,a3,a4 from tb_sn_factor_newest"
|
|
|
- print(sql_cmd_1)
|
|
|
- sn_factor_newest_df_raw = pd.read_sql(sql_cmd_1, conn_local)#使用read_sql方法查询qx数据库
|
|
|
+ sql_cmd_1="select sn,a0,a1,a2,a3,a4 from {}".format(sn_newest_table_name)
|
|
|
+ # print(sql_cmd_1)
|
|
|
+ sn_factor_newest_df_raw = pd.read_sql(sql_cmd_1, db_local)#使用read_sql方法查询qx数据库
|
|
|
|
|
|
#任务3,将range_soc_df和sn_factor_newest_df_raw,双表合并成为一个新表格。
|
|
|
sn_soc_factor_df=pd.merge(range_soc_df,sn_factor_newest_df_raw,how='left',on='sn')
|
|
@@ -412,6 +413,6 @@ def updtVehElecRng(input_time='2021-07-29 12:01:00'):
|
|
|
sn_soc_factor_range_df=sn_soc_factor_range_df.append(sn_soc_factor_range_row)#拼接
|
|
|
|
|
|
##任务5,将sn_soc_factor_range_df写入到tb_sn_factor_soc_range中,使用替换关系。
|
|
|
- sn_soc_factor_range_df.to_sql('tb_sn_factor_soc_range',con=engine,chunksize=10000,\
|
|
|
+ sn_soc_factor_range_df.to_sql(range_table_name,con=db_engine,chunksize=10000,\
|
|
|
if_exists='replace',index=False)
|
|
|
|