|
12 | 12 | from ..auth import AuthParams |
13 | 13 | from ..deserializers import DeserializeableShape, ShapeDeserializer |
14 | 14 | from ..endpoints import EndpointResolverParams |
15 | | -from ..exceptions import RetryError, SmithyError |
| 15 | +from ..exceptions import ClientTimeoutError, RetryError, SmithyError |
16 | 16 | from ..interceptors import ( |
17 | 17 | InputContext, |
18 | 18 | Interceptor, |
@@ -448,24 +448,32 @@ async def _handle_attempt[I: SerializeableShape, O: DeserializeableShape]( |
448 | 448 |
|
449 | 449 | _LOGGER.debug("Sending request %s", request_context.transport_request) |
450 | 450 |
|
451 | | - if request_future is not None: |
452 | | - # If we have an input event stream (or duplex event stream) then we |
453 | | - # need to let the client return ASAP so that it can start sending |
454 | | - # events. So here we start the transport send in a background task |
455 | | - # then set the result of the request future. It's important to sequence |
456 | | - # it just like that so that the client gets a stream that's ready |
457 | | - # to send. |
458 | | - transport_task = asyncio.create_task( |
459 | | - self.transport.send(request=request_context.transport_request) |
460 | | - ) |
461 | | - request_future.set_result(request_context) |
462 | | - transport_response = await transport_task |
463 | | - else: |
464 | | - # If we don't have an input stream, there's no point in creating a |
465 | | - # task, so we just immediately await the coroutine. |
466 | | - transport_response = await self.transport.send( |
467 | | - request=request_context.transport_request |
468 | | - ) |
| 451 | + try: |
| 452 | + if request_future is not None: |
| 453 | + # If we have an input event stream (or duplex event stream) then we |
| 454 | + # need to let the client return ASAP so that it can start sending |
| 455 | + # events. So here we start the transport send in a background task |
| 456 | + # then set the result of the request future. It's important to sequence |
| 457 | + # it just like that so that the client gets a stream that's ready |
| 458 | + # to send. |
| 459 | + transport_task = asyncio.create_task( |
| 460 | + self.transport.send(request=request_context.transport_request) |
| 461 | + ) |
| 462 | + request_future.set_result(request_context) |
| 463 | + transport_response = await transport_task |
| 464 | + else: |
| 465 | + # If we don't have an input stream, there's no point in creating a |
| 466 | + # task, so we just immediately await the coroutine. |
| 467 | + transport_response = await self.transport.send( |
| 468 | + request=request_context.transport_request |
| 469 | + ) |
| 470 | + except Exception as e: |
| 471 | + error_info = self.transport.get_error_info(e) |
| 472 | + if error_info.is_timeout_error: |
| 473 | + raise ClientTimeoutError( |
| 474 | + message=f"Client timeout occurred: {e}", fault=error_info.fault |
| 475 | + ) from e |
| 476 | + raise |
469 | 477 |
|
470 | 478 | _LOGGER.debug("Received response: %s", transport_response) |
471 | 479 |
|
|
0 commit comments