Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions inference/core/roboflow_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ def register_image_at_roboflow(
batch_name: str,
tags: Optional[List[str]] = None,
inference_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> dict:
url = f"{API_BASE_URL}/dataset/{dataset_id}/upload"
params = [
Expand All @@ -521,12 +522,13 @@ def register_image_at_roboflow(
for tag in tags:
params.append(("tag", tag))
wrapped_url = wrap_url(_add_params_to_url(url=url, params=params))
m = MultipartEncoder(
fields={
"name": f"{local_image_id}.jpg",
"file": ("imageToUpload", image_bytes, "image/jpeg"),
}
)
fields = {
"name": f"{local_image_id}.jpg",
"file": ("imageToUpload", image_bytes, "image/jpeg"),
}
if metadata is not None:
fields["metadata"] = json.dumps(metadata)
m = MultipartEncoder(fields=fields)
headers = build_roboflow_api_headers(
explicit_headers={"Content-Type": m.content_type},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from functools import partial
from typing import List, Literal, Optional, Tuple, Type, Union
from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union
from uuid import uuid4

import supervision as sv
Expand Down Expand Up @@ -365,6 +365,7 @@ def register_datapoint_at_roboflow(
thread_pool_executor: Optional[ThreadPoolExecutor],
api_key: str,
image_name: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[bool, str]:
registration_task = partial(
execute_registration,
Expand All @@ -384,6 +385,7 @@ def register_datapoint_at_roboflow(
cache=cache,
api_key=api_key,
image_name=image_name,
metadata=metadata,
)
if fire_and_forget and background_tasks:
background_tasks.add_task(registration_task)
Expand Down Expand Up @@ -411,6 +413,7 @@ def execute_registration(
cache: BaseCache,
api_key: str,
image_name: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[bool, str]:
matching_strategies_limits = OrderedDict(
{
Expand Down Expand Up @@ -462,6 +465,7 @@ def execute_registration(
api_key=api_key,
batch_name=batch_name,
tags=registration_tags,
metadata=metadata,
)
if status == DUPLICATED_STATUS:
credit_to_be_returned = True
Expand Down Expand Up @@ -540,6 +544,7 @@ def register_datapoint(
api_key: str,
batch_name: str,
tags: List[str],
metadata: Optional[Dict[str, Any]] = None,
) -> str:
inference_id = None
if isinstance(prediction, dict):
Expand All @@ -559,6 +564,7 @@ def register_datapoint(
batch_name=batch_name,
tags=tags,
inference_id=inference_id,
metadata=metadata,
)
if roboflow_image_id is None:
return DUPLICATED_STATUS
Expand All @@ -585,6 +591,7 @@ def safe_register_image_at_roboflow(
batch_name: str,
tags: List[str],
inference_id: Optional[str],
metadata: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
registration_response = register_image_at_roboflow(
api_key=api_key,
Expand All @@ -594,6 +601,7 @@ def safe_register_image_at_roboflow(
batch_name=batch_name,
tags=tags,
inference_id=inference_id,
metadata=metadata,
)
image_duplicated = registration_response.get("duplicate", False)
if image_duplicated:
Expand All @@ -609,8 +617,8 @@ def is_prediction_registration_forbidden(
return True
if isinstance(prediction, sv.Detections) and len(prediction) == 0:
return True
if isinstance(prediction, dict) and all(
k not in prediction for k in ["top", "predicted_classes"]
if isinstance(prediction, dict) and (
"top" not in prediction and "predicted_classes" not in prediction
):
return True
return False
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import random
from concurrent.futures import ThreadPoolExecutor
from typing import List, Literal, Optional, Tuple, Type, Union
from typing import Any, Dict, List, Literal, Optional, Tuple, Type, Union

import supervision as sv
from fastapi import BackgroundTasks
Expand Down Expand Up @@ -227,10 +227,15 @@ class BlockManifest(WorkflowBlockManifest):
description="Optional custom name for the uploaded image. This is useful when you want to preserve the original filename or use a meaningful identifier (e.g., serial number, timestamp) for the image in the Roboflow dataset. The name should not include file extension. If not provided, a UUID will be generated automatically.",
examples=["serial_12345", "camera1_frame_001", "$inputs.filename"],
)
metadata: Dict[str, Union[str, int, float, bool, Selector()]] = Field(
default_factory=dict,
description="Optional key-value metadata to attach to uploaded images. Metadata is stored as user_metadata on the image in Roboflow and can be used for filtering and organization. Values can be static strings, numbers, booleans, or references to workflow inputs/steps.",
examples=[{"camera_id": "cam_01", "location": "$inputs.location"}, {}],
)

@classmethod
def get_parameters_accepting_batches(cls) -> List[str]:
return ["images", "predictions", "image_name"]
return ["images", "predictions", "image_name", "metadata"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this briefly, I am not sure it would work — here you mark metadata as a batch-accepting parameter, but the metadata typing in the run method describes a scalar dict.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will look into this. It's working, but I will dig deeper.

Copy link
Contributor Author

@digaobarbosa digaobarbosa Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed it from the batch parameters. It was working, but it smelled bad (it required a runtime conversion from a dict of batches into a batch of dicts).


@classmethod
def describe_outputs(cls) -> List[OutputDefinition]:
Expand Down Expand Up @@ -285,6 +290,7 @@ def run(
labeling_batch_prefix: str,
labeling_batches_recreation_frequency: BatchCreationFrequency,
image_name: Optional[Batch[Optional[str]]] = None,
metadata: Optional[Union[Dict[str, Any], Batch]] = None,
) -> BlockResult:
if self._api_key is None:
raise ValueError(
Expand All @@ -304,7 +310,10 @@ def run(
result = []
predictions = [None] * len(images) if predictions is None else predictions
image_names = [None] * len(images) if image_name is None else image_name
for image, prediction, img_name in zip(images, predictions, image_names):
metadata_values = _transpose_metadata_batches(metadata, len(images))
for image, prediction, img_name, meta in zip(
images, predictions, image_names, metadata_values
):
error_status, message = maybe_register_datapoint_at_roboflow(
image=image,
prediction=prediction,
Expand All @@ -326,11 +335,40 @@ def run(
thread_pool_executor=self._thread_pool_executor,
api_key=self._api_key,
image_name=img_name,
metadata=meta,
)
result.append({"error_status": error_status, "message": message})
return result


def _transpose_metadata_batches(
    metadata: Optional[Union[Dict[str, Any], Batch]],
    batch_size: int,
) -> List[Optional[Dict[str, Any]]]:
    """Turn the metadata argument into one dict (or None) per image.

    Three input shapes are supported:
    - falsy (None / empty): every image gets None
    - a Batch of dicts (the engine already wrapped it): unwrapped element-wise,
      with falsy elements mapped to None
    - a plain dict whose values may individually be Batch objects: the
      batch-valued keys are split out per element and merged with the
      scalar-valued keys to produce one dict per image
    """
    if not metadata:
        return [None] * batch_size
    if isinstance(metadata, Batch):
        # Engine-wrapped case: each element is already a per-image dict.
        return [element or None for element in metadata]
    scalar_items: Dict[str, Any] = {}
    batched_items: Dict[str, Batch] = {}
    for key, value in metadata.items():
        target = batched_items if isinstance(value, Batch) else scalar_items
        target[key] = value
    if not batched_items:
        # Purely scalar metadata: replicate a copy for every image so callers
        # cannot mutate a shared dict across images.
        return [dict(metadata) for _ in range(batch_size)]
    per_image: List[Optional[Dict[str, Any]]] = []
    for index in range(batch_size):
        merged = dict(scalar_items)
        merged.update((key, values[index]) for key, values in batched_items.items())
        per_image.append(merged)
    return per_image


def maybe_register_datapoint_at_roboflow(
image: WorkflowImageData,
prediction: Optional[Union[sv.Detections, dict]],
Expand All @@ -352,6 +390,7 @@ def maybe_register_datapoint_at_roboflow(
thread_pool_executor: Optional[ThreadPoolExecutor],
api_key: str,
image_name: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> Tuple[bool, str]:
normalised_probability = data_percentage / 100
if random.random() < normalised_probability:
Expand All @@ -375,5 +414,6 @@ def maybe_register_datapoint_at_roboflow(
thread_pool_executor=thread_pool_executor,
api_key=api_key,
image_name=image_name,
metadata=metadata,
)
return False, "Registration skipped due to sampling settings"
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,7 @@ def test_run_sink_when_registration_should_happen_in_foreground_despite_providin
cache=cache,
api_key="my_api_key",
image_name=None,
metadata=None,
)
]
* 3
Expand Down Expand Up @@ -1182,6 +1183,7 @@ def test_run_sink_when_predictions_not_provided(
cache=cache,
api_key="my_api_key",
image_name=None,
metadata=None,
)
]
* 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,3 +541,60 @@ def test_manifest_parsing_with_static_image_name() -> None:

# then
assert result.image_name == "my_static_image_name"


@mock.patch.object(v2, "register_datapoint_at_roboflow")
def test_run_sink_with_metadata_parameter(
    register_datapoint_at_roboflow_mock: MagicMock,
) -> None:
    """Metadata dicts holding Batch values must be transposed into one dict per image."""
    # given
    cache = MemoryCache()
    background_tasks = BackgroundTasks()
    uploader = RoboflowDatasetUploadBlockV2(
        cache=cache,
        api_key="my_api_key",
        background_tasks=background_tasks,
        thread_pool_executor=None,
    )
    register_datapoint_at_roboflow_mock.return_value = False, "OK"
    frame = WorkflowImageData(
        parent_metadata=ImageParentMetadata(parent_id="parent"),
        numpy_image=np.zeros((512, 256, 3), dtype=np.uint8),
    )
    indices = [(0,), (1,)]

    # when
    result = uploader.run(
        images=Batch(content=[frame, frame], indices=indices),
        predictions=None,
        target_project="my_project",
        usage_quota_name="my_quota",
        data_percentage=100.0,
        persist_predictions=True,
        minutely_usage_limit=10,
        hourly_usage_limit=100,
        daily_usage_limit=1000,
        max_image_size=(128, 128),
        compression_level=75,
        registration_tags=["some"],
        disable_sink=False,
        fire_and_forget=False,
        labeling_batch_prefix="my_batch",
        labeling_batches_recreation_frequency="never",
        metadata={
            "camera_id": Batch(content=["cam_01", "cam_02"], indices=indices),
            "location": Batch(content=["warehouse_a", "warehouse_b"], indices=indices),
        },
    )

    # then
    expected_entry = {"error_status": False, "message": "OK"}
    assert result == [expected_entry, expected_entry], "Expected data registered"
    assert register_datapoint_at_roboflow_mock.call_count == 2

    # Verify per-image metadata was passed correctly
    recorded_calls = register_datapoint_at_roboflow_mock.call_args_list
    expected_metadata = [
        {"camera_id": "cam_01", "location": "warehouse_a"},
        {"camera_id": "cam_02", "location": "warehouse_b"},
    ]
    for recorded_call, expected in zip(recorded_calls, expected_metadata):
        assert recorded_call.kwargs["metadata"] == expected
Loading