Skip to content

Commit 59edec8

Browse files
authored
docs(pubsub): Add Pub/Sub ingestion from Kafka samples (#14954)
1 parent 2335df2 commit 59edec8

File tree

1 file changed

+219
-0
lines changed

1 file changed

+219
-0
lines changed

google/cloud/pubsub/samples/topic_admin_samples.cc

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,121 @@ void CreateTopicWithCloudStorageIngestion(
272272
argv.at(4), argv.at(5), argv.at(6));
273273
}
274274

275+
void CreateTopicWithAwsMskIngestion(
276+
google::cloud::pubsub_admin::TopicAdminClient client,
277+
std::vector<std::string> const& argv) {
278+
// [START pubsub_create_topic_with_aws_msk_ingestion]
279+
namespace pubsub = ::google::cloud::pubsub;
280+
namespace pubsub_admin = ::google::cloud::pubsub_admin;
281+
[](pubsub_admin::TopicAdminClient client, std::string project_id,
282+
std::string topic_id, std::string const& cluster_arn,
283+
std::string const& msk_topic, std::string const& aws_role_arn,
284+
std::string const& gcp_service_account) {
285+
google::pubsub::v1::Topic request;
286+
request.set_name(
287+
pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
288+
auto* aws_msk =
289+
request.mutable_ingestion_data_source_settings()->mutable_aws_msk();
290+
aws_msk->set_cluster_arn(cluster_arn);
291+
aws_msk->set_topic(msk_topic);
292+
aws_msk->set_aws_role_arn(aws_role_arn);
293+
aws_msk->set_gcp_service_account(gcp_service_account);
294+
295+
auto topic = client.CreateTopic(request);
296+
// Note that kAlreadyExists is a possible error when the library retries.
297+
if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
298+
std::cout << "The topic already exists\n";
299+
return;
300+
}
301+
if (!topic) throw std::move(topic).status();
302+
303+
std::cout << "The topic was successfully created: " << topic->DebugString()
304+
<< "\n";
305+
}
306+
// [END pubsub_create_topic_with_aws_msk_ingestion]
307+
(std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3),
308+
argv.at(4), argv.at(5));
309+
}
310+
311+
void CreateTopicWithConfluentCloudIngestion(
312+
google::cloud::pubsub_admin::TopicAdminClient client,
313+
std::vector<std::string> const& argv) {
314+
// [START pubsub_create_topic_with_confluent_cloud_ingestion]
315+
namespace pubsub = ::google::cloud::pubsub;
316+
namespace pubsub_admin = ::google::cloud::pubsub_admin;
317+
[](pubsub_admin::TopicAdminClient client, std::string project_id,
318+
std::string topic_id, std::string const& bootstrap_server,
319+
std::string const& cluster_id, std::string const& confluent_topic,
320+
std::string const& identity_pool_id,
321+
std::string const& gcp_service_account) {
322+
google::pubsub::v1::Topic request;
323+
request.set_name(
324+
pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
325+
auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
326+
->mutable_confluent_cloud();
327+
confluent_cloud->set_bootstrap_server(bootstrap_server);
328+
confluent_cloud->set_cluster_id(cluster_id);
329+
confluent_cloud->set_topic(confluent_topic);
330+
confluent_cloud->set_identity_pool_id(identity_pool_id);
331+
confluent_cloud->set_gcp_service_account(gcp_service_account);
332+
333+
auto topic = client.CreateTopic(request);
334+
// Note that kAlreadyExists is a possible error when the library retries.
335+
if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
336+
std::cout << "The topic already exists\n";
337+
return;
338+
}
339+
if (!topic) throw std::move(topic).status();
340+
341+
std::cout << "The topic was successfully created: " << topic->DebugString()
342+
<< "\n";
343+
}
344+
// [END pubsub_create_topic_with_confluent_cloud_ingestion]
345+
(std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3),
346+
argv.at(4), argv.at(5), argv.at(6));
347+
}
348+
349+
void CreateTopicWithAzureEventHubsIngestion(
350+
google::cloud::pubsub_admin::TopicAdminClient client,
351+
std::vector<std::string> const& argv) {
352+
// [START pubsub_create_topic_with_azure_event_hubs_ingestion]
353+
namespace pubsub = ::google::cloud::pubsub;
354+
namespace pubsub_admin = ::google::cloud::pubsub_admin;
355+
[](pubsub_admin::TopicAdminClient client, std::string project_id,
356+
std::string topic_id, std::string const& resource_group,
357+
std::string const& event_hubs_namespace, std::string const& event_hub,
358+
std::string const& client_id, std::string const& tenant_id,
359+
std::string const& subscription_id,
360+
std::string const& gcp_service_account) {
361+
google::pubsub::v1::Topic request;
362+
request.set_name(
363+
pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
364+
auto* azure_event_hubs = request.mutable_ingestion_data_source_settings()
365+
->mutable_azure_event_hubs();
366+
azure_event_hubs->set_resource_group(resource_group);
367+
azure_event_hubs->set_namespace_(event_hubs_namespace);
368+
azure_event_hubs->set_event_hub(event_hub);
369+
azure_event_hubs->set_client_id(client_id);
370+
azure_event_hubs->set_tenant_id(tenant_id);
371+
azure_event_hubs->set_subscription_id(subscription_id);
372+
azure_event_hubs->set_gcp_service_account(gcp_service_account);
373+
374+
auto topic = client.CreateTopic(request);
375+
// Note that kAlreadyExists is a possible error when the library retries.
376+
if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
377+
std::cout << "The topic already exists\n";
378+
return;
379+
}
380+
if (!topic) throw std::move(topic).status();
381+
382+
std::cout << "The topic was successfully created: " << topic->DebugString()
383+
<< "\n";
384+
}
385+
// [END pubsub_create_topic_with_azure_event_hubs_ingestion]
386+
(std::move(client), argv.at(0), argv.at(1), argv.at(2), argv.at(3),
387+
argv.at(4), argv.at(5), argv.at(6), argv.at(7), argv.at(8));
388+
}
389+
275390
void GetTopic(google::cloud::pubsub_admin::TopicAdminClient client,
276391
std::vector<std::string> const& argv) {
277392
namespace pubsub = ::google::cloud::pubsub;
@@ -626,10 +741,42 @@ void AutoRun(std::vector<std::string> const& argv) {
626741
627742
auto const* const kinesis_updated_gcp_service_account =
628743
"fake-update-service-account@fake-gcp-project.iam.gserviceaccount.com";
744+
629745
auto const cloud_storage_topic_id =
630746
"cloud-storage-" + RandomTopicId(generator) + "_ingestion_topic";
631747
auto const cloud_storage_bucket = project_id + "-pubsub-bucket";
632748

749+
auto const aws_msk_topic_id =
750+
"aws-msk-" + RandomTopicId(generator) + "_ingestion_topic";
751+
auto const* const aws_msk_cluster_arn =
752+
"arn:aws:kafka:us-east-1:1111111111:cluster/fake-cluster-name/11111111";
753+
auto const* const aws_msk_topic = "fake-msk-topic";
754+
auto const* const aws_msk_role_arn =
755+
"arn:aws:iam::111111111111:role/fake-role-name";
756+
auto const* const aws_msk_gcp_service_account =
757+
758+
759+
auto const confluent_cloud_topic_id =
760+
"confluent-cloud-" + RandomTopicId(generator) + "_ingestion_topic";
761+
auto const* const confluent_cloud_bootstrap_server =
762+
"fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092";
763+
auto const* const confluent_cloud_cluster_id = "fake-cluster-id";
764+
auto const* const confluent_cloud_topic = "fake-topic";
765+
auto const* const confluent_cloud_identity_pool_id = "fake-pool-id";
766+
auto const* const confluent_cloud_gcp_service_account =
767+
768+
769+
auto const azure_event_hubs_topic_id =
770+
"azure-event-hubs-" + RandomTopicId(generator) + "_ingestion_topic";
771+
auto const* const azure_event_hubs_resource_group = "fake-resource-group";
772+
auto const* const azure_event_hubs_namespace = "fake-namespace";
773+
auto const* const azure_event_hubs_event_hub = "fake-event-hub";
774+
auto const* const azure_event_hubs_client_id = "fake-client-id";
775+
auto const* const azure_event_hubs_tenant_id = "fake-tenant-id";
776+
auto const* const azure_event_hubs_subscription_id = "fake-subscription-id";
777+
auto const* const azure_event_hubs_gcp_service_account =
778+
779+
633780
using ::google::cloud::StatusCode;
634781
auto ignore_emulator_failures =
635782
[](auto lambda, StatusCode code = StatusCode::kUnimplemented) {
@@ -696,6 +843,63 @@ void AutoRun(std::vector<std::string> const& argv) {
696843
},
697844
StatusCode::kInvalidArgument);
698845

846+
std::cout << "\nRunning CreateTopicWithAwsMskIngestion() sample" << std::endl;
847+
848+
ignore_emulator_failures(
849+
[&] {
850+
CreateTopicWithAwsMskIngestion(
851+
topic_admin_client,
852+
{project_id, aws_msk_topic_id, aws_msk_cluster_arn, aws_msk_topic,
853+
aws_msk_role_arn, aws_msk_gcp_service_account});
854+
cleanup.Defer(
855+
[topic_admin_client, project_id, aws_msk_topic_id]() mutable {
856+
std::cout << "\nRunning DeleteTopic() sample" << std::endl;
857+
DeleteTopic(topic_admin_client, {project_id, aws_msk_topic_id});
858+
});
859+
},
860+
StatusCode::kInvalidArgument);
861+
862+
std::cout << "\nRunning CreateTopicWithConfluentCloudIngestion() sample"
863+
<< std::endl;
864+
865+
ignore_emulator_failures(
866+
[&] {
867+
CreateTopicWithConfluentCloudIngestion(
868+
topic_admin_client,
869+
{project_id, confluent_cloud_topic_id,
870+
confluent_cloud_bootstrap_server, confluent_cloud_cluster_id,
871+
confluent_cloud_topic, confluent_cloud_identity_pool_id,
872+
confluent_cloud_gcp_service_account});
873+
cleanup.Defer([topic_admin_client, project_id,
874+
confluent_cloud_topic_id]() mutable {
875+
std::cout << "\nRunning DeleteTopic() sample" << std::endl;
876+
DeleteTopic(topic_admin_client,
877+
{project_id, confluent_cloud_topic_id});
878+
});
879+
},
880+
StatusCode::kInvalidArgument);
881+
882+
std::cout << "\nRunning CreateTopicWithAzureEventHubsIngestion() sample"
883+
<< std::endl;
884+
885+
ignore_emulator_failures(
886+
[&] {
887+
CreateTopicWithAzureEventHubsIngestion(
888+
topic_admin_client,
889+
{project_id, azure_event_hubs_topic_id,
890+
azure_event_hubs_resource_group, azure_event_hubs_namespace,
891+
azure_event_hubs_event_hub, azure_event_hubs_client_id,
892+
azure_event_hubs_tenant_id, azure_event_hubs_subscription_id,
893+
azure_event_hubs_gcp_service_account});
894+
cleanup.Defer([topic_admin_client, project_id,
895+
azure_event_hubs_topic_id]() mutable {
896+
std::cout << "\nRunning DeleteTopic() sample" << std::endl;
897+
DeleteTopic(topic_admin_client,
898+
{project_id, azure_event_hubs_topic_id});
899+
});
900+
},
901+
StatusCode::kInvalidArgument);
902+
699903
std::cout << "\nRunning UpdateTopicType() sample" << std::endl;
700904

701905
UpdateTopicType(
@@ -768,6 +972,21 @@ int main(int argc, char* argv[]) { // NOLINT(bugprone-exception-escape)
768972
{"project-id", "topic-id", "bucket", "input-format", "text-delimiter",
769973
"match-glob", "minimum-object-create-time"},
770974
CreateTopicWithCloudStorageIngestion),
975+
CreateTopicAdminCommand(
976+
"create-topic-with-aws-msk-ingestion",
977+
{"project-id", "topic-id", "cluster-arn", "msk-topic", "aws-role-arn",
978+
"gcp-service-account"},
979+
CreateTopicWithAwsMskIngestion),
980+
CreateTopicAdminCommand(
981+
"create-topic-with-confluent-cloud-ingestion",
982+
{"project-id", "topic-id", "bootstrap-server", "cluster-id",
983+
"confluent-cloud-topic", "identity-pool-id", "gcp-service-account"},
984+
CreateTopicWithConfluentCloudIngestion),
985+
CreateTopicAdminCommand(
986+
"create-topic-with-azure-event-hubs-ingestion",
987+
{"project-id", "topic-id", "resource-group", "namespace", "event-hub",
988+
"client-id", "tenant-id", "subscription-id", "gcp-service-account"},
989+
CreateTopicWithAzureEventHubsIngestion),
771990
CreateTopicAdminCommand(
772991
"create-topic-with-schema",
773992
{"project-id", "topic-id", "schema-id", "encoding"},

0 commit comments

Comments
 (0)