123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
import datetime
import json
import os
import time
import traceback
import warnings
from multiprocessing import Pool

import numpy as np
import pandas as pd
from sqlalchemy import text, delete, and_, or_, update

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *
# algorithm packages used by this scheduler
from station_strategy.V_1_0_0 import data_status as ds
from station_strategy.V_1_0_0 import data_split as dt
from station_strategy.V_1_0_0 import scheduling_base as schb
from station_strategy.V_1_0_0 import scheduling_static as scht
from station_strategy.V_1_0_0 import data_charge_stat as dct  # charge data aggregated per charging segment
from station_strategy.V_1_0_0 import scheduling_method as schm
from station_strategy.V_1_0_0 import data_charge_slot as dcs
from station_strategy.V_1_0_0 import trans_day as trd
from station_strategy.V_1_0_0 import stand_status as ss
- def main(process_num):
- # 程序不能停止
- while(True):
- warnings.filterwarnings("ignore")
- try:
- # 调用算法前的准备工作
- kafka_topic_key = 'topic_task_test_hour_half_lk'
- kafka_groupid_key = 'group_id_task_test_hour_half_lk'
- algo_list = ['station_strategy'] # 本调度所包含的算法名列表。
-
- loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num) # 为每个算法分配一个logger
- logger_main.info(f"process-{process_num}: 配置中间件")
-
- # mysql
- mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
- mysqlUtils = MysqlUtils()
- mysql_algo_engine, mysql_algo_Session= mysqlUtils.get_mysql_engine(mysql_algo_params)
- mysql_algo_conn = mysql_algo_engine.connect()
- # kafka
- kafka_params = sysUtils.get_cf_param('kafka')
- kafkaUtils = KafkaUtils()
- kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
- #Hbase
- hbase_params = sysUtils.get_cf_param('hbase-datafactory')
- iotp_service = IotpAlgoService(hbase_params=hbase_params)
- #redis
- redis_params = sysUtils.get_cf_param('redis')
- reidsUtils = RedisUtils()
- rc = reidsUtils.get_redis_conncect(redis_params)
- except Exception as e:
- logger_main.error(str(e))
- logger_main.error(traceback.format_exc())
-
- # 开始准备调度
- logger_main.info(f"process-{process_num}: 监听topic {kafka_params[kafka_topic_key]}等待kafka 调度")
-
- for message in kafka_consumer:
- try:
- logger_main.info(f'收到调度')
- if mysql_algo_conn.close:
- mysql_algo_conn = mysql_algo_engine.connect() # 从连接池中获取一个myslq连接
-
- schedule_params = json.loads(message.value)
- if (schedule_params is None) or (schedule_params ==''):
- logger_main.info('{} kafka数据异常,跳过本次运算'.format(str(message.value)))
- continue
- # kafka 调度参数解析
- df_snlist = pd.DataFrame(schedule_params['snlist'])
- # df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'],d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param','param_ai'])
- df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
- df_algo_pack_param = {k: eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}
- df_algo_param = pd.DataFrame(schedule_params['algo_list'])
- start_time_in = schedule_params['start_time']
- end_time_in = schedule_params['end_time']
- pack_code = schedule_params['pack_code']
- cell_type = schedule_params['cell_type']
- sn_list=df_snlist['sn'].tolist()
-
- now_time=pd.to_datetime(end_time_in)
- start_time=now_time-datetime.timedelta(hours=32)
- start_time=start_time.strftime('%Y-%m-%d %H:%M:%S')
- start_time1=now_time-datetime.timedelta(hours=24)
- start_time1=start_time1.strftime('%Y-%m-%d %H:%M:%S')
- end_time=now_time-datetime.timedelta(seconds=1)
- end_time=end_time.strftime('%Y-%m-%d %H:%M:%S')
- end_time1=now_time.strftime('%Y-%m-%d %H:%M:%S')
-
- if df_snlist['imei'].iloc[0][:1]=='P':
- try:
- # 取数
- time_st = time.time()
- logger_main.info(f"process-{process_num}: 开始取数{sn_list}")
- columns = [ DataField.time, DataField.sn, DataField.pack_crnt, DataField.pack_volt, DataField.pack_soc,DataField.bms_sta,DataField.latitude,DataField.longitude,DataField.vin]
- df_data_get = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
- logger_main.info(f'process-{process_num},获取到{len(df_data_get)}条数据,取数耗时:{time.time()-time_st}')
-
- ##数据处理
- ##bms
- df_data=df_data_get[['sn','time','pack_crnt','pack_volt', 'pack_soc','bms_sta']][df_data_get['datatype']==12]
- ##gps
- df_gps=df_data_get[['sn','time','latitude','longitude']][df_data_get['datatype']==16]
- df_gps=df_gps.replace('',np.nan)
- df_gps=df_gps.dropna(axis=0,how='any')
- df_gps["latitude"]=df_gps["latitude"].astype(float)
- df_gps["longitude"]=df_gps["longitude"].astype(float)
- ##处理经纬度为0的情况
- df_gps=df_gps.replace(0,np.nan)
- df_gps=df_gps.sort_values(["sn","time"],ascending = [True, True])
- df_gps_filled = df_gps.groupby("sn").fillna(method='ffill')
- df_gps = pd.concat([df_gps[['sn']], df_gps_filled], axis=1)
- ##vin
- df_vin=df_data_get[['sn','time','vin']][df_data_get['datatype']==50]
- df_vin["vin"].replace("","z",inplace=True)
- ##先关联gps和vin
- df_merge_gv = pd.merge(df_gps, df_vin, on=['sn','time'],how='outer')
- df_merge_gv=df_merge_gv.sort_values(["sn","time"],ascending = [True, True])
- df_merge_gv_filled = df_merge_gv.groupby("sn").fillna(method='ffill')
- df_merge_gv= pd.concat([df_merge_gv[['sn']], df_merge_gv_filled], axis=1)
- ##识别静置的情况,首先关联非静置数据用前值填充
- ##将静置数据填充0后合并
- df_merge_gv_ns,df_merge_gv_s=ss.stand_status(df_merge_gv,s_order_delta=120)
- df_merge = pd.merge(df_data, df_merge_gv_ns, on=['sn','time'],how='outer')
- df_merge=df_merge.sort_values(["sn","time"],ascending = [True, True])
- df_merge_filled = df_merge.groupby("sn").fillna(method='ffill')
- df_merge = pd.concat([df_merge[['sn']], df_merge_filled], axis=1)
- df_merge=df_merge.dropna(subset=['pack_crnt','latitude','longitude','vin'],axis=0,how='any')
-
- sql="select sn,organ_code from t_device"
- df_relation=pd.read_sql(sql, mysql_algo_conn)
- sql="select * from algo_dwd_station_location_new"
- df_location=pd.read_sql(sql, mysql_algo_conn)
-
- sql="select * from algo_dwd_station_location_new"
- organ_code_t=pd.read_sql(sql, mysql_algo_conn)
- organ_code_list = list(set(organ_code_t["organ_code"]))
-
-
- df_merge=pd.merge(df_merge,df_relation,on="sn",how="left")
-
- for organ_code in organ_code_list:
- try:
- sql="select sn from t_device where organ_code='{}' ".format(organ_code)
- df_sn=pd.read_sql_query(sql, mysql_algo_conn)
- organ_sn_list=list(df_sn["sn"])
- ##括号的结构
- organ_sn_t = tuple(organ_sn_list)
- a=df_location["latitude"][df_location["organ_code"]==organ_code].values[0]
- b=df_location["longitude"][df_location["organ_code"]==organ_code].values[0]
- center=(a,b)
- df_merge_s=df_merge[df_merge["organ_code"]==organ_code]
- if not df_merge_s.empty:
- df_merge_s=ds.data_status(df_merge_s,c_soc_dif_p=0.01,s_soc_dif_p=0,c_order_delta=60,s_order_delta=120,center=center)
- df_drive,df_charge,df_stand=dt.split(df_merge_s,drive_interval_time_min=1200,charge_interval_time_min=1200,stand_interval_time_min=1200,single_num_min=3,drive_sts=3,charge_sts=[21,22],stand_sts=0)
- df_charge_static_base=dct.data_charge_stat(df_charge)
- df_charge_static_base["Time_e"]=pd.to_datetime(df_charge_static_base["Time_e"] )
- df_charge_static_base=df_charge_static_base[(df_charge_static_base["Time_e"]>=start_time1)&(df_charge_static_base["Time_e"]<end_time)]
- if not df_charge_static_base.empty:
- for sn in organ_sn_list:##机构下的电池
- df_new=df_charge_static_base[df_charge_static_base["sn"]==sn]
- if not df_new.empty:
- trd.trans_day(df_new,sn,start_time1,"algo_dwd_station_charge_static",mysql_algo_conn)
- df_charge_static_base.to_sql("algo_dwd_station_charge_static",con=mysql_algo_conn, if_exists="append",index=False)
- sql="select t1.* from algo_dwd_station_charge_static t1 join(select sn, max(id) as max_id from algo_dwd_station_charge_static group by sn ) t2 \
- on t1.sn=t2.sn and t1.id=t2.max_id where t1.Time_e >= '{}' and t1.sn in {} ".format(start_time1,organ_sn_t)
- df_charge_static_base_new=pd.read_sql_query(sql, mysql_algo_conn)
-
- if not df_charge_static_base_new.empty:
- df_charge_slot=dcs.charge_slot(df_charge_static_base_new,end_time)
- print(type(df_charge_slot.loc[0,"time_date"]))
- df_charge_slot["time_date"]=df_charge_slot["time_date"].dt.strftime("%Y-%m-%d")
- print(type(df_charge_slot.loc[0,"time_date"]))
- df_charge_slot.to_csv("df_charge_slot.csv",index=False)
- else:
- bb1={ "time_date":end_time[:10],
- "time_slot":end_time,
- "c_e_count":0,
- "c_b_count":0}
- df_charge_slot=pd.DataFrame(columns=["time_date","time_slot","c_e_count","c_b_count"])
- df_charge_slot.loc[len(df_charge_slot)] = bb1
- df_charge_slot["time_slot"]=dct.Halfhour(df_charge_slot["time_slot"])
- df_charge_slot.to_csv("df_charge_slot.csv",index=False)
-
- df_merge_s["time"]=pd.to_datetime(df_merge_s["time"])
- df_merge_s=df_merge_s[df_merge_s["time"]>=pd.to_datetime(start_time1)]
- if not df_merge_s.empty:
- ##高soc换电倾向客户
- sql="select * from algo_dwd_high_soc_vin where organ_code='{}' ".format(organ_code)
- soc_high_cus=pd.read_sql_query(sql, mysql_algo_conn)
- soc_high_cus_l=soc_high_cus["vin"].tolist()
- control_df_base=schb.sheduling_base(df_merge_s,drive_sts=3,stand_sts=0,charge_sts=[21,22],full_value=98,center=center,soc_high_cus_l=soc_high_cus_l)
- control_df_base.to_sql("algo_dwd_sheduling_base",con=mysql_algo_conn, if_exists="append",index=False)
- control_df_static=scht.sheduling_static(control_df_base,working_intensity=3,station=1)
- control_df_static["time_date"]=pd.to_datetime(control_df_static["time_date"]).dt.strftime("%Y-%m-%d")
- if not control_df_static.empty:
- control_df_static.to_csv("control_df_static.csv",index=False)
- print(type(control_df_static.loc[0,"time_date"]))
- control_df_static["time_date"]=pd.to_datetime(control_df_static["time_date"]).dt.strftime("%Y-%m-%d")
- print(type(control_df_static.loc[0,"time_date"]))
- else:
- bb2={ "time_date":end_time[:10],
- "time_slot":end_time,
- "charge_in":0,
- "charge_full_next":0,
- "charge_full_next2":0,
- "soc_low_1":0,
- "soc_low_2":0,
- "soc_low_3":0,
- "dist_close_1":0,
- "dist_close_2":0,
- "dist_close_3":0,
- "in_pro_1":0,
- "in_pro_2":0,
- "in_pro_3":0}
- control_df_static.loc[len(control_df_static)] = bb2
- control_df_static["time_slot"]=dct.Halfhour(control_df_static["time_slot"])
- control_df_union=pd.merge(control_df_static,df_charge_slot,on=['time_date','time_slot'],how="left")
- control_df_union=control_df_union.fillna(0)
- control_df_union["full_unused"]=control_df_union["c_e_count"]-control_df_union["c_b_count"]
-
- ##取上一个时间段的空闲电池量,进行空闲电池的累计
- sql="select * from algo_dwd_sheduling_static where organ_code={} order by id desc limit 3" .format(organ_code)
- control_df_union_old=pd.read_sql_query(sql, mysql_algo_conn)
- control_df_union_old=control_df_union_old.sort_values("id",ascending = True)
- full_unused_old=0
- if not control_df_union_old.empty:
- if len(control_df_union_old)==3:
- if (control_df_union_old['c_b_count'] == 0).all():
- full_unused_old=7
- else:
- full_unused_old=control_df_union_old["full_unused"].iloc[-1]
- if control_df_union["full_unused"].iloc[-1]+full_unused_old>=0 and control_df_union["full_unused"].iloc[-1]+full_unused_old<=7:
- control_df_union["full_unused"].iloc[-1]=control_df_union["full_unused"].iloc[-1]+full_unused_old #需要累加上次的,初始的时候需要选没进站的加7
- elif control_df_union["full_unused"].iloc[-1]+full_unused_old>7:
- control_df_union["full_unused"].iloc[-1]=7
- else:
- control_df_union["full_unused"].iloc[-1]=0
- #control_df_union["full_unused"]=control_df_union["full_unused"].cumsum(axis = 0)
- control_df_union.to_csv("control_df_union.csv",index=False)
- charge_pre,charge_need=schm.scheduling_method(control_df_union,end_time1[:10])
- control_df_union["organ_code"]=organ_code
- charge_pre["organ_code"]=organ_code
- charge_need["organ_code"]=organ_code
- control_df_union.to_sql("algo_dwd_sheduling_static",con=mysql_algo_conn, if_exists="append",index=False)
- charge_need.to_sql("algo_exchange_station_charge_need",con=mysql_algo_conn, if_exists="append",index=False)
- charge_pre.to_sql("algo_exchange_station_charge_pre",con=mysql_algo_conn, if_exists="append",index=False)
- except Exception as e:
- logger_main.error(str(e))
- logger_main.error(traceback.format_exc())
- except Exception as e:
- logger_main.error(str(e))
- logger_main.error(traceback.format_exc())
- except Exception as e:
- logger_main.error(f"process-{process_num}:{pack_code}获取原始数据出错")
- logger_main.error(f"process-{process_num}:{e}")
- logger_main.error(f"process-{process_num}:{traceback.format_exc()}")
- continue
-
- if __name__ == '__main__':
- while(True):
- try:
- # 配置量
-
- cur_env = 'dev' # 设置运行环境
- app_path = "/home/likun/project/zlwl-algos/" # 设置app绝对路径
- log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log" # 设置日志路径
- app_name = "station_strategy" # 应用名
-
- sysUtils = SysUtils(cur_env, app_path)
- logger_main = sysUtils.get_logger(app_name, log_base_path)
- logger_main.info(f"本次主进程号: {os.getpid()}")
- # 读取配置文件 (该部分请不要修改)
- processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1')) # 默认为1个进程
- pool = Pool(processes = int(processes))
- logger_main.info("开始分配子进程")
- for i in range(int(processes)):
- pool.apply_async(main, (i, ))
- pool.close()
- logger_main.info("进程分配结束,堵塞主进程")
- pool.join()
- except Exception as e:
- print(str(e))
- print(traceback.format_exc())
- logger_main.error(str(e))
- logger_main.error(traceback.format_exc())
- finally:
- handlers = logger_main.handlers.copy()
- for h in handlers:
- logger_main.removeHandler(h)
- pool.terminate()
|