diff --git a/managedkafka/snippets/connect/connectors/connectors_test.py b/managedkafka/snippets/connect/connectors/connectors_test.py new file mode 100644 index 0000000000..7e3101af7f --- /dev/null +++ b/managedkafka/snippets/connect/connectors/connectors_test.py @@ -0,0 +1,195 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock +from unittest.mock import MagicMock + +from google.api_core.operation import Operation +from google.cloud import managedkafka_v1 +import pytest + +import create_bigquery_sink_connector +import create_cloud_storage_sink_connector +import create_mirrormaker_source_connector +import create_pubsub_sink_connector +import create_pubsub_source_connector + +PROJECT_ID = "test-project-id" +REGION = "us-central1" +CONNECT_CLUSTER_ID = "test-connect-cluster-id" + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_mirrormaker2_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "MM2_CONNECTOR_ID" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = connector_id + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_mirrormaker_source_connector.create_mirrormaker_source_connector( + PROJECT_ID, + REGION, + CONNECT_CLUSTER_ID, + "3", + 
connector_id, + "GMK_TOPIC_NAME", + "source", + "target", + "GMK_SOURCE_CLUSTER_DNS", + "GMK_TARGET_CLUSTER_DNS", + ) + + out, _ = capsys.readouterr() + assert "Created Connector" in out + assert connector_id in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_pubsub_source_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "CPS_SOURCE_CONNECTOR_ID" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = connector_id + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_pubsub_source_connector.create_pubsub_source_connector( + PROJECT_ID, + REGION, + CONNECT_CLUSTER_ID, + connector_id, + "GMK_TOPIC_ID", + "CPS_SUBSCRIPTION_ID", + "GCP_PROJECT_ID", + "3", + "org.apache.kafka.connect.converters.ByteArrayConverter", + "org.apache.kafka.connect.storage.StringConverter", + ) + + out, _ = capsys.readouterr() + assert "Created Connector" in out + assert connector_id in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_pubsub_sink_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "CPS_SINK_CONNECTOR_ID" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = connector_id + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_pubsub_sink_connector.create_pubsub_sink_connector( + PROJECT_ID, + REGION, + CONNECT_CLUSTER_ID, + connector_id, + "GMK_TOPIC_ID", + "org.apache.kafka.connect.storage.StringConverter", + "org.apache.kafka.connect.storage.StringConverter", + "CPS_TOPIC_ID", + 
"GCP_PROJECT_ID", + "3", + ) + + out, _ = capsys.readouterr() + assert "Created Connector" in out + assert connector_id in out + mock_method.assert_called_once() + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_cloud_storage_sink_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "GCS_SINK_CONNECTOR_ID" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = connector_id + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_cloud_storage_sink_connector.create_cloud_storage_sink_connector( + PROJECT_ID, + REGION, + CONNECT_CLUSTER_ID, + connector_id, + "GMK_TOPIC_ID", + "GCS_BUCKET_NAME", + "3", + "json", + "org.apache.kafka.connect.json.JsonConverter", + "false", + "org.apache.kafka.connect.storage.StringConverter", + ) + + out, _ = capsys.readouterr() + assert "Created Connector" in out + assert connector_id in out + + +@mock.patch( + "google.cloud.managedkafka_v1.services.managed_kafka_connect.ManagedKafkaConnectClient.create_connector" +) +def test_create_bigquery_sink_connector( + mock_method: MagicMock, + capsys: pytest.CaptureFixture[str], +) -> None: + connector_id = "BQ_SINK_CONNECTOR_ID" + operation = mock.MagicMock(spec=Operation) + connector = managedkafka_v1.types.Connector() + connector.name = connector_id + operation.result = mock.MagicMock(return_value=connector) + mock_method.return_value = operation + + create_bigquery_sink_connector.create_bigquery_sink_connector( + PROJECT_ID, + REGION, + CONNECT_CLUSTER_ID, + connector_id, + "GMK_TOPIC_ID", + "3", + "org.apache.kafka.connect.storage.StringConverter", + "org.apache.kafka.connect.json.JsonConverter", + "false", + "BQ_DATASET_ID", + ) + + out, _ = capsys.readouterr() + assert "Created Connector" in out + assert connector_id in out + 
mock_method.assert_called_once() diff --git a/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py b/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py new file mode 100644 index 0000000000..b6719e82ea --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_bigquery_sink_connector.py @@ -0,0 +1,97 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def create_bigquery_sink_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + topics: str, + tasks_max: str, + key_converter: str, + value_converter: str, + value_converter_schemas_enable: str, + default_dataset: str, +) -> None: + """ + Create a BigQuery Sink connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: Name of the connector. + topics: Kafka topics to read from. + tasks_max: Maximum number of tasks. + key_converter: Key converter class. + value_converter: Value converter class. + value_converter_schemas_enable: Enable schemas for value converter. + default_dataset: BigQuery dataset ID. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # TODO(developer): Update with your config values. 
Here is a sample configuration: + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "BQ_SINK_CONNECTOR_ID" + # topics = "GMK_TOPIC_ID" + # tasks_max = "3" + # key_converter = "org.apache.kafka.connect.storage.StringConverter" + # value_converter = "org.apache.kafka.connect.json.JsonConverter" + # value_converter_schemas_enable = "false" + # default_dataset = "BQ_DATASET_ID" + + # [START managedkafka_create_bigquery_sink_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest + + connect_client = ManagedKafkaConnectClient() + parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + + configs = { + "name": connector_id, + "project": project_id, + "topics": topics, + "tasks.max": tasks_max, + "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", + "key.converter": key_converter, + "value.converter": value_converter, + "value.converter.schemas.enable": value_converter_schemas_enable, + "defaultDataset": default_dataset, + } + + connector = Connector() + connector.name = connector_id + connector.configs = configs + + request = CreateConnectorRequest( + parent=parent, + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created Connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + # [END managedkafka_create_bigquery_sink_connector] diff --git a/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py b/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py 
new file mode 100644 index 0000000000..8e6d7bc2c7 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_cloud_storage_sink_connector.py @@ -0,0 +1,101 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def create_cloud_storage_sink_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + topics: str, + gcs_bucket_name: str, + tasks_max: str, + format_output_type: str, + value_converter: str, + value_converter_schemas_enable: str, + key_converter: str, +) -> None: + """ + Create a Cloud Storage Sink connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: Name of the connector. + topics: Kafka topics to read from. + gcs_bucket_name: Google Cloud Storage bucket name. + tasks_max: Maximum number of tasks. + format_output_type: Output format type. + value_converter: Value converter class. + value_converter_schemas_enable: Enable schemas for value converter. + key_converter: Key converter class. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # TODO(developer): Update with your config values. 
Here is a sample configuration: + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "GCS_SINK_CONNECTOR_ID" + # topics = "GMK_TOPIC_ID" + # gcs_bucket_name = "GCS_BUCKET_NAME" + # tasks_max = "3" + # format_output_type = "json" + # value_converter = "org.apache.kafka.connect.json.JsonConverter" + # value_converter_schemas_enable = "false" + # key_converter = "org.apache.kafka.connect.storage.StringConverter" + + # [START managedkafka_create_cloud_storage_sink_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest + + connect_client = ManagedKafkaConnectClient() + parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + + configs = { + "connector.class": "io.aiven.kafka.connect.gcs.GcsSinkConnector", + "tasks.max": tasks_max, + "topics": topics, + "gcs.bucket.name": gcs_bucket_name, + "gcs.credentials.default": "true", + "format.output.type": format_output_type, + "name": connector_id, + "value.converter": value_converter, + "value.converter.schemas.enable": value_converter_schemas_enable, + "key.converter": key_converter, + } + + connector = Connector() + connector.name = connector_id + connector.configs = configs + + request = CreateConnectorRequest( + parent=parent, + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created Connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + # [END managedkafka_create_cloud_storage_sink_connector] diff --git 
a/managedkafka/snippets/connect/connectors/create_mirrormaker_source_connector.py b/managedkafka/snippets/connect/connectors/create_mirrormaker_source_connector.py new file mode 100644 index 0000000000..e897cd3c04 --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_mirrormaker_source_connector.py @@ -0,0 +1,94 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def create_mirrormaker_source_connector( + project_id: str, + region: str, + connect_cluster_id: str, + tasks_max: str, + connector_id: str, + topics: str, + source_cluster_alias: str, + target_cluster_alias: str, + source_bootstrap_servers: str, + target_bootstrap_servers: str, +) -> None: + """ + Create a MirrorMaker 2.0 Source connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + tasks_max: Controls the level of parallelism for the connector. + connector_id: Name of the connector. + topics: Topics to mirror. + source_cluster_alias: Alias for the source cluster. + target_cluster_alias: Alias for the target cluster. + source_bootstrap_servers: Source cluster bootstrap servers. + target_bootstrap_servers: Target cluster bootstrap servers. This is usually the primary cluster. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors. + """ + # TODO(developer): Update with your config values. 
Here is a sample configuration: + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # tasks_max = "3" + # connector_id = "MM2_CONNECTOR_ID" + # topics = "GMK_TOPIC_NAME" + # source_cluster_alias = "source" + # target_cluster_alias = "target" + # source_bootstrap_servers = "GMK_SOURCE_CLUSTER_DNS" + # target_bootstrap_servers = "GMK_TARGET_CLUSTER_DNS" + + # [START managedkafka_create_mirrormaker2_source_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest + + connect_client = ManagedKafkaConnectClient() + parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + + configs = { + "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", + "source.cluster.alias": source_cluster_alias, + "target.cluster.alias": target_cluster_alias, + "tasks.max": tasks_max, + "topics": topics, + "source.cluster.bootstrap.servers": source_bootstrap_servers, + "target.cluster.bootstrap.servers": target_bootstrap_servers, + } + + connector = Connector() + connector.name = connector_id + connector.configs = configs + + request = CreateConnectorRequest( + parent=parent, + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created Connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + # [END managedkafka_create_mirrormaker2_source_connector] diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py new file mode 100644 index 
0000000000..2742d8166d --- /dev/null +++ b/managedkafka/snippets/connect/connectors/create_pubsub_sink_connector.py @@ -0,0 +1,96 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def create_pubsub_sink_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + topics: str, + value_converter: str, + key_converter: str, + cps_topic: str, + cps_project: str, + tasks_max: str, +) -> None: + """ + Create a Pub/Sub Sink connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: Name of the connector. + topics: Kafka topics to read from. + value_converter: Value converter class. + key_converter: Key converter class. + cps_topic: Cloud Pub/Sub topic ID. + cps_project: Cloud Pub/Sub project ID. + tasks_max: Maximum number of tasks. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # TODO(developer): Update with your config values. 
Here is a sample configuration: + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "CPS_SINK_CONNECTOR_ID" + # topics = "GMK_TOPIC_ID" + # value_converter = "org.apache.kafka.connect.storage.StringConverter" + # key_converter = "org.apache.kafka.connect.storage.StringConverter" + # cps_topic = "CPS_TOPIC_ID" + # cps_project = "GCP_PROJECT_ID" + # tasks_max = "3" + + # [START managedkafka_create_pubsub_sink_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest + + connect_client = ManagedKafkaConnectClient() + parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + + configs = { + "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector", + "name": connector_id, + "tasks.max": tasks_max, + "topics": topics, + "value.converter": value_converter, + "key.converter": key_converter, + "cps.topic": cps_topic, + "cps.project": cps_project, + } + + connector = Connector() + connector.name = connector_id + connector.configs = configs + + request = CreateConnectorRequest( + parent=parent, + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created Connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + # [END managedkafka_create_pubsub_sink_connector] diff --git a/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py new file mode 100644 index 0000000000..d5c2acc701 --- /dev/null +++ 
b/managedkafka/snippets/connect/connectors/create_pubsub_source_connector.py @@ -0,0 +1,96 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def create_pubsub_source_connector( + project_id: str, + region: str, + connect_cluster_id: str, + connector_id: str, + kafka_topic: str, + cps_subscription: str, + cps_project: str, + tasks_max: str, + value_converter: str, + key_converter: str, +) -> None: + """ + Create a Pub/Sub Source connector. + + Args: + project_id: Google Cloud project ID. + region: Cloud region. + connect_cluster_id: ID of the Kafka Connect cluster. + connector_id: Name of the connector. + kafka_topic: Kafka topic to publish to. + cps_subscription: Cloud Pub/Sub subscription ID. + cps_project: Cloud Pub/Sub project ID. + tasks_max: Maximum number of tasks. + value_converter: Value converter class. + key_converter: Key converter class. + + Raises: + This method will raise the GoogleAPICallError exception if the operation errors or + the timeout before the operation completes is reached. + """ + # TODO(developer): Update with your config values. 
Here is a sample configuration: + # project_id = "my-project-id" + # region = "us-central1" + # connect_cluster_id = "my-connect-cluster" + # connector_id = "CPS_SOURCE_CONNECTOR_ID" + # kafka_topic = "GMK_TOPIC_ID" + # cps_subscription = "CPS_SUBSCRIPTION_ID" + # cps_project = "GCP_PROJECT_ID" + # tasks_max = "3" + # value_converter = "org.apache.kafka.connect.converters.ByteArrayConverter" + # key_converter = "org.apache.kafka.connect.storage.StringConverter" + + # [START managedkafka_create_pubsub_source_connector] + from google.api_core.exceptions import GoogleAPICallError + from google.cloud.managedkafka_v1.services.managed_kafka_connect import ( + ManagedKafkaConnectClient, + ) + from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest + + connect_client = ManagedKafkaConnectClient() + parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id) + + configs = { + "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector", + "name": connector_id, + "tasks.max": tasks_max, + "kafka.topic": kafka_topic, + "cps.subscription": cps_subscription, + "cps.project": cps_project, + "value.converter": value_converter, + "key.converter": key_converter, + } + + connector = Connector() + connector.name = connector_id + connector.configs = configs + + request = CreateConnectorRequest( + parent=parent, + connector_id=connector_id, + connector=connector, + ) + + try: + operation = connect_client.create_connector(request=request) + print(f"Waiting for operation {operation.operation.name} to complete...") + response = operation.result() + print("Created Connector:", response) + except GoogleAPICallError as e: + print(f"The operation failed with error: {e}") + # [END managedkafka_create_pubsub_source_connector] diff --git a/managedkafka/snippets/connect/connectors/requirements.txt b/managedkafka/snippets/connect/connectors/requirements.txt new file mode 100644 index 0000000000..5f372e81c4 --- /dev/null +++ 
b/managedkafka/snippets/connect/connectors/requirements.txt @@ -0,0 +1,6 @@ +protobuf==5.29.4 +pytest==8.2.2 +google-api-core==2.23.0 +google-auth==2.38.0 +google-cloud-managedkafka==0.1.12 +googleapis-common-protos==1.66.0