Skip to content

Commit 3de0a03

Browse files
salmanygemini-code-assist[bot]glasnt
authored
MKC Examples - Connector Operations (#13527)
* Add Managed Kafka Connect code samples for clusters * Adds code examples for creating, deleting, getting, listing and updating Managed Kafka Connect clusters * Update google-cloud-managedkafka version to 0.1.12 * Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/create_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/delete_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/get_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/list_connect_clusters.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update managedkafka/snippets/connect/clusters/update_connect_cluster.py Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Add timeouts and improve error handling. * Addressed PR comments. * Adds requirements.txt file for samples. As per the [Authoring Guide] (https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/AUTHORING_GUIDE.md#dependencies), each sample is required to have a requirements.txt file that lists the dependencies needed to run the sample. * Adds code examples for Managed Kafka Connect Connectors: * Creating BigQuery Sink Connector * Creating MirrorMaker Source Connector * Creating Pub/Sub Sink Connector * Creating Pub/Sub Source Connector * Creating Storage Sink Connector * Getting, updating, deleting, and listing connectors * Starting, stopping, pausing, restarting and resuming connectors * Minor fixes. * Remove redundant file. * Fix merge issues. * Add example for `bootstrap.servers` * Remove obsolete MM2 connector example. * Clarify use for restart_connector. * Fix comment on bootstrap.servers for MM2 example. * Remove obsolete file. * Fix variable naming and tasks.max value. * Fix connectors_test.py linting errors. * Fix linting errors for connector samples. * linting (black) * Remove requirements.txt to fix linting conflicts. * sort order --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Katie McLaughlin <[email protected]>
1 parent 624e0a1 commit 3de0a03

14 files changed

+719
-5
lines changed

managedkafka/snippets/connect/clusters/delete_connect_cluster.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@ def delete_connect_cluster(
4141
# region = "us-central1"
4242
# connect_cluster_id = "my-connect-cluster"
4343

44-
client = ManagedKafkaConnectClient()
44+
connect_client = ManagedKafkaConnectClient()
4545

4646
request = managedkafka_v1.DeleteConnectClusterRequest(
47-
name=client.connect_cluster_path(project_id, region, connect_cluster_id),
47+
name=connect_client.connect_cluster_path(project_id, region, connect_cluster_id),
4848
)
4949

5050
try:
51-
operation = client.delete_connect_cluster(request=request)
51+
operation = connect_client.delete_connect_cluster(request=request)
5252
print(f"Waiting for operation {operation.operation.name} to complete...")
5353
operation.result()
5454
print("Deleted Connect cluster")

managedkafka/snippets/connect/connectors/connectors_test.py

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,23 @@
2020
import create_mirrormaker2_source_connector
2121
import create_pubsub_sink_connector
2222
import create_pubsub_source_connector
23+
import delete_connector
24+
import get_connector
2325
from google.api_core.operation import Operation
2426
from google.cloud import managedkafka_v1
27+
import list_connectors
28+
import pause_connector
2529
import pytest
30+
import restart_connector
31+
import resume_connector
32+
import stop_connector
33+
import update_connector
2634

2735

2836
PROJECT_ID = "test-project-id"
2937
REGION = "us-central1"
3038
CONNECT_CLUSTER_ID = "test-connect-cluster-id"
39+
CONNECTOR_ID = "test-connector-id"
3140

3241

3342
@mock.patch(
@@ -194,3 +203,203 @@ def test_create_bigquery_sink_connector(
194203
assert "Created Connector" in out
195204
assert connector_id in out
196205
mock_method.assert_called_once()
206+
207+
208+
@mock.patch(
209+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.list_connectors"
210+
)
211+
def test_list_connectors(
212+
mock_method: MagicMock,
213+
capsys: pytest.CaptureFixture[str],
214+
) -> None:
215+
connector = managedkafka_v1.types.Connector()
216+
connector.name = managedkafka_v1.ManagedKafkaConnectClient.connector_path(
217+
PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID
218+
)
219+
mock_method.return_value = [connector]
220+
221+
list_connectors.list_connectors(
222+
project_id=PROJECT_ID,
223+
region=REGION,
224+
connect_cluster_id=CONNECT_CLUSTER_ID,
225+
)
226+
227+
out, _ = capsys.readouterr()
228+
assert "Got connector" in out
229+
assert CONNECTOR_ID in out
230+
mock_method.assert_called_once()
231+
232+
233+
@mock.patch(
234+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.get_connector"
235+
)
236+
def test_get_connector(
237+
mock_method: MagicMock,
238+
capsys: pytest.CaptureFixture[str],
239+
) -> None:
240+
connector = managedkafka_v1.types.Connector()
241+
connector.name = managedkafka_v1.ManagedKafkaConnectClient.connector_path(
242+
PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID
243+
)
244+
mock_method.return_value = connector
245+
246+
get_connector.get_connector(
247+
project_id=PROJECT_ID,
248+
region=REGION,
249+
connect_cluster_id=CONNECT_CLUSTER_ID,
250+
connector_id=CONNECTOR_ID,
251+
)
252+
253+
out, _ = capsys.readouterr()
254+
assert "Got connector" in out
255+
assert CONNECTOR_ID in out
256+
mock_method.assert_called_once()
257+
258+
259+
@mock.patch(
260+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.update_connector"
261+
)
262+
def test_update_connector(
263+
mock_method: MagicMock,
264+
capsys: pytest.CaptureFixture[str],
265+
) -> None:
266+
configs = {"tasks.max": "6", "value.converter.schemas.enable": "true"}
267+
operation = mock.MagicMock(spec=Operation)
268+
connector = managedkafka_v1.types.Connector()
269+
connector.name = managedkafka_v1.ManagedKafkaConnectClient.connector_path(
270+
PROJECT_ID, REGION, CONNECT_CLUSTER_ID, CONNECTOR_ID
271+
)
272+
operation.result = mock.MagicMock(return_value=connector)
273+
mock_method.return_value = operation
274+
275+
update_connector.update_connector(
276+
project_id=PROJECT_ID,
277+
region=REGION,
278+
connect_cluster_id=CONNECT_CLUSTER_ID,
279+
connector_id=CONNECTOR_ID,
280+
configs=configs,
281+
)
282+
283+
out, _ = capsys.readouterr()
284+
assert "Updated connector" in out
285+
assert CONNECTOR_ID in out
286+
mock_method.assert_called_once()
287+
288+
289+
@mock.patch(
290+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.delete_connector"
291+
)
292+
def test_delete_connector(
293+
mock_method: MagicMock,
294+
capsys: pytest.CaptureFixture[str],
295+
) -> None:
296+
operation = mock.MagicMock(spec=Operation)
297+
operation.result = mock.MagicMock(return_value=None)
298+
mock_method.return_value = operation
299+
300+
delete_connector.delete_connector(
301+
project_id=PROJECT_ID,
302+
region=REGION,
303+
connect_cluster_id=CONNECT_CLUSTER_ID,
304+
connector_id=CONNECTOR_ID,
305+
)
306+
307+
out, _ = capsys.readouterr()
308+
assert "Deleted connector" in out
309+
mock_method.assert_called_once()
310+
311+
312+
@mock.patch(
313+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.pause_connector"
314+
)
315+
def test_pause_connector(
316+
mock_method: MagicMock,
317+
capsys: pytest.CaptureFixture[str],
318+
) -> None:
319+
operation = mock.MagicMock(spec=Operation)
320+
operation.result = mock.MagicMock(return_value=None)
321+
mock_method.return_value = operation
322+
323+
pause_connector.pause_connector(
324+
project_id=PROJECT_ID,
325+
region=REGION,
326+
connect_cluster_id=CONNECT_CLUSTER_ID,
327+
connector_id=CONNECTOR_ID,
328+
)
329+
330+
out, _ = capsys.readouterr()
331+
assert "Paused connector" in out
332+
assert CONNECTOR_ID in out
333+
mock_method.assert_called_once()
334+
335+
336+
@mock.patch(
337+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.resume_connector"
338+
)
339+
def test_resume_connector(
340+
mock_method: MagicMock,
341+
capsys: pytest.CaptureFixture[str],
342+
) -> None:
343+
operation = mock.MagicMock(spec=Operation)
344+
operation.result = mock.MagicMock(return_value=None)
345+
mock_method.return_value = operation
346+
347+
resume_connector.resume_connector(
348+
project_id=PROJECT_ID,
349+
region=REGION,
350+
connect_cluster_id=CONNECT_CLUSTER_ID,
351+
connector_id=CONNECTOR_ID,
352+
)
353+
354+
out, _ = capsys.readouterr()
355+
assert "Resumed connector" in out
356+
assert CONNECTOR_ID in out
357+
mock_method.assert_called_once()
358+
359+
360+
@mock.patch(
361+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.stop_connector"
362+
)
363+
def test_stop_connector(
364+
mock_method: MagicMock,
365+
capsys: pytest.CaptureFixture[str],
366+
) -> None:
367+
operation = mock.MagicMock(spec=Operation)
368+
operation.result = mock.MagicMock(return_value=None)
369+
mock_method.return_value = operation
370+
371+
stop_connector.stop_connector(
372+
project_id=PROJECT_ID,
373+
region=REGION,
374+
connect_cluster_id=CONNECT_CLUSTER_ID,
375+
connector_id=CONNECTOR_ID,
376+
)
377+
378+
out, _ = capsys.readouterr()
379+
assert "Stopped connector" in out
380+
assert CONNECTOR_ID in out
381+
mock_method.assert_called_once()
382+
383+
384+
@mock.patch(
385+
"google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.restart_connector"
386+
)
387+
def test_restart_connector(
388+
mock_method: MagicMock,
389+
capsys: pytest.CaptureFixture[str],
390+
) -> None:
391+
operation = mock.MagicMock(spec=Operation)
392+
operation.result = mock.MagicMock(return_value=None)
393+
mock_method.return_value = operation
394+
395+
restart_connector.restart_connector(
396+
project_id=PROJECT_ID,
397+
region=REGION,
398+
connect_cluster_id=CONNECT_CLUSTER_ID,
399+
connector_id=CONNECTOR_ID,
400+
)
401+
402+
out, _ = capsys.readouterr()
403+
assert "Restarted connector" in out
404+
assert CONNECTOR_ID in out
405+
mock_method.assert_called_once()

managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
1516
def create_bigquery_sink_connector(
1617
project_id: str,
1718
region: str,

managedkafka/snippets/connect/connectors/create_mirrormaker2_source_connector.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
1516
def create_mirrormaker2_source_connector(
1617
project_id: str,
1718
region: str,
@@ -75,8 +76,9 @@ def create_mirrormaker2_source_connector(
7576
"target.cluster.alias": target_cluster_alias, # This is usually the primary cluster.
7677
# Replicate all topics from the source
7778
"topics": topics,
78-
# The value for bootstrap.servers is a comma-separated list of hostname:port pairs
79-
# for one or more Kafka brokers in the source/target cluster.
79+
# The value for bootstrap.servers is a hostname:port pair for the Kafka broker in
80+
# the source/target cluster.
81+
# For example: "kafka-broker:9092"
8082
"source.cluster.bootstrap.servers": source_bootstrap_servers,
8183
"target.cluster.bootstrap.servers": target_bootstrap_servers,
8284
# You can define an exclusion policy for topics as follows:

managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
1516
def create_pubsub_sink_connector(
1617
project_id: str,
1718
region: str,

managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
1516
def create_pubsub_source_connector(
1617
project_id: str,
1718
region: str,
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
def delete_connector(
17+
project_id: str,
18+
region: str,
19+
connect_cluster_id: str,
20+
connector_id: str,
21+
) -> None:
22+
"""
23+
Delete a connector.
24+
25+
Args:
26+
project_id: Google Cloud project ID.
27+
region: Cloud region.
28+
connect_cluster_id: ID of the Kafka Connect cluster.
29+
connector_id: ID of the connector.
30+
31+
Raises:
32+
This method will raise the GoogleAPICallError exception if the operation errors.
33+
"""
34+
# [START managedkafka_delete_connector]
35+
from google.api_core.exceptions import GoogleAPICallError
36+
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
37+
ManagedKafkaConnectClient,
38+
)
39+
from google.cloud import managedkafka_v1
40+
41+
# TODO(developer)
42+
# project_id = "my-project-id"
43+
# region = "us-central1"
44+
# connect_cluster_id = "my-connect-cluster"
45+
# connector_id = "my-connector"
46+
47+
connect_client = ManagedKafkaConnectClient()
48+
49+
request = managedkafka_v1.DeleteConnectorRequest(
50+
name=connect_client.connector_path(project_id, region, connect_cluster_id, connector_id),
51+
)
52+
53+
try:
54+
operation = connect_client.delete_connector(request=request)
55+
print(f"Waiting for operation {operation.operation.name} to complete...")
56+
operation.result()
57+
print("Deleted connector")
58+
except GoogleAPICallError as e:
59+
print(f"The operation failed with error: {e}")
60+
61+
# [END managedkafka_delete_connector]

0 commit comments

Comments
 (0)