55import re
66from datetime import timedelta
77from enum import Enum
8- from http import HTTPStatus
98from pprint import pformat
109from typing import Any , Final , cast
1110
1211import aiodocker
1312import aiodocker .networks
14- import aiohttp
1513import arrow
14+ import httpx
1615import tenacity
17- from aiohttp import (
18- ClientConnectionError ,
19- ClientError ,
20- ClientResponse ,
21- ClientResponseError ,
22- ClientSession ,
23- ClientTimeout ,
24- )
25- from fastapi import FastAPI
16+ from fastapi import FastAPI , status
2617from packaging .version import Version
2718from servicelib .async_utils import run_sequentially_in_context
2819from servicelib .docker_utils import to_datetime
2920from settings_library .docker_registry import RegistrySettings
30- from tenacity import retry
21+ from tenacity import retry , wait_random_exponential
3122from tenacity .retry import retry_if_exception_type
3223from tenacity .stop import stop_after_attempt
33- from tenacity .wait import wait_fixed
3424
3525from . import docker_utils , registry_proxy
3626from .client_session import get_client_session
@@ -547,7 +537,7 @@ async def _pass_port_to_service(
547537 service_name : str ,
548538 port : str ,
549539 service_boot_parameters_labels : list [Any ],
550- session : ClientSession ,
540+ session : httpx . AsyncClient ,
551541 app_settings : ApplicationSettings ,
552542) -> None :
553543 for param in service_boot_parameters_labels :
@@ -566,8 +556,8 @@ async def _pass_port_to_service(
566556 "port" : str (port ),
567557 }
568558 _logger .debug ("creating request %s and query %s" , service_url , query_string )
569- async with session .post (service_url , data = query_string ) as response :
570- _logger .debug ("query response: %s" , await response .text () )
559+ response = await session .post (service_url , data = query_string )
560+ _logger .debug ("query response: %s" , response .text )
571561 return
572562 _logger .debug ("service %s does not need to know its external port" , service_name )
573563
@@ -1149,50 +1139,48 @@ async def get_service_details(app: FastAPI, node_uuid: str) -> dict:
11491139
11501140
11511141@retry (
1152- wait = wait_fixed ( 2 ),
1142+ wait = wait_random_exponential ( min = 1 , max = 5 ),
11531143 stop = stop_after_attempt (3 ),
11541144 reraise = True ,
1155- retry = retry_if_exception_type (ClientConnectionError ),
1145+ retry = retry_if_exception_type (httpx . RequestError ),
11561146)
11571147async def _save_service_state (
1158- service_host_name : str , session : aiohttp . ClientSession
1148+ service_host_name : str , session : httpx . AsyncClient
11591149) -> None :
1160- response : ClientResponse
1161- async with session .post (
1162- url = f"http://{ service_host_name } /state" , # NOSONAR
1163- timeout = ClientTimeout (
1164- ServicesCommonSettings ().director_dynamic_service_save_timeout
1165- ),
1166- ) as response :
1167- try :
1168- response .raise_for_status ()
1150+ try :
1151+ response = await session .post (
1152+ url = f"http://{ service_host_name } /state" , # NOSONAR
1153+ timeout = ServicesCommonSettings ().director_dynamic_service_save_timeout ,
1154+ )
1155+ response .raise_for_status ()
11691156
1170- except ClientResponseError as err :
1171- if err .status in (
1172- HTTPStatus .METHOD_NOT_ALLOWED ,
1173- HTTPStatus .NOT_FOUND ,
1174- HTTPStatus .NOT_IMPLEMENTED ,
1175- ):
1176- # NOTE: Legacy Override. Some old services do not have a state entrypoint defined
1177- # therefore we assume there is nothing to be saved and do not raise exception
1178- # Responses found so far:
1179- # METHOD NOT ALLOWED https://httpstatuses.com/405
1180- # NOT FOUND https://httpstatuses.com/404
1181- #
1182- _logger .warning (
1183- "Service '%s' does not seem to implement save state functionality: %s. Skipping save" ,
1184- service_host_name ,
1185- err ,
1186- )
1187- else :
1188- # upss ... could service had troubles saving, reraise
1189- raise
1190- else :
1191- _logger .info (
1192- "Service '%s' successfully saved its state: %s" ,
1157+ except httpx .HTTPStatusError as err :
1158+
1159+ if err .response .status_code in (
1160+ status .HTTP_405_METHOD_NOT_ALLOWED ,
1161+ status .HTTP_404_NOT_FOUND ,
1162+ status .HTTP_501_NOT_IMPLEMENTED ,
1163+ ):
1164+ # NOTE: Legacy Override. Some old services do not have a state entrypoint defined
1165+ # therefore we assume there is nothing to be saved and do not raise exception
1166+ # Responses found so far:
1167+ # METHOD NOT ALLOWED https://httpstatuses.com/405
1168+ # NOT FOUND https://httpstatuses.com/404
1169+ #
1170+ _logger .warning (
1171+ "Service '%s' does not seem to implement save state functionality: %s. Skipping save" ,
11931172 service_host_name ,
1194- f" { response } " ,
1173+ err ,
11951174 )
1175+ else :
1176+ # upss ... could service had troubles saving, reraise
1177+ raise
1178+ else :
1179+ _logger .info (
1180+ "Service '%s' successfully saved its state: %s" ,
1181+ service_host_name ,
1182+ f"{ response } " ,
1183+ )
11961184
11971185
11981186@run_sequentially_in_context (target_args = ["node_uuid" ])
@@ -1246,20 +1234,21 @@ async def stop_service(app: FastAPI, *, node_uuid: str, save_state: bool) -> Non
12461234 await _save_service_state (
12471235 service_host_name , session = get_client_session (app )
12481236 )
1249- except ClientResponseError as err :
1237+ except httpx .HTTPStatusError as err :
1238+
12501239 raise ServiceStateSaveError (
12511240 service_uuid = node_uuid ,
12521241 reason = f"service { service_host_name } rejected to save state, "
1253- f"responded { err .message } (status { err .status } )."
1242+ f"responded { err .response . text } (status { err .response . status_code } )."
12541243 "Aborting stop service to prevent data loss." ,
12551244 ) from err
12561245
1257- except ClientError as err :
1246+ except httpx . RequestError as err :
12581247 _logger .warning (
12591248 "Could not save state because %s is unreachable [%s]."
12601249 "Resuming stop_service." ,
12611250 service_host_name ,
1262- err ,
1251+ err . request ,
12631252 )
12641253
12651254 # remove the services
0 commit comments