Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions compute_endpoint/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"coverage>=5.2",
"pytest-mock==3.2.0",
"pyfakefs<5.9.2", # 5.9.2 (Jul 30, 2025), breaks us; retry after 6.0.0 lands?
"pytest-random-order",
]


Expand Down
17 changes: 11 additions & 6 deletions compute_endpoint/tests/unit/test_execute_task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import random
from itertools import chain
from unittest import mock

import pytest
Expand Down Expand Up @@ -69,14 +70,13 @@ def test_bad_run_dir(endpoint_uuid, task_uuid, run_dir):
execute_task(task_uuid, b"", endpoint_uuid, run_dir=None)


def test_happy_path(serde, caplog, task_uuid, ez_pack_task, execute_task_runner):
def test_happy_path(serde, mock_log, task_uuid, ez_pack_task, execute_task_runner):
out = random.randint(1, 100_000)
divisor = random.randint(1, 100_000)

task_bytes = ez_pack_task(divide, divisor * out, divisor)

with caplog.at_level(logging.DEBUG):
packed_result = execute_task_runner(task_bytes)
packed_result = execute_task_runner(task_bytes)
assert isinstance(packed_result, bytes)

result = messagepack.unpack(packed_result)
Expand All @@ -89,7 +89,12 @@ def test_happy_path(serde, caplog, task_uuid, ez_pack_task, execute_task_runner)
assert "endpoint_id" in result.details
assert serde.deserialize(result.data) == out

log_msgs = "\n".join(r.msg % r.args for r in caplog.records)
log_recs = []
for a, _ in chain(mock_log.info.call_args_list, mock_log.debug.call_args_list):
fmt, *a = a
log_recs.append(a and fmt % tuple(a) or fmt)
log_msgs = "\n".join(log_recs)

assert "Preparing to execute" in log_msgs, "Expect log clue of progress"
assert "Unpacking" in log_msgs, "Expect log clue of progress"
assert "Deserializing function" in log_msgs, "Expect log to clue of progress"
Expand All @@ -98,8 +103,8 @@ def test_happy_path(serde, caplog, task_uuid, ez_pack_task, execute_task_runner)
assert "Task function complete" in log_msgs, "Expect log clue of progress"
assert "Execution completed" in log_msgs, "Expect log clue of progress"
assert "Task processing completed in" in log_msgs, "Expect log clue of progress"
assert len(caplog.records) == 7, "Time to update test?"
assert log_msgs.count(str(task_uuid)) == len(caplog.records), "Expect id prefixed"
assert len(log_recs) == 7, "Time to update test?"
assert log_msgs.count(str(task_uuid)) == len(log_recs), "Expect id always prefixed"


def test_sandbox(task_10_2, execute_task_runner, task_uuid, tmp_path):
Expand Down
2 changes: 1 addition & 1 deletion compute_endpoint/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ extras = test
usedevelop = true
commands =
coverage erase
coverage run -m pytest --durations 5 --log-cli-level=ERROR {posargs}
coverage run -m pytest --random-order --durations 5 --log-cli-level=ERROR {posargs}
coverage report

[testenv:mypy]
Expand Down
2 changes: 2 additions & 0 deletions compute_sdk/globus_compute_sdk/sdk/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1447,10 +1447,12 @@ def run(self):
fut.set_exception(self._cancellation_reason)
log.debug("Cancelled: %s", fut.task_id)
self._open_futures_empty.set()
self.shutdown()
log.debug("%r AMQP thread complete.", self)

def shutdown(self, wait=True, *, cancel_futures=False):
if not self.is_alive():
_RESULT_WATCHERS.pop(self.task_group_id, None)
return

self._closed = True # No more futures will be accepted
Expand Down
1 change: 1 addition & 0 deletions compute_sdk/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"flake8==3.8.0",
"pytest>=7.2",
"pytest-mock",
"pytest-random-order",
"pyfakefs",
"coverage",
# easy mocking of the `requests` library
Expand Down
8 changes: 7 additions & 1 deletion compute_sdk/tests/unit/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def __init__(self, *args, **kwargs):
}
kwargs.setdefault("client", mock_client)
super().__init__(*args, **kwargs)
Executor._default_task_group_id = None # Reset for each test
self._test_task_submitter_exception: t.Type[Exception] | None = None
self._test_task_submitter_done = False

Expand Down Expand Up @@ -112,6 +111,11 @@ def join(self, timeout: float | None = None) -> None:
super().join(timeout=timeout)


@pytest.fixture(autouse=True)
def reset_default_executor_id():
Executor._default_task_group_id = None # Reset for each test


@pytest.fixture
def mock_result_watcher(mocker: MockerFixture):
rw = mocker.patch(f"{_MOCK_BASE}_ResultWatcher", autospec=True)
Expand Down Expand Up @@ -1581,6 +1585,7 @@ def test_resultwatcher_onmessage_verifies_result_type(mocker, unpacked):
mrw._on_message(mock_channel, mock_deliver, mock_props, b"some_bytes")
mock_channel.basic_nack.assert_called()
assert not mrw._received_results
mrw.shutdown()


def test_resultwatcher_onmessage_sets_check_results_flag():
Expand All @@ -1594,6 +1599,7 @@ def test_resultwatcher_onmessage_sets_check_results_flag():
mock_channel.basic_nack.assert_not_called()
assert mrw._received_results
assert mrw._time_to_check_results.is_set()
mrw.shutdown()


@pytest.mark.parametrize("exc", (MemoryError("some description"), "some description"))
Expand Down
2 changes: 1 addition & 1 deletion compute_sdk/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ usedevelop = true
extras = test
commands =
coverage erase
coverage run -m pytest --durations 5 {posargs}
coverage run -m pytest --durations 5 --random-order {posargs}
coverage report --skip-covered

[testenv:mypy]
Expand Down
Loading