Skip to content

Commit 478ddab

Browse files
authored
feat: message chunking (#80)
2 parents 8e95f47 + 802cd16 commit 478ddab

File tree

5 files changed

+397
-1
lines changed

5 files changed

+397
-1
lines changed

README.md

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,79 @@ client = DiodeClient(
281281
)
282282
```
283283

284+
### Message chunking
285+
286+
When ingesting large numbers of entities, you may need to split them into smaller chunks to avoid exceeding the gRPC message size limit for a single `ingest()` call. The SDK provides chunking utilities that automatically split entity lists into appropriately sized chunks.
287+
288+
#### How it works
289+
290+
The SDK uses a **greedy bin-packing algorithm** that:
291+
1. Accumulates entities until adding the next entity would exceed the size limit
292+
2. Starts a new chunk when the limit would be exceeded
293+
3. Keeps each chunk under the configured limit (default: 3 MB), except when a single entity by itself exceeds the limit — such an entity becomes its own oversized chunk, since one entity cannot be split
294+
295+
#### Basic usage
296+
297+
```python
298+
from netboxlabs.diode.sdk import DiodeClient, create_message_chunks
299+
from netboxlabs.diode.sdk.ingester import Device, Entity
300+
301+
with DiodeClient(
302+
target="grpc://localhost:8080/diode",
303+
app_name="my-app",
304+
app_version="1.0.0",
305+
) as client:
306+
# Create a large list of entities
307+
entities = []
308+
for i in range(10000):
309+
device = Device(
310+
name=f"Device {i}",
311+
device_type="Device Type A",
312+
site="Site ABC",
313+
role="Role ABC",
314+
)
315+
entities.append(Entity(device=device))
316+
317+
# Split into chunks (default 3 MB per chunk), then ingest each chunk separately.
318+
for chunk in create_message_chunks(entities):
319+
client.ingest(entities=chunk)
320+
```
321+
322+
#### Custom chunk size
323+
324+
You can customize the chunk size if needed:
325+
326+
```python
327+
from netboxlabs.diode.sdk import create_message_chunks
328+
329+
# Use a larger chunk size (3.5 MB)
330+
chunks = create_message_chunks(entities, max_chunk_size_mb=3.5)
331+
332+
# Use a smaller chunk size for conservative chunking (2 MB)
333+
chunks = create_message_chunks(entities, max_chunk_size_mb=2.0)
334+
```
335+
336+
#### Estimating message size
337+
338+
You can estimate the serialized size of entities before chunking:
339+
340+
```python
341+
from netboxlabs.diode.sdk import estimate_message_size
342+
343+
size_bytes = estimate_message_size(entities)
344+
size_mb = size_bytes / (1024 * 1024)
345+
print(f"Total size: {size_mb:.2f} MB")
346+
347+
# Decide whether chunking is needed
348+
if size_mb > 3.0:
349+
for chunk in create_message_chunks(entities):
350+
client.ingest(entities=chunk)
351+
else:
352+
# Small enough to send in one request
353+
client.ingest(entities=entities)
354+
```
355+
356+
284357
### Dry run mode
285358

286359
`DiodeDryRunClient` generates ingestion requests without contacting a Diode server. Requests are printed to stdout by default, or written to JSON files when `output_dir` (or the `DIODE_DRY_RUN_OUTPUT_DIR` environment variable) is specified. The `app_name` parameter serves as the filename prefix; if not provided, `dryrun` is used as the default prefix. The file name is suffixed with a nanosecond-precision timestamp, resulting in the format `<app_name>_<timestamp_ns>.json`.

netboxlabs/diode/sdk/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@
22
# Copyright 2024 NetBox Labs Inc
33
"""NetBox Labs, Diode - SDK."""
44

5+
from netboxlabs.diode.sdk.chunking import (
6+
create_message_chunks,
7+
estimate_message_size,
8+
)
59
from netboxlabs.diode.sdk.client import (
610
DiodeClient,
711
DiodeDryRunClient,
812
DiodeOTLPClient,
913
load_dryrun_entities,
1014
)
1115

16+
assert create_message_chunks
17+
assert estimate_message_size
1218
assert DiodeClient
1319
assert DiodeDryRunClient
1420
assert DiodeOTLPClient

netboxlabs/diode/sdk/chunking.py

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#!/usr/bin/env python
2+
# Copyright 2026 NetBox Labs Inc
3+
"""
4+
Message chunking utilities for Diode SDK.
5+
6+
This module provides utilities for chunking large lists of entities into
7+
size-appropriate chunks for gRPC ingestion, ensuring no chunk exceeds
8+
the gRPC message size limit.
9+
"""
10+
11+
from collections.abc import Iterable
12+
13+
from .diode.v1 import ingester_pb2
14+
15+
16+
def create_message_chunks(
    entities: Iterable[ingester_pb2.Entity], max_chunk_size_mb: float = 3.0
) -> list[list[ingester_pb2.Entity]]:
    """
    Create size-aware chunks from entities using greedy bin-packing.

    This function chunks entities to ensure each chunk stays under the specified
    size limit. It uses a greedy bin-packing algorithm that accumulates entities
    until adding the next entity would exceed the limit, then starts a new chunk.

    The per-entity cost includes the protobuf repeated-field framing
    (one tag byte plus a varint length prefix per entity), so the size of the
    resulting serialized IngestRequest matches the accumulated estimate instead
    of slightly exceeding it.

    The default chunk size of 3.0 MB provides a safe margin below the gRPC 4 MB
    message size limit, accounting for protobuf serialization overhead.

    Args:
        entities: Iterable of Entity protobuf messages to chunk
        max_chunk_size_mb: Maximum chunk size in MB (default 3.0)

    Returns:
        List of entity chunks, each under max_chunk_size_mb (a single entity
        that alone exceeds the limit still becomes its own oversized chunk,
        since one entity cannot be split). Returns at least one chunk even if
        the input is empty.

    Examples:
        >>> entities = [entity1, entity2, entity3, ...]
        >>> chunks = create_message_chunks(entities)
        >>> for chunk in chunks:
        ...     client.ingest(chunk)

        >>> # Use a custom chunk size
        >>> chunks = create_message_chunks(entities, max_chunk_size_mb=3.5)

    """
    # Materialize the iterable so it can be sized and traversed safely.
    if not isinstance(entities, list):
        entities = list(entities)

    # Documented contract: always return at least one chunk.
    if not entities:
        return [entities]

    # Convert MB to bytes
    max_chunk_size_bytes = int(max_chunk_size_mb * 1024 * 1024)

    # Fast path: everything fits in a single request.
    if estimate_message_size(entities) <= max_chunk_size_bytes:
        return [entities]

    # Fixed per-request overhead (IngestRequest fields other than entities).
    base_overhead = ingester_pb2.IngestRequest().ByteSize()

    chunks: list[list[ingester_pb2.Entity]] = []
    current_chunk: list[ingester_pb2.Entity] = []
    current_chunk_size = base_overhead  # Start each chunk with the overhead

    for entity in entities:
        payload = entity.ByteSize()
        # Each element of a repeated message field is encoded on the wire as
        # tag + varint(length) + payload. Without this framing the estimate
        # undercounts by a few bytes per entity, which can push a "full"
        # chunk past the limit. The tag is assumed to be one byte
        # (NOTE(review): assumes the entities field number is <= 15 —
        # confirm against ingester.proto).
        wire_size = 1 + max(1, (payload.bit_length() + 6) // 7) + payload

        if current_chunk and current_chunk_size + wire_size > max_chunk_size_bytes:
            # Current chunk is full: finalize it and start a new one.
            chunks.append(current_chunk)
            current_chunk = [entity]
            current_chunk_size = base_overhead + wire_size
        else:
            # Either the chunk is empty (an oversized single entity still
            # gets its own chunk) or the entity fits: accumulate it.
            current_chunk.append(entity)
            current_chunk_size += wire_size

    # The loop always leaves the trailing chunk unfinalized; entities is
    # non-empty here, so current_chunk is guaranteed non-empty too.
    chunks.append(current_chunk)

    return chunks
88+
89+
90+
def estimate_message_size(entities: Iterable[ingester_pb2.Entity]) -> int:
    """
    Estimate the serialized size of entities in bytes.

    Calculates the total size by summing individual entity sizes — including
    the protobuf repeated-field framing (one tag byte plus a varint length
    prefix per entity) — plus the IngestRequest protobuf overhead. Without
    the framing the estimate undercounts by a few bytes per entity.

    Args:
        entities: Iterable of Entity protobuf messages (consumed once; a
            one-shot iterator is acceptable)

    Returns:
        Estimated size in bytes including IngestRequest overhead

    Examples:
        >>> entities = [entity1, entity2, entity3]
        >>> size_bytes = estimate_message_size(entities)
        >>> size_mb = size_bytes / (1024 * 1024)
        >>> print(f"Estimated size: {size_mb:.2f} MB")

    """
    # Fixed per-request overhead (IngestRequest fields other than entities).
    total = ingester_pb2.IngestRequest().ByteSize()

    # Single pass is sufficient — no need to materialize the iterable.
    for entity in entities:
        payload = entity.ByteSize()
        # tag (1 byte, assuming field number <= 15 — NOTE(review): confirm
        # against ingester.proto) + varint length prefix + payload bytes.
        total += 1 + max(1, (payload.bit_length() + 6) // 7) + payload

    return total

netboxlabs/diode/sdk/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,7 @@ def __init__(
668668
options=channel_opts,
669669
)
670670
else:
671-
_LOGGER.debug(f"Setting up gRPC insecure channel")
671+
_LOGGER.debug("Setting up gRPC insecure channel")
672672
base_channel = grpc.insecure_channel(
673673
target=self._target,
674674
options=channel_opts,

0 commit comments

Comments
 (0)