33import logging
44import re
55import socket
6+ from collections .abc import AsyncGenerator , AsyncIterator , Awaitable , Callable
67from pathlib import Path
78from pprint import pformat
8- from typing import Any , AsyncGenerator , AsyncIterator , Awaitable , Callable , cast
9+ from typing import Any , Final , cast
910
1011import aiofiles
1112import aiofiles .tempfile
@@ -151,15 +152,20 @@ def _guess_progress_value(progress_match: re.Match[str]) -> float:
151152 return float (value_str .strip ())
152153
153154
155+ _OSPARC_LOG_NUM_PARTS : Final [int ] = 2
156+
157+
154158async def _try_parse_progress (
155159 line : str , * , progress_regexp : re .Pattern [str ]
156160) -> float | None :
157161 with log_catch (logger , reraise = False ):
158162 # pattern might be like "timestamp log"
159163 log = line .strip ("\n " )
160164 splitted_log = log .split (" " , maxsplit = 1 )
161- with contextlib .suppress (arrow .ParserError ):
162- if len (splitted_log ) == 2 and arrow .get (splitted_log [0 ]):
165+ with contextlib .suppress (arrow .ParserError , ValueError ):
166+ if len (splitted_log ) == _OSPARC_LOG_NUM_PARTS and arrow .get (
167+ splitted_log [0 ]
168+ ):
163169 log = splitted_log [1 ]
164170 if match := re .search (progress_regexp , log ):
165171 return _guess_progress_value (match )
@@ -172,19 +178,23 @@ async def _parse_and_publish_logs(
172178 * ,
173179 task_publishers : TaskPublisher ,
174180 progress_regexp : re .Pattern [str ],
181+ container_processing_progress_weight : float ,
175182) -> None :
176183 progress_value = await _try_parse_progress (
177184 log_line , progress_regexp = progress_regexp
178185 )
186+ assert 0 < container_processing_progress_weight <= 1.0 # nosec # noqa: PLR2004
179187 if progress_value is not None :
180- task_publishers .publish_progress (progress_value )
188+ task_publishers .publish_progress (
189+ container_processing_progress_weight * progress_value
190+ )
181191
182192 task_publishers .publish_logs (
183193 message = log_line , log_level = guess_message_log_level (log_line )
184194 )
185195
186196
187- async def _parse_container_log_file (
197+ async def _parse_container_log_file ( # noqa: PLR0913 # pylint: disable=too-many-arguments
188198 * ,
189199 container : DockerContainer ,
190200 progress_regexp : re .Pattern [str ],
@@ -196,6 +206,7 @@ async def _parse_container_log_file(
196206 log_file_url : LogFileUploadURL ,
197207 log_publishing_cb : LogPublishingCB ,
198208 s3_settings : S3Settings | None ,
209+ max_monitoring_progress_value : float ,
199210) -> None :
200211 log_file = task_volumes .logs_folder / LEGACY_SERVICE_LOG_FILE_NAME
201212 with log_context (
@@ -215,6 +226,7 @@ async def _parse_container_log_file(
215226 line ,
216227 task_publishers = task_publishers ,
217228 progress_regexp = progress_regexp ,
229+ container_processing_progress_weight = max_monitoring_progress_value ,
218230 )
219231
220232 # finish reading the logs if possible
@@ -228,6 +240,7 @@ async def _parse_container_log_file(
228240 line ,
229241 task_publishers = task_publishers ,
230242 progress_regexp = progress_regexp ,
243+ container_processing_progress_weight = max_monitoring_progress_value ,
231244 )
232245
233246 # copy the log file to the log_file_url
@@ -247,6 +260,7 @@ async def _parse_container_docker_logs(
247260 log_file_url : LogFileUploadURL ,
248261 log_publishing_cb : LogPublishingCB ,
249262 s3_settings : S3Settings | None ,
263+ container_processing_progress_weight : float ,
250264) -> None :
251265 with log_context (
252266 logger , logging .DEBUG , "started monitoring of >=1.0 service - using docker logs"
@@ -276,6 +290,7 @@ async def _parse_container_docker_logs(
276290 log_msg_without_timestamp ,
277291 task_publishers = task_publishers ,
278292 progress_regexp = progress_regexp ,
293+ container_processing_progress_weight = container_processing_progress_weight ,
279294 )
280295
281296 # copy the log file to the log_file_url
@@ -284,7 +299,7 @@ async def _parse_container_docker_logs(
284299 )
285300
286301
287- async def _monitor_container_logs (
302+ async def _monitor_container_logs ( # noqa: PLR0913 # pylint: disable=too-many-arguments
288303 * ,
289304 container : DockerContainer ,
290305 progress_regexp : re .Pattern [str ],
@@ -296,6 +311,7 @@ async def _monitor_container_logs(
296311 log_file_url : LogFileUploadURL ,
297312 log_publishing_cb : LogPublishingCB ,
298313 s3_settings : S3Settings | None ,
314+ container_processing_progress_weight : float ,
299315) -> None :
300316 """Services running with integration version 0.0.0 are logging into a file
301317 that must be available in task_volumes.log / log.dat
@@ -321,6 +337,7 @@ async def _monitor_container_logs(
321337 log_file_url = log_file_url ,
322338 log_publishing_cb = log_publishing_cb ,
323339 s3_settings = s3_settings ,
340+ container_processing_progress_weight = container_processing_progress_weight ,
324341 )
325342 else :
326343 await _parse_container_log_file (
@@ -334,11 +351,12 @@ async def _monitor_container_logs(
334351 log_file_url = log_file_url ,
335352 log_publishing_cb = log_publishing_cb ,
336353 s3_settings = s3_settings ,
354+ max_monitoring_progress_value = container_processing_progress_weight ,
337355 )
338356
339357
340358@contextlib .asynccontextmanager
341- async def managed_monitor_container_log_task (
359+ async def managed_monitor_container_log_task ( # noqa: PLR0913 # pylint: disable=too-many-arguments
342360 container : DockerContainer ,
343361 progress_regexp : re .Pattern [str ],
344362 service_key : ContainerImage ,
@@ -349,6 +367,7 @@ async def managed_monitor_container_log_task(
349367 log_file_url : LogFileUploadURL ,
350368 log_publishing_cb : LogPublishingCB ,
351369 s3_settings : S3Settings | None ,
370+ container_processing_progress_weight : float ,
352371) -> AsyncIterator [Awaitable [None ]]:
353372 monitoring_task = None
354373 try :
@@ -369,6 +388,7 @@ async def managed_monitor_container_log_task(
369388 log_file_url = log_file_url ,
370389 log_publishing_cb = log_publishing_cb ,
371390 s3_settings = s3_settings ,
391+ container_processing_progress_weight = container_processing_progress_weight ,
372392 ),
373393 name = f"{ service_key } :{ service_version } _{ container .id } _monitoring_task" ,
374394 )
0 commit comments