diff --git a/examples/README.md b/examples/README.md index 8058904..822bfe7 100644 --- a/examples/README.md +++ b/examples/README.md @@ -20,9 +20,17 @@ A more advanced example that uses Protocol Buffers for type-safe data serializat - Better for production use cases - More efficient binary encoding +### 3. Nested STRUCT Example (`nested_example_proto/`) +An advanced example demonstrating how to work with nested STRUCT types in Protocol Buffers. + +- Shows how to handle nested STRUCT fields (STRUCT within STRUCT) +- Demonstrates proper proto file structure for nested messages +- Type-safe nested record creation +- Ideal for complex schemas with hierarchical data + ## Common Features -Both examples demonstrate: +All examples demonstrate: - Creating a stream with OAuth authentication - Ingesting a single record - Waiting for acknowledgment @@ -33,7 +41,9 @@ Both examples demonstrate: ### 1. Create a Databricks Table -First, create a table in your Databricks workspace using the following SQL: +**For basic examples (`basic_example_json` and `basic_example_proto`):** + +Create a table in your Databricks workspace using the following SQL: ```sql CREATE TABLE catalog.schema.orders ( @@ -48,6 +58,45 @@ CREATE TABLE catalog.schema.orders ( ); ``` +**For nested example (`nested_example_proto`):** + +Create a table with nested STRUCT fields: + +```sql +CREATE TABLE catalog.schema.orders ( + id INT, + product_name STRING, + quantity INT, + price DOUBLE, + status STRING, + created_at BIGINT, + updated_at BIGINT, + customer STRUCT< + name STRING, + email STRING, + address STRUCT< + street STRING, + city STRING, + state STRING, + zip_code STRING, + country STRING + > + >, + shipping STRUCT< + method STRING, + tracking_number STRING, + estimated_delivery BIGINT, + delivery_address STRUCT< + street STRING, + city STRING, + state STRING, + zip_code STRING, + country STRING + > + > +); +``` + Replace `catalog.schema.orders` with your actual catalog, schema, and table name. ### 2. 
Set Up OAuth Service Principal @@ -64,7 +113,7 @@ You'll need a Databricks service principal with OAuth credentials: ### 3. Configure Credentials -Both examples require the same credentials. Edit the `src/main.rs` file in your chosen example directory and update these constants: +All examples require the same credentials. Edit the `src/main.rs` file in your chosen example directory and update these constants: ```rust const DATABRICKS_WORKSPACE_URL: &str = "https://your-workspace.cloud.databricks.com"; @@ -192,6 +241,87 @@ Key features: --- +## Running the Nested STRUCT Example + +The nested STRUCT example demonstrates how to work with complex nested data structures using Protocol Buffers. + +### Navigate to the Nested Example + +```bash +cd examples/nested_example_proto +``` + +### Run the Example + +```bash +cargo run +``` + +**Expected output:** + +``` +Record with nested STRUCT fields acknowledged with offset Id: 0 +Stream closed successfully +``` + +### Code Highlights + +The nested STRUCT example shows how to create records with nested message types: + +```rust +let ack_future = stream + .ingest_record( + TableOrders { + id: Some(1), + product_name: Some("Wireless Mouse".to_string()), + quantity: Some(2), + price: Some(25.99), + status: Some("pending".to_string()), + created_at: Some(chrono::Utc::now().timestamp()), + updated_at: Some(chrono::Utc::now().timestamp()), + // Nested Customer STRUCT + customer: Some(table_orders::Customer { + name: Some("Alice Smith".to_string()), + email: Some("alice.smith@example.com".to_string()), + // Nested Address STRUCT within Customer + address: Some(table_orders::Address { + street: Some("123 Main Street".to_string()), + city: Some("San Francisco".to_string()), + state: Some("CA".to_string()), + zip_code: Some("94102".to_string()), + country: Some("USA".to_string()), + }), + }), + // Nested Shipping STRUCT + shipping: Some(table_orders::Shipping { + method: Some("Standard Shipping".to_string()), + tracking_number: 
Some("TRACK123456789".to_string()), + estimated_delivery: Some(chrono::Utc::now().timestamp() + 86400 * 5), + delivery_address: Some(table_orders::Address { + street: Some("456 Oak Avenue".to_string()), + city: Some("Oakland".to_string()), + state: Some("CA".to_string()), + zip_code: Some("94601".to_string()), + country: Some("USA".to_string()), + }), + }), + } + .encode_to_vec(), + ) + .await + .unwrap(); +``` + +**Key features:** +- Nested message types are accessed via the `table_orders` module (e.g., `table_orders::Customer`) +- Supports multiple levels of nesting (STRUCT within STRUCT) +- All nested types are included in the proto descriptor automatically +- Type-safe nested record creation + +**Important:** The proto file structure requires nested messages to be defined within the main message for the descriptor to be self-contained. The example demonstrates the correct structure. + +--- + ## Adapting for Your Custom Table ### For JSON Example @@ -304,7 +434,7 @@ let ack_future = stream.ingest_record( ## Common Code Patterns -Both examples follow the same general flow: +All examples follow the same general flow: ### 1. 
Configure Table Properties @@ -453,20 +583,24 @@ for i in 0..100 { stream.flush().await?; ``` -## Choosing Between JSON and Protocol Buffers +## Choosing Between Examples -| Feature | JSON Example | Protocol Buffers Example | -|---------|-------------|-------------------------| -| **Setup Complexity** | Simple - no schema files needed | Works out of the box (schema files included) | -| **Type Safety** | Runtime validation only | Compile-time type checking | -| **Performance** | Text-based encoding | Efficient binary encoding | -| **Flexibility** | Easy to modify records on-the-fly | Requires regenerating schema for custom tables | -| **Best For** | Prototyping, simple use cases | Production, high-throughput scenarios | -| **Learning Curve** | Low | Moderate | +| Feature | JSON Example | Basic Proto Example | Nested Proto Example | +|---------|-------------|---------------------|---------------------| +| **Setup Complexity** | Simple - no schema files needed | Works out of the box (schema files included) | Works out of the box (schema files included) | +| **Type Safety** | Runtime validation only | Compile-time type checking | Compile-time type checking | +| **Performance** | Text-based encoding | Efficient binary encoding | Efficient binary encoding | +| **Flexibility** | Easy to modify records on-the-fly | Requires regenerating schema for custom tables | Requires regenerating schema for custom tables | +| **Nested STRUCT Support** | ✅ (via JSON objects) | ❌ | ✅ (via nested messages) | +| **Best For** | Prototyping, simple use cases | Production, flat schemas | Production, complex nested schemas | +| **Learning Curve** | Low | Moderate | Advanced | -**Recommendation:** Start with the JSON example for quick prototyping, then migrate to Protocol Buffers for production deployments where type safety and performance matter. 
+**Recommendation:** +- Start with the **JSON example** for quick prototyping +- Use the **basic Protocol Buffers example** for production deployments with flat schemas +- Use the **nested Protocol Buffers example** when your table schema includes nested STRUCT types -**Note:** Both examples work immediately out of the box. Schema generation is only required when customizing the Protocol Buffers example for your own table. +**Note:** All examples work immediately out of the box. Schema generation is only required when customizing the Protocol Buffers examples for your own table. ## Troubleshooting @@ -505,6 +639,23 @@ stream.flush().await?; **Solution:** Verify your JSON structure matches the Databricks table schema exactly. +### Error: "Proto not self-contained" (Nested Proto example) + +**Possible causes:** +- Nested message types are defined as top-level messages instead of nested within the main message +- Missing nested message types in the proto descriptor + +**Solution:** Ensure all nested message types (like `Customer`, `Address`, `Shipping`) are defined within the main message in your `.proto` file. See the `nested_example_proto/output/orders.proto` file for the correct structure. + +### Error: Schema validation errors (Nested Proto example) + +**Possible causes:** +- Table schema doesn't match the proto definition +- Missing STRUCT fields in the table +- Field type mismatches + +**Solution:** Verify your table schema includes all nested STRUCT fields matching the proto definition. Use the SQL CREATE TABLE statement provided in the example's `main.rs` file as a reference. 
+ ## Next Steps - Try ingesting larger batches of records diff --git a/examples/nested_example_proto/Cargo.toml b/examples/nested_example_proto/Cargo.toml new file mode 100644 index 0000000..35a3e14 --- /dev/null +++ b/examples/nested_example_proto/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "nested_example_proto" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +databricks-zerobus-ingest-sdk = { path = "../../sdk" } +tonic-build = "0.13.1" +prost = "0.13.3" +prost-build = "0.12" +prost-reflect = "0.14.2" +tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +tokio-stream = "0.1.16" +tonic = { version = "0.12.3", features = ["tls"] } +tracing = "0.1.41" +chrono = "0.4" + +[build-dependencies] +tonic-build = "0.13.1" +protoc-bin-vendored = "3" diff --git a/examples/nested_example_proto/build.rs b/examples/nested_example_proto/build.rs new file mode 100644 index 0000000..3df70f3 --- /dev/null +++ b/examples/nested_example_proto/build.rs @@ -0,0 +1,8 @@ +/// Compiles `output/orders.proto`, writing the generated Rust and the descriptor set to `output/`. +fn main() -> Result<(), Box<dyn std::error::Error>> { + tonic_build::configure() + .out_dir("output") + .file_descriptor_set_path("output/orders.descriptor") + .compile_protos(&["output/orders.proto"], &["output"])?; + Ok(()) +} diff --git a/examples/nested_example_proto/output/orders.descriptor b/examples/nested_example_proto/output/orders.descriptor new file mode 100644 index 0000000..58f7ad2 Binary files /dev/null and b/examples/nested_example_proto/output/orders.descriptor differ diff --git a/examples/nested_example_proto/output/orders.proto b/examples/nested_example_proto/output/orders.proto new file mode 100644 index 0000000..b61a18b --- /dev/null +++ b/examples/nested_example_proto/output/orders.proto @@ -0,0 +1,36 @@ +syntax = "proto2"; + +package orders; + +message table_Orders { + optional int32 id = 1; + optional string product_name = 2; + optional int32 quantity = 3; + optional double price = 4; + optional string status = 5; + optional int64 created_at = 6; + optional int64 updated_at = 
7; + optional Customer customer = 8; + optional Shipping shipping = 9; + + message Customer { + optional string name = 1; + optional string email = 2; + optional Address address = 3; + } + + message Address { + optional string street = 1; + optional string city = 2; + optional string state = 3; + optional string zip_code = 4; + optional string country = 5; + } + + message Shipping { + optional string method = 1; + optional string tracking_number = 2; + optional int64 estimated_delivery = 3; + optional Address delivery_address = 4; + } +} diff --git a/examples/nested_example_proto/output/orders.rs b/examples/nested_example_proto/output/orders.rs new file mode 100644 index 0000000..2d38a69 --- /dev/null +++ b/examples/nested_example_proto/output/orders.rs @@ -0,0 +1,58 @@ +// This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TableOrders { + #[prost(int32, optional, tag = "1")] + pub id: ::core::option::Option<i32>, + #[prost(string, optional, tag = "2")] + pub product_name: ::core::option::Option<::prost::alloc::string::String>, + #[prost(int32, optional, tag = "3")] + pub quantity: ::core::option::Option<i32>, + #[prost(double, optional, tag = "4")] + pub price: ::core::option::Option<f64>, + #[prost(string, optional, tag = "5")] + pub status: ::core::option::Option<::prost::alloc::string::String>, + #[prost(int64, optional, tag = "6")] + pub created_at: ::core::option::Option<i64>, + #[prost(int64, optional, tag = "7")] + pub updated_at: ::core::option::Option<i64>, + #[prost(message, optional, tag = "8")] + pub customer: ::core::option::Option<table_orders::Customer>, + #[prost(message, optional, tag = "9")] + pub shipping: ::core::option::Option<table_orders::Shipping>, +} +/// Nested message and enum types in `table_Orders`. 
+pub mod table_orders { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Customer { + #[prost(string, optional, tag = "1")] + pub name: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag = "2")] + pub email: ::core::option::Option<::prost::alloc::string::String>, + #[prost(message, optional, tag = "3")] + pub address: ::core::option::Option<Address>, + } + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Address { + #[prost(string, optional, tag = "1")] + pub street: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag = "2")] + pub city: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag = "3")] + pub state: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag = "4")] + pub zip_code: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag = "5")] + pub country: ::core::option::Option<::prost::alloc::string::String>, + } + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Shipping { + #[prost(string, optional, tag = "1")] + pub method: ::core::option::Option<::prost::alloc::string::String>, + #[prost(string, optional, tag = "2")] + pub tracking_number: ::core::option::Option<::prost::alloc::string::String>, + #[prost(int64, optional, tag = "3")] + pub estimated_delivery: ::core::option::Option<i64>, + #[prost(message, optional, tag = "4")] + pub delivery_address: ::core::option::Option<Address>
, + } +} diff --git a/examples/nested_example_proto/src/main.rs b/examples/nested_example_proto/src/main.rs new file mode 100644 index 0000000..cadc511 --- /dev/null +++ b/examples/nested_example_proto/src/main.rs @@ -0,0 +1,170 @@ +use std::error::Error; +use std::fs; + +use prost::Message; +use prost_reflect::prost_types; + +use databricks_zerobus_ingest_sdk::{StreamConfigurationOptions, TableProperties, ZerobusSdk}; +pub mod orders { + include!("../output/orders.rs"); +} +use crate::orders::{table_orders, TableOrders}; + +// Update this to match your table with nested STRUCT fields +// First, create a table with the following SQL: +// +// CREATE TABLE catalog.schema.orders ( +// id INT, +// product_name STRING, +// quantity INT, +// price DOUBLE, +// status STRING, +// created_at BIGINT, +// updated_at BIGINT, +// customer STRUCT< +// name STRING, +// email STRING, +// address STRUCT< +// street STRING, +// city STRING, +// state STRING, +// zip_code STRING, +// country STRING +// > +// >, +// shipping STRUCT< +// method STRING, +// tracking_number STRING, +// estimated_delivery BIGINT, +// delivery_address STRUCT< +// street STRING, +// city STRING, +// state STRING, +// zip_code STRING, +// country STRING +// > +// > +// ); + +// Change constants to match your data. +const TABLE_NAME: &str = ""; +const DATABRICKS_CLIENT_ID: &str = ""; +const DATABRICKS_CLIENT_SECRET: &str = ""; + +// Uncomment the appropriate lines for your cloud. 
+ +// For AWS: +const DATABRICKS_WORKSPACE_URL: &str = "https://<your-workspace>.cloud.databricks.com"; +const SERVER_ENDPOINT: &str = "<workspace-id>.zerobus.<region>.cloud.databricks.com"; + +// For Azure: +// const DATABRICKS_WORKSPACE_URL: &str = "https://<your-workspace>.azuredatabricks.net"; +// const SERVER_ENDPOINT: &str = "<workspace-id>.zerobus.<region>.azuredatabricks.net"; + +/// Creates a stream, ingests one order record with nested STRUCT fields, waits for its +/// acknowledgment, and closes the stream. +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + let descriptor_proto = + load_descriptor_proto("output/orders.descriptor", "orders.proto", "table_Orders"); + let table_properties = TableProperties { + table_name: TABLE_NAME.to_string(), + descriptor_proto: Some(descriptor_proto), + }; + let stream_configuration_options = StreamConfigurationOptions { + max_inflight_records: 100, + ..Default::default() + }; + let sdk_handle = ZerobusSdk::new( + SERVER_ENDPOINT.to_string(), + DATABRICKS_WORKSPACE_URL.to_string(), + )?; + + let mut stream = sdk_handle + .create_stream( + table_properties, + DATABRICKS_CLIENT_ID.to_string(), + DATABRICKS_CLIENT_SECRET.to_string(), + Some(stream_configuration_options), + ) + .await + .expect("Failed to create a stream."); + + // Example with nested STRUCT fields + // This demonstrates how to create records with nested Customer, Address, and Shipping structures + let ack_future = stream + .ingest_record( + TableOrders { + id: Some(1), + product_name: Some("Wireless Mouse".to_string()), + quantity: Some(2), + price: Some(25.99), + status: Some("pending".to_string()), + created_at: Some(chrono::Utc::now().timestamp()), + updated_at: Some(chrono::Utc::now().timestamp()), + // Nested Customer STRUCT + customer: Some(table_orders::Customer { + name: Some("Alice Smith".to_string()), + email: Some("alice.smith@example.com".to_string()), + // Nested Address STRUCT within Customer + address: Some(table_orders::Address { + street: Some("123 Main Street".to_string()), + city: Some("San Francisco".to_string()), + state: Some("CA".to_string()), + zip_code: Some("94102".to_string()), + country: Some("USA".to_string()), + }), + 
}), + // Nested Shipping STRUCT + shipping: Some(table_orders::Shipping { + method: Some("Standard Shipping".to_string()), + tracking_number: Some("TRACK123456789".to_string()), + estimated_delivery: Some(chrono::Utc::now().timestamp() + 86400 * 5), // 5 days from now + // Nested Address STRUCT within Shipping (different address) + delivery_address: Some(table_orders::Address { + street: Some("456 Oak Avenue".to_string()), + city: Some("Oakland".to_string()), + state: Some("CA".to_string()), + zip_code: Some("94601".to_string()), + country: Some("USA".to_string()), + }), + }), + } + .encode_to_vec(), + ) + .await + .unwrap(); + + let ack = ack_future.await.unwrap(); + println!("Record with nested STRUCT fields acknowledged with offset Id: {}", ack); + let close_future = stream.close(); + close_future.await?; + println!("Stream closed successfully"); + Ok(()) +} + +/// Loads the `FileDescriptorSet` stored at `path` and returns the top-level message +/// `message_name` declared in the file named `file_name`. +/// +/// Panics if the descriptor cannot be read or decoded, or if the file or message is missing. +fn load_descriptor_proto( + path: &str, + file_name: &str, + message_name: &str, +) -> prost_types::DescriptorProto { + let descriptor_bytes = fs::read(path).expect("Failed to read proto descriptor file"); + let file_descriptor_set = + prost_types::FileDescriptorSet::decode(descriptor_bytes.as_ref()).unwrap(); + + let file_descriptor_proto = file_descriptor_set + .file + .into_iter() + .find(|f| f.name.as_deref() == Some(file_name)) + .unwrap(); + + file_descriptor_proto + .message_type + .into_iter() + .find(|m| m.name.as_deref() == Some(message_name)) + .unwrap() +} diff --git a/tools/generate_files/src/generate.rs b/tools/generate_files/src/generate.rs index 87c04cf..226a00f 100644 --- a/tools/generate_files/src/generate.rs +++ b/tools/generate_files/src/generate.rs @@ -284,13 +284,52 @@ fn parse_struct_type(type_str: &str) -> Option<Vec<(String, String)>> { fields .into_iter() .map(|f| { - let mut parts = f.splitn(2, ':'); - match (parts.next(), parts.next()) { - (Some(name), Some(type_str)) => { - Some((name.trim().to_string(), type_str.trim().to_string())) + let f = f.trim(); + // Try colon format first (field_name:type) + if 
let Some(colon_pos) = f.find(':') { + let name = f[..colon_pos].trim(); + let type_str = f[colon_pos + 1..].trim(); + if !name.is_empty() && !type_str.is_empty() { + return Some((name.to_string(), type_str.to_string())); } - _ => None, } + + // Try space format (field_name TYPE) + // Find the first space that's not inside angle brackets + let mut depth = 0; + let mut split_pos = None; + for (i, c) in f.char_indices() { + match c { + '<' => depth += 1, + '>' => depth -= 1, + ' ' if depth == 0 && split_pos.is_none() => { + // Check if this looks like a field name followed by a type + let potential_name = &f[..i]; + let potential_type = &f[i + 1..]; + // Field name should be a simple identifier (alphanumeric + underscore) + if potential_name + .chars() + .all(|c| c.is_alphanumeric() || c == '_') + && !potential_name.is_empty() + && !potential_type.is_empty() + { + split_pos = Some(i); + break; + } + } + _ => {} + } + } + + if let Some(pos) = split_pos { + let name = f[..pos].trim(); + let type_str = f[pos + 1..].trim(); + if !name.is_empty() && !type_str.is_empty() { + return Some((name.to_string(), type_str.to_string())); + } + } + + None }) .collect::<Option<Vec<_>>>() } @@ -703,4 +742,47 @@ mod tests { .contains("Invalid Protobuf field name '1field'. 
Cannot start with a digit.") ); } + + #[test] + fn test_struct_space_separated_format() { + // Test STRUCT format with space-separated field names and types (Databricks format) + let table_info = TableInfo { + columns: vec![Column { + name: "metadata".to_string(), + type_text: "STRUCT<ingest_timestamp BIGINT, pulsar_topic STRING, pulsar_partition INT, pulsar_message_id STRING, pulsar_publish_time BIGINT, pulsar_event_time BIGINT, pulsar_key STRING>".to_string(), + nullable: true, + }], + }; + + let dir = tempdir().unwrap(); + let proto_path = dir.path().join("space_struct.proto"); + let output_dir = dir.path().to_path_buf(); + + generate_proto_file( + "SpaceStructMessage", + &table_info.columns, + &proto_path, + &output_dir, + ) + .unwrap(); + + let content = fs::read_to_string(&proto_path).unwrap(); + // Verify that the struct was parsed correctly + assert!(content.contains("message Metadata")); + assert!(content.contains("optional int64 ingest_timestamp")); + assert!(content.contains("optional string pulsar_topic")); + assert!(content.contains("optional int32 pulsar_partition")); + assert!(content.contains("optional string pulsar_message_id")); + assert!(content.contains("optional int64 pulsar_publish_time")); + assert!(content.contains("optional int64 pulsar_event_time")); + assert!(content.contains("optional string pulsar_key")); + + // Also verify that the generated proto is valid and can be compiled. + generate_rust_and_descriptor( + proto_path.to_str().unwrap(), + "SpaceStructMessage", + &output_dir, + ) + .unwrap(); + } }