Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions dataflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Dora dataflow: webcam capture -> YOLO object detection -> on-screen plot.
nodes:
  # Captures camera frames, driven by a dora timer tick
  # (`dora/timer/millis/50` — presumably one tick every 50 ms; confirm
  # against dora timer docs). Publishes each frame on `image`.
  - id: webcam
    operator:
      build: pip install -r requirements.txt
      python: webcam.py
      inputs:
        tick: dora/timer/millis/50
      outputs:
        - image

  # Runs detection on every frame from the webcam node and emits
  # bounding boxes on `bbox`. `send_stdout_as` re-publishes the
  # operator's stdout as a dataflow output named `stdout`.
  - id: object_detection
    operator:
      build: pip install -r requirements.txt
      python: object_detection.py
      send_stdout_as: stdout
      inputs:
        image: webcam/image
      outputs:
        - bbox
        - stdout

  # Renders the frame plus the detector's boxes; the detector's stdout
  # stream is wired in under the name `assistant_message`.
  - id: plot
    operator:
      build: pip install -r requirements.txt
      python: plot.py
      inputs:
        image: webcam/image
        bbox: object_detection/bbox
        assistant_message: object_detection/stdout
5 changes: 2 additions & 3 deletions examples/python-operator-dataflow/dataflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@ nodes:
operator:
build: pip install -r requirements.txt
python: object_detection.py
send_stdout_as: stdout
inputs:
image: webcam/image
outputs:
- bbox
- stdout
- latency

- id: plot
operator:
Expand All @@ -26,4 +25,4 @@ nodes:
inputs:
image: webcam/image
bbox: object_detection/bbox
assistant_message: object_detection/stdout
latency: object_detection/latency
63 changes: 40 additions & 23 deletions examples/python-operator-dataflow/object_detection.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,56 @@
"""TODO: Add docstring."""

import os
import time
import numpy as np
import pyarrow as pa
from dora import DoraStatus
from ultralytics import YOLO

# Configuration
YOLO_MODEL_PATH = os.getenv("YOLO_MODEL", "yolov8n.pt")
CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480


model = YOLO("yolov8n.pt")
# Must be a multiple of 32 for optimal YOLO performance
IMGSZ = 320


class Operator:
"""Inferring object from images."""

def on_event(
self,
dora_event,
send_output,
) -> DoraStatus:
"""TODO: Add docstring."""
def __init__(self):
self.model = YOLO(YOLO_MODEL_PATH)

def on_event(self, dora_event, send_output) -> DoraStatus:
if dora_event["type"] == "INPUT":
frame = (
dora_event["value"].to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
if dora_event["id"] != "image":
return DoraStatus.CONTINUE

start_inference = time.perf_counter()

# Dynamic reshape — works on any camera resolution, not hardcoded to 480
frame = dora_event["value"].to_numpy().reshape((-1, CAMERA_WIDTH, 3)).copy()

# No BGR->RGB flip needed — Ultralytics handles color internally
results = self.model(
frame,
verbose=False,
imgsz=IMGSZ,
conf=0.25,
)
frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB)
results = model(frame, verbose=False) # includes NMS
# Process results

latency = (time.perf_counter() - start_inference) * 1000

# Extract boxes, confidence scores, and class labels
boxes = np.array(results[0].boxes.xyxy.cpu())
conf = np.array(results[0].boxes.conf.cpu())
label = np.array(results[0].boxes.cls.cpu())
# concatenate them together
arrays = np.concatenate((boxes, conf[:, None], label[:, None]), axis=1)

send_output("bbox", pa.array(arrays.ravel()), dora_event["metadata"])
# FIXED: np.zeros((0,6)) so plot.py reshape(-1,6) never fails on empty
if len(boxes) > 0:
arrays = np.concatenate(
(boxes, conf[:, None], label[:, None]), axis=1
).astype(np.float32)
else:
arrays = np.zeros((0, 6), dtype=np.float32)

# Send outputs with clean metadata
send_output("bbox", pa.array(arrays.ravel()), {})
send_output("latency", pa.array([latency]), {})

return DoraStatus.CONTINUE
return DoraStatus.CONTINUE
228 changes: 120 additions & 108 deletions examples/python-operator-dataflow/plot.py
Original file line number Diff line number Diff line change
@@ -1,118 +1,130 @@
"""TODO: Add docstring."""

import os

import cv2
import time
import numpy as np
from dora import DoraStatus
from utils import LABELS

CI = os.environ.get("CI")
# HEADLESS FIX: Must be set BEFORE cv2 import
# cv2 reads QT_QPA_PLATFORM at import time, not at runtime
if os.environ.get("GITHUB_ACTIONS") == "true":
os.environ["QT_QPA_PLATFORM"] = "offscreen"
IS_HEADLESS = True
elif os.environ.get("QT_QPA_PLATFORM") == "offscreen":
IS_HEADLESS = True
else:
IS_HEADLESS = False

CAMERA_WIDTH = 640
CAMERA_HEIGHT = 480
import cv2 # noqa: E402

FONT = cv2.FONT_HERSHEY_SIMPLEX
CAMERA_WIDTH = 640


class Operator:
"""Plot image and bounding box."""

def __init__(self):
"""TODO: Add docstring."""
self.bboxs = []
self.buffer = ""
self.submitted = []
self.lines = []

def on_event(
self,
dora_event,
send_output,
):
"""TODO: Add docstring."""
if dora_event["type"] == "INPUT":
id = dora_event["id"]
value = dora_event["value"]
if id == "image":

image = (
value.to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3)).copy()
)

for bbox in self.bboxs:
[
min_x,
min_y,
max_x,
max_y,
confidence,
label,
] = bbox
cv2.rectangle(
image,
(int(min_x), int(min_y)),
(int(max_x), int(max_y)),
(0, 255, 0),
)
cv2.putText(
image,
f"{LABELS[int(label)]}, {confidence:0.2f}",
(int(max_x), int(max_y)),
FONT,
0.5,
(0, 255, 0),
)

cv2.putText(
image, self.buffer, (20, 14 + 21 * 14), FONT, 0.5, (190, 250, 0), 1,
)

i = 0
for text in self.submitted[::-1]:
color = (
(0, 255, 190)
if text["role"] == "user_message"
else (0, 190, 255)
)
cv2.putText(
image,
text["content"],
(
20,
14 + (19 - i) * 14,
),
FONT,
0.5,
color,
1,
)
i += 1

for line in self.lines:
cv2.line(
image,
(int(line[0]), int(line[1])),
(int(line[2]), int(line[3])),
(0, 0, 255),
2,
)

if CI != "true":
cv2.imshow("frame", image)
if cv2.waitKey(1) & 0xFF == ord("q"):
return DoraStatus.STOP
elif id == "bbox":
self.bboxs = value.to_numpy().reshape((-1, 6))
elif id == "keyboard_buffer":
self.buffer = value[0].as_py()
elif id == "line":
self.lines += [value.to_pylist()]
elif "message" in id:
self.submitted += [
{
"role": id,
"content": value[0].as_py(),
},
]

return DoraStatus.CONTINUE
# Initialize as numpy array not list — safe to iterate always
self.bboxs = np.zeros((0, 6))
self.inference_latency = 0
self.last_time = time.time()
self.fps = 0

def on_event(self, dora_event, send_output):
try:
if dora_event["type"] == "INPUT":
event_id = dora_event["id"]

# 1. IMAGE INPUT
if event_id == "image":
try:
image_array = dora_event["value"].to_numpy()
# Dynamic reshape — no hardcoded height
image = image_array.reshape((-1, CAMERA_WIDTH, 3)).copy()

# FPS Calculation
curr = time.time()
elapsed = curr - self.last_time
self.fps = 1.0 / elapsed if elapsed > 0 else 0
self.last_time = curr

# Draw Bounding Boxes
for bbox in self.bboxs:
[x1, y1, x2, y2, conf, lbl] = bbox
cv2.rectangle(
image,
(int(x1), int(y1)),
(int(x2), int(y2)),
(0, 255, 0),
2,
)
txt = f"{LABELS[int(lbl)]}: {conf:.2f}"
(w, h), _ = cv2.getTextSize(
txt, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1
)
# Filled green background for readable white text
cv2.rectangle(
image,
(int(x1), int(y1) - h - 10),
(int(x1) + w, int(y1)),
(0, 255, 0),
-1,
)
cv2.putText(
image,
txt,
(int(x1), int(y1) - 5),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(255, 255, 255),
1,
)

# Performance Overlay
cv2.putText(
image,
f"Total FPS: {self.fps:.1f}",
(20, 40),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(0, 255, 255),
2,
)
cv2.putText(
image,
f"AI Latency: {self.inference_latency:.1f}ms",
(20, 70),
cv2.FONT_HERSHEY_SIMPLEX,
0.7,
(0, 255, 255),
2,
)

if not IS_HEADLESS:
cv2.imshow("frame", image)
if cv2.waitKey(1) & 0xFF == ord("q"):
return DoraStatus.STOP

except Exception as e:
print(f"Image processing error: {e}")

# 2. BBOX INPUT
elif event_id == "bbox":
try:
arr = dora_event["value"].to_numpy()
# Guard against empty array — shape (0,) would crash reshape(-1,6)
self.bboxs = (
arr.reshape((-1, 6)) if arr.size > 0 else np.zeros((0, 6))
)
except Exception as e:
print(f"BBox reshape error: {e}")
self.bboxs = np.zeros((0, 6))

# 3. LATENCY INPUT
elif event_id == "latency":
try:
self.inference_latency = dora_event["value"].to_numpy()[0]
except Exception as e:
print(f"Latency extraction error: {e}")

except Exception as e:
print(f"Unexpected error in on_event: {e}")

return DoraStatus.CONTINUE
Loading
Loading