Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions kombu/transport/gcpubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
* ``retry_timeout_seconds``: (int) The maximum time to wait before retrying.
* ``bulk_max_messages``: (int) The maximum number of messages to pull in bulk.
Defaults to 32.
* ``enable_exactly_once_delivery``: (bool) Enable exactly-once delivery for
subscriptions. When enabled, Pub/Sub provides message deduplication.
Defaults to False.
"""

from __future__ import annotations
Expand Down Expand Up @@ -162,6 +165,7 @@ class Channel(virtual.Channel):
default_expiration_seconds = 86400
default_retry_timeout_seconds = 300
default_bulk_max_messages = 32
default_enable_exactly_once_delivery = False

_min_ack_deadline = 10
_fanout_exchanges = set()
Expand Down Expand Up @@ -316,11 +320,10 @@ def _create_subscription(
request={
"name": subscription_path,
"topic": topic_path,
'ack_deadline_seconds': self.ack_deadline_seconds,
'expiration_policy': {
'ttl': f'{self.expiration_seconds}s'
},
'message_retention_duration': f'{msg_retention}s',
"ack_deadline_seconds": self.ack_deadline_seconds,
"expiration_policy": {"ttl": f"{self.expiration_seconds}s"},
"message_retention_duration": f"{msg_retention}s",
"enable_exactly_once_delivery": self.enable_exactly_once_delivery,
**(filter_args or {}),
}
)
Expand Down Expand Up @@ -663,6 +666,13 @@ def bulk_max_messages(self):
'bulk_max_messages', self.default_bulk_max_messages
)

@cached_property
def enable_exactly_once_delivery(self):
return self.transport_options.get(
'enable_exactly_once_delivery',
self.default_enable_exactly_once_delivery
)

def close(self):
"""Close the channel."""
logger.debug('closing channel')
Expand Down
73 changes: 72 additions & 1 deletion t/unit/transport/test_gcpubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from concurrent.futures import Future
from datetime import datetime
from queue import Empty
from unittest.mock import MagicMock, call, patch
from unittest.mock import MagicMock, PropertyMock, call, patch

import pytest
from _socket import timeout as socket_timeout
Expand Down Expand Up @@ -622,6 +622,77 @@ def test_bulk_max_messages_default(self, channel):
'bulk_max_messages'
)

@pytest.mark.parametrize(
"transport_options,expected_value",
[
({}, False), # default
({'enable_exactly_once_delivery': True}, True), # enabled
({'enable_exactly_once_delivery': False}, False), # disabled
],
)
def test_enable_exactly_once_delivery(
self, channel, transport_options, expected_value
):
"""Test enable_exactly_once_delivery property with different configurations."""
# Given: A channel with specific transport_options
with patch.object(
type(channel), 'transport_options',
new_callable=PropertyMock, return_value=transport_options
):
# When: Accessing the enable_exactly_once_delivery property
# Then: It should return the expected value
assert channel.enable_exactly_once_delivery is expected_value

@pytest.mark.parametrize(
"enable_exactly_once,expected_value",
[
(True, True), # with exactly-once delivery
(False, False), # without exactly-once delivery
],
)
def test_create_subscription_exactly_once_delivery(
self, channel, enable_exactly_once, expected_value
):
"""Test that create_subscription correctly handles exactly-once delivery setting."""
# Given: A channel with specified exactly-once delivery configuration
channel.project_id = "project_id"
topic_id = "topic_id"
subscription_path = "subscription_path"
topic_path = "topic_path"

channel.subscriber.subscription_path = MagicMock(
return_value=subscription_path
)
channel.publisher.topic_path = MagicMock(return_value=topic_path)
channel.subscriber.create_subscription = MagicMock()

transport_opts = {
'ack_deadline_seconds': 240,
'expiration_seconds': 86400,
}
if enable_exactly_once is not None:
transport_opts['enable_exactly_once_delivery'] = enable_exactly_once

with patch.object(
type(channel), 'transport_options',
new_callable=PropertyMock,
return_value=transport_opts
):
# When: Creating a subscription
result = channel._create_subscription(
project_id=channel.project_id,
topic_id=topic_id,
subscription_path=subscription_path,
topic_path=topic_path,
)

# Then: The subscription should be created with correct exactly-once delivery setting
assert result == subscription_path
channel.subscriber.create_subscription.assert_called_once()
call_args = channel.subscriber.create_subscription.call_args
request_config = call_args[1]['request']
assert request_config['enable_exactly_once_delivery'] is expected_value

def test_close(self, channel):
channel._tmp_subscriptions = {'sub1', 'sub2'}
channel._n_channels.dec.return_value = 0
Expand Down
Loading