diff --git a/kombu/transport/gcpubsub.py b/kombu/transport/gcpubsub.py index b47f62083..84f832d47 100644 --- a/kombu/transport/gcpubsub.py +++ b/kombu/transport/gcpubsub.py @@ -35,6 +35,9 @@ * ``retry_timeout_seconds``: (int) The maximum time to wait before retrying. * ``bulk_max_messages``: (int) The maximum number of messages to pull in bulk. Defaults to 32. +* ``enable_exactly_once_delivery``: (bool) Enable exactly-once delivery for + subscriptions. When enabled, Pub/Sub provides message deduplication. + Defaults to False. """ from __future__ import annotations @@ -162,6 +165,7 @@ class Channel(virtual.Channel): default_expiration_seconds = 86400 default_retry_timeout_seconds = 300 default_bulk_max_messages = 32 + default_enable_exactly_once_delivery = False _min_ack_deadline = 10 _fanout_exchanges = set() @@ -316,11 +320,10 @@ def _create_subscription( request={ "name": subscription_path, "topic": topic_path, - 'ack_deadline_seconds': self.ack_deadline_seconds, - 'expiration_policy': { - 'ttl': f'{self.expiration_seconds}s' - }, - 'message_retention_duration': f'{msg_retention}s', + "ack_deadline_seconds": self.ack_deadline_seconds, + "expiration_policy": {"ttl": f"{self.expiration_seconds}s"}, + "message_retention_duration": f"{msg_retention}s", + "enable_exactly_once_delivery": self.enable_exactly_once_delivery, **(filter_args or {}), } ) @@ -672,6 +675,13 @@ def bulk_max_messages(self): 'bulk_max_messages', self.default_bulk_max_messages ) + @cached_property + def enable_exactly_once_delivery(self): + return self.transport_options.get( + 'enable_exactly_once_delivery', + self.default_enable_exactly_once_delivery + ) + def close(self): """Close the channel.""" logger.debug('closing channel') diff --git a/t/unit/transport/test_gcpubsub.py b/t/unit/transport/test_gcpubsub.py index 504eb50a4..778bb57ff 100644 --- a/t/unit/transport/test_gcpubsub.py +++ b/t/unit/transport/test_gcpubsub.py @@ -3,7 +3,7 @@ from concurrent.futures import Future from datetime import datetime from queue import Empty -from unittest.mock import MagicMock, call, patch +from unittest.mock import MagicMock, PropertyMock, call, patch import pytest from _socket import timeout as socket_timeout @@ -747,6 +747,78 @@ def test_bulk_max_messages_default(self, channel): 'bulk_max_messages' ) + @pytest.mark.parametrize( + "transport_options,expected_value", + [ + ({}, False), # default + ({'enable_exactly_once_delivery': True}, True), # enabled + ({'enable_exactly_once_delivery': False}, False), # disabled + ], + ) + def test_enable_exactly_once_delivery( + self, channel, transport_options, expected_value + ): + """Test enable_exactly_once_delivery property with different configurations.""" + # Given: A channel with specific transport_options + with patch.object( + type(channel), 'transport_options', + new_callable=PropertyMock, return_value=transport_options + ): + # When: Accessing the enable_exactly_once_delivery property + # Then: It should return the expected value + assert channel.enable_exactly_once_delivery is expected_value + + @pytest.mark.parametrize( + "enable_exactly_once,expected_value", + [ + (None, False), # default behaviour + (True, True), # with exactly-once delivery + (False, False), # without exactly-once delivery + ], + ) + def test_create_subscription_exactly_once_delivery( + self, channel, enable_exactly_once, expected_value + ): + """Test that create_subscription correctly handles exactly-once delivery setting.""" + # Given: A channel with specified exactly-once delivery configuration + channel.project_id = "project_id" + topic_id = "topic_id" + subscription_path = "subscription_path" + topic_path = "topic_path" + + channel.subscriber.subscription_path = MagicMock( + return_value=subscription_path + ) + channel.publisher.topic_path = MagicMock(return_value=topic_path) + channel.subscriber.create_subscription = MagicMock() + + transport_opts = { + 'ack_deadline_seconds': 240, + 'expiration_seconds': 86400, + } + if enable_exactly_once is not None: + transport_opts['enable_exactly_once_delivery'] = enable_exactly_once + + with patch.object( + type(channel), 'transport_options', + new_callable=PropertyMock, + return_value=transport_opts + ): + # When: Creating a subscription + result = channel._create_subscription( + project_id=channel.project_id, + topic_id=topic_id, + subscription_path=subscription_path, + topic_path=topic_path, + ) + + # Then: The subscription should be created with correct exactly-once delivery setting + assert result == subscription_path + channel.subscriber.create_subscription.assert_called_once() + call_args = channel.subscriber.create_subscription.call_args + request_config = call_args[1]['request'] + assert request_config['enable_exactly_once_delivery'] is expected_value + def test_close(self, channel): channel._tmp_subscriptions = {'sub1', 'sub2'} channel._n_channels.dec.return_value = 0