diff --git a/pubsub/subscriptions/create_message_transforms.go b/pubsub/subscriptions/create_message_transforms.go new file mode 100644 index 0000000000..230e9adfe4 --- /dev/null +++ b/pubsub/subscriptions/create_message_transforms.go @@ -0,0 +1,68 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package subscriptions + +// [START pubsub_create_subscription_with_smt] +import ( + "context" + "fmt" + "io" + + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" +) + +// createSubscriptionWithSMT creates a subscription with a single message transform function applied. +func createSubscriptionWithSMT(w io.Writer, projectID, topicID, subID string) error { + // projectID := "my-project-id" + // topicID := "my-topic" + // subID := "my-sub" + ctx := context.Background() + client, err := pubsub.NewClient(ctx, projectID) + if err != nil { + return fmt.Errorf("pubsub.NewClient: %w", err) + } + defer client.Close() + + code := `function redactSSN(message, metadata) { + const data = JSON.parse(message.data); + delete data['ssn']; + message.data = JSON.stringify(data); + return message; + }` + + transform := &pubsubpb.MessageTransform{ + Transform: &pubsubpb.MessageTransform_JavascriptUdf{ + JavascriptUdf: &pubsubpb.JavaScriptUDF{ + FunctionName: "redactSSN", + Code: code, + }, + }, + } + + sub := &pubsubpb.Subscription{ + Name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subID), + Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + MessageTransforms: []*pubsubpb.MessageTransform{transform}, + } + sub, err = client.SubscriptionAdminClient.CreateSubscription(ctx, sub) + if err != nil { + return fmt.Errorf("CreateSubscription: %w", err) + } + fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub) + return nil +} + +// [END pubsub_create_subscription_with_smt] diff --git a/pubsub/subscriptions/subscription_test.go b/pubsub/subscriptions/subscription_test.go index eee232f0c2..b2d5ddb827 100644 --- a/pubsub/subscriptions/subscription_test.go +++ b/pubsub/subscriptions/subscription_test.go @@ -36,6 +36,8 @@ import ( trace "cloud.google.com/go/trace/apiv1" "cloud.google.com/go/trace/apiv1/tracepb" "google.golang.org/api/iterator" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/GoogleCloudPlatform/golang-samples/internal/testutil" ) @@ -731,3 +733,34 @@ func createOrGetStorageBucket(projectID, bucketID string) error { return nil } + +func TestCreateSubscriptionWithSMT(t *testing.T) { + ctx := context.Background() + tc := testutil.SystemTest(t) + client := setup(t) + + smtSubID := subID + "-smt" + smtTopicID := topicID + "-smt" + testutil.Retry(t, 10, time.Second, func(r *testutil.R) { + _, err := client.TopicAdminClient.CreateTopic(ctx, &pb.Topic{ + Name: fmt.Sprintf("projects/%s/topics/%s", tc.ProjectID, smtTopicID), + }) + if err != nil { + if st, ok := status.FromError(err); !ok || st.Code() != codes.AlreadyExists { + r.Errorf("CreateTopic: %v", err) + } + } + }) + + testutil.Retry(t, 10, time.Second, func(r *testutil.R) { + buf := new(bytes.Buffer) + if err := createSubscriptionWithSMT(buf, tc.ProjectID, smtTopicID, smtSubID); err != nil { + r.Errorf("failed to create subscription with SMT: %v", err) + } + got := buf.String() + want := "Created subscription with message transform" + if !strings.Contains(got, want) { + r.Errorf("got: %s, want: %v", got, want) + } + }) +} diff --git a/pubsub/topics/create_topic_message_transforms.go b/pubsub/topics/create_topic_message_transforms.go new file mode 100644 index 0000000000..bb4edaf389 --- /dev/null +++ b/pubsub/topics/create_topic_message_transforms.go @@ -0,0 +1,67 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package topics + +// [START pubsub_create_topic_with_smt] +import ( + "context" + "fmt" + "io" + + "cloud.google.com/go/pubsub/v2" + "cloud.google.com/go/pubsub/v2/apiv1/pubsubpb" +) + +// createTopicWithSMT creates a topic with a single message transform function applied. +func createTopicWithSMT(w io.Writer, projectID, topicID string) error { + // projectID := "my-project-id" + // topicID := "my-topic" + ctx := context.Background() + client, err := pubsub.NewClient(ctx, projectID) + if err != nil { + return fmt.Errorf("pubsub.NewClient: %w", err) + } + defer client.Close() + + code := `function redactSSN(message, metadata) { + const data = JSON.parse(message.data); + delete data['ssn']; + message.data = JSON.stringify(data); + return message; + }` + transform := &pubsubpb.MessageTransform{ + Transform: &pubsubpb.MessageTransform_JavascriptUdf{ + JavascriptUdf: &pubsubpb.JavaScriptUDF{ + FunctionName: "redactSSN", + Code: code, + }, + }, + } + + topic := &pubsubpb.Topic{ + Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID), + MessageTransforms: []*pubsubpb.MessageTransform{transform}, + } + + topic, err = client.TopicAdminClient.CreateTopic(ctx, topic) + if err != nil { + return fmt.Errorf("CreateTopic: %w", err) + } + + fmt.Fprintf(w, "Created topic with message transform: %v\n", topic) + return nil +} + +// [END pubsub_create_topic_with_smt] diff --git a/pubsub/topics/topics_test.go b/pubsub/topics/topics_test.go index e58e78334b..8b6baf6d82 100644 --- a/pubsub/topics/topics_test.go +++ b/pubsub/topics/topics_test.go @@ -34,6 +34,8 @@ import ( "cloud.google.com/go/trace/apiv1/tracepb" "github.com/GoogleCloudPlatform/golang-samples/internal/testutil" "google.golang.org/api/iterator" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var topicID string @@ -350,6 +352,30 @@ func TestPublishWithCompression(t *testing.T) { } } +func TestCreateTopicWithSMT(t *testing.T) { + setup(t) + tc := testutil.SystemTest(t) + smtTopicID := topicID + "-smt" + testutil.Retry(t, 10, time.Second, func(r *testutil.R) { + buf := new(bytes.Buffer) + err := createTopicWithSMT(buf, tc.ProjectID, smtTopicID) + if err != nil { + st, ok := status.FromError(err) + if ok && st.Code() == codes.AlreadyExists { + return // This is expected on a retry. + } + r.Errorf("failed to create topic with SMT: %v", err) + return + } + + got := buf.String() + want := "Created topic with message transform" + if !strings.Contains(got, want) { + r.Errorf("got %q, want to contain %q", got, want) + } + }) +} + func createTopic(ctx context.Context, client *pubsub.Client, topicName string) error { _, err := client.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{ Name: topicName,