Add support for executor configuration in AioConfig for file loading #1451
base: master
Conversation
Sure looks like it will solve the issue. Nice work! @chemelli74 and @zweckj will need to confirm since they have the test cases.
Codecov Report: ✅ All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##           master    #1451      +/-  ##
==========================================
+ Coverage   91.60%   91.64%    +0.04%
==========================================
  Files          76       77        +1
  Lines        8078     8118       +40
==========================================
+ Hits         7400     7440       +40
  Misses        678      678
```
Thank you for taking the initiative! I would like to clarify the proposed design before providing an in-depth review. In my understanding, this PR:
Pros:
Cons:
Alternative proposal: UPDATE: #1452 is provided as proof-of-concept of the following, alternative design proposal. I was thinking of a cache-aware wrapper for blocking loader functions, such as:

```python
# to be placed in aiobotocore/loaders.py
import asyncio


async def call_cached(loader, func, *args, **kwargs):
    # key building logic taken from botocore.loaders
    key = (func.__name__,) + args
    for pair in sorted(kwargs.items()):
        key += pair
    if key in loader._cache:
        return loader._cache[key]
    # func is a bound loader method (see usage below); botocore's
    # instance_cache decorator populates loader._cache when it runs
    return await asyncio.to_thread(func, *args, **kwargs)
```

This could be used in light-weight patches wherever we decide to provide non-blocking behavior (everywhere!?), e.g.:

```python
from botocore.client import ClientCreator
from botocore.model import ServiceModel


class AioClientCreator(ClientCreator):
    # ...
    async def _load_service_model(self, service_name, api_version=None):
        json_model = await call_cached(
            self._loader,
            self._loader.load_service_model,
            service_name,
            'service-2',
            api_version=api_version,
        )
        service_model = ServiceModel(json_model, service_name=service_name)
        return service_model
```

I believe this would provide more potential to deliver full non-blocking loaders. Also note that no client configuration would be needed, if we agree that a cache-aware implementation would eliminate the need for user opt-in and use the recommended, high-level `asyncio.to_thread()`.

Nitpick 1: I welcome introducing type hints such as those enabled by `typing_extensions`.

Nitpick 2: botocore provides support for Python 3.9 until 2026-04-29. Are we sure we want to drop support before then? If so, it would simplify this PR if we did that in a separate PR. I'd be happy to prepare one; just let me know.
Yeah, you're right; we should support 3.9 until then. I'll remove that part. It's not as bad, given we can use `typing_extensions`.
Responses to the cons:
I don't know what those operations are and was working under the assumption that we need to wrap all file I/O in a thread, at least eventually. All the more reason to clarify the goal. @chemelli74 and @zweckj: could the HA community weigh in on what exactly constitutes unacceptable blocking from their point of view?
Correct, but being cache-aware can avoid paying the overhead of running code in a separate thread, as demonstrated by #1452. This is not a big issue in
aiobotocore / botocore already supports passing in a custom loader. As I had originally proposed, HA could "simply" initialize and pre-warm a loader (in a thread to prevent blocking) and then have their clients use it without having to worry about blocking. If that is not feasible, we could provide a utility function to initialize and pre-warm a loader for such a purpose.
The loader case is interesting. Since you have context, could you post a little pseudocode of how that could work?
```python
import asyncio

import aiobotocore.session
from botocore.loaders import create_loader


def pre_warm_loader(
    loader,
    /,
    service_name,
    api_version=None,
):
    # from session.py
    loader.load_data_with_path('endpoints')
    loader.load_data('sdk-default-configuration')
    loader.load_service_model(service_name, 'waiters-2', api_version)
    loader.load_service_model(service_name, 'paginators-1', api_version)
    loader.load_service_model(
        service_name, type_name='service-2', api_version=api_version
    )
    loader.list_available_services(type_name='service-2')
    # from client.py
    loader.load_data('partitions')
    loader.load_service_model(service_name, 'service-2', api_version=api_version)
    loader.load_service_model(
        service_name, 'endpoint-rule-set-1', api_version=api_version
    )
    loader.load_data('_retry')
    # from docs/service.py
    loader.load_service_model(service_name, 'examples-1', api_version)


async def main():
    loader = create_loader()
    await asyncio.to_thread(pre_warm_loader, loader, "s3")
    session = aiobotocore.session.get_session()
    session.register_component("data_loader", loader)
    async with session.create_client("s3") as client:  # should not block
        await client.list_buckets()
```
It should be feasible to perform such pre-warming steps optionally in
The default would initially be
If that turns out to be useful and performant, |
@jakob-keller Sorry for the delays. I really want to get back to this and will try to carve out some time to think about it some more.
Did a quick test, and get: even if
@chemelli74 This most likely means your botocore does not match the aiobotocore version requirement.
OK, I did some research with AI, and it seems to agree with my take: #1452 puts its fingers on too many internals, whereas this PR solves the problem at hand at a high level and is opt-in, so it can gather valuable feedback. The slowdown should also be minimal, given users will be passing in their own executor, which could be pre-spawned.

I also found no docs stating that default-executor thread workers are GC'd, and the default pool already keeps around 5 threads once you touch it (as I previously saw). It will re-use existing threads if they exist, but that's just the definition of a pool. Beyond that, I think we want to give clients the option to restrict how many threads are spawned across multiple sessions and clients if they so please, so this would be a great enhancement.

I've gone ahead and reverted the 3.9 changes, so this PR should be good to go. Thoughts?
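For illustration, a minimal sketch of the opt-in usage being argued for here, assuming the `load_executor` keyword on `AioConfig` lands as shown in the diff below (the service and calls are placeholders):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

import aiobotocore.session
from aiobotocore.config import AioConfig


async def main():
    # One pre-spawned pool shared across sessions/clients caps the total
    # number of loader threads, instead of each client spawning its own.
    executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="aws-load")
    config = AioConfig(load_executor=executor)
    session = aiobotocore.session.get_session()
    async with session.create_client("s3", config=config) as client:
        await client.list_buckets()


asyncio.run(main())
```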
I took the liberty of adding type annotations to
As I wrote is |
@chemelli74 Yes, but you didn't say what version of aiobotocore. Works fine for me:

```
$ python3
Python 3.11.11 (main, Jan 14 2025, 23:36:41) [Clang 19.1.6 ] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from aiobotocore.config import AioConfig
>>> import aiobotocore
>>> aiobotocore.__version__
'3.1.0'
>>> import botocore
>>> botocore.__version__
'1.42.19'
>>>
```
@jakob-keller OK, updated and merged in your PR.
These may be dropped from the PR.
aiobotocore/config.py
```python
ssl_context: NotRequired[ssl.SSLContext]
resolver: NotRequired[AbstractResolver]
socket_factory: NotRequired[Optional[SocketFactoryType]]
resolver: NotRequired[AbstractResolver]
```
Duplicate of line 36.
```python
    connector_args: _ConnectorArgs,
    http_session_cls: type[_HttpSessionType],
) -> None:
    if connector_args is None:
```
Are changes from #1454 reverted on purpose? Why?
validate before assignment
`if connector_args:` would be slightly more efficient.
aiobotocore/config.py

```python
self._validate_connector_args(connector_args, http_session_cls)

if load_executor and not isinstance(load_executor, ThreadPoolExecutor):
```
Maybe move to end of method?
I want to validate before assigning any variables
```python
async def optionally_run_in_executor(
    loop: AbstractEventLoop,
    executor: Optional[ThreadPoolExecutor],
```
`run_in_executor()` accepts instances of `concurrent.futures.Executor` and uses the default executor if `executor` is `None`. It might reduce friction if we follow the standard library's approach.
Then again, the function would be reduced to line 29 and could be dropped entirely.
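A minimal sketch of that reduction, purely to illustrate the suggestion (the wrapper becomes a thin alias for the standard call):

```python
import asyncio
from concurrent.futures import Executor
from typing import Optional


async def optionally_run_in_executor(executor: Optional[Executor], func, *args):
    # Mirrors the standard library: a None executor means "use the event
    # loop's default executor", so no extra branching is needed here.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, func, *args)
```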
aiobotocore/config.py
```python
    self,
    connector_args: Optional[_ConnectorArgs] = None,
    http_session_cls: type[_HttpSessionType] = DEFAULT_HTTP_SESSION_CLS,
    load_executor: Optional[ThreadPoolExecutor] = None,
```
```python
load_executor: Union[bool, concurrent.futures.Executor] = False,
```

Would be more flexible:

- `False` (default): keep current behavior
- `True`: run in the default executor, i.e. using `run_in_executor(None, ...)` or `asyncio.to_thread(...)`
- instance of `Executor`: use `run_in_executor(executor, ...)`
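A hypothetical sketch of the dispatch this would imply (names and placement are illustrative, not the PR's actual code):

```python
import asyncio
from concurrent.futures import Executor
from typing import Union


async def _run_load(load_executor: Union[bool, Executor], func, *args):
    if load_executor is False:
        return func(*args)  # current behavior: run synchronously
    if load_executor is True:
        return await asyncio.to_thread(func, *args)  # default executor
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(load_executor, func, *args)
```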
Added support for the default executor via a sentinel.
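Roughly, assuming a module-level sentinel (the actual name in the PR may differ):

```python
# Hypothetical sentinel meaning "use the event loop's default executor";
# the PR's actual spelling may differ.
DEFAULT_EXECUTOR = object()

# AioConfig(load_executor=None)              -> current (blocking) behavior
# AioConfig(load_executor=DEFAULT_EXECUTOR)  -> loop.run_in_executor(None, ...)
# AioConfig(load_executor=my_executor)       -> loop.run_in_executor(my_executor, ...)
```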
How about the remaining blocking file I/O? Is the scope of this PR limited to `_load_service_model()` and `_load_service_endpoints_ruleset()` on purpose?
These are the main pain points, to my understanding; we can always expand on this later.
#1462 sketches this alternative design. I think it is cleaner and more comprehensive. What are your thoughts?
We should get feedback from the community, @bdraco @chemelli74. No reason both can't exist, too, I think. The warm-up one seems like something people could do on their own, though.
I agree, but they seem to have struggled with it, probably since it's not at all obvious how proper cache warming would need to be performed. My PR aims to provide a smooth experience for users who wish to do so.
Obviously this is with this PR's code, so:
Still works for me with that version of botocore:

```
>>> from aiobotocore.config import AioConfig
>>> import aiobotocore
>>> import botocore
>>> aiobotocore.__version__
'3.1.0'
>>> botocore.__version__
'1.42.1'
```

You should do a runtime version check; you may have a pathing issue. And though it's not obvious, you could always apply this patch to older versions.
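For example, a quick diagnostic along those lines (nothing aiobotocore-specific, just standard module introspection):

```python
# Print versions and on-disk locations of the imported modules to rule
# out a pathing issue where a different install shadows the expected one.
import aiobotocore
import botocore

print(aiobotocore.__version__, aiobotocore.__file__)
print(botocore.__version__, botocore.__file__)
```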
Let's wait until we hear back from @bdraco @chemelli74.
Description of Change
Added the ability to configure a specific executor in `AioConfig` for handling file load events. This enhancement enables greater flexibility and control for users managing asynchronous tasks.

Assumptions
No additional assumptions were made.
Checklist for All Submissions