main.py
import ast
import datetime
import json
import os
import time
import traceback
import warnings
from multiprocessing import Pool

import numpy as np
import pandas as pd
from sqlalchemy import text, delete, and_, or_, update

from ZlwlAlgosCommon.utils.ProUtils import *
from ZlwlAlgosCommon.service.iotp.IotpAlgoService import IotpAlgoService
from ZlwlAlgosCommon.service.iotp.Beans import DataField
from ZlwlAlgosCommon.orm.models import *
## import the algorithm packages
from station_strategy.V_1_0_0 import data_status as ds
from station_strategy.V_1_0_0 import data_split as dt
from station_strategy.V_1_0_0 import scheduling_base as schb
from station_strategy.V_1_0_0 import scheduling_static as scht
from station_strategy.V_1_0_0 import data_charge_stat as dct  ## aggregate charging data by charge segment
from station_strategy.V_1_0_0 import scheduling_method as schm
from station_strategy.V_1_0_0 import data_charge_slot as dcs
from station_strategy.V_1_0_0 import trans_day as trd
from station_strategy.V_1_0_0 import stand_status as ss
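# A schedule message consumed in main() is expected to carry at least the fields
# parsed there; a minimal sketch (field names taken from the parsing code below,
# values purely illustrative):
#   {
#     "snlist": [{"sn": "...", "imei": "P..."}],
#     "pack_param": [{"param": "{ ...json... }"}],
#     "algo_list": [...],
#     "start_time": "2023-01-01 00:00:00",
#     "end_time": "2023-01-02 00:00:00",
#     "pack_code": "...",
#     "cell_type": "..."
#   }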
def main(process_num):
    # the process must keep running
    while True:
        warnings.filterwarnings("ignore")
        try:
            # preparation before invoking the algorithms
            kafka_topic_key = 'topic_task_test_hour_half_lk'
            kafka_groupid_key = 'group_id_task_test_hour_half_lk'
            algo_list = ['station_strategy']  # list of algorithm names covered by this scheduler
            loggers = sysUtils.get_loggers(algo_list, log_base_path, process_num)  # assign a logger to each algorithm
            logger_main.info(f"process-{process_num}: configuring middleware")
            # mysql
            mysql_algo_params = sysUtils.get_cf_param('mysql-algo')
            mysqlUtils = MysqlUtils()
            mysql_algo_engine, mysql_algo_Session = mysqlUtils.get_mysql_engine(mysql_algo_params)
            mysql_algo_conn = mysql_algo_engine.connect()
            # kafka
            kafka_params = sysUtils.get_cf_param('kafka')
            kafkaUtils = KafkaUtils()
            kafka_consumer = kafkaUtils.get_kafka_consumer(kafka_params, kafka_topic_key, kafka_groupid_key, client_id=kafka_topic_key)
            # hbase
            hbase_params = sysUtils.get_cf_param('hbase-datafactory')
            iotp_service = IotpAlgoService(hbase_params=hbase_params)
            # redis
            redis_params = sysUtils.get_cf_param('redis')
            redisUtils = RedisUtils()
            rc = redisUtils.get_redis_conncect(redis_params)
        except Exception as e:
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
            continue  # middleware setup failed; retry instead of running with unset handles
        # start waiting for schedules
        logger_main.info(f"process-{process_num}: listening on topic {kafka_params[kafka_topic_key]}, waiting for kafka dispatch")
        for message in kafka_consumer:
            try:
                logger_main.info('schedule received')
                if mysql_algo_conn.closed:  # was `mysql_algo_conn.close`, which is always truthy
                    mysql_algo_conn = mysql_algo_engine.connect()  # get a mysql connection from the pool
                schedule_params = json.loads(message.value)
                if (schedule_params is None) or (schedule_params == ''):
                    logger_main.info('{} abnormal kafka payload, skipping this run'.format(str(message.value)))
                    continue
                # parse the kafka schedule parameters
                df_snlist = pd.DataFrame(schedule_params['snlist'])
                # df_algo_adjustable_param = pd.DataFrame([(d['algo_id'], d['param'], d['param_ai']) for d in schedule_params['adjustable_param']], columns=['algo_id', 'param', 'param_ai'])
                df_algo_pack_param = json.loads(schedule_params['pack_param'][0]['param'])
                # ast.literal_eval is substituted for the original eval(): it only parses Python
                # literals and never executes code (assumes the pack params are plain literals)
                df_algo_pack_param = {k: ast.literal_eval(v) if isinstance(v, str) else v for k, v in df_algo_pack_param.items()}
                df_algo_param = pd.DataFrame(schedule_params['algo_list'])
                start_time_in = schedule_params['start_time']
                end_time_in = schedule_params['end_time']
                pack_code = schedule_params['pack_code']
                cell_type = schedule_params['cell_type']
                sn_list = df_snlist['sn'].tolist()
                now_time = pd.to_datetime(end_time_in)
                start_time = (now_time - datetime.timedelta(hours=32)).strftime('%Y-%m-%d %H:%M:%S')
                start_time1 = (now_time - datetime.timedelta(hours=24)).strftime('%Y-%m-%d %H:%M:%S')
                end_time = (now_time - datetime.timedelta(seconds=1)).strftime('%Y-%m-%d %H:%M:%S')
                end_time1 = now_time.strftime('%Y-%m-%d %H:%M:%S')
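                # Time windows as computed above:
                #   start_time  = end_time_in - 32h  -> raw-data fetch window (presumably 8h of extra leading context)
                #   start_time1 = end_time_in - 24h  -> statistics window used for the daily aggregation
                #   end_time    = end_time_in - 1s   -> right-open fetch boundary
                #   end_time1   = end_time_in        -> the nominal "now" used for the scheduling output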
                if df_snlist['imei'].iloc[0][:1] == 'P':
                    try:
                        # fetch data
                        time_st = time.time()
                        logger_main.info(f"process-{process_num}: start fetching data for {sn_list}")
                        columns = [DataField.time, DataField.sn, DataField.pack_crnt, DataField.pack_volt,
                                   DataField.pack_soc, DataField.bms_sta, DataField.latitude, DataField.longitude, DataField.vin]
                        df_data_get = iotp_service.get_data(sn_list=sn_list, columns=columns, start_time=start_time, end_time=end_time)
                        logger_main.info(f'process-{process_num}, fetched {len(df_data_get)} rows in {time.time()-time_st}s')
                        ## data processing
                        ## bms
                        df_data = df_data_get[['sn', 'time', 'pack_crnt', 'pack_volt', 'pack_soc', 'bms_sta']][df_data_get['datatype'] == 12]
                        ## gps
                        df_gps = df_data_get[['sn', 'time', 'latitude', 'longitude']][df_data_get['datatype'] == 16]
                        df_gps = df_gps.replace('', np.nan)
                        df_gps = df_gps.dropna(axis=0, how='any')
                        df_gps["latitude"] = df_gps["latitude"].astype(float)
                        df_gps["longitude"] = df_gps["longitude"].astype(float)
                        ## treat zero latitude/longitude as missing and forward-fill per sn
                        df_gps = df_gps.replace(0, np.nan)
                        df_gps = df_gps.sort_values(["sn", "time"], ascending=[True, True])
                        df_gps_filled = df_gps.groupby("sn").ffill()  # groupby(...).fillna(method='ffill') is deprecated
                        df_gps = pd.concat([df_gps[['sn']], df_gps_filled], axis=1)
                        ## vin
                        df_vin = df_data_get[['sn', 'time', 'vin']][df_data_get['datatype'] == 50]
                        df_vin["vin"] = df_vin["vin"].replace("", "z")  # avoid inplace replace on a slice
                        ## join gps and vin first
                        df_merge_gv = pd.merge(df_gps, df_vin, on=['sn', 'time'], how='outer')
                        df_merge_gv = df_merge_gv.sort_values(["sn", "time"], ascending=[True, True])
                        df_merge_gv_filled = df_merge_gv.groupby("sn").ffill()
                        df_merge_gv = pd.concat([df_merge_gv[['sn']], df_merge_gv_filled], axis=1)
                        ## detect standing (idle) periods: join the non-standing data first and forward-fill,
                        ## then merge back the standing data filled with 0
                        df_merge_gv_ns, df_merge_gv_s = ss.stand_status(df_merge_gv, s_order_delta=120)
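                        # ss.stand_status presumably splits the gps/vin stream into non-standing
                        # (df_merge_gv_ns) and standing (df_merge_gv_s) records, with s_order_delta=120s
                        # as the gap threshold for a standing segment (inferred from the call site;
                        # the implementation lives in station_strategy.V_1_0_0.stand_status).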
                        df_merge = pd.merge(df_data, df_merge_gv_ns, on=['sn', 'time'], how='outer')
                        df_merge = df_merge.sort_values(["sn", "time"], ascending=[True, True])
                        df_merge_filled = df_merge.groupby("sn").ffill()
                        df_merge = pd.concat([df_merge[['sn']], df_merge_filled], axis=1)
                        df_merge = df_merge.dropna(subset=['pack_crnt', 'latitude', 'longitude', 'vin'], axis=0, how='any')
                        sql = "select sn,organ_code from t_device"
                        df_relation = pd.read_sql(sql, mysql_algo_conn)
                        sql = "select * from algo_dwd_station_location_new"
                        df_location = pd.read_sql(sql, mysql_algo_conn)
                        # the original issued the same query twice; reuse df_location for the organ codes
                        organ_code_list = list(set(df_location["organ_code"]))
                        df_merge = pd.merge(df_merge, df_relation, on="sn", how="left")
                        for organ_code in organ_code_list:
                            try:
                                sql = "select sn from t_device where organ_code='{}' ".format(organ_code)
                                df_sn = pd.read_sql_query(sql, mysql_algo_conn)
                                organ_sn_list = list(df_sn["sn"])
                                ## tuple structure for the IN (...) clause: a one-element tuple would
                                ## carry a trailing comma that breaks the SQL, so special-case it
                                if len(organ_sn_list) == 1:
                                    organ_sn_t = "('{}')".format(organ_sn_list[0])
                                else:
                                    organ_sn_t = tuple(organ_sn_list)
                                a = df_location["latitude"][df_location["organ_code"] == organ_code].values[0]
                                b = df_location["longitude"][df_location["organ_code"] == organ_code].values[0]
                                center = (a, b)
                                df_merge_s = df_merge[df_merge["organ_code"] == organ_code]
                                if not df_merge_s.empty:
                                    df_merge_s = ds.data_status(df_merge_s, c_soc_dif_p=0.01, s_soc_dif_p=0, c_order_delta=60, s_order_delta=120, center=center)
                                    df_drive, df_charge, df_stand = dt.split(df_merge_s, drive_interval_time_min=1200, charge_interval_time_min=1200, stand_interval_time_min=1200, single_num_min=3, drive_sts=3, charge_sts=[21, 22], stand_sts=0)
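                                    # dt.split presumably cuts the labeled stream into driving / charging /
                                    # standing segments: status codes drive_sts=3, charge_sts=[21, 22] and
                                    # stand_sts=0, a new segment whenever the time gap exceeds
                                    # *_interval_time_min (1200), and segments shorter than single_num_min=3
                                    # rows dropped (inferred from the parameter names at the call site).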
                                    df_charge_static_base = dct.data_charge_stat(df_charge)
                                    df_charge_static_base["Time_e"] = pd.to_datetime(df_charge_static_base["Time_e"])
                                    df_charge_static_base = df_charge_static_base[(df_charge_static_base["Time_e"] >= start_time1) & (df_charge_static_base["Time_e"] < end_time)]
                                    if not df_charge_static_base.empty:
                                        for sn in organ_sn_list:  ## batteries under this organization
                                            df_new = df_charge_static_base[df_charge_static_base["sn"] == sn]
                                            if not df_new.empty:
                                                trd.trans_day(df_new, sn, start_time1, "algo_dwd_station_charge_static", mysql_algo_conn)
                                        # append the whole batch once, after the per-sn day-transition handling
                                        df_charge_static_base.to_sql("algo_dwd_station_charge_static", con=mysql_algo_conn, if_exists="append", index=False)
                                    sql = "select t1.* from algo_dwd_station_charge_static t1 join (select sn, max(id) as max_id from algo_dwd_station_charge_static group by sn) t2 \
                                          on t1.sn=t2.sn and t1.id=t2.max_id where t1.Time_e >= '{}' and t1.sn in {} ".format(start_time1, organ_sn_t)
                                    df_charge_static_base_new = pd.read_sql_query(sql, mysql_algo_conn)
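                                    # The self-join above keeps, for each sn, only the row with the largest
                                    # id, i.e. the most recently written charge-segment record, restricted
                                    # to this organization's batteries and the last 24 hours.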
                                    if not df_charge_static_base_new.empty:
                                        df_charge_slot = dcs.charge_slot(df_charge_static_base_new, end_time)
                                        df_charge_slot["time_date"] = df_charge_slot["time_date"].dt.strftime("%Y-%m-%d")
                                        df_charge_slot.to_csv("df_charge_slot.csv", index=False)
                                    else:
                                        # no charge records: build a single zeroed placeholder slot
                                        bb1 = {"time_date": end_time[:10],
                                               "time_slot": end_time,
                                               "c_e_count": 0,
                                               "c_b_count": 0}
                                        df_charge_slot = pd.DataFrame(columns=["time_date", "time_slot", "c_e_count", "c_b_count"])
                                        df_charge_slot.loc[len(df_charge_slot)] = bb1
                                        df_charge_slot["time_slot"] = dct.Halfhour(df_charge_slot["time_slot"])
                                        df_charge_slot.to_csv("df_charge_slot.csv", index=False)
                                    df_merge_s["time"] = pd.to_datetime(df_merge_s["time"])
                                    df_merge_s = df_merge_s[df_merge_s["time"] >= pd.to_datetime(start_time1)]
                                    if not df_merge_s.empty:
                                        ## customers with a tendency to swap batteries at high soc
                                        sql = "select * from algo_dwd_high_soc_vin where organ_code='{}' ".format(organ_code)
                                        soc_high_cus = pd.read_sql_query(sql, mysql_algo_conn)
                                        soc_high_cus_l = soc_high_cus["vin"].tolist()
                                        control_df_base = schb.sheduling_base(df_merge_s, drive_sts=3, stand_sts=0, charge_sts=[21, 22], full_value=98, center=center, soc_high_cus_l=soc_high_cus_l)
                                        control_df_base.to_sql("algo_dwd_sheduling_base", con=mysql_algo_conn, if_exists="append", index=False)
                                        control_df_static = scht.sheduling_static(control_df_base, working_intensity=3, station=1)
                                        control_df_static["time_date"] = pd.to_datetime(control_df_static["time_date"]).dt.strftime("%Y-%m-%d")
                                        if not control_df_static.empty:
                                            control_df_static.to_csv("control_df_static.csv", index=False)
                                        else:
                                            # no static stats: build a single zeroed placeholder slot
                                            bb2 = {"time_date": end_time[:10],
                                                   "time_slot": end_time,
                                                   "charge_in": 0,
                                                   "charge_full_next": 0,
                                                   "charge_full_next2": 0,
                                                   "soc_low_1": 0,
                                                   "soc_low_2": 0,
                                                   "soc_low_3": 0,
                                                   "dist_close_1": 0,
                                                   "dist_close_2": 0,
                                                   "dist_close_3": 0,
                                                   "in_pro_1": 0,
                                                   "in_pro_2": 0,
                                                   "in_pro_3": 0}
                                            control_df_static.loc[len(control_df_static)] = bb2
                                            control_df_static["time_slot"] = dct.Halfhour(control_df_static["time_slot"])
                                        control_df_union = pd.merge(control_df_static, df_charge_slot, on=['time_date', 'time_slot'], how="left")
                                        control_df_union = control_df_union.fillna(0)
                                        control_df_union["full_unused"] = control_df_union["c_e_count"] - control_df_union["c_b_count"]
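                                        # full_unused = c_e_count - c_b_count: presumably the number of charge
                                        # segments that ended minus those that began in the slot, i.e. the net
                                        # change in fully charged, idle batteries (inferred from the column names).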
                                        ## take the idle battery count from the previous slots and accumulate it
                                        sql = "select * from algo_dwd_sheduling_static where organ_code='{}' order by id desc limit 3".format(organ_code)  # organ_code is a string: it must be quoted in the SQL
                                        control_df_union_old = pd.read_sql_query(sql, mysql_algo_conn)
                                        control_df_union_old = control_df_union_old.sort_values("id", ascending=True)
                                        full_unused_old = 0
                                        if not control_df_union_old.empty:
                                            if len(control_df_union_old) == 3:
                                                if (control_df_union_old['c_b_count'] == 0).all():
                                                    full_unused_old = 7
                                                else:
                                                    full_unused_old = control_df_union_old["full_unused"].iloc[-1]
                                        # accumulate the previous value and clamp to [0, 7]; assign via .loc
                                        # to avoid pandas chained assignment through .iloc[-1]
                                        last_idx = control_df_union.index[-1]
                                        full_unused_new = control_df_union["full_unused"].iloc[-1] + full_unused_old  # add last time's value; at initialization, batteries not yet in the station count as 7
                                        control_df_union.loc[last_idx, "full_unused"] = min(max(full_unused_new, 0), 7)
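                                        # Worked example of the clamp above: if the previous slot left
                                        # full_unused_old = 5 idle batteries and this slot nets
                                        # c_e_count - c_b_count = 4, the raw total 9 exceeds the (assumed)
                                        # 7-battery station capacity and is clamped to 7; a net of -6
                                        # would clamp to 0.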
                                        # control_df_union["full_unused"] = control_df_union["full_unused"].cumsum(axis=0)
                                        control_df_union.to_csv("control_df_union.csv", index=False)
                                        charge_pre, charge_need = schm.scheduling_method(control_df_union, end_time1[:10])
                                        control_df_union["organ_code"] = organ_code
                                        charge_pre["organ_code"] = organ_code
                                        charge_need["organ_code"] = organ_code
                                        control_df_union.to_sql("algo_dwd_sheduling_static", con=mysql_algo_conn, if_exists="append", index=False)
                                        charge_need.to_sql("algo_exchange_station_charge_need", con=mysql_algo_conn, if_exists="append", index=False)
                                        charge_pre.to_sql("algo_exchange_station_charge_pre", con=mysql_algo_conn, if_exists="append", index=False)
                            except Exception as e:
                                logger_main.error(str(e))
                                logger_main.error(traceback.format_exc())
                    except Exception as e:
                        logger_main.error(str(e))
                        logger_main.error(traceback.format_exc())
            except Exception as e:
                logger_main.error(f"process-{process_num}: {pack_code} error while fetching raw data")
                logger_main.error(f"process-{process_num}: {e}")
                logger_main.error(f"process-{process_num}: {traceback.format_exc()}")
                continue
if __name__ == '__main__':
    while True:
        try:
            # configuration
            cur_env = 'dev'  # runtime environment
            app_path = "/home/likun/project/zlwl-algos/"  # absolute app path
            log_base_path = f"{os.path.dirname(os.path.abspath(__file__))}/log"  # log path
            app_name = "station_strategy"  # application name
            sysUtils = SysUtils(cur_env, app_path)
            logger_main = sysUtils.get_logger(app_name, log_base_path)
            logger_main.info(f"main process pid: {os.getpid()}")
            # read the config file (please do not modify this part)
            processes = int(sysUtils.env_params.get("PROCESS_NUM_PER_NODE", '1'))  # defaults to 1 process
            pool = Pool(processes=processes)
            logger_main.info("spawning worker processes")
            for i in range(processes):
                pool.apply_async(main, (i,))
            pool.close()
            logger_main.info("workers spawned; blocking the main process")
            pool.join()
        except Exception as e:
            print(str(e))
            print(traceback.format_exc())
            logger_main.error(str(e))
            logger_main.error(traceback.format_exc())
        finally:
            handlers = logger_main.handlers.copy()
            for h in handlers:
                logger_main.removeHandler(h)
            pool.terminate()