Skip to content

Commit c643578

Browse files
committed
Factories refactor
1 parent bbaa9f7 commit c643578

File tree

10 files changed

+160
-120
lines changed

10 files changed

+160
-120
lines changed

dispatcher/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import logging
33

4-
from dispatcher.main import DispatcherMain
4+
from dispatcher.factories import from_settings
55

66
logger = logging.getLogger(__name__)
77

@@ -12,7 +12,7 @@ def run_service() -> None:
1212
Before calling this you need to configure by calling dispatcher.config.setup
1313
"""
1414
loop = asyncio.get_event_loop()
15-
dispatcher = DispatcherMain.from_settings()
15+
dispatcher = from_settings()
1616
try:
1717
loop.run_until_complete(dispatcher.main())
1818
except KeyboardInterrupt:

dispatcher/brokers/__init__.py

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +0,0 @@
1-
import importlib
2-
from typing import Optional
3-
from types import ModuleType
4-
5-
from dispatcher.brokers.base import BaseBroker
6-
from dispatcher.config import settings
7-
8-
9-
def get_broker_module(broker_name) -> ModuleType:
10-
return importlib.import_module(f'dispatcher.brokers.{broker_name}')
11-
12-
13-
def get_async_broker(broker_name, broker_config) -> BaseBroker:
14-
broker_module = get_broker_module(broker_name)
15-
return broker_module.AsyncBroker(**broker_config)
16-
17-
18-
def get_sync_broker(broker_name, broker_config) -> BaseBroker:
19-
broker_module = get_broker_module(broker_name)
20-
return broker_module.SyncBroker(**broker_config)
21-
22-
23-
def get_sync_publisher_from_settings(publish_broker: Optional[str] = None) -> BaseBroker:
24-
if publish_broker:
25-
pass
26-
elif len(settings.brokers) == 1:
27-
publish_broker = list(settings.brokers.keys())[0]
28-
elif 'default_broker' in settings.publish:
29-
publish_broker = settings.publish['default_broker']
30-
else:
31-
raise RuntimeError(f'Could not determine which broker to publish with between options {list(settings.brokers.keys())}')
32-
33-
return get_sync_broker(publish_broker, settings.brokers[publish_broker])
34-
35-
36-
def get_async_publisher_from_settings() -> BaseBroker:
37-
publish_broker = settings.publish['default_broker']
38-
return get_async_broker(publish_broker, settings.brokers[publish_broker])

dispatcher/brokers/pg_notify.py

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import logging
2-
from typing import Any, Iterable, Optional, Callable
2+
from typing import Callable, Iterable, Optional
33

44
import psycopg
55

@@ -21,9 +21,22 @@ class PGNotifyBase(BaseBroker):
2121

2222
def __init__(
2323
self,
24+
config: Optional[dict] = None,
2425
channels: Iterable[str] = ('dispatcher_default',),
2526
default_publish_channel: Optional[str] = None,
2627
) -> None:
28+
"""
29+
channels - listening channels for the service and used for control-and-reply
30+
default_publish_channel - if not specified on task level or in the submission
31+
by default messages will be sent to this channel.
32+
this should be one of the listening channels for messages to be received.
33+
"""
34+
if config:
35+
self._config: dict = config.copy()
36+
self._config['autocommit'] = True
37+
else:
38+
self._config = {}
39+
2740
self.channels = channels
2841
self.default_publish_channel = default_publish_channel
2942

@@ -47,6 +60,8 @@ def get_connection_method(self, factory_path: Optional[str] = None) -> Callable:
4760
else:
4861
raise RuntimeError('Could not construct connection for lack of config or factory')
4962

63+
def create_connection(self): ...
64+
5065

5166
class AsyncBroker(PGNotifyBase):
5267
def __init__(
@@ -60,21 +75,17 @@ def __init__(
6075
if not (config or async_connection_factory or connection):
6176
raise RuntimeError('Must specify either config or async_connection_factory')
6277

63-
if config:
64-
self._config: dict = config.copy()
65-
self._config['autocommit'] = True
66-
else:
67-
self._config = {}
68-
6978
self._async_connection_factory = async_connection_factory
70-
self._connection: Optional[Any] = connection
79+
self._connection = connection
7180

72-
super().__init__(**kwargs)
81+
super().__init__(config=config, **kwargs)
7382

7483
async def get_connection(self) -> psycopg.AsyncConnection:
7584
if not self._connection:
7685
factory = self.get_connection_method(factory_path=self._async_connection_factory)
77-
self._connection = await factory(**self._config)
86+
connection = await factory(**self._config)
87+
self._connection = connection
88+
return connection # slightly weird due to MyPY
7889
return self._connection
7990

8091
@staticmethod
@@ -145,20 +156,16 @@ def __init__(
145156
if not (config or sync_connection_factory or connection):
146157
raise RuntimeError('Must specify either config or async_connection_factory')
147158

148-
if config:
149-
self._config: Optional[dict] = config.copy()
150-
self._config['autocommit'] = True
151-
else:
152-
self._config = None
153-
154159
self._sync_connection_factory = sync_connection_factory
155-
self._connection: Optional[Any] = connection
156-
super().__init__(**kwargs)
160+
self._connection = connection
161+
super().__init__(config=config, **kwargs)
157162

158163
def get_connection(self) -> psycopg.Connection:
159164
if not self._connection:
160165
factory = self.get_connection_method(factory_path=self._sync_connection_factory)
161-
self._connection = factory(**self._config)
166+
connection = factory(**self._config)
167+
self._connection = connection
168+
return connection
162169
return self._connection
163170

164171
@staticmethod

dispatcher/config.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
class DispatcherSettings:
99
def __init__(self, config: dict) -> None:
10-
self.brokers: list = config.get('brokers', [])
11-
self.producers: list = config.get('producers', [])
10+
self.brokers: dict = config.get('brokers', [])
11+
self.producers: dict = config.get('producers', [])
1212
self.service: dict = config.get('service', {'max_workers': 3})
1313
self.publish: dict = config.get('publish', {})
1414
# TODO: firmly planned sections of config for later

dispatcher/control.py

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
import uuid
66
from types import SimpleNamespace
77

8-
from dispatcher.producers.brokered import BrokeredProducer
8+
from dispatcher.factories import get_async_publisher_from_settings, get_sync_publisher_from_settings
9+
from dispatcher.producers import BrokeredProducer
910

1011
logger = logging.getLogger('awx.main.dispatch.control')
1112

@@ -55,10 +56,9 @@ def fatal_error_callback(self, *args):
5556

5657

5758
class Control(object):
58-
def __init__(self, queue, config=None, async_connection=None):
59+
def __init__(self, queue, config=None):
5960
self.queuename = queue
6061
self.config = config
61-
self.async_connection = async_connection
6262

6363
def running(self, *args, **kwargs):
6464
return self.control_with_reply('running', *args, **kwargs)
@@ -90,11 +90,8 @@ async def acontrol_with_reply_internal(self, producer, send_data, expected_repli
9090
return [json.loads(payload) for payload in control_callbacks.received_replies]
9191

9292
def make_producer(self, reply_queue):
93-
if self.async_connection:
94-
conn_kwargs = {'connection': self.async_connection}
95-
else:
96-
conn_kwargs = {'config': self.config}
97-
return BrokeredProducer(broker='pg_notify', channels=[reply_queue], **conn_kwargs)
93+
broker = get_async_publisher_from_settings(channels=[reply_queue])
94+
return BrokeredProducer(broker, close_on_exit=True)
9895

9996
async def acontrol_with_reply(self, command, expected_replies=1, timeout=1, data=None):
10097
reply_queue = Control.generate_reply_queue_name()
@@ -118,9 +115,6 @@ def control_with_reply(self, command, expected_replies=1, timeout=1, data=None):
118115
start = time.time()
119116
reply_queue = Control.generate_reply_queue_name()
120117

121-
if (not self.config) and (not self.async_connection):
122-
raise RuntimeError('Must use a new psycopg connection to do control-and-reply')
123-
124118
send_data = {'control': command, 'reply_to': reply_queue}
125119
if data:
126120
send_data['control_data'] = data
@@ -139,11 +133,10 @@ def control_with_reply(self, command, expected_replies=1, timeout=1, data=None):
139133

140134
# NOTE: this is the synchronous version, only to be used for no-reply
141135
def control(self, command, data=None):
142-
from dispatcher.brokers.pg_notify import publish_message
143-
144136
send_data = {'control': command}
145137
if data:
146138
send_data['control_data'] = data
147139

148140
payload = json.dumps(send_data)
149-
publish_message(self.queuename, payload, config=self.config)
141+
broker = get_sync_publisher_from_settings()
142+
broker.publish_message(channel=self.queuename, message=payload)

dispatcher/factories.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import importlib
2+
from types import ModuleType
3+
from typing import Iterable, Optional
4+
5+
from dispatcher import producers
6+
from dispatcher.brokers.base import BaseBroker
7+
from dispatcher.config import LazySettings
8+
from dispatcher.config import settings as global_settings
9+
from dispatcher.main import DispatcherMain
10+
11+
"""
12+
Creates objects from settings,
13+
This is kept separate from the settings and the class definitions themselves,
14+
which is to avoid import dependencies.
15+
"""
16+
17+
# ---- Service objects ----
18+
19+
20+
def get_broker_module(broker_name) -> ModuleType:
21+
"Static method to alias import_module so we use a consistent import path"
22+
return importlib.import_module(f'dispatcher.brokers.{broker_name}')
23+
24+
25+
def get_async_broker(broker_name: str, broker_config: dict, **overrides) -> BaseBroker:
26+
"""
27+
Given the name of the broker in the settings, and the data under that entry in settings,
28+
return the asyncio broker object.
29+
"""
30+
broker_module = get_broker_module(broker_name)
31+
kwargs = broker_config.copy()
32+
kwargs.update(overrides)
33+
return broker_module.AsyncBroker(**kwargs)
34+
35+
36+
def producers_from_settings(settings: LazySettings = global_settings) -> Iterable[producers.BaseProducer]:
37+
producer_objects = []
38+
for broker_name, broker_kwargs in settings.brokers.items():
39+
broker = get_async_broker(broker_name, broker_kwargs)
40+
producer = producers.BrokeredProducer(broker=broker)
41+
producer_objects.append(producer)
42+
43+
for producer_cls, producer_kwargs in settings.producers.items():
44+
producer_objects.append(getattr(producers, producer_cls)(**producer_kwargs))
45+
46+
return producer_objects
47+
48+
49+
def from_settings(settings: LazySettings = global_settings) -> DispatcherMain:
50+
"""
51+
Returns the main dispatcher object, used for running the background task service.
52+
You could initialize this yourself, but using the shared settings allows for consistency
53+
between the service, publisher, and any other interacting processes.
54+
"""
55+
producers = producers_from_settings(settings=settings)
56+
return DispatcherMain(settings.service, producers)
57+
58+
59+
# ---- Publisher objects ----
60+
61+
62+
def get_sync_broker(broker_name, broker_config) -> BaseBroker:
63+
"""
64+
Given the name of the broker in the settings, and the data under that entry in settings,
65+
return the synchronous broker object.
66+
"""
67+
broker_module = get_broker_module(broker_name)
68+
return broker_module.SyncBroker(**broker_config)
69+
70+
71+
def get_sync_publisher_from_settings(publish_broker: Optional[str] = None, settings: LazySettings = global_settings) -> BaseBroker:
72+
if publish_broker:
73+
pass
74+
elif len(settings.brokers) == 1:
75+
publish_broker = list(settings.brokers.keys())[0]
76+
elif 'default_broker' in settings.publish:
77+
publish_broker = settings.publish['default_broker']
78+
else:
79+
raise RuntimeError(f'Could not determine which broker to publish with between options {list(settings.brokers.keys())}')
80+
81+
return get_sync_broker(publish_broker, settings.brokers[publish_broker])
82+
83+
84+
def get_async_publisher_from_settings(settings: LazySettings = global_settings, **overrides) -> BaseBroker:
85+
"""
86+
An asynchronous publisher is the ideal choice for submitting control-and-reply actions.
87+
This returns an asyncio broker of the default publisher type.
88+
89+
If channels are specified, these completely replace the channel list from settings.
90+
For control-and-reply, this will contain only the reply_to channel, to not receive
91+
unrelated traffic.
92+
"""
93+
publish_broker = settings.publish['default_broker']
94+
return get_async_broker(publish_broker, settings.brokers[publish_broker], **overrides)

0 commit comments

Comments
 (0)