Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions netboxlabs/diode/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
# Copyright 2024 NetBox Labs Inc
"""NetBox Labs, Diode - SDK."""

# Re-export the chunking helpers from the package root alongside the clients.
from netboxlabs.diode.sdk.chunking import (
    create_message_chunks,
    estimate_message_size,
)
from netboxlabs.diode.sdk.client import (
    DiodeClient,
    DiodeDryRunClient,
    DiodeOTLPClient,
    load_dryrun_entities,
)

# NOTE(review): these asserts presumably exist to mark the re-exported names
# as "used" for linters; they are stripped under `python -O`. An `__all__`
# list would be the conventional alternative — confirm before changing, since
# the rest of this file (not fully visible here) follows the same pattern.
assert create_message_chunks
assert estimate_message_size
assert DiodeClient
assert DiodeDryRunClient
assert DiodeOTLPClient
Expand Down
111 changes: 111 additions & 0 deletions netboxlabs/diode/sdk/chunking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/usr/bin/env python
# Copyright 2024 NetBox Labs Inc
"""
Message chunking utilities for Diode SDK.

This module provides utilities for chunking large lists of entities into
size-appropriate chunks for gRPC ingestion, ensuring no chunk exceeds
the gRPC message size limit.
"""

Check failure on line 8 in netboxlabs/diode/sdk/chunking.py

View workflow job for this annotation

GitHub Actions / tests (3.11)

Ruff (D213)

netboxlabs/diode/sdk/chunking.py:3:1: D213 Multi-line docstring summary should start at the second line

Check failure on line 8 in netboxlabs/diode/sdk/chunking.py

View workflow job for this annotation

GitHub Actions / tests (3.10)

Ruff (D213)

netboxlabs/diode/sdk/chunking.py:3:1: D213 Multi-line docstring summary should start at the second line

from collections.abc import Iterable

from .diode.v1 import ingester_pb2


def create_message_chunks(
    entities: Iterable[ingester_pb2.Entity], max_chunk_size_mb: float = 3.0
) -> list[list[ingester_pb2.Entity]]:
    """
    Create size-aware chunks from entities using greedy bin-packing.

    This function chunks entities to ensure each chunk stays under the specified
    size limit. It uses a greedy bin-packing algorithm that accumulates entities
    until adding the next entity would exceed the limit, then starts a new chunk.

    The default chunk size of 3.0 MB provides a safe margin below the gRPC 4 MB
    message size limit, accounting for protobuf serialization overhead.

    Args:
        entities: Iterable of Entity protobuf messages to chunk
        max_chunk_size_mb: Maximum chunk size in MB (default 3.0)

    Returns:
        List of entity chunks, each under max_chunk_size_mb. Returns at least
        one chunk even if the input is empty.

    Examples:
        >>> entities = [entity1, entity2, entity3, ...]
        >>> chunks = create_message_chunks(entities)
        >>> for chunk in chunks:
        ...     client.ingest(chunk)
        >>> # Use a custom chunk size
        >>> chunks = create_message_chunks(entities, max_chunk_size_mb=3.5)

    """
    # Materialize the iterable: it must be traversed twice (size estimate,
    # then packing), and generators would be exhausted after the first pass.
    if not isinstance(entities, list):
        entities = list(entities)

    # Empty input still yields exactly one (empty) chunk, per the contract.
    if not entities:
        return [entities]

    # Convert MB to bytes
    max_chunk_size_bytes = int(max_chunk_size_mb * 1024 * 1024)

    # Quick check: if all entities fit in one chunk, return early
    total_size = estimate_message_size(entities)
    if total_size <= max_chunk_size_bytes:
        return [entities]

    # Greedy bin-packing: accumulate entities until limit reached
    base_overhead = ingester_pb2.IngestRequest().ByteSize()
    chunks = []
    current_chunk: list[ingester_pb2.Entity] = []
    current_chunk_size = base_overhead  # Start with overhead for the chunk

    for entity in entities:
        entity_size = entity.ByteSize()
        projected_size = current_chunk_size + entity_size

        # Start a new chunk when the current one is non-empty and this entity
        # would push it past the limit.  A single oversized entity is still
        # emitted alone (possibly over the limit) so no data is ever dropped.
        if current_chunk and projected_size > max_chunk_size_bytes:
            chunks.append(current_chunk)
            current_chunk = [entity]
            current_chunk_size = base_overhead + entity_size
        else:
            # Add entity to current chunk
            current_chunk.append(entity)
            current_chunk_size = projected_size

    # Add final chunk if not empty
    if current_chunk:
        chunks.append(current_chunk)

    # Defensive fallback; unreachable in practice since entities is non-empty
    # here, which guarantees at least one chunk was produced.
    return chunks if chunks else [entities]


def estimate_message_size(entities: Iterable[ingester_pb2.Entity]) -> int:
    """
    Estimate the serialized size of entities in bytes.

    Calculates the total size by summing individual entity sizes plus the
    IngestRequest protobuf overhead.

    Args:
        entities: Iterable of Entity protobuf messages

    Returns:
        Estimated size in bytes including IngestRequest overhead

    Examples:
        >>> entities = [entity1, entity2, entity3]
        >>> size_bytes = estimate_message_size(entities)
        >>> size_mb = size_bytes / (1024 * 1024)
        >>> print(f"Estimated size: {size_mb:.2f} MB")

    """
    # NOTE(review): this is an estimate — summing Entity.ByteSize() omits the
    # per-element field tag/length bytes protobuf adds when the entities are
    # embedded in an IngestRequest, which is why callers keep a safety margin
    # below the hard gRPC limit.
    base_overhead = ingester_pb2.IngestRequest().ByteSize()
    # A single pass suffices, so any iterable (including a generator) works
    # without materializing it into a list first.
    return base_overhead + sum(entity.ByteSize() for entity in entities)
200 changes: 200 additions & 0 deletions tests/test_chunking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
#!/usr/bin/env python
# Copyright 2024 NetBox Labs Inc
"""Tests for message chunking utilities."""

from unittest.mock import patch

from netboxlabs.diode.sdk.chunking import create_message_chunks, estimate_message_size
from netboxlabs.diode.sdk.diode.v1 import ingester_pb2


def test_create_message_chunks_empty_list():
    """Test create_message_chunks with an empty entity list."""
    chunks = create_message_chunks([])

    # An empty input still yields exactly one (empty) chunk.
    assert len(chunks) == 1
    assert chunks[0] == []


def test_create_message_chunks_single_chunk():
    """Test create_message_chunks when entities fit in a single chunk."""
    entities = []
    for index in range(5):
        item = ingester_pb2.Entity()
        item.device.name = f"test_device_{index}"
        entities.append(item)

    # Report a tiny total size so everything fits under the 3 MB default.
    size_patch = patch(
        "netboxlabs.diode.sdk.chunking.estimate_message_size", return_value=1024
    )
    with size_patch:
        chunks = create_message_chunks(entities)

    assert len(chunks) == 1
    assert len(chunks[0]) == 5
    assert chunks[0] == entities


def test_create_message_chunks_multiple_chunks():
    """Test create_message_chunks when entities need to be split into multiple chunks."""
    entities = []
    for index in range(10):
        item = ingester_pb2.Entity()
        item.device.name = f"test_device_{index}"
        entities.append(item)

    # Pretend the batch totals 5 MB (over the 3 MB default) with ~600 KB
    # entities and a 100-byte request envelope, forcing a split.
    with patch(
        "netboxlabs.diode.sdk.chunking.estimate_message_size",
        return_value=5 * 1024 * 1024,
    ), patch.object(
        ingester_pb2.Entity, "ByteSize", return_value=600000
    ), patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100):
        chunks = create_message_chunks(entities)

    # More than one chunk, no entity lost, no empty chunk.
    assert len(chunks) > 1
    assert sum(len(chunk) for chunk in chunks) == 10
    assert all(len(chunk) >= 1 for chunk in chunks)


def test_create_message_chunks_one_entity_per_chunk():
    """Test create_message_chunks when each entity needs its own chunk."""
    entities = []
    for index in range(3):
        item = ingester_pb2.Entity()
        item.device.name = f"large_device_{index}"
        entities.append(item)

    # Each entity reports ~3.5 MB, so none can share a 3 MB chunk.
    with patch(
        "netboxlabs.diode.sdk.chunking.estimate_message_size",
        return_value=20 * 1024 * 1024,
    ), patch.object(
        ingester_pb2.Entity, "ByteSize", return_value=3 * 1024 * 1024 + 500000
    ), patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100):
        chunks = create_message_chunks(entities)

    # Every entity lands in a chunk of its own.
    assert len(chunks) == 3
    assert all(len(chunk) == 1 for chunk in chunks)


def test_estimate_message_size():
    """Test estimate_message_size method."""
    entities = []
    for index in range(3):
        item = ingester_pb2.Entity()
        item.device.name = f"test_device_{index}"
        entities.append(item)

    size = estimate_message_size(entities)

    # A non-empty batch must produce a positive integer byte count.
    assert isinstance(size, int)
    assert size > 0


def test_estimate_message_size_empty_list():
    """Test estimate_message_size with an empty entity list."""
    size = estimate_message_size([])

    # Only the IngestRequest envelope contributes, which is never negative.
    assert isinstance(size, int)
    assert size >= 0


def test_create_message_chunks_custom_chunk_size():
    """Test create_message_chunks with a custom chunk size."""
    entities = []
    for index in range(10):
        item = ingester_pb2.Entity()
        item.device.name = f"test_device_{index}"
        entities.append(item)

    # Estimate 5 MB total against a 3.5 MB limit (like orb-discovery) so
    # chunking still kicks in with the custom size.
    with patch(
        "netboxlabs.diode.sdk.chunking.estimate_message_size",
        return_value=5 * 1024 * 1024,
    ), patch.object(
        ingester_pb2.Entity, "ByteSize", return_value=600000
    ), patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100):
        chunks = create_message_chunks(entities, max_chunk_size_mb=3.5)

    # Multiple chunks, with every entity accounted for.
    assert len(chunks) > 1
    assert sum(len(chunk) for chunk in chunks) == 10


def test_create_message_chunks_preserves_order():
    """Test that create_message_chunks preserves entity order."""
    entities = []
    for index in range(20):
        item = ingester_pb2.Entity()
        item.device.name = f"device_{index:03d}"
        entities.append(item)

    # Large fake sizes force the list to be split across several chunks.
    with patch(
        "netboxlabs.diode.sdk.chunking.estimate_message_size",
        return_value=10 * 1024 * 1024,
    ), patch.object(
        ingester_pb2.Entity, "ByteSize", return_value=600000
    ), patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100):
        chunks = create_message_chunks(entities)

    # Concatenating the chunks must reproduce the original sequence exactly.
    flattened = [item for chunk in chunks for item in chunk]
    assert len(flattened) == 20
    assert [item.device.name for item in flattened] == [
        f"device_{index:03d}" for index in range(20)
    ]


def test_create_message_chunks_with_iterable():
    """Test create_message_chunks with a generator/iterator input."""

    def entity_generator():
        for index in range(5):
            item = ingester_pb2.Entity()
            item.device.name = f"test_device_{index}"
            yield item

    # The function materializes the generator internally, so this must work.
    chunks = create_message_chunks(entity_generator())

    assert len(chunks) >= 1
    assert sum(len(chunk) for chunk in chunks) == 5


def test_create_message_chunks_single_large_entity():
    """
    Test create_message_chunks with a single entity that exceeds chunk size.

    This edge case verifies the function doesn't fail when a single entity
    is larger than the chunk size limit.
    """
    entity = ingester_pb2.Entity()
    entity.device.name = "huge_device"
    entities = [entity]

    # Mock a very large entity (5 MB) that exceeds the 3 MB default limit.
    with patch(
        "netboxlabs.diode.sdk.chunking.estimate_message_size",
        return_value=5 * 1024 * 1024,
    ), patch.object(
        ingester_pb2.Entity, "ByteSize", return_value=5 * 1024 * 1024
    ), patch.object(ingester_pb2.IngestRequest, "ByteSize", return_value=100):
        chunks = create_message_chunks(entities)

    # Should still return one chunk with the single (oversized) entity.
    assert len(chunks) == 1
    assert len(chunks[0]) == 1
    assert chunks[0][0].device.name == "huge_device"
Loading