90 lines
No EOL
3.8 KiB
Python
90 lines
No EOL
3.8 KiB
Python
import websockets
|
|
import asyncio
|
|
import numpy as np
|
|
from ultralytics import YOLO
|
|
import time
|
|
import cv2
|
|
|
|
classNames = ["person", "bicycle", "car", "motorbike", "aeroplane", "bus", "train", "truck", "boat",
|
|
"traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat",
|
|
"dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack", "umbrella",
|
|
"handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat",
|
|
"baseball glove", "skateboard", "surfboard", "tennis racket", "bottle", "wine glass", "cup",
|
|
"fork", "knife", "spoon", "bowl", "banana", "apple", "sandwich", "orange", "broccoli",
|
|
"carrot", "hot dog", "pizza", "donut", "cake", "chair", "sofa", "pottedplant", "bed",
|
|
"diningtable", "toilet", "tvmonitor", "laptop", "mouse", "remote", "keyboard", "cell phone",
|
|
"microwave", "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors",
|
|
"teddy bear", "hair drier", "toothbrush"
|
|
]
|
|
|
|
model = YOLO('yolov8s.pt') # Load an official Detect model
|
|
|
|
async def handle_connection(websocket, path):
|
|
print(f"Connection from: {path}")
|
|
try:
|
|
while True:
|
|
raw_data = await websocket.recv()
|
|
|
|
nparr = np.frombuffer(raw_data, np.uint8).reshape((480, 640, 3))
|
|
|
|
# frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
|
|
frame = cv2.cvtColor(nparr, cv2.COLOR_BGR2RGB)
|
|
cv2.imshow("from_remote", frame)
|
|
|
|
# Perform object detection
|
|
results = model.track(frame, persist=True)
|
|
|
|
for r in results:
|
|
lines = ""
|
|
boxes = r.boxes
|
|
for box in boxes:
|
|
if box.cls[0].item() == 0 and not box.id is None:
|
|
# bounding box
|
|
id = box.id.int().cpu().tolist()
|
|
# x1, y1, x2, y2 = box.xyxy[0]
|
|
# # 2.08333 = 1/480 * 1000 or normalize, then save 4 sig-figures and cast to int
|
|
# # 1.5625 = 1/640 * 1000 or normalize, then save 4 sig-figures and cast to int
|
|
|
|
# x1, x2, y1, y2 = int(x1), int(x2), int(y1), int(y2)
|
|
|
|
# # put box in cam
|
|
# cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 255), 3)
|
|
|
|
# # class name
|
|
# cls = int(box.cls[0])
|
|
|
|
# # object details
|
|
# org = [x1, y1]
|
|
# org2 = [x1, y1+50]
|
|
# font = cv2.FONT_HERSHEY_SIMPLEX
|
|
# fontScale = 1
|
|
# color = (255, 0, 0)
|
|
# color_w = (255, 255, 255)
|
|
# thickness = 2
|
|
|
|
# cv2.putText(frame, classNames[cls], org, font, fontScale, color, thickness)
|
|
# cv2.putText(frame, str(id), org2, font, fontScale, color_w, thickness)
|
|
|
|
x1, y1, x2, y2 = box.xyxyn[0]
|
|
x1, y1, x2, y2 = int(x1 * 1000), int(y1 * 1000), int(x2 * 1000), int(y2 * 1000)
|
|
lines += f"{id} {x1}:{y1} {x2}:{y2}\n"
|
|
|
|
# cv2.imshow('Webcam', frame)
|
|
# if cv2.waitKey(1) == ord('q'):
|
|
# break
|
|
|
|
ret = lines.encode('utf-8')
|
|
|
|
cv2.waitKey(80)
|
|
|
|
await websocket.send(ret)
|
|
|
|
except websockets.exceptions.ConnectionClosed:
|
|
print(f"Client disconnected: {path}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
start_server = websockets.serve(handle_connection, "0.0.0.0", 6543)
|
|
asyncio.get_event_loop().run_until_complete(start_server)
|
|
|
|
asyncio.get_event_loop().run_forever() |