Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 63 additions & 26 deletions packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ class RobustWebSocket:
_num_reconnections: int = 0
auto_reconnect: bool = True

def __post_init__(self):
def __post_init__(self) -> None:
self._configure_websocket_events()

def _configure_websocket_events(self):
def _configure_websocket_events(self) -> None:
with log_context(
logging.INFO,
msg="handle websocket message (set to --log-cli-level=DEBUG level if you wanna see all of them)",
Expand All @@ -172,10 +172,11 @@ def on_framereceived(payload: str | bytes) -> None:
ctx.logger.debug("⬆️ Frame received: %s", payload)

def on_close(_: WebSocket) -> None:
ctx.logger.warning("⚠️ WebSocket closed.")
if self.auto_reconnect:
ctx.logger.warning("⚠️ WebSocket closed. Attempting to reconnect...")
self._attempt_reconnect(ctx.logger)
else:
ctx.logger.warning("⚠️ WebSocket closed.")

def on_socketerror(error_msg: str) -> None:
ctx.logger.error("❌ WebSocket error: %s", error_msg)
Expand Down Expand Up @@ -448,17 +449,20 @@ def __call__(self, message: str) -> bool:
f"{json.dumps({k: round(v, 2) for k, v in self._current_progress.items()})}",
)

return self.got_expected_node_progress_types() and all(
progress_completed = self.got_expected_node_progress_types() and all(
round(progress, 1) == 1.0
for progress in self._current_progress.values()
)
if progress_completed:
self.logger.info("✅ Service start completed successfully!! ✅")

time_since_last_progress = datetime.now(UTC) - self._last_progress_time
if time_since_last_progress > self.max_idle_timeout:
self.logger.warning(
"⚠️ %s passed since the last received progress message. "
"The service might be stuck, or we missed some messages ⚠️",
"The service %s might be stuck, or we missed some messages ⚠️",
time_since_last_progress,
self.node_id,
)
return True

Expand Down Expand Up @@ -569,6 +573,9 @@ class ServiceRunning:
iframe_locator: FrameLocator | None


_MIN_TIMEOUT_WAITING_FOR_SERVICE_ENDPOINT: Final[int] = 30 * SECOND


@contextlib.contextmanager
def expected_service_running(
*,
Expand All @@ -582,27 +589,42 @@ def expected_service_running(
assertion_output_folder: Path,
) -> Generator[ServiceRunning, None, None]:
started = arrow.utcnow()
with log_context(
logging.INFO, msg=f"Waiting for node to run. Timeout: {timeout}"
) as ctx:
waiter = SocketIONodeProgressCompleteWaiter(
node_id=node_id,
logger=ctx.logger,
with contextlib.ExitStack() as stack:
ctx = stack.enter_context(
log_context(
logging.INFO,
msg=f"Waiting for node to run. Timeout: {timeout}",
)
)
if is_service_legacy:
ctx.logger.info(
"⚠️ Legacy service detected. We are skipping websocket messages in this case! ⚠️"
)
else:
waiter = SocketIONodeProgressCompleteWaiter(
node_id=node_id,
logger=ctx.logger,
)
stack.enter_context(
websocket.expect_event("framereceived", waiter, timeout=timeout)
)
service_running = ServiceRunning(iframe_locator=None)
with websocket.expect_event("framereceived", waiter, timeout=timeout):
if press_start_button:
_trigger_service_start(page, node_id)
if press_start_button:
_trigger_service_start(page, node_id)

yield service_running

yield service_running
elapsed_time = arrow.utcnow() - started

wait_for_service_endpoint_responding(
node_id,
api_request_context=page.request,
product_url=product_url,
is_legacy_service=is_service_legacy,
timeout=min(timeout - int(elapsed_time.total_seconds() * SECOND), 5 * SECOND),
timeout=max(
timeout - int(elapsed_time.total_seconds() * SECOND),
_MIN_TIMEOUT_WAITING_FOR_SERVICE_ENDPOINT,
),
)
service_running.iframe_locator = page.frame_locator(
f'[osparc-test-id="iframe_{node_id}"]'
Expand All @@ -624,23 +646,38 @@ def wait_for_service_running(
In which case this will need further adjutment"""

started = arrow.utcnow()
with log_context(
logging.INFO, msg=f"Waiting for node to run. Timeout: {timeout}"
) as ctx:
waiter = SocketIONodeProgressCompleteWaiter(
node_id=node_id,
logger=ctx.logger,
with contextlib.ExitStack() as stack:
ctx = stack.enter_context(
log_context(
logging.INFO,
msg=f"Waiting for node to run. Timeout: {timeout}",
)
)
with websocket.expect_event("framereceived", waiter, timeout=timeout):
if press_start_button:
_trigger_service_start(page, node_id)
if is_service_legacy:
ctx.logger.info(
"⚠️ Legacy service detected. We are skipping websocket messages in this case! ⚠️"
)
else:
waiter = SocketIONodeProgressCompleteWaiter(
node_id=node_id,
logger=ctx.logger,
)
stack.enter_context(
websocket.expect_event("framereceived", waiter, timeout=timeout)
)
if press_start_button:
_trigger_service_start(page, node_id)

elapsed_time = arrow.utcnow() - started
wait_for_service_endpoint_responding(
node_id,
api_request_context=page.request,
product_url=product_url,
is_legacy_service=is_service_legacy,
timeout=min(timeout - int(elapsed_time.total_seconds() * SECOND), 5 * SECOND),
timeout=max(
timeout - int(elapsed_time.total_seconds() * SECOND),
_MIN_TIMEOUT_WAITING_FOR_SERVICE_ENDPOINT,
),
)
return page.frame_locator(f'[osparc-test-id="iframe_{node_id}"]')

Expand Down