From bdbd46c35e65f5cafdf71f8c0509b211ab18ac83 Mon Sep 17 00:00:00 2001 From: salmany Date: Mon, 4 Aug 2025 17:24:01 +0000 Subject: [PATCH 1/6] feat(managedkafka): Connect code samples --- managedkafka/examples/pom.xml | 4 +- .../examples/CreateBigQuerySinkConnector.java | 115 ++++++ .../CreateCloudStorageSinkConnector.java | 115 ++++++ .../java/examples/CreateConnectCluster.java | 126 +++++++ .../examples/CreateMirrorMaker2Connector.java | 143 ++++++++ .../examples/CreatePubSubSinkConnector.java | 107 ++++++ .../examples/CreatePubSubSourceConnector.java | 107 ++++++ .../java/examples/DeleteConnectCluster.java | 84 +++++ .../main/java/examples/GetConnectCluster.java | 49 +++ .../java/examples/ListConnectClusters.java | 51 +++ .../main/java/examples/PauseConnector.java | 57 +++ .../main/java/examples/RestartConnector.java | 57 +++ .../main/java/examples/ResumeConnector.java | 57 +++ .../src/main/java/examples/StopConnector.java | 56 +++ .../java/examples/UpdateConnectCluster.java | 92 +++++ .../java/examples/ConnectClustersTest.java | 347 ++++++++++++++++++ .../test/java/examples/ConnectorsTest.java | 322 ++++++++++++++++ 17 files changed, 1887 insertions(+), 2 deletions(-) create mode 100644 managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java create mode 100644 managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java create mode 100644 managedkafka/examples/src/main/java/examples/CreateConnectCluster.java create mode 100644 managedkafka/examples/src/main/java/examples/CreateMirrorMaker2Connector.java create mode 100644 managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java create mode 100644 managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java create mode 100644 managedkafka/examples/src/main/java/examples/DeleteConnectCluster.java create mode 100644 managedkafka/examples/src/main/java/examples/GetConnectCluster.java create mode 100644 
managedkafka/examples/src/main/java/examples/ListConnectClusters.java create mode 100644 managedkafka/examples/src/main/java/examples/PauseConnector.java create mode 100644 managedkafka/examples/src/main/java/examples/RestartConnector.java create mode 100644 managedkafka/examples/src/main/java/examples/ResumeConnector.java create mode 100644 managedkafka/examples/src/main/java/examples/StopConnector.java create mode 100644 managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java create mode 100644 managedkafka/examples/src/test/java/examples/ConnectClustersTest.java create mode 100644 managedkafka/examples/src/test/java/examples/ConnectorsTest.java diff --git a/managedkafka/examples/pom.xml b/managedkafka/examples/pom.xml index 7f1343971b3..217ef96ba08 100644 --- a/managedkafka/examples/pom.xml +++ b/managedkafka/examples/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.example.managedkafka managedkafka-snippets - pom + jar Google Cloud Managed Kafka Snippets https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/managedkafka @@ -29,7 +29,7 @@ com.google.cloud libraries-bom - 26.50.0 + 26.64.0 pom import diff --git a/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java b/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java new file mode 100644 index 00000000000..56904140e84 --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java @@ -0,0 +1,115 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
package examples;

// [START managedkafka_create_bigquery_sink_connector]

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Creates a BigQuery Sink connector on a Managed Kafka Connect cluster. */
public class CreateBigQuerySinkConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-bigquery-sink-connector";
    String bigqueryProjectId = "my-bigquery-project-id";
    String datasetName = "my_dataset";
    String tableName = "kafka_sink_table";
    String kafkaTopicName = "kafka-topic";
    String maxTasks = "3";
    String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
    String valueSchemasEnable = "false";
    createBigQuerySinkConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        bigqueryProjectId,
        datasetName,
        tableName,
        kafkaTopicName,
        maxTasks,
        connectorClass,
        keyConverter,
        valueConverter,
        valueSchemasEnable);
  }

  /**
   * Creates a BigQuery Sink connector on the given Connect cluster.
   *
   * @param projectId Google Cloud project that hosts the Connect cluster
   * @param region region of the Connect cluster, e.g. {@code us-east1}
   * @param connectClusterId ID of the Connect cluster
   * @param connectorId ID for the new connector (also used as the {@code name} config)
   * @param bigqueryProjectId project that owns the destination BigQuery dataset
   * @param datasetName destination dataset ({@code defaultDataset} config)
   * @param tableName destination table name — see NOTE below
   * @param kafkaTopicName Kafka topic(s) to sink from
   * @param maxTasks maximum number of connector tasks, as a string
   * @param connectorClass fully qualified connector class name
   * @param keyConverter converter class for record keys
   * @param valueConverter converter class for record values
   * @param valueSchemasEnable whether value schemas are enabled ("true"/"false")
   * @throws Exception declared for sample simplicity; API errors are caught and printed
   */
  public static void createBigQuerySinkConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String bigqueryProjectId,
      String datasetName,
      String tableName,
      String kafkaTopicName,
      String maxTasks,
      String connectorClass,
      String keyConverter,
      String valueConverter,
      String valueSchemasEnable)
      throws Exception {

    // Build the connector configuration. Use the parameterized map type rather than
    // the raw type so the compiler checks key/value types.
    Map<String, String> configMap = new HashMap<>();
    configMap.put("name", connectorId);
    configMap.put("project", bigqueryProjectId);
    configMap.put("topics", kafkaTopicName);
    configMap.put("tasks.max", maxTasks);
    configMap.put("connector.class", connectorClass);
    configMap.put("key.converter", keyConverter);
    configMap.put("value.converter", valueConverter);
    configMap.put("value.converter.schemas.enable", valueSchemasEnable);
    configMap.put("defaultDataset", datasetName);
    // NOTE(review): tableName is accepted but never placed in the configuration; the
    // connector presumably derives table names from topic names — confirm intent or
    // add the corresponding config key.

    Connector connector =
        Connector.newBuilder()
            .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request =
          CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s", e.getMessage());
    }
  }
}

// [END managedkafka_create_bigquery_sink_connector]
package examples;

// [START managedkafka_create_cloud_storage_sink_connector]

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Creates a Cloud Storage (GCS) Sink connector on a Managed Kafka Connect cluster. */
public class CreateCloudStorageSinkConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-gcs-sink-connector";
    String bucketName = "my-gcs-bucket";
    String kafkaTopicName = "kafka-topic";
    String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
    String maxTasks = "1";
    String gcsCredentialsDefault = "true";
    String formatOutputType = "json";
    String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
    String valueSchemasEnable = "false";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createCloudStorageSinkConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        bucketName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        gcsCredentialsDefault,
        formatOutputType,
        valueConverter,
        valueSchemasEnable,
        keyConverter);
  }

  /**
   * Creates a Cloud Storage Sink connector on the given Connect cluster.
   *
   * @param projectId Google Cloud project that hosts the Connect cluster
   * @param region region of the Connect cluster, e.g. {@code us-east1}
   * @param connectClusterId ID of the Connect cluster
   * @param connectorId ID for the new connector (also used as the {@code name} config)
   * @param bucketName destination GCS bucket
   * @param kafkaTopicName Kafka topic(s) to sink from
   * @param connectorClass fully qualified connector class name
   * @param maxTasks maximum number of connector tasks, as a string
   * @param gcsCredentialsDefault whether to use application-default credentials ("true"/"false")
   * @param formatOutputType output file format, e.g. {@code json}
   * @param valueConverter converter class for record values
   * @param valueSchemasEnable whether value schemas are enabled ("true"/"false")
   * @param keyConverter converter class for record keys
   * @throws Exception declared for sample simplicity; API errors are caught and printed
   */
  public static void createCloudStorageSinkConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String bucketName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String gcsCredentialsDefault,
      String formatOutputType,
      String valueConverter,
      String valueSchemasEnable,
      String keyConverter)
      throws Exception {

    // Build the connector configuration. Use the parameterized map type rather than
    // the raw type so the compiler checks key/value types.
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("tasks.max", maxTasks);
    configMap.put("topics", kafkaTopicName);
    configMap.put("gcs.bucket.name", bucketName);
    configMap.put("gcs.credentials.default", gcsCredentialsDefault);
    configMap.put("format.output.type", formatOutputType);
    configMap.put("name", connectorId);
    configMap.put("value.converter", valueConverter);
    configMap.put("value.converter.schemas.enable", valueSchemasEnable);
    configMap.put("key.converter", keyConverter);

    Connector connector =
        Connector.newBuilder()
            .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request =
          CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s", e.getMessage());
    }
  }
}

// [END managedkafka_create_cloud_storage_sink_connector]
package examples;

// [START managedkafka_create_connect_cluster]

import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
import com.google.cloud.managedkafka.v1.ConnectCluster;
import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/** Creates a Managed Kafka Connect cluster via a long-running operation. */
public class CreateConnectCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-connect-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
    int cpu = 12;
    long memoryBytes = 12884901888L; // 12 GiB
    createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
  }

  /**
   * Creates a Connect cluster and waits for the long-running operation to complete,
   * printing the operation status while polling.
   *
   * @param projectId Google Cloud project for the new cluster
   * @param region region for the new cluster, e.g. {@code us-east1}
   * @param clusterId ID of the Connect cluster to create
   * @param subnet fully qualified primary subnet for cluster networking
   * @param kafkaCluster the Kafka cluster this Connect cluster attaches to
   * @param cpu vCPU count for the cluster capacity
   * @param memoryBytes memory in bytes for the cluster capacity
   * @throws Exception if the operation fails; the {@link ExecutionException} is rethrown
   */
  public static void createConnectCluster(
      String projectId,
      String region,
      String clusterId,
      String subnet,
      String kafkaCluster,
      int cpu,
      long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig =
        CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
    ConnectNetworkConfig networkConfig =
        ConnectNetworkConfig.newBuilder().setPrimarySubnet(subnet).build();
    ConnectGcpConfig gcpConfig =
        ConnectGcpConfig.newBuilder()
            .setAccessConfig(
                ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
    ConnectCluster connectCluster =
        ConnectCluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setKafkaCluster(kafkaCluster)
            .build();

    // Create the settings to configure the timeout for polling operations. Cluster
    // creation is slow, so extend the total polling timeout to one hour.
    ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm =
        OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder().setTotalTimeoutDuration(Duration.ofHours(1L)).build());
    settingsBuilder.createConnectClusterOperationSettings().setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaConnectClient managedKafkaConnectClient =
        ManagedKafkaConnectClient.create(settingsBuilder.build())) {
      CreateConnectClusterRequest request =
          CreateConnectClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setConnectClusterId(clusterId)
              .setConnectCluster(connectCluster)
              .build();

      // The duration of this operation can vary considerably, typically taking
      // between 10-30 minutes.
      OperationFuture<ConnectCluster, OperationMetadata> future =
          managedKafkaConnectClient.createConnectClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf(
          "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(), operation.isDone(), future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf(
            "Polling Operation:\nName: %s\n Done: %s\n", currentOp.getName(), currentOp.isDone());
      }

      // NOTE: future.get() blocks completion until the operation is complete (isDone
      // = True)
      ConnectCluster response = future.get();
      System.out.printf("Created connect cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      // Rethrow so callers (and tests) observe the failure after it is logged.
      System.err.printf(
          "managedKafkaConnectClient.createConnectCluster got err: %s", e.getMessage());
      throw e;
    }
  }
}
// [END managedkafka_create_connect_cluster]
package examples;

// [START managedkafka_create_mirrormaker2_connector]

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Creates a MirrorMaker 2 source connector on a Managed Kafka Connect cluster. */
public class CreateMirrorMaker2Connector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-mirrormaker2-connector";
    String sourceClusterBootstrapServers = "my-source-cluster:9092";
    String targetClusterBootstrapServers = "my-target-cluster:9092";
    String sourceClusterAlias = "source";
    String targetClusterAlias = "target";
    String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
    String topics = ".*";
    String offsetSyncsReplicationFactor = "1";
    String sourceSecurityProtocol = "SASL_SSL";
    String sourceSaslMechanism = "OAUTHBEARER";
    String sourceLoginCallbackHandler =
        "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler";
    String sourceJaasConfig =
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;";
    String targetSecurityProtocol = "SASL_SSL";
    String targetSaslMechanism = "OAUTHBEARER";
    String targetLoginCallbackHandler =
        "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler";
    String targetJaasConfig =
        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;";
    createMirrorMaker2Connector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        sourceClusterBootstrapServers,
        targetClusterBootstrapServers,
        sourceClusterAlias,
        targetClusterAlias,
        connectorClass,
        topics,
        offsetSyncsReplicationFactor,
        sourceSecurityProtocol,
        sourceSaslMechanism,
        sourceLoginCallbackHandler,
        sourceJaasConfig,
        targetSecurityProtocol,
        targetSaslMechanism,
        targetLoginCallbackHandler,
        targetJaasConfig);
  }

  /**
   * Creates a MirrorMaker 2 connector that replicates topics from a source Kafka
   * cluster to a target Kafka cluster.
   *
   * @param projectId Google Cloud project that hosts the Connect cluster
   * @param region region of the Connect cluster, e.g. {@code us-east1}
   * @param connectClusterId ID of the Connect cluster
   * @param connectorId ID for the new connector (also used as the {@code name} config)
   * @param sourceClusterBootstrapServers bootstrap servers of the source cluster
   * @param targetClusterBootstrapServers bootstrap servers of the target cluster
   * @param sourceClusterAlias alias for the source cluster in MM2 configs
   * @param targetClusterAlias alias for the target cluster in MM2 configs
   * @param connectorClass fully qualified connector class name
   * @param topics regex of topics to replicate, e.g. {@code .*}
   * @param offsetSyncsReplicationFactor replication factor for the offset-syncs topic
   * @param sourceSecurityProtocol security protocol for the source cluster
   * @param sourceSaslMechanism SASL mechanism for the source cluster
   * @param sourceLoginCallbackHandler SASL login callback handler class for the source
   * @param sourceJaasConfig JAAS config for the source cluster
   * @param targetSecurityProtocol security protocol for the target cluster
   * @param targetSaslMechanism SASL mechanism for the target cluster
   * @param targetLoginCallbackHandler SASL login callback handler class for the target
   * @param targetJaasConfig JAAS config for the target cluster
   * @throws Exception declared for sample simplicity; API errors are caught and printed
   */
  public static void createMirrorMaker2Connector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String sourceClusterBootstrapServers,
      String targetClusterBootstrapServers,
      String sourceClusterAlias,
      String targetClusterAlias,
      String connectorClass,
      String topics,
      String offsetSyncsReplicationFactor,
      String sourceSecurityProtocol,
      String sourceSaslMechanism,
      String sourceLoginCallbackHandler,
      String sourceJaasConfig,
      String targetSecurityProtocol,
      String targetSaslMechanism,
      String targetLoginCallbackHandler,
      String targetJaasConfig)
      throws Exception {

    // Build the connector configuration. Use the parameterized map type rather than
    // the raw type so the compiler checks key/value types.
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("source.cluster.alias", sourceClusterAlias);
    configMap.put("target.cluster.alias", targetClusterAlias);
    configMap.put("topics", topics);
    configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers);
    configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers);
    configMap.put("offset-syncs.topic.replication.factor", offsetSyncsReplicationFactor);
    // Authentication settings for the source and target clusters.
    configMap.put("source.cluster.security.protocol", sourceSecurityProtocol);
    configMap.put("source.cluster.sasl.mechanism", sourceSaslMechanism);
    configMap.put("source.cluster.sasl.login.callback.handler.class", sourceLoginCallbackHandler);
    configMap.put("source.cluster.sasl.jaas.config", sourceJaasConfig);
    configMap.put("target.cluster.security.protocol", targetSecurityProtocol);
    configMap.put("target.cluster.sasl.mechanism", targetSaslMechanism);
    configMap.put("target.cluster.sasl.login.callback.handler.class", targetLoginCallbackHandler);
    configMap.put("target.cluster.sasl.jaas.config", targetJaasConfig);

    Connector connector =
        Connector.newBuilder()
            .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request =
          CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created MirrorMaker2 connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s", e.getMessage());
    }
  }
}

// [END managedkafka_create_mirrormaker2_connector]
package examples;

// [START managedkafka_create_pubsub_sink_connector]

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Creates a Pub/Sub Sink connector on a Managed Kafka Connect cluster. */
public class CreatePubSubSinkConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-sink-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String pubsubTopicName = "my-pubsub-topic";
    String kafkaTopicName = "kafka-topic";
    String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector";
    String maxTasks = "1";
    String valueConverter = "org.apache.kafka.connect.storage.StringConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSinkConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        pubsubTopicName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  /**
   * Creates a Pub/Sub Sink connector that forwards Kafka records to a Pub/Sub topic.
   *
   * @param projectId Google Cloud project that hosts the Connect cluster
   * @param region region of the Connect cluster, e.g. {@code us-east1}
   * @param connectClusterId ID of the Connect cluster
   * @param connectorId ID for the new connector (also used as the {@code name} config)
   * @param pubsubProjectId project that owns the destination Pub/Sub topic
   * @param pubsubTopicName destination Pub/Sub topic ({@code cps.topic})
   * @param kafkaTopicName Kafka topic(s) to sink from
   * @param connectorClass fully qualified connector class name
   * @param maxTasks maximum number of connector tasks, as a string
   * @param valueConverter converter class for record values
   * @param keyConverter converter class for record keys
   * @throws Exception declared for sample simplicity; API errors are caught and printed
   */
  public static void createPubSubSinkConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String pubsubTopicName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration. Use the parameterized map type rather than
    // the raw type so the compiler checks key/value types.
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("topics", kafkaTopicName);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);
    configMap.put("cps.topic", pubsubTopicName);
    configMap.put("cps.project", pubsubProjectId);

    Connector connector =
        Connector.newBuilder()
            .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request =
          CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Sink connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s", e.getMessage());
    }
  }
}

// [END managedkafka_create_pubsub_sink_connector]
package examples;

// [START managedkafka_create_pubsub_source_connector]

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Creates a Pub/Sub Source connector on a Managed Kafka Connect cluster. */
public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "1";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  /**
   * Creates a Pub/Sub Source connector that pulls messages from a Pub/Sub
   * subscription into a Kafka topic.
   *
   * @param projectId Google Cloud project that hosts the Connect cluster
   * @param region region of the Connect cluster, e.g. {@code us-east1}
   * @param connectClusterId ID of the Connect cluster
   * @param connectorId ID for the new connector (also used as the {@code name} config)
   * @param pubsubProjectId project that owns the Pub/Sub subscription
   * @param subscriptionName Pub/Sub subscription to read from ({@code cps.subscription})
   * @param kafkaTopicName destination Kafka topic ({@code kafka.topic})
   * @param connectorClass fully qualified connector class name
   * @param maxTasks maximum number of connector tasks, as a string
   * @param valueConverter converter class for record values
   * @param keyConverter converter class for record keys
   * @throws Exception declared for sample simplicity; API errors are caught and printed
   */
  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration. Use the parameterized map type rather than
    // the raw type so the compiler checks key/value types.
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector =
        Connector.newBuilder()
            .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request =
          CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s", e.getMessage());
    }
  }
}

// [END managedkafka_create_pubsub_source_connector]
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples; + +// [START managedkafka_delete_connect_cluster] + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.longrunning.OperationSnapshot; +import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.TimedRetryAlgorithm; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.ConnectClusterName; +import com.google.cloud.managedkafka.v1.DeleteConnectClusterRequest; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings; +import com.google.cloud.managedkafka.v1.OperationMetadata; +import com.google.protobuf.Empty; +import java.io.IOException; +import java.time.Duration; + +public class DeleteConnectCluster { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. 
us-east1
+    String clusterId = "my-connect-cluster";
+    deleteConnectCluster(projectId, region, clusterId);
+  }
+
+  public static void deleteConnectCluster(String projectId, String region, String clusterId)
+      throws Exception {
+
+    // Create the settings to configure the timeout for polling operations
+    ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
+    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
+        RetrySettings.newBuilder()
+            .setTotalTimeoutDuration(Duration.ofHours(1L))
+            .build());
+    settingsBuilder.deleteConnectClusterOperationSettings()
+        .setPollingAlgorithm(timedRetryAlgorithm);
+
+    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
+        settingsBuilder.build())) {
+      DeleteConnectClusterRequest request = DeleteConnectClusterRequest.newBuilder()
+          .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
+          .build();
+      OperationFuture<Empty, OperationMetadata> future = managedKafkaConnectClient
+          .deleteConnectClusterOperationCallable().futureCall(request);
+
+      // Get the initial LRO and print details. CreateConnectCluster contains sample
+      // code for polling logs.
+      OperationSnapshot operation = future.getInitialFuture().get();
+      System.out.printf(
+          "Connect cluster deletion started. 
Operation name: %s\nDone: %s\nMetadata: %s\n", + operation.getName(), + operation.isDone(), + future.getMetadata().get().toString()); + + future.get(); + System.out.println("Deleted connect cluster"); + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.deleteConnectCluster got err: %s", + e.getMessage()); + } + } +} + +// [END managedkafka_delete_connect_cluster] diff --git a/managedkafka/examples/src/main/java/examples/GetConnectCluster.java b/managedkafka/examples/src/main/java/examples/GetConnectCluster.java new file mode 100644 index 00000000000..679b0f28878 --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/GetConnectCluster.java @@ -0,0 +1,49 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples; + +// [START managedkafka_get_connect_cluster] +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.ConnectCluster; +import com.google.cloud.managedkafka.v1.ConnectClusterName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import java.io.IOException; + +public class GetConnectCluster { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. 
us-east1 + String clusterId = "my-connect-cluster"; + getConnectCluster(projectId, region, clusterId); + } + + public static void getConnectCluster(String projectId, String region, String clusterId) + throws Exception { + try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) { + // This operation is being handled synchronously. + ConnectCluster connectCluster = managedKafkaConnectClient + .getConnectCluster(ConnectClusterName.of(projectId, region, clusterId)); + System.out.println(connectCluster.getAllFields()); + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.getConnectCluster got err: %s", e.getMessage()); + } + } +} + +// [END managedkafka_get_connect_cluster] diff --git a/managedkafka/examples/src/main/java/examples/ListConnectClusters.java b/managedkafka/examples/src/main/java/examples/ListConnectClusters.java new file mode 100644 index 00000000000..5da00e852da --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/ListConnectClusters.java @@ -0,0 +1,51 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package examples; + +// [START managedkafka_list_connect_clusters] +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.ConnectCluster; +import com.google.cloud.managedkafka.v1.LocationName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import java.io.IOException; + +public class ListConnectClusters { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. us-east1 + listConnectClusters(projectId, region); + } + + public static void listConnectClusters(String projectId, String region) throws Exception { + try (ManagedKafkaConnectClient managedKafkaConnectClient = + ManagedKafkaConnectClient.create()) { + LocationName locationName = LocationName.of(projectId, region); + // This operation is being handled synchronously. + for (ConnectCluster connectCluster : managedKafkaConnectClient + .listConnectClusters(locationName).iterateAll()) { + System.out.println(connectCluster.getAllFields()); + } + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.listConnectClusters got err: %s", + e.getMessage()); + } + } +} + +// [END managedkafka_list_connect_clusters] diff --git a/managedkafka/examples/src/main/java/examples/PauseConnector.java b/managedkafka/examples/src/main/java/examples/PauseConnector.java new file mode 100644 index 00000000000..9db8b5a62bb --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/PauseConnector.java @@ -0,0 +1,57 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples; + +// [START managedkafka_pause_connector] +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.ConnectorName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import com.google.cloud.managedkafka.v1.PauseConnectorRequest; +import java.io.IOException; + +public class PauseConnector { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. us-east1 + String connectClusterId = "my-connect-cluster"; + String connectorId = "my-connector"; + pauseConnector(projectId, region, connectClusterId, connectorId); + } + + public static void pauseConnector( + String projectId, String region, String connectClusterId, String connectorId) + throws Exception { + try (ManagedKafkaConnectClient managedKafkaConnectClient = + ManagedKafkaConnectClient.create()) { + ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, + connectorId); + PauseConnectorRequest request = PauseConnectorRequest.newBuilder() + .setName(connectorName.toString()).build(); + + // This operation is being handled synchronously. 
+ managedKafkaConnectClient.pauseConnector(request); + System.out.printf("Connector %s paused successfully.\n", connectorId); + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.pauseConnector got err: %s", + e.getMessage()); + } + } +} + +// [END managedkafka_pause_connector] diff --git a/managedkafka/examples/src/main/java/examples/RestartConnector.java b/managedkafka/examples/src/main/java/examples/RestartConnector.java new file mode 100644 index 00000000000..4764b1f13d7 --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/RestartConnector.java @@ -0,0 +1,57 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples; + +// [START managedkafka_restart_connector] +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.ConnectorName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import com.google.cloud.managedkafka.v1.RestartConnectorRequest; +import java.io.IOException; + +public class RestartConnector { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. 
us-east1 + String connectClusterId = "my-connect-cluster"; + String connectorId = "my-connector"; + restartConnector(projectId, region, connectClusterId, connectorId); + } + + public static void restartConnector( + String projectId, String region, String connectClusterId, String connectorId) + throws Exception { + try (ManagedKafkaConnectClient managedKafkaConnectClient = + ManagedKafkaConnectClient.create()) { + ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, + connectorId); + RestartConnectorRequest request = RestartConnectorRequest.newBuilder() + .setName(connectorName.toString()).build(); + + // This operation is being handled synchronously. + managedKafkaConnectClient.restartConnector(request); + System.out.printf("Connector %s restarted successfully.\n", connectorId); + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.restartConnector got err: %s", + e.getMessage()); + } + } +} + +// [END managedkafka_restart_connector] diff --git a/managedkafka/examples/src/main/java/examples/ResumeConnector.java b/managedkafka/examples/src/main/java/examples/ResumeConnector.java new file mode 100644 index 00000000000..d16b458a12a --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/ResumeConnector.java @@ -0,0 +1,57 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package examples; + +// [START managedkafka_resume_connector] +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.ConnectorName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import com.google.cloud.managedkafka.v1.ResumeConnectorRequest; +import java.io.IOException; + +public class ResumeConnector { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. us-east1 + String connectClusterId = "my-connect-cluster"; + String connectorId = "my-connector"; + resumeConnector(projectId, region, connectClusterId, connectorId); + } + + public static void resumeConnector( + String projectId, String region, String connectClusterId, String connectorId) + throws Exception { + try (ManagedKafkaConnectClient managedKafkaConnectClient = + ManagedKafkaConnectClient.create()) { + ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, + connectorId); + ResumeConnectorRequest request = ResumeConnectorRequest.newBuilder() + .setName(connectorName.toString()).build(); + + // This operation is being handled synchronously. 
+ managedKafkaConnectClient.resumeConnector(request); + System.out.printf("Connector %s resumed successfully.\n", connectorId); + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.resumeConnector got err: %s", + e.getMessage()); + } + } +} + +// [END managedkafka_resume_connector] diff --git a/managedkafka/examples/src/main/java/examples/StopConnector.java b/managedkafka/examples/src/main/java/examples/StopConnector.java new file mode 100644 index 00000000000..f39f06b7557 --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/StopConnector.java @@ -0,0 +1,56 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples; + +// [START managedkafka_stop_connector] +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.ConnectorName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import com.google.cloud.managedkafka.v1.StopConnectorRequest; +import java.io.IOException; + +public class StopConnector { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. 
us-east1 + String connectClusterId = "my-connect-cluster"; + String connectorId = "my-connector"; + stopConnector(projectId, region, connectClusterId, connectorId); + } + + public static void stopConnector( + String projectId, String region, String connectClusterId, String connectorId) + throws Exception { + try (ManagedKafkaConnectClient managedKafkaConnectClient = + ManagedKafkaConnectClient.create()) { + ConnectorName connectorName = ConnectorName.of(projectId, region, connectClusterId, + connectorId); + StopConnectorRequest request = StopConnectorRequest.newBuilder() + .setName(connectorName.toString()).build(); + + // This operation is being handled synchronously. + managedKafkaConnectClient.stopConnector(request); + System.out.printf("Connector %s stopped successfully.\n", connectorId); + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.stopConnector got err: %s", e.getMessage()); + } + } +} + +// [END managedkafka_stop_connector] diff --git a/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java b/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java new file mode 100644 index 00000000000..e3c10df2a48 --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java @@ -0,0 +1,92 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package examples; + +// [START managedkafka_update_connect_cluster] + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.longrunning.OperationSnapshot; +import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.retrying.TimedRetryAlgorithm; +import com.google.cloud.managedkafka.v1.CapacityConfig; +import com.google.cloud.managedkafka.v1.ConnectCluster; +import com.google.cloud.managedkafka.v1.ConnectClusterName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings; +import com.google.cloud.managedkafka.v1.OperationMetadata; +import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest; +import com.google.protobuf.FieldMask; +import java.time.Duration; +import java.util.concurrent.ExecutionException; + +public class UpdateConnectCluster { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. 
us-east1
+    String clusterId = "my-connect-cluster";
+    long memoryBytes = 12884901888L; // 12 GiB
+    updateConnectCluster(projectId, region, clusterId, memoryBytes);
+  }
+
+  public static void updateConnectCluster(
+      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
+    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
+    ConnectCluster connectCluster = ConnectCluster.newBuilder()
+        .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
+        .setCapacityConfig(capacityConfig)
+        .build();
+    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
+
+    // Create the settings to configure the timeout for polling operations
+    ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
+    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
+        RetrySettings.newBuilder()
+            .setTotalTimeoutDuration(Duration.ofHours(1L))
+            .build());
+    settingsBuilder.updateConnectClusterOperationSettings()
+        .setPollingAlgorithm(timedRetryAlgorithm);
+
+    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
+        settingsBuilder.build())) {
+      UpdateConnectClusterRequest request = UpdateConnectClusterRequest.newBuilder()
+          .setUpdateMask(updateMask)
+          .setConnectCluster(connectCluster).build();
+      OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
+          .updateConnectClusterOperationCallable().futureCall(request);
+
+      // Get the initial LRO and print details. CreateConnectCluster contains sample
+      // code for polling logs.
+      OperationSnapshot operation = future.getInitialFuture().get();
+      System.out.printf(
+          "Connect cluster update started. 
Operation name: %s\nDone: %s\nMetadata: %s\n", + operation.getName(), + operation.isDone(), + future.getMetadata().get().toString()); + + ConnectCluster response = future.get(); + System.out.printf("Updated connect cluster: %s\n", response.getName()); + } catch (ExecutionException e) { + System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s", + e.getMessage()); + } + } +} + +// [END managedkafka_update_connect_cluster] diff --git a/managedkafka/examples/src/test/java/examples/ConnectClustersTest.java b/managedkafka/examples/src/test/java/examples/ConnectClustersTest.java new file mode 100644 index 00000000000..c99653d8a63 --- /dev/null +++ b/managedkafka/examples/src/test/java/examples/ConnectClustersTest.java @@ -0,0 +1,347 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package examples; + +import static com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient.create; +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.longrunning.OperationFuture; +import com.google.api.gax.longrunning.OperationSnapshot; +import com.google.api.gax.retrying.RetryingFuture; +import com.google.api.gax.rpc.OperationCallable; +import com.google.cloud.managedkafka.v1.ConnectCluster; +import com.google.cloud.managedkafka.v1.ConnectClusterName; +import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest; +import com.google.cloud.managedkafka.v1.DeleteConnectClusterRequest; +import com.google.cloud.managedkafka.v1.LocationName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings; +import com.google.cloud.managedkafka.v1.OperationMetadata; +import com.google.cloud.managedkafka.v1.PauseConnectorRequest; +import com.google.cloud.managedkafka.v1.RestartConnectorRequest; +import com.google.cloud.managedkafka.v1.ResumeConnectorRequest; +import com.google.cloud.managedkafka.v1.StopConnectorRequest; +import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest; +import com.google.protobuf.Empty; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class ConnectClustersTest { + protected static final String projectId = "test-project"; + protected static final String region = "us-central1"; + 
protected static final String clusterId = "test-connect-cluster";
+  protected static final String kafkaCluster = "test-kafka-cluster";
+  protected static final String connectClusterName =
+      "projects/test-project/locations/us-central1/connectClusters/test-connect-cluster";
+  protected static final String connectorId = "test-connector";
+  protected static final String connectorName =
+      "projects/test-project/locations/us-central1/connectClusters/test-connect-cluster"
+          + "/connectors/test-connector";
+  private ByteArrayOutputStream bout;
+
+  @Before
+  public void setUp() {
+    bout = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bout));
+  }
+
+  @Test
+  public void createConnectClusterTest() throws Exception {
+    ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+    OperationCallable<CreateConnectClusterRequest, ConnectCluster, OperationMetadata>
+        operationCallable = mock(OperationCallable.class);
+    OperationFuture<ConnectCluster, OperationMetadata> operationFuture =
+        mock(OperationFuture.class);
+
+    try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+        Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+
+      // client creation
+      mockedStatic
+          .when(() -> create(any(ManagedKafkaConnectSettings.class)))
+          .thenReturn(managedKafkaConnectClient);
+
+      // operation callable
+      when(managedKafkaConnectClient.createConnectClusterOperationCallable())
+          .thenReturn(operationCallable);
+      when(operationCallable.futureCall(any(CreateConnectClusterRequest.class)))
+          .thenReturn(operationFuture);
+
+      // initial future
+      ApiFuture<OperationSnapshot> initialFuture = mock(ApiFuture.class);
+      when(operationFuture.getInitialFuture()).thenReturn(initialFuture);
+
+      // Metadata
+      ApiFuture<OperationMetadata> metadataFuture = mock(ApiFuture.class);
+      OperationMetadata metadata = mock(OperationMetadata.class);
+      when(operationFuture.getMetadata()).thenReturn(metadataFuture);
+      when(metadataFuture.get()).thenReturn(metadata);
+
+      // operation snapshot
+      OperationSnapshot operationSnapshot = mock(OperationSnapshot.class);
+      
when(operationFuture.getInitialFuture().get()).thenReturn(operationSnapshot);
+      when(operationSnapshot.getName())
+          .thenReturn("projects/test-project/locations/test-location/operations/test-operation");
+      when(operationSnapshot.isDone()).thenReturn(false, false, true);
+
+      // polling future
+      RetryingFuture<OperationSnapshot> pollingFuture = mock(RetryingFuture.class);
+      when(operationFuture.getPollingFuture()).thenReturn(pollingFuture);
+      when(operationFuture.isDone()).thenReturn(false, false, true);
+      ApiFuture<OperationSnapshot> attemptResult = mock(ApiFuture.class);
+      when(pollingFuture.getAttemptResult()).thenReturn(attemptResult);
+      when(attemptResult.get()).thenReturn(operationSnapshot);
+
+      // Setup final result
+      ConnectCluster resultCluster = mock(ConnectCluster.class);
+      when(operationFuture.get()).thenReturn(resultCluster);
+      when(resultCluster.getName()).thenReturn(connectClusterName);
+
+      String subnet = "test-subnet";
+      int vcpu = 12;
+      long memory = 12884901888L; // 12 GiB
+      CreateConnectCluster.createConnectCluster(
+          projectId, region, clusterId, subnet, kafkaCluster, vcpu, memory);
+      String output = bout.toString();
+      assertThat(output).contains("Created connect cluster");
+      assertThat(output).contains(connectClusterName);
+      verify(managedKafkaConnectClient, times(1)).createConnectClusterOperationCallable();
+      verify(operationCallable, times(1)).futureCall(any(CreateConnectClusterRequest.class));
+      verify(operationFuture, times(2)).getPollingFuture(); // Verify 2 polling attempts
+      verify(pollingFuture, times(2)).getAttemptResult(); // Verify 2 attempt results
+      verify(operationSnapshot, times(3)).isDone(); // 2 polls + 1 initial check
+    }
+  }
+
+  @Test
+  public void getConnectClusterTest() throws Exception {
+    ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+    try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+        Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+      mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+      ConnectCluster 
connectCluster =
+          ConnectCluster.newBuilder()
+              .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
+              .build();
+      when(managedKafkaConnectClient.getConnectCluster(any(ConnectClusterName.class)))
+          .thenReturn(connectCluster);
+      GetConnectCluster.getConnectCluster(projectId, region, clusterId);
+      String output = bout.toString();
+      assertThat(output).contains(connectClusterName);
+      verify(managedKafkaConnectClient, times(1)).getConnectCluster(any(ConnectClusterName.class));
+    }
+  }
+
+  @Test
+  public void listConnectClustersTest() throws Exception {
+    ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+    ManagedKafkaConnectClient.ListConnectClustersPagedResponse response =
+        mock(ManagedKafkaConnectClient.ListConnectClustersPagedResponse.class);
+    try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+        Mockito.mockStatic(ManagedKafkaConnectClient.class)) {
+      mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient);
+      Iterable<ConnectCluster> iterable =
+          () -> {
+            List<ConnectCluster> connectClusters = new ArrayList<>();
+            connectClusters.add(
+                ConnectCluster.newBuilder()
+                    .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
+                    .build());
+            return connectClusters.iterator();
+          };
+      when(response.iterateAll()).thenReturn(iterable);
+      when(managedKafkaConnectClient.listConnectClusters(any(LocationName.class)))
+          .thenReturn(response);
+      ListConnectClusters.listConnectClusters(projectId, region);
+      String output = bout.toString();
+      assertThat(output).contains(connectClusterName);
+      verify(managedKafkaConnectClient, times(1)).listConnectClusters(any(LocationName.class));
+    }
+  }
+
+  @Test
+  public void updateConnectClusterTest() throws Exception {
+    ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class);
+    OperationCallable<UpdateConnectClusterRequest, ConnectCluster, OperationMetadata>
+        operationCallable = mock(OperationCallable.class);
+    OperationFuture<ConnectCluster, OperationMetadata> operationFuture =
+        mock(OperationFuture.class);
+
+    try (MockedStatic<ManagedKafkaConnectClient> mockedStatic =
+        
Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + + // client creation + mockedStatic + .when(() -> create(any(ManagedKafkaConnectSettings.class))) + .thenReturn(managedKafkaConnectClient); + + // operation callable + when(managedKafkaConnectClient.updateConnectClusterOperationCallable()) + .thenReturn(operationCallable); + when(operationCallable.futureCall(any(UpdateConnectClusterRequest.class))) + .thenReturn(operationFuture); + + // initial future + ApiFuture initialFuture = mock(ApiFuture.class); + when(operationFuture.getInitialFuture()).thenReturn(initialFuture); + + // Metadata + ApiFuture metadataFuture = mock(ApiFuture.class); + OperationMetadata metadata = mock(OperationMetadata.class); + when(operationFuture.getMetadata()).thenReturn(metadataFuture); + when(metadataFuture.get()).thenReturn(metadata); + + // operation snapshot + OperationSnapshot operationSnapshot = mock(OperationSnapshot.class); + when(operationFuture.getInitialFuture().get()).thenReturn(operationSnapshot); + when(operationSnapshot.getName()) + .thenReturn("projects/test-project/locations/test-location/operations/test-operation"); + when(operationSnapshot.isDone()).thenReturn(true); + + // Setup final result + ConnectCluster resultCluster = mock(ConnectCluster.class); + when(operationFuture.get()).thenReturn(resultCluster); + when(resultCluster.getName()).thenReturn(connectClusterName); + + long memory = 25769803776L; // 24 GiB + UpdateConnectCluster.updateConnectCluster(projectId, region, clusterId, memory); + String output = bout.toString(); + assertThat(output).contains("Updated connect cluster"); + assertThat(output).contains(connectClusterName); + verify(managedKafkaConnectClient, times(1)).updateConnectClusterOperationCallable(); + verify(operationCallable, times(1)).futureCall(any(UpdateConnectClusterRequest.class)); + } + } + + @Test + public void deleteConnectClusterTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = 
mock(ManagedKafkaConnectClient.class); + OperationCallable operationCallable = + mock(OperationCallable.class); + OperationFuture operationFuture = mock(OperationFuture.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + + // client creation + mockedStatic + .when(() -> create(any(ManagedKafkaConnectSettings.class))) + .thenReturn(managedKafkaConnectClient); + + // operation callable + when(managedKafkaConnectClient.deleteConnectClusterOperationCallable()) + .thenReturn(operationCallable); + when(operationCallable.futureCall(any(DeleteConnectClusterRequest.class))) + .thenReturn(operationFuture); + + // initial future + ApiFuture initialFuture = mock(ApiFuture.class); + when(operationFuture.getInitialFuture()).thenReturn(initialFuture); + + // Metadata + ApiFuture metadataFuture = mock(ApiFuture.class); + OperationMetadata metadata = mock(OperationMetadata.class); + when(operationFuture.getMetadata()).thenReturn(metadataFuture); + when(metadataFuture.get()).thenReturn(metadata); + + // operation snapshot + OperationSnapshot operationSnapshot = mock(OperationSnapshot.class); + when(operationFuture.getInitialFuture().get()).thenReturn(operationSnapshot); + when(operationSnapshot.getName()) + .thenReturn("projects/test-project/locations/test-location/operations/test-operation"); + when(operationSnapshot.isDone()).thenReturn(true); + + // Setup final result + Empty resultEmpty = mock(Empty.class); + when(operationFuture.get()).thenReturn(resultEmpty); + + DeleteConnectCluster.deleteConnectCluster(projectId, region, clusterId); + String output = bout.toString(); + assertThat(output).contains("Deleted connect cluster"); + verify(managedKafkaConnectClient, times(1)).deleteConnectClusterOperationCallable(); + verify(operationCallable, times(1)).futureCall(any(DeleteConnectClusterRequest.class)); + } + } + + @Test + public void pauseConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = 
mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + PauseConnector.pauseConnector(projectId, region, clusterId, connectorId); + String output = bout.toString(); + assertThat(output).contains("Connector " + connectorId + " paused successfully."); + verify(managedKafkaConnectClient, times(1)).pauseConnector(any(PauseConnectorRequest.class)); + } + } + + @Test + public void resumeConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + ResumeConnector.resumeConnector(projectId, region, clusterId, connectorId); + String output = bout.toString(); + assertThat(output).contains("Connector " + connectorId + " resumed successfully."); + verify(managedKafkaConnectClient, times(1)) + .resumeConnector(any(ResumeConnectorRequest.class)); + } + } + + @Test + public void restartConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + RestartConnector.restartConnector(projectId, region, clusterId, connectorId); + String output = bout.toString(); + assertThat(output).contains("Connector " + connectorId + " restarted successfully."); + verify(managedKafkaConnectClient, times(1)) + .restartConnector(any(RestartConnectorRequest.class)); + } + } + + @Test + public void stopConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + 
Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + StopConnector.stopConnector(projectId, region, clusterId, connectorId); + String output = bout.toString(); + assertThat(output).contains("Connector " + connectorId + " stopped successfully."); + verify(managedKafkaConnectClient, times(1)).stopConnector(any(StopConnectorRequest.class)); + } + } +} diff --git a/managedkafka/examples/src/test/java/examples/ConnectorsTest.java b/managedkafka/examples/src/test/java/examples/ConnectorsTest.java new file mode 100644 index 00000000000..9e958fb9e6c --- /dev/null +++ b/managedkafka/examples/src/test/java/examples/ConnectorsTest.java @@ -0,0 +1,322 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package examples; + +import static com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient.create; +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.cloud.managedkafka.v1.Connector; +import com.google.cloud.managedkafka.v1.ConnectorName; +import com.google.cloud.managedkafka.v1.CreateConnectorRequest; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +@RunWith(JUnit4.class) +public class ConnectorsTest { + + protected static final String projectId = "test-project"; + protected static final String region = "us-central1"; + protected static final String connectClusterId = "test-connect-cluster"; + protected static final String mirrorMaker2ConnectorId = "test-mirrormaker2-connector"; + protected static final String pubsubSourceConnectorId = "test-pubsub-source-connector"; + protected static final String pubsubSinkConnectorId = "test-pubsub-sink-connector"; + protected static final String gcsConnectorId = "test-gcs-sink-connector"; + protected static final String bigqueryConnectorId = "test-bigquery-sink-connector"; + + protected static final String mirrorMaker2ConnectorName = + "projects/test-project/locations/us-central1/connectClusters/" + + "test-connect-cluster/connectors/test-mirrormaker2-connector"; + protected static final String pubsubSourceConnectorName = + "projects/test-project/locations/us-central1/connectClusters/" + + "test-connect-cluster/connectors/test-pubsub-source-connector"; + protected static final String pubsubSinkConnectorName = + 
"projects/test-project/locations/us-central1/connectClusters/" + + "test-connect-cluster/connectors/test-pubsub-sink-connector"; + protected static final String gcsConnectorName = + "projects/test-project/locations/us-central1/connectClusters/" + + "test-connect-cluster/connectors/test-gcs-sink-connector"; + protected static final String bigqueryConnectorName = + "projects/test-project/locations/us-central1/connectClusters/" + + "test-connect-cluster/connectors/test-bigquery-sink-connector"; + + private ByteArrayOutputStream bout; + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + System.setOut(new PrintStream(bout)); + } + + @Test + public void createMirrorMaker2ConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + Connector connector = + Connector.newBuilder() + .setName( + ConnectorName.of(projectId, region, connectClusterId, mirrorMaker2ConnectorId) + .toString()) + .build(); + when(managedKafkaConnectClient.createConnector(any(CreateConnectorRequest.class))) + .thenReturn(connector); + + String sourceClusterBootstrapServers = "source-cluster:9092"; + String targetClusterBootstrapServers = "target-cluster:9092"; + String sourceClusterAlias = "source"; + String targetClusterAlias = "target"; + String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector"; + String topics = ".*"; + String offsetSyncsReplicationFactor = "1"; + String sourceSecurityProtocol = "SASL_SSL"; + String sourceSaslMechanism = "OAUTHBEARER"; + String sourceLoginCallbackHandler = + "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"; + String sourceJaasConfig = + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"; + String targetSecurityProtocol = "SASL_SSL"; + String 
targetSaslMechanism = "OAUTHBEARER"; + String targetLoginCallbackHandler = + "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"; + String targetJaasConfig = + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"; + + CreateMirrorMaker2Connector.createMirrorMaker2Connector( + projectId, + region, + connectClusterId, + mirrorMaker2ConnectorId, + sourceClusterBootstrapServers, + targetClusterBootstrapServers, + sourceClusterAlias, + targetClusterAlias, + connectorClass, + topics, + offsetSyncsReplicationFactor, + sourceSecurityProtocol, + sourceSaslMechanism, + sourceLoginCallbackHandler, + sourceJaasConfig, + targetSecurityProtocol, + targetSaslMechanism, + targetLoginCallbackHandler, + targetJaasConfig); + + String output = bout.toString(); + assertThat(output).contains("Created MirrorMaker2 connector"); + assertThat(output).contains(mirrorMaker2ConnectorName); + verify(managedKafkaConnectClient, times(1)) + .createConnector(any(CreateConnectorRequest.class)); + } + } + + @Test + public void createPubSubSourceConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + Connector connector = + Connector.newBuilder() + .setName( + ConnectorName.of(projectId, region, connectClusterId, pubsubSourceConnectorId) + .toString()) + .build(); + when(managedKafkaConnectClient.createConnector(any(CreateConnectorRequest.class))) + .thenReturn(connector); + + String pubsubProjectId = "test-pubsub-project"; + String subscriptionName = "test-subscription"; + String kafkaTopicName = "test-kafka-topic"; + String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"; + String maxTasks = "1"; + String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter"; + String 
keyConverter = "org.apache.kafka.connect.storage.StringConverter"; + + CreatePubSubSourceConnector.createPubSubSourceConnector( + projectId, + region, + connectClusterId, + pubsubSourceConnectorId, + pubsubProjectId, + subscriptionName, + kafkaTopicName, + connectorClass, + maxTasks, + valueConverter, + keyConverter); + + String output = bout.toString(); + assertThat(output).contains("Created Pub/Sub Source connector"); + assertThat(output).contains(pubsubSourceConnectorName); + verify(managedKafkaConnectClient, times(1)) + .createConnector(any(CreateConnectorRequest.class)); + } + } + + @Test + public void createPubSubSinkConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + Connector connector = + Connector.newBuilder() + .setName( + ConnectorName.of(projectId, region, connectClusterId, pubsubSinkConnectorId) + .toString()) + .build(); + when(managedKafkaConnectClient.createConnector(any(CreateConnectorRequest.class))) + .thenReturn(connector); + + String pubsubProjectId = "test-pubsub-project"; + String pubsubTopicName = "test-pubsub-topic"; + String kafkaTopicName = "test-kafka-topic"; + String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"; + String maxTasks = "1"; + String valueConverter = "org.apache.kafka.connect.storage.StringConverter"; + String keyConverter = "org.apache.kafka.connect.storage.StringConverter"; + + CreatePubSubSinkConnector.createPubSubSinkConnector( + projectId, + region, + connectClusterId, + pubsubSinkConnectorId, + pubsubProjectId, + pubsubTopicName, + kafkaTopicName, + connectorClass, + maxTasks, + valueConverter, + keyConverter); + + String output = bout.toString(); + assertThat(output).contains("Created Pub/Sub Sink connector"); + 
assertThat(output).contains(pubsubSinkConnectorName); + verify(managedKafkaConnectClient, times(1)) + .createConnector(any(CreateConnectorRequest.class)); + } + } + + @Test + public void createCloudStorageSinkConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + Connector connector = + Connector.newBuilder() + .setName( + ConnectorName.of(projectId, region, connectClusterId, gcsConnectorId).toString()) + .build(); + when(managedKafkaConnectClient.createConnector(any(CreateConnectorRequest.class))) + .thenReturn(connector); + + String bucketName = "test-gcs-bucket"; + String kafkaTopicName = "test-kafka-topic"; + String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector"; + String maxTasks = "1"; + String gcsCredentialsDefault = "true"; + String formatOutputType = "json"; + String valueConverter = "org.apache.kafka.connect.json.JsonConverter"; + String valueSchemasEnable = "false"; + String keyConverter = "org.apache.kafka.connect.storage.StringConverter"; + + CreateCloudStorageSinkConnector.createCloudStorageSinkConnector( + projectId, + region, + connectClusterId, + gcsConnectorId, + bucketName, + kafkaTopicName, + connectorClass, + maxTasks, + gcsCredentialsDefault, + formatOutputType, + valueConverter, + valueSchemasEnable, + keyConverter); + + String output = bout.toString(); + assertThat(output).contains("Created Cloud Storage Sink connector"); + assertThat(output).contains(gcsConnectorName); + verify(managedKafkaConnectClient, times(1)) + .createConnector(any(CreateConnectorRequest.class)); + } + } + + @Test + public void createBigQuerySinkConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + 
Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + Connector connector = + Connector.newBuilder() + .setName( + ConnectorName.of(projectId, region, connectClusterId, bigqueryConnectorId) + .toString()) + .build(); + when(managedKafkaConnectClient.createConnector(any(CreateConnectorRequest.class))) + .thenReturn(connector); + + String bigqueryProjectId = "test-bigquery-project"; + String datasetName = "test_dataset"; + String tableName = "test_table"; + String kafkaTopicName = "test-kafka-topic"; + String maxTasks = "3"; + String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"; + String keyConverter = "org.apache.kafka.connect.storage.StringConverter"; + String valueConverter = "org.apache.kafka.connect.json.JsonConverter"; + String valueSchemasEnable = "false"; + + CreateBigQuerySinkConnector.createBigQuerySinkConnector( + projectId, + region, + connectClusterId, + bigqueryConnectorId, + bigqueryProjectId, + datasetName, + tableName, + kafkaTopicName, + maxTasks, + connectorClass, + keyConverter, + valueConverter, + valueSchemasEnable); + + String output = bout.toString(); + assertThat(output).contains("Created BigQuery Sink connector"); + assertThat(output).contains(bigqueryConnectorName); + verify(managedKafkaConnectClient, times(1)) + .createConnector(any(CreateConnectorRequest.class)); + } + } +} From 3e40aa36987fb6253e42b8a61eb92b5be1167a2b Mon Sep 17 00:00:00 2001 From: salmany Date: Mon, 4 Aug 2025 17:40:43 +0000 Subject: [PATCH 2/6] Fixed CreateBigQuerySinkConnector example arguments. 
--- .../src/main/java/examples/CreateBigQuerySinkConnector.java | 3 --- .../examples/src/test/java/examples/ConnectorsTest.java | 2 -- 2 files changed, 5 deletions(-) diff --git a/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java b/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java index 56904140e84..0c9f2ba7811 100644 --- a/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java +++ b/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java @@ -38,7 +38,6 @@ public static void main(String[] args) throws Exception { String connectorId = "my-bigquery-sink-connector"; String bigqueryProjectId = "my-bigquery-project-id"; String datasetName = "my_dataset"; - String tableName = "kafka_sink_table"; String kafkaTopicName = "kafka-topic"; String maxTasks = "3"; String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"; @@ -52,7 +51,6 @@ public static void main(String[] args) throws Exception { connectorId, bigqueryProjectId, datasetName, - tableName, kafkaTopicName, maxTasks, connectorClass, @@ -68,7 +66,6 @@ public static void createBigQuerySinkConnector( String connectorId, String bigqueryProjectId, String datasetName, - String tableName, String kafkaTopicName, String maxTasks, String connectorClass, diff --git a/managedkafka/examples/src/test/java/examples/ConnectorsTest.java b/managedkafka/examples/src/test/java/examples/ConnectorsTest.java index 9e958fb9e6c..2afe3a2afe2 100644 --- a/managedkafka/examples/src/test/java/examples/ConnectorsTest.java +++ b/managedkafka/examples/src/test/java/examples/ConnectorsTest.java @@ -289,7 +289,6 @@ public void createBigQuerySinkConnectorTest() throws Exception { String bigqueryProjectId = "test-bigquery-project"; String datasetName = "test_dataset"; - String tableName = "test_table"; String kafkaTopicName = "test-kafka-topic"; String maxTasks = "3"; String connectorClass = 
"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"; @@ -304,7 +303,6 @@ public void createBigQuerySinkConnectorTest() throws Exception { bigqueryConnectorId, bigqueryProjectId, datasetName, - tableName, kafkaTopicName, maxTasks, connectorClass, From c3aa4cbe802fe05078c505f61be08e2a0ffbe090 Mon Sep 17 00:00:00 2001 From: salmany Date: Mon, 4 Aug 2025 17:44:48 +0000 Subject: [PATCH 3/6] Fix syntax and minor errors. --- .../src/main/java/examples/CreateBigQuerySinkConnector.java | 2 +- .../main/java/examples/CreateCloudStorageSinkConnector.java | 2 +- .../examples/src/main/java/examples/CreateConnectCluster.java | 3 +-- .../src/main/java/examples/CreateMirrorMaker2Connector.java | 2 +- .../src/main/java/examples/CreatePubSubSinkConnector.java | 2 +- .../src/main/java/examples/CreatePubSubSourceConnector.java | 2 +- .../examples/src/main/java/examples/DeleteConnectCluster.java | 2 +- .../examples/src/main/java/examples/GetConnectCluster.java | 2 +- .../examples/src/main/java/examples/ListConnectClusters.java | 2 +- .../examples/src/main/java/examples/PauseConnector.java | 2 +- .../examples/src/main/java/examples/RestartConnector.java | 2 +- .../examples/src/main/java/examples/ResumeConnector.java | 2 +- .../examples/src/main/java/examples/StopConnector.java | 2 +- .../examples/src/main/java/examples/UpdateConnectCluster.java | 2 +- 14 files changed, 14 insertions(+), 15 deletions(-) diff --git a/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java b/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java index 0c9f2ba7811..c21fe9b9625 100644 --- a/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java +++ b/managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java @@ -104,7 +104,7 @@ public static void createBigQuerySinkConnector( Connector response = managedKafkaConnectClient.createConnector(request); System.out.printf("Created BigQuery Sink connector: %s\n", 
response.getName()); } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.createConnector got err: %s", e.getMessage()); + System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage()); } } } diff --git a/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java b/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java index 152c29e29bd..45cc7f82bb1 100644 --- a/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java +++ b/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java @@ -107,7 +107,7 @@ public static void createCloudStorageSinkConnector( Connector response = managedKafkaConnectClient.createConnector(request); System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName()); } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.createConnector got err: %s", e.getMessage()); + System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage()); } } } diff --git a/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java b/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java index 551126b6998..5bee6b2bdaf 100644 --- a/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java +++ b/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java @@ -36,7 +36,6 @@ import com.google.cloud.managedkafka.v1.OperationMetadata; import java.time.Duration; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; public class CreateConnectCluster { @@ -117,7 +116,7 @@ public static void createConnectCluster( ConnectCluster response = future.get(); System.out.printf("Created connect cluster: %s\n", response.getName()); } catch (ExecutionException e) { - System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s", + 
System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n", e.getMessage()); throw e; } diff --git a/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2Connector.java b/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2Connector.java index f5c7b3ef004..8016fcabff0 100644 --- a/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2Connector.java +++ b/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2Connector.java @@ -135,7 +135,7 @@ public static void createMirrorMaker2Connector( Connector response = managedKafkaConnectClient.createConnector(request); System.out.printf("Created MirrorMaker2 connector: %s\n", response.getName()); } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.createConnector got err: %s", e.getMessage()); + System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage()); } } } diff --git a/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java b/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java index 6aee84ac735..9dfe0939e41 100644 --- a/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java +++ b/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java @@ -99,7 +99,7 @@ public static void createPubSubSinkConnector( Connector response = managedKafkaConnectClient.createConnector(request); System.out.printf("Created Pub/Sub Sink connector: %s\n", response.getName()); } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.createConnector got err: %s", e.getMessage()); + System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage()); } } } diff --git a/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java b/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java index 06854af450b..9877e8b0a73 100644 --- 
a/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java +++ b/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java @@ -99,7 +99,7 @@ public static void createPubSubSourceConnector( Connector response = managedKafkaConnectClient.createConnector(request); System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName()); } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.createConnector got err: %s", e.getMessage()); + System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage()); } } } diff --git a/managedkafka/examples/src/main/java/examples/DeleteConnectCluster.java b/managedkafka/examples/src/main/java/examples/DeleteConnectCluster.java index ab50a3eab27..18196c36b2b 100644 --- a/managedkafka/examples/src/main/java/examples/DeleteConnectCluster.java +++ b/managedkafka/examples/src/main/java/examples/DeleteConnectCluster.java @@ -75,7 +75,7 @@ public static void deleteConnectCluster(String projectId, String region, String future.get(); System.out.println("Deleted connect cluster"); } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.deleteConnectCluster got err: %s", + System.err.printf("managedKafkaConnectClient.deleteConnectCluster got err: %s\n", e.getMessage()); } } diff --git a/managedkafka/examples/src/main/java/examples/GetConnectCluster.java b/managedkafka/examples/src/main/java/examples/GetConnectCluster.java index 679b0f28878..47a080be1a1 100644 --- a/managedkafka/examples/src/main/java/examples/GetConnectCluster.java +++ b/managedkafka/examples/src/main/java/examples/GetConnectCluster.java @@ -41,7 +41,7 @@ public static void getConnectCluster(String projectId, String region, String clu .getConnectCluster(ConnectClusterName.of(projectId, region, clusterId)); System.out.println(connectCluster.getAllFields()); } catch (IOException | ApiException e) { - 
System.err.printf("managedKafkaConnectClient.getConnectCluster got err: %s", e.getMessage()); + System.err.printf("managedKafkaConnectClient.getConnectCluster got err: %s\n", e.getMessage()); } } } diff --git a/managedkafka/examples/src/main/java/examples/ListConnectClusters.java b/managedkafka/examples/src/main/java/examples/ListConnectClusters.java index 5da00e852da..2dcdbd55b03 100644 --- a/managedkafka/examples/src/main/java/examples/ListConnectClusters.java +++ b/managedkafka/examples/src/main/java/examples/ListConnectClusters.java @@ -42,7 +42,7 @@ public static void listConnectClusters(String projectId, String region) throws E System.out.println(connectCluster.getAllFields()); } } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.listConnectClusters got err: %s", + System.err.printf("managedKafkaConnectClient.listConnectClusters got err: %s\n", e.getMessage()); } } diff --git a/managedkafka/examples/src/main/java/examples/PauseConnector.java b/managedkafka/examples/src/main/java/examples/PauseConnector.java index 9db8b5a62bb..36c26ee1ae1 100644 --- a/managedkafka/examples/src/main/java/examples/PauseConnector.java +++ b/managedkafka/examples/src/main/java/examples/PauseConnector.java @@ -48,7 +48,7 @@ public static void pauseConnector( managedKafkaConnectClient.pauseConnector(request); System.out.printf("Connector %s paused successfully.\n", connectorId); } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.pauseConnector got err: %s", + System.err.printf("managedKafkaConnectClient.pauseConnector got err: %s\n", e.getMessage()); } } diff --git a/managedkafka/examples/src/main/java/examples/RestartConnector.java b/managedkafka/examples/src/main/java/examples/RestartConnector.java index 4764b1f13d7..78ef135313c 100644 --- a/managedkafka/examples/src/main/java/examples/RestartConnector.java +++ b/managedkafka/examples/src/main/java/examples/RestartConnector.java @@ -48,7 +48,7 @@ public 
static void restartConnector( managedKafkaConnectClient.restartConnector(request); System.out.printf("Connector %s restarted successfully.\n", connectorId); } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.restartConnector got err: %s", + System.err.printf("managedKafkaConnectClient.restartConnector got err: %s\n", e.getMessage()); } } diff --git a/managedkafka/examples/src/main/java/examples/ResumeConnector.java b/managedkafka/examples/src/main/java/examples/ResumeConnector.java index d16b458a12a..b3aa808d0f3 100644 --- a/managedkafka/examples/src/main/java/examples/ResumeConnector.java +++ b/managedkafka/examples/src/main/java/examples/ResumeConnector.java @@ -48,7 +48,7 @@ public static void resumeConnector( managedKafkaConnectClient.resumeConnector(request); System.out.printf("Connector %s resumed successfully.\n", connectorId); } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.resumeConnector got err: %s", + System.err.printf("managedKafkaConnectClient.resumeConnector got err: %s\n", e.getMessage()); } } diff --git a/managedkafka/examples/src/main/java/examples/StopConnector.java b/managedkafka/examples/src/main/java/examples/StopConnector.java index f39f06b7557..e5bcd7ccd76 100644 --- a/managedkafka/examples/src/main/java/examples/StopConnector.java +++ b/managedkafka/examples/src/main/java/examples/StopConnector.java @@ -48,7 +48,7 @@ public static void stopConnector( managedKafkaConnectClient.stopConnector(request); System.out.printf("Connector %s stopped successfully.\n", connectorId); } catch (IOException | ApiException e) { - System.err.printf("managedKafkaConnectClient.stopConnector got err: %s", e.getMessage()); + System.err.printf("managedKafkaConnectClient.stopConnector got err: %s\n", e.getMessage()); } } } diff --git a/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java b/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java index 
e3c10df2a48..8df933077c1 100644 --- a/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java +++ b/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java @@ -83,7 +83,7 @@ public static void updateConnectCluster( ConnectCluster response = future.get(); System.out.printf("Updated connect cluster: %s\n", response.getName()); } catch (ExecutionException e) { - System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s", + System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s\n", e.getMessage()); } } From 91a53e13c4bbca40e8eb10f889cf58a63222ac01 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 7 Aug 2025 18:36:55 +0000 Subject: [PATCH 4/6] Added Connector operations and fixed configs. --- .../CreateCloudStorageSinkConnector.java | 2 +- ...=> CreateMirrorMaker2SourceConnector.java} | 56 +++--------- .../examples/CreatePubSubSinkConnector.java | 2 +- .../examples/CreatePubSubSourceConnector.java | 2 +- .../main/java/examples/DeleteConnector.java | 48 +++++++++++ .../src/main/java/examples/GetConnector.java | 49 +++++++++++ .../main/java/examples/ListConnectors.java | 49 +++++++++++ .../java/examples/UpdateConnectCluster.java | 2 +- .../main/java/examples/UpdateConnector.java | 68 +++++++++++++++ .../java/examples/ConnectClustersTest.java | 86 +++++++++++++++++++ .../test/java/examples/ConnectorsTest.java | 46 +++------- 11 files changed, 327 insertions(+), 83 deletions(-) rename managedkafka/examples/src/main/java/examples/{CreateMirrorMaker2Connector.java => CreateMirrorMaker2SourceConnector.java} (61%) create mode 100644 managedkafka/examples/src/main/java/examples/DeleteConnector.java create mode 100644 managedkafka/examples/src/main/java/examples/GetConnector.java create mode 100644 managedkafka/examples/src/main/java/examples/ListConnectors.java create mode 100644 managedkafka/examples/src/main/java/examples/UpdateConnector.java diff --git 
a/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java b/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java index 45cc7f82bb1..be14c0e4a47 100644 --- a/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java +++ b/managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java @@ -39,7 +39,7 @@ public static void main(String[] args) throws Exception { String bucketName = "my-gcs-bucket"; String kafkaTopicName = "kafka-topic"; String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector"; - String maxTasks = "1"; + String maxTasks = "3"; String gcsCredentialsDefault = "true"; String formatOutputType = "json"; String valueConverter = "org.apache.kafka.connect.json.JsonConverter"; diff --git a/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2Connector.java b/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2SourceConnector.java similarity index 61% rename from managedkafka/examples/src/main/java/examples/CreateMirrorMaker2Connector.java rename to managedkafka/examples/src/main/java/examples/CreateMirrorMaker2SourceConnector.java index 8016fcabff0..dc2a511c825 100644 --- a/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2Connector.java +++ b/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2SourceConnector.java @@ -28,12 +28,13 @@ import java.util.HashMap; import java.util.Map; -public class CreateMirrorMaker2Connector { +public class CreateMirrorMaker2SourceConnector { public static void main(String[] args) throws Exception { // TODO(developer): Replace these variables before running the example. String projectId = "my-project-id"; String region = "my-region"; // e.g. 
us-east1 + String maxTasks = "3"; String connectClusterId = "my-connect-cluster"; String connectorId = "my-mirrormaker2-connector"; String sourceClusterBootstrapServers = "my-source-cluster:9092"; @@ -42,22 +43,10 @@ public static void main(String[] args) throws Exception { String targetClusterAlias = "target"; String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector"; String topics = ".*"; - String offsetSyncsReplicationFactor = "1"; - String sourceSecurityProtocol = "SASL_SSL"; - String sourceSaslMechanism = "OAUTHBEARER"; - String sourceLoginCallbackHandler = - "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"; - String sourceJaasConfig = - "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"; - String targetSecurityProtocol = "SASL_SSL"; - String targetSaslMechanism = "OAUTHBEARER"; - String targetLoginCallbackHandler = - "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"; - String targetJaasConfig = - "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"; - createMirrorMaker2Connector( + createMirrorMaker2SourceConnector( projectId, region, + maxTasks, connectClusterId, connectorId, sourceClusterBootstrapServers, @@ -65,21 +54,13 @@ public static void main(String[] args) throws Exception { sourceClusterAlias, targetClusterAlias, connectorClass, - topics, - offsetSyncsReplicationFactor, - sourceSecurityProtocol, - sourceSaslMechanism, - sourceLoginCallbackHandler, - sourceJaasConfig, - targetSecurityProtocol, - targetSaslMechanism, - targetLoginCallbackHandler, - targetJaasConfig); + topics); } - public static void createMirrorMaker2Connector( + public static void createMirrorMaker2SourceConnector( String projectId, String region, + String maxTasks, String connectClusterId, String connectorId, String sourceClusterBootstrapServers, @@ -87,20 +68,12 @@ public static void createMirrorMaker2Connector( String sourceClusterAlias, String targetClusterAlias, String 
connectorClass, - String topics, - String offsetSyncsReplicationFactor, - String sourceSecurityProtocol, - String sourceSaslMechanism, - String sourceLoginCallbackHandler, - String sourceJaasConfig, - String targetSecurityProtocol, - String targetSaslMechanism, - String targetLoginCallbackHandler, - String targetJaasConfig) + String topics) throws Exception { // Build the connector configuration Map configMap = new HashMap<>(); + configMap.put("tasks.max", maxTasks); configMap.put("connector.class", connectorClass); configMap.put("name", connectorId); configMap.put("source.cluster.alias", sourceClusterAlias); @@ -108,15 +81,6 @@ public static void createMirrorMaker2Connector( configMap.put("topics", topics); configMap.put("source.cluster.bootstrap.servers", sourceClusterBootstrapServers); configMap.put("target.cluster.bootstrap.servers", targetClusterBootstrapServers); - configMap.put("offset-syncs.topic.replication.factor", offsetSyncsReplicationFactor); - configMap.put("source.cluster.security.protocol", sourceSecurityProtocol); - configMap.put("source.cluster.sasl.mechanism", sourceSaslMechanism); - configMap.put("source.cluster.sasl.login.callback.handler.class", sourceLoginCallbackHandler); - configMap.put("source.cluster.sasl.jaas.config", sourceJaasConfig); - configMap.put("target.cluster.security.protocol", targetSecurityProtocol); - configMap.put("target.cluster.sasl.mechanism", targetSaslMechanism); - configMap.put("target.cluster.sasl.login.callback.handler.class", targetLoginCallbackHandler); - configMap.put("target.cluster.sasl.jaas.config", targetJaasConfig); Connector connector = Connector.newBuilder() .setName( @@ -133,7 +97,7 @@ public static void createMirrorMaker2Connector( // This operation is being handled synchronously. 
Connector response = managedKafkaConnectClient.createConnector(request); - System.out.printf("Created MirrorMaker2 connector: %s\n", response.getName()); + System.out.printf("Created MirrorMaker2 Source connector: %s\n", response.getName()); } catch (IOException | ApiException e) { System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage()); } diff --git a/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java b/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java index 9dfe0939e41..2492a5c8833 100644 --- a/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java +++ b/managedkafka/examples/src/main/java/examples/CreatePubSubSinkConnector.java @@ -40,7 +40,7 @@ public static void main(String[] args) throws Exception { String pubsubTopicName = "my-pubsub-topic"; String kafkaTopicName = "kafka-topic"; String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"; - String maxTasks = "1"; + String maxTasks = "3"; String valueConverter = "org.apache.kafka.connect.storage.StringConverter"; String keyConverter = "org.apache.kafka.connect.storage.StringConverter"; createPubSubSinkConnector( diff --git a/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java b/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java index 9877e8b0a73..c43537b152b 100644 --- a/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java +++ b/managedkafka/examples/src/main/java/examples/CreatePubSubSourceConnector.java @@ -40,7 +40,7 @@ public static void main(String[] args) throws Exception { String subscriptionName = "my-subscription"; String kafkaTopicName = "pubsub-topic"; String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"; - String maxTasks = "1"; + String maxTasks = "3"; String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter"; String keyConverter = 
"org.apache.kafka.connect.storage.StringConverter"; createPubSubSourceConnector( diff --git a/managedkafka/examples/src/main/java/examples/DeleteConnector.java b/managedkafka/examples/src/main/java/examples/DeleteConnector.java new file mode 100644 index 00000000000..f4ea98c0b32 --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/DeleteConnector.java @@ -0,0 +1,48 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples; + +// [START managedkafka_delete_connector] +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.ConnectorName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import java.io.IOException; + +public class DeleteConnector { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g.
us-east1 + String clusterId = "my-connect-cluster"; + String connectorId = "my-connector"; + deleteConnector(projectId, region, clusterId, connectorId); + } + + public static void deleteConnector( + String projectId, String region, String clusterId, String connectorId) throws IOException { + try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) { + ConnectorName name = ConnectorName.of(projectId, region, clusterId, connectorId); + // This operation is handled synchronously. + managedKafkaConnectClient.deleteConnector(name); + System.out.printf("Deleted connector: %s\n", name); + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.deleteConnector got err: %s\n", e.getMessage()); + } + } +} +// [END managedkafka_delete_connector] diff --git a/managedkafka/examples/src/main/java/examples/GetConnector.java b/managedkafka/examples/src/main/java/examples/GetConnector.java new file mode 100644 index 00000000000..b5be2672e19 --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/GetConnector.java @@ -0,0 +1,49 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package examples; + +// [START managedkafka_get_connector] +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.Connector; +import com.google.cloud.managedkafka.v1.ConnectorName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import java.io.IOException; + +public class GetConnector { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. us-east1 + String clusterId = "my-connect-cluster"; + String connectorId = "my-connector"; + getConnector(projectId, region, clusterId, connectorId); + } + + public static void getConnector( + String projectId, String region, String clusterId, String connectorId) throws IOException { + try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) { + ConnectorName name = ConnectorName.of(projectId, region, clusterId, connectorId); + // This operation is handled synchronously. + Connector connector = managedKafkaConnectClient.getConnector(name); + System.out.println(connector.getAllFields()); + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.getConnector got err: %s\n", e.getMessage()); + } + } +} +// [END managedkafka_get_connector] diff --git a/managedkafka/examples/src/main/java/examples/ListConnectors.java b/managedkafka/examples/src/main/java/examples/ListConnectors.java new file mode 100644 index 00000000000..41d8ea9610b --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/ListConnectors.java @@ -0,0 +1,49 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package examples; + +// [START managedkafka_list_connectors] +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.ConnectClusterName; +import com.google.cloud.managedkafka.v1.Connector; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import java.io.IOException; + +public class ListConnectors { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. us-east1 + String clusterId = "my-connect-cluster"; + listConnectors(projectId, region, clusterId); + } + + public static void listConnectors(String projectId, String region, String clusterId) + throws IOException { + try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) { + ConnectClusterName parent = ConnectClusterName.of(projectId, region, clusterId); + // This operation is handled synchronously. 
+ for (Connector connector : managedKafkaConnectClient.listConnectors(parent).iterateAll()) { + System.out.println(connector.getAllFields()); + } + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.listConnectors got err: %s\n", e.getMessage()); + } + } +} +// [END managedkafka_list_connectors] diff --git a/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java b/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java index 8df933077c1..7d22efedcab 100644 --- a/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java +++ b/managedkafka/examples/src/main/java/examples/UpdateConnectCluster.java @@ -41,7 +41,7 @@ public static void main(String[] args) throws Exception { String projectId = "my-project-id"; String region = "my-region"; // e.g. us-east1 String clusterId = "my-connect-cluster"; - long memoryBytes = 12884901888L; // 12 GiB + long memoryBytes = 25769803776L; // 24 GiB updateConnectCluster(projectId, region, clusterId, memoryBytes); } diff --git a/managedkafka/examples/src/main/java/examples/UpdateConnector.java b/managedkafka/examples/src/main/java/examples/UpdateConnector.java new file mode 100644 index 00000000000..37186b31de9 --- /dev/null +++ b/managedkafka/examples/src/main/java/examples/UpdateConnector.java @@ -0,0 +1,68 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package examples; + +// [START managedkafka_update_connector] +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.managedkafka.v1.Connector; +import com.google.cloud.managedkafka.v1.ConnectorName; +import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient; +import com.google.protobuf.FieldMask; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class UpdateConnector { + + public static void main(String[] args) throws Exception { + // TODO(developer): Replace these variables before running the example. + String projectId = "my-project-id"; + String region = "my-region"; // e.g. us-east1 + String clusterId = "my-connect-cluster"; + String connectorId = "my-connector"; + // The new value for the 'tasks.max' configuration. + String maxTasks = "5"; + updateConnector(projectId, region, clusterId, connectorId, maxTasks); + } + + public static void updateConnector( + String projectId, String region, String clusterId, String connectorId, String maxTasks) + throws IOException { + try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) { + Map configMap = new HashMap<>(); + configMap.put("tasks.max", maxTasks); + + Connector connector = + Connector.newBuilder() + .setName(ConnectorName.of(projectId, region, clusterId, connectorId).toString()) + .putAllConfigs(configMap) + .build(); + + // The field mask specifies which fields to update. Here, we update the 'configs' field. + FieldMask updateMask = FieldMask.newBuilder().addPaths("configs").build(); + + // This operation is handled synchronously.
+ Connector updatedConnector = managedKafkaConnectClient.updateConnector(connector, updateMask); + System.out.printf("Updated connector: %s\n", updatedConnector.getName()); + System.out.println(updatedConnector.getAllFields()); + + } catch (IOException | ApiException e) { + System.err.printf("managedKafkaConnectClient.updateConnector got err: %s\n", e.getMessage()); + } + } +} +// [END managedkafka_update_connector] diff --git a/managedkafka/examples/src/test/java/examples/ConnectClustersTest.java b/managedkafka/examples/src/test/java/examples/ConnectClustersTest.java index c99653d8a63..9d42dcafbcc 100644 --- a/managedkafka/examples/src/test/java/examples/ConnectClustersTest.java +++ b/managedkafka/examples/src/test/java/examples/ConnectClustersTest.java @@ -31,6 +31,8 @@ import com.google.api.gax.rpc.OperationCallable; import com.google.cloud.managedkafka.v1.ConnectCluster; import com.google.cloud.managedkafka.v1.ConnectClusterName; +import com.google.cloud.managedkafka.v1.Connector; +import com.google.cloud.managedkafka.v1.ConnectorName; import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest; import com.google.cloud.managedkafka.v1.DeleteConnectClusterRequest; import com.google.cloud.managedkafka.v1.LocationName; @@ -43,6 +45,7 @@ import com.google.cloud.managedkafka.v1.StopConnectorRequest; import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest; import com.google.protobuf.Empty; +import com.google.protobuf.FieldMask; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.ArrayList; @@ -304,6 +307,89 @@ public void pauseConnectorTest() throws Exception { } } + @Test + public void listConnectorsTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + ManagedKafkaConnectClient.ListConnectorsPagedResponse response = + mock(ManagedKafkaConnectClient.ListConnectorsPagedResponse.class); + + try (MockedStatic mockedStatic = + 
Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + + List connectors = new ArrayList<>(); + connectors.add(Connector.newBuilder().setName(connectorName).build()); + Iterable iterable = () -> connectors.iterator(); + + when(response.iterateAll()).thenReturn(iterable); + when(managedKafkaConnectClient.listConnectors(any(ConnectClusterName.class))) + .thenReturn(response); + + ListConnectors.listConnectors(projectId, region, clusterId); + + String output = bout.toString(); + assertThat(output).contains(connectorName); + verify(managedKafkaConnectClient, times(1)).listConnectors(any(ConnectClusterName.class)); + } + } + + @Test + public void getConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + + Connector connector = Connector.newBuilder().setName(connectorName).build(); + when(managedKafkaConnectClient.getConnector(any(ConnectorName.class))).thenReturn(connector); + + GetConnector.getConnector(projectId, region, clusterId, connectorId); + String output = bout.toString(); + + assertThat(output).contains(connectorName); + verify(managedKafkaConnectClient, times(1)).getConnector(any(ConnectorName.class)); + } + } + + @Test + public void deleteConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + + DeleteConnector.deleteConnector(projectId, region, clusterId, connectorId); + + String output = bout.toString(); + assertThat(output).contains("Deleted connector: " + connectorName); + 
verify(managedKafkaConnectClient, times(1)).deleteConnector(any(ConnectorName.class)); + } + } + + @Test + public void updateConnectorTest() throws Exception { + ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); + try (MockedStatic mockedStatic = + Mockito.mockStatic(ManagedKafkaConnectClient.class)) { + mockedStatic.when(() -> create()).thenReturn(managedKafkaConnectClient); + + Connector updatedConnector = + Connector.newBuilder().setName(connectorName).putConfigs("tasks.max", "5").build(); + + when(managedKafkaConnectClient.updateConnector(any(Connector.class), any(FieldMask.class))) + .thenReturn(updatedConnector); + + UpdateConnector.updateConnector(projectId, region, clusterId, connectorId, "5"); + + String output = bout.toString(); + assertThat(output).contains("Updated connector: " + connectorName); + assertThat(output).contains("tasks.max"); + assertThat(output).contains("5"); + verify(managedKafkaConnectClient, times(1)) + .updateConnector(any(Connector.class), any(FieldMask.class)); + } + } + @Test public void resumeConnectorTest() throws Exception { ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); diff --git a/managedkafka/examples/src/test/java/examples/ConnectorsTest.java b/managedkafka/examples/src/test/java/examples/ConnectorsTest.java index 2afe3a2afe2..e426b5f76bf 100644 --- a/managedkafka/examples/src/test/java/examples/ConnectorsTest.java +++ b/managedkafka/examples/src/test/java/examples/ConnectorsTest.java @@ -43,15 +43,15 @@ public class ConnectorsTest { protected static final String projectId = "test-project"; protected static final String region = "us-central1"; protected static final String connectClusterId = "test-connect-cluster"; - protected static final String mirrorMaker2ConnectorId = "test-mirrormaker2-connector"; + protected static final String mirrorMaker2ConnectorId = "test-mirrormaker2-source-connector"; protected static final String 
pubsubSourceConnectorId = "test-pubsub-source-connector"; protected static final String pubsubSinkConnectorId = "test-pubsub-sink-connector"; protected static final String gcsConnectorId = "test-gcs-sink-connector"; protected static final String bigqueryConnectorId = "test-bigquery-sink-connector"; - protected static final String mirrorMaker2ConnectorName = + protected static final String mirrorMaker2SourceConnectorName = "projects/test-project/locations/us-central1/connectClusters/" - + "test-connect-cluster/connectors/test-mirrormaker2-connector"; + + "test-connect-cluster/connectors/test-mirrormaker2-source-connector"; protected static final String pubsubSourceConnectorName = "projects/test-project/locations/us-central1/connectClusters/" + "test-connect-cluster/connectors/test-pubsub-source-connector"; @@ -74,7 +74,7 @@ public void setUp() { } @Test - public void createMirrorMaker2ConnectorTest() throws Exception { + public void createMirrorMaker2SourceConnectorTest() throws Exception { ManagedKafkaConnectClient managedKafkaConnectClient = mock(ManagedKafkaConnectClient.class); try (MockedStatic mockedStatic = Mockito.mockStatic(ManagedKafkaConnectClient.class)) { @@ -90,27 +90,16 @@ public void createMirrorMaker2ConnectorTest() throws Exception { String sourceClusterBootstrapServers = "source-cluster:9092"; String targetClusterBootstrapServers = "target-cluster:9092"; + String maxTasks = "3"; String sourceClusterAlias = "source"; String targetClusterAlias = "target"; String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector"; String topics = ".*"; - String offsetSyncsReplicationFactor = "1"; - String sourceSecurityProtocol = "SASL_SSL"; - String sourceSaslMechanism = "OAUTHBEARER"; - String sourceLoginCallbackHandler = - "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"; - String sourceJaasConfig = - "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"; - String targetSecurityProtocol = "SASL_SSL"; - 
String targetSaslMechanism = "OAUTHBEARER"; - String targetLoginCallbackHandler = - "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"; - String targetJaasConfig = - "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"; - CreateMirrorMaker2Connector.createMirrorMaker2Connector( + CreateMirrorMaker2SourceConnector.createMirrorMaker2SourceConnector( projectId, region, + maxTasks, connectClusterId, mirrorMaker2ConnectorId, sourceClusterBootstrapServers, @@ -118,20 +107,11 @@ public void createMirrorMaker2ConnectorTest() throws Exception { sourceClusterAlias, targetClusterAlias, connectorClass, - topics, - offsetSyncsReplicationFactor, - sourceSecurityProtocol, - sourceSaslMechanism, - sourceLoginCallbackHandler, - sourceJaasConfig, - targetSecurityProtocol, - targetSaslMechanism, - targetLoginCallbackHandler, - targetJaasConfig); + topics); String output = bout.toString(); - assertThat(output).contains("Created MirrorMaker2 connector"); - assertThat(output).contains(mirrorMaker2ConnectorName); + assertThat(output).contains("Created MirrorMaker2 Source connector"); + assertThat(output).contains(mirrorMaker2SourceConnectorName); verify(managedKafkaConnectClient, times(1)) .createConnector(any(CreateConnectorRequest.class)); } @@ -156,7 +136,7 @@ public void createPubSubSourceConnectorTest() throws Exception { String subscriptionName = "test-subscription"; String kafkaTopicName = "test-kafka-topic"; String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"; - String maxTasks = "1"; + String maxTasks = "3"; String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter"; String keyConverter = "org.apache.kafka.connect.storage.StringConverter"; @@ -200,7 +180,7 @@ public void createPubSubSinkConnectorTest() throws Exception { String pubsubTopicName = "test-pubsub-topic"; String kafkaTopicName = "test-kafka-topic"; String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"; 
- String maxTasks = "1"; + String maxTasks = "3"; String valueConverter = "org.apache.kafka.connect.storage.StringConverter"; String keyConverter = "org.apache.kafka.connect.storage.StringConverter"; @@ -242,7 +222,7 @@ public void createCloudStorageSinkConnectorTest() throws Exception { String bucketName = "test-gcs-bucket"; String kafkaTopicName = "test-kafka-topic"; String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector"; - String maxTasks = "1"; + String maxTasks = "3"; String gcsCredentialsDefault = "true"; String formatOutputType = "json"; String valueConverter = "org.apache.kafka.connect.json.JsonConverter"; From 133193a7e898c9f2abf953b5ddb268aa5c3f6cf1 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 7 Aug 2025 18:45:31 +0000 Subject: [PATCH 5/6] Add comment about additional subnets and DNS domains.. --- .../examples/src/main/java/examples/CreateConnectCluster.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java b/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java index 5bee6b2bdaf..1f48eecb44e 100644 --- a/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java +++ b/managedkafka/examples/src/main/java/examples/CreateConnectCluster.java @@ -65,6 +65,10 @@ public static void createConnectCluster( ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder() .setPrimarySubnet(subnet) .build(); + // Optionally, you can also specify additional accessible subnets and resolvable + // DNS domains as part of your network configuration. 
For example: + // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2")) + // .addAllDnsDomainNames(List.of("dns-1", "dns-2")) ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder() .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build()) .build(); From e47e72efd0cf36d23e0e4e2282fc4e37c13798b6 Mon Sep 17 00:00:00 2001 From: salmany Date: Thu, 7 Aug 2025 20:14:05 +0000 Subject: [PATCH 6/6] Add comment for MM target cluster. --- .../main/java/examples/CreateMirrorMaker2SourceConnector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2SourceConnector.java b/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2SourceConnector.java index dc2a511c825..5e6ade27823 100644 --- a/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2SourceConnector.java +++ b/managedkafka/examples/src/main/java/examples/CreateMirrorMaker2SourceConnector.java @@ -40,7 +40,7 @@ public static void main(String[] args) throws Exception { String sourceClusterBootstrapServers = "my-source-cluster:9092"; String targetClusterBootstrapServers = "my-target-cluster:9092"; String sourceClusterAlias = "source"; - String targetClusterAlias = "target"; + String targetClusterAlias = "target"; // This is usually the primary cluster. String connectorClass = "org.apache.kafka.connect.mirror.MirrorSourceConnector"; String topics = ".*"; createMirrorMaker2SourceConnector(