diff --git a/Cargo.lock b/Cargo.lock index 30b9fe3191edc..83cc7fae2d1ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9874,9 +9874,9 @@ dependencies = [ [[package]] name = "protobuf-native" -version = "0.3.2+26.1" +version = "0.3.3+26.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab382248bd8151301b96bd4630671199d65401c7d39a96a1423aa581669f0049" +checksum = "3efb1fd5ce9a55e805a5eda5d0f631d44aab9a7782df2eeae4825dfb980b38fa" dependencies = [ "cxx", "cxx-build", diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index e1901bdcff8f8..cfc752070a98d 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -69,7 +69,7 @@ mz-tracing = { path = "../tracing" } mz-txn-wal = { path = "../txn-wal" } paste = "1.0" prometheus = { version = "0.14.0", default-features = false } -protobuf-native = "0.3.2" +protobuf-native = "0.3.3" proptest = { version = "1.9.0", default-features = false, features = ["std"] } proptest-derive = { version = "0.7.0", features = ["boxed_union"] } prost = { version = "0.13.5", features = ["no-recursion-limit"] } diff --git a/src/sql/src/pure.rs b/src/sql/src/pure.rs index 4e13779bbcf5e..e1273f22a17c1 100644 --- a/src/sql/src/pure.rs +++ b/src/sql/src/pure.rs @@ -2513,6 +2513,12 @@ async fn compile_proto( // Compile .proto files into a file descriptor set. let mut source_tree = VirtualSourceTree::new(); + + // Add well-known types (e.g., google/protobuf/timestamp.proto) to the source + // tree. These are implicitly available to protoc but are typically not + // registered in the schema registry. 
+ source_tree.as_mut().map_well_known_types(); + for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) { source_tree.as_mut().add_file( Path::new(&subject.name), diff --git a/test/testdrive/protobuf-well-known-types.td b/test/testdrive/protobuf-well-known-types.td new file mode 100644 index 0000000000000..fb328681bad55 --- /dev/null +++ b/test/testdrive/protobuf-well-known-types.td @@ -0,0 +1,114 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ set-arg-default single-replica-cluster=quickstart + +# Test that Protobuf schemas using well-known types can be compiled from the +# Confluent Schema Registry even when the well-known types themselves are NOT +# registered in the schema registry. +# +# This is a common real-world scenario: users register their own schemas that +# import types like google/protobuf/timestamp.proto, but they don't register +# the well-known types themselves (since protoc implicitly provides them). +# +# Previously this would fail with "invalid protobuf schema" because the +# well-known types couldn't be found. Schema compilation now maps the +# well-known types into its source tree, so they are always available. + +# A simple schema that uses google.protobuf.Timestamp without any other imports. 
+$ set timestamp-user-schema +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +message Event { + string id = 1; + google.protobuf.Timestamp created_at = 2; +} + +# Compile the schema locally into the descriptor file (event.pb) used to encode the ingested message; protoc has access to well-known types. +$ file-append path=event.proto +\${timestamp-user-schema} + +$ protobuf-compile-descriptors inputs=event.proto output=event.pb set-var=event-schema + +# Create Kafka topic and connections +$ kafka-create-topic topic=well-known-types-test partitions=1 + +> CREATE CONNECTION IF NOT EXISTS kafka_conn + TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT); + +> CREATE CONNECTION IF NOT EXISTS csr_conn TO CONFLUENT SCHEMA REGISTRY ( + URL '${testdrive.schema-registry-url}' + ); + +# Publish ONLY the user schema to the schema registry. +# Note: We are NOT publishing google/protobuf/timestamp.proto to the registry. +# This simulates the common case where users don't register well-known types. +$ schema-registry-publish subject=testdrive-well-known-types-test-${testdrive.seed}-value schema-type=protobuf +\${timestamp-user-schema} + +# Ingest a test message +$ kafka-ingest topic=well-known-types-test format=protobuf descriptor-file=event.pb message=Event confluent-wire-format=true +{"id": "evt-123", "created_at": "2024-01-15T10:30:00Z"} + +# Create the source and a table that uses the CSR connection - this should succeed even though +# timestamp.proto is not in the registry, because well-known types are always available during schema compilation. 
+> BEGIN +> CREATE SOURCE well_known_types_source + IN CLUSTER ${arg.single-replica-cluster} + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-well-known-types-test-${testdrive.seed}') + +> CREATE TABLE well_known_types_tbl FROM SOURCE well_known_types_source (REFERENCE "testdrive-well-known-types-test-${testdrive.seed}") + FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn +> COMMIT + +# Verify the data was ingested correctly. 2024-01-15T10:30:00Z is 1705314600 seconds after the Unix epoch; the trailing 0 is presumably the Timestamp's nanos field. +> SELECT id, created_at::text FROM well_known_types_tbl +id created_at +-------------------------- +evt-123 "(1705314600,0)" + +# Test with google.protobuf.Duration as well. As above, only the user schema is published to the registry; duration.proto is not. +$ set duration-user-schema +syntax = "proto3"; + +import "google/protobuf/duration.proto"; + +message Task { + string name = 1; + google.protobuf.Duration timeout = 2; +} + +$ file-append path=task.proto +\${duration-user-schema} + +$ protobuf-compile-descriptors inputs=task.proto output=task.pb set-var=task-schema + +$ kafka-create-topic topic=well-known-duration-test partitions=1 + +$ schema-registry-publish subject=testdrive-well-known-duration-test-${testdrive.seed}-value schema-type=protobuf +\${duration-user-schema} + +$ kafka-ingest topic=well-known-duration-test format=protobuf descriptor-file=task.pb message=Task confluent-wire-format=true +{"name": "my-task", "timeout": "30s"} + +> BEGIN +> CREATE SOURCE well_known_duration_source + IN CLUSTER ${arg.single-replica-cluster} + FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-well-known-duration-test-${testdrive.seed}') + +> CREATE TABLE well_known_duration_tbl FROM SOURCE well_known_duration_source (REFERENCE "testdrive-well-known-duration-test-${testdrive.seed}") + FORMAT PROTOBUF USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn +> COMMIT + +> SELECT name, timeout::text FROM well_known_duration_tbl +name timeout +------------------- +my-task "(30,0)"