Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,79 @@ client = DiodeClient(
)
```

### Message chunking

When ingesting large numbers of entities, you may need to split them into smaller chunks to avoid exceeding the gRPC message size limit for a single `ingest()` call. The SDK provides chunking utilities that automatically split entity lists into appropriately sized chunks.

#### How it works

The SDK uses a **greedy sequential packing algorithm** that:
1. Accumulates entities in their original order until adding the next entity would exceed the size limit
2. Starts a new chunk when the limit would be exceeded
3. Ensures each chunk stays safely under the configured limit (default: 3 MB)

Entity order is preserved: chunks are contiguous slices of the input list.

#### Basic usage

```python
from netboxlabs.diode.sdk import DiodeClient, create_message_chunks
from netboxlabs.diode.sdk.ingester import Device, Entity

with DiodeClient(
target="grpc://localhost:8080/diode",
app_name="my-app",
app_version="1.0.0",
) as client:
# Create a large list of entities
entities = []
for i in range(10000):
device = Device(
name=f"Device {i}",
device_type="Device Type A",
site="Site ABC",
role="Role ABC",
)
entities.append(Entity(device=device))

# Split into chunks (default 3 MB per chunk), then ingest each chunk separately.
for chunk in create_message_chunks(entities):
client.ingest(entities=chunk)
```

#### Custom chunk size

You can customize the chunk size if needed:

```python
from netboxlabs.diode.sdk import create_message_chunks

# Use a larger chunk size (3.5 MB)
chunks = create_message_chunks(entities, max_chunk_size_mb=3.5)

# Use a smaller chunk size for conservative chunking (2 MB)
chunks = create_message_chunks(entities, max_chunk_size_mb=2.0)
```

#### Estimating message size

You can estimate the serialized size of entities before chunking:

```python
from netboxlabs.diode.sdk import estimate_message_size

size_bytes = estimate_message_size(entities)
size_mb = size_bytes / (1024 * 1024)
print(f"Total size: {size_mb:.2f} MB")

# Decide whether chunking is needed
if size_mb > 3.0:
for chunk in create_message_chunks(entities):
client.ingest(entities=chunk)
else:
# Small enough to send in one request
client.ingest(entities=entities)
```


### Dry run mode

`DiodeDryRunClient` generates ingestion requests without contacting a Diode server. Requests are printed to stdout by default, or written to JSON files when `output_dir` (or the `DIODE_DRY_RUN_OUTPUT_DIR` environment variable) is specified. The `app_name` parameter serves as the filename prefix; if not provided, `dryrun` is used as the default prefix. The file name is suffixed with a nanosecond-precision timestamp, resulting in the format `<app_name>_<timestamp_ns>.json`.
Expand Down
6 changes: 6 additions & 0 deletions netboxlabs/diode/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
# Copyright 2024 NetBox Labs Inc
"""NetBox Labs, Diode - SDK."""

from netboxlabs.diode.sdk.chunking import (
create_message_chunks,
estimate_message_size,
)
from netboxlabs.diode.sdk.client import (
DiodeClient,
DiodeDryRunClient,
DiodeOTLPClient,
load_dryrun_entities,
)

assert create_message_chunks
assert estimate_message_size
assert DiodeClient
assert DiodeDryRunClient
assert DiodeOTLPClient
Expand Down
116 changes: 116 additions & 0 deletions netboxlabs/diode/sdk/chunking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#!/usr/bin/env python
# Copyright 2026 NetBox Labs Inc
"""
Message chunking utilities for Diode SDK.

This module provides utilities for chunking large lists of entities into
size-appropriate chunks for gRPC ingestion, ensuring no chunk exceeds
the gRPC message size limit.
"""

from collections.abc import Iterable

from .diode.v1 import ingester_pb2


def create_message_chunks(
    entities: Iterable[ingester_pb2.Entity], max_chunk_size_mb: float = 3.0
) -> list[list[ingester_pb2.Entity]]:
    """
    Create size-aware chunks from entities using greedy sequential packing.

    Entities are accumulated in input order until adding the next entity
    would push the serialized ``IngestRequest`` past the size limit, at
    which point a new chunk is started. Entity order is preserved; each
    chunk is a contiguous slice of the input.

    The default chunk size of 3.0 MB provides a safe margin below the gRPC
    4 MB message size limit, accounting for protobuf serialization overhead.

    Unlike a naive sum of ``Entity.ByteSize()`` values, the projected chunk
    size also includes the protobuf framing of each element of the repeated
    ``entities`` field (field tag plus varint length prefix), so chunks
    cannot creep past the limit by a few bytes per entity.

    Args:
        entities: Iterable of Entity protobuf messages to chunk
        max_chunk_size_mb: Maximum chunk size in MB (default 3.0)

    Returns:
        List of entity chunks, each under max_chunk_size_mb (an entity that
        alone exceeds the limit still gets its own single-element chunk).
        Returns at least one chunk even if the input is empty.

    Examples:
        >>> entities = [entity1, entity2, entity3, ...]
        >>> chunks = create_message_chunks(entities)
        >>> for chunk in chunks:
        ...     client.ingest(chunk)

        >>> # Use a custom chunk size
        >>> chunks = create_message_chunks(entities, max_chunk_size_mb=3.5)

    """
    # Materialize the iterable: we call ByteSize() per entity and may
    # return the whole list as a single chunk.
    if not isinstance(entities, list):
        entities = list(entities)

    if not entities:
        return [entities]

    # Convert MB to bytes
    max_chunk_size_bytes = int(max_chunk_size_mb * 1024 * 1024)

    # Fixed cost of an IngestRequest carrying no entities (0 for a
    # default-constructed message, but computed rather than assumed).
    base_overhead = ingester_pb2.IngestRequest().ByteSize()

    # Pre-compute each entity's on-the-wire cost as an element of the
    # repeated field: payload + length prefix + field tag.
    wire_sizes = [
        (entity, _repeated_element_wire_size(entity.ByteSize()))
        for entity in entities
    ]

    # Quick check: if all entities fit in one chunk, return early.
    total_size = base_overhead + sum(size for _, size in wire_sizes)
    if total_size <= max_chunk_size_bytes:
        return [entities]

    # Greedy sequential packing: accumulate entities until limit reached.
    chunks: list[list[ingester_pb2.Entity]] = []
    current_chunk: list[ingester_pb2.Entity] = []
    current_chunk_size = base_overhead  # Start with overhead for the chunk

    for entity, wire_size in wire_sizes:
        projected_size = current_chunk_size + wire_size

        if current_chunk and projected_size > max_chunk_size_bytes:
            # Finalize current chunk and start a new one with this entity.
            chunks.append(current_chunk)
            current_chunk = [entity]
            current_chunk_size = base_overhead + wire_size
        else:
            # Add entity to current chunk (also covers an oversized entity
            # landing in an empty chunk — it still must be sent somehow).
            current_chunk.append(entity)
            current_chunk_size = projected_size

    # Add final chunk if not empty (always true here: input is non-empty).
    if current_chunk:
        chunks.append(current_chunk)

    return chunks


def _repeated_element_wire_size(payload_size: int) -> int:
    """Serialized size of one length-delimited repeated-field element.

    A message of ``payload_size`` bytes embedded in a repeated field costs
    an extra varint length prefix plus the field tag. The tag is assumed to
    be 2 bytes — an upper bound valid for field numbers up to 2047 (it is
    1 byte for field numbers <= 15), keeping the estimate conservative.
    """
    length_prefix = 1
    remaining = payload_size >> 7
    while remaining:
        length_prefix += 1
        remaining >>= 7
    return payload_size + length_prefix + 2


def estimate_message_size(entities: Iterable[ingester_pb2.Entity]) -> int:
    """
    Estimate the serialized size of entities in bytes.

    The total is the IngestRequest protobuf overhead plus the sum of each
    entity's own serialized size (``Entity.ByteSize()``).

    Args:
        entities: Iterable of Entity protobuf messages

    Returns:
        Estimated size in bytes including IngestRequest overhead

    Examples:
        >>> entities = [entity1, entity2, entity3]
        >>> size_bytes = estimate_message_size(entities)
        >>> size_mb = size_bytes / (1024 * 1024)
        >>> print(f"Estimated size: {size_mb:.2f} MB")

    """
    # Start from the fixed cost of an empty request, then accumulate
    # each entity's serialized size in a single pass over the iterable.
    total = ingester_pb2.IngestRequest().ByteSize()
    for entity in entities:
        total += entity.ByteSize()
    return total
2 changes: 1 addition & 1 deletion netboxlabs/diode/sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ def __init__(
options=channel_opts,
)
else:
_LOGGER.debug(f"Setting up gRPC insecure channel")
_LOGGER.debug("Setting up gRPC insecure channel")
base_channel = grpc.insecure_channel(
target=self._target,
options=channel_opts,
Expand Down
Loading