@@ -62,8 +62,8 @@ public final class ConfigGenerator implements Runnable {
.nullable(false)
.initialize(writer -> {
writer.addDependency(SmithyPythonDependency.SMITHY_CORE);
writer.addImport("smithy_core.retries", "SimpleRetryStrategy");
writer.write("self.retry_strategy = retry_strategy or SimpleRetryStrategy()");
writer.addImport("smithy_core.retries", "StandardRetryStrategy");
writer.write("self.retry_strategy = retry_strategy or StandardRetryStrategy()");
})
.build(),
ConfigProperty.builder()
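For orientation, the generated Python config now falls back to the standard retry strategy when the caller does not supply one. A minimal sketch of the generated shape, assuming an illustrative `Config` class name; only the import and the fallback expression come from the diff above.

```python
# Illustrative sketch of the generated default; the class and parameter names
# are assumed, while the import and fallback mirror the generated code above.
from smithy_core.retries import StandardRetryStrategy


class Config:
    def __init__(self, *, retry_strategy=None):
        # Use the caller-supplied strategy, otherwise fall back to the new
        # standard-mode default.
        self.retry_strategy = retry_strategy or StandardRetryStrategy()
```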
1 change: 1 addition & 0 deletions packages/smithy-aws-core/CHANGES.md
@@ -12,6 +12,7 @@
### Features

* Added a hand-written implementation for the `restJson1` protocol.
* Added a new retry mode `standard` and made it the default retry strategy.

## v0.0.3

52 changes: 30 additions & 22 deletions packages/smithy-core/src/smithy_core/aio/client.py
@@ -12,7 +12,7 @@
from ..auth import AuthParams
from ..deserializers import DeserializeableShape, ShapeDeserializer
from ..endpoints import EndpointResolverParams
from ..exceptions import RetryError, SmithyError
from ..exceptions import ClientTimeoutError, RetryError, SmithyError
from ..interceptors import (
InputContext,
Interceptor,
@@ -330,7 +330,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](
return await self._handle_attempt(call, request_context, request_future)

retry_strategy = call.retry_strategy
retry_token = retry_strategy.acquire_initial_retry_token(
retry_token = await retry_strategy.acquire_initial_retry_token(
token_scope=call.retry_scope
)

@@ -349,7 +349,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](

if isinstance(output_context.response, Exception):
try:
retry_strategy.refresh_retry_token_for_retry(
retry_token = await retry_strategy.refresh_retry_token_for_retry(
token_to_renew=retry_token,
error=output_context.response,
)
@@ -364,7 +364,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](

await seek(request_context.transport_request.body, 0)
else:
retry_strategy.record_success(token=retry_token)
await retry_strategy.record_success(token=retry_token)
return output_context

async def _handle_attempt[I: SerializeableShape, O: DeserializeableShape](
@@ -448,24 +448,32 @@ async def _handle_attempt[I: SerializeableShape, O: DeserializeableShape](

_LOGGER.debug("Sending request %s", request_context.transport_request)

if request_future is not None:
# If we have an input event stream (or duplex event stream) then we
# need to let the client return ASAP so that it can start sending
# events. So here we start the transport send in a background task
# then set the result of the request future. It's important to sequence
# it just like that so that the client gets a stream that's ready
# to send.
transport_task = asyncio.create_task(
self.transport.send(request=request_context.transport_request)
)
request_future.set_result(request_context)
transport_response = await transport_task
else:
# If we don't have an input stream, there's no point in creating a
# task, so we just immediately await the coroutine.
transport_response = await self.transport.send(
request=request_context.transport_request
)
try:
if request_future is not None:
# If we have an input event stream (or duplex event stream) then we
# need to let the client return ASAP so that it can start sending
# events. So here we start the transport send in a background task
# then set the result of the request future. It's important to sequence
# it just like that so that the client gets a stream that's ready
# to send.
transport_task = asyncio.create_task(
self.transport.send(request=request_context.transport_request)
)
request_future.set_result(request_context)
transport_response = await transport_task
else:
# If we don't have an input stream, there's no point in creating a
# task, so we just immediately await the coroutine.
transport_response = await self.transport.send(
request=request_context.transport_request
)
except Exception as e:
error_info = self.transport.get_error_info(e)
if error_info.is_timeout_error:
raise ClientTimeoutError(
message=f"Client timeout occurred: {e}", fault=error_info.fault
) from e
raise

_LOGGER.debug("Received response: %s", transport_response)

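Condensed, the new attempt-level error handling amounts to the pattern below. This is a sketch only: the helper name is made up, and `transport` and `request` stand in for the request-context fields used in `_handle_attempt`.

```python
from smithy_core.exceptions import ClientTimeoutError


async def _send_with_timeout_translation(transport, request):
    # Hypothetical helper condensing the try/except added to _handle_attempt.
    try:
        return await transport.send(request=request)
    except Exception as e:
        error_info = transport.get_error_info(e)
        if error_info.is_timeout_error:
            # Surface transport timeouts as a client-faulted, retryable error.
            raise ClientTimeoutError(
                message=f"Client timeout occurred: {e}", fault=error_info.fault
            ) from e
        raise
```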
33 changes: 31 additions & 2 deletions packages/smithy-core/src/smithy_core/aio/interfaces/__init__.py
@@ -1,7 +1,8 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
from collections.abc import AsyncIterable, Callable
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal, Protocol, runtime_checkable

from ...documents import TypeRegistry
from ...endpoints import EndpointResolverParams
@@ -10,6 +11,18 @@
from ...interfaces import StreamingBlob as SyncStreamingBlob
from .eventstream import EventPublisher, EventReceiver


@dataclass(frozen=True)
class ErrorInfo:
"""Information about an error from a transport."""

is_timeout_error: bool
"""Whether this error represents a timeout condition."""

fault: Literal["client", "server"] = "client"
"""Whether the client or server is at fault."""


if TYPE_CHECKING:
from typing_extensions import TypeForm

@@ -86,7 +99,23 @@ async def resolve_endpoint(self, params: EndpointResolverParams[Any]) -> Endpoin


class ClientTransport[I: Request, O: Response](Protocol):
"""Protocol-agnostic representation of a client tranport (e.g. an HTTP client)."""
"""Protocol-agnostic representation of a client transport (e.g. an HTTP client).

Transport implementations must define the get_error_info method to determine which
exceptions represent timeout conditions for that transport.
"""

def get_error_info(self, exception: Exception, **kwargs) -> ErrorInfo:
"""Get information about an exception.

Args:
exception: The exception to analyze.
**kwargs: Additional context for analysis.

Returns:
ErrorInfo with timeout and fault information.
"""
...

async def send(self, request: I) -> O:
"""Send a request over the transport and receive the response."""
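A hypothetical transport showing one way `get_error_info` could classify exceptions; the class name and the chosen timeout exception types are assumptions, not part of this change.

```python
import asyncio

from smithy_core.aio.interfaces import ErrorInfo


class ExampleHTTPTransport:
    """Sketch of a transport implementing the new get_error_info hook."""

    def get_error_info(self, exception: Exception, **kwargs) -> ErrorInfo:
        # Map whichever exception types this transport treats as timeouts.
        if isinstance(exception, (TimeoutError, asyncio.TimeoutError)):
            return ErrorInfo(is_timeout_error=True, fault="client")
        return ErrorInfo(is_timeout_error=False)

    async def send(self, request):
        ...  # delegate to the underlying HTTP client here
```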
17 changes: 17 additions & 0 deletions packages/smithy-core/src/smithy_core/exceptions.py
@@ -50,6 +50,9 @@ class CallError(SmithyError):
is_throttling_error: bool = False
"""Whether the error is a throttling error."""

is_timeout_error: bool = False
"""Whether the error represents a timeout condition."""

def __post_init__(self):
super().__init__(self.message)

@@ -61,6 +64,20 @@ class ModeledError(CallError):
fault: Fault = "client"


@dataclass(kw_only=True)
class ClientTimeoutError(CallError):
"""Exception raised when a client-side timeout occurs.

This error indicates that the client transport layer encountered a timeout while
attempting to communicate with the server. This typically occurs when network
requests take longer than the configured timeout period.
"""

fault: Fault = "client"
is_timeout_error: bool = True
is_retry_safe: bool = True


class SerializationError(SmithyError):
"""Base exception type for exceptions raised during serialization."""

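A brief usage sketch of the new exception; the `client.get_item` call is hypothetical, and only the flag names come from the class above.

```python
from smithy_core.exceptions import ClientTimeoutError


async def fetch_with_timeout_handling(client, params):
    # Hypothetical generated operation call; the except branch shows how
    # callers can branch on the new flags instead of matching error messages.
    try:
        return await client.get_item(params)
    except ClientTimeoutError as e:
        assert e.is_timeout_error and e.is_retry_safe
        raise
```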
6 changes: 3 additions & 3 deletions packages/smithy-core/src/smithy_core/interfaces/retries.py
@@ -61,7 +61,7 @@ class RetryStrategy(Protocol):
max_attempts: int
"""Upper limit on total attempt count (initial attempt plus retries)."""

def acquire_initial_retry_token(
async def acquire_initial_retry_token(
self, *, token_scope: str | None = None
) -> RetryToken:
"""Called before any retries (for the first attempt at the operation).
@@ -74,7 +74,7 @@ def acquire_initial_retry_token(
"""
...

def refresh_retry_token_for_retry(
async def refresh_retry_token_for_retry(
self, *, token_to_renew: RetryToken, error: Exception
) -> RetryToken:
"""Replace an existing retry token from a failed attempt with a new token.
@@ -91,7 +91,7 @@ def refresh_retry_token_for_retry(
"""
...

def record_success(self, *, token: RetryToken) -> None:
async def record_success(self, *, token: RetryToken) -> None:
"""Return token after successful completion of an operation.

Upon successful completion of the operation, a user calls this function to
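Since the three strategy methods are now coroutines, callers await them. A minimal sketch of the calling pattern, assuming a hypothetical `attempt()` coroutine and a `retry_delay` attribute on the returned token.

```python
import asyncio


async def run_with_retries(strategy, attempt, scope=None):
    # Acquire a token up front, retry on failure until the strategy refuses
    # (refresh_retry_token_for_retry raises when no more attempts are allowed),
    # and report success back to the strategy.
    token = await strategy.acquire_initial_retry_token(token_scope=scope)
    while True:
        try:
            result = await attempt()
        except Exception as error:
            token = await strategy.refresh_retry_token_for_retry(
                token_to_renew=token, error=error
            )
            await asyncio.sleep(token.retry_delay)
        else:
            await strategy.record_success(token=token)
            return result
```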