Skip to content

Commit 30c6a1d

Browse files
salmanygemini-code-assist[bot]glasnt
authored
Add Managed Kafka Connect cluster code samples (#13515)
* Add Managed Kafka Connect code samples for clusters * Adds code examples for creating, deleting, getting, listing and updating Managed Kafka Connect clusters * Update google-cloud-managedkafka version to 0.1.12 * Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/delete_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/get_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/list_connect_clusters.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/update_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Add timeouts and improve error handling. * Addressed PR comments. * Adds requirements.txt file for samples. As per the [Authoring Guide] (https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#dependencies), each sample is required to have a requirements.txt file that lists the dependencies needed to run the sample. * Remove timeout in update_connect_cluster.py Remove timeout to align with Managed Kafka Cluster update example: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/cdae4cacfe8f9612e554af11ef72bc8d34765ada/managedkafka/snippets/clusters/update_cluster.py#L60 * Fix CPU and memory values for Cluster creation. * Fixed comment for minimum vCpu. * Fix lint by adding space. * Fix lint with space. * Remove file added by mistake. * lint: fix lint errors --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Katie McLaughlin <[email protected]>
1 parent 71b6b42 commit 30c6a1d

File tree

8 files changed

+512
-1
lines changed

8 files changed

+512
-1
lines changed
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from unittest import mock
16+
from unittest.mock import MagicMock
17+
18+
from google.api_core.operation import Operation
19+
from google.cloud import managedkafka_v1
20+
import pytest
21+
22+
import create_connect_cluster # noqa: I100
23+
import delete_connect_cluster
24+
import get_connect_cluster
25+
import list_connect_clusters
26+
import update_connect_cluster
27+
28+
PROJECT_ID = "test-project-id"
29+
REGION = "us-central1"
30+
KAFKA_CLUSTER_ID = "test-cluster-id"
31+
CONNECT_CLUSTER_ID = "test-connect-cluster-id"
32+
33+
34+
@mock.patch(
35+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connect_cluster"
36+
)
37+
def test_create_connect_cluster(
38+
mock_method: MagicMock,
39+
capsys: pytest.CaptureFixture[str],
40+
) -> None:
41+
cpu = 12
42+
memory_bytes = 12884901900 # 12 GB
43+
primary_subnet = "test-subnet"
44+
operation = mock.MagicMock(spec=Operation)
45+
connect_cluster = managedkafka_v1.types.ConnectCluster()
46+
connect_cluster.name = (
47+
managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path(
48+
PROJECT_ID, REGION, CONNECT_CLUSTER_ID
49+
)
50+
)
51+
operation.result = mock.MagicMock(return_value=connect_cluster)
52+
mock_method.return_value = operation
53+
54+
create_connect_cluster.create_connect_cluster(
55+
project_id=PROJECT_ID,
56+
region=REGION,
57+
connect_cluster_id=CONNECT_CLUSTER_ID,
58+
kafka_cluster_id=KAFKA_CLUSTER_ID,
59+
primary_subnet=primary_subnet,
60+
cpu=cpu,
61+
memory_bytes=memory_bytes,
62+
)
63+
64+
out, _ = capsys.readouterr()
65+
assert "Created Connect cluster" in out
66+
assert CONNECT_CLUSTER_ID in out
67+
mock_method.assert_called_once()
68+
69+
70+
@mock.patch(
71+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.get_connect_cluster"
72+
)
73+
def test_get_connect_cluster(
74+
mock_method: MagicMock,
75+
capsys: pytest.CaptureFixture[str],
76+
) -> None:
77+
connect_cluster = managedkafka_v1.types.ConnectCluster()
78+
connect_cluster.name = (
79+
managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path(
80+
PROJECT_ID, REGION, CONNECT_CLUSTER_ID
81+
)
82+
)
83+
mock_method.return_value = connect_cluster
84+
85+
get_connect_cluster.get_connect_cluster(
86+
project_id=PROJECT_ID,
87+
region=REGION,
88+
connect_cluster_id=CONNECT_CLUSTER_ID,
89+
)
90+
91+
out, _ = capsys.readouterr()
92+
assert "Got Connect cluster" in out
93+
assert CONNECT_CLUSTER_ID in out
94+
mock_method.assert_called_once()
95+
96+
97+
@mock.patch(
98+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.update_connect_cluster"
99+
)
100+
def test_update_connect_cluster(
101+
mock_method: MagicMock,
102+
capsys: pytest.CaptureFixture[str],
103+
) -> None:
104+
new_memory_bytes = 12884901900 # 12 GB
105+
operation = mock.MagicMock(spec=Operation)
106+
connect_cluster = managedkafka_v1.types.ConnectCluster()
107+
connect_cluster.name = (
108+
managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path(
109+
PROJECT_ID, REGION, CONNECT_CLUSTER_ID
110+
)
111+
)
112+
connect_cluster.capacity_config.memory_bytes = new_memory_bytes
113+
operation.result = mock.MagicMock(return_value=connect_cluster)
114+
mock_method.return_value = operation
115+
116+
update_connect_cluster.update_connect_cluster(
117+
project_id=PROJECT_ID,
118+
region=REGION,
119+
connect_cluster_id=CONNECT_CLUSTER_ID,
120+
memory_bytes=new_memory_bytes,
121+
)
122+
123+
out, _ = capsys.readouterr()
124+
assert "Updated Connect cluster" in out
125+
assert CONNECT_CLUSTER_ID in out
126+
assert str(new_memory_bytes) in out
127+
mock_method.assert_called_once()
128+
129+
130+
@mock.patch(
131+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.list_connect_clusters"
132+
)
133+
def test_list_connect_clusters(
134+
mock_method: MagicMock,
135+
capsys: pytest.CaptureFixture[str],
136+
) -> None:
137+
connect_cluster = managedkafka_v1.types.ConnectCluster()
138+
connect_cluster.name = (
139+
managedkafka_v1.ManagedKafkaConnectClient.connect_cluster_path(
140+
PROJECT_ID, REGION, CONNECT_CLUSTER_ID
141+
)
142+
)
143+
144+
response = [connect_cluster]
145+
mock_method.return_value = response
146+
147+
list_connect_clusters.list_connect_clusters(
148+
project_id=PROJECT_ID,
149+
region=REGION,
150+
)
151+
152+
out, _ = capsys.readouterr()
153+
assert "Got Connect cluster" in out
154+
assert CONNECT_CLUSTER_ID in out
155+
mock_method.assert_called_once()
156+
157+
158+
@mock.patch(
159+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.delete_connect_cluster"
160+
)
161+
def test_delete_connect_cluster(
162+
mock_method: MagicMock,
163+
capsys: pytest.CaptureFixture[str],
164+
) -> None:
165+
operation = mock.MagicMock(spec=Operation)
166+
mock_method.return_value = operation
167+
168+
delete_connect_cluster.delete_connect_cluster(
169+
project_id=PROJECT_ID,
170+
region=REGION,
171+
connect_cluster_id=CONNECT_CLUSTER_ID,
172+
)
173+
174+
out, _ = capsys.readouterr()
175+
assert "Deleted Connect cluster" in out
176+
mock_method.assert_called_once()
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
def create_connect_cluster(
17+
project_id: str,
18+
region: str,
19+
connect_cluster_id: str,
20+
kafka_cluster_id: str,
21+
primary_subnet: str,
22+
cpu: int,
23+
memory_bytes: int,
24+
) -> None:
25+
"""
26+
Create a Kafka Connect cluster.
27+
28+
Args:
29+
project_id: Google Cloud project ID.
30+
region: Cloud region.
31+
connect_cluster_id: ID of the Kafka Connect cluster.
32+
kafka_cluster_id: The ID of the primary Managed Service for Apache Kafka cluster.
33+
primary_subnet: The primary VPC subnet for the Connect cluster workers. The expected format is projects/{project_id}/regions/{region}/subnetworks/{subnet_id}.
34+
cpu: Number of vCPUs to provision for the cluster. The minimum is 12.
35+
memory_bytes: The memory to provision for the cluster in bytes. Must be between 1 GiB * cpu and 8 GiB * cpu.
36+
37+
Raises:
38+
This method will raise the GoogleAPICallError exception if the operation errors or
39+
the timeout before the operation completes is reached.
40+
"""
41+
# [START managedkafka_create_connect_cluster]
42+
from google.api_core.exceptions import GoogleAPICallError
43+
from google.cloud import managedkafka_v1
44+
from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
45+
from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig
46+
47+
# TODO(developer): Update with your values.
48+
# project_id = "my-project-id"
49+
# region = "us-central1"
50+
# connect_cluster_id = "my-connect-cluster"
51+
# kafka_cluster_id = "my-kafka-cluster"
52+
# primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
53+
# cpu = 12
54+
# memory_bytes = 12884901888 # 12 GiB
55+
56+
connect_client = ManagedKafkaConnectClient()
57+
kafka_client = managedkafka_v1.ManagedKafkaClient()
58+
59+
parent = connect_client.common_location_path(project_id, region)
60+
kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id)
61+
62+
connect_cluster = ConnectCluster()
63+
connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
64+
connect_cluster.kafka_cluster = kafka_cluster_path
65+
connect_cluster.capacity_config.vcpu_count = cpu
66+
connect_cluster.capacity_config.memory_bytes = memory_bytes
67+
connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
68+
# Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
69+
# For example:
70+
# connect_cluster.gcp_config.access_config.network_configs = [
71+
# ConnectNetworkConfig(
72+
# primary_subnet=primary_subnet,
73+
# additional_subnets=additional_subnets,
74+
# dns_domain_names=dns_domain_names,
75+
# )
76+
# ]
77+
78+
request = CreateConnectClusterRequest(
79+
parent=parent,
80+
connect_cluster_id=connect_cluster_id,
81+
connect_cluster=connect_cluster,
82+
)
83+
84+
try:
85+
operation = connect_client.create_connect_cluster(request=request)
86+
print(f"Waiting for operation {operation.operation.name} to complete...")
87+
# Creating a Connect cluster can take 10-40 minutes.
88+
response = operation.result(timeout=3000)
89+
print("Created Connect cluster:", response)
90+
except GoogleAPICallError as e:
91+
print(f"The operation failed with error: {e}")
92+
93+
# [END managedkafka_create_connect_cluster]
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
def delete_connect_cluster(
17+
project_id: str,
18+
region: str,
19+
connect_cluster_id: str,
20+
) -> None:
21+
"""
22+
Delete a Kafka Connect cluster.
23+
24+
Args:
25+
project_id: Google Cloud project ID.
26+
region: Cloud region.
27+
connect_cluster_id: ID of the Kafka Connect cluster.
28+
29+
Raises:
30+
This method will raise the GoogleAPICallError exception if the operation errors.
31+
"""
32+
# [START managedkafka_delete_connect_cluster]
33+
from google.api_core.exceptions import GoogleAPICallError
34+
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
35+
ManagedKafkaConnectClient,
36+
)
37+
from google.cloud import managedkafka_v1
38+
39+
# TODO(developer)
40+
# project_id = "my-project-id"
41+
# region = "us-central1"
42+
# connect_cluster_id = "my-connect-cluster"
43+
44+
client = ManagedKafkaConnectClient()
45+
46+
request = managedkafka_v1.DeleteConnectClusterRequest(
47+
name=client.connect_cluster_path(project_id, region, connect_cluster_id),
48+
)
49+
50+
try:
51+
operation = client.delete_connect_cluster(request=request)
52+
print(f"Waiting for operation {operation.operation.name} to complete...")
53+
operation.result()
54+
print("Deleted Connect cluster")
55+
except GoogleAPICallError as e:
56+
print(f"The operation failed with error: {e}")
57+
58+
# [END managedkafka_delete_connect_cluster]
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
def get_connect_cluster(
17+
project_id: str,
18+
region: str,
19+
connect_cluster_id: str,
20+
) -> None:
21+
"""
22+
Get a Kafka Connect cluster.
23+
24+
Args:
25+
project_id: Google Cloud project ID.
26+
region: Cloud region.
27+
connect_cluster_id: ID of the Kafka Connect cluster.
28+
29+
Raises:
30+
This method will raise the NotFound exception if the Connect cluster is not found.
31+
"""
32+
# [START managedkafka_get_connect_cluster]
33+
from google.api_core.exceptions import NotFound
34+
from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
35+
from google.cloud import managedkafka_v1
36+
37+
# TODO(developer)
38+
# project_id = "my-project-id"
39+
# region = "us-central1"
40+
# connect_cluster_id = "my-connect-cluster"
41+
42+
client = ManagedKafkaConnectClient()
43+
44+
cluster_path = client.connect_cluster_path(project_id, region, connect_cluster_id)
45+
request = managedkafka_v1.GetConnectClusterRequest(
46+
name=cluster_path,
47+
)
48+
49+
try:
50+
cluster = client.get_connect_cluster(request=request)
51+
print("Got Connect cluster:", cluster)
52+
except NotFound as e:
53+
print(f"Failed to get Connect cluster {connect_cluster_id} with error: {e}")
54+
55+
# [END managedkafka_get_connect_cluster]

0 commit comments

Comments
 (0)