Skip to content

Commit 3891f04

Browse files
committed
Refactor plugin architecture
1 parent 63c6733 commit 3891f04

File tree

14 files changed

+120
-135
lines changed

14 files changed

+120
-135
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"ipython>=6.2.1,<7.0.0",
4444
"plyvel==1.0.5",
4545
"web3==4.4.1",
46-
"lahja==0.8.0",
46+
"lahja==0.9.0",
4747
"uvloop==0.11.2;platform_system=='Linux' or platform_system=='Darwin'",
4848
"websockets==5.0.1",
4949
],

tests/trinity/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ def event_loop():
6565

6666

6767
@pytest.fixture(scope='module')
68-
def event_bus(event_loop):
68+
async def event_bus(event_loop):
6969
bus = EventBus()
7070
endpoint = bus.create_endpoint('test')
7171
bus.start(event_loop)
72-
endpoint.connect(event_loop)
72+
await endpoint.connect(event_loop)
7373
try:
7474
yield endpoint
7575
finally:

trinity/extensibility/events.py

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,9 @@
33
Type,
44
TYPE_CHECKING,
55
)
6-
from argparse import (
7-
Namespace,
8-
)
96

10-
from trinity.config import (
11-
TrinityConfig,
7+
from lahja import (
8+
BaseEvent,
129
)
1310

1411

@@ -18,23 +15,12 @@
1815
)
1916

2017

21-
class BaseEvent:
22-
"""
23-
The base class for all plugin events. Plugin events can be broadcasted for all different
24-
kind of reasons. Plugins can act based on these events and consume the events even before
25-
the plugin is started, giving plugins the chance to start based on an event or a series of
26-
events. The startup of Trinity itself can be an event as well as the start of a plugin itself
27-
which, for instance, gives other plugins the chance to start based on these previous events.
28-
"""
29-
pass
30-
31-
3218
class PluginStartedEvent(BaseEvent):
3319
"""
3420
Broadcasted when a plugin was started
3521
"""
36-
def __init__(self, plugin: 'BasePlugin') -> None:
37-
self.plugin = plugin
22+
def __init__(self, plugin_type: Type['BasePlugin']) -> None:
23+
self.plugin_type = plugin_type
3824

3925

4026
class ResourceAvailableEvent(BaseEvent):

trinity/extensibility/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,11 @@ class UnsuitableShutdownError(BaseTrinityError):
1010
``PluginManager`` instance that operates in the ``SharedProcessScope``.
1111
"""
1212
pass
13+
14+
15+
class EventBusNotReady(BaseTrinityError):
16+
"""
17+
Raised when a plugin tried to access an EventBus before the plugin
18+
had received its ``ready`` call.
19+
"""
20+
pass

trinity/extensibility/plugin.py

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@
2929
MAIN_EVENTBUS_ENDPOINT
3030
)
3131
from trinity.events import (
32-
ShutdownRequest
32+
ShutdownRequest,
3333
)
3434
from trinity.extensibility.events import (
35-
BaseEvent
35+
BaseEvent,
36+
PluginStartedEvent,
37+
)
38+
from trinity.extensibility.exceptions import (
39+
EventBusNotReady,
3640
)
3741
from trinity.utils.ipc import (
3842
kill_process_gracefully
@@ -70,6 +74,7 @@ def shutdown_host(self) -> None:
7074
class BasePlugin(ABC):
7175

7276
context: PluginContext = None
77+
running: bool = False
7378

7479
@property
7580
@abstractmethod
@@ -85,35 +90,41 @@ def name(self) -> str:
8590
def logger(self) -> logging.Logger:
8691
return logging.getLogger('trinity.extensibility.plugin.BasePlugin#{0}'.format(self.name))
8792

93+
@property
94+
def event_bus(self) -> Endpoint:
95+
if self.context is None:
96+
raise EventBusNotReady("Tried accessing ``event_bus`` before ``ready`` was called")
97+
98+
return self.context.event_bus
99+
88100
def set_context(self, context: PluginContext) -> None:
89101
"""
90102
Set the :class:`~trinity.extensibility.plugin.PluginContext` for this plugin.
91103
"""
92104
self.context = context
93105

94-
def configure_parser(self, arg_parser: ArgumentParser, subparser: _SubParsersAction) -> None:
106+
def ready(self) -> None:
95107
"""
96-
Called at startup, giving the plugin a chance to amend the Trinity CLI argument parser
108+
Called after the plugin received its context and is ready to bootstrap itself.
97109
"""
98110
pass
99111

100-
def handle_event(self, activation_event: BaseEvent) -> None:
112+
def configure_parser(self, arg_parser: ArgumentParser, subparser: _SubParsersAction) -> None:
101113
"""
102-
Notify the plugin about an event, giving it the chance to do internal accounting right
103-
before :meth:`~trinity.extensibility.plugin.BasePlugin.should_start` is called
114+
Called at startup, giving the plugin a chance to amend the Trinity CLI argument parser
104115
"""
105-
106116
pass
107117

108-
def should_start(self) -> bool:
118+
def boot(self) -> None:
109119
"""
110-
Return ``True`` if the plugin should start, otherwise return ``False``
120+
Prepare the plugin to get started and eventually cause ``start`` to get called.
111121
"""
112-
113-
return False
114-
115-
def _start(self) -> None:
122+
self.running = True
116123
self.start()
124+
self.event_bus.broadcast(
125+
PluginStartedEvent(type(self))
126+
)
127+
self.logger.info("Plugin started: %s", self.name)
117128

118129
def start(self) -> None:
119130
"""
@@ -159,18 +170,26 @@ class BaseIsolatedPlugin(BaseSyncStopPlugin):
159170

160171
_process: Process = None
161172

162-
def _start(self) -> None:
173+
def boot(self) -> None:
174+
"""
175+
Prepare the plugin to get started and eventually cause ``start`` to get called.
176+
"""
177+
self.running = True
163178
self._process = ctx.Process(
164179
target=self._prepare_start,
165180
)
166181

167182
self._process.start()
183+
self.logger.info("Plugin started: %s", self.name)
168184

169185
def _prepare_start(self) -> None:
170186
log_queue = self.context.boot_kwargs['log_queue']
171187
level = self.context.boot_kwargs.get('log_level', logging.INFO)
172188
setup_queue_logging(log_queue, level)
173-
189+
self.event_bus.connect_no_wait()
190+
self.event_bus.broadcast(
191+
PluginStartedEvent(type(self))
192+
)
174193
self.start()
175194

176195
def stop(self) -> None:
@@ -193,10 +212,6 @@ def configure_parser(self, arg_parser: ArgumentParser, subparser: _SubParsersAct
193212
def handle_event(self, activation_event: BaseEvent) -> None:
194213
self.logger.info("Debug plugin: handle_event called: %s", activation_event)
195214

196-
def should_start(self) -> bool:
197-
self.logger.info("Debug plugin: should_start called")
198-
return True
199-
200215
def start(self) -> None:
201216
self.logger.info("Debug plugin: start called")
202217
asyncio.ensure_future(self.count_forever())

trinity/extensibility/plugin_manager.py

Lines changed: 7 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,6 @@
2727
from trinity.config import (
2828
TrinityConfig
2929
)
30-
from trinity.extensibility.events import (
31-
BaseEvent,
32-
PluginStartedEvent,
33-
)
3430
from trinity.extensibility.exceptions import (
3531
UnsuitableShutdownError,
3632
)
@@ -136,7 +132,6 @@ class PluginManager:
136132
def __init__(self, scope: BaseManagerProcessScope) -> None:
137133
self._scope = scope
138134
self._plugin_store: List[BasePlugin] = []
139-
self._started_plugins: List[BasePlugin] = []
140135
self._logger = logging.getLogger("trinity.extensibility.plugin_manager.PluginManager")
141136

142137
@property
@@ -165,34 +160,6 @@ def amend_argparser_config(self,
165160
for plugin in self._plugin_store:
166161
plugin.configure_parser(arg_parser, subparser)
167162

168-
def broadcast(self, event: BaseEvent, exclude: BasePlugin = None) -> None:
169-
"""
170-
Notify every registered :class:`~trinity.extensibility.plugin.BasePlugin` about an
171-
event and check whether the plugin wants to start based on that event.
172-
173-
If a plugin gets started it will cause a
174-
:class:`~trinity.extensibility.events.PluginStartedEvent` to get
175-
broadcasted to all other plugins, giving them the chance to start based on that.
176-
"""
177-
for plugin in self._plugin_store:
178-
179-
if plugin is exclude or not self._scope.is_responsible_for_plugin(plugin):
180-
self._logger.debug("Skipping plugin %s (not responsible)", plugin.name)
181-
continue
182-
183-
plugin.handle_event(event)
184-
185-
if plugin in self._started_plugins:
186-
continue
187-
188-
if not plugin.should_start():
189-
continue
190-
191-
plugin._start()
192-
self._started_plugins.append(plugin)
193-
self._logger.info("Plugin started: %s", plugin.name)
194-
self.broadcast(PluginStartedEvent(plugin), plugin)
195-
196163
def prepare(self,
197164
args: Namespace,
198165
trinity_config: TrinityConfig,
@@ -208,6 +175,7 @@ def prepare(self,
208175

209176
context = self._scope.create_plugin_context(plugin, args, trinity_config, boot_kwargs)
210177
plugin.set_context(context)
178+
plugin.ready()
211179

212180
def shutdown_blocking(self) -> None:
213181
"""
@@ -219,14 +187,15 @@ def shutdown_blocking(self) -> None:
219187

220188
self._logger.info("Shutting down PluginManager with scope %s", type(self._scope))
221189

222-
for plugin in self._started_plugins:
190+
for plugin in self._plugin_store:
223191

224-
if not isinstance(plugin, BaseSyncStopPlugin):
192+
if not isinstance(plugin, BaseSyncStopPlugin) or not plugin.running:
225193
continue
226194

227195
try:
228196
self._logger.info("Stopping plugin: %s", plugin.name)
229197
plugin.stop()
198+
plugin.running = False
230199
self._logger.info("Successfully stopped plugin: %s", plugin.name)
231200
except Exception:
232201
self._logger.exception("Exception thrown while stopping plugin %s", plugin.name)
@@ -241,8 +210,8 @@ async def shutdown(self) -> None:
241210
self._logger.info("Shutting down PluginManager with scope %s", type(self._scope))
242211

243212
async_plugins = [
244-
plugin for plugin in self._started_plugins
245-
if isinstance(plugin, BaseAsyncStopPlugin)
213+
plugin for plugin in self._plugin_store
214+
if isinstance(plugin, BaseAsyncStopPlugin) and plugin.running
246215
]
247216

248217
stop_results = await asyncio.gather(
@@ -255,6 +224,7 @@ async def shutdown(self) -> None:
255224
'Exception thrown while stopping plugin %s: %s', plugin.name, result
256225
)
257226
else:
227+
plugin.running = False
258228
self._logger.info("Successfully stopped plugin: %s", plugin.name)
259229

260230
def _stop_plugins(self,

trinity/main.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@
108108
def main() -> None:
109109
event_bus = EventBus(ctx)
110110
main_endpoint = event_bus.create_endpoint(MAIN_EVENTBUS_ENDPOINT)
111-
main_endpoint.connect()
111+
main_endpoint.connect_no_wait()
112112

113113
plugin_manager = setup_plugins(
114114
MainAndIsolatedProcessScope(event_bus, main_endpoint)
@@ -343,21 +343,15 @@ def _sigint_handler(*args: Any) -> None:
343343
def launch_node(args: Namespace, trinity_config: TrinityConfig, endpoint: Endpoint) -> None:
344344
with trinity_config.process_id_file('networking'):
345345

346-
endpoint.connect()
347-
348346
NodeClass = trinity_config.node_class
349-
# Temporary hack: We setup a second instance of the PluginManager.
350-
# The first instance was only to configure the ArgumentParser whereas
351-
# for now, the second instance that lives inside the networking process
352-
# performs the bulk of the work. In the future, the PluginManager
353-
# should probably live in its own process and manage whether plugins
354-
# run in the shared plugin process or spawn their own.
347+
node = NodeClass(endpoint, trinity_config)
348+
loop = node.get_event_loop()
355349

350+
endpoint.connect_no_wait(loop)
351+
# This is a second PluginManager instance governing plugins in a shared process.
356352
plugin_manager = setup_plugins(SharedProcessScope(endpoint))
357353
plugin_manager.prepare(args, trinity_config)
358354

359-
node = NodeClass(plugin_manager, trinity_config)
360-
loop = node.get_event_loop()
361355
asyncio.ensure_future(handle_networking_exit(node, plugin_manager, endpoint), loop=loop)
362356
asyncio.ensure_future(node.run(), loop=loop)
363357
loop.run_forever()

0 commit comments

Comments
 (0)