|
@@ -0,0 +1,223 @@
|
|
|
+import subprocess as sp
|
|
|
+import time
|
|
|
+import cv2
|
|
|
+import requests
|
|
|
+import pandas as pd
|
|
|
+import json
|
|
|
+from threading import Thread
|
|
|
+from queue import Queue
|
|
|
+import math
|
|
|
+import yaml
|
|
|
+import sys
|
|
|
+import os
|
|
|
+import datetime
|
|
|
+import dateutil.relativedelta
|
|
|
+
|
|
|
+# 默认当前目录
|
|
|
+config_dir = os.path.dirname(os.path.abspath(__file__))
|
|
|
+# 有配置目录参数,取配置的目录
|
|
|
+args = sys.argv
|
|
|
+print(args)
|
|
|
+if len(args)>1:
|
|
|
+ config_dir=args[1]
|
|
|
+
|
|
|
+config_file_path = os.path.join(config_dir, "config.yaml")
|
|
|
+config_file = open(config_file_path, "r", encoding="utf-8")
|
|
|
+configs = yaml.load(config_file)
|
|
|
+print(configs)
|
|
|
+
|
|
|
+# 实时图像中继url
|
|
|
+relay_url = configs["relay_url"]
|
|
|
+
|
|
|
+# 数据通知url
|
|
|
+post_url = configs["post_url"]
|
|
|
+# 脚本excel
|
|
|
+script_excel = configs["script_excel"]
|
|
|
+# 视频文件
|
|
|
+video_file = configs["video_file"]
|
|
|
+# 开启调试视频
|
|
|
+debug_video = configs["debug_video"]
|
|
|
+# 读取脚本文件
|
|
|
+dfs = pd.read_excel(script_excel, sheet_name=["script","user","reset"])
|
|
|
+df = dfs.get("script")
|
|
|
+udf = dfs.get("user")
|
|
|
+rdf = dfs.get("reset")
|
|
|
+records = df.to_dict("records")
|
|
|
+users = udf.to_dict("records")
|
|
|
+resets = rdf.to_dict("records")[0]
|
|
|
+# print(resets)
|
|
|
+# print("read: {} records".format(len(records)))
|
|
|
+script = {}
|
|
|
+for i in range(len(records)):
|
|
|
+ # print(records[i]["userInfo"])
|
|
|
+ ts = records[i]["time"]
|
|
|
+ p = records[i]["process"].replace("\n","").replace("\t","") if not pd.isnull(records[i]["process"]) else None
|
|
|
+ a = records[i]["carAnimation"].replace("\n","").replace("\t","") if not pd.isnull(records[i]["carAnimation"]) else None
|
|
|
+ u = True if not pd.isnull(records[i]["userInfo"]) else False
|
|
|
+ # if u:
|
|
|
+ # print("----------")
|
|
|
+ # print(records[i]["time"],u)
|
|
|
+ h,m,s,ms = ts.split("-")
|
|
|
+ h,m,s,ms = int(h),int(m),int(s),int(ms)
|
|
|
+ ts_1 = ms+s*1000+m*1000*60+h*1000*60*60
|
|
|
+ # 每秒60帧
|
|
|
+ offsetFrames = int(ts_1*30/1000)
|
|
|
+ # print(i, p, a)
|
|
|
+ d = {"ts":ts}
|
|
|
+ if p:
|
|
|
+ d.update({"process": json.loads(p)})
|
|
|
+ if a:
|
|
|
+ d.update({"carAnimation": json.loads(a)})
|
|
|
+ d.update({"userInfo": u})
|
|
|
+ script[offsetFrames] = d
|
|
|
+rps = resets["process"].replace("\n","").replace("\t","") if not pd.isnull(resets["process"]) else None
|
|
|
+ras = resets["carAnimation"].replace("\n","").replace("\t","") if not pd.isnull(resets["carAnimation"]) else None
|
|
|
+rpds = resets["positionData"].replace("\n","").replace("\t","") if not pd.isnull(resets["carAnimation"]) else None
|
|
|
+
|
|
|
+rd = {"ts":ts}
|
|
|
+if rps:
|
|
|
+ rd.update({"process": json.loads(rps)})
|
|
|
+if ras:
|
|
|
+ rd.update({"carAnimation": json.loads(ras)})
|
|
|
+if rpds:
|
|
|
+ rd.update({"positionData": json.loads(rpds)})
|
|
|
+# print(rd)
|
|
|
+# exit(0)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+queue = Queue()
|
|
|
+
|
|
|
+# 定义通知worker的线程函数
|
|
|
+def worker(q):
|
|
|
+ user_index = 0
|
|
|
+ action = {"type":"reset", "data":{"process":rd["process"], "carAnimation":rd["carAnimation"], "positionData":rd["positionData"], "t":rd["ts"]}}
|
|
|
+ try:
|
|
|
+ re = requests.post(post_url, json = action)
|
|
|
+ print("-------action:", action)
|
|
|
+ print(re)
|
|
|
+ except Exception as e:
|
|
|
+ print("Error:")
|
|
|
+ print(e)
|
|
|
+ while True:
|
|
|
+ actions = q.get()
|
|
|
+ t, p, a, u=None,None,None,False
|
|
|
+ t = actions.get("ts", None)
|
|
|
+ p = actions.get("process", None)
|
|
|
+ a = actions.get("carAnimation", None)
|
|
|
+ u = actions.get("userInfo", False)
|
|
|
+ # 控制换电流程状态
|
|
|
+ if p:
|
|
|
+ action = {"type":"process", "data":{"process":p, "t":t}}
|
|
|
+ try:
|
|
|
+ re = requests.post(post_url, json = action)
|
|
|
+ print("-------action:", action)
|
|
|
+ print(re)
|
|
|
+ except Exception as e:
|
|
|
+ print("Error:")
|
|
|
+ print(e)
|
|
|
+ # 控制动画
|
|
|
+ if a:
|
|
|
+ action = {"type":"carAnimation", "data":{"carAnimation":a, "t":t}}
|
|
|
+ try:
|
|
|
+ re = requests.post(post_url, json = action)
|
|
|
+ print("-------action:", action)
|
|
|
+ print(re)
|
|
|
+ except Exception as e:
|
|
|
+ print("Error:")
|
|
|
+ print(e)
|
|
|
+ # 用户信息推送
|
|
|
+ if u:
|
|
|
+ user = users[user_index]
|
|
|
+ # print("---user:", user)
|
|
|
+ action = {"type": "baseInfo", "data":{"baseInfo":user}}
|
|
|
+ try:
|
|
|
+ re = requests.post(post_url, json = action)
|
|
|
+ print("-------action:", action)
|
|
|
+ print(re)
|
|
|
+ except Exception as e:
|
|
|
+ print("Error:")
|
|
|
+ print(e)
|
|
|
+ user_index = (user_index+1) % len(users)
|
|
|
+
|
|
|
+t = Thread(target=worker,args=(queue,))
|
|
|
+t.start()
|
|
|
+
|
|
|
+
|
|
|
+# 打开视频
|
|
|
+cap=cv2.VideoCapture(video_file)
|
|
|
+# 总帧数
|
|
|
+total_frame_cnt = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
|
+# fps
|
|
|
+fps = int(cap.get(cv2.CAP_PROP_FPS))
|
|
|
+
|
|
|
+# 执行ffmpeg管道命令
|
|
|
+command= [
|
|
|
+ 'ffmpeg',
|
|
|
+ '-f', 'rawvideo',
|
|
|
+ '-vcodec', 'rawvideo',
|
|
|
+ '-pix_fmt', 'bgr24',
|
|
|
+ '-s', '512x288',
|
|
|
+ '-r','30',
|
|
|
+ '-i', '-',
|
|
|
+ '-r','30',
|
|
|
+ '-q', '0',
|
|
|
+ '-codec:v', 'mpeg1video',
|
|
|
+ '-f', 'mpegts',
|
|
|
+ relay_url
|
|
|
+]
|
|
|
+pipe = sp.Popen(command,stdin=sp.PIPE)
|
|
|
+# exit(0)
|
|
|
+# 每帧间隔 ms
|
|
|
+delay = 1000/fps
|
|
|
+wait = round(delay/1000.,2)
|
|
|
+print("-----------", total_frame_cnt, fps, delay,wait)
|
|
|
+
|
|
|
+# 启动推流子进程,不断推流
|
|
|
+
|
|
|
+time.sleep(1)
|
|
|
+
|
|
|
+frame_counter = 0
|
|
|
+while (cap.isOpened()):
|
|
|
+ print("+++++++++++++", frame_counter)
|
|
|
+ ret, frame = cap.read()
|
|
|
+ if not ret:
|
|
|
+ print("not ret")
|
|
|
+ break
|
|
|
+
|
|
|
+ frame = cv2.resize(frame, (512, 288))
|
|
|
+ # 1. 图像写入管道
|
|
|
+ # 模n采样
|
|
|
+ if frame_counter%3==0:
|
|
|
+ nt = datetime.datetime.now()
|
|
|
+ nt+=dateutil.relativedelta.relativedelta(seconds=-2)
|
|
|
+ timeStr = nt.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
+ cv2.putText(frame, timeStr, (10, 20), cv2.FONT_ITALIC, .4, (255, 255, 255), 1)
|
|
|
+ try:
|
|
|
+ pipe.stdin.write(frame.tostring())
|
|
|
+ except Exception as e:
|
|
|
+ print("may be broken pipe")
|
|
|
+ print(e)
|
|
|
+ break
|
|
|
+ actions = script.get(frame_counter, None)
|
|
|
+
|
|
|
+ if actions:
|
|
|
+ queue.put(actions)
|
|
|
+ # print("---------",wait)
|
|
|
+ # print(frame_counter,"+++++++++++",delay, waste, wait)
|
|
|
+ if debug_video:
|
|
|
+ cv2.imshow('show', frame)
|
|
|
+ if cv2.waitKey(math.ceil(delay*0.5)) == ord('q'): # 按Q退出
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ time.sleep(delay*0.5/1000)
|
|
|
+ # time.sleep(wait)
|
|
|
+ frame_counter += 1
|
|
|
+ if frame_counter >= total_frame_cnt:
|
|
|
+ frame_counter = 0
|
|
|
+ cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
|
|
|
+
|
|
|
+print("break out")
|
|
|
+cap.release()
|
|
|
+pipe.terminate()
|
|
|
+t.join()
|