Skip to content

Commit 6afbd65

Browse files
Add HeadersProvider which provides needed gRPC connection headers (#5)
* Initial commit
  Signed-off-by: teodordelibasic-db <[email protected]>
* Add header_provider.py
  Signed-off-by: teodordelibasic-db <[email protected]>
* Update NEXT_CHANGELOG.md
  Signed-off-by: teodordelibasic-db <[email protected]>
* Address comments
  Signed-off-by: teodordelibasic-db <[email protected]>
---------
Signed-off-by: teodordelibasic-db <[email protected]>
1 parent c44e98c commit 6afbd65

File tree

11 files changed

+555
-65
lines changed

11 files changed

+555
-65
lines changed

NEXT_CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,20 @@
44

55
### New Features and Improvements
66

7+
- Added `HeadersProvider` abstraction for flexible authentication strategies
8+
- Implemented `OAuthHeadersProvider` for OAuth 2.0 Client Credentials flow (default authentication method used by `create_stream()`)
9+
710
### Bug Fixes
811

912
### Documentation
1013

14+
- Added Azure workspace and endpoint URL examples
15+
1116
### Internal Changes
1217

1318
### API Changes
19+
20+
- Added `HeadersProvider` abstract base class for custom header strategies
21+
- Added `OAuthHeadersProvider` class for OAuth 2.0 authentication with Databricks OIDC endpoint
22+
- Added `create_stream_with_headers_provider` method to `ZerobusSdk` and `aio.ZerobusSdk` for custom authentication header providers
23+
- **Note**: Custom headers providers must include both `authorization` and `x-databricks-zerobus-table-name` headers

README.md

Lines changed: 163 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ The Databricks Zerobus Ingest SDK for Python provides a high-performance client
2424
- [Usage Examples](#usage-examples)
2525
- [Blocking Ingestion](#blocking-ingestion)
2626
- [Non-Blocking Ingestion](#non-blocking-ingestion)
27+
- [Authentication](#authentication)
2728
- [Configuration](#configuration)
2829
- [Error Handling](#error-handling)
2930
- [API Reference](#api-reference)
@@ -479,6 +480,80 @@ async def main():
479480
asyncio.run(main())
480481
```
481482

483+
## Authentication
484+
485+
The SDK uses OAuth 2.0 Client Credentials for authentication:
486+
487+
```python
488+
from zerobus.sdk.sync import ZerobusSdk
489+
from zerobus.sdk.shared import TableProperties
490+
import record_pb2
491+
492+
sdk = ZerobusSdk(server_endpoint, workspace_url)
493+
table_properties = TableProperties(table_name, record_pb2.AirQuality.DESCRIPTOR)
494+
495+
# Create stream with OAuth authentication
496+
stream = sdk.create_stream(client_id, client_secret, table_properties)
497+
```
498+
499+
The SDK automatically fetches access tokens and includes these headers:
500+
- `"authorization": "Bearer <oauth_token>"` - Obtained via OAuth 2.0 Client Credentials flow
501+
- `"x-databricks-zerobus-table-name": "<table_name>"` - The fully qualified table name
502+
503+
### Advanced: Custom Headers
504+
505+
For advanced use cases where you need to provide custom headers (e.g., for future authentication methods or additional metadata), you can implement a custom `HeadersProvider`:
506+
507+
```python
508+
from zerobus.sdk.shared.headers_provider import HeadersProvider
509+
510+
class CustomHeadersProvider(HeadersProvider):
511+
"""
512+
Custom headers provider for advanced use cases.
513+
514+
Note: Currently, OAuth 2.0 Client Credentials (via create_stream())
515+
is the standard authentication method. Use this only if you have
516+
specific requirements for custom headers.
517+
518+
IMPORTANT: Custom headers providers MUST include both:
519+
- "authorization": "Bearer <token>" header for authentication
520+
- "x-databricks-zerobus-table-name": "<table_name>" header for routing
521+
"""
522+
523+
def __init__(self, token: str, table_name: str):
524+
self.token = token
525+
self.table_name = table_name
526+
527+
def get_headers(self):
528+
"""
529+
Return headers for gRPC metadata.
530+
531+
Returns:
532+
List of (header_name, header_value) tuples
533+
"""
534+
return [
535+
("authorization", f"Bearer {self.token}"),
536+
("x-databricks-zerobus-table-name", self.table_name),
537+
("x-custom-header", "custom-value"), # Optional: additional custom headers
538+
]
539+
540+
# Use the custom provider
541+
custom_provider = CustomHeadersProvider(
542+
token="your-token",
543+
table_name=table_properties.table_name
544+
)
545+
stream = sdk.create_stream_with_headers_provider(
546+
custom_provider,
547+
table_properties
548+
)
549+
```
550+
551+
**Potential use cases for custom headers:**
552+
- Integration with existing token management systems
553+
- Additional metadata headers for request tracking
554+
- Future authentication methods
555+
- Special routing or service mesh requirements
556+
482557
## Configuration
483558

484559
### Stream Configuration Options
@@ -565,7 +640,22 @@ def create_stream(
565640
options: StreamConfigurationOptions = None
566641
) -> ZerobusStream
567642
```
568-
Creates a new ingestion stream. Returns a `ZerobusStream` instance.
643+
Creates a new ingestion stream using OAuth 2.0 Client Credentials authentication.
644+
645+
Automatically includes these headers:
646+
- `"authorization": "Bearer <oauth_token>"` (fetched via OAuth 2.0 Client Credentials flow)
647+
- `"x-databricks-zerobus-table-name": "<table_name>"`
648+
649+
Returns a `ZerobusStream` instance.
650+
651+
```python
652+
def create_stream_with_headers_provider(
653+
headers_provider: HeadersProvider,
654+
table_properties: TableProperties,
655+
options: StreamConfigurationOptions = None
656+
) -> ZerobusStream
657+
```
658+
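Creates a new ingestion stream using a custom headers provider. Intended only for advanced use cases that require custom headers; the provider must supply both the `authorization` and `x-databricks-zerobus-table-name` headers. Returns a `ZerobusStream` instance.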
569659

570660
---
571661

@@ -586,7 +676,22 @@ async def create_stream(
586676
options: StreamConfigurationOptions = None
587677
) -> ZerobusStream
588678
```
589-
Creates a new ingestion stream. Returns a `ZerobusStream` instance.
679+
Creates a new ingestion stream using OAuth 2.0 Client Credentials authentication.
680+
681+
Automatically includes these headers:
682+
- `"authorization": "Bearer <oauth_token>"` (fetched via OAuth 2.0 Client Credentials flow)
683+
- `"x-databricks-zerobus-table-name": "<table_name>"`
684+
685+
Returns a `ZerobusStream` instance.
686+
687+
```python
688+
async def create_stream_with_headers_provider(
689+
headers_provider: HeadersProvider,
690+
table_properties: TableProperties,
691+
options: StreamConfigurationOptions = None
692+
) -> ZerobusStream
693+
```
694+
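Creates a new ingestion stream using a custom headers provider. Intended only for advanced use cases that require custom headers; the provider must supply both the `authorization` and `x-databricks-zerobus-table-name` headers. Returns a `ZerobusStream` instance.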
590695

591696
---
592697

@@ -683,6 +788,62 @@ Returns the protobuf message descriptor.
683788

684789
---
685790

791+
### HeadersProvider
792+
793+
Abstract base class for providing headers to gRPC streams. For advanced use cases only.
794+
795+
**Abstract Method:**
796+
797+
```python
798+
@abstractmethod
799+
def get_headers(self) -> List[Tuple[str, str]]
800+
```
801+
Returns headers for gRPC metadata as a list of (header_name, header_value) tuples.
802+
803+
**Built-in Implementation:**
804+
805+
#### OAuthHeadersProvider
806+
807+
OAuth 2.0 Client Credentials flow headers provider (used internally by `create_stream()`).
808+
809+
```python
810+
OAuthHeadersProvider(
811+
workspace_id: str,
812+
workspace_url: str,
813+
table_name: str,
814+
client_id: str,
815+
client_secret: str
816+
)
817+
```
818+
819+
Returns these headers:
820+
- `"authorization": "Bearer <oauth_token>"` (fetched via OAuth 2.0)
821+
- `"x-databricks-zerobus-table-name": "<table_name>"`
822+
823+
**Custom Implementation (Advanced):**
824+
825+
For advanced use cases requiring custom headers, extend the `HeadersProvider` class:
826+
827+
```python
828+
from zerobus.sdk.shared.headers_provider import HeadersProvider
829+
830+
class MyCustomProvider(HeadersProvider):
831+
def get_headers(self):
832+
return [
833+
("authorization", "Bearer my-token"), # Required
834+
("x-databricks-zerobus-table-name", "catalog.schema.table"), # Required
835+
("x-custom-header", "value"), # Optional: additional custom headers
836+
]
837+
```
838+
839+
**Important:** Custom headers providers MUST include both required headers:
840+
- `"authorization"`: Bearer token for authentication
841+
- `"x-databricks-zerobus-table-name"`: Fully qualified table name for routing
842+
843+
Note: Most users should use `create_stream()` with OAuth credentials rather than implementing a custom provider.
844+
845+
---
846+
686847
### StreamConfigurationOptions
687848

688849
Configuration options for stream behavior.

examples/async_example.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
Use Case: Best for applications already using asyncio, async web frameworks (FastAPI, aiohttp),
77
or when integrating ingestion with other asynchronous operations in an event loop.
88
9+
Authentication:
10+
- Uses OAuth 2.0 Client Credentials (standard method)
11+
- Includes an example of a custom headers provider for advanced use cases
12+
913
Note: Both sync and async APIs provide the same throughput and durability guarantees.
1014
Choose based on your application's architecture, not performance requirements.
1115
"""
@@ -20,6 +24,7 @@
2024

2125
from zerobus.sdk.aio import ZerobusSdk
2226
from zerobus.sdk.shared import StreamConfigurationOptions, TableProperties
27+
from zerobus.sdk.shared.headers_provider import HeadersProvider
2328

2429
# Configure logging
2530
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
@@ -56,6 +61,31 @@ def create_sample_record(index):
5661
return record_pb2.AirQuality(device_name=f"sensor-{index % 10}", temp=20 + (index % 15), humidity=50 + (index % 40))
5762

5863

64+
class CustomHeadersProvider(HeadersProvider):
65+
"""
66+
Example custom headers provider for advanced use cases.
67+
68+
Note: OAuth 2.0 Client Credentials (via create_stream()) is the standard
69+
authentication method. Use this only if you have specific requirements
70+
for custom headers (e.g., custom metadata, existing token management, etc.).
71+
"""
72+
73+
def __init__(self, custom_token: str):
74+
self.custom_token = custom_token
75+
76+
def get_headers(self):
77+
"""
78+
Return custom headers for gRPC metadata.
79+
80+
Returns:
81+
List of (header_name, header_value) tuples
82+
"""
83+
return [
84+
("authorization", f"Bearer {self.custom_token}"),
85+
("x-custom-header", "custom-value"),
86+
]
87+
88+
5989
def create_ack_callback():
6090
"""
6191
Creates an acknowledgment callback that logs progress.
@@ -111,8 +141,19 @@ async def main():
111141
table_properties = TableProperties(TABLE_NAME, record_pb2.AirQuality.DESCRIPTOR)
112142
logger.info(f"✓ Table properties configured for: {TABLE_NAME}")
113143

114-
# Step 4: Create a stream
144+
# Step 4: Create a stream with OAuth 2.0 authentication
145+
#
146+
# Standard method: OAuth 2.0 Client Credentials
147+
# The SDK automatically includes these headers:
148+
# - "authorization": "Bearer <oauth_token>" (fetched via OAuth 2.0 Client Credentials flow)
149+
# - "x-databricks-zerobus-table-name": "<table_name>"
115150
stream = await sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)
151+
152+
# Advanced: Custom headers provider (special use cases only; the provider must also supply the "x-databricks-zerobus-table-name" header)
153+
# Uncomment to use custom headers instead of OAuth:
154+
# custom_provider = CustomHeadersProvider(custom_token="your-custom-token")
155+
# stream = await sdk.create_stream_with_headers_provider(custom_provider, table_properties, options)
156+
116157
logger.info(f"✓ Stream created: {stream.stream_id}")
117158

118159
# Step 5: Ingest records asynchronously

examples/sync_example.py

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
66
Use Case: Best for applications that don't use asyncio or prefer blocking I/O patterns.
77
8+
Authentication:
9+
- Uses OAuth 2.0 Client Credentials (standard method)
10+
- Includes an example of a custom headers provider for advanced use cases
11+
812
Note: Both sync and async APIs provide the same throughput and durability guarantees.
913
Choose based on your application's architecture, not performance requirements.
1014
"""
@@ -17,6 +21,7 @@
1721
import record_pb2
1822

1923
from zerobus.sdk.shared import StreamConfigurationOptions, TableProperties
24+
from zerobus.sdk.shared.headers_provider import HeadersProvider
2025
from zerobus.sdk.sync import ZerobusSdk
2126

2227
# Configure logging
@@ -54,6 +59,31 @@ def create_sample_record(index):
5459
return record_pb2.AirQuality(device_name=f"sensor-{index % 10}", temp=20 + (index % 15), humidity=50 + (index % 40))
5560

5661

62+
class CustomHeadersProvider(HeadersProvider):
63+
"""
64+
Example custom headers provider for advanced use cases.
65+
66+
Note: OAuth 2.0 Client Credentials (via create_stream()) is the standard
67+
authentication method. Use this only if you have specific requirements
68+
for custom headers (e.g., custom metadata, existing token management, etc.).
69+
"""
70+
71+
def __init__(self, custom_token: str):
72+
self.custom_token = custom_token
73+
74+
def get_headers(self):
75+
"""
76+
Return custom headers for gRPC metadata.
77+
78+
Returns:
79+
List of (header_name, header_value) tuples
80+
"""
81+
return [
82+
("authorization", f"Bearer {self.custom_token}"),
83+
("x-custom-header", "custom-value"),
84+
]
85+
86+
5787
def main():
5888
print("Starting synchronous ingestion example...")
5989
print("=" * 60)
@@ -93,8 +123,19 @@ def main():
93123
)
94124
logger.info("✓ Stream configuration created")
95125

96-
# Step 4: Create a stream
126+
# Step 4: Create a stream with OAuth 2.0 authentication
127+
#
128+
# Standard method: OAuth 2.0 Client Credentials
129+
# The SDK automatically includes these headers:
130+
# - "authorization": "Bearer <oauth_token>" (fetched via OAuth 2.0 Client Credentials flow)
131+
# - "x-databricks-zerobus-table-name": "<table_name>"
97132
stream = sdk.create_stream(CLIENT_ID, CLIENT_SECRET, table_properties, options)
133+
134+
# Advanced: Custom headers provider (special use cases only; the provider must also supply the "x-databricks-zerobus-table-name" header)
135+
# Uncomment to use custom headers instead of OAuth:
136+
# custom_provider = CustomHeadersProvider(custom_token="your-custom-token")
137+
# stream = sdk.create_stream_with_headers_provider(custom_provider, table_properties, options)
138+
98139
logger.info(f"✓ Stream created: {stream.stream_id}")
99140

100141
# Step 5: Ingest records synchronously

tests/mock_grpc.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ async def wrapper(test_instance, *args, **kwargs):
471471
# Run for async SDK
472472
logging.info(f"\nRunning test '{test_func.__name__}' for Async SDK")
473473
async_sdk_manager = SdkManager(ZerobusSdkAsync)
474-
with patch("zerobus.sdk.aio.zerobus_sdk.get_zerobus_token", return_value="mock_token"):
474+
with patch("zerobus.sdk.shared.headers_provider.get_zerobus_token", return_value="mock_token"):
475475
await test_func(test_instance, async_sdk_manager, *args, **kwargs)
476476
logging.info(f"Test '{test_func.__name__}' PASSED for Async SDK")
477477

@@ -482,7 +482,7 @@ async def wrapper(test_instance, *args, **kwargs):
482482
loop = asyncio.get_running_loop()
483483

484484
def run_sync_test_in_isolation():
485-
with patch("zerobus.sdk.sync.zerobus_sdk.get_zerobus_token", return_value="mock_token"):
485+
with patch("zerobus.sdk.shared.headers_provider.get_zerobus_token", return_value="mock_token"):
486486
asyncio.run(coro)
487487

488488
await loop.run_in_executor(None, run_sync_test_in_isolation)

tests/test_sdk.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def create_ephemeral_stream(generator, **kwargs):
5151

5252
with (
5353
patch("grpc.secure_channel", return_value=mock_channel),
54-
patch("zerobus.sdk.sync.zerobus_sdk.get_zerobus_token", return_value="mock_token"),
54+
patch("zerobus.sdk.shared.headers_provider.get_zerobus_token", return_value="mock_token"),
5555
):
5656
sdk_handle = ZerobusSdk(SERVER_ENDPOINT, unity_catalog_url="https://test.unity.catalog.url")
5757
try:

0 commit comments

Comments
 (0)