diff --git a/managedkafka/examples/pom.xml b/managedkafka/examples/pom.xml
index 7f1343971b3..217ef96ba08 100644
--- a/managedkafka/examples/pom.xml
+++ b/managedkafka/examples/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.example.managedkafka</groupId>
   <artifactId>managedkafka-snippets</artifactId>
-  <packaging>pom</packaging>
+  <packaging>jar</packaging>
   <name>Google Cloud Managed Kafka Snippets</name>
   <url>https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/managedkafka</url>
@@ -29,7 +29,7 @@
       <groupId>com.google.cloud</groupId>
       <artifactId>libraries-bom</artifactId>
-      <version>26.50.0</version>
+      <version>26.64.0</version>
       <type>pom</type>
       <scope>import</scope>
diff --git a/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java b/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java
new file mode 100644
index 00000000000..c21fe9b9625
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_create_bigquery_sink_connector]
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectClusterName;
+import com.google.cloud.managedkafka.v1.Connector;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CreateBigQuerySinkConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String connectClusterId = "my-connect-cluster";
+ String connectorId = "my-bigquery-sink-connector";
+ String bigqueryProjectId = "my-bigquery-project-id";
+ String datasetName = "my_dataset";
+ String kafkaTopicName = "kafka-topic";
+ String maxTasks = "3";
+ String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
+ String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
+ String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
+ String valueSchemasEnable = "false";
+ createBigQuerySinkConnector(
+ projectId,
+ region,
+ connectClusterId,
+ connectorId,
+ bigqueryProjectId,
+ datasetName,
+ kafkaTopicName,
+ maxTasks,
+ connectorClass,
+ keyConverter,
+ valueConverter,
+ valueSchemasEnable);
+ }
+
+ public static void createBigQuerySinkConnector(
+ String projectId,
+ String region,
+ String connectClusterId,
+ String connectorId,
+ String bigqueryProjectId,
+ String datasetName,
+ String kafkaTopicName,
+ String maxTasks,
+ String connectorClass,
+ String keyConverter,
+ String valueConverter,
+ String valueSchemasEnable)
+ throws Exception {
+
+ // Build the connector configuration
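+ // These keys follow the configuration contract of the WePay BigQuery Sink
+ // Connector: by default, each Kafka topic listed in "topics" is written to a
+ // table of the same name in the dataset named by "defaultDataset".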
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("name", connectorId);
+ configMap.put("project", bigqueryProjectId);
+ configMap.put("topics", kafkaTopicName);
+ configMap.put("tasks.max", maxTasks);
+ configMap.put("connector.class", connectorClass);
+ configMap.put("key.converter", keyConverter);
+ configMap.put("value.converter", valueConverter);
+ configMap.put("value.converter.schemas.enable", valueSchemasEnable);
+ configMap.put("defaultDataset", datasetName);
+
+ Connector connector =
+ Connector.newBuilder()
+ .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
+ .putAllConfigs(configMap)
+ .build();
+
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
+ CreateConnectorRequest request =
+ CreateConnectorRequest.newBuilder()
+ .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
+ .setConnectorId(connectorId)
+ .setConnector(connector)
+ .build();
+
+ // This operation is being handled synchronously.
+ Connector response = managedKafkaConnectClient.createConnector(request);
+ System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_create_bigquery_sink_connector]
diff --git a/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java b/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java
new file mode 100644
index 00000000000..be14c0e4a47
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_create_cloud_storage_sink_connector]
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectClusterName;
+import com.google.cloud.managedkafka.v1.Connector;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CreateCloudStorageSinkConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String connectClusterId = "my-connect-cluster";
+ String connectorId = "my-gcs-sink-connector";
+ String bucketName = "my-gcs-bucket";
+ String kafkaTopicName = "kafka-topic";
+ String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
+ String maxTasks = "3";
+ String gcsCredentialsDefault = "true";
+ String formatOutputType = "json";
+ String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
+ String valueSchemasEnable = "false";
+ String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
+ createCloudStorageSinkConnector(
+ projectId,
+ region,
+ connectClusterId,
+ connectorId,
+ bucketName,
+ kafkaTopicName,
+ connectorClass,
+ maxTasks,
+ gcsCredentialsDefault,
+ formatOutputType,
+ valueConverter,
+ valueSchemasEnable,
+ keyConverter);
+ }
+
+ public static void createCloudStorageSinkConnector(
+ String projectId,
+ String region,
+ String connectClusterId,
+ String connectorId,
+ String bucketName,
+ String kafkaTopicName,
+ String connectorClass,
+ String maxTasks,
+ String gcsCredentialsDefault,
+ String formatOutputType,
+ String valueConverter,
+ String valueSchemasEnable,
+ String keyConverter)
+ throws Exception {
+
+ // Build the connector configuration
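+ // These keys follow the Aiven GCS Sink Connector's configuration contract;
+ // setting "gcs.credentials.default" to "true" makes the connector use the
+ // Connect cluster's Application Default Credentials instead of an inline key.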
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("connector.class", connectorClass);
+ configMap.put("tasks.max", maxTasks);
+ configMap.put("topics", kafkaTopicName);
+ configMap.put("gcs.bucket.name", bucketName);
+ configMap.put("gcs.credentials.default", gcsCredentialsDefault);
+ configMap.put("format.output.type", formatOutputType);
+ configMap.put("name", connectorId);
+ configMap.put("value.converter", valueConverter);
+ configMap.put("value.converter.schemas.enable", valueSchemasEnable);
+ configMap.put("key.converter", keyConverter);
+
+ Connector connector = Connector.newBuilder()
+ .setName(
+ ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
+ .putAllConfigs(configMap)
+ .build();
+
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
+ CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
+ .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
+ .setConnectorId(connectorId)
+ .setConnector(connector)
+ .build();
+
+ // This operation is being handled synchronously.
+ Connector response = managedKafkaConnectClient.createConnector(request);
+ System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName());
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_create_cloud_storage_sink_connector]
diff --git a/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java b/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java
new file mode 100644
index 00000000000..1f48eecb44e
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_create_connect_cluster]
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.api.gax.longrunning.OperationSnapshot;
+import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.retrying.RetryingFuture;
+import com.google.api.gax.retrying.TimedRetryAlgorithm;
+import com.google.cloud.managedkafka.v1.CapacityConfig;
+import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
+import com.google.cloud.managedkafka.v1.ConnectCluster;
+import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
+import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
+import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
+import com.google.cloud.managedkafka.v1.LocationName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
+import com.google.cloud.managedkafka.v1.OperationMetadata;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+
+public class CreateConnectCluster {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String clusterId = "my-connect-cluster";
+ String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
+ String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to attach to, structured like projects/my-project/locations/my-region/clusters/my-kafka-cluster
+ int cpu = 12;
+ long memoryBytes = 12884901888L; // 12 GiB
+ createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
+ }
+
+ public static void createConnectCluster(
+ String projectId,
+ String region,
+ String clusterId,
+ String subnet,
+ String kafkaCluster,
+ int cpu,
+ long memoryBytes)
+ throws Exception {
+ CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
+ .setMemoryBytes(memoryBytes).build();
+ ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
+ .setPrimarySubnet(subnet)
+ .build();
+ // Optionally, you can also specify additional accessible subnets and resolvable
+ // DNS domains as part of your network configuration. For example:
+ // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
+ // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
+ ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
+ .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
+ .build();
+ ConnectCluster connectCluster = ConnectCluster.newBuilder()
+ .setCapacityConfig(capacityConfig)
+ .setGcpConfig(gcpConfig)
+ .setKafkaCluster(kafkaCluster)
+ .build();
+
+ // Create the settings to configure the timeout for polling operations
+ ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
+ TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
+ RetrySettings.newBuilder()
+ .setTotalTimeoutDuration(Duration.ofHours(1L))
+ .build());
+ settingsBuilder.createConnectClusterOperationSettings()
+ .setPollingAlgorithm(timedRetryAlgorithm);
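+ // Without this override, the default polling timeout can expire before the
+ // cluster finishes provisioning.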
+
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
+ .create(settingsBuilder.build())) {
+ CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
+ .setParent(LocationName.of(projectId, region).toString())
+ .setConnectClusterId(clusterId)
+ .setConnectCluster(connectCluster)
+ .build();
+
+ // The duration of this operation can vary considerably, typically taking
+ // between 10-30 minutes.
+ OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
+ .createConnectClusterOperationCallable().futureCall(request);
+
+ // Get the initial LRO and print details.
+ OperationSnapshot operation = future.getInitialFuture().get();
+ System.out.printf(
+ "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
+ operation.getName(), operation.isDone(), future.getMetadata().get().toString());
+
+ while (!future.isDone()) {
+ // The pollingFuture gives us the most recent status of the operation
+ RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
+ OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
+ System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
+ currentOp.getName(),
+ currentOp.isDone());
+ }
+
+ // NOTE: future.get() blocks until the operation is complete (isDone() == true).
+ ConnectCluster response = future.get();
+ System.out.printf("Created connect cluster: %s\n", response.getName());
+ } catch (ExecutionException e) {
+ System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n",
+ e.getMessage());
+ throw e;
+ }
+ }
+}
+// [END managedkafka_create_connect_cluster]
diff --git a/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2SourceConnector.java b/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2SourceConnector.java
new file mode 100644
index 00000000000..5e6ade27823
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2SourceConnector.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_create_mirrormaker2_connector]
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectClusterName;
+import com.google.cloud.managedkafka.v1.Connector;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CreateMirrorMaker2SourceConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String maxTasks = "3";
+ String connectClusterId = "my-connect-cluster";
+ String connectorId = "my-mirrormaker2-connector";
+ String sourceClusterBootstrapServers = "my-source-cluster:9092";
+ String targetClusterBootstrapServers = "my-target-cluster:9092";
+ String sourceClusterAlias = "source";
+ String targetClusterAlias = "target"; // This is usually the primary cluster.
+ String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
+ String topics = ".*";
+ createMirrorMaker2SourceConnector(
+ projectId,
+ region,
+ maxTasks,
+ connectClusterId,
+ connectorId,
+ sourceClusterBootstrapServers,
+ targetClusterBootstrapServers,
+ sourceClusterAlias,
+ targetClusterAlias,
+ connectorClass,
+ topics);
+ }
+
+ public static void createMirrorMaker2SourceConnector(
+ String projectId,
+ String region,
+ String maxTasks,
+ String connectClusterId,
+ String connectorId,
+ String sourceClusterBootstrapServers,
+ String targetClusterBootstrapServers,
+ String sourceClusterAlias,
+ String targetClusterAlias,
+ String connectorClass,
+ String topics)
+ throws Exception {
+
+ // Build the connector configuration
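+ // With "topics" set to ".*", MirrorMaker 2 replicates every topic from the
+ // source cluster. Under the default replication policy, mirrored topics are
+ // prefixed with the source cluster alias (e.g. "source.my-topic").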
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("tasks.max", maxTasks);
+ configMap.put("connector.class", connectorClass);
+ configMap.put("name", connectorId);
+ configMap.put("source.cluster.alias", sourceClusterAlias);
+ configMap.put("target.cluster.alias", targetClusterAlias);
+ configMap.put("topics", topics);
+ configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
+ configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);
+
+ Connector connector = Connector.newBuilder()
+ .setName(
+ ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
+ .putAllConfigs(configMap)
+ .build();
+
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
+ CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
+ .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
+ .setConnectorId(connectorId)
+ .setConnector(connector)
+ .build();
+
+ // This operation is being handled synchronously.
+ Connector response = managedKafkaConnectClient.createConnector(request);
+ System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName());
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_create_mirrormaker2_connector]
diff --git a/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java b/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java
new file mode 100644
index 00000000000..2492a5c8833
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_create_pubsub_sink_connector]
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectClusterName;
+import com.google.cloud.managedkafka.v1.Connector;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CreatePubSubSinkConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String connectClusterId = "my-connect-cluster";
+ String connectorId = "my-pubsub-sink-connector";
+ String pubsubProjectId = "my-pubsub-project-id";
+ String pubsubTopicName = "my-pubsub-topic";
+ String kafkaTopicName = "kafka-topic";
+ String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector";
+ String maxTasks = "3";
+ String valueConverter = "org.apache.kafka.connect.storage.StringConverter";
+ String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
+ createPubSubSinkConnector(
+ projectId,
+ region,
+ connectClusterId,
+ connectorId,
+ pubsubProjectId,
+ pubsubTopicName,
+ kafkaTopicName,
+ connectorClass,
+ maxTasks,
+ valueConverter,
+ keyConverter);
+ }
+
+ public static void createPubSubSinkConnector(
+ String projectId,
+ String region,
+ String connectClusterId,
+ String connectorId,
+ String pubsubProjectId,
+ String pubsubTopicName,
+ String kafkaTopicName,
+ String connectorClass,
+ String maxTasks,
+ String valueConverter,
+ String keyConverter)
+ throws Exception {
+
+ // Build the connector configuration
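+ // "cps.project" and "cps.topic" identify the destination Pub/Sub topic;
+ // "topics" names the Kafka topics the connector reads from.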
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("connector.class", connectorClass);
+ configMap.put("name", connectorId);
+ configMap.put("tasks.max", maxTasks);
+ configMap.put("topics", kafkaTopicName);
+ configMap.put("value.converter", valueConverter);
+ configMap.put("key.converter", keyConverter);
+ configMap.put("cps.topic", pubsubTopicName);
+ configMap.put("cps.project", pubsubProjectId);
+
+ Connector connector = Connector.newBuilder()
+ .setName(
+ ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
+ .putAllConfigs(configMap)
+ .build();
+
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
+ CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
+ .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
+ .setConnectorId(connectorId)
+ .setConnector(connector)
+ .build();
+
+ // This operation is being handled synchronously.
+ Connector response = managedKafkaConnectClient.createConnector(request);
+ System.out.printf("Created Pub/Sub Sink connector: %s\n", response.getName());
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_create_pubsub_sink_connector]
diff --git a/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java b/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java
new file mode 100644
index 00000000000..c43537b152b
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_create_pubsub_source_connector]
+
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectClusterName;
+import com.google.cloud.managedkafka.v1.Connector;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CreatePubSubSourceConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String connectClusterId = "my-connect-cluster";
+ String connectorId = "my-pubsub-source-connector";
+ String pubsubProjectId = "my-pubsub-project-id";
+ String subscriptionName = "my-subscription";
+ String kafkaTopicName = "pubsub-topic";
+ String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
+ String maxTasks = "3";
+ String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
+ String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
+ createPubSubSourceConnector(
+ projectId,
+ region,
+ connectClusterId,
+ connectorId,
+ pubsubProjectId,
+ subscriptionName,
+ kafkaTopicName,
+ connectorClass,
+ maxTasks,
+ valueConverter,
+ keyConverter);
+ }
+
+ public static void createPubSubSourceConnector(
+ String projectId,
+ String region,
+ String connectClusterId,
+ String connectorId,
+ String pubsubProjectId,
+ String subscriptionName,
+ String kafkaTopicName,
+ String connectorClass,
+ String maxTasks,
+ String valueConverter,
+ String keyConverter)
+ throws Exception {
+
+ // Build the connector configuration
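+ // "cps.subscription" is the Pub/Sub subscription to pull from, and
+ // "kafka.topic" is the Kafka topic that messages are written to.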
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("connector.class", connectorClass);
+ configMap.put("name", connectorId);
+ configMap.put("tasks.max", maxTasks);
+ configMap.put("kafka.topic", kafkaTopicName);
+ configMap.put("cps.subscription", subscriptionName);
+ configMap.put("cps.project", pubsubProjectId);
+ configMap.put("value.converter", valueConverter);
+ configMap.put("key.converter", keyConverter);
+
+ Connector connector = Connector.newBuilder()
+ .setName(
+ ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
+ .putAllConfigs(configMap)
+ .build();
+
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
+ CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
+ .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
+ .setConnectorId(connectorId)
+ .setConnector(connector)
+ .build();
+
+ // This operation is being handled synchronously.
+ Connector response = managedKafkaConnectClient.createConnector(request);
+ System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_create_pubsub_source_connector]
diff --git a/managedkafka/examples/src/main/java/examples/DeleteConnectCluster.java b/managedkafka/examples/src/main/java/examples/DeleteConnectCluster.java
new file mode 100644
index 00000000000..18196c36b2b
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/DeleteConnectCluster.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_delete_connect_cluster]
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.api.gax.longrunning.OperationSnapshot;
+import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.retrying.TimedRetryAlgorithm;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectClusterName;
+import com.google.cloud.managedkafka.v1.DeleteConnectClusterRequest;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
+import com.google.cloud.managedkafka.v1.OperationMetadata;
+import com.google.protobuf.Empty;
+import java.io.IOException;
+import java.time.Duration;
+
+public class DeleteConnectCluster {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String clusterId = "my-connect-cluster";
+ deleteConnectCluster(projectId, region, clusterId);
+ }
+
+ public static void deleteConnectCluster(String projectId, String region, String clusterId)
+ throws Exception {
+
+ // Create the settings to configure the timeout for polling operations
+ ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
+ TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
+ RetrySettings.newBuilder()
+ .setTotalTimeoutDuration(Duration.ofHours(1L))
+ .build());
+ settingsBuilder.deleteConnectClusterOperationSettings()
+ .setPollingAlgorithm(timedRetryAlgorithm);
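+ // Cluster deletion is a long-running operation, so allow up to an hour of
+ // polling before giving up.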
+
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
+ settingsBuilder.build())) {
+ DeleteConnectClusterRequest request = DeleteConnectClusterRequest.newBuilder()
+ .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
+ .build();
+ OperationFuture<Empty, OperationMetadata> future = managedKafkaConnectClient
+ .deleteConnectClusterOperationCallable().futureCall(request);
+
+ // Get the initial LRO and print details. See CreateConnectCluster for an
+ // example of polling the operation until it completes.
+ OperationSnapshot operation = future.getInitialFuture().get();
+ System.out.printf(
+ "Connect cluster deletion started. Operation name: %s\nDone: %s\nMetadata: %s\n",
+ operation.getName(),
+ operation.isDone(),
+ future.getMetadata().get().toString());
+
+ future.get();
+ System.out.println("Deleted connect cluster");
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.deleteConnectCluster got err: %s\n",
+ e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_delete_connect_cluster]
diff --git a/managedkafka/examples/src/main/java/examples/DeleteConnector.java b/managedkafka/examples/src/main/java/examples/DeleteConnector.java
new file mode 100644
index 00000000000..f4ea98c0b32
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/DeleteConnector.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_delete_connector]
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.IOException;
+
+public class DeleteConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String clusterId = "my-connect-cluster";
+ String connectorId = "my-connector";
+ deleteConnector(projectId, region, clusterId, connectorId);
+ }
+
+ public static void deleteConnector(
+ String projectId, String region, String clusterId, String connectorId) throws IOException {
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
+ ConnectorName name = ConnectorName.of(projectId, region, clusterId, connectorId);
+ // This operation is handled synchronously.
+ managedKafkaConnectClient.deleteConnector(name);
+ System.out.printf("Deleted connector: %s\n", name);
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.deleteConnector got err: %s\n", e.getMessage());
+ }
+ }
+}
+// [END managedkafka_delete_connector]
diff --git a/managedkafka/examples/src/main/java/examples/GetConnectCluster.java b/managedkafka/examples/src/main/java/examples/GetConnectCluster.java
new file mode 100644
index 00000000000..47a080be1a1
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/GetConnectCluster.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_get_connect_cluster]
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectCluster;
+import com.google.cloud.managedkafka.v1.ConnectClusterName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.IOException;
+
+public class GetConnectCluster {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String clusterId = "my-connect-cluster";
+ getConnectCluster(projectId, region, clusterId);
+ }
+
+ public static void getConnectCluster(String projectId, String region, String clusterId)
+ throws Exception {
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
+ // This operation is being handled synchronously.
+ ConnectCluster connectCluster = managedKafkaConnectClient
+ .getConnectCluster(ConnectClusterName.of(projectId, region, clusterId));
+ System.out.println(connectCluster.getAllFields());
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.getConnectCluster got err: %s\n", e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_get_connect_cluster]
diff --git a/managedkafka/examples/src/main/java/examples/GetConnector.java b/managedkafka/examples/src/main/java/examples/GetConnector.java
new file mode 100644
index 00000000000..b5be2672e19
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/GetConnector.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_get_connector]
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.Connector;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.IOException;
+
+public class GetConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String clusterId = "my-connect-cluster";
+ String connectorId = "my-connector";
+ getConnector(projectId, region, clusterId, connectorId);
+ }
+
+ public static void getConnector(
+ String projectId, String region, String clusterId, String connectorId) throws IOException {
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
+ ConnectorName name = ConnectorName.of(projectId, region, clusterId, connectorId);
+ // This operation is handled synchronously.
+ Connector connector = managedKafkaConnectClient.getConnector(name);
+ System.out.println(connector.getAllFields());
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.getConnector got err: %s\n", e.getMessage());
+ }
+ }
+}
+// [END managedkafka_get_connector]
diff --git a/managedkafka/examples/src/main/java/examples/ListConnectClusters.java b/managedkafka/examples/src/main/java/examples/ListConnectClusters.java
new file mode 100644
index 00000000000..2dcdbd55b03
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/ListConnectClusters.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_list_connect_clusters]
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectCluster;
+import com.google.cloud.managedkafka.v1.LocationName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.IOException;
+
+public class ListConnectClusters {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ listConnectClusters(projectId, region);
+ }
+
+ public static void listConnectClusters(String projectId, String region) throws Exception {
+ try (ManagedKafkaConnectClient managedKafkaConnectClient =
+ ManagedKafkaConnectClient.create()) {
+ LocationName locationName = LocationName.of(projectId, region);
+ // This operation is being handled synchronously.
+ for (ConnectCluster connectCluster : managedKafkaConnectClient
+ .listConnectClusters(locationName).iterateAll()) {
+ System.out.println(connectCluster.getAllFields());
+ }
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.listConnectClusters got err: %s\n",
+ e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_list_connect_clusters]
diff --git a/managedkafka/examples/src/main/java/examples/ListConnectors.java b/managedkafka/examples/src/main/java/examples/ListConnectors.java
new file mode 100644
index 00000000000..41d8ea9610b
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/ListConnectors.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_list_connectors]
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectClusterName;
+import com.google.cloud.managedkafka.v1.Connector;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.IOException;
+
+public class ListConnectors {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String clusterId = "my-connect-cluster";
+ listConnectors(projectId, region, clusterId);
+ }
+
+ public static void listConnectors(String projectId, String region, String clusterId)
+ throws IOException {
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
+ ConnectClusterName parent = ConnectClusterName.of(projectId, region, clusterId);
+ // This operation is handled synchronously.
+ for (Connector connector : managedKafkaConnectClient.listConnectors(parent).iterateAll()) {
+ System.out.println(connector.getAllFields());
+ }
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.listConnectors got err: %s\n", e.getMessage());
+ }
+ }
+}
+// [END managedkafka_list_connectors]
diff --git a/managedkafka/examples/src/main/java/examples/PauseConnector.java b/managedkafka/examples/src/main/java/examples/PauseConnector.java
new file mode 100644
index 00000000000..36c26ee1ae1
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/PauseConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_pause_connector]
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import com.google.cloud.managedkafka.v1.PauseConnectorRequest;
+import java.io.IOException;
+
+public class PauseConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String connectClusterId = "my-connect-cluster";
+ String connectorId = "my-connector";
+ pauseConnector(projectId, region, connectClusterId, connectorId);
+ }
+
+ public static void pauseConnector(
+ String projectId, String region, String connectClusterId, String connectorId)
+ throws Exception {
+ try (ManagedKafkaConnectClient managedKafkaConnectClient =
+ ManagedKafkaConnectClient.create()) {
+ ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId,
+ connectorId);
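+ // Pausing halts message processing but keeps the connector and its tasks
+ // allocated, so it can be resumed quickly with ResumeConnector.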
+ PauseConnectorRequest request = PauseConnectorRequest.newBuilder()
+ .setName(connectorName.toString()).build();
+
+ // This operation is being handled synchronously.
+ managedKafkaConnectClient.pauseConnector(request);
+ System.out.printf("Connector %s paused successfully.\n", connectorId);
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.pauseConnector got err: %s\n",
+ e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_pause_connector]
diff --git a/managedkafka/examples/src/main/java/examples/RestartConnector.java b/managedkafka/examples/src/main/java/examples/RestartConnector.java
new file mode 100644
index 00000000000..78ef135313c
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/RestartConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_restart_connector]
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import com.google.cloud.managedkafka.v1.RestartConnectorRequest;
+import java.io.IOException;
+
+public class RestartConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String connectClusterId = "my-connect-cluster";
+ String connectorId = "my-connector";
+ restartConnector(projectId, region, connectClusterId, connectorId);
+ }
+
+ public static void restartConnector(
+ String projectId, String region, String connectClusterId, String connectorId)
+ throws Exception {
+ try (ManagedKafkaConnectClient managedKafkaConnectClient =
+ ManagedKafkaConnectClient.create()) {
+ ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId,
+ connectorId);
+ RestartConnectorRequest request = RestartConnectorRequest.newBuilder()
+ .setName(connectorName.toString()).build();
+
+ // This operation is being handled synchronously.
+ managedKafkaConnectClient.restartConnector(request);
+ System.out.printf("Connector %s restarted successfully.\n", connectorId);
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.restartConnector got err: %s\n",
+ e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_restart_connector]
diff --git a/managedkafka/examples/src/main/java/examples/ResumeConnector.java b/managedkafka/examples/src/main/java/examples/ResumeConnector.java
new file mode 100644
index 00000000000..b3aa808d0f3
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/ResumeConnector.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_resume_connector]
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import com.google.cloud.managedkafka.v1.ResumeConnectorRequest;
+import java.io.IOException;
+
+public class ResumeConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String connectClusterId = "my-connect-cluster";
+ String connectorId = "my-connector";
+ resumeConnector(projectId, region, connectClusterId, connectorId);
+ }
+
+ public static void resumeConnector(
+ String projectId, String region, String connectClusterId, String connectorId)
+ throws Exception {
+ try (ManagedKafkaConnectClient managedKafkaConnectClient =
+ ManagedKafkaConnectClient.create()) {
+ ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId,
+ connectorId);
+ ResumeConnectorRequest request = ResumeConnectorRequest.newBuilder()
+ .setName(connectorName.toString()).build();
+
+ // This operation is being handled synchronously.
+ managedKafkaConnectClient.resumeConnector(request);
+ System.out.printf("Connector %s resumed successfully.\n", connectorId);
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.resumeConnector got err: %s\n",
+ e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_resume_connector]
diff --git a/managedkafka/examples/src/main/java/examples/StopConnector.java b/managedkafka/examples/src/main/java/examples/StopConnector.java
new file mode 100644
index 00000000000..e5bcd7ccd76
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/StopConnector.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_stop_connector]
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import com.google.cloud.managedkafka.v1.StopConnectorRequest;
+import java.io.IOException;
+
+public class StopConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String connectClusterId = "my-connect-cluster";
+ String connectorId = "my-connector";
+ stopConnector(projectId, region, connectClusterId, connectorId);
+ }
+
+ public static void stopConnector(
+ String projectId, String region, String connectClusterId, String connectorId)
+ throws Exception {
+ try (ManagedKafkaConnectClient managedKafkaConnectClient =
+ ManagedKafkaConnectClient.create()) {
+ ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId,
+ connectorId);
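+ // In Kafka Connect semantics, stopping shuts down the connector's tasks and
+ // deallocates them, unlike pausing, which leaves them allocated but idle.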
+ StopConnectorRequest request = StopConnectorRequest.newBuilder()
+ .setName(connectorName.toString()).build();
+
+ // This operation is being handled synchronously.
+ managedKafkaConnectClient.stopConnector(request);
+ System.out.printf("Connector %s stopped successfully.\n", connectorId);
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.stopConnector got err: %s\n", e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_stop_connector]
diff --git a/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java b/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java
new file mode 100644
index 00000000000..7d22efedcab
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_update_connect_cluster]
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.api.gax.longrunning.OperationSnapshot;
+import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.retrying.TimedRetryAlgorithm;
+import com.google.cloud.managedkafka.v1.CapacityConfig;
+import com.google.cloud.managedkafka.v1.ConnectCluster;
+import com.google.cloud.managedkafka.v1.ConnectClusterName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
+import com.google.cloud.managedkafka.v1.OperationMetadata;
+import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest;
+import com.google.protobuf.FieldMask;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+
+public class UpdateConnectCluster {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String clusterId = "my-connect-cluster";
+ long memoryBytes = 25769803776L; // 24 GiB
+ updateConnectCluster(projectId, region, clusterId, memoryBytes);
+ }
+
+ public static void updateConnectCluster(
+ String projectId, String region, String clusterId, long memoryBytes) throws Exception {
+ CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
+ ConnectCluster connectCluster = ConnectCluster.newBuilder()
+ .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
+ .setCapacityConfig(capacityConfig)
+ .build();
+ FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
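+ // Only the fields named in the update mask are modified; mask paths use the
+ // snake_case proto field names.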
+
+ // Create the settings to configure the timeout for polling operations
+ ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
+ TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
+ RetrySettings.newBuilder()
+ .setTotalTimeoutDuration(Duration.ofHours(1L))
+ .build());
+ settingsBuilder.updateConnectClusterOperationSettings()
+ .setPollingAlgorithm(timedRetryAlgorithm);
+
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
+ settingsBuilder.build())) {
+ UpdateConnectClusterRequest request = UpdateConnectClusterRequest.newBuilder()
+ .setUpdateMask(updateMask)
+ .setConnectCluster(connectCluster).build();
+ OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
+ .updateConnectClusterOperationCallable().futureCall(request);
+
+ // Get the initial LRO and print details. See CreateConnectCluster for an
+ // example of polling the operation until it completes.
+ OperationSnapshot operation = future.getInitialFuture().get();
+ System.out.printf(
+ "Connect cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
+ operation.getName(),
+ operation.isDone(),
+ future.getMetadata().get().toString());
+
+ ConnectCluster response = future.get();
+ System.out.printf("Updated connect cluster: %s\n", response.getName());
+ } catch (ExecutionException e) {
+ System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s\n",
+ e.getMessage());
+ }
+ }
+}
+
+// [END managedkafka_update_connect_cluster]
diff --git a/managedkafka/examples/src/main/java/examples/UpdateConnector.java b/managedkafka/examples/src/main/java/examples/UpdateConnector.java
new file mode 100644
index 00000000000..37186b31de9
--- /dev/null
+++ b/managedkafka/examples/src/main/java/examples/UpdateConnector.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+// [START managedkafka_update_connector]
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.managedkafka.v1.Connector;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import com.google.protobuf.FieldMask;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class UpdateConnector {
+
+ public static void main(String[] args) throws Exception {
+ // TODO(developer): Replace these variables before running the example.
+ String projectId = "my-project-id";
+ String region = "my-region"; // e.g. us-east1
+ String clusterId = "my-connect-cluster";
+ String connectorId = "my-connector";
+ // The new value for the 'tasks.max' configuration.
+ String maxTasks = "5";
+ updateConnector(projectId, region, clusterId, connectorId, maxTasks);
+ }
+
+ public static void updateConnector(
+ String projectId, String region, String clusterId, String connectorId, String maxTasks)
+ throws IOException {
+ try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
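+ // Configuration entries to apply; here only 'tasks.max' is changed.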
+ Map<String, String> configMap = new HashMap<>();
+ configMap.put("tasks.max", maxTasks);
+
+ Connector connector =
+ Connector.newBuilder()
+ .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString())
+ .putAllConfigs(configMap)
+ .build();
+
+ // The field mask specifies which fields to update. Here, we update the 'configs' field.
+ FieldMask updateMask = FieldMask.newBuilder().addPaths("configs").build();
+
+ // This operation is handled synchronously.
+ Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask);
+ System.out.printf("Updated connector: %s\n", updatedConnector.getName());
+ System.out.println(updatedConnector.getAllFields());
+
+ } catch (IOException | ApiException e) {
+ System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage());
+ }
+ }
+}
+// [END managedkafka_update_connector]
diff --git a/managedkafka/examples/src/test/java/examples/ConnectClustersTest.java b/managedkafka/examples/src/test/java/examples/ConnectClustersTest.java
new file mode 100644
index 00000000000..9d42dcafbcc
--- /dev/null
+++ b/managedkafka/examples/src/test/java/examples/ConnectClustersTest.java
@@ -0,0 +1,433 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+import static com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient.create;
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.api.gax.longrunning.OperationSnapshot;
+import com.google.api.gax.retrying.RetryingFuture;
+import com.google.api.gax.rpc.OperationCallable;
+import com.google.cloud.managedkafka.v1.ConnectCluster;
+import com.google.cloud.managedkafka.v1.ConnectClusterName;
+import com.google.cloud.managedkafka.v1.Connector;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
+import com.google.cloud.managedkafka.v1.DeleteConnectClusterRequest;
+import com.google.cloud.managedkafka.v1.LocationName;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
+import com.google.cloud.managedkafka.v1.OperationMetadata;
+import com.google.cloud.managedkafka.v1.PauseConnectorRequest;
+import com.google.cloud.managedkafka.v1.RestartConnectorRequest;
+import com.google.cloud.managedkafka.v1.ResumeConnectorRequest;
+import com.google.cloud.managedkafka.v1.StopConnectorRequest;
+import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest;
+import com.google.protobuf.Empty;
+import com.google.protobuf.FieldMask;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+@RunWith(JUnit4.class)
+public class ConnectClustersTest {
+ protected static final String projectId = "test-project";
+ protected static final String region = "us-central1";
+ protected static final String clusterId = "test-connect-cluster";
+ protected static final String kafkaCluster = "test-kafka-cluster";
+ protected static final String connectClusterName =
+ "projects/test-project/locations/us-central1/connectClusters/test-connect-cluster";
+ protected static final String connectorId = "test-connector";
+ protected static final String connectorName =
+ "projects/test-project/locations/us-central1/connectClusters/test-connect-cluster"
+ + "/connectors/test-connector";
+ private ByteArrayOutputStream bout;
+
+ @Before
+ public void setUp() {
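+ // Capture System.out so each test can assert on the sample's printed output.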
+ bout = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(bout));
+ }
+
+ @Test
+ public void createConnectClusterTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ OperationCallable<CreateConnectClusterRequest, ConnectCluster, OperationMetadata>
+ operationCallable = mock(OperationCallable.class);
+ OperationFuture<ConnectCluster, OperationMetadata> operationFuture =
+ mock(OperationFuture.class);
+
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+
+ // client creation
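+ // Intercept the static create(settings) factory so the sample receives the mock client.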
+ mockedStatic
+ .when(() -> create(any(ManagedKafkaConnectSettings.class)))
+ .thenReturn(managedKafkaConnectClient);
+
+ // operation callable
+ when(managedKafkaConnectClient.createConnectClusterOperationCallable())
+ .thenReturn(operationCallable);
+ when(operationCallable.futureCall(any(CreateConnectClusterRequest.class)))
+ .thenReturn(operationFuture);
+
+ // initial future
+ ApiFuture<OperationSnapshot> initialFuture = mock(ApiFuture.class);
+ when(operationFuture.getInitialFuture()).thenReturn(initialFuture);
+
+ // Metadata
+ ApiFuture<OperationMetadata> metadataFuture = mock(ApiFuture.class);
+ OperationMetadata metadata = mock(OperationMetadata.class);
+ when(operationFuture.getMetadata()).thenReturn(metadataFuture);
+ when(metadataFuture.get()).thenReturn(metadata);
+
+ // operation snapshot
+ OperationSnapshot operationSnapshot = mock(OperationSnapshot.class);
+ when(operationFuture.getInitialFuture().get()).thenReturn(operationSnapshot);
+ when(operationSnapshot.getName())
+ .thenReturn("projects/test-project/locations/test-location/operations/test-operation");
+ when(operationSnapshot.isDone()).thenReturn(false, false, true);
+
+ // polling future
+ RetryingFuture<OperationSnapshot> pollingFuture = mock(RetryingFuture.class);
+ when(operationFuture.getPollingFuture()).thenReturn(pollingFuture);
+ when(operationFuture.isDone()).thenReturn(false, false, true);
+ ApiFuture<OperationSnapshot> attemptResult = mock(ApiFuture.class);
+ when(pollingFuture.getAttemptResult()).thenReturn(attemptResult);
+ when(attemptResult.get()).thenReturn(operationSnapshot);
+
+ // Set up the final result
+ ConnectCluster resultCluster = mock(ConnectCluster.class);
+ when(operationFuture.get()).thenReturn(resultCluster);
+ when(resultCluster.getName()).thenReturn(connectClusterName);
+
+ String subnet = "test-subnet";
+ int vcpu = 12;
+ long memory = 12884901888L; // 12 GiB
+ CreateConnectCluster.createConnectCluster(
+ projectId, region, clusterId, subnet, kafkaCluster, vcpu, memory);
+ String output = bout.toString();
+ assertThat(output).contains("Created connect cluster");
+ assertThat(output).contains(connectClusterName);
+ verify(managedKafkaConnectClient, times(1)).createConnectClusterOperationCallable();
+ verify(operationCallable, times(1)).futureCall(any(CreateConnectClusterRequest.class));
+ verify(operationFuture, times(2)).getPollingFuture(); // Verify 2 polling attempts
+ verify(pollingFuture, times(2)).getAttemptResult(); // Verify 2 attempt results
+ verify(operationSnapshot, times(3)).isDone(); // 2 polls + 1 initial check
+ }
+ }
+
+ @Test
+ public void getConnectClusterTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+ ConnectCluster connectCluster =
+ ConnectCluster.newBuilder()
+ .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
+ .build();
+ when(managedKafkaConnectClient.getConnectCluster(any(ConnectClusterName.class)))
+ .thenReturn(connectCluster);
+ GetConnectCluster.getConnectCluster(projectId, region, clusterId);
+ String output = bout.toString();
+ assertThat(output).contains(connectClusterName);
+ verify(managedKafkaConnectClient, times(1)).getConnectCluster(any(ConnectClusterName.class));
+ }
+ }
+
+ @Test
+ public void listConnectClustersTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ ManagedKafkaConnectClient.ListConnectClustersPagedResponse response =
+ mock(ManagedKafkaConnectClient.ListConnectClustersPagedResponse.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
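+ // Back the paged response with a single cluster so iterateAll() yields one result.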
+ Iterable<ConnectCluster> iterable =
+ () -> {
+ List<ConnectCluster> connectClusters = new ArrayList<>();
+ connectClusters.add(
+ ConnectCluster.newBuilder()
+ .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
+ .build());
+ return connectClusters.iterator();
+ };
+ when(response.iterateAll()).thenReturn(iterable);
+ when(managedKafkaConnectClient.listConnectClusters(any(LocationName.class)))
+ .thenReturn(response);
+ ListConnectClusters.listConnectClusters(projectId, region);
+ String output = bout.toString();
+ assertThat(output).contains(connectClusterName);
+ verify(managedKafkaConnectClient, times(1)).listConnectClusters(any(LocationName.class));
+ }
+ }
+
+ @Test
+ public void updateConnectClusterTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ OperationCallable<UpdateConnectClusterRequest, ConnectCluster, OperationMetadata>
+ operationCallable = mock(OperationCallable.class);
+ OperationFuture<ConnectCluster, OperationMetadata> operationFuture =
+ mock(OperationFuture.class);
+
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+
+ // client creation
+ mockedStatic
+ .when(() -> create(any(ManagedKafkaConnectSettings.class)))
+ .thenReturn(managedKafkaConnectClient);
+
+ // operation callable
+ when(managedKafkaConnectClient.updateConnectClusterOperationCallable())
+ .thenReturn(operationCallable);
+ when(operationCallable.futureCall(any(UpdateConnectClusterRequest.class)))
+ .thenReturn(operationFuture);
+
+ // initial future
+ ApiFuture<OperationSnapshot> initialFuture = mock(ApiFuture.class);
+ when(operationFuture.getInitialFuture()).thenReturn(initialFuture);
+
+ // Metadata
+ ApiFuture<OperationMetadata> metadataFuture = mock(ApiFuture.class);
+ OperationMetadata metadata = mock(OperationMetadata.class);
+ when(operationFuture.getMetadata()).thenReturn(metadataFuture);
+ when(metadataFuture.get()).thenReturn(metadata);
+
+ // operation snapshot
+ OperationSnapshot operationSnapshot = mock(OperationSnapshot.class);
+ when(operationFuture.getInitialFuture().get()).thenReturn(operationSnapshot);
+ when(operationSnapshot.getName())
+ .thenReturn("projects/test-project/locations/test-location/operations/test-operation");
+ when(operationSnapshot.isDone()).thenReturn(true);
+
+ // Set up the final result
+ ConnectCluster resultCluster = mock(ConnectCluster.class);
+ when(operationFuture.get()).thenReturn(resultCluster);
+ when(resultCluster.getName()).thenReturn(connectClusterName);
+
+ long memory = 25769803776L; // 24 GiB
+ UpdateConnectCluster.updateConnectCluster(projectId, region, clusterId, memory);
+ String output = bout.toString();
+ assertThat(output).contains("Updated connect cluster");
+ assertThat(output).contains(connectClusterName);
+ verify(managedKafkaConnectClient, times(1)).updateConnectClusterOperationCallable();
+ verify(operationCallable, times(1)).futureCall(any(UpdateConnectClusterRequest.class));
+ }
+ }
+
+ @Test
+ public void deleteConnectClusterTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ OperationCallable<DeleteConnectClusterRequest, Empty, OperationMetadata> operationCallable =
+ mock(OperationCallable.class);
+ OperationFuture<Empty, OperationMetadata> operationFuture = mock(OperationFuture.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+
+ // client creation
+ mockedStatic
+ .when(() -> create(any(ManagedKafkaConnectSettings.class)))
+ .thenReturn(managedKafkaConnectClient);
+
+ // operation callable
+ when(managedKafkaConnectClient.deleteConnectClusterOperationCallable())
+ .thenReturn(operationCallable);
+ when(operationCallable.futureCall(any(DeleteConnectClusterRequest.class)))
+ .thenReturn(operationFuture);
+
+ // initial future
+ ApiFuture<OperationSnapshot> initialFuture = mock(ApiFuture.class);
+ when(operationFuture.getInitialFuture()).thenReturn(initialFuture);
+
+ // Metadata
+ ApiFuture<OperationMetadata> metadataFuture = mock(ApiFuture.class);
+ OperationMetadata metadata = mock(OperationMetadata.class);
+ when(operationFuture.getMetadata()).thenReturn(metadataFuture);
+ when(metadataFuture.get()).thenReturn(metadata);
+
+ // operation snapshot
+ OperationSnapshot operationSnapshot = mock(OperationSnapshot.class);
+ when(operationFuture.getInitialFuture().get()).thenReturn(operationSnapshot);
+ when(operationSnapshot.getName())
+ .thenReturn("projects/test-project/locations/test-location/operations/test-operation");
+ when(operationSnapshot.isDone()).thenReturn(true);
+
+ // Set up the final result
+ Empty resultEmpty = mock(Empty.class);
+ when(operationFuture.get()).thenReturn(resultEmpty);
+
+ DeleteConnectCluster.deleteConnectCluster(projectId, region, clusterId);
+ String output = bout.toString();
+ assertThat(output).contains("Deleted connect cluster");
+ verify(managedKafkaConnectClient, times(1)).deleteConnectClusterOperationCallable();
+ verify(operationCallable, times(1)).futureCall(any(DeleteConnectClusterRequest.class));
+ }
+ }
+
+ @Test
+ public void pauseConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+ PauseConnector.pauseConnector(projectId, region, clusterId, connectorId);
+ String output = bout.toString();
+ assertThat(output).contains("Connector " + connectorId + " paused successfully.");
+ verify(managedKafkaConnectClient, times(1)).pauseConnector(any(PauseConnectorRequest.class));
+ }
+ }
+
+ @Test
+ public void listConnectorsTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ ManagedKafkaConnectClient.ListConnectorsPagedResponse response =
+ mock(ManagedKafkaConnectClient.ListConnectorsPagedResponse.class);
+
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+
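+ // Back the paged response with a fixed single-connector list.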
+ List<Connector> connectors = new ArrayList<>();
+ connectors.add(Connector.newBuilder().setName(connectorName).build());
+ Iterable<Connector> iterable = () -> connectors.iterator();
+
+ when(response.iterateAll()).thenReturn(iterable);
+ when(managedKafkaConnectClient.listConnectors(any(ConnectClusterName.class)))
+ .thenReturn(response);
+
+ ListConnectors.listConnectors(projectId, region, clusterId);
+
+ String output = bout.toString();
+ assertThat(output).contains(connectorName);
+ verify(managedKafkaConnectClient, times(1)).listConnectors(any(ConnectClusterName.class));
+ }
+ }
+
+ @Test
+ public void getConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+
+ Connector connector = Connector.newBuilder().setName(connectorName).build();
+ when(managedKafkaConnectClient.getConnector(any(ConnectorName.class))).thenReturn(connector);
+
+ GetConnector.getConnector(projectId, region, clusterId, connectorId);
+ String output = bout.toString();
+
+ assertThat(output).contains(connectorName);
+ verify(managedKafkaConnectClient, times(1)).getConnector(any(ConnectorName.class));
+ }
+ }
+
+ @Test
+ public void deleteConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+
+ DeleteConnector.deleteConnector(projectId, region, clusterId, connectorId);
+
+ String output = bout.toString();
+ assertThat(output).contains("Deleted connector: " + connectorName);
+ verify(managedKafkaConnectClient, times(1)).deleteConnector(any(ConnectorName.class));
+ }
+ }
+
+ @Test
+ public void updateConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+
+ Connector updatedConnector =
+ Connector.newBuilder().setName(connectorName).putConfigs("tasks.max", "5").build();
+
+ when(managedKafkaConnectClient.updateConnector(any(Connector.class), any(FieldMask.class)))
+ .thenReturn(updatedConnector);
+
+ UpdateConnector.updateConnector(projectId, region, clusterId, connectorId, "5");
+
+ String output = bout.toString();
+ assertThat(output).contains("Updated connector: " + connectorName);
+ assertThat(output).contains("tasks.max");
+ assertThat(output).contains("5");
+ verify(managedKafkaConnectClient, times(1))
+ .updateConnector(any(Connector.class), any(FieldMask.class));
+ }
+ }
+
+ @Test
+ public void resumeConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+ ResumeConnector.resumeConnector(projectId, region, clusterId, connectorId);
+ String output = bout.toString();
+ assertThat(output).contains("Connector " + connectorId + " resumed successfully.");
+ verify(managedKafkaConnectClient, times(1))
+ .resumeConnector(any(ResumeConnectorRequest.class));
+ }
+ }
+
+ @Test
+ public void restartConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+ RestartConnector.restartConnector(projectId, region, clusterId, connectorId);
+ String output = bout.toString();
+ assertThat(output).contains("Connector " + connectorId + " restarted successfully.");
+ verify(managedKafkaConnectClient, times(1))
+ .restartConnector(any(RestartConnectorRequest.class));
+ }
+ }
+
+ @Test
+ public void stopConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+ StopConnector.stopConnector(projectId, region, clusterId, connectorId);
+ String output = bout.toString();
+ assertThat(output).contains("Connector " + connectorId + " stopped successfully.");
+ verify(managedKafkaConnectClient, times(1)).stopConnector(any(StopConnectorRequest.class));
+ }
+ }
+}
diff --git a/managedkafka/examples/src/test/java/examples/ConnectorsTest.java b/managedkafka/examples/src/test/java/examples/ConnectorsTest.java
new file mode 100644
index 00000000000..e426b5f76bf
--- /dev/null
+++ b/managedkafka/examples/src/test/java/examples/ConnectorsTest.java
@@ -0,0 +1,300 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package examples;
+
+import static com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient.create;
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.managedkafka.v1.Connector;
+import com.google.cloud.managedkafka.v1.ConnectorName;
+import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
+import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+@RunWith(JUnit4.class)
+public class ConnectorsTest {
+
+ protected static final String projectId = "test-project";
+ protected static final String region = "us-central1";
+ protected static final String connectClusterId = "test-connect-cluster";
+ protected static final String mirrorMaker2ConnectorId = "test-mirrormaker2-source-connector";
+ protected static final String pubsubSourceConnectorId = "test-pubsub-source-connector";
+ protected static final String pubsubSinkConnectorId = "test-pubsub-sink-connector";
+ protected static final String gcsConnectorId = "test-gcs-sink-connector";
+ protected static final String bigqueryConnectorId = "test-bigquery-sink-connector";
+
+ protected static final String mirrorMaker2SourceConnectorName =
+ "projects/test-project/locations/us-central1/connectClusters/"
+ + "test-connect-cluster/connectors/test-mirrormaker2-source-connector";
+ protected static final String pubsubSourceConnectorName =
+ "projects/test-project/locations/us-central1/connectClusters/"
+ + "test-connect-cluster/connectors/test-pubsub-source-connector";
+ protected static final String pubsubSinkConnectorName =
+ "projects/test-project/locations/us-central1/connectClusters/"
+ + "test-connect-cluster/connectors/test-pubsub-sink-connector";
+ protected static final String gcsConnectorName =
+ "projects/test-project/locations/us-central1/connectClusters/"
+ + "test-connect-cluster/connectors/test-gcs-sink-connector";
+ protected static final String bigqueryConnectorName =
+ "projects/test-project/locations/us-central1/connectClusters/"
+ + "test-connect-cluster/connectors/test-bigquery-sink-connector";
+
+ private ByteArrayOutputStream bout;
+
+ @Before
+ public void setUp() {
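+ // Redirect stdout into a buffer so tests can verify the samples' output.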
+ bout = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(bout));
+ }
+
+ @Test
+ public void createMirrorMaker2SourceConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+ Connector connector =
+ Connector.newBuilder()
+ .setName(
+ ConnectorName.of(projectId, region, connectClusterId, mirrorMaker2ConnectorId)
+ .toString())
+ .build();
+ when(managedKafkaConnectClient.createConnector(any(CreateConnectorRequest.class)))
+ .thenReturn(connector);
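+ // The stub returns a connector whose name matches what the sample prints on success.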
+
+ String sourceClusterBootstrapServers = "source-cluster:9092";
+ String targetClusterBootstrapServers = "target-cluster:9092";
+ String maxTasks = "3";
+ String sourceClusterAlias = "source";
+ String targetClusterAlias = "target";
+ String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
+ String topics = ".*";
+
+ CreateMirrorMaker2SourceConnector.createMirrorMaker2SourceConnector(
+ projectId,
+ region,
+ maxTasks,
+ connectClusterId,
+ mirrorMaker2ConnectorId,
+ sourceClusterBootstrapServers,
+ targetClusterBootstrapServers,
+ sourceClusterAlias,
+ targetClusterAlias,
+ connectorClass,
+ topics);
+
+ String output = bout.toString();
+ assertThat(output).contains("Created MirrorMaker2 Source connector");
+ assertThat(output).contains(mirrorMaker2SourceConnectorName);
+ verify(managedKafkaConnectClient, times(1))
+ .createConnector(any(CreateConnectorRequest.class));
+ }
+ }
+
+ @Test
+ public void createPubSubSourceConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+ Connector connector =
+ Connector.newBuilder()
+ .setName(
+ ConnectorName.of(projectId, region, connectClusterId, pubsubSourceConnectorId)
+ .toString())
+ .build();
+ when(managedKafkaConnectClient.createConnector(any(CreateConnectorRequest.class)))
+ .thenReturn(connector);
+
+ String pubsubProjectId = "test-pubsub-project";
+ String subscriptionName = "test-subscription";
+ String kafkaTopicName = "test-kafka-topic";
+ String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
+ String maxTasks = "3";
+ String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
+ String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
+
+ CreatePubSubSourceConnector.createPubSubSourceConnector(
+ projectId,
+ region,
+ connectClusterId,
+ pubsubSourceConnectorId,
+ pubsubProjectId,
+ subscriptionName,
+ kafkaTopicName,
+ connectorClass,
+ maxTasks,
+ valueConverter,
+ keyConverter);
+
+ String output = bout.toString();
+ assertThat(output).contains("Created Pub/Sub Source connector");
+ assertThat(output).contains(pubsubSourceConnectorName);
+ verify(managedKafkaConnectClient, times(1))
+ .createConnector(any(CreateConnectorRequest.class));
+ }
+ }
+
+ @Test
+ public void createPubSubSinkConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+ Connector connector =
+ Connector.newBuilder()
+ .setName(
+ ConnectorName.of(projectId, region, connectClusterId, pubsubSinkConnectorId)
+ .toString())
+ .build();
+ when(managedKafkaConnectClient.createConnector(any(CreateConnectorRequest.class)))
+ .thenReturn(connector);
+
+ String pubsubProjectId = "test-pubsub-project";
+ String pubsubTopicName = "test-pubsub-topic";
+ String kafkaTopicName = "test-kafka-topic";
+ String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector";
+ String maxTasks = "3";
+ String valueConverter = "org.apache.kafka.connect.storage.StringConverter";
+ String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
+
+ CreatePubSubSinkConnector.createPubSubSinkConnector(
+ projectId,
+ region,
+ connectClusterId,
+ pubsubSinkConnectorId,
+ pubsubProjectId,
+ pubsubTopicName,
+ kafkaTopicName,
+ connectorClass,
+ maxTasks,
+ valueConverter,
+ keyConverter);
+
+ String output = bout.toString();
+ assertThat(output).contains("Created Pub/Sub Sink connector");
+ assertThat(output).contains(pubsubSinkConnectorName);
+ verify(managedKafkaConnectClient, times(1))
+ .createConnector(any(CreateConnectorRequest.class));
+ }
+ }
+
+ @Test
+ public void createCloudStorageSinkConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+ Connector connector =
+ Connector.newBuilder()
+ .setName(
+ ConnectorName.of(projectId, region, connectClusterId, gcsConnectorId).toString())
+ .build();
+ when(managedKafkaConnectClient.createConnector(any(CreateConnectorRequest.class)))
+ .thenReturn(connector);
+
+ String bucketName = "test-gcs-bucket";
+ String kafkaTopicName = "test-kafka-topic";
+ String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
+ String maxTasks = "3";
+ String gcsCredentialsDefault = "true";
+ String formatOutputType = "json";
+ String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
+ String valueSchemasEnable = "false";
+ String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
+
+ CreateCloudStorageSinkConnector.createCloudStorageSinkConnector(
+ projectId,
+ region,
+ connectClusterId,
+ gcsConnectorId,
+ bucketName,
+ kafkaTopicName,
+ connectorClass,
+ maxTasks,
+ gcsCredentialsDefault,
+ formatOutputType,
+ valueConverter,
+ valueSchemasEnable,
+ keyConverter);
+
+ String output = bout.toString();
+ assertThat(output).contains("Created Cloud Storage Sink connector");
+ assertThat(output).contains(gcsConnectorName);
+ verify(managedKafkaConnectClient, times(1))
+ .createConnector(any(CreateConnectorRequest.class));
+ }
+ }
+
+ @Test
+ public void createBigQuerySinkConnectorTest() throws Exception {
+ ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+ try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+ Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+ mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+ Connector connector =
+ Connector.newBuilder()
+ .setName(
+ ConnectorName.of(projectId, region, connectClusterId, bigqueryConnectorId)
+ .toString())
+ .build();
+ when(managedKafkaConnectClient.createConnector(any(CreateConnectorRequest.class)))
+ .thenReturn(connector);
+
+ String bigqueryProjectId = "test-bigquery-project";
+ String datasetName = "test_dataset";
+ String kafkaTopicName = "test-kafka-topic";
+ String maxTasks = "3";
+ String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
+ String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
+ String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
+ String valueSchemasEnable = "false";
+
+ CreateBigQuerySinkConnector.createBigQuerySinkConnector(
+ projectId,
+ region,
+ connectClusterId,
+ bigqueryConnectorId,
+ bigqueryProjectId,
+ datasetName,
+ kafkaTopicName,
+ maxTasks,
+ connectorClass,
+ keyConverter,
+ valueConverter,
+ valueSchemasEnable);
+
+ String output = bout.toString();
+ assertThat(output).contains("Created BigQuery Sink connector");
+ assertThat(output).contains(bigqueryConnectorName);
+ verify(managedKafkaConnectClient, times(1))
+ .createConnector(any(CreateConnectorRequest.class));
+ }
+ }
+}