
Commit 2a77e77

salmany and glasnt authored
feat(managedkafka): Connect code samples (#10143)
* feat(managedkafka): Connect code samples
* Fixed CreateBigQuerySinkConnector example arguments.
* Fix syntax and minor errors.
* Added Connector operations and fixed configs.
* Add comment about additional subnets and DNS domains.
* Add comment for MM target cluster.
* linting corrections
* Misc. fixes aligned with TF / Python code samples.
* Typo fix.
* Fix parameter topicsExclude for MM2 sample.

---------

Co-authored-by: Katie McLaughlin <[email protected]>
1 parent e70e89e commit 2a77e77

22 files changed: +2139 −3 lines changed

managedkafka/examples/pom.xml

Lines changed: 2 additions & 2 deletions

@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.example.managedkafka</groupId>
   <artifactId>managedkafka-snippets</artifactId>
-  <packaging>pom</packaging>
+  <packaging>jar</packaging>
   <name>Google Cloud Managed Kafka Snippets</name>
   <url>https://github.com/GoogleCloudPlatform/java-docs-samples/tree/main/managedkafka</url>

@@ -29,7 +29,7 @@
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>libraries-bom</artifactId>
-        <version>26.50.0</version>
+        <version>26.64.0</version>
         <type>pom</type>
         <scope>import</scope>
       </dependency>
managedkafka/examples/src/main/java/examples/CreateBigQuerySinkConnector.java

Lines changed: 112 additions & 0 deletions

/*
 * Copyright 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package examples;

// [START managedkafka_create_bigquery_sink_connector]

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateBigQuerySinkConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-bigquery-sink-connector";
    String bigqueryProjectId = "my-bigquery-project-id";
    String datasetName = "my-dataset";
    String kafkaTopicName = "kafka-topic";
    String maxTasks = "3";
    String connectorClass = "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
    String valueSchemasEnable = "false";
    createBigQuerySinkConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        bigqueryProjectId,
        datasetName,
        kafkaTopicName,
        maxTasks,
        connectorClass,
        keyConverter,
        valueConverter,
        valueSchemasEnable);
  }

  public static void createBigQuerySinkConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String bigqueryProjectId,
      String datasetName,
      String kafkaTopicName,
      String maxTasks,
      String connectorClass,
      String keyConverter,
      String valueConverter,
      String valueSchemasEnable)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("name", connectorId);
    configMap.put("project", bigqueryProjectId);
    configMap.put("topics", kafkaTopicName);
    configMap.put("tasks.max", maxTasks);
    configMap.put("connector.class", connectorClass);
    configMap.put("key.converter", keyConverter);
    configMap.put("value.converter", valueConverter);
    configMap.put("value.converter.schemas.enable", valueSchemasEnable);
    configMap.put("defaultDataset", datasetName);

    Connector connector =
        Connector.newBuilder()
            .setName(ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request =
          CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created BigQuery Sink connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

// [END managedkafka_create_bigquery_sink_connector]
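After the sample above runs, you can confirm that the connector actually exists on the Connect cluster. The following is a minimal sketch, not part of this commit: it assumes the standard generated getConnector read method on ManagedKafkaConnectClient and reuses the sample's placeholder IDs.

package examples;

import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;

public class GetConnectorSketch {

  public static void main(String[] args) throws Exception {
    // Hypothetical placeholders matching the sample above.
    String projectId = "my-project-id";
    String region = "my-region";
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-bigquery-sink-connector";

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      // getConnector is the standard generated read method on the client;
      // this is an assumption, not something added by this commit.
      Connector connector =
          managedKafkaConnectClient.getConnector(
              ConnectorName.of(projectId, region, connectClusterId, connectorId).toString());
      // A RUNNING state indicates the connector was created and scheduled.
      System.out.printf("Connector %s state: %s\n", connector.getName(), connector.getState());
    }
  }
}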
managedkafka/examples/src/main/java/examples/CreateCloudStorageSinkConnector.java

Lines changed: 115 additions & 0 deletions

/*
 * Copyright 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package examples;

// [START managedkafka_create_cloud_storage_sink_connector]

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateCloudStorageSinkConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-gcs-sink-connector";
    String bucketName = "my-gcs-bucket";
    String kafkaTopicName = "kafka-topic";
    String connectorClass = "io.aiven.kafka.connect.gcs.GcsSinkConnector";
    String maxTasks = "3";
    String gcsCredentialsDefault = "true";
    String formatOutputType = "json";
    String valueConverter = "org.apache.kafka.connect.json.JsonConverter";
    String valueSchemasEnable = "false";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createCloudStorageSinkConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        bucketName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        gcsCredentialsDefault,
        formatOutputType,
        valueConverter,
        valueSchemasEnable,
        keyConverter);
  }

  public static void createCloudStorageSinkConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String bucketName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String gcsCredentialsDefault,
      String formatOutputType,
      String valueConverter,
      String valueSchemasEnable,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("tasks.max", maxTasks);
    configMap.put("topics", kafkaTopicName);
    configMap.put("gcs.bucket.name", bucketName);
    configMap.put("gcs.credentials.default", gcsCredentialsDefault);
    configMap.put("format.output.type", formatOutputType);
    configMap.put("name", connectorId);
    configMap.put("value.converter", valueConverter);
    configMap.put("value.converter.schemas.enable", valueSchemasEnable);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Cloud Storage Sink connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

// [END managedkafka_create_cloud_storage_sink_connector]
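With more than one sink connector on the same Connect cluster, listing them all is a quick sanity check. The sketch below is likewise not part of this commit: it assumes the standard generated listConnectors paged method and the same hypothetical placeholder IDs as the samples above.

package examples;

import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;

public class ListConnectorsSketch {

  public static void main(String[] args) throws Exception {
    // Hypothetical placeholders matching the samples above.
    String projectId = "my-project-id";
    String region = "my-region";
    String connectClusterId = "my-connect-cluster";

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      // listConnectors is the standard generated paged list method (an
      // assumption here); iterateAll() transparently walks every page.
      for (Connector connector :
          managedKafkaConnectClient
              .listConnectors(
                  ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .iterateAll()) {
        System.out.println(connector.getName());
      }
    }
  }
}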
managedkafka/examples/src/main/java/examples/CreateConnectCluster.java

Lines changed: 129 additions & 0 deletions

/*
 * Copyright 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package examples;

// [START managedkafka_create_connect_cluster]

import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
import com.google.cloud.managedkafka.v1.ConnectCluster;
import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
import com.google.cloud.managedkafka.v1.LocationName;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class CreateConnectCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-connect-cluster";
    String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
    String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
    int cpu = 12;
    long memoryBytes = 12884901888L; // 12 GiB
    createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
  }

  public static void createConnectCluster(
      String projectId,
      String region,
      String clusterId,
      String subnet,
      String kafkaCluster,
      int cpu,
      long memoryBytes)
      throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
        .setMemoryBytes(memoryBytes).build();
    ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
        .setPrimarySubnet(subnet)
        .build();
    // Optionally, you can also specify additional accessible subnets and resolvable
    // DNS domains as part of your network configuration. For example:
    // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
    // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
    ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
        .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
        .build();
    ConnectCluster connectCluster = ConnectCluster.newBuilder()
        .setCapacityConfig(capacityConfig)
        .setGcpConfig(gcpConfig)
        .setKafkaCluster(kafkaCluster)
        .build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.createConnectClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
        .create(settingsBuilder.build())) {
      CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
          .setParent(LocationName.of(projectId, region).toString())
          .setConnectClusterId(clusterId)
          .setConnectCluster(connectCluster)
          .build();

      // The duration of this operation can vary considerably, typically taking
      // between 10-30 minutes.
      OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
          .createConnectClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf(
          "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(), operation.isDone(), future.getMetadata().get().toString());

      while (!future.isDone()) {
        // The pollingFuture gives us the most recent status of the operation
        RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
        OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
        System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
            currentOp.getName(),
            currentOp.isDone());
      }

      // NOTE: future.get() blocks until the operation is complete (isDone == true).
      ConnectCluster response = future.get();
      System.out.printf("Created connect cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n",
          e.getMessage());
      throw e;
    }
  }
}
// [END managedkafka_create_connect_cluster]
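Because cluster creation is a long-running operation that can take tens of minutes, it can be handy to check the cluster's state from a separate process while the sample above is still polling. A minimal sketch, assuming the standard generated getConnectCluster read method (not part of this commit) and the same hypothetical placeholder IDs:

package examples;

import com.google.cloud.managedkafka.v1.ConnectCluster;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;

public class GetConnectClusterSketch {

  public static void main(String[] args) throws Exception {
    // Hypothetical placeholders matching the sample above.
    String projectId = "my-project-id";
    String region = "my-region";
    String clusterId = "my-connect-cluster";

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      // getConnectCluster is the standard generated read method (an assumption).
      ConnectCluster cluster = managedKafkaConnectClient.getConnectCluster(
          ConnectClusterName.of(projectId, region, clusterId).toString());
      // The state should read CREATING until the LRO in the sample completes.
      System.out.printf("Cluster %s state: %s\n", cluster.getName(), cluster.getState());
    }
  }
}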
