5 changes: 3 additions & 2 deletions CHANGELOG.md
@@ -6,7 +6,8 @@

- Loosened protobuf dependency constraint to support versions >= 4.25.0 and < 7.0
- **JSON Serialization Support**: Added support for JSON record serialization alongside Protocol Buffers (default)
- New `RecordType.JSON` mode for ingesting JSON-encoded strings
- New `RecordType.JSON` mode for ingesting records as Python dicts
- The SDK handles JSON serialization internally
- No protobuf schema compilation required
- Added `HeadersProvider` abstraction for flexible authentication strategies
- Implemented `OAuthHeadersProvider` for OAuth 2.0 Client Credentials flow (default authentication method used by `create_stream()`)
@@ -45,7 +46,7 @@
- `RecordType.PROTO` (default): For protobuf serialization
- `RecordType.JSON`: For JSON serialization
- Example: `StreamConfigurationOptions(record_type=RecordType.JSON)`
- **ZerobusStream.ingest_record**: Now accepts JSON strings (when using `RecordType.JSON`) in addition to protobuf messages and bytes
- **ZerobusStream.ingest_record**: Now accepts Python dicts (when using `RecordType.JSON`) in addition to protobuf messages and bytes
- Added `RecordType` enum with `PROTO` and `JSON` values
- Added `HeadersProvider` abstract base class for custom header strategies
- Added `OAuthHeadersProvider` class for OAuth 2.0 authentication with Databricks OIDC endpoint
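The `HeadersProvider` entries above describe the new extension point. For illustration, a custom provider might look like the sketch below — the import path follows this PR's examples, but the `get_headers()` method name is an assumption (check the `HeadersProvider` ABC for the actual interface):

```python
from zerobus.sdk.shared import HeadersProvider  # import path assumed from this PR's examples


class StaticTokenHeadersProvider(HeadersProvider):
    """Hypothetical provider that attaches a fixed bearer token to every request."""

    def __init__(self, token: str):
        self._token = token

    def get_headers(self) -> dict:  # method name is an assumption; see the ABC
        return {"Authorization": f"Bearer {self._token}"}
```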
5 changes: 5 additions & 0 deletions NEXT_CHANGELOG.md
@@ -8,6 +8,11 @@

### Documentation

- Fixed JSON examples and documentation to correctly show passing dict objects instead of pre-serialized JSON strings with `json.dumps()`
- Updated `examples/async_example_json.py` and `examples/sync_example_json.py`
- Fixed code examples in `README.md` and `examples/README.md`
- Clarified that the SDK handles JSON serialization internally

### Internal Changes

### API Changes
35 changes: 14 additions & 21 deletions README.md
@@ -149,7 +149,6 @@ The SDK supports two serialization formats:
**Synchronous Example:**

```python
import json
import logging
from zerobus.sdk.sync import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
@@ -187,15 +186,14 @@ stream = sdk.create_stream(client_id, client_secret, table_properties, options)
try:
# Ingest records
for i in range(100):
# Create JSON record
record_dict = {
# Create JSON record as a dict (SDK handles serialization)
record = {
"device_name": f"sensor-{i % 10}",
"temp": 20 + (i % 15),
"humidity": 50 + (i % 40)
}
json_record = json.dumps(record_dict)

ack = stream.ingest_record(json_record)
ack = stream.ingest_record(record)
ack.wait_for_ack() # Optional: Wait for durability confirmation

print(f"Ingested record {i + 1}")
@@ -209,7 +207,6 @@ finally:

```python
import asyncio
import json
import logging
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
@@ -248,15 +245,14 @@ async def main():
try:
# Ingest records
for i in range(100):
# Create JSON record
record_dict = {
# Create JSON record as a dict (SDK handles serialization)
record = {
"device_name": f"sensor-{i % 10}",
"temp": 20 + (i % 15),
"humidity": 50 + (i % 40)
}
json_record = json.dumps(record_dict)

future = await stream.ingest_record(json_record)
future = await stream.ingest_record(record)
await future # Optional: Wait for durability confirmation

print(f"Ingested record {i + 1}")
@@ -475,14 +471,13 @@ stream = sdk.create_stream(client_id, client_secret, table_properties, options)

try:
for i in range(1000):
record_dict = {
record = {
"device_name": f"sensor-{i}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
json_record = json.dumps(record_dict)

ack = stream.ingest_record(json_record)
ack = stream.ingest_record(record)
ack.wait_for_ack() # Optional: Wait for durability confirmation
finally:
stream.close()
@@ -492,7 +487,6 @@ finally:

```python
import asyncio
import json
import logging
from zerobus.sdk.aio import ZerobusSdk
from zerobus.sdk.shared import RecordType, StreamConfigurationOptions, TableProperties
@@ -515,14 +509,13 @@ async def main():
futures = []
try:
for i in range(100000):
record_dict = {
record = {
"device_name": f"sensor-{i % 10}",
"temp": 20 + i % 15,
"humidity": 50 + i % 40
}
json_record = json.dumps(record_dict)

future = await stream.ingest_record(json_record)
future = await stream.ingest_record(record)
futures.append(future)

await stream.flush()
@@ -829,9 +822,9 @@ Represents an active ingestion stream.
**Synchronous Methods:**

```python
def ingest_record(record: Union[str, bytes, Message]) -> RecordAcknowledgment
def ingest_record(record: Union[dict, bytes, Message]) -> RecordAcknowledgment
```
Ingests a single record. Pass a JSON string (JSON mode) or protobuf message/bytes (protobuf mode). Returns a `RecordAcknowledgment` for tracking.
Ingests a single record. Pass a dict (JSON mode) or protobuf message/bytes (protobuf mode). Returns a `RecordAcknowledgment` for tracking.
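A brief usage sketch covering both accepted inputs — the JSON-mode call assumes a stream created with `RecordType.JSON` as in the examples above, and the protobuf message type is hypothetical:

```python
# JSON mode: pass a plain dict; the SDK serializes it to JSON internally
ack = stream.ingest_record({"device_name": "sensor-1", "temp": 25, "humidity": 60})
ack.wait_for_ack()

# Protobuf mode (default): pass a generated message or its serialized bytes
# record = my_table_pb2.Record(device_name="sensor-1", temp=25)  # hypothetical generated type
# ack = stream.ingest_record(record)                             # message object
# ack = stream.ingest_record(record.SerializeToString())         # or raw bytes
```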

```python
def flush() -> None
@@ -859,9 +852,9 @@ Returns the unique stream ID assigned by the server.
**Asynchronous Methods:**

```python
async def ingest_record(record: Union[str, bytes, Message]) -> Awaitable
async def ingest_record(record: Union[dict, bytes, Message]) -> Awaitable
```
Ingests a single record. Pass a JSON string (JSON mode) or protobuf message/bytes (protobuf mode). Returns an awaitable that completes when the record is durably written.
Ingests a single record. Pass a dict (JSON mode) or protobuf message/bytes (protobuf mode). Returns an awaitable that completes when the record is durably written.

```python
async def flush() -> None
16 changes: 8 additions & 8 deletions examples/README.md
@@ -71,14 +71,14 @@ ack = stream.ingest_record(record)
#### JSON
**Files:** `sync_example_json.py`, `async_example_json.py`

Good for getting started. Send records as JSON-encoded strings. No protobuf schema required.
Good for getting started. Send records as Python dicts. No protobuf schema required. The SDK handles JSON serialization internally.

```python
# Create and ingest JSON record
json_record = json.dumps({"device_name": "sensor-1", "temp": 25, "humidity": 60})
record = {"device_name": "sensor-1", "temp": 25, "humidity": 60}
table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
ack = stream.ingest_record(json_record)
ack = stream.ingest_record(record)
```

### Synchronous vs Asynchronous APIs
@@ -131,7 +131,7 @@ Both APIs provide the same functionality and performance. The key differences ar
| Format | Record Input | Configuration |
|--------|-------------|---------------|
| **Protobuf** (Default) | Protobuf object or bytes | `TableProperties(table_name, descriptor)` |
| **JSON** | JSON string | `TableProperties(table_name)` + `StreamConfigurationOptions(record_type=RecordType.JSON)` |
| **JSON** | Python dict | `TableProperties(table_name)` + `StreamConfigurationOptions(record_type=RecordType.JSON)` |
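Putting the table together, a minimal configuration sketch for each format (the descriptor reference is hypothetical — substitute your own compiled schema):

```python
# Protobuf (default): pass the table name plus a compiled message descriptor
# table_properties = TableProperties(TABLE_NAME, record_pb2.Record.DESCRIPTOR)  # hypothetical module

# JSON: table name only, with RecordType.JSON set in the stream options
table_properties = TableProperties(TABLE_NAME)
options = StreamConfigurationOptions(record_type=RecordType.JSON)
stream = sdk.create_stream(client_id, client_secret, table_properties, options)
```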

## Authentication

@@ -154,14 +154,14 @@ To use your own protobuf schema:

To use your own JSON structure:

1. Define your JSON structure in code:
1. Define your record as a Python dict:
```python
json_record = json.dumps({"field1": "value1", "field2": 123})
record = {"field1": "value1", "field2": 123}
```
2. Configure `StreamConfigurationOptions` with `record_type=RecordType.JSON`
3. Ensure your JSON structure matches the schema of your Databricks table
3. Ensure your dict structure matches the schema of your Databricks table

Note: The SDK sends JSON strings directly without client-side schema validation.
Note: The SDK serializes dicts to JSON internally without client-side schema validation.

## Additional Resources

14 changes: 7 additions & 7 deletions examples/async_example_json.py
@@ -4,8 +4,9 @@
This example demonstrates record ingestion using the asynchronous API with JSON serialization.

Record Type Mode: JSON
- Records are sent as JSON-encoded strings
- Records are sent as Python dictionaries
- Uses RecordType.JSON to specify JSON serialization
- The SDK handles JSON serialization internally
- Best for dynamic schemas or when working with JSON data

Use Case: Best for applications already using asyncio, async web frameworks (FastAPI, aiohttp),
Expand All @@ -20,7 +21,6 @@
"""

import asyncio
import json
import logging
import os
import time
@@ -58,12 +58,12 @@

def create_sample_json_record(index):
"""
Creates a sample AirQuality record as a JSON string.
Creates a sample AirQuality record as a dictionary.

With JSON mode, records are plain JSON strings that match your schema.
With JSON mode, records are Python dicts that match your schema.
The SDK handles JSON serialization internally.
"""
record_dict = {"device_name": f"sensor-{index % 10}", "temp": 20 + (index % 15), "humidity": 50 + (index % 40)}
return json.dumps(record_dict)
return {"device_name": f"sensor-{index % 10}", "temp": 20 + (index % 15), "humidity": 50 + (index % 40)}


class CustomHeadersProvider(HeadersProvider):
@@ -172,7 +172,7 @@ async def main():
futures = []

for i in range(NUM_RECORDS):
# Create a JSON record (as a string)
# Create a JSON record (as a dict)
json_record = create_sample_json_record(i)

# Ingest record asynchronously
14 changes: 7 additions & 7 deletions examples/sync_example_json.py
@@ -4,8 +4,9 @@
This example demonstrates record ingestion using the synchronous API with JSON serialization.

Record Type Mode: JSON
- Records are sent as JSON-encoded strings
- Records are sent as Python dictionaries
- Uses RecordType.JSON to specify JSON serialization
- The SDK handles JSON serialization internally
- Best for dynamic schemas or when working with JSON data

Authentication:
@@ -16,7 +17,6 @@
Choose based on your application's architecture, not performance requirements.
"""

import json
import logging
import os
import time
@@ -54,12 +54,12 @@

def create_sample_json_record(index):
"""
Creates a sample AirQuality record as a JSON string.
Creates a sample AirQuality record as a dictionary.

With JSON mode, records are plain JSON strings that match your schema.
With JSON mode, records are Python dicts that match your schema.
The SDK handles JSON serialization internally.
"""
record_dict = {"device_name": f"sensor-{index % 10}", "temp": 20 + (index % 15), "humidity": 50 + (index % 40)}
return json.dumps(record_dict)
return {"device_name": f"sensor-{index % 10}", "temp": 20 + (index % 15), "humidity": 50 + (index % 40)}


class CustomHeadersProvider(HeadersProvider):
@@ -150,7 +150,7 @@ def main():

try:
for i in range(NUM_RECORDS):
# Create a JSON record (as a string)
# Create a JSON record (as a dict)
json_record = create_sample_json_record(i)

# Ingest and wait for acknowledgment