Skip to content

Commit 2f40cea

Browse files
committed
fix timeout
1 parent 50710a8 commit 2f40cea

File tree

2 files changed

+107
-39
lines changed

2 files changed

+107
-39
lines changed

services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py

Lines changed: 51 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import aiofiles
1212
import aiofiles.tempfile
13+
import aiohttp
1314
import arrow
1415
from aiodocker import Docker, DockerError
1516
from aiodocker.containers import DockerContainer
@@ -268,42 +269,57 @@ async def _parse_container_docker_logs(
268269
with log_context(
269270
logger, logging.DEBUG, "started monitoring of >=1.0 service - using docker logs"
270271
):
271-
async with aiofiles.tempfile.TemporaryDirectory() as tmp_dir:
272-
log_file_path = (
273-
Path(tmp_dir)
274-
/ f"{service_key.split(sep='/')[-1]}_{service_version}.logs"
272+
assert isinstance(container.docker.connector, aiohttp.UnixConnector) # nosec
273+
async with Docker(
274+
session=aiohttp.ClientSession(
275+
connector=aiohttp.UnixConnector(container.docker.connector.path),
276+
timeout=aiohttp.ClientTimeout(total=_AIODOCKER_LOGS_TIMEOUT_S),
275277
)
276-
log_file_path.parent.mkdir(parents=True, exist_ok=True)
277-
async with aiofiles.open(log_file_path, mode="wb+") as log_fp:
278-
async for log_line in cast(
279-
AsyncGenerator[str, None],
280-
container.log(
281-
stdout=True,
282-
stderr=True,
283-
follow=True,
284-
timestamps=True,
285-
timeout=_AIODOCKER_LOGS_TIMEOUT_S,
286-
),
287-
):
288-
log_msg_without_timestamp = log_line.split(" ", maxsplit=1)[1]
289-
logger.info(
290-
"[%s]: %s",
291-
f"{service_key}:{service_version} - {container.id}{container_name}",
292-
log_msg_without_timestamp,
293-
)
294-
await log_fp.write(log_line.encode("utf-8"))
295-
# NOTE: here we remove the timestamp, only needed for the file
296-
await _parse_and_publish_logs(
297-
log_msg_without_timestamp,
298-
task_publishers=task_publishers,
299-
progress_regexp=progress_regexp,
300-
progress_bar=progress_bar,
301-
)
302-
303-
# copy the log file to the log_file_url
304-
await push_file_to_remote(
305-
log_file_path, log_file_url, log_publishing_cb, s3_settings
278+
) as docker_client_for_logs:
279+
# NOTE: this is a workaround for aiodocker not being able to get the container
280+
# logs when the container is not running
281+
container_for_long_running_logs = (
282+
await docker_client_for_logs.containers.get(container.id)
306283
)
284+
# NOTE: refresh the cached state of both container objects before
285+
# streaming logs — presumably needed so logs can still be fetched once the container has stopped (TODO confirm)
286+
await container.show()
287+
await container_for_long_running_logs.show()
288+
async with aiofiles.tempfile.TemporaryDirectory() as tmp_dir:
289+
log_file_path = (
290+
Path(tmp_dir)
291+
/ f"{service_key.split(sep='/')[-1]}_{service_version}.logs"
292+
)
293+
log_file_path.parent.mkdir(parents=True, exist_ok=True)
294+
async with aiofiles.open(log_file_path, mode="wb+") as log_fp:
295+
async for log_line in cast(
296+
AsyncGenerator[str, None],
297+
container_for_long_running_logs.log(
298+
stdout=True,
299+
stderr=True,
300+
follow=True,
301+
timestamps=True,
302+
),
303+
):
304+
log_msg_without_timestamp = log_line.split(" ", maxsplit=1)[1]
305+
logger.info(
306+
"[%s]: %s",
307+
f"{service_key}:{service_version} - {container_for_long_running_logs.id}{container_name}",
308+
log_msg_without_timestamp,
309+
)
310+
await log_fp.write(log_line.encode("utf-8"))
311+
# NOTE: here we remove the timestamp, only needed for the file
312+
await _parse_and_publish_logs(
313+
log_msg_without_timestamp,
314+
task_publishers=task_publishers,
315+
progress_regexp=progress_regexp,
316+
progress_bar=progress_bar,
317+
)
318+
319+
# copy the log file to the log_file_url
320+
await push_file_to_remote(
321+
log_file_path, log_file_url, log_publishing_cb, s3_settings
322+
)
307323

308324

309325
async def _monitor_container_logs( # noqa: PLR0913 # pylint: disable=too-many-arguments
@@ -325,6 +341,7 @@ async def _monitor_container_logs( # noqa: PLR0913 # pylint: disable=too-many-a
325341
Services above are not creating a file and use the usual docker logging. These logs
326342
are retrieved using the usual cli 'docker logs CONTAINERID'
327343
"""
344+
328345
with log_catch(logger, reraise=False):
329346
container_info = await container.show()
330347
container_name = container_info.get("Name", "undefined")

services/dask-sidecar/tests/unit/test_tasks.py

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,13 @@ def sleeper_task(
208208
"the_input_43": 15.0,
209209
"the_bool_input_54": False,
210210
**{
211-
f"some_file_input_{index+1}": FileUrl(url=file)
211+
f"some_file_input_{index + 1}": FileUrl(url=file)
212212
for index, file in enumerate(list_of_files)
213213
},
214214
**{
215-
f"some_file_input_with_mapping{index+1}": FileUrl(
215+
f"some_file_input_with_mapping{index + 1}": FileUrl(
216216
url=file,
217-
file_mapping=f"{index+1}/some_file_input_{index+1}",
217+
file_mapping=f"{index + 1}/some_file_input_{index + 1}",
218218
)
219219
for index, file in enumerate(list_of_files)
220220
},
@@ -241,7 +241,7 @@ def sleeper_task(
241241
)
242242
list_of_bash_commands += _bash_check_env_exist(
243243
variable_name="SIMCORE_NANO_CPUS_LIMIT",
244-
variable_value=f"{int(_DEFAULT_MAX_RESOURCES['CPU']*1e9)}",
244+
variable_value=f"{int(_DEFAULT_MAX_RESOURCES['CPU'] * 1e9)}",
245245
)
246246
list_of_bash_commands += _bash_check_env_exist(
247247
variable_name="SIMCORE_MEMORY_BYTES_LIMIT",
@@ -268,7 +268,7 @@ def sleeper_task(
268268
f"echo '{faker.text(max_nb_chars=17216)}'",
269269
f"(test -f ${{INPUT_FOLDER}}/{input_json_file_name} || (echo ${{INPUT_FOLDER}}/{input_json_file_name} file does not exist && exit 1))",
270270
f"echo $(cat ${{INPUT_FOLDER}}/{input_json_file_name})",
271-
f"sleep {randint(1,4)}", # noqa: S311
271+
f"sleep {randint(1, 4)}", # noqa: S311
272272
]
273273

274274
# defines the expected outputs
@@ -435,6 +435,13 @@ def sleeper_task_unexpected_output(
435435
return sleeper_task
436436

437437

438+
@pytest.fixture
439+
def hugely_talkative_ubuntu_task(
440+
sidecar_task: Callable[..., ServiceExampleParam],
441+
) -> ServiceExampleParam:
442+
return sidecar_task(command=["/bin/bash", "-c", "some stupid failing command"])
443+
444+
438445
@pytest.fixture()
439446
def caplog_info_level(
440447
caplog: pytest.LogCaptureFixture,
@@ -709,3 +716,47 @@ def test_running_service_that_generates_unexpected_data_raises_exception(
709716
run_computational_sidecar(
710717
**sleeper_task_unexpected_output.sidecar_params(),
711718
)
719+
720+
721+
@pytest.mark.parametrize(
722+
"integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True
723+
)
724+
def test_delayed_logging_with_small_timeout_raises_exception(
725+
caplog: pytest.LogCaptureFixture,
726+
app_environment: EnvVarsDict,
727+
dask_subsystem_mock: dict[str, mock.Mock],
728+
sidecar_task: Callable[..., ServiceExampleParam],
729+
mocked_get_image_labels: mock.Mock,
730+
mocker: MockerFixture,
731+
):
732+
"""https://github.com/aio-libs/aiodocker/issues/901"""
733+
# Mock the timeout with a very small value
734+
mocker.patch(
735+
"simcore_service_dask_sidecar.computational_sidecar.docker_utils._AIODOCKER_LOGS_TIMEOUT_S",
736+
0.5, # Small timeout that should cause failure
737+
)
738+
739+
# Configure the task to sleep first and then generate logs
740+
waiting_task = sidecar_task(
741+
command=[
742+
"/bin/bash",
743+
"-c",
744+
'echo "Starting task"; sleep 5; echo "After sleep"',
745+
]
746+
)
747+
748+
# Execute the task and expect a timeout exception in the logs
749+
with caplog.at_level(logging.ERROR, logger="simcore_service_dask_sidecar"):
750+
run_computational_sidecar(**waiting_task.sidecar_params())
751+
assert len(caplog.records) == 1
752+
record = caplog.records[0]
753+
assert record.exc_info
754+
assert isinstance(record.exc_info[1], TimeoutError)
755+
caplog.clear()
756+
mocker.patch(
757+
"simcore_service_dask_sidecar.computational_sidecar.docker_utils._AIODOCKER_LOGS_TIMEOUT_S",
758+
10, # larger timeout to avoid issues
759+
)
760+
with caplog.at_level(logging.ERROR, logger="simcore_service_dask_sidecar"):
761+
run_computational_sidecar(**waiting_task.sidecar_params())
762+
assert len(caplog.records) == 0

0 commit comments

Comments
 (0)