Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ jobs:
name: test${{ '' }} # zizmor: ignore[obfuscation] nest jobs under the same sidebar category
strategy:
matrix:
# 3.9 supported until 2026-04-29.
# See https://github.com/boto/botocore?tab=readme-ov-file#notices
python-version:
- 3.9
- >-
Expand Down
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Changes
^^^^^^^^^^^^^^^^^^^
* support passing `socket_factory` as part of `connector_args` in `AioConfig`
* relax botocore dependency specification
* Add ability to specify executor on AioConfig to run file load events on

3.0.0 (2025-12-09)
^^^^^^^^^^^^^^^^^^^
Expand Down
17 changes: 17 additions & 0 deletions aiobotocore/_helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import inspect
from asyncio import AbstractEventLoop
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

from aiobotocore.config import PARAM_SENTINAL


async def resolve_awaitable(obj):
Expand All @@ -14,3 +19,15 @@ async def async_any(items):
return True

return False


async def optionally_run_in_executor(
loop: AbstractEventLoop,
executor: Optional[ThreadPoolExecutor],
Copy link
Collaborator

@jakob-keller jakob-keller Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run_in_executor() accepts instances of concurrent.futures.Executor and uses the default executor if executor is None. It might reduce friction, if we follow the standard library's approach.

Then again, the function would be reduced to line 29 and could be dropped entirely.

func: Callable,
*args,
):
if executor != PARAM_SENTINAL:
return await loop.run_in_executor(executor, func, *args)

return func(*args)
3 changes: 3 additions & 0 deletions aiobotocore/_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from aiobotocore.httpsession import AIOHTTPSession

DEFAULT_HTTP_SESSION_CLS = AIOHTTPSession
4 changes: 3 additions & 1 deletion aiobotocore/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import botocore.serialize
from botocore.args import ClientArgsCreator, EPRBuiltins

from aiobotocore._types import DEFAULT_HTTP_SESSION_CLS

from .config import AioConfig
from .endpoint import DEFAULT_HTTP_SESSION_CLS, AioEndpointCreator
from .endpoint import AioEndpointCreator
from .parsers import create_parser
from .regions import AioEndpointRulesetResolver
from .signers import AioRequestSigner
Expand Down
31 changes: 27 additions & 4 deletions aiobotocore/client.py
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about remaining blocking file I/O? Is the scope of this PR limited to _load_service_model() and _load_service_endpoints_ruleset() on purpose?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the main pain points to my understanding, we can always expand upon this later

Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

from botocore.auth import resolve_auth_type
from botocore.awsrequest import prepare_request_dict
from botocore.client import (
Expand All @@ -17,7 +19,10 @@
from botocore.utils import get_service_module_name
from botocore.waiter import xform_name

from aiobotocore.config import PARAM_SENTINAL

from . import waiter
from ._helpers import optionally_run_in_executor
from .args import AioClientArgsCreator
from .context import with_current_context
from .credentials import AioRefreshableCredentials
Expand Down Expand Up @@ -48,12 +53,30 @@ async def create_client(
'choose-service-name', service_name=service_name
)
service_name = first_non_none_response(responses, default=service_name)
service_model = self._load_service_model(service_name, api_version)
loop = asyncio.get_event_loop()
load_executor = client_config and getattr(
client_config, 'load_executor', PARAM_SENTINAL
)

service_model = await optionally_run_in_executor(
loop,
load_executor,
self._load_service_model,
service_name,
api_version,
)

try:
endpoints_ruleset_data = self._load_service_endpoints_ruleset(
service_name, api_version
endpoints_ruleset_data = await optionally_run_in_executor(
loop,
load_executor,
self._load_service_endpoints_ruleset,
service_name,
api_version,
)
partition_data = await optionally_run_in_executor(
loop, load_executor, self._loader.load_data, 'partitions'
)
partition_data = self._loader.load_data('partitions')
except UnknownServiceError:
endpoints_ruleset_data = None
partition_data = None
Expand Down
23 changes: 19 additions & 4 deletions aiobotocore/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy
import ssl
import sys
from concurrent.futures import ThreadPoolExecutor
from typing import Optional, TypedDict, Union

import botocore.client
Expand All @@ -10,7 +11,7 @@
from botocore.exceptions import ParamValidationError

from ._constants import DEFAULT_KEEPALIVE_TIMEOUT
from .endpoint import DEFAULT_HTTP_SESSION_CLS
from ._types import DEFAULT_HTTP_SESSION_CLS
from .httpsession import AIOHTTPSession
from .httpxsession import HttpxSession

Expand Down Expand Up @@ -38,23 +39,34 @@ class _ConnectorArgs(TypedDict):

_HttpSessionType = Union[AIOHTTPSession, HttpxSession]

PARAM_SENTINAL = object()


class AioConfig(botocore.client.Config):
def __init__(
self,
connector_args: Optional[_ConnectorArgs] = None,
http_session_cls: type[_HttpSessionType] = DEFAULT_HTTP_SESSION_CLS,
load_executor: Optional[ThreadPoolExecutor] = PARAM_SENTINAL,
**kwargs,
):
super().__init__(**kwargs)

self._validate_connector_args(connector_args, http_session_cls)

if load_executor not in (None, PARAM_SENTINAL) and not isinstance(
load_executor, ThreadPoolExecutor
):
raise ParamValidationError(
report='load_executor value must be an instance of an Executor.'
)

self.load_executor = load_executor

self.connector_args: _ConnectorArgs = (
copy.copy(connector_args) if connector_args else {}
)
self.http_session_cls: type[_HttpSessionType] = http_session_cls
self._validate_connector_args(
self.connector_args, self.http_session_cls
)

if 'keepalive_timeout' not in self.connector_args:
self.connector_args['keepalive_timeout'] = (
Expand All @@ -72,6 +84,9 @@ def _validate_connector_args(
connector_args: _ConnectorArgs,
http_session_cls: type[_HttpSessionType],
) -> None:
if connector_args is None:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are changes from #1454 reverted on purpose? Why?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate before assignment

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if connector_args: would be slightly more efficient

return

for k, v in connector_args.items():
# verify_ssl is handled by verify parameter to create_client
if k == 'use_dns_cache':
Expand Down
4 changes: 1 addition & 3 deletions aiobotocore/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
)
from botocore.hooks import first_non_none_response

from aiobotocore._types import DEFAULT_HTTP_SESSION_CLS
from aiobotocore.httpchecksum import handle_checksum_body
from aiobotocore.httpsession import AIOHTTPSession
from aiobotocore.parsers import AioResponseParserFactory
from aiobotocore.response import HttpxStreamingBody, StreamingBody

Expand All @@ -24,8 +24,6 @@
except ImportError:
httpx = None

DEFAULT_HTTP_SESSION_CLS = AIOHTTPSession


async def convert_to_response_dict(http_response, operation_model):
"""Convert an HTTP response object to a request dict.
Expand Down
45 changes: 45 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import concurrent.futures
import socket

import aiohttp.resolver
Expand Down Expand Up @@ -81,6 +82,12 @@ async def test_connector_args(current_http_backend: str):
):
AioConfig({'socket_factory': True}, http_session_cls=HttpxSession)

with pytest.raises(
ParamValidationError,
match='load_executor value must be an instance of an Executor.',
):
AioConfig(load_executor='abc')

# Test valid configs:
AioConfig({"ttl_dns_cache": None})
AioConfig({"ttl_dns_cache": 1})
Expand All @@ -89,6 +96,10 @@ async def test_connector_args(current_http_backend: str):
AioConfig({'socket_factory': None})
AioConfig({'socket_factory': socket.socket})

with concurrent.futures.ThreadPoolExecutor() as executor:
AioConfig(load_executor=executor)
AioConfig(load_executor=None)

# test merge
cfg = AioConfig(read_timeout=75)
aio_cfg = AioConfig({'keepalive_timeout': 75})
Expand Down Expand Up @@ -185,3 +196,37 @@ async def send(self, request):
):
with pytest.raises(SuccessExc):
await s3_client.get_object(Bucket='foo', Key='bar')


async def test_config_load_executor():
with concurrent.futures.ThreadPoolExecutor() as executor:
config = AioConfig(load_executor=executor)

session = AioSession()
async with (
AIOServer() as server,
session.create_client(
's3',
config=config,
endpoint_url=server.endpoint_url,
aws_secret_access_key='xxx',
aws_access_key_id='xxx',
) as s3_client,
):
response = await s3_client.get_object(Bucket='foo', Key='bar')
assert response['Body']

config = AioConfig(load_executor=None)
session = AioSession()
async with (
AIOServer() as server,
session.create_client(
's3',
config=config,
endpoint_url=server.endpoint_url,
aws_secret_access_key='xxx',
aws_access_key_id='xxx',
) as s3_client,
):
response = await s3_client.get_object(Bucket='foo', Key='bar')
assert response['Body']
Loading