Skip to content

Commit 69c0c6c

Browse files
authored
feat(PubSub): Add CreateTopicWithCloudStorageIngestion Sample (#3047)
1 parent feb0d39 commit 69c0c6c

File tree

2 files changed

+124
-0
lines changed

2 files changed

+124
-0
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2025 Google LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
using Google.Cloud.PubSub.V1;
16+
using Google.Protobuf.WellKnownTypes;
17+
using System;
18+
using Xunit;
19+
20+
[Collection(nameof(PubsubFixture))]
21+
public class CreateTopicWithCloudStorageIngestionTest
22+
{
23+
private readonly PubsubFixture _pubsubFixture;
24+
private readonly CreateTopicWithCloudStorageIngestionSample _createTopicWithCloudStorageIngestionSample;
25+
26+
public CreateTopicWithCloudStorageIngestionTest(PubsubFixture pubsubFixture)
27+
{
28+
_pubsubFixture = pubsubFixture;
29+
_createTopicWithCloudStorageIngestionSample = new CreateTopicWithCloudStorageIngestionSample();
30+
}
31+
32+
[Fact]
33+
public void CreateTopicWithCloudStorageIngestion()
34+
{
35+
string topicId = _pubsubFixture.RandomTopicId();
36+
string bucket = _pubsubFixture.RandomName("Bucket");
37+
string inputFormat = "text";
38+
string textDelimiter = "\n";
39+
string matchGlob = "**.txt";
40+
string minimumObjectCreateTime = "1970-01-01T00:00:00Z";
41+
Topic createdTopic = _createTopicWithCloudStorageIngestionSample.CreateTopicWithCloudStorageIngestion(_pubsubFixture.ProjectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
42+
43+
// Confirm that the created topic and topic retrieved by ID are equal
44+
Topic retrievedTopic = _pubsubFixture.GetTopic(topicId);
45+
Assert.Equal(createdTopic, retrievedTopic);
46+
47+
// Confirm that all Cloud Storage Ingestion params are equal to expected values
48+
Assert.Equal(bucket, createdTopic.IngestionDataSourceSettings.CloudStorage.Bucket);
49+
Assert.NotNull(createdTopic.IngestionDataSourceSettings.CloudStorage.TextFormat);
50+
Assert.Equal(textDelimiter, createdTopic.IngestionDataSourceSettings.CloudStorage.TextFormat.Delimiter);
51+
Assert.Equal(matchGlob, createdTopic.IngestionDataSourceSettings.CloudStorage.MatchGlob);
52+
Assert.Equal(Timestamp.FromDateTime(DateTime.Parse(minimumObjectCreateTime)), createdTopic.IngestionDataSourceSettings.CloudStorage.MinimumObjectCreateTime);
53+
}
54+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright 2025 Google LLC.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// [START pubsub_create_topic_with_cloud_storage_ingestion]
16+
17+
using Google.Cloud.PubSub.V1;
18+
using Google.Protobuf.WellKnownTypes;
19+
using Grpc.Core;
20+
using System;
21+
22+
public class CreateTopicWithCloudStorageIngestionSample
23+
{
24+
public Topic CreateTopicWithCloudStorageIngestion(string projectId, string topicId, string bucket, string inputFormat, string textDelimiter, string matchGlob, string minimumObjectCreateTime)
25+
{
26+
27+
IngestionDataSourceSettings.Types.CloudStorage cloudStorageSettings = new IngestionDataSourceSettings.Types.CloudStorage { Bucket = bucket };
28+
29+
switch (inputFormat)
30+
{
31+
case "text":
32+
cloudStorageSettings.TextFormat = new IngestionDataSourceSettings.Types.CloudStorage.Types.TextFormat
33+
{
34+
Delimiter = textDelimiter
35+
};
36+
break;
37+
case "avro":
38+
cloudStorageSettings.AvroFormat = new IngestionDataSourceSettings.Types.CloudStorage.Types.AvroFormat();
39+
break;
40+
case "pubsub_avro":
41+
cloudStorageSettings.PubsubAvroFormat = new IngestionDataSourceSettings.Types.CloudStorage.Types.PubSubAvroFormat();
42+
break;
43+
default:
44+
throw new RpcException(new Status(StatusCode.InvalidArgument, $"inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: {inputFormat}"));
45+
}
46+
47+
if (!string.IsNullOrEmpty(matchGlob))
48+
{
49+
cloudStorageSettings.MatchGlob = matchGlob;
50+
}
51+
52+
if (!string.IsNullOrEmpty(minimumObjectCreateTime))
53+
{
54+
cloudStorageSettings.MinimumObjectCreateTime = Timestamp.FromDateTime(DateTime.Parse(minimumObjectCreateTime));
55+
}
56+
57+
PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
58+
Topic topic = new Topic()
59+
{
60+
Name = TopicName.FormatProjectTopic(projectId, topicId),
61+
IngestionDataSourceSettings = new IngestionDataSourceSettings() { CloudStorage = cloudStorageSettings }
62+
};
63+
Topic createdTopic = publisher.CreateTopic(topic);
64+
Console.WriteLine($"Topic {createdTopic.Name} created.");
65+
66+
return createdTopic;
67+
}
68+
}
69+
70+
// [END pubsub_create_topic_with_cloud_storage_ingestion]

0 commit comments

Comments
 (0)