Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/tests-e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ jobs:
NEPTUNE_QUERY_RETRY_SOFT_TIMEOUT: "120"
TEST_MARKERS: ${{ matrix.test_markers }}
run: |
pytest --retries=3 --retry-delay=2 --junitxml="test-results/test-e2e.xml" ${TEST_MARKERS:+-m "$TEST_MARKERS"} tests/e2e
pytest -vx tests/e2e/test_connectivity.py &&
pytest --retries=1 --retry-delay=2 --junitxml="test-results/test-e2e.xml" ${TEST_MARKERS:+-m "$TEST_MARKERS"} tests/e2e

- name: Notify on failure
if: failure() && github.event_name == 'schedule'
Expand Down
64 changes: 63 additions & 1 deletion tests/e2e/data_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@
import neptune_scale.types
from neptune_api import AuthenticatedClient

from neptune_query.internal.filters import (
_Attribute,
_Filter,
)
from neptune_query.internal.identifiers import (
ProjectIdentifier,
SysId,
)
from neptune_query.internal.retrieval import search

IngestionHistogram = neptune_scale.types.Histogram
IngestionFile = neptune_scale.types.File

Expand Down Expand Up @@ -143,6 +153,8 @@ def ingest_project(
project_data=project_data,
)

_wait_for_ingestion(client=client, project_identifier=project_identifier, expected_data=project_data)

return IngestedProjectData(
project_identifier=project_identifier,
ingested_runs=[
Expand Down Expand Up @@ -222,7 +234,57 @@ def _ingest_runs(runs_data: list[RunData], api_token: str, project_identifier: s
for run in runs:
run.close()

sleep(2) # Extra wait to ensure data is available to query before proceeding

def _wait_for_ingestion(
    client: AuthenticatedClient,
    project_identifier: ProjectIdentifier,
    expected_data: ProjectData,
    *,
    max_attempts: int = 24,
    poll_interval_seconds: float = 2,
) -> None:
    """Poll the search API until every expected run and experiment can be found.

    Each attempt looks up every non-None ``run_id`` (via ``sys/custom_run_id``)
    and every non-None ``experiment_name`` (via ``sys/name``) from
    ``expected_data.runs``, then sleeps for ``poll_interval_seconds``. The
    function returns once an attempt found all of them.

    Args:
        client: Authenticated API client passed through to the search call.
        project_identifier: Project that is searched.
        expected_data: Project data whose runs are expected to be queryable.
        max_attempts: How many polling rounds to make before giving up.
        poll_interval_seconds: Pause after each polling round, in seconds.

    Raises:
        RuntimeError: If a lookup matches more than one sys id, or if the
            expected data is still incomplete after ``max_attempts`` rounds.
    """
    # NOTE(review): a reviewer suggested replacing the per-run and
    # per-experiment lookups below with a single unfiltered call to
    # search.fetch_run_sys_attrs, and not checking experiments separately.

    def fetch_sys_ids(attribute_name: str, attribute_value: str) -> list[SysId]:
        """Return the sys ids, across all pages, whose string attribute equals the value."""
        sys_ids: list[SysId] = []
        for page in search.fetch_run_sys_ids(
            client=client,
            project_identifier=project_identifier,
            filter_=_Filter.eq(_Attribute(attribute_name, type="string"), attribute_value),
        ):
            sys_ids.extend(page.items)
        return sys_ids

    def count_found(attribute_name: str, label: str, values: list[str]) -> int:
        """Count the values that match exactly one sys id; more than one match is an error."""
        found = 0
        for value in values:
            sys_ids = fetch_sys_ids(attribute_name=attribute_name, attribute_value=value)
            if len(sys_ids) > 1:
                raise RuntimeError(f"Expected exactly one sys_id for {label} {value}, got {sys_ids}")
            if sys_ids:
                found += 1
        return found

    all_runs = expected_data.runs
    run_ids = [run.run_id for run in all_runs if run.run_id is not None]
    experiment_names = [run.experiment_name for run in all_runs if run.experiment_name is not None]

    # Initialised up front so the timeout message is well-defined even when
    # max_attempts is zero.
    found_runs = 0
    found_experiments = 0

    for _ in range(max_attempts):
        found_runs = count_found("sys/custom_run_id", "run_id", run_ids)
        found_experiments = count_found("sys/name", "experiment_name", experiment_names)

        # Extra wait to ensure data is available to query before proceeding.
        # This deliberately runs on the successful attempt as well.
        sleep(poll_interval_seconds)

        if found_runs == len(run_ids) and found_experiments == len(experiment_names):
            return

    raise RuntimeError(
        "Timed out waiting for data ingestion. "
        f"Found runs: {found_runs} out of expected: {len(run_ids)}. "
        f"Found experiments: {found_experiments} out of expected: {len(experiment_names)}."
    )


def _get_all_steps(run_data: RunData) -> Iterable[float]:
Expand Down
28 changes: 28 additions & 0 deletions tests/e2e/test_connectivity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import pytest

from tests.e2e.conftest import EnsureProjectFunction
from tests.e2e.data_ingestion import (
IngestedProjectData,
ProjectData,
RunData,
)


@pytest.fixture(scope="module")
def project_1(ensure_project: EnsureProjectFunction) -> IngestedProjectData:
    """Module-scoped fixture: a project containing one small connectivity run.

    The run carries a single config value and is handed to ``ensure_project``;
    whatever that callable returns is the fixture value.
    """
    connectivity_run = RunData(
        experiment_name="test_connectivity_experiment",
        run_id="test_connectivity_run",
        configs={"im_alive": 1},
    )
    expected_project = ProjectData(runs=[connectivity_run])
    return ensure_project(project_data=expected_project)


def test_connectivity(project_1: IngestedProjectData) -> None:
    """Smoke-check connectivity to the Neptune server.

    The assertion itself is a placeholder. The useful work is done by
    requesting the ``project_1`` fixture: if that fixture cannot be set up,
    this test errors.
    """
    assert True
Loading