Skip to content

Commit edafd4d

Browse files
committed
Fix failing unit test, handle queue can not be found
1 parent cb39964 commit edafd4d

File tree

6 files changed

+50
-32
lines changed

6 files changed

+50
-32
lines changed

dispatcher/brokers/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import importlib
2+
from typing import Optional
23
from types import ModuleType
34

45
from dispatcher.brokers.base import BaseBroker
@@ -19,8 +20,16 @@ def get_sync_broker(broker_name, broker_config) -> BaseBroker:
1920
return broker_module.SyncBroker(**broker_config)
2021

2122

22-
def get_sync_publisher_from_settings() -> BaseBroker:
23-
publish_broker = settings.publish['default_broker']
23+
def get_sync_publisher_from_settings(publish_broker: Optional[str] = None) -> BaseBroker:
24+
if publish_broker:
25+
pass
26+
elif len(settings.brokers) == 1:
27+
publish_broker = list(settings.brokers.keys())[0]
28+
elif 'default_broker' in settings.publish:
29+
publish_broker = settings.publish['default_broker']
30+
else:
31+
raise RuntimeError(f'Could not determine which broker to publish with between options {list(settings.brokers.keys())}')
32+
2433
return get_sync_broker(publish_broker, settings.brokers[publish_broker])
2534

2635

dispatcher/brokers/pg_notify.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class PGNotifyBase(BaseBroker):
2222
def __init__(
2323
self,
2424
channels: Iterable[str] = ('dispatcher_default',),
25-
default_publish_channel: str = 'dispatcher_default',
25+
default_publish_channel: Optional[str] = None,
2626
) -> None:
2727
self.channels = channels
2828
self.default_publish_channel = default_publish_channel
@@ -167,7 +167,9 @@ def create_connection(config) -> psycopg.Connection:
167167

168168
def publish_message(self, channel: Optional[str] = None, message: str = '') -> None:
169169
connection = self.get_connection()
170-
if not channel:
170+
if channel is None:
171+
if self.default_publish_channel is None:
172+
raise ValueError('Could not determine a channel to use publish to from settings or PGNotify config')
171173
channel = self.default_publish_channel
172174

173175
with connection.cursor() as cur:

tests/conftest.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from dispatcher.control import Control
1313

1414
from dispatcher.brokers.pg_notify import SyncBroker, AsyncBroker
15+
from dispatcher.registry import DispatcherMethodRegistry
1516

1617

1718
# List of channels to listen on
@@ -104,3 +105,9 @@ async def pg_control() -> AsyncIterator[Control]:
104105
async def psycopg_conn():
105106
async with aconnection_for_test() as conn:
106107
yield conn
108+
109+
110+
@pytest.fixture
111+
def registry() -> DispatcherMethodRegistry:
112+
"Return a fresh registry, separate from the global one, for testing"
113+
return DispatcherMethodRegistry()
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from unittest import mock
2+
3+
import pytest
4+
5+
from dispatcher.publish import task
6+
from dispatcher.config import temporary_settings
7+
8+
9+
def test_apply_async_with_no_queue(registry, conn_config):
10+
@task(registry=registry)
11+
def test_method():
12+
return
13+
14+
dmethod = registry.get_from_callable(test_method)
15+
16+
# These settings do not specify a default channel, that is the main point
17+
with temporary_settings({'brokers': {'pg_notify': {'config': conn_config}}}):
18+
19+
# Can not run a method if we do not have a queue
20+
with pytest.raises(ValueError):
21+
dmethod.apply_async()
22+
23+
# But providing a queue at time of submission works
24+
with mock.patch('dispatcher.brokers.pg_notify.SyncBroker.publish_message') as mock_publish_method:
25+
dmethod.apply_async(queue='fooqueue')
26+
mock_publish_method.assert_called_once_with(channel='fooqueue', message=mock.ANY)
27+
28+
mock_publish_method.assert_called_once()

tests/unit/conftest.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +0,0 @@
1-
import pytest
2-
3-
from dispatcher.registry import DispatcherMethodRegistry
4-
5-
6-
@pytest.fixture
7-
def registry() -> DispatcherMethodRegistry:
8-
"Return a fresh registry, separate from the global one, for testing"
9-
return DispatcherMethodRegistry()

tests/unit/test_publish.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from unittest import mock
22

33
from dispatcher.publish import task
4-
from dispatcher.utils import serialize_task
54

65
import pytest
76

@@ -76,21 +75,3 @@ def run(self):
7675
TestMethod.delay()
7776

7877
mock_apply_async.assert_called_once_with((), {})
79-
80-
81-
def test_apply_async_with_no_queue(registry):
82-
@task(registry=registry)
83-
def test_method():
84-
return
85-
86-
dmethod = registry.get_from_callable(test_method)
87-
88-
# Can not run a method if we do not have a queue
89-
with pytest.raises(ValueError):
90-
dmethod.apply_async()
91-
92-
# But providing a queue at time of submission works
93-
with mock.patch('dispatcher.brokers.pg_notify.publish_message') as mock_publish_method:
94-
dmethod.apply_async(queue='fooqueue')
95-
96-
mock_publish_method.assert_called_once()

0 commit comments

Comments
 (0)