-
Notifications
You must be signed in to change notification settings - Fork 10
A global config and setup method
#64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
2edf0e4
Use full class names in fixtures for simplicity
AlanCoding 477439f
Complete worker settings initialization
AlanCoding 46b808d
linter fixups after worker settings fixing
AlanCoding 622cd4a
Work factories into control module
AlanCoding bbf9a36
Add docs on the config
AlanCoding 34f7098
Fix type hinting issue
AlanCoding 6b4653f
Fix events data structure pattern goof
AlanCoding ff1d6e4
Implement change described in comment, cls and kwargs patterns
AlanCoding f97962a
Fix unit tests
AlanCoding 9706c14
Convert broker base to protocol
AlanCoding a489c1c
Refactor into single broker class
AlanCoding 7cc2b01
Produce a reference schema
AlanCoding 739861b
Fix linters
AlanCoding a351706
Fix link
AlanCoding 5a5125b
Add type hints to async generator
AlanCoding ea9c940
Add type hint to make clear default enforcing
AlanCoding 883a8fb
Test fix from review
AlanCoding 39c7259
Schema re-gen docs
AlanCoding 1e20eae
Add yield to keep protcol method async
AlanCoding File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,20 +1,23 @@ | ||
| # Demo config | ||
| --- | ||
| pool: | ||
| max_workers: 3 | ||
| producers: | ||
| brokers: | ||
| # List of channels to listen on | ||
| service: | ||
| max_workers: 4 | ||
| brokers: | ||
| pg_notify: | ||
| config: | ||
| conninfo: dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777 | ||
| sync_connection_factory: dispatcher.brokers.pg_notify.connection_saver | ||
| channels: | ||
| - test_channel | ||
| - test_channel2 | ||
| - test_channel3 | ||
| pg_notify: | ||
| # Database connection details | ||
| conninfo: dbname=dispatch_db user=dispatch password=dispatching host=localhost | ||
| port=55777 | ||
| scheduled: | ||
| 'lambda: __import__("time").sleep(1)': | ||
| schedule: 3 | ||
| 'lambda: __import__("time").sleep(2)': | ||
| schedule: 3 | ||
| default_publish_channel: test_channel | ||
| producers: | ||
| ScheduledProducer: | ||
| task_schedule: | ||
| 'lambda: __import__("time").sleep(1)': | ||
| schedule: 3 | ||
| 'lambda: __import__("time").sleep(2)': | ||
| schedule: 3 | ||
| publish: | ||
| default_broker: pg_notify | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| import asyncio | ||
| import logging | ||
|
|
||
| from dispatcher.factories import from_settings | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def run_service() -> None: | ||
| """ | ||
| Runs dispatcher task service (runs tasks due to messages from brokers and other local producers) | ||
| Before calling this you need to configure by calling dispatcher.config.setup | ||
| """ | ||
| loop = asyncio.get_event_loop() | ||
| dispatcher = from_settings() | ||
| try: | ||
| loop.run_until_complete(dispatcher.main()) | ||
| except KeyboardInterrupt: | ||
| logger.info('Dispatcher stopped by KeyboardInterrupt') | ||
| finally: | ||
| loop.close() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| import importlib | ||
| from types import ModuleType | ||
|
|
||
| from .base import BaseBroker | ||
|
|
||
|
|
||
| def get_broker_module(broker_name) -> ModuleType: | ||
| "Static method to alias import_module so we use a consistent import path" | ||
| return importlib.import_module(f'dispatcher.brokers.{broker_name}') | ||
|
|
||
|
|
||
| def get_async_broker(broker_name: str, broker_config: dict, **overrides) -> BaseBroker: | ||
| """ | ||
| Given the name of the broker in the settings, and the data under that entry in settings, | ||
| return the asyncio broker object. | ||
| """ | ||
| broker_module = get_broker_module(broker_name) | ||
| kwargs = broker_config.copy() | ||
| kwargs.update(overrides) | ||
| return broker_module.AsyncBroker(**kwargs) | ||
|
|
||
|
|
||
| def get_sync_broker(broker_name, broker_config) -> BaseBroker: | ||
| """ | ||
| Given the name of the broker in the settings, and the data under that entry in settings, | ||
| return the synchronous broker object. | ||
| """ | ||
| broker_module = get_broker_module(broker_name) | ||
| return broker_module.SyncBroker(**broker_config) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| from abc import abstractmethod | ||
| from typing import Optional | ||
|
|
||
|
|
||
| class BaseBroker: | ||
AlanCoding marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| @abstractmethod | ||
| async def connect(self): ... | ||
|
|
||
| @abstractmethod | ||
| async def aprocess_notify(self, connected_callback=None): ... | ||
|
|
||
| @abstractmethod | ||
| async def apublish_message(self, channel: Optional[str] = None, message: str = '') -> None: ... | ||
|
|
||
| @abstractmethod | ||
| async def aclose(self) -> None: ... | ||
|
|
||
| @abstractmethod | ||
| def get_connection(self): ... | ||
|
|
||
| @abstractmethod | ||
| def publish_message(self, channel=None, message=None): ... | ||
|
|
||
| @abstractmethod | ||
| def close(self): ... | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.