Skip to content

Commit cb39964

Browse files
committed
Cut some more stuff out of the config
1 parent 8b41389 commit cb39964

File tree

6 files changed

+27
-36
lines changed

6 files changed

+27
-36
lines changed

dispatcher/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def run_service() -> None:
1212
Before calling this you need to configure by calling dispatcher.config.setup
1313
"""
1414
loop = asyncio.get_event_loop()
15-
dispatcher = DispatcherMain.from_config()
15+
dispatcher = DispatcherMain.from_settings()
1616
try:
1717
loop.run_until_complete(dispatcher.main())
1818
except KeyboardInterrupt:

dispatcher/brokers/__init__.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from types import ModuleType
33

44
from dispatcher.brokers.base import BaseBroker
5+
from dispatcher.config import settings
56

67

78
def get_broker_module(broker_name) -> ModuleType:
@@ -10,11 +11,19 @@ def get_broker_module(broker_name) -> ModuleType:
1011

1112
def get_async_broker(broker_name, broker_config) -> BaseBroker:
1213
broker_module = get_broker_module(broker_name)
13-
broker_cls = getattr(broker_module, 'AsyncBroker')
14-
return broker_cls(**broker_config)
14+
return broker_module.AsyncBroker(**broker_config)
1515

1616

1717
def get_sync_broker(broker_name, broker_config) -> BaseBroker:
1818
broker_module = get_broker_module(broker_name)
19-
broker_cls = getattr(broker_module, 'SyncBroker')
20-
return broker_cls(**broker_config)
19+
return broker_module.SyncBroker(**broker_config)
20+
21+
22+
def get_sync_publisher_from_settings() -> BaseBroker:
23+
publish_broker = settings.publish['default_broker']
24+
return get_sync_broker(publish_broker, settings.brokers[publish_broker])
25+
26+
27+
def get_async_publisher_from_settings() -> BaseBroker:
28+
publish_broker = settings.publish['default_broker']
29+
return get_async_broker(publish_broker, settings.brokers[publish_broker])

dispatcher/brokers/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ async def aclose(self) -> None: ...
1818
def get_connection(self): ...
1919

2020
@abstractmethod
21-
def publish_message(self, queue, message): ...
21+
def publish_message(self, channel=None, message=None): ...
2222

2323
@abstractmethod
2424
def close(self): ...

dispatcher/config.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,16 @@
44

55
import yaml
66

7-
from dispatcher.publish import DispatcherDecorator
8-
from dispatcher.registry import DispatcherMethodRegistry, registry
9-
from dispatcher.utils import resolve_callable
10-
117

128
class DispatcherSettings:
13-
def __init__(self, config: dict, task_registry: DispatcherMethodRegistry = registry) -> None:
9+
def __init__(self, config: dict) -> None:
1410
self.brokers: list = config.get('brokers', [])
1511
self.producers: list = config.get('producers', [])
1612
self.service: dict = config.get('service', {'max_workers': 3})
17-
self.callbacks: dict = config.get('callbacks', {})
18-
self.tasks: dict = config.get('tasks', {})
19-
self.settings: dict = config.get('settings', {})
20-
21-
for task_name, options in self.tasks.items():
22-
method = resolve_callable(task_name)
23-
if method:
24-
DispatcherDecorator(task_registry, **options)(method)
25-
else:
26-
raise RuntimeError(f'Could not locate task given in config: {task_name}')
13+
self.publish: dict = config.get('publish', {})
14+
# TODO: firmly planned sections of config for later
15+
# self.callbacks: dict = config.get('callbacks', {})
16+
# self.options: dict = config.get('options', {})
2717

2818

2919
def settings_from_file(path: str) -> DispatcherSettings:

dispatcher/main.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,12 @@ def __init__(self, service_config: dict, producers: Iterable[producer_module.Bas
9696
self.events: DispatcherEvents = DispatcherEvents()
9797

9898
@classmethod
99-
def from_config(cls) -> 'DispatcherMain':
100-
producers = DispatcherMain.get_producers()
99+
def from_settings(cls) -> 'DispatcherMain':
100+
producers = DispatcherMain.producers_from_settings()
101101
return cls(settings.service, producers)
102102

103103
@classmethod
104-
def get_producers(cls) -> Iterable[producer_module.BaseProducer]:
104+
def producers_from_settings(cls) -> Iterable[producer_module.BaseProducer]:
105105
producers = []
106106
for broker_name, broker_kwargs in settings.brokers.items():
107107
broker = get_async_broker(broker_name, broker_kwargs)

dispatcher/registry.py

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from typing import Callable, Optional, Set, Tuple
77
from uuid import uuid4
88

9-
from dispatcher.brokers import get_sync_broker
9+
from dispatcher.brokers import get_sync_publisher_from_settings
1010
from dispatcher.utils import MODULE_METHOD_DELIMITER, DispatcherCallable, resolve_callable
1111

1212
logger = logging.getLogger(__name__)
@@ -82,25 +82,17 @@ def get_async_body(
8282

8383
def apply_async(self, args=None, kwargs=None, queue=None, uuid=None, **kw) -> Tuple[dict, str]:
8484
queue = queue or self.submission_defaults.get('queue')
85-
if not queue:
86-
msg = f'{self.fn}: Queue value required and may not be None'
87-
logger.error(msg)
88-
raise ValueError(msg)
8985

9086
if callable(queue):
9187
queue = queue()
9288

9389
obj = self.get_async_body(args=args, kwargs=kwargs, uuid=uuid, **kw)
9490

95-
from dispatcher.conf import settings
91+
broker = get_sync_publisher_from_settings()
9692

97-
publish_broker = settings.publish['default_broker']
98-
broker = get_sync_broker(publish_broker, settings.brokers[publish_broker])
93+
# TODO: exit if a setting is applied to disable publishing
9994

100-
# TODO: before sending, consult an app-specific callback if configured
101-
102-
# NOTE: the kw will communicate things in the database connection data
103-
broker.publish_message(queue, json.dumps(obj))
95+
broker.publish_message(channel=queue, message=json.dumps(obj))
10496
return (obj, queue)
10597

10698

0 commit comments

Comments
 (0)