import pandas as pd import pymysql from sqlalchemy import create_engine import datetime #建立引擎 engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s/%s") % ('root', 'pengmin', 'localhost', 'qixiangdb')) conn_qx = pymysql.connect( host='rm-bp10j10qy42bzy0q77o.mysql.rds.aliyuncs.com', user='qx_cas', password='Qx@123456',#Qx@123456 database='qx_cas', charset='utf8' ) conn_local = pymysql.connect( host='localhost', user='root', password='pengmin', database='qixiangdb', charset='utf8' ) def getNextSoc(start_soc): '''输入当前的soc,寻找目标soc函数''' if start_soc>80: next_soc=80 elif start_soc>60: next_soc=60 elif start_soc>40: next_soc=40 elif start_soc>20: next_soc=20 else: next_soc=1 return next_soc def updtSnFct(sn_factor_df,end_soc,delta_range,range_soc): '''输入当前的soc区间段,里程变量量,soc变化量,输出新的df sn_factor_df为dataframe,delta_range单位为km,range_soc单位为km/persoc''' if end_soc==80: updtFctByCol(sn_factor_df,'a0',delta_range,range_soc) elif end_soc==60: updtFctByCol(sn_factor_df,'a1',delta_range,range_soc) elif end_soc==40: updtFctByCol(sn_factor_df,'a2',delta_range,range_soc) elif end_soc==20: updtFctByCol(sn_factor_df,'a3',delta_range,range_soc) elif end_soc<20: updtFctByCol(sn_factor_df,'a4',delta_range,range_soc) return sn_factor_df def updtFctByCol(sn_factor_df,colmun_name,delta_range,range_soc): '''更新制定列的factor,sn_factor_df为dataframe,新的系数更新到第一行。delta_range单位为km, range_soc单位为km/persoc,默认按照100km更新续驶里程权重''' range_soc_old=sn_factor_df.loc[0,colmun_name]#读取第0行的老factor debounce_range=200#更新权重 new_factor=range_soc*((delta_range)/debounce_range)+range_soc_old*(1-(delta_range)/debounce_range) #在第1行,存储新的factor sn_factor_df.loc[1,colmun_name]=new_factor return sn_factor_df def updtTodayFct(factor_input,sn_day_df): '''更新今日的Factor***''' sn_factor_df_last=factor_input start_soc=sn_day_df.loc[0,'soc'] next_soc=getNextSoc(start_soc) start_range=sn_day_df.loc[0,'vehodo'] sn=sn_day_df.loc[0,'name'] for index in range(len(sn_day_df)-1): #寻找分割点, index_soc=sn_day_df.loc[index,'soc']#当前行soc next_index_soc=sn_day_df.loc[index+1,'soc']#下一行soc if (index_soc>=next_soc)&(next_index_soc1: sn_factor_df_last=updtSnFct(sn_factor_df_last,next_soc,delta_range_tonext_km,range_soc_tonext) start_soc=next_index_soc#变更开始soc next_soc=getNextSoc(start_soc)#变更结束soc start_range=sn_day_df.loc[index+1,'vehodo']#变更开始里程 return sn_factor_df_last def snDayDfPreProcess(sn_day_df): '''预处理,判断是否在dirvemode,获取drivemode条件下的累计行驶距离。 增加delta_soc列,drive_flg列,vehodo列''' sn_day_df=sn_day_df.reset_index(drop=True)#重置index #增加列,计算delta_soc for index in range(len(sn_day_df)): if index==0: sn_day_df.loc[index,'delta_soc']=0 else: sn_day_df.loc[index,'delta_soc']=sn_day_df.loc[index,'soc']-sn_day_df.loc[index-1,'soc'] #增加列,判断是否在drive状态 drive_flg=False accum_distance=0 for index in range(len(sn_day_df)): if index==0: sn_day_df.loc[index,'drive_status']=drive_flg sn_day_df.loc[index,'vehodo']=0 else: if (sn_day_df.loc[index,'delta_soc']<-0.1)|\ ((sn_day_df.loc[index,'delta_soc']<=0)&(sn_day_df.loc[index,'distance']>500)):#soc处于下降状态,说明在drive drive_flg=True#置true elif sn_day_df.loc[index,'delta_soc']>0.1:#soc处于上升状态,说明不在drive drive_flg=False#置false accum_distance=0#清零 sn_day_df.loc[index,'drive_flg']=drive_flg accum_distance+=sn_day_df.loc[index,'distance']#对行驶里程进行累加 sn_day_df.loc[index,'vehodo']=accum_distance #筛选所有的drive信息行 sn_day_drive_df=sn_day_df.loc[sn_day_df['drive_flg']==True,:] sn_day_drive_df=sn_day_drive_df.reset_index(drop=True)#重置index return sn_day_drive_df def updtAllSnFct(start_date,end_date): '''计算开始时间到结束时间的,所有sn的factor''' start_date_datetime=datetime.datetime.strptime(start_date,'%Y-%m-%d')#开始时间 end_date_datetime=datetime.datetime.strptime(end_date,'%Y-%m-%d')#开始时间 delta_day=(end_date_datetime-start_date_datetime).days#间隔天数 i=1 while i<=delta_day: end_date=(start_date_datetime+datetime.timedelta(days=i)).strftime("%Y-%m-%d") updtAllSnTodayFct(start_date,end_date)#调用函数 print('update all sn factor from '+start_date+" to "+end_date) start_date=end_date i+=1#自加 def updtAllSnTodayFct(start_date,end_date): ''''更新今天所有sn的factorx信息,start_date和end_date相隔一天。此处还可优化''' start_date_str="'"+start_date+"'" end_date_str="'"+end_date+"'" sql_cmd="select * from drive_info where time between "+start_date_str+" and "+end_date_str+" and distance!=0;" range_soc_df = pd.read_sql(sql_cmd, conn_qx)#使用read_sql方法查询qx数据库 #筛选出所有当日数据之后,筛选当日有更新的sn today_sn_list=range_soc_df['name'].unique().tolist()#[:100]#先一次更新5个 #建立空的dataframe,用于承接所有更新的factor信息 today_sn_fct_df=pd.DataFrame([],columns=['sn','date','a0','a1','a2','a3','a4']) for sn in today_sn_list: #寻找factor_df,里面是否有sn号,如果没有sn对应信息,则新增信息。 sn_str="'"+sn+"'" sql_cmd2="select sn,date,a0,a1,a2,a3,a4 from tb_sn_factor where date<"+start_date_str+" and sn="+sn_str #此处可以限定每次查询的数量,例如不高于5行 factor_df=pd.read_sql(sql_cmd2, conn_local)#使用read_sql方法查询local数据库 #按照sn号和日期进行去重,避免运行时重复产生factor数据,保留第一次出现的行。 factor_df=factor_df.drop_duplicates(subset=['sn','date'],keep='first') if len(factor_df)==0: #如果没有搜索到factor历史数据,则声明一个新的进行初始化 start_date_datetime=datetime.datetime.strptime(start_date,'%Y-%m-%d') yesterday=(start_date_datetime+datetime.timedelta(days=-1)).strftime("%Y-%m-%d") #为sn申请一个新的factor,初始值为1 factor_df=pd.DataFrame({'sn':sn,'date':yesterday,'a0':[1],'a1':[1],'a2':[1],'a3':[1],'a4':[1]}) sn_factor_df=factor_df.loc[factor_df['sn']==sn,:]#筛选sn对应的factor sn_factor_df=sn_factor_df.sort_values(by='date',ascending='True')#按照日期排序 sn_factor_df_last=sn_factor_df.tail(1).copy()#寻找最后一行,代表最近日期 sn_factor_df_last=sn_factor_df_last.append(sn_factor_df_last)#新增加一行,用于存储新的factor sn_factor_df_last=sn_factor_df_last.reset_index(drop=True)#重置index sn_factor_df_last.loc[1,'date']=start_date#更改后一行的date为当前日期 #筛选对应车辆的信息 condition_sn=(range_soc_df['name']==sn) sn_day_df=range_soc_df.loc[condition_sn,:].copy() sn_day_df=sn_day_df.reset_index(drop=True) #使用updtTodayFct函数更新今天的factor if len(sn_day_df)>=2: #使用process函数,进行预处理 sn_day_df=snDayDfPreProcess(sn_day_df)#预处理函数 if len(sn_day_df)>=2: sn_factor_df_new=updtTodayFct(sn_factor_df_last,sn_day_df)# today_sn_fct_df=today_sn_fct_df.append(sn_factor_df_new.loc[1,:])#筛选第一行,进行拼接,最后写入到数据库中 #将today_sn_fct_df写入到数据库中,今天所有factor更新的系数,一次写入。 if len(today_sn_fct_df)>=1: today_sn_fct_df.to_sql('tb_sn_factor',con=engine,chunksize=10000,if_exists='append',index=False) def updtOneSnFct(sn,start_date,end_date): '''计算开始时间到结束时间的,一个sn的所有factor''' start_date_datetime=datetime.datetime.strptime(start_date,'%Y-%m-%d')#开始时间 end_date_datetime=datetime.datetime.strptime(end_date,'%Y-%m-%d')#开始时间 delta_day=(end_date_datetime-start_date_datetime).days#间隔天数 i=1 while i<=delta_day: end_date=(start_date_datetime+datetime.timedelta(days=i)).strftime("%Y-%m-%d") updtOneSnTodayFct(sn,start_date,end_date)#调用函数 print('update one sn factor from '+start_date+" to "+end_date) start_date=end_date i+=1#自加 def updtOneSnTodayFct(sn,start_date,end_date): start_date_str="'"+start_date+"'" end_date_str="'"+end_date+"'" sn_str="'"+sn+"'" sql_cmd="select * from drive_info where time between "+start_date_str+" and "+end_date_str+\ " and distance!=0 and name="+sn_str range_soc_df = pd.read_sql(sql_cmd, conn_qx)#使用read_sql方法查询qx数据库 if len(range_soc_df)>0: #筛选出所有当日数据之后,筛选当日有更新的sn today_sn_list=range_soc_df['name'].unique().tolist() #建立空的dataframe,用于承接所有更新的factor信息 today_sn_fct_df=pd.DataFrame([],columns=['sn','date','a0','a1','a2','a3','a4']) for sn in today_sn_list: #寻找factor_df,里面是否有sn号,如果没有sn对应信息,则新增信息。 sn_str="'"+sn+"'" sql_cmd2="select sn,date,a0,a1,a2,a3,a4 from tb_sn_factor where date<"+start_date_str+" and sn="+sn_str factor_df=pd.read_sql(sql_cmd2, conn_local)#使用read_sql方法查询local数据库 #按照sn号和日期进行去重,避免运行时重复产生factor数据,保留第一次出现的行。 factor_df=factor_df.drop_duplicates(subset=['sn','date'],keep='first') if len(factor_df)==0: #如果没有搜索到factor历史数据,则声明一个新的进行初始化 start_date_datetime=datetime.datetime.strptime(start_date,'%Y-%m-%d') yesterday=(start_date_datetime+datetime.timedelta(days=-1)).strftime("%Y-%m-%d") factor_df=pd.DataFrame({'sn':sn,'date':yesterday,'a0':[1],'a1':[1],'a2':[1],'a3':[1],'a4':[1]}) today_sn_fct_df=today_sn_fct_df.append(factor_df.loc[0,:])#将初始化的行记录到数据库 sn_factor_df=factor_df.loc[factor_df['sn']==sn,:]#筛选sn对应的factor sn_factor_df=sn_factor_df.sort_values(by='date',ascending='True')#按照日期排序 sn_factor_df_last=sn_factor_df.tail(1).copy()#寻找最后一行,代表最近日期 sn_factor_df_last=sn_factor_df_last.append(sn_factor_df_last)#新增加一行,用于存储新的factor sn_factor_df_last=sn_factor_df_last.reset_index(drop=True)#重置index sn_factor_df_last.loc[1,'date']=start_date#更改后一行的date为当前日期 #筛选对应车辆的信息 condition_sn=(range_soc_df['name']==sn) sn_day_df=range_soc_df.loc[condition_sn,:].copy() sn_day_df=sn_day_df.reset_index(drop=True) #使用updtTodayFct函数更新今天的factor if len(sn_day_df)>=2: #使用process函数,进行预处理 sn_day_df=snDayDfPreProcess(sn_day_df)#!!!!!!!!!!!增加 if len(sn_day_df)>=2: sn_factor_df_new=updtTodayFct(sn_factor_df_last,sn_day_df)# today_sn_fct_df=today_sn_fct_df.append(sn_factor_df_new.loc[1,:])#筛选第一行,进行拼接,最后写入到数据库中 # #将today_sn_fct_df写入到数据库中 if len(today_sn_fct_df)>=1: today_sn_fct_df.to_sql('tb_sn_factor',con=engine,chunksize=10000,if_exists='append',index=False) # print(sn+' factor will be update in table tb_sn_factor!') return today_sn_fct_df # def updtASnTodayFct(start_date,end_date,today_sn_list): # sql_cmd="select * from qixiang_test where time>='"+start_date+"' and time<='"+end_date+"'" # range_soc_df = pd.read_sql(sql_cmd, conn)#使用read_sql方法查询数据库 # sql_cmd2="select sn,date,a0,a1,a2,a3,a4 from tb_sn_factor where date<'"+start_date+"'" # factor_df=pd.read_sql(sql_cmd2, conn)#使用read_sql方法查询数据库 # #筛选出所有当日数据之后,筛选当日有更新的sn # # today_sn_list=range_soc_df['sn'].unique().tolist() # # today_sn_list=today_sn_list[:10]#更新若干个 # #建立空的dataframe,用于承接所有更新的factor信息 # today_sn_fct_df=pd.DataFrame([],columns=['sn','date','a0','a1','a2','a3','a4']) # for sn in today_sn_list: # sn_factor_df=factor_df.loc[factor_df['sn']==sn,:]#筛选sn对应的factor # sn_factor_df=sn_factor_df.sort_values(by='date',ascending='True')#按照日期排序 # sn_factor_df_last=sn_factor_df.tail(1).copy()#寻找最后一行,代表最近日期 # sn_factor_df_last=sn_factor_df_last.append(sn_factor_df_last)#新增加一行,用于存储新的factor # sn_factor_df_last=sn_factor_df_last.reset_index(drop=True)#重置index # sn_factor_df_last.loc[1,'date']=start_date#更改后一行的date为当前日期 # #筛选对应车辆的信息 # condition_sn=(range_soc_df['sn']==sn) # sn_day_df=range_soc_df.loc[condition_sn,:].copy() # sn_day_df=sn_day_df.reset_index(drop=True) # #使用updtTodayFct函数更新今天的factor # sn_factor_df_new=updtTodayFct(sn_factor_df_last,sn_day_df) # today_sn_fct_df=today_sn_fct_df.append(sn_factor_df_new.loc[1,:])#筛选第一行,进行拼接,最后写入到数据库中 # #将today_sn_fct_df写入到数据库中 # today_sn_fct_df.to_sql('tb_sn_factor',con=engine,chunksize=10000,if_exists='append',index=False)