Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ def setup(cls, logger: logging.Logger) -> None:
_handler = logging.StreamHandler()
_handler.setFormatter(_formatter)
logger.addHandler(_handler)
logger.setLevel(logging.INFO)


test_logger = logging.getLogger(__name__)
Expand All @@ -95,6 +94,7 @@ def setup(cls, logger: logging.Logger) -> None:
_DONE_SUFFIX: Final[str] = " ✅"
_RAISED_PREFIX: Final[str] = "❌❌❌ Error: "
_RAISED_SUFFIX: Final[str] = " ❌❌❌"
_STACK_LEVEL_OFFSET: Final[int] = 3


@dataclass
Expand Down Expand Up @@ -170,25 +170,28 @@ def log_context(
_resolve(ctx_msg.starting, _STARTING_PREFIX, _STARTING_SUFFIX),
*args,
**kwargs,
stacklevel=_STACK_LEVEL_OFFSET,
)
with _increased_logger_indent(logger):
yield SimpleNamespace(logger=logger, messages=ctx_msg)
elapsed_time = datetime.datetime.now(tz=datetime.UTC) - started_time
done_message = f"{_resolve(ctx_msg.done, _DONE_PREFIX, _DONE_SUFFIX)} ({_timedelta_as_minute_second_ms(elapsed_time)})"
done_message = f"{_resolve(ctx_msg.done, _DONE_PREFIX, _DONE_SUFFIX)} (total time spent: {_timedelta_as_minute_second_ms(elapsed_time)})"
logger.log(
level,
done_message,
*args,
**kwargs,
stacklevel=_STACK_LEVEL_OFFSET,
)

except:
elapsed_time = datetime.datetime.now(tz=datetime.UTC) - started_time
error_message = f"{_resolve(ctx_msg.raised, _RAISED_PREFIX, _RAISED_SUFFIX)} ({_timedelta_as_minute_second_ms(elapsed_time)})"
error_message = f"{_resolve(ctx_msg.raised, _RAISED_PREFIX, _RAISED_SUFFIX)} (total time spent: {_timedelta_as_minute_second_ms(elapsed_time)})"
logger.exception(
error_message,
*args,
**kwargs,
stacklevel=_STACK_LEVEL_OFFSET,
)
raise

Expand Down
200 changes: 95 additions & 105 deletions packages/pytest-simcore/src/pytest_simcore/helpers/playwright.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,20 +265,24 @@ def retrieve_node_progress_from_decoded_message(

@dataclass
class SocketIOProjectClosedWaiter:
logger: logging.Logger

def __call__(self, message: str) -> bool:
# socket.io encodes messages like so
# https://stackoverflow.com/questions/24564877/what-do-these-numbers-mean-in-socket-io-payload
if message.startswith(SOCKETIO_MESSAGE_PREFIX):
decoded_message = decode_socketio_42_message(message)
if (
(decoded_message.name == _OSparcMessages.PROJECT_STATE_UPDATED.value)
and (decoded_message.obj["data"]["shareState"]["status"] == "CLOSED")
and (decoded_message.obj["data"]["shareState"]["locked"] is False)
):
self.logger.info("project successfully closed")
return True
with log_context(logging.DEBUG, msg=f"handling websocket {message=}") as ctx:
# socket.io encodes messages like so
# https://stackoverflow.com/questions/24564877/what-do-these-numbers-mean-in-socket-io-payload
if message.startswith(SOCKETIO_MESSAGE_PREFIX):
decoded_message = decode_socketio_42_message(message)
if (
(
decoded_message.name
== _OSparcMessages.PROJECT_STATE_UPDATED.value
)
and (
decoded_message.obj["data"]["shareState"]["status"] == "CLOSED"
)
and (decoded_message.obj["data"]["shareState"]["locked"] is False)
):
ctx.logger.info("project successfully closed")
return True

return False

Expand All @@ -304,42 +308,27 @@ def __call__(self, message: str) -> bool:

@dataclass
class SocketIOWaitNodeForOutputs:
logger: logging.Logger
expected_number_of_outputs: int
node_id: str

def __call__(self, message: str) -> bool:
if message.startswith(SOCKETIO_MESSAGE_PREFIX):
decoded_message = decode_socketio_42_message(message)
if decoded_message.name == _OSparcMessages.NODE_UPDATED:
assert "data" in decoded_message.obj
assert "node_id" in decoded_message.obj
if decoded_message.obj["node_id"] == self.node_id:
assert "outputs" in decoded_message.obj["data"]

return (
len(decoded_message.obj["data"]["outputs"])
== self.expected_number_of_outputs
)
with log_context(logging.DEBUG, msg=f"handling websocket {message=}"):
if message.startswith(SOCKETIO_MESSAGE_PREFIX):
decoded_message = decode_socketio_42_message(message)
if decoded_message.name == _OSparcMessages.NODE_UPDATED:
assert "data" in decoded_message.obj
assert "node_id" in decoded_message.obj
if decoded_message.obj["node_id"] == self.node_id:
assert "outputs" in decoded_message.obj["data"]

return (
len(decoded_message.obj["data"]["outputs"])
== self.expected_number_of_outputs
)

return False


@dataclass
class SocketIOOsparcMessagePrinter:
include_logger_messages: bool = False

def __call__(self, message: str) -> None:
osparc_messages = [_.value for _ in _OSparcMessages]
if not self.include_logger_messages:
osparc_messages.pop(osparc_messages.index(_OSparcMessages.LOGGER.value))

if message.startswith(SOCKETIO_MESSAGE_PREFIX):
decoded_message: SocketIOEvent = decode_socketio_42_message(message)
if decoded_message.name in osparc_messages:
print("WS Message:", decoded_message.name, decoded_message.obj)


_FAIL_FAST_DYNAMIC_SERVICE_STATES: Final[tuple[str, ...]] = ("idle", "failed")
_SERVICE_ROOT_POINT_STATUS_TIMEOUT: Final[timedelta] = timedelta(seconds=30)

Expand Down Expand Up @@ -416,7 +405,6 @@ def _check_service_endpoint(
@dataclass
class SocketIONodeProgressCompleteWaiter:
node_id: str
logger: logging.Logger
max_idle_timeout: timedelta = _SOCKET_IO_NODE_PROGRESS_WAITER_MAX_IDLE_TIMEOUT
_current_progress: dict[NodeProgressType, float] = field(
default_factory=defaultdict
Expand All @@ -426,71 +414,75 @@ class SocketIONodeProgressCompleteWaiter:
_result: bool = False

def __call__(self, message: str) -> bool:
# socket.io encodes messages like so
# https://stackoverflow.com/questions/24564877/what-do-these-numbers-mean-in-socket-io-payload
if message.startswith(SOCKETIO_MESSAGE_PREFIX):
decoded_message = decode_socketio_42_message(message)
self._received_messages.append(decoded_message)
if (
(decoded_message.name == _OSparcMessages.SERVICE_STATUS.value)
and (decoded_message.obj["service_uuid"] == self.node_id)
and (
decoded_message.obj["service_state"]
in _FAIL_FAST_DYNAMIC_SERVICE_STATES
)
):
# NOTE: this is a fail fast for dynamic services that fail to start
self.logger.error(
"❌ node %s failed with state %s, failing fast ❌",
with log_context(logging.DEBUG, msg=f"handling websocket {message=}") as ctx:
# socket.io encodes messages like so
# https://stackoverflow.com/questions/24564877/what-do-these-numbers-mean-in-socket-io-payload
if message.startswith(SOCKETIO_MESSAGE_PREFIX):
decoded_message = decode_socketio_42_message(message)
self._received_messages.append(decoded_message)
if (
(decoded_message.name == _OSparcMessages.SERVICE_STATUS.value)
and (decoded_message.obj["service_uuid"] == self.node_id)
and (
decoded_message.obj["service_state"]
in _FAIL_FAST_DYNAMIC_SERVICE_STATES
)
):
# NOTE: this is a fail fast for dynamic services that fail to start
ctx.logger.error(
"❌ node %s failed with state %s, failing fast ❌",
self.node_id,
decoded_message.obj["service_state"],
)
self._result = False
return True
if decoded_message.name == _OSparcMessages.NODE_PROGRESS.value:
node_progress_event = retrieve_node_progress_from_decoded_message(
decoded_message
)
if node_progress_event.node_id == self.node_id:
new_progress = (
node_progress_event.current_progress
/ node_progress_event.total_progress
)
self._last_progress_time = datetime.now(UTC)
if (
node_progress_event.progress_type
not in self._current_progress
) or (
new_progress
!= self._current_progress[node_progress_event.progress_type]
):
self._current_progress[
node_progress_event.progress_type
] = new_progress

ctx.logger.info(
"Current startup progress [expected %d types]: %s",
len(
NodeProgressType.required_types_for_started_service()
),
f"{json.dumps({k: round(v, 2) for k, v in self._current_progress.items()})}",
)

done = self._completed_successfully()
if done:
self._result = True # NOTE: might have failed but it is not sure. so we set the result to True
ctx.logger.info("✅ Service start completed successfully!! ✅")
return done

time_since_last_progress = datetime.now(UTC) - self._last_progress_time
if time_since_last_progress > self.max_idle_timeout:
ctx.logger.warning(
"⚠️ %s passed since the last received progress message. "
"The service %s might be stuck, or we missed some messages ⚠️",
time_since_last_progress,
self.node_id,
decoded_message.obj["service_state"],
)
self._result = False
self._result = True
return True
if decoded_message.name == _OSparcMessages.NODE_PROGRESS.value:
node_progress_event = retrieve_node_progress_from_decoded_message(
decoded_message
)
if node_progress_event.node_id == self.node_id:
new_progress = (
node_progress_event.current_progress
/ node_progress_event.total_progress
)
self._last_progress_time = datetime.now(UTC)
if (
node_progress_event.progress_type not in self._current_progress
) or (
new_progress
!= self._current_progress[node_progress_event.progress_type]
):
self._current_progress[node_progress_event.progress_type] = (
new_progress
)

self.logger.info(
"Current startup progress [expected %d types]: %s",
len(NodeProgressType.required_types_for_started_service()),
f"{json.dumps({k: round(v, 2) for k, v in self._current_progress.items()})}",
)

done = self._completed_successfully()
if done:
self._result = True # NOTE: might have failed but it is not sure. so we set the result to True
self.logger.info("✅ Service start completed successfully!! ✅")
return done

time_since_last_progress = datetime.now(UTC) - self._last_progress_time
if time_since_last_progress > self.max_idle_timeout:
self.logger.warning(
"⚠️ %s passed since the last received progress message. "
"The service %s might be stuck, or we missed some messages ⚠️",
time_since_last_progress,
self.node_id,
)
self._result = True
return True

return False
return False

def _completed_successfully(self) -> bool:
return all(
Expand Down Expand Up @@ -631,7 +623,6 @@ def expected_service_running(
else:
waiter = SocketIONodeProgressCompleteWaiter(
node_id=node_id,
logger=ctx.logger,
max_idle_timeout=min(
_SOCKET_IO_NODE_PROGRESS_WAITER_MAX_IDLE_TIMEOUT,
timedelta(seconds=timeout / 1000 - 10),
Expand Down Expand Up @@ -693,7 +684,6 @@ def wait_for_service_running(
else:
waiter = SocketIONodeProgressCompleteWaiter(
node_id=node_id,
logger=ctx.logger,
max_idle_timeout=min(
_SOCKET_IO_NODE_PROGRESS_WAITER_MAX_IDLE_TIMEOUT,
timedelta(seconds=timeout / 1000 - 10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,46 +49,46 @@ def __call__(self, new_websocket: WebSocket) -> bool:
@dataclass(kw_only=True)
class _S4LSocketIOCheckBitRateIncreasesMessagePrinter:
min_waiting_time_before_checking_bitrate: datetime.timedelta
logger: logging.Logger
_initial_bit_rate: float = 0
_initial_bit_rate_time: datetime.datetime = arrow.utcnow().datetime

def __call__(self, message: str) -> bool:
if message.startswith(SOCKETIO_MESSAGE_PREFIX):
decoded_message: SocketIOEvent = decode_socketio_42_message(message)
if (
decoded_message.name == "server.video_stream.bitrate_data"
and "bitrate" in decoded_message.obj
):
current_bit_rate = decoded_message.obj["bitrate"]
if self._initial_bit_rate == 0:
self._initial_bit_rate = current_bit_rate
self._initial_bit_rate_time = arrow.utcnow().datetime
self.logger.info(
"%s",
f"{TypeAdapter(ByteSize).validate_python(self._initial_bit_rate).human_readable()}/s at {self._initial_bit_rate_time.isoformat()}",
)
return False

# NOTE: MaG says the value might also go down, but it shall definitely change,
# if this code proves unsafe we should change it.
if "bitrate" in decoded_message.obj:
self.logger.info(
"bitrate: %s",
f"{TypeAdapter(ByteSize).validate_python(current_bit_rate).human_readable()}/s",
)
elapsed_time = arrow.utcnow().datetime - self._initial_bit_rate_time
with log_context(logging.DEBUG, msg=f"handling websocket {message=}") as ctx:
if message.startswith(SOCKETIO_MESSAGE_PREFIX):
decoded_message: SocketIOEvent = decode_socketio_42_message(message)
if (
elapsed_time > self.min_waiting_time_before_checking_bitrate
decoded_message.name == "server.video_stream.bitrate_data"
and "bitrate" in decoded_message.obj
):
current_bit_rate = decoded_message.obj["bitrate"]
bitrate_test = bool(self._initial_bit_rate != current_bit_rate)
self.logger.info(
"%s",
f"{TypeAdapter(ByteSize).validate_python(current_bit_rate).human_readable()}/s after {elapsed_time=}: {'good!' if bitrate_test else 'failed! bitrate did not change! TIP: talk with MaG about underwater cables!'}",
)
return bitrate_test
if self._initial_bit_rate == 0:
self._initial_bit_rate = current_bit_rate
self._initial_bit_rate_time = arrow.utcnow().datetime
ctx.logger.info(
"%s",
f"{TypeAdapter(ByteSize).validate_python(self._initial_bit_rate).human_readable()}/s at {self._initial_bit_rate_time.isoformat()}",
)
return False

# NOTE: MaG says the value might also go down, but it shall definitely change,
# if this code proves unsafe we should change it.
if "bitrate" in decoded_message.obj:
ctx.logger.info(
"bitrate: %s",
f"{TypeAdapter(ByteSize).validate_python(current_bit_rate).human_readable()}/s",
)
elapsed_time = arrow.utcnow().datetime - self._initial_bit_rate_time
if (
elapsed_time > self.min_waiting_time_before_checking_bitrate
and "bitrate" in decoded_message.obj
):
current_bit_rate = decoded_message.obj["bitrate"]
bitrate_test = bool(self._initial_bit_rate != current_bit_rate)
ctx.logger.info(
"%s",
f"{TypeAdapter(ByteSize).validate_python(current_bit_rate).human_readable()}/s after {elapsed_time=}: {'good!' if bitrate_test else 'failed! bitrate did not change! TIP: talk with MaG about underwater cables!'}",
)
return bitrate_test

return False

Expand Down Expand Up @@ -158,12 +158,11 @@ def check_video_streaming(
_S4L_STREAMING_ESTABLISHMENT_MIN_WAITING_TIME
< _S4L_STREAMING_ESTABLISHMENT_MAX_TIME
)
with log_context(logging.INFO, "Check videostreaming works") as ctx:
with log_context(logging.INFO, "Check videostreaming works"):
waiter = _S4LSocketIOCheckBitRateIncreasesMessagePrinter(
min_waiting_time_before_checking_bitrate=datetime.timedelta(
milliseconds=_S4L_STREAMING_ESTABLISHMENT_MIN_WAITING_TIME,
),
logger=ctx.logger,
)
with s4l_websocket.expect_event(
"framereceived",
Expand Down
Loading
Loading