181 changes: 166 additions & 15 deletions examples/README.md
@@ -20,9 +20,17 @@ A more advanced example that uses Protocol Buffers for type-safe data serializat
- Better for production use cases
- More efficient binary encoding

### 3. Nested STRUCT Example (`nested_example_proto/`)
An advanced example demonstrating how to work with nested STRUCT types in Protocol Buffers.

- Shows how to handle nested STRUCT fields (STRUCT within STRUCT)
- Demonstrates proper proto file structure for nested messages
- Type-safe nested record creation
- Ideal for complex schemas with hierarchical data

## Common Features

Both examples demonstrate:
All examples demonstrate:
- Creating a stream with OAuth authentication
- Ingesting a single record
- Waiting for acknowledgment
@@ -33,7 +41,9 @@ Both examples demonstrate:

### 1. Create a Databricks Table

First, create a table in your Databricks workspace using the following SQL:
**For basic examples (`basic_example_json` and `basic_example_proto`):**

Create a table in your Databricks workspace using the following SQL:

```sql
CREATE TABLE catalog.schema.orders (
@@ -48,6 +58,45 @@ CREATE TABLE catalog.schema.orders (
);
```

**For nested example (`nested_example_proto`):**

Create a table with nested STRUCT fields:

```sql
CREATE TABLE catalog.schema.orders (
    id INT,
    product_name STRING,
    quantity INT,
    price DOUBLE,
    status STRING,
    created_at BIGINT,
    updated_at BIGINT,
    customer STRUCT<
        name STRING,
        email STRING,
        address STRUCT<
            street STRING,
            city STRING,
            state STRING,
            zip_code STRING,
            country STRING
        >
    >,
    shipping STRUCT<
        method STRING,
        tracking_number STRING,
        estimated_delivery BIGINT,
        delivery_address STRUCT<
            street STRING,
            city STRING,
            state STRING,
            zip_code STRING,
            country STRING
        >
    >
);
```

Replace `catalog.schema.orders` with your actual catalog, schema, and table name.

### 2. Set Up OAuth Service Principal
@@ -64,7 +113,7 @@ You'll need a Databricks service principal with OAuth credentials:

### 3. Configure Credentials

Both examples require the same credentials. Edit the `src/main.rs` file in your chosen example directory and update these constants:
All examples require the same credentials. Edit the `src/main.rs` file in your chosen example directory and update these constants:

```rust
const DATABRICKS_WORKSPACE_URL: &str = "https://your-workspace.cloud.databricks.com";
@@ -192,6 +241,87 @@ Key features:

---

## Running the Nested STRUCT Example

The nested STRUCT example demonstrates how to work with complex nested data structures using Protocol Buffers.

### Navigate to the Nested Example

```bash
cd examples/nested_example_proto
```

### Run the Example

```bash
cargo run
```

**Expected output:**

```
Record with nested STRUCT fields acknowledged with offset Id: 0
Stream closed successfully
```

### Code Highlights

The nested STRUCT example shows how to create records with nested message types:

```rust
let ack_future = stream
    .ingest_record(
        TableOrders {
            id: Some(1),
            product_name: Some("Wireless Mouse".to_string()),
            quantity: Some(2),
            price: Some(25.99),
            status: Some("pending".to_string()),
            created_at: Some(chrono::Utc::now().timestamp()),
            updated_at: Some(chrono::Utc::now().timestamp()),
            // Nested Customer STRUCT
            customer: Some(table_orders::Customer {
                name: Some("Alice Smith".to_string()),
                email: Some("[email protected]".to_string()),
                // Nested Address STRUCT within Customer
                address: Some(table_orders::Address {
                    street: Some("123 Main Street".to_string()),
                    city: Some("San Francisco".to_string()),
                    state: Some("CA".to_string()),
                    zip_code: Some("94102".to_string()),
                    country: Some("USA".to_string()),
                }),
            }),
            // Nested Shipping STRUCT
            shipping: Some(table_orders::Shipping {
                method: Some("Standard Shipping".to_string()),
                tracking_number: Some("TRACK123456789".to_string()),
                estimated_delivery: Some(chrono::Utc::now().timestamp() + 86400 * 5),
                delivery_address: Some(table_orders::Address {
                    street: Some("456 Oak Avenue".to_string()),
                    city: Some("Oakland".to_string()),
                    state: Some("CA".to_string()),
                    zip_code: Some("94601".to_string()),
                    country: Some("USA".to_string()),
                }),
            }),
        }
        .encode_to_vec(),
    )
    .await
    .unwrap();
```

**Key features:**
- Nested message types are accessed via the `table_orders` module (e.g., `table_orders::Customer`)
- Supports multiple levels of nesting (STRUCT within STRUCT)
- All nested types are included in the proto descriptor automatically
- Type-safe nested record creation

**Important:** The proto file structure requires nested messages to be defined within the main message for the descriptor to be self-contained. The example demonstrates the correct structure.
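
Because every generated field is an `Option`, reading a deeply nested value means handling an unset value at each level. The sketch below illustrates one way to do that with `?` and `as_deref`. It uses trimmed-down plain structs that only stand in for the generated types (the real ones are in `output/orders.rs` and derive `::prost::Message`), and `customer_city` is a hypothetical helper, not part of the SDK or the example.

```rust
// Plain structs standing in for the prost-generated types (assumption:
// reduced to the fields needed here; the real types have more fields).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct TableOrders {
    pub id: Option<i32>,
    pub customer: Option<table_orders::Customer>,
}

// Mirrors the module layout prost generates for nested messages.
pub mod table_orders {
    #[derive(Clone, Debug, Default, PartialEq)]
    pub struct Customer {
        pub name: Option<String>,
        pub address: Option<Address>,
    }

    #[derive(Clone, Debug, Default, PartialEq)]
    pub struct Address {
        pub city: Option<String>,
    }
}

/// Hypothetical helper: returns the customer's city, or `None` if the
/// customer, the address, or the city is unset.
pub fn customer_city(order: &TableOrders) -> Option<&str> {
    order
        .customer
        .as_ref()?
        .address
        .as_ref()?
        .city
        .as_deref()
}

fn main() {
    let order = TableOrders {
        id: Some(1),
        customer: Some(table_orders::Customer {
            name: Some("Alice Smith".to_string()),
            address: Some(table_orders::Address {
                city: Some("San Francisco".to_string()),
            }),
        }),
    };
    println!("{:?}", customer_city(&order));
    // A record with no customer yields None instead of panicking.
    println!("{:?}", customer_city(&TableOrders::default()));
}
```

The same pattern should carry over to the generated types, since they expose the same `Option`-wrapped public fields.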

---

## Adapting for Your Custom Table

### For JSON Example
@@ -304,7 +434,7 @@ let ack_future = stream.ingest_record(

## Common Code Patterns

Both examples follow the same general flow:
All examples follow the same general flow:

### 1. Configure Table Properties

@@ -453,20 +583,24 @@ for i in 0..100 {
stream.flush().await?;
```

## Choosing Between JSON and Protocol Buffers
## Choosing Between Examples

| Feature | JSON Example | Protocol Buffers Example |
|---------|-------------|-------------------------|
| **Setup Complexity** | Simple - no schema files needed | Works out of the box (schema files included) |
| **Type Safety** | Runtime validation only | Compile-time type checking |
| **Performance** | Text-based encoding | Efficient binary encoding |
| **Flexibility** | Easy to modify records on-the-fly | Requires regenerating schema for custom tables |
| **Best For** | Prototyping, simple use cases | Production, high-throughput scenarios |
| **Learning Curve** | Low | Moderate |
| Feature | JSON Example | Basic Proto Example | Nested Proto Example |
|---------|-------------|---------------------|---------------------|
| **Setup Complexity** | Simple - no schema files needed | Works out of the box (schema files included) | Works out of the box (schema files included) |
| **Type Safety** | Runtime validation only | Compile-time type checking | Compile-time type checking |
| **Performance** | Text-based encoding | Efficient binary encoding | Efficient binary encoding |
| **Flexibility** | Easy to modify records on-the-fly | Requires regenerating schema for custom tables | Requires regenerating schema for custom tables |
| **Nested STRUCT Support** | ✅ (via JSON objects) | ❌ | ✅ (via nested messages) |
| **Best For** | Prototyping, simple use cases | Production, flat schemas | Production, complex nested schemas |
| **Learning Curve** | Low | Moderate | Advanced |

**Recommendation:** Start with the JSON example for quick prototyping, then migrate to Protocol Buffers for production deployments where type safety and performance matter.
**Recommendation:**
- Start with the **JSON example** for quick prototyping
- Use the **basic Protocol Buffers example** for production deployments with flat schemas
- Use the **nested Protocol Buffers example** when your table schema includes nested STRUCT types

**Note:** Both examples work immediately out of the box. Schema generation is only required when customizing the Protocol Buffers example for your own table.
**Note:** All examples work immediately out of the box. Schema generation is only required when customizing the Protocol Buffers examples for your own table.

## Troubleshooting

@@ -505,6 +639,23 @@ stream.flush().await?;

**Solution:** Verify your JSON structure matches the Databricks table schema exactly.

### Error: "Proto not self-contained" (Nested Proto example)

**Possible causes:**
- Nested message types are defined as top-level messages instead of nested within the main message
- Missing nested message types in the proto descriptor

**Solution:** Ensure all nested message types (like `Customer`, `Address`, `Shipping`) are defined within the main message in your `.proto` file. See the `nested_example_proto/output/orders.proto` file for the correct structure.
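
As a quick sanity check before regenerating the descriptor, a rough scan of the `.proto` text can list which messages are declared at the top level. The sketch below is a line-based heuristic, not a protobuf parser: it ignores comments and string literals, and `top_level_messages` is a hypothetical helper that is not part of the SDK. If it reports more than one name, that may point to the first cause listed above.

```rust
/// Collects the names of messages declared at brace depth 0.
/// Heuristic only: assumes each `message Name {` starts its own line.
pub fn top_level_messages(proto: &str) -> Vec<String> {
    let mut depth: usize = 0;
    let mut names = Vec::new();
    for line in proto.lines() {
        let trimmed = line.trim();
        if depth == 0 {
            if let Some(rest) = trimmed.strip_prefix("message ") {
                // The identifier ends at the first whitespace or opening brace.
                let name = rest
                    .split(|c: char| c == '{' || c.is_whitespace())
                    .next()
                    .unwrap_or("");
                if !name.is_empty() {
                    names.push(name.to_string());
                }
            }
        }
        // Update the nesting depth after inspecting the line.
        for ch in trimmed.chars() {
            match ch {
                '{' => depth += 1,
                '}' => depth = depth.saturating_sub(1),
                _ => {}
            }
        }
    }
    names
}

fn main() {
    // Nested layout, as in the example's orders.proto.
    let nested = "message table_Orders {\n  optional Customer customer = 1;\n  message Customer {\n    optional string name = 1;\n  }\n}\n";
    // Flat layout, with Customer declared beside the main message.
    let flat = "message table_Orders {\n  optional Customer customer = 1;\n}\nmessage Customer {\n  optional string name = 1;\n}\n";
    println!("nested: {:?}", top_level_messages(nested));
    println!("flat:   {:?}", top_level_messages(flat));
}
```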

### Error: Schema validation errors (Nested Proto example)

**Possible causes:**
- Table schema doesn't match the proto definition
- Missing STRUCT fields in the table
- Field type mismatches

**Solution:** Verify your table schema includes all nested STRUCT fields matching the proto definition. Use the SQL CREATE TABLE statement provided in the example's `main.rs` file as a reference.
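
When hunting for a type mismatch, it can help to write down the expected pairing explicitly. The sketch below encodes only the pairings visible in this example (the SQL table definition against `orders.proto`): `INT` with `int32`, `BIGINT` with `int64`, `DOUBLE` with `double`, `STRING` with `string`, and `STRUCT<...>` with a nested message. Both functions are hypothetical helpers for illustration; other column types are deliberately left unmapped rather than guessed.

```rust
/// Maps a SQL column type to the proto2 type used for it in this example.
/// Returns `None` for any type that does not appear in the example.
pub fn proto_type_for(sql_type: &str) -> Option<&'static str> {
    let upper = sql_type.trim().to_ascii_uppercase();
    if upper.starts_with("STRUCT") {
        // STRUCT columns are represented as nested messages.
        return Some("message");
    }
    match upper.as_str() {
        "INT" => Some("int32"),
        "BIGINT" => Some("int64"),
        "DOUBLE" => Some("double"),
        "STRING" => Some("string"),
        _ => None,
    }
}

/// Returns the names of fields whose proto type differs from the expected one.
/// Each entry is (field name, SQL column type, proto field type).
pub fn mismatched_fields<'a>(columns: &[(&'a str, &str, &str)]) -> Vec<&'a str> {
    let mut bad = Vec::new();
    for &(name, sql, proto) in columns {
        if proto_type_for(sql) != Some(proto) {
            bad.push(name);
        }
    }
    bad
}

fn main() {
    let columns = [
        ("id", "INT", "int32"),
        ("price", "DOUBLE", "double"),
        // Deliberate mismatch: BIGINT declared as int32 in the proto.
        ("created_at", "BIGINT", "int32"),
        ("customer", "STRUCT<name STRING>", "message"),
    ];
    println!("{:?}", mismatched_fields(&columns));
}
```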

## Next Steps

- Try ingesting larger batches of records
21 changes: 21 additions & 0 deletions examples/nested_example_proto/Cargo.toml
@@ -0,0 +1,21 @@
[package]
name = "nested_example_proto"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
databricks-zerobus-ingest-sdk = { path = "../../sdk" }
tonic-build = "0.13.1"
prost = "0.13.3"
prost-build = "0.12"
prost-reflect = "0.14.2"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1.16"
tonic = { version = "0.12.3", features = ["tls"] }
tracing = "0.1.41"
chrono = "0.4"

[build-dependencies]
tonic-build = "0.13.1"
protoc-bin-vendored = "3"
7 changes: 7 additions & 0 deletions examples/nested_example_proto/build.rs
@@ -0,0 +1,7 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::configure()
        .out_dir("output")
        .file_descriptor_set_path("output/orders.descriptor")
        .compile_protos(&["output/orders.proto"], &["output"])?;
    Ok(())
}
Binary file not shown.
36 changes: 36 additions & 0 deletions examples/nested_example_proto/output/orders.proto
@@ -0,0 +1,36 @@
syntax = "proto2";

package orders;

message table_Orders {
    optional int32 id = 1;
    optional string product_name = 2;
    optional int32 quantity = 3;
    optional double price = 4;
    optional string status = 5;
    optional int64 created_at = 6;
    optional int64 updated_at = 7;
    optional Customer customer = 8;
    optional Shipping shipping = 9;

    message Customer {
        optional string name = 1;
        optional string email = 2;
        optional Address address = 3;
    }

    message Address {
        optional string street = 1;
        optional string city = 2;
        optional string state = 3;
        optional string zip_code = 4;
        optional string country = 5;
    }

    message Shipping {
        optional string method = 1;
        optional string tracking_number = 2;
        optional int64 estimated_delivery = 3;
        optional Address delivery_address = 4;
    }
}
58 changes: 58 additions & 0 deletions examples/nested_example_proto/output/orders.rs
@@ -0,0 +1,58 @@
// This file is @generated by prost-build.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TableOrders {
    #[prost(int32, optional, tag = "1")]
    pub id: ::core::option::Option<i32>,
    #[prost(string, optional, tag = "2")]
    pub product_name: ::core::option::Option<::prost::alloc::string::String>,
    #[prost(int32, optional, tag = "3")]
    pub quantity: ::core::option::Option<i32>,
    #[prost(double, optional, tag = "4")]
    pub price: ::core::option::Option<f64>,
    #[prost(string, optional, tag = "5")]
    pub status: ::core::option::Option<::prost::alloc::string::String>,
    #[prost(int64, optional, tag = "6")]
    pub created_at: ::core::option::Option<i64>,
    #[prost(int64, optional, tag = "7")]
    pub updated_at: ::core::option::Option<i64>,
    #[prost(message, optional, tag = "8")]
    pub customer: ::core::option::Option<table_orders::Customer>,
    #[prost(message, optional, tag = "9")]
    pub shipping: ::core::option::Option<table_orders::Shipping>,
}
/// Nested message and enum types in `table_Orders`.
pub mod table_orders {
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct Customer {
        #[prost(string, optional, tag = "1")]
        pub name: ::core::option::Option<::prost::alloc::string::String>,
        #[prost(string, optional, tag = "2")]
        pub email: ::core::option::Option<::prost::alloc::string::String>,
        #[prost(message, optional, tag = "3")]
        pub address: ::core::option::Option<Address>,
    }
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct Address {
        #[prost(string, optional, tag = "1")]
        pub street: ::core::option::Option<::prost::alloc::string::String>,
        #[prost(string, optional, tag = "2")]
        pub city: ::core::option::Option<::prost::alloc::string::String>,
        #[prost(string, optional, tag = "3")]
        pub state: ::core::option::Option<::prost::alloc::string::String>,
        #[prost(string, optional, tag = "4")]
        pub zip_code: ::core::option::Option<::prost::alloc::string::String>,
        #[prost(string, optional, tag = "5")]
        pub country: ::core::option::Option<::prost::alloc::string::String>,
    }
    #[derive(Clone, PartialEq, ::prost::Message)]
    pub struct Shipping {
        #[prost(string, optional, tag = "1")]
        pub method: ::core::option::Option<::prost::alloc::string::String>,
        #[prost(string, optional, tag = "2")]
        pub tracking_number: ::core::option::Option<::prost::alloc::string::String>,
        #[prost(int64, optional, tag = "3")]
        pub estimated_delivery: ::core::option::Option<i64>,
        #[prost(message, optional, tag = "4")]
        pub delivery_address: ::core::option::Option<Address>,
    }
}