diff --git a/.github/workflows/tests-performance-client-e2e.yml b/.github/workflows/tests-performance-client-e2e.yml new file mode 100644 index 00000000..91cd1e9e --- /dev/null +++ b/.github/workflows/tests-performance-client-e2e.yml @@ -0,0 +1,64 @@ +name: Client e2e performance tests + +permissions: + checks: write + +on: + schedule: + - cron: '0 7 * * *' # Run at 7:00 daily + workflow_dispatch: + inputs: + test_mode: + description: Test mode (use baseline_discovery to run without timeouts on test cases) + required: true + type: choice + options: + - normal + - baseline_discovery + default: normal + timeout_tolerance: + description: Timeout tolerance in as float; e.g. 1.2 means 20% tolerance + required: false + default: 1.1 + +jobs: + test: + name: 'Performance tests' + runs-on: gcp-perf-test-dedicated-big + container: + image: python:3.13-trixie + timeout-minutes: 120 + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install dependencies + run: | + chown root:root . 
&& + python -m pip install --upgrade pip && + pip install -r dev_requirements.txt + + - name: Run performance tests + env: + NEPTUNE_PERFORMANCE_LOG_FILE: "test-results/test-client-e2e-performance.log" + NEPTUNE_PERFORMANCE_TEST_MODE: ${{ inputs.test_mode || 'normal' }} + NEPTUNE_PERFORMANCE_TEST_TOLERANCE_FACTOR: ${{ inputs.timeout_tolerance || 1.1 }} + run: | + pytest --junitxml="test-results/test-client-e2e-performance.xml" --log-cli-level=INFO --durations=0 tests/performance_e2e + + - uses: actions/upload-artifact@v4 + with: + name: test-client-e2e-performance.log + path: test-results/test-client-e2e-performance.log + + - name: Report + uses: mikepenz/action-junit-report@v5 + if: always() + with: + check_name: 'Performance tests report' + report_paths: "./test-results/test-client-e2e-performance.xml" + detailed_summary: true + verbose_summary: true + include_passed: true + include_time_in_summary: true + resolve_ignore_classname: true diff --git a/dev_requirements.txt b/dev_requirements.txt index 564b58cf..1d9561b4 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -10,4 +10,8 @@ pytest-timeout pytest-xdist icecream git+https://github.com/neptune-ai/neptune-client-scale.git@main#egg=neptune-scale +fastapi == 0.116.2 +uvicorn == 0.35.0 +humanize == 4.13.0 +colorlog == 6.9.0 hypothesis==6.139.2 diff --git a/src/neptune_query/internal/composition/fetch_metrics.py b/src/neptune_query/internal/composition/fetch_metrics.py index fecf2b41..c5385ba9 100644 --- a/src/neptune_query/internal/composition/fetch_metrics.py +++ b/src/neptune_query/internal/composition/fetch_metrics.py @@ -23,8 +23,8 @@ import pandas as pd from neptune_api.client import AuthenticatedClient +from .. import client as _client from .. 
import identifiers -from ..client import get_client from ..composition import ( concurrency, type_inference, @@ -74,7 +74,7 @@ def fetch_metrics( restricted_attributes = validation.restrict_attribute_filter_type(attributes, type_in={"float_series"}) valid_context = validate_context(context or get_context()) - client = get_client(context=valid_context) + client = _client.get_client(context=valid_context) with ( concurrency.create_thread_pool_executor() as executor, diff --git a/tests/performance_e2e/__init__.py b/tests/performance_e2e/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/performance_e2e/backend/__init__.py b/tests/performance_e2e/backend/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/performance_e2e/backend/endpoints/__init__.py b/tests/performance_e2e/backend/endpoints/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/performance_e2e/backend/endpoints/get_multiple_float_series_values.py b/tests/performance_e2e/backend/endpoints/get_multiple_float_series_values.py new file mode 100644 index 00000000..2f4d7e5a --- /dev/null +++ b/tests/performance_e2e/backend/endpoints/get_multiple_float_series_values.py @@ -0,0 +1,241 @@ +""" +Endpoint for handling multiple float series values requests. 
+""" +import hashlib +import json +import math +import random +from typing import ( + Final, + Optional, +) + +from fastapi import ( + APIRouter, + Request, + Response, +) +from neptune_api.models.float_time_series_values_request import FloatTimeSeriesValuesRequest +from neptune_api.proto.protobuf_v4plus.neptune_pb.api.v1.model.series_values_pb2 import ( + ProtoFloatSeriesValuesResponseDTO, +) + +from tests.performance_e2e.backend.middleware.read_perf_config_middleware import PERF_REQUEST_CONFIG_ATTRIBUTE_NAME +from tests.performance_e2e.backend.perf_request import FloatTimeSeriesValuesConfig +from tests.performance_e2e.backend.utils.exceptions import MalformedRequestError +from tests.performance_e2e.backend.utils.hashing_utils import hash_to_uniform_0_1 +from tests.performance_e2e.backend.utils.logging import ( + map_unset_to_none, + setup_logger, +) +from tests.performance_e2e.backend.utils.metrics import RequestMetrics +from tests.performance_e2e.backend.utils.timing import Timer + +# Path without /api prefix since we're mounting under /api in the main app +GET_MULTIPLE_FLOAT_SERIES_VALUES_ENDPOINT_PATH: Final[str] = "/leaderboard/v1/proto/attributes/series/float" +# Path used for configuration matching (with /api prefix for backward compatibility) +GET_MULTIPLE_FLOAT_SERIES_VALUES_CONFIG_PATH: Final[str] = "/api" + GET_MULTIPLE_FLOAT_SERIES_VALUES_ENDPOINT_PATH + +logger = setup_logger("get_multiple_float_series_values") + +router = APIRouter() + + +def _compute_series_cardinality( + config: FloatTimeSeriesValuesConfig, + experiment_id: str, + attribute_def: str, +) -> int: + """Compute the number of points for a series based on configuration and series identity. 
+ + Args: + config: Endpoint configuration + experiment_id: Experiment ID + attribute_def: Attribute definition path + + Returns: + Number of points to generate for this series + """ + hash_value = hash_to_uniform_0_1(experiment_id, attribute_def, config.seed) + + if config.series_cardinality_policy == "uniform": + if not config.series_cardinality_uniform_range: + raise ValueError("Missing uniform range for uniform cardinality policy") + + min_points, max_points = config.series_cardinality_uniform_range + # Use the hash to determine the number of points + points = min_points + math.floor(hash_value * (max_points - min_points + 1)) + return points + + elif config.series_cardinality_policy == "bucketed": + if not config.series_cardinality_buckets: + raise ValueError("Missing buckets for bucketed cardinality policy") + + # Normalize the bucket probabilities + total_prob = sum(prob for prob, _ in config.series_cardinality_buckets) + normalized_buckets = [(prob / total_prob, points) for prob, points in config.series_cardinality_buckets] + + # Use the hash to select a bucket + cumulative_prob = 0 + for prob, points in normalized_buckets: + cumulative_prob += prob + if hash_value <= cumulative_prob: + return int(points) + + # Fallback to last bucket + return int(normalized_buckets[-1][1]) + + else: + raise ValueError(f"Unknown cardinality policy: {config.series_cardinality_policy}") + + +def _generate_series_values( + series_cardinality: int, after_step: Optional[float], max_values: int +) -> list[tuple[float, float, float]]: # (timestamp in seconds, step, value) + """Generate time series values for a specific series. 
+ + Returns: + List of (timestamp, step, value) tuples + """ + + initial_step = 1 if after_step is None else (after_step + 1) + initial_timestamp = 1600000000 # some time in the year 2020 + total_remaining_steps = series_cardinality - (initial_step - 1) + steps_in_current_request = min(total_remaining_steps, max_values) + max_step = initial_step + steps_in_current_request - 1 + step_range = range(int(initial_step), int(max_step) + 1) + return [(initial_timestamp + step, step, random.uniform(-1e6, 1e6)) for step in step_range] + + +def _build_float_series_response( + parsed_request: FloatTimeSeriesValuesRequest, + endpoint_config: FloatTimeSeriesValuesConfig, +) -> ProtoFloatSeriesValuesResponseDTO: + """Build a response for the float series values request. + + Args: + parsed_request: Parsed request object + endpoint_config: Endpoint configuration + + Returns: + Protobuf response object + """ + response = ProtoFloatSeriesValuesResponseDTO() + + # Process each series request + for series_req in parsed_request.requests: + request_id = series_req.request_id + + # Extract experiment_id and attribute_name from TimeSeries + experiment_id = series_req.series.holder.identifier + attribute_name = series_req.series.attribute + after_step = map_unset_to_none(series_req.after_step) + + # Check if this series exists based on probability + # Always use the seed from the perf config for consistent hashing + series_hash = hashlib.md5(f"{experiment_id}:{attribute_name}:{endpoint_config.seed}".encode()).hexdigest() + hash_value = int(series_hash, 16) / (2**128 - 1) + + # Create a series entry in the response + series_dto = response.series.add() + series_dto.requestId = request_id + + # Skip generating points if the series doesn't exist according to probability + if hash_value > endpoint_config.existence_probability: + continue + + # Determine how many points to generate + series_cardinality = _compute_series_cardinality(endpoint_config, experiment_id, attribute_name) + # Limit points 
by the per_series_points_limit + + # Generate series values + values = _generate_series_values( + series_cardinality=series_cardinality, + after_step=after_step, + max_values=parsed_request.per_series_points_limit, + ) + + # Add values to the response + for timestamp, step, value in values: + point = series_dto.series.values.add() + point.timestamp_millis = int(timestamp * 1000) # Convert to milliseconds + point.step = step + point.value = value + point.is_preview = False + point.completion_ratio = 1.0 + + return response + + +@router.post(GET_MULTIPLE_FLOAT_SERIES_VALUES_ENDPOINT_PATH) +async def get_multiple_float_series_values(request: Request) -> Response: + """Handle requests for multiple float series values.""" + metrics: RequestMetrics = request.state.metrics + + try: + # Parse request body + with Timer() as parsing_timer: + raw_body = await request.body() + request_dict = json.loads(raw_body) + parsed_request = FloatTimeSeriesValuesRequest.from_dict(request_dict) + + # Get the configuration from middleware + perf_config = getattr(request.state, PERF_REQUEST_CONFIG_ATTRIBUTE_NAME, None) + if not perf_config: + logger.error("No performance_e2e configuration found") + raise MalformedRequestError("Missing or invalid X-Perf-Request header") + + # Get endpoint-specific configuration using the config path (with /api prefix) + endpoint_config = perf_config.get_endpoint_config(GET_MULTIPLE_FLOAT_SERIES_VALUES_CONFIG_PATH, "POST") + if not endpoint_config or not isinstance(endpoint_config, FloatTimeSeriesValuesConfig): + logger.warning("Missing configuration for this endpoint") + raise MalformedRequestError("Missing configuration for this endpoint") + + metrics.parse_time_ms = parsing_timer.time_ms + + # Log request details + logger.info( + f"Processing: num_series={len(parsed_request.requests)} " + f"per_series_limit={parsed_request.per_series_points_limit} " + f"existence_probability={endpoint_config.existence_probability} " + 
f"cardinality_policy={endpoint_config.series_cardinality_policy} " + f"seed={endpoint_config.seed} " + f"cardinality_uniform_range={endpoint_config.series_cardinality_uniform_range} " + f"cardinality_buckets={endpoint_config.series_cardinality_buckets}" + ) + + # Generate and return response + with Timer() as data_generation_timer: + # Build the response using the extracted function + response = _build_float_series_response(parsed_request, endpoint_config) + + # Serialize the response + response_bytes = response.SerializeToString() + + metrics.generation_time_ms = data_generation_timer.time_ms + metrics.returned_payload_size_bytes = len(response_bytes) + + logger.info( + f"Generated response with {len(response.series)} series " + f"({sum(1 if s.series.values else 0 for s in response.series)} non-empty), " + f"{sum(len(s.series.values) for s in response.series)} total points " + f"in {data_generation_timer.time_ms:.2f}ms, " + f"size={metrics.returned_payload_size_bytes} bytes" + ) + + return Response(content=response_bytes, media_type="application/x-protobuf") + + except MalformedRequestError as exc: + logger.error(f"Invalid request configuration: {str(exc)}") + return Response( + status_code=400, + content=ProtoFloatSeriesValuesResponseDTO().SerializeToString(), + media_type="application/x-protobuf", + ) + except Exception as exc: + logger.exception(f"Unhandled exception during request processing: {exc}") + return Response( + content=ProtoFloatSeriesValuesResponseDTO().SerializeToString(), + media_type="application/x-protobuf", + status_code=500, + ) diff --git a/tests/performance_e2e/backend/endpoints/query_attribute_definitions_within_project.py b/tests/performance_e2e/backend/endpoints/query_attribute_definitions_within_project.py new file mode 100644 index 00000000..08afc1df --- /dev/null +++ b/tests/performance_e2e/backend/endpoints/query_attribute_definitions_within_project.py @@ -0,0 +1,164 @@ +""" +Endpoint for handling attribute definitions queries within 
a project. +""" +import json +import random +import time +from typing import ( + Final, + Optional, + Union, +) + +from fastapi import ( + APIRouter, + Request, + Response, +) +from neptune_api.models import AttributeTypeDTO +from neptune_api.models.attribute_definition_dto import AttributeDefinitionDTO +from neptune_api.models.next_page_dto import NextPageDTO +from neptune_api.models.query_attribute_definitions_body_dto import QueryAttributeDefinitionsBodyDTO +from neptune_api.models.query_attribute_definitions_result_dto import QueryAttributeDefinitionsResultDTO +from neptune_api.types import Unset + +from neptune_query.internal.retrieval.attribute_types import map_attribute_type_python_to_backend +from tests.performance_e2e.backend.middleware.read_perf_config_middleware import PERF_REQUEST_CONFIG_ATTRIBUTE_NAME +from tests.performance_e2e.backend.perf_request import QueryAttributeDefinitionsConfig +from tests.performance_e2e.backend.utils.exceptions import MalformedRequestError +from tests.performance_e2e.backend.utils.hashing_utils import hash_to_string +from tests.performance_e2e.backend.utils.logging import ( + map_unset_to_none, + repr_list, + setup_logger, +) +from tests.performance_e2e.backend.utils.metrics import RequestMetrics +from tests.performance_e2e.backend.utils.timing import Timer + +# Path without /api prefix since we're mounting under /api in the main app +QUERY_ATTRIBUTE_DEFINITIONS_ENDPOINT_PATH: Final[str] = "/leaderboard/v1/leaderboard/attributes/definitions/query" +# Path used for configuration matching (with /api prefix for backward compatibility) +QUERY_ATTRIBUTE_DEFINITIONS_CONFIG_PATH: Final[str] = "/api" + QUERY_ATTRIBUTE_DEFINITIONS_ENDPOINT_PATH + +logger = setup_logger("query_attribute_definitions") + +router = APIRouter() + + +def _build_result( + endpoint_config: QueryAttributeDefinitionsConfig, experiment_ids: list[str], pagination_token: Optional[NextPageDTO] +) -> QueryAttributeDefinitionsResultDTO: + """Build a response 
according to the endpoint configuration.""" + logger.debug(f"Building result with {endpoint_config.total_definitions_count} definitions") + + # we're encoding an "offset" within the next_page_token to facilitate pagination + limit = pagination_token.limit if pagination_token else endpoint_config.total_definitions_count + returned_in_previous_pages = ( + json.loads(pagination_token.next_page_token)["total_already_returned"] + if pagination_token and pagination_token.next_page_token + else 0 + ) + total_remaining_to_return = endpoint_config.total_definitions_count - returned_in_previous_pages + to_be_returned_in_this_page = min(limit, total_remaining_to_return) + + return QueryAttributeDefinitionsResultDTO( + entries=[ + AttributeDefinitionDTO( + name=hash_to_string( + endpoint_config.seed, + returned_in_previous_pages + i, + length=10, + ), + type=AttributeTypeDTO( + map_attribute_type_python_to_backend(random.choice(endpoint_config.attribute_types)) + ), + ) + for i in range(to_be_returned_in_this_page) + ], + next_page=NextPageDTO( + limit=0, + next_page_token=json.dumps( + {"total_already_returned": returned_in_previous_pages + to_be_returned_in_this_page} + ), + ), + ) + + +@router.post(QUERY_ATTRIBUTE_DEFINITIONS_ENDPOINT_PATH) +async def query_attribute_definitions_within_project(request: Request) -> Response: + """Handle requests for attribute definitions within a project.""" + metrics: RequestMetrics = request.state.metrics + + try: + # Parse request body + with Timer() as parsing_timer: + raw_body = await request.json() + parsed_request = QueryAttributeDefinitionsBodyDTO.from_dict(raw_body) + + # Get the configuration from middleware + perf_config = getattr(request.state, PERF_REQUEST_CONFIG_ATTRIBUTE_NAME, None) + if not perf_config: + logger.error("No performance_e2e configuration found") + raise MalformedRequestError("Missing or invalid X-Perf-Request header") + + # Get endpoint-specific configuration using the config path (with /api prefix) + endpoint_config = 
perf_config.get_endpoint_config(QUERY_ATTRIBUTE_DEFINITIONS_CONFIG_PATH, "POST") + if not endpoint_config or not isinstance(endpoint_config, QueryAttributeDefinitionsConfig): + logger.warning("Missing configuration for this endpoint") + raise MalformedRequestError("Missing configuration for this endpoint") + + metrics.parse_time_ms = parsing_timer.time_ms + + # Log request details + project_ids = ( + parsed_request.project_identifiers if not isinstance(parsed_request.project_identifiers, type(None)) else [] + ) + logger.info( + f"Processing: projects={project_ids} " + f"seed={endpoint_config.seed} " + f"total_definitions={endpoint_config.total_definitions_count} " + f"experiments={map_unset_to_none(repr_list(parsed_request.experiment_ids_filter))} " + f"attribute_types={repr_list(endpoint_config.attribute_types)} " + f"next_page={_next_page_dto_to_str(parsed_request.next_page)}" + ) + + # Generate and return response + with Timer() as data_generation_timer: + result = _build_result( + endpoint_config, + experiment_ids=parsed_request.experiment_ids_filter, + pagination_token=parsed_request.next_page, + ) + payload = json.dumps(result.to_dict()) + + metrics.generation_time_ms = data_generation_timer.time_ms + metrics.returned_payload_size_bytes = len(payload) + + return Response( + content=payload, + media_type="application/json", + status_code=200, + ) + + except MalformedRequestError as exc: + logger.error(f"Invalid request configuration: {str(exc)}") + return Response( + status_code=400, + content=json.dumps({"error": str(exc), "timestamp": time.time()}), + media_type="application/json", + ) + except Exception as exc: + logger.exception(f"Unhandled exception during request processing: {exc}") + return Response( + status_code=500, + content=json.dumps({"error": "Internal server error", "timestamp": time.time()}), + media_type="application/json", + ) + + +def _next_page_dto_to_str(next_page: Union[Unset, "NextPageDTO"]) -> str: + if isinstance(next_page, Unset) or 
next_page is None: + return "None" + + token = next_page.next_page_token + return f"NextPageDTO(limit={next_page.limit}, token={token if not isinstance(token, Unset) else 'None'})" diff --git a/tests/performance_e2e/backend/endpoints/search_leaderboard_entries.py b/tests/performance_e2e/backend/endpoints/search_leaderboard_entries.py new file mode 100644 index 00000000..306b7a6c --- /dev/null +++ b/tests/performance_e2e/backend/endpoints/search_leaderboard_entries.py @@ -0,0 +1,184 @@ +""" +Endpoint for handling leaderboard entries search requests. +""" +import json +import random +import time +from typing import Final + +from fastapi import ( + APIRouter, + Request, + Response, +) +from neptune_api.models.search_leaderboard_entries_params_dto import SearchLeaderboardEntriesParamsDTO +from neptune_api.proto.protobuf_v4plus.neptune_pb.api.v1.model.leaderboard_entries_pb2 import ( + ProtoLeaderboardEntriesSearchResultDTO, +) + +# Import attribute types from neptune_query +from neptune_query.internal.retrieval.attribute_types import ALL_TYPES +from tests.performance_e2e.backend.middleware.read_perf_config_middleware import PERF_REQUEST_CONFIG_ATTRIBUTE_NAME +from tests.performance_e2e.backend.perf_request import SearchLeaderboardEntriesConfig +from tests.performance_e2e.backend.utils.exceptions import MalformedRequestError +from tests.performance_e2e.backend.utils.logging import setup_logger +from tests.performance_e2e.backend.utils.metrics import RequestMetrics +from tests.performance_e2e.backend.utils.random_utils import ( + MAX_NUMERIC_VALUE, + MIN_NUMERIC_VALUE, + MIN_VARIANCE, + random_string, +) +from tests.performance_e2e.backend.utils.timing import Timer + +# Path without /api prefix since we're mounting under /api in the main app +SEARCH_LEADERBOARD_ENTRIES_ENDPOINT_PATH: Final[str] = "/leaderboard/v1/proto/leaderboard/entries/search/" +# Path used for configuration matching (with /api prefix for backward compatibility) 
+SEARCH_LEADERBOARD_ENTRIES_CONFIG_PATH: Final[str] = "/api" + SEARCH_LEADERBOARD_ENTRIES_ENDPOINT_PATH + +logger = setup_logger("search_leaderboard_entries") + +router = APIRouter() + + +def _build_page_result(endpoint_config: SearchLeaderboardEntriesConfig, limit: int, offset: int) -> bytes: + """Build a protobuf page according to pagination parameters. + + Args: + endpoint_config: Configuration for the endpoint + limit: Maximum number of entries to return + offset: Starting position in the virtual result set + + Returns: + Serialized protobuf response with generated entries + + Raises: + ValueError: If an unsupported attribute type is requested + """ + start = offset + end = min(offset + limit, endpoint_config.total_entries_count) + actual_entry_count = max(0, end - start) + + logger.debug(f"Building page result: limit={limit}, offset={offset}, actual_entries={actual_entry_count}") + + if start >= endpoint_config.total_entries_count or not endpoint_config.requested_attributes: + logger.debug( + f"Returning empty result: start={start} >= total={endpoint_config.total_entries_count} " f"or no attributes" + ) + return ProtoLeaderboardEntriesSearchResultDTO().SerializeToString() + + result = ProtoLeaderboardEntriesSearchResultDTO() + + # Pre-process attribute types for faster lookup + processed_attrs = { + name: attr_type.lower() if attr_type.lower() in ALL_TYPES else None + for name, attr_type in endpoint_config.requested_attributes.items() + } + + if invalid_attrs := {name: attr_type for name, attr_type in processed_attrs.items() if attr_type is None}: + logger.error(f"Found invalid attribute types: {invalid_attrs}") + raise NotImplementedError(f"Found invalid attribute types: {invalid_attrs}") + + entries_generated = 0 + for idx in range(start, end): + entry = result.entries.add() + entries_generated += 1 + + for name, attr_type in processed_attrs.items(): + proto_attr = entry.attributes.add() + proto_attr.name = name + + if attr_type == "string": + 
proto_attr.string_properties.value = random_string() + elif attr_type == "float": + proto_attr.float_properties.value = random.uniform(MIN_NUMERIC_VALUE, MAX_NUMERIC_VALUE) + elif attr_type == "int": + proto_attr.int_properties.value = random.randint(int(MIN_NUMERIC_VALUE), int(MAX_NUMERIC_VALUE)) + elif attr_type == "bool": + proto_attr.bool_properties.value = random.random() < 0.5 + elif attr_type == "float_series": + min_val, max_val = sorted(random.uniform(MIN_NUMERIC_VALUE, MAX_NUMERIC_VALUE) for _ in (1, 2)) + + proto_attr.float_series_properties.min = min_val + proto_attr.float_series_properties.max = max_val + proto_attr.float_series_properties.last = random.uniform(min_val, max_val) + proto_attr.float_series_properties.average = random.uniform(min_val, max_val) + proto_attr.float_series_properties.variance = random.uniform(MIN_VARIANCE, MAX_NUMERIC_VALUE) + else: + logger.error(f"Unsupported attribute type: {endpoint_config.requested_attributes.get(name)}") + raise NotImplementedError( + f"Unsupported attribute type: {endpoint_config.requested_attributes.get(name)}" + ) + + if entries_generated > 0: + logger.debug(f"Generated {entries_generated} entries with {len(processed_attrs)} attributes each") + + serialized_result = result.SerializeToString() + logger.debug(f"Serialized result size: {len(serialized_result)} bytes") + + return serialized_result + + +@router.post(SEARCH_LEADERBOARD_ENTRIES_ENDPOINT_PATH) +async def search_leaderboard_entries(request: Request) -> Response: + """Handle requests for leaderboard entries search.""" + metrics: RequestMetrics = request.state.metrics + + try: + # Parse request body + with Timer() as parsing_timer: + raw_body = await request.json() + parsed_request = SearchLeaderboardEntriesParamsDTO.from_dict(raw_body) + + # Get the configuration from middleware + perf_config = getattr(request.state, PERF_REQUEST_CONFIG_ATTRIBUTE_NAME, None) + if not perf_config: + logger.error("No performance_e2e configuration found") + raise 
MalformedRequestError("Missing or invalid X-Perf-Request header") + + # Get endpoint-specific configuration using the config path (with /api prefix) + endpoint_config = perf_config.get_endpoint_config(SEARCH_LEADERBOARD_ENTRIES_CONFIG_PATH, "POST") + if not endpoint_config or not isinstance(endpoint_config, SearchLeaderboardEntriesConfig): + logger.warning("Missing configuration for this endpoint") + raise MalformedRequestError("Missing configuration for this endpoint") + + metrics.parse_time_ms = parsing_timer.time_ms + + # Extract pagination parameters + limit = int(parsed_request.pagination.limit) + offset = int(parsed_request.pagination.offset) + + # Log request details + attr_keys = list(endpoint_config.requested_attributes.keys()) + logger.info( + f"Processing: limit={limit} offset={offset} attrs={attr_keys} " + f"total_entries={endpoint_config.total_entries_count}" + ) + + # Generate and return protobuf response + with Timer() as data_generation_timer: + payload = _build_page_result(endpoint_config, limit, offset) + + metrics.generation_time_ms = data_generation_timer.time_ms + metrics.returned_payload_size_bytes = len(payload) + + return Response( + content=payload, + media_type="application/octet-stream", + status_code=200, + ) + + except MalformedRequestError as exc: + logger.error(f"Invalid request configuration: {str(exc)}") + return Response( + status_code=400, + content=json.dumps({"error": str(exc), "timestamp": time.time()}), + media_type="application/json", + ) + except Exception as exc: + logger.exception(f"Unhandled exception during request processing: {exc}") + return Response( + status_code=500, + content=json.dumps({"error": "Internal server error", "timestamp": time.time()}), + media_type="application/json", + ) diff --git a/tests/performance_e2e/backend/main.py b/tests/performance_e2e/backend/main.py new file mode 100644 index 00000000..d33109eb --- /dev/null +++ b/tests/performance_e2e/backend/main.py @@ -0,0 +1,75 @@ +""" +Main entry point 
for the performance_e2e test backend server. +Configures the FastAPI app and includes all endpoints. +""" +import time +from typing import Dict + +from fastapi import FastAPI + +from tests.performance_e2e.backend.endpoints.get_multiple_float_series_values import ( + router as float_series_values_router, +) +from tests.performance_e2e.backend.endpoints.query_attribute_definitions_within_project import ( + router as query_attribute_definitions_router, +) +from tests.performance_e2e.backend.endpoints.search_leaderboard_entries import ( + router as search_leaderboard_entries_router, +) +from tests.performance_e2e.backend.middleware.add_latency_middleware import LatencyAddingMiddleware +from tests.performance_e2e.backend.middleware.read_perf_config_middleware import PerfRequestConfigMiddleware +from tests.performance_e2e.backend.middleware.request_logging_middleware import RequestLoggingMiddleware +from tests.performance_e2e.backend.utils.logging import ( + configure_root_logger, + setup_logger, +) + +# Configure root logger first to prevent any duplicate logging +configure_root_logger() + +# Configure logger +logger = setup_logger("performance_test_backend") + +# Create main FastAPI application +app = FastAPI(title="Performance Test Backend", version="0.0.1") + +# Create sub-application for API endpoints with middleware +api_app = FastAPI(title="Performance Test API", version="0.0.1") + +# Add middleware for performance_e2e testing to the API sub-app only +# IMPORTANT: middleware are executed in reverse order from how they're added +# The last middleware added is executed first, so we add them in the reverse order: +# - RequestLoggingMiddleware +# - PerfConfigMiddleware +# - LatencyMiddleware +api_app.add_middleware(LatencyAddingMiddleware) +api_app.add_middleware(PerfRequestConfigMiddleware) +api_app.add_middleware(RequestLoggingMiddleware) + +# Include routers for API endpoints in the sub-app +api_app.include_router(search_leaderboard_entries_router) 
+api_app.include_router(query_attribute_definitions_router) +api_app.include_router(float_series_values_router) + +# Mount the API sub-app under the /api path +app.mount("/api", api_app) + + +@app.get("/health") +async def health() -> Dict[str, str]: + logger.debug("Health check requested") + return {"status": "ok", "timestamp": str(time.time())} + + +def run(host: str = "127.0.0.1", port: int = 8080) -> None: + """Run the FastAPI server with uvicorn.""" + import uvicorn + + logger.info(f"Starting server on {host}:{port}") + + # Use our custom log config - we've already configured the loggers + uvicorn.run(app, host=host, port=port, log_level="info", log_config=None) + + +if __name__ == "__main__": # pragma: no cover + run() diff --git a/tests/performance_e2e/backend/middleware/__init__.py b/tests/performance_e2e/backend/middleware/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/performance_e2e/backend/middleware/add_latency_middleware.py b/tests/performance_e2e/backend/middleware/add_latency_middleware.py new file mode 100644 index 00000000..f777c8a1 --- /dev/null +++ b/tests/performance_e2e/backend/middleware/add_latency_middleware.py @@ -0,0 +1,73 @@ +""" +Middleware for simulating network latency in performance_e2e tests. 
+""" +import asyncio +import random +import time +from typing import Callable + +from fastapi import ( + Request, + Response, +) +from starlette.middleware.base import BaseHTTPMiddleware + +from tests.performance_e2e.backend.middleware.read_perf_config_middleware import PERF_REQUEST_CONFIG_ATTRIBUTE_NAME +from tests.performance_e2e.backend.utils.exceptions import MalformedRequestError +from tests.performance_e2e.backend.utils.logging import setup_logger + +# Configure logger +logger = setup_logger("latency_middleware") + + +class LatencyAddingMiddleware(BaseHTTPMiddleware): + """Middleware for simulating latency in performance_e2e testing.""" + + async def dispatch(self, request: Request, call_next: Callable) -> Response: + request_id = getattr(request.state, "id", "unknown") + start_time_ms = time.perf_counter_ns() / 1_000_000 # Convert to milliseconds + + # Call the next handler (endpoint) + response = await call_next(request) + + # Check for latency configuration + perf_request_config = getattr(request.state, PERF_REQUEST_CONFIG_ATTRIBUTE_NAME, None) + if not perf_request_config: + logger.error("No performance_e2e request configuration found; skipping latency addition.") + raise MalformedRequestError("No performance_e2e request configuration found; skipping latency addition.") + + # Get endpoint-specific configuration + path = request.url.path + method = request.method + endpoint_config = perf_request_config.get_endpoint_config(path, method) + + # Check if we have endpoint config with latency settings + if endpoint_config and endpoint_config.latency: + # Calculate how long we've spent processing so far + elapsed_time_ms = (time.perf_counter_ns() / 1_000_000) - start_time_ms + + # Generate random latency in the specified range using uniform distribution + target_latency_ms = random.uniform(endpoint_config.latency.min_ms, endpoint_config.latency.max_ms) + + # Subtract the time already spent processing + remaining_latency_ms = max(0.0, target_latency_ms - elapsed_time_ms) + + # Add 
artificial latency if needed + if remaining_latency_ms > 0: + logger.debug( + f"[{request_id}] Adding artificial latency: {target_latency_ms:.2f}ms " + f"(remaining: {remaining_latency_ms:.2f}ms)" + ) + # Sleep for the remaining time to reach the desired latency (convert back to seconds) + await asyncio.sleep(remaining_latency_ms / 1000.0) + + # Store latency information in metrics if available + if hasattr(request.state, "metrics"): + request.state.metrics.latency_added_ms = remaining_latency_ms + request.state.metrics.latency_target_ms = target_latency_ms + + # Add headers with latency information + response.headers["X-Artificial-Latency-Ms"] = str(round(remaining_latency_ms, 2)) + response.headers["X-Target-Latency-Ms"] = str(round(target_latency_ms, 2)) + + return response diff --git a/tests/performance_e2e/backend/middleware/read_perf_config_middleware.py b/tests/performance_e2e/backend/middleware/read_perf_config_middleware.py new file mode 100644 index 00000000..b7582553 --- /dev/null +++ b/tests/performance_e2e/backend/middleware/read_perf_config_middleware.py @@ -0,0 +1,46 @@ +""" +Middleware for handling performance_e2e testing configuration parsing. +Parses X-Perf-Request headers and attaches config to request state. 
+""" +import json +from typing import ( + Callable, + Final, +) + +from fastapi import ( + Request, + Response, +) +from starlette.middleware.base import BaseHTTPMiddleware + +from tests.performance_e2e.backend.perf_request import PerfRequestConfig +from tests.performance_e2e.backend.utils.logging import setup_logger + +# Configure logger +logger = setup_logger("perf_config_middleware") + + +PERF_REQUEST_CONFIG_ATTRIBUTE_NAME: Final[str] = "perf_request_config" + + +class PerfRequestConfigMiddleware(BaseHTTPMiddleware): + """Middleware for handling performance_e2e testing configuration from X-Perf-Request header.""" + + async def dispatch(self, request: Request, call_next: Callable) -> Response: + header_value = None + try: + header_value = request.headers.get("X-Perf-Request") + perf_request_config = PerfRequestConfig.from_json(header_value) + # Attach the parsed config to the request state for endpoint handlers + # and other middleware components to use + setattr(request.state, PERF_REQUEST_CONFIG_ATTRIBUTE_NAME, perf_request_config) + logger.debug("Parsed X-Perf-Request header") + except Exception as e: + logger.error(f"Error parsing X-Perf-Request header ({header_value}): {str(e)}") + response_content = {"code": 400, "message": f"Malformed X-Perf-Request header: {str(e)}"} + return Response(status_code=400, content=json.dumps(response_content)) + + # Call the next handler (endpoint) + response = await call_next(request) + return response diff --git a/tests/performance_e2e/backend/middleware/request_logging_middleware.py b/tests/performance_e2e/backend/middleware/request_logging_middleware.py new file mode 100644 index 00000000..8d66e37c --- /dev/null +++ b/tests/performance_e2e/backend/middleware/request_logging_middleware.py @@ -0,0 +1,79 @@ +""" +Middleware for tracking request metrics in performance_e2e tests. +Records timing information and adds metrics to requests. 
+""" +import json +import time +from contextvars import Token +from typing import Callable + +from fastapi import ( + Request, + Response, +) +from starlette.middleware.base import BaseHTTPMiddleware + +from tests.performance_e2e.backend.utils.logging import ( + request_id_ctx, + request_id_filter, + scenario_name_ctx, + setup_logger, +) +from tests.performance_e2e.backend.utils.metrics import RequestMetrics +from tests.performance_e2e.backend.utils.random_utils import random_string + +logger = setup_logger("request_metrics_middleware") + + +class RequestLoggingMiddleware(BaseHTTPMiddleware): + """Middleware for tracking request metrics and adding request IDs.""" + + async def dispatch(self, request: Request, call_next: Callable) -> Response: + # Generate and attach request ID + request_id = request.headers.get("X-Request-ID", default=f"req-{random_string(8)}") + scenario_name = request.headers.get("X-Scenario-Name", default="-") + request.state.id = request_id + + request_id_token: Token = request_id_ctx.set(request_id) + scenario_name_token: Token = scenario_name_ctx.set(scenario_name) + try: + + logger.info(f"Handling {request.method} {request.url.path}") + + # Create metrics object on request state + request.state.metrics = RequestMetrics() + + # Process the request through the middleware chain + start_time_ms = time.perf_counter_ns() / 1_000_000 # Convert to milliseconds + response = await call_next(request) + + # Record basic processing time in the metrics + request.state.metrics.total_time_ms = (time.perf_counter_ns() / 1_000_000) - start_time_ms + + # Add basic timing headers to response + response.headers["X-Process-Time-Ms"] = str(round(request.state.metrics.total_time_ms, 2)) + response.headers["X-Request-ID"] = request_id + response.headers["X-Scenario-Name"] = scenario_name + + # Log complete metrics + log_result(request.state.metrics, response.status_code) + + # Reset request ID after processing + request_id_filter.request_id = "-" + + return response 
+ finally: + request_id_ctx.reset(request_id_token) + scenario_name_ctx.reset(scenario_name_token) + + +def log_result(metrics: RequestMetrics, status_code: int) -> None: + metrics_data = { + "total_processing_time_ms": round(metrics.total_time_ms, 2), + "parsing_time_ms": round(metrics.parse_time_ms, 2), + "generation_time_ms": round(metrics.generation_time_ms, 2), + "returned_payload_size_bytes": metrics.returned_payload_size_bytes, + "artificial_latency_added_ms": round(metrics.latency_added_ms, 2) if metrics.latency_added_ms > 0 else 0, + } + + logger.info(f"Response status_code={status_code}, metrics: {json.dumps(metrics_data)}") diff --git a/tests/performance_e2e/backend/perf_request.py b/tests/performance_e2e/backend/perf_request.py new file mode 100644 index 00000000..163cf8f9 --- /dev/null +++ b/tests/performance_e2e/backend/perf_request.py @@ -0,0 +1,171 @@ +""" +Common schema and utilities for X-Perf-Request headers used in performance_e2e testing. +This module provides shared functionality for both test cases and the server implementation. +""" +import json +from dataclasses import ( + asdict, + dataclass, + field, +) +from typing import ( + ClassVar, + Dict, + Optional, +) + + +@dataclass +class LatencyConfig: + """Configuration for simulated latency.""" + + min_ms: float + max_ms: float + + +@dataclass +class EndpointConfig: + """Base class for endpoint-specific configuration.""" + + latency: Optional[LatencyConfig] + + +@dataclass +class SearchLeaderboardEntriesConfig(EndpointConfig): + """Configuration for the search_leaderboard_entries endpoint.""" + + # name -> type, e.g. 
{"accuracy": "float", "status": "string"} + requested_attributes: Dict[str, str] + total_entries_count: int + + +@dataclass +class QueryAttributeDefinitionsConfig(EndpointConfig): + """Configuration for the query_attribute_definitions_within_project endpoint.""" + + # The server will use hashing to compute the definitions to return + # This allows stability across multiple calls + seed: int + + # Total number of attribute definitions to return + total_definitions_count: int + + # Attribute types to include in the response (name -> type) + attribute_types: list[str] + + +@dataclass +class FloatTimeSeriesValuesConfig(EndpointConfig): + """Configuration for the get_multiple_float_series_values endpoint.""" + + # The server will use hashing to compute desired point count for each series. + # This allows randomness for non-random series names. + seed: int + + # Probability that a (experiment, attribute definition) pair exists + existence_probability: float + # Minimum number of points per series + + series_cardinality_policy: str + # if policy is "uniform", this defines the (min, max) range for number of points per series + series_cardinality_uniform_range: Optional[tuple[int, int]] = field(default=None) + # if policy is "bucketed", this defines the buckets and their probabilities + # a list of (probability, number_of_points) tuples; the probabilities will be normalized by the server + series_cardinality_buckets: Optional[list[tuple[float, float]]] = field(default=None) + + +@dataclass +class PerfRequestConfig: + """Schema for the X-Perf-Request header.""" + + # Registry of endpoint paths to their configuration classes + ENDPOINT_CONFIG_CLASSES: ClassVar[Dict[str, dict[str, type]]] = { + "/api/leaderboard/v1/proto/leaderboard/entries/search/": {"POST": SearchLeaderboardEntriesConfig}, + "/api/leaderboard/v1/leaderboard/attributes/definitions/query": {"POST": QueryAttributeDefinitionsConfig}, + "/api/leaderboard/v1/proto/attributes/series/float": {"POST": 
FloatTimeSeriesValuesConfig}, + } + + # Endpoint-specific configurations: path -> method -> config + endpoints_configuration: Dict[str, Dict[str, EndpointConfig]] = field(default_factory=dict) + + def add_endpoint_config(self, path: str, method: str, config: EndpointConfig) -> None: + """Add configuration for a specific endpoint. + + Args: + path: API endpoint path + method: HTTP method (GET, POST, etc.) + config: Configuration object for the endpoint + """ + if path not in self.endpoints_configuration: + self.endpoints_configuration[path] = {} + self.endpoints_configuration[path][method] = config + + def get_endpoint_config(self, path: str, method: str) -> Optional[EndpointConfig]: + """Get configuration for a specific endpoint. + + Args: + path: API endpoint path + method: HTTP method (GET, POST, etc.) + + Returns: + Configuration object for the endpoint or None if not found + """ + return self.endpoints_configuration.get(path, {}).get(method) + + @classmethod + def _serialize_dataclass(cls, obj): + """Helper to serialize dataclasses to dictionaries.""" + if hasattr(obj, "__dataclass_fields__"): + result = {} + for k, v in asdict(obj).items(): + result[k] = cls._serialize_dataclass(v) + return result + elif isinstance(obj, dict): + return {k: cls._serialize_dataclass(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [cls._serialize_dataclass(i) for i in obj] + else: + return obj + + def to_json(self) -> str: + """Serialize the configuration to JSON format. + + Returns: + JSON string representation of the configuration + """ + return json.dumps(self._serialize_dataclass(self)) + + @classmethod + def from_json(cls, json_str: str) -> "PerfRequestConfig": + """Deserialize a configuration from JSON. 
+ + Args: + json_str: JSON string to parse + + Returns: + PerfRequestConfig object + """ + data = json.loads(json_str) + + # Create a new config object + config = cls() + + # Parse endpoint-specific configurations + if "endpoints_configuration" in data: + endpoints_data = data["endpoints_configuration"] + for path, methods in endpoints_data.items(): + for method, endpoint_config in methods.items(): + # Look up the correct config class for this endpoint + config_class = cls.ENDPOINT_CONFIG_CLASSES.get(path, {}).get(method) + if config_class: + # Handle latency configuration if present + if "latency" in endpoint_config: + latency_data = endpoint_config.pop("latency") + endpoint_config["latency"] = LatencyConfig(**latency_data) + + # Create endpoint config object + endpoint_obj = config_class(**endpoint_config) + + config.add_endpoint_config(path, method, endpoint_obj) + + return config diff --git a/tests/performance_e2e/backend/utils/__init__.py b/tests/performance_e2e/backend/utils/__init__.py new file mode 100644 index 00000000..62561df2 --- /dev/null +++ b/tests/performance_e2e/backend/utils/__init__.py @@ -0,0 +1,3 @@ +""" +Utility package initialization file. +""" diff --git a/tests/performance_e2e/backend/utils/exceptions.py b/tests/performance_e2e/backend/utils/exceptions.py new file mode 100644 index 00000000..8795efba --- /dev/null +++ b/tests/performance_e2e/backend/utils/exceptions.py @@ -0,0 +1,9 @@ +""" +Common exceptions used across the performance_e2e test backend. 
+""" + + +class MalformedRequestError(Exception): + """Exception raised when the performance_e2e request format is invalid.""" + + pass diff --git a/tests/performance_e2e/backend/utils/hashing_utils.py b/tests/performance_e2e/backend/utils/hashing_utils.py new file mode 100644 index 00000000..fc66775f --- /dev/null +++ b/tests/performance_e2e/backend/utils/hashing_utils.py @@ -0,0 +1,22 @@ +import base64 +import hashlib +import json + + +def hash_to_string(*xs, length) -> str: + s = json.dumps(xs, sort_keys=True).encode() + # 16-byte deterministic hash -> hex string of length 32 + h = hashlib.blake2b(s, digest_size=16).digest() + b64 = base64.urlsafe_b64encode(h).decode("ascii") + return (b64 * ((length // len(b64)) + 1))[:length] + + +def hash_to_uniform_64bit(*xs) -> float: + s = json.dumps(xs, sort_keys=True).encode() + # 8-byte deterministic hash -> integer in [0, 2^64-1] + h = hashlib.blake2b(s, digest_size=8).digest() + return int.from_bytes(h, "big") + + +def hash_to_uniform_0_1(*xs) -> float: + return hash_to_uniform_64bit(*xs) / 2**64 diff --git a/tests/performance_e2e/backend/utils/logging.py b/tests/performance_e2e/backend/utils/logging.py new file mode 100644 index 00000000..10f29913 --- /dev/null +++ b/tests/performance_e2e/backend/utils/logging.py @@ -0,0 +1,140 @@ +""" +Logging utilities for the performance_e2e test backend. +Provides a consistent logging configuration across all modules. 
+""" +import logging +import os +from contextvars import ContextVar +from typing import ( + Dict, + Optional, +) + +import colorlog +from neptune_api.types import Unset + +scenario_name_ctx = ContextVar("scenario_name", default="-") +request_id_ctx = ContextVar("request_id", default="-") + +# Define default log file path and environment variable name +DEFAULT_LOG_FILE = "performance_test.log" +LOG_FILE_ENV_VAR = "NEPTUNE_PERFORMANCE_LOG_FILE" + +# Define color scheme for different log levels +LOG_COLORS = { + "DEBUG": "cyan", + "INFO": "green", + "WARNING": "yellow", + "ERROR": "red", + "CRITICAL": "bold_red", +} + + +class RequestMetadataFilter(logging.Filter): + """Filter that adds request_id to log records.""" + + def __init__(self, name: str = ""): + super().__init__(name) + self.request_id = "-" + self.scenario_name = "-" + + def filter(self, record: logging.LogRecord) -> bool: + if not hasattr(record, "request_id"): + record.request_id = request_id_ctx.get() + if not hasattr(record, "scenario_name"): + record.scenario_name = scenario_name_ctx.get() + return True + + +# Create a singleton instance for the whole application +request_id_filter = RequestMetadataFilter() + +# Store configured loggers to prevent duplicate setup +_CONFIGURED_LOGGERS: Dict[str, logging.Logger] = {} + +# Flag to track if root logger has been configured +_ROOT_LOGGER_CONFIGURED = False + + +def configure_root_logger() -> None: + """Configure the root logger to use a NullHandler to prevent duplicate logging. + This prevents unconfigured loggers from propagating to the root logger. 
+ """ + global _ROOT_LOGGER_CONFIGURED + + if not _ROOT_LOGGER_CONFIGURED: + # Configure root logger with NullHandler to prevent propagation of messages + root_logger = logging.getLogger() + + # Clear any existing handlers + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # Add null handler to prevent "No handlers could be found" warnings + root_logger.addHandler(logging.NullHandler()) + + # Set to WARNING level by default + root_logger.setLevel(logging.WARNING) + + _ROOT_LOGGER_CONFIGURED = True + + +def setup_logger(logger_name: str, level: Optional[int] = None) -> logging.Logger: + # Configure root logger first + configure_root_logger() + + # Check if this logger was already configured + if logger_name in _CONFIGURED_LOGGERS: + return _CONFIGURED_LOGGERS[logger_name] + + # Get or create the logger + logger = logging.getLogger(logger_name) + + # Set propagate to False to prevent duplicate logs + logger.propagate = False + + # Clear any existing handlers to be safe + if logger.handlers: + for handler in logger.handlers[:]: + logger.removeHandler(handler) + + # Configure formatter with request_id + formatter = colorlog.ColoredFormatter( + fmt="%(asctime)s.%(msecs)03d | %(scenario_name)s " + "| %(log_color)s%(levelname)s%(reset)s | %(name)s | %(request_id)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + log_colors=LOG_COLORS, + ) + + # Determine log file path from environment variable or use default + log_file_path = os.environ.get(LOG_FILE_ENV_VAR, DEFAULT_LOG_FILE) + + # Create directory if it doesn't exist (for cases where path includes directories) + log_file_dir = os.path.dirname(log_file_path) + if log_file_dir and not os.path.exists(log_file_dir): + os.makedirs(log_file_dir) + + # Configure file handler instead of stdout + handler = logging.FileHandler(log_file_path) + handler.setFormatter(formatter) + handler.addFilter(request_id_filter) + logger.addHandler(handler) + + # Set log level (default to INFO if not specified) + 
logger.setLevel(level if level is not None else logging.INFO) + + # Store the configured logger + _CONFIGURED_LOGGERS[logger_name] = logger + + return logger + + +def map_unset_to_none(value): + return None if isinstance(value, Unset) else value + + +def repr_list(lst): + if len(lst) < 5: + return str(lst) + else: + return str(lst[:5] + ["..."]) + f" (total {len(lst)})" diff --git a/tests/performance_e2e/backend/utils/metrics.py b/tests/performance_e2e/backend/utils/metrics.py new file mode 100644 index 00000000..94ac73d3 --- /dev/null +++ b/tests/performance_e2e/backend/utils/metrics.py @@ -0,0 +1,15 @@ +""" +Metrics utilities for performance_e2e monitoring. +""" +from dataclasses import dataclass + + +@dataclass +class RequestMetrics: + """Collection of metrics for request processing performance_e2e monitoring.""" + + parse_time_ms: float = 0.0 # Time taken to parse the request + generation_time_ms: float = 0.0 # Time taken to generate the response data + latency_added_ms: float = 0.0 # Artificial latency added to simulate processing / network delays + total_time_ms: float = 0.0 # Total time including parsing, data generation and artificial latency + returned_payload_size_bytes: int = 0 # Size of the response payload in bytes diff --git a/tests/performance_e2e/backend/utils/random_utils.py b/tests/performance_e2e/backend/utils/random_utils.py new file mode 100644 index 00000000..cf265fa4 --- /dev/null +++ b/tests/performance_e2e/backend/utils/random_utils.py @@ -0,0 +1,17 @@ +""" +Utilities for generating random data for test responses. 
+""" +import random +import string +from typing import Final + +# Constants for data generation +DEFAULT_RANDOM_STRING_LENGTH: Final[int] = 10 +MIN_NUMERIC_VALUE: Final[float] = -1_000_000_000.0 +MAX_NUMERIC_VALUE: Final[float] = 1_000_000_000.0 +MIN_VARIANCE: Final[float] = 0.0 + + +def random_string(length: int = DEFAULT_RANDOM_STRING_LENGTH) -> str: + """Generate a random string of specified length.""" + return "".join(random.choices(string.ascii_letters + string.digits, k=length)) diff --git a/tests/performance_e2e/backend/utils/timing.py b/tests/performance_e2e/backend/utils/timing.py new file mode 100644 index 00000000..2b0c376a --- /dev/null +++ b/tests/performance_e2e/backend/utils/timing.py @@ -0,0 +1,19 @@ +import time +from typing import Optional + + +class Timer: + def __init__(self): + self._total_time_ms: Optional[float] = None + + @property + def time_ms(self): + assert self._total_time_ms is not None + return self._total_time_ms + + def __enter__(self): + self._enter_time = time.perf_counter_ns() + return self + + def __exit__(self, *exc_args): + self._total_time_ms = (time.perf_counter_ns() - self._enter_time) / 1_000_000 diff --git a/tests/performance_e2e/conftest.py b/tests/performance_e2e/conftest.py new file mode 100644 index 00000000..440106af --- /dev/null +++ b/tests/performance_e2e/conftest.py @@ -0,0 +1,263 @@ +import base64 +import contextlib +import json +import multiprocessing +import os +import signal +import time +import traceback +from typing import ( + Any, + Dict, + Iterator, + Optional, +) +from urllib.parse import urlparse + +import httpx +import pytest +from humanize import naturalsize +from neptune_api import AuthenticatedClient +from neptune_api.credentials import Credentials +from neptune_api.types import OAuthToken + +import neptune_query.internal.client as client +from neptune_query import set_api_token +from tests.performance_e2e.backend.utils.logging import setup_logger + +# Get a logger for the test framework using our 
centralized configuration +logger = setup_logger("performance_tests") + + +# Configuration constants with environment variable overrides +SERVER_HOST = os.environ.get("PERF_TEST_HOST", "127.0.0.1") +SERVER_PORT = int(os.environ.get("PERF_TEST_PORT", "8080")) +SERVER_STARTUP_TIMEOUT = int(os.environ.get("PERF_TEST_STARTUP_TIMEOUT", "20")) +SERVER_HEALTH_CHECK_INTERVAL = float(os.environ.get("PERF_TEST_HEALTH_INTERVAL", "0.25")) +HTTP_CLIENT_TIMEOUT = int(os.environ.get("PERF_TEST_CLIENT_TIMEOUT", "10")) + + +@pytest.fixture(scope="session") +def backend_base_url() -> str: + """Provide the base URL for the test backend server. + + Note: + Can be overridden with PERF_TEST_HOST and PERF_TEST_PORT environment variables + """ + return f"http://{SERVER_HOST}:{SERVER_PORT}" + + +def _run_server(host: str, port: int) -> None: # pragma: no cover - helper for spawning process + """Run the FastAPI server for performance_e2e testing.""" + try: + import uvicorn + + logger.info(f"Starting performance_e2e test server at {host}:{port}") + uvicorn.run( + app="tests.performance_e2e.backend.main:app", + host=host, + port=port, + log_level="info", + access_log=False, + workers=8, + ) + except Exception: + logger.error(f"Error in test server process:\n{traceback.format_exc()}") + # Make sure the process exits so the test doesn't hang + os._exit(1) + + +@pytest.fixture(scope="session", autouse=True) +def backend_server(backend_base_url: str) -> Iterator[None]: + """Start the test FastAPI backend in a separate process for the test session. 
+ + Raises: + RuntimeError: If the server fails to start or doesn't respond to health checks + """ + parsed = urlparse(backend_base_url) + host = parsed.hostname or SERVER_HOST + port = parsed.port or SERVER_PORT + + # Create a server process with proper signal handling + ctx = multiprocessing.get_context("spawn") # Use spawn for better cross-platform compatibility + proc = ctx.Process(target=_run_server, args=(host, port), daemon=False) + + logger.info(f"Starting backend server process at {backend_base_url}") + proc.start() + + # Health check initialization + deadline = time.time() + SERVER_STARTUP_TIMEOUT + healthy = False + + logger.info("Waiting for server to become healthy...") + + # Use a more readable approach to health checking + for attempt in range(1, int(SERVER_STARTUP_TIMEOUT / SERVER_HEALTH_CHECK_INTERVAL) + 1): + # Check if server process is still alive + if not proc.is_alive(): + logger.error("Server process died during startup") + break + + # Try to connect to the health endpoint + try: + response = httpx.get( + url=f"{backend_base_url}/health", timeout=1.0, headers={"User-Agent": "Neptune-Performance-Test/1.0"} + ) + if response.status_code == 200: + healthy = True + logger.info(f"Server is healthy after {attempt} attempts") + break + except Exception as e: + if attempt % 10 == 0: # Log less frequently to avoid spamming + logger.debug(f"Health check attempt {attempt} failed: {str(e)}") + + # Wait before next attempt, but check if we're past the deadline + if time.time() > deadline: + logger.error(f"Reached timeout after {attempt} attempts") + break + + time.sleep(SERVER_HEALTH_CHECK_INTERVAL) + + if not healthy: + error_msg = f"Backend server failed to start or become healthy within {SERVER_STARTUP_TIMEOUT}s" + logger.error(error_msg) + + # Ensure process terminated before raising to avoid zombie + if proc.is_alive(): + logger.info("Terminating unresponsive server process") + with contextlib.suppress(Exception): + # Try graceful termination first + 
os.kill(proc.pid, signal.SIGTERM) + proc.join(timeout=2) + + # Force kill if still running + if proc.is_alive(): + logger.warning("Server process didn't terminate gracefully, forcing kill") + with contextlib.suppress(Exception): + os.kill(proc.pid, signal.SIGKILL) + proc.join(timeout=1) + + raise RuntimeError(error_msg) + + # Server is up and healthy, yield control to tests + logger.info("Backend server is ready for tests") + yield + + # Cleanup after tests + logger.info("Shutting down backend server") + if proc.is_alive(): + with contextlib.suppress(Exception): + # Try graceful termination first + os.kill(proc.pid, signal.SIGTERM) + proc.join(timeout=3) + + # Force kill if still running + if proc.is_alive(): + logger.warning("Server process didn't terminate gracefully, forcing kill") + with contextlib.suppress(Exception): + os.kill(proc.pid, signal.SIGKILL) + proc.join(timeout=2) + + logger.info("Backend server shutdown complete") + + +@pytest.fixture(scope="session", autouse=True) +def api_token(backend_base_url: str) -> str: + """Create and configure a fake API token for testing. 
+ + Args: + backend_base_url: The base URL of the test server + + Returns: + A base64-encoded fake API token + """ + api_token_bytes = json.dumps( + {"api_address": backend_base_url, "api_url": backend_base_url, "api_key": "fake"} + ).encode("utf-8") + api_token = base64.b64encode(api_token_bytes).decode("utf-8") + + # set globally + set_api_token(api_token) + + # and return + return api_token + + +class ClientProviderWithHeaderInjection: + """Wrapper for the Neptune API client with header management.""" + + def __init__(self, real_client: Optional[AuthenticatedClient] = None): + self._headers: Dict[str, str] = {} + self._client = real_client + + def set_x_perf_request_header(self, value: str) -> None: + """Update request headers for subsequent API calls.""" + self._headers.update({"X-Perf-Request": value}) + + def set_scenario_name_header(self, scenario_name: str) -> None: + """Update request headers for subsequent API calls.""" + self._headers.update({"X-Scenario-Name": scenario_name}) + + def __call__(self, context: Any, proxies: Optional[Dict[str, str]] = None) -> AuthenticatedClient: + """Return a configured client with the current headers.""" + return self._client.with_headers(self._headers) + + +@pytest.fixture(scope="function") +def http_client(monkeypatch, backend_base_url: str, api_token: str) -> ClientProviderWithHeaderInjection: + """Create and configure an HTTP client for API testing. 
+ + Returns: + A wrapper for the Neptune API client with header management + """ + never_expiring_token = OAuthToken(access_token="x", refresh_token="x", expiration_time=time.time() + 10_000_000) + patched_client = AuthenticatedClient( + base_url=backend_base_url, + credentials=Credentials.from_api_key(api_token), + client_id="", + token_refreshing_endpoint="", + api_key_exchange_callback=lambda _client, _credentials: never_expiring_token, + verify_ssl=False, + httpx_args={"http2": False}, + timeout=httpx.Timeout(timeout=HTTP_CLIENT_TIMEOUT), + headers={"User-Agent": "Neptune-Performance-Test/1.0"}, + ) + + client_provider = ClientProviderWithHeaderInjection(patched_client) + + monkeypatch.setattr(client, "get_client", client_provider) + + return client_provider + + +def resolve_timeout(default_seconds: float) -> float: + test_mode = os.environ.get("NEPTUNE_PERFORMANCE_TEST_MODE", "normal") + if test_mode == "baseline_discovery": + return 3_600.0 # 1 hour for baseline discovery + + tolerance = float(os.environ.get("NEPTUNE_PERFORMANCE_TEST_TOLERANCE_FACTOR", 1.1)) + + return default_seconds * tolerance + + +@pytest.hookimpl(hookwrapper=True) +def pytest_runtest_makereport(item, call): + # Let pytest create the report first + outcome = yield + rep = outcome.get_result() + + # Only act after the test body ran + if rep.when != "call": + return + # if not item.config.getoption("--df-summary"): + # return + + # Look for properties added via record_property(...) 
+ recorded_properties = dict(rep.user_properties or []) + + # Write via terminal reporter to bypass capture + tr = item.config.pluginmanager.get_plugin("terminalreporter") + if tr is not None: + if "dataframe_memory_usage" in recorded_properties: + tr.write(f"df_mem_usage={naturalsize(recorded_properties['dataframe_memory_usage'])} ") + tr.write(f"duration={rep.duration:.3f}s ") diff --git a/tests/performance_e2e/perf_e2e_test_fetch_metrics.py b/tests/performance_e2e/perf_e2e_test_fetch_metrics.py new file mode 100644 index 00000000..00327dda --- /dev/null +++ b/tests/performance_e2e/perf_e2e_test_fetch_metrics.py @@ -0,0 +1,647 @@ +from dataclasses import dataclass +from typing import ( + Any, + Union, +) + +import pytest +from humanize import metric + +from neptune_query import fetch_metrics +from tests.performance_e2e.conftest import resolve_timeout +from tests.performance_e2e.test_helpers import PerfRequestBuilder + + +@dataclass +class Scenario: + id: int + # Total number of experiments matching user's filter + experiments_count: int + # Total number of attribute definitions in selected experiments matching user's filter + attribute_definitions_count: int + # The chance that a particular (experiment, metric) pair exists + metric_existence_probability: float + # A range for the number of steps per metric (uniformly distributed); + # If a single int is provided, all metrics have that many steps + steps_count_per_metric: Union[int, tuple[int, int]] + + # Represents exact or approximate expected number of data points / rows / columns + # It's a sanity check to ensure we fetched what we wanted + expected_points: Union[int, Any] + expected_columns: Union[int, Any] + expected_rows: Union[int, Any] + + @property + def steps_range_per_metric(self): + return ( + self.steps_count_per_metric + if isinstance(self.steps_count_per_metric, tuple) + else (self.steps_count_per_metric, self.steps_count_per_metric) + ) + + @property + def name(self): + steps = 
(self.steps_range_per_metric[0] + self.steps_range_per_metric[1]) / 2 + points = self.expected_points if isinstance(self.expected_points, int) else self.expected_points.expected + density = self.metric_existence_probability + + return "; ".join( + f"{key}={value}" + for key, value in { + "id": self.id, + "exp_count": metric(self.experiments_count, precision=0), + "attr_count": metric(self.attribute_definitions_count, precision=0), + "density": f"{density:.0%}" if density >= 0.01 else f"{density:.2%}", + "avg_steps": metric(steps, precision=0), + "points": metric(points, precision=0), + }.items() + ) + + def to_pytest_param(self, timeout: float): + return pytest.param(self, id=self.name, marks=pytest.mark.timeout(resolve_timeout(timeout), func_only=True)) + + +@pytest.mark.parametrize( + "scenario", + [ + # ######################## + # 1 experiment, 1 metric # + # ######################## + # 1M steps + Scenario( + id=1, + experiments_count=1, + attribute_definitions_count=1, + metric_existence_probability=1.0, + steps_count_per_metric=1_000_000, + expected_points=1_000_000, + expected_columns=1, + expected_rows=1_000_000, + ).to_pytest_param(timeout=3.771), + # 10M steps + Scenario( + id=2, + experiments_count=1, + attribute_definitions_count=1, + metric_existence_probability=1.0, + steps_count_per_metric=10_000_000, + expected_points=10_000_000, + expected_columns=1, + expected_rows=10_000_000, + ).to_pytest_param(timeout=41.722), + # ############################ + # 1 experiment, 100k metrics # + # ############################ + # 1 step + Scenario( + id=3, + experiments_count=1, + attribute_definitions_count=100_000, + metric_existence_probability=1.0, + steps_count_per_metric=1, + expected_points=100_000, + expected_columns=100_000, + expected_rows=1, + ).to_pytest_param(timeout=6.297), + # 10 steps + Scenario( + id=4, + experiments_count=1, + attribute_definitions_count=100_000, + metric_existence_probability=1.0, + steps_count_per_metric=10, + 
expected_points=1_000_000, + expected_columns=100_000, + expected_rows=10, + ).to_pytest_param(timeout=7.529), + # 100 steps + Scenario( + id=5, + experiments_count=1, + attribute_definitions_count=100_000, + metric_existence_probability=1.0, + steps_count_per_metric=100, + expected_points=10_000_000, + expected_columns=100_000, + expected_rows=100, + ).to_pytest_param(timeout=26.463), + # 1k steps + Scenario( + id=6, + experiments_count=1, + attribute_definitions_count=100_000, + metric_existence_probability=1.0, + steps_count_per_metric=1_000, + expected_points=100_000_000, + expected_columns=100_000, + expected_rows=1_000, + ).to_pytest_param(timeout=223.119), + # ########################## + # 1 experiment, 1M metrics # + # ########################## + # 1 step + Scenario( + id=7, + experiments_count=1, + attribute_definitions_count=1_000_000, + metric_existence_probability=1.0, + steps_count_per_metric=1, + expected_points=1_000_000, + expected_columns=1_000_000, + expected_rows=1, + ).to_pytest_param(timeout=67.945), + # 10 steps + Scenario( + id=8, + experiments_count=1, + attribute_definitions_count=1_000_000, + metric_existence_probability=1.0, + steps_count_per_metric=10, + expected_points=10_000_000, + expected_columns=1_000_000, + expected_rows=10, + ).to_pytest_param(timeout=86.696), + # 100 steps + Scenario( + id=9, + experiments_count=1, + attribute_definitions_count=1_000_000, + metric_existence_probability=1.0, + steps_count_per_metric=100, + expected_points=100_000_000, + expected_columns=1_000_000, + expected_rows=100, + ).to_pytest_param(timeout=283.399), + # ############################################################### + # 2 experiments, 50k metrics per experiment (75k metrics total) # + # ############################################################### + # 1 step + Scenario( + id=10, + experiments_count=2, + attribute_definitions_count=100_000, + metric_existence_probability=0.5, + steps_count_per_metric=1, + 
expected_points=pytest.approx(100_000, rel=0.05), + expected_columns=pytest.approx(75_000, rel=0.05), + expected_rows=2, + ).to_pytest_param(timeout=7.268), + # 10 steps + Scenario( + id=11, + experiments_count=2, + attribute_definitions_count=100_000, + metric_existence_probability=0.5, + steps_count_per_metric=10, + expected_points=pytest.approx(1_000_000, rel=0.05), + expected_columns=pytest.approx(75_000, rel=0.05), + expected_rows=20, + ).to_pytest_param(timeout=10.095), + # 100 steps + Scenario( + id=12, + experiments_count=2, + attribute_definitions_count=100_000, + metric_existence_probability=0.5, + steps_count_per_metric=100, + expected_points=pytest.approx(10_000_000, rel=0.05), + expected_columns=pytest.approx(75_000, rel=0.05), + expected_rows=200, + ).to_pytest_param(timeout=28.551), + # 500 steps + Scenario( + id=13, + experiments_count=2, + attribute_definitions_count=100_000, + metric_existence_probability=0.5, + steps_count_per_metric=500, + expected_points=pytest.approx(50_000_000, rel=0.05), + expected_columns=pytest.approx(75_000, rel=0.05), + expected_rows=1_000, + ).to_pytest_param(timeout=104.910), + # ################################################################ + # 10 experiments, 10k metrics per experiment (65k metrics total) # + # ################################################################ + # 1 step + Scenario( + id=14, + experiments_count=10, + attribute_definitions_count=100_000, + metric_existence_probability=0.1, + steps_count_per_metric=1, + expected_points=pytest.approx(100_000, rel=0.05), + expected_columns=pytest.approx(65_000, rel=0.05), + expected_rows=10, + ).to_pytest_param(timeout=31.388), + # 10 steps + Scenario( + id=15, + experiments_count=10, + attribute_definitions_count=100_000, + metric_existence_probability=0.1, + steps_count_per_metric=10, + expected_points=pytest.approx(1_000_000, rel=0.05), + expected_columns=pytest.approx(65_000, rel=0.05), + expected_rows=100, + ).to_pytest_param(timeout=33.331), + # 
100 steps + Scenario( + id=16, + experiments_count=10, + attribute_definitions_count=100_000, + metric_existence_probability=0.1, + steps_count_per_metric=100, + expected_points=pytest.approx(10_000_000, rel=0.05), + expected_columns=pytest.approx(65_000, rel=0.05), + expected_rows=1_000, + ).to_pytest_param(timeout=54.590), + # 1k steps + Scenario( + id=17, + experiments_count=10, + attribute_definitions_count=100_000, + metric_existence_probability=0.1, + steps_count_per_metric=1_000, + expected_points=pytest.approx(100_000_000, rel=0.05), + expected_columns=pytest.approx(65_000, rel=0.05), + expected_rows=10_000, + ).to_pytest_param(timeout=235.017), + # ################################################################## + # 10 experiments, 100k metrics per experiment (100k metrics total) # + # ################################################################## + # 1 step + Scenario( + id=18, + experiments_count=10, + attribute_definitions_count=100_000, + metric_existence_probability=1.0, + steps_count_per_metric=1, + expected_points=pytest.approx(1_000_000, rel=0.05), + expected_columns=pytest.approx(100_000, rel=0.05), + expected_rows=10, + ).to_pytest_param(timeout=35.459), + # 10 steps + Scenario( + id=19, + experiments_count=10, + attribute_definitions_count=100_000, + metric_existence_probability=1.0, + steps_count_per_metric=10, + expected_points=pytest.approx(10_000_000, rel=0.05), + expected_columns=pytest.approx(100_000, rel=0.05), + expected_rows=100, + ).to_pytest_param(timeout=52.231), + # 100 steps + Scenario( + id=20, + experiments_count=10, + attribute_definitions_count=100_000, + metric_existence_probability=1.0, + steps_count_per_metric=100, + expected_points=pytest.approx(100_000_000, rel=0.05), + expected_columns=pytest.approx(100_000, rel=0.05), + expected_rows=1_000, + ).to_pytest_param(timeout=244.418), + # ################################################################## + # ############# Doesn't work - too slow ##################### + # 
# 1k experiments, 1k metrics per experiment (100k metrics total) # + # ################################################################## + # ## 1 step + # Scenario( + # id=21, + # experiments_count=1_000, + # attribute_definitions_count=100_000, + # metric_existence_probability=0.01, + # steps_count_per_metric=1, + # expected_points=pytest.approx(1_000_000, rel=0.05), + # expected_columns=pytest.approx(100_000, rel=0.05), + # expected_rows=1_000, + # ).to_pytest_param(timeout=600), + # ## 10 steps + # Scenario( + # id=22, + # experiments_count=1_000, + # attribute_definitions_count=100_000, + # metric_existence_probability=0.01, + # steps_count_per_metric=10, + # expected_points=pytest.approx(10_000_000, rel=0.05), + # expected_columns=pytest.approx(100_000, rel=0.05), + # expected_rows=10_000, + # ).to_pytest_param(timeout=600), + # ## 100 steps + # Scenario( + # id=23, + # experiments_count=10, + # attribute_definitions_count=100_000, + # metric_existence_probability=0.01, + # steps_count_per_metric=100, + # expected_points=pytest.approx(100_000_000, rel=0.05), + # expected_columns=pytest.approx(100_000, rel=0.05), + # expected_rows=100_000, + # ).to_pytest_param(timeout=600), + # ############################################################## + # 1k experiments, 1k metrics per experiment (1k metrics total) # + # ############################################################## + # 1 step + Scenario( + id=24, + experiments_count=1_000, + attribute_definitions_count=1_000, + metric_existence_probability=1.0, + steps_count_per_metric=1, + expected_points=1_000_000, + expected_columns=1_000, + expected_rows=1_000, + ).to_pytest_param(timeout=31.243), + # 10 steps + Scenario( + id=25, + experiments_count=1_000, + attribute_definitions_count=1_000, + metric_existence_probability=1.0, + steps_count_per_metric=10, + expected_points=10_000_000, + expected_columns=1_000, + expected_rows=10_000, + ).to_pytest_param(timeout=45.729), + # 100 steps + Scenario( + id=26, + 
experiments_count=1_000, + attribute_definitions_count=1_000, + metric_existence_probability=1.0, + steps_count_per_metric=100, + expected_points=100_000_000, + expected_columns=1_000, + expected_rows=100_000, + ).to_pytest_param(timeout=228.996), + # ################################################################ + # 1k experiments, 10k metrics per experiment (10k metrics total) # + # ################################################################ + # 1 step + Scenario( + id=27, + experiments_count=1_000, + attribute_definitions_count=10_000, + metric_existence_probability=1.0, + steps_count_per_metric=1, + expected_points=10_000_000, + expected_columns=10_000, + expected_rows=1_000, + ).to_pytest_param(timeout=293.297), + # 10 steps + Scenario( + id=28, + experiments_count=1_000, + attribute_definitions_count=10_000, + metric_existence_probability=1.0, + steps_count_per_metric=10, + expected_points=100_000_000, + expected_columns=10_000, + expected_rows=10_000, + ).to_pytest_param(timeout=515.5656), + # ################################################################### + # ############# Doesn't work - too slow ###################### + # # 1k experiments, 10k metrics per experiment (100k metrics total) # + # ################################################################### + # ## 1 step + # Scenario( + # id=29, + # experiments_count=1_000, + # attribute_definitions_count=100_000, + # metric_existence_probability=0.1, + # steps_count_per_metric=1, + # expected_points=10_000_000, + # expected_columns=pytest.approx(100_000, rel=0.05), + # expected_rows=1_000, + # ).to_pytest_param(timeout=600), + # ## 10 steps + # Scenario( + # id=30, + # experiments_count=1_000, + # attribute_definitions_count=100_000, + # metric_existence_probability=0.1, + # steps_count_per_metric=10, + # expected_points=100_000_000, + # expected_columns=pytest.approx(100_000, rel=0.05), + # expected_rows=10_000, + # ).to_pytest_param(timeout=600), + # 
#################################################################### + # ############# Doesn't work - too slow ####################### + # # 1k experiments, 100k metrics per experiment (100k metrics total) # + # #################################################################### + # ## 1 step + # Scenario( + # id=31, + # experiments_count=1_000, + # attribute_definitions_count=100_000, + # metric_existence_probability=1.0, + # steps_count_per_metric=1, + # expected_points=100_000_000, + # expected_columns=100_000, + # expected_rows=1_000, + # ).to_pytest_param(timeout=600), + # ################################################################### + # ############# Doesn't work - too slow ###################### + # # 10k experiments, 1k metrics per experiment (100k metrics total) # + # ################################################################### + # ## 1 step + # Scenario( + # id=32, + # experiments_count=10_000, + # attribute_definitions_count=100_000, + # metric_existence_probability=0.01, + # steps_count_per_metric=1, + # expected_points=10_000_000, + # expected_columns=pytest.approx(100_000, rel=0.05), + # expected_rows=10_000, + # ).to_pytest_param(timeout=600), + # ## 10 steps + # Scenario( + # id=33, + # experiments_count=10_000, + # attribute_definitions_count=100_000, + # metric_existence_probability=0.01, + # steps_count_per_metric=10, + # expected_points=100_000_000, + # expected_columns=pytest.approx(100_000, rel=0.05), + # expected_rows=100_000, + # ).to_pytest_param(timeout=600), + # #################################################################### + # # ############# Doesn't work - too slow ##################### + # # 10k experiments, 10k metrics per experiment (100k metrics total) # + # #################################################################### + # ## 1 step + # Scenario( + # id=34, + # experiments_count=10_000, + # attribute_definitions_count=100_000, + # metric_existence_probability=0.1, + # steps_count_per_metric=1, + # 
expected_points=100_000_000, + # expected_columns=pytest.approx(100_000, rel=0.05), + # expected_rows=10_000, + # ).to_pytest_param(timeout=600), + # ############################################################### + # 100k experiments, 10 metrics per experiment (10 metric total) # + # ############################################################### + # 1 step + Scenario( + id=35, + experiments_count=100_000, + attribute_definitions_count=10, + metric_existence_probability=1.0, + steps_count_per_metric=1, + expected_points=1_000_000, + expected_columns=pytest.approx(10, rel=0.05), + expected_rows=100_000, + ).to_pytest_param(timeout=30.7131), + # 10 steps + Scenario( + id=36, + experiments_count=100_000, + attribute_definitions_count=10, + metric_existence_probability=1.0, + steps_count_per_metric=10, + expected_points=10_000_000, + expected_columns=pytest.approx(10, rel=0.05), + expected_rows=1_000_000, + ).to_pytest_param(timeout=46.860), + # 100 steps + Scenario( + id=37, + experiments_count=100_000, + attribute_definitions_count=10, + metric_existence_probability=1.0, + steps_count_per_metric=100, + expected_points=100_000_000, + expected_columns=pytest.approx(10, rel=0.05), + expected_rows=10_000_000, + ).to_pytest_param(timeout=232.357), + # ################################################################# + # # ############# Doesn't work - too slow ################## + # # 100k experiments, 1 metric per experiment (63k metric total) # + # ################################################################# + # ## 1 step + # Scenario( + # id=38, + # experiments_count=100_000, + # attribute_definitions_count=100_000, + # metric_existence_probability=0.00001, + # steps_count_per_metric=1, + # expected_points=100_000, + # expected_columns=pytest.approx(63_000, rel=0.05), + # expected_rows=pytest.approx(74_000, rel=0.05), + # ).to_pytest_param(timeout=600), + # ## 10 steps + # Scenario( + # id=39, + # experiments_count=100_000, + # attribute_definitions_count=100_000, 
+ # metric_existence_probability=0.00001, + # steps_count_per_metric=10, + # expected_points=1_000_000, + # expected_columns=pytest.approx(63_000, rel=0.05), + # expected_rows=pytest.approx(74_000, rel=0.05), + # ).to_pytest_param(timeout=600), + # ## 100 steps + # Scenario( + # id=40, + # experiments_count=100_000, + # attribute_definitions_count=100_000, + # metric_existence_probability=0.00001, + # steps_count_per_metric=100, + # expected_points=100_000_000, + # expected_columns=pytest.approx(63_000, rel=0.05), + # expected_rows=pytest.approx(74_000, rel=0.05), + # ).to_pytest_param(timeout=600), + # ############################################################## + # 1M experiments, 1 metric per experiment (1 metric total) # + # ############################################################## + # 1 step + Scenario( + id=41, + experiments_count=1_000_000, + attribute_definitions_count=1, + metric_existence_probability=1.0, + steps_count_per_metric=1, + expected_points=1_000_000, + expected_columns=1, + expected_rows=1_000_000, + ).to_pytest_param(timeout=39.247), + # 10 steps + Scenario( + id=42, + experiments_count=1_000_000, + attribute_definitions_count=1, + metric_existence_probability=1.0, + steps_count_per_metric=10, + expected_points=10_000_000, + expected_columns=1, + expected_rows=10_000_000, + ).to_pytest_param(timeout=61.948), + # 100 steps + Scenario( + id=43, + experiments_count=1_000_000, + attribute_definitions_count=1, + metric_existence_probability=1.0, + steps_count_per_metric=100, + expected_points=100_000_000, + expected_columns=1, + expected_rows=100_000_000, + ).to_pytest_param(timeout=275.968), + # ################################################################ + # # # ############# Doesn't work - too slow ############### + # # 1M experiments, 100 metric per experiment (100 metric total) # + # ################################################################ + # ## 1 step + # Scenario( + # id=44, + # experiments_count=1_000_000, + # 
attribute_definitions_count=100, + # metric_existence_probability=1.0, + # steps_count_per_metric=1, + # expected_points=100_000_000, + # expected_columns=100, + # expected_rows=1_000_000, + # ).to_pytest_param(timeout=600), + ], +) +def test_fetch_metrics(scenario, http_client, record_property): + perf_request = ( + PerfRequestBuilder() + .with_search_leaderboard_entries( + attributes={"sys/name": "string", "sys/id": "string"}, + total_entries=scenario.experiments_count, + latency_range_ms=(20, 30), + ) + .with_query_attribute_definitions( + seed=42, + attribute_types=["float_series"], + total_definitions=scenario.attribute_definitions_count, + latency_range_ms=(20, 30), + ) + .with_multiple_float_series_values( + seed=42, + existence_probability=scenario.metric_existence_probability, + series_cardinality_policy="uniform", + series_cardinality_uniform_range=scenario.steps_range_per_metric, + latency_range_ms=(10, 20), + ) + ) + + http_client.set_x_perf_request_header(value=perf_request.build()) + http_client.set_scenario_name_header(scenario_name=scenario.name) + + metrics_df = fetch_metrics( + project="workspace/project", + experiments=".*", + attributes=".*", + ) + + record_property("dataframe_memory_usage", metrics_df.memory_usage(deep=True).sum()) + + non_nan_values = metrics_df.count().sum() + assert non_nan_values == scenario.expected_points + assert metrics_df.shape == (scenario.expected_rows, scenario.expected_columns) diff --git a/tests/performance_e2e/perf_e2e_test_list_experiments.py b/tests/performance_e2e/perf_e2e_test_list_experiments.py new file mode 100644 index 00000000..6ec1b977 --- /dev/null +++ b/tests/performance_e2e/perf_e2e_test_list_experiments.py @@ -0,0 +1,62 @@ +from dataclasses import dataclass + +import pytest +from humanize import metric + +from neptune_query import list_experiments +from tests.performance_e2e.conftest import resolve_timeout +from tests.performance_e2e.test_helpers import PerfRequestBuilder + + +@dataclass +class 
Scenario: + id: int + experiments_count: int + latency_range_ms: tuple[int, int] + + @property + def name(self): + return "; ".join( + f"{key}={value}" + for key, value in { + "id": self.id, + "exp_count": metric(self.experiments_count, precision=0), + "latency_range_ms": self.latency_range_ms, + }.items() + ) + + def to_pytest_param(self, timeout: float): + return pytest.param(self, id=self.name, marks=pytest.mark.timeout(resolve_timeout(timeout), func_only=True)) + + +@pytest.mark.parametrize( + "scenario", + [ + Scenario( + id=1, + experiments_count=29_000, + latency_range_ms=(500, 500), + ).to_pytest_param(timeout=1.617), + Scenario( + id=2, + experiments_count=1_000_000, + latency_range_ms=(30, 30), + ).to_pytest_param(timeout=10.103), + ], +) +def test_list_experiments(scenario, http_client): + perf_request = PerfRequestBuilder().with_search_leaderboard_entries( + attributes={"sys/name": "string", "sys/id": "string"}, + total_entries=scenario.experiments_count, + latency_range_ms=scenario.latency_range_ms, + ) + + http_client.set_x_perf_request_header(value=perf_request.build()) + http_client.set_scenario_name_header(scenario_name=scenario.name) + + experiments = list_experiments( + project="workspace/project", + experiments=None, + ) + + assert len(experiments) == scenario.experiments_count diff --git a/tests/performance_e2e/test_helpers.py b/tests/performance_e2e/test_helpers.py new file mode 100644 index 00000000..28a0b416 --- /dev/null +++ b/tests/performance_e2e/test_helpers.py @@ -0,0 +1,108 @@ +from typing import Optional + +from tests.performance_e2e.backend.perf_request import ( + FloatTimeSeriesValuesConfig, + LatencyConfig, + PerfRequestConfig, + QueryAttributeDefinitionsConfig, + SearchLeaderboardEntriesConfig, +) + + +class PerfRequestBuilder: + """Helper for building X-Perf-Request headers in test cases.""" + + def __init__(self): + self._config = PerfRequestConfig() + + def with_search_leaderboard_entries( + self, + attributes: dict[str, str], 
+ total_entries: int, + latency_range_ms: Optional[tuple[float, float]] = None, + ) -> "PerfRequestBuilder": + """Configure the search_leaderboard_entries endpoint. + + Args: + attributes: Dict mapping attribute names to their types + total_entries: Total number of entries to return across pagination + latency_range_ms: Optional min/max latency in ms for this endpoint + """ + latency = LatencyConfig(min_ms=latency_range_ms[0], max_ms=latency_range_ms[1]) if latency_range_ms else None + self._config.add_endpoint_config( + path="/api/leaderboard/v1/proto/leaderboard/entries/search/", + method="POST", + config=SearchLeaderboardEntriesConfig( + latency=latency, requested_attributes=attributes, total_entries_count=total_entries + ), + ) + return self + + def with_query_attribute_definitions( + self, + seed: int, + attribute_types: list[str], + total_definitions: int, + latency_range_ms: Optional[tuple[float, float]] = None, + ) -> "PerfRequestBuilder": + """Configure the query_attribute_definitions endpoint. 
+
+        Args:
+            attribute_types: List of attribute types to include in the response
+            total_definitions: Total number of attribute definitions to return across pagination
+            latency_range_ms: Optional min/max latency in ms for this endpoint
+        """
+        latency = LatencyConfig(min_ms=latency_range_ms[0], max_ms=latency_range_ms[1]) if latency_range_ms else None
+        self._config.add_endpoint_config(
+            path="/api/leaderboard/v1/leaderboard/attributes/definitions/query",
+            method="POST",
+            config=QueryAttributeDefinitionsConfig(
+                latency=latency,
+                seed=seed,
+                attribute_types=attribute_types,
+                total_definitions_count=total_definitions,
+            ),
+        )
+        return self
+
+    def with_multiple_float_series_values(
+        self,
+        seed: int,
+        existence_probability: float,
+        series_cardinality_policy: str,
+        series_cardinality_uniform_range: Optional[tuple[int, int]] = None,
+        series_cardinality_buckets: Optional[list[tuple[float, float]]] = None,
+        latency_range_ms: Optional[tuple[float, float]] = None,
+    ) -> "PerfRequestBuilder":
+        """Configure the get_multiple_float_series_values endpoint. 
+ + Args: + existence_probability: Probability that a requested series exists + seed: Seed for deterministic point generation + series_cardinality_policy: Policy for determining series cardinality ("uniform" or "bucketed") + series_cardinality_uniform_range: For "uniform" policy: (min_points, max_points) + series_cardinality_buckets: For "bucketed" policy: list of (probability, num_points) tuples + latency_range_ms: Optional min/max latency in ms for this endpoint + + Returns: + Self for chaining + """ + + latency = LatencyConfig(min_ms=latency_range_ms[0], max_ms=latency_range_ms[1]) if latency_range_ms else None + + self._config.add_endpoint_config( + path="/api/leaderboard/v1/proto/attributes/series/float", + method="POST", + config=FloatTimeSeriesValuesConfig( + latency=latency, + seed=seed, + existence_probability=existence_probability, + series_cardinality_policy=series_cardinality_policy, + series_cardinality_uniform_range=series_cardinality_uniform_range, + series_cardinality_buckets=series_cardinality_buckets, + ), + ) + return self + + def build(self) -> str: + return self._config.to_json() diff --git a/tests/unit/internal/test_output_format.py b/tests/unit/internal/test_output_format.py index 0d1141a9..755d3127 100644 --- a/tests/unit/internal/test_output_format.py +++ b/tests/unit/internal/test_output_format.py @@ -1405,7 +1405,7 @@ def test_fetch_metrics_duplicate_values(include_time): # when with ( - patch("neptune_query.internal.composition.fetch_metrics.get_client") as get_client, + patch("neptune_query.internal.composition.fetch_metrics._client"), patch("neptune_query.internal.retrieval.search.fetch_experiment_sys_attrs") as fetch_experiment_sys_attrs, patch( "neptune_query.internal.retrieval.attribute_definitions.fetch_attribute_definitions_single_filter" @@ -1414,7 +1414,6 @@ def test_fetch_metrics_duplicate_values(include_time): "neptune_query.internal.composition.fetch_metrics.fetch_multiple_series_values" ) as fetch_multiple_series_values, ): 
- get_client.return_value = None fetch_experiment_sys_attrs.return_value = iter([util.Page(experiments)]) fetch_attribute_definitions_single_filter.side_effect = lambda **kwargs: iter([util.Page(attributes)]) fetch_multiple_series_values.return_value = series_values diff --git a/tests/unit/v1/test_split.py b/tests/unit/v1/test_split.py index 64241269..8a14e293 100644 --- a/tests/unit/v1/test_split.py +++ b/tests/unit/v1/test_split.py @@ -292,7 +292,7 @@ def test_fetch_metrics_patched(sys_id_length, exp_count, attr_name_length, attr_ # when with ( - patch("neptune_query.internal.composition.fetch_metrics.get_client") as get_client, + patch("neptune_query.internal.composition.fetch_metrics._client"), patch("neptune_query.internal.retrieval.search.fetch_experiment_sys_attrs") as fetch_experiment_sys_attrs, patch( "neptune_query.internal.retrieval.attribute_definitions.fetch_attribute_definitions_single_filter" @@ -301,7 +301,6 @@ def test_fetch_metrics_patched(sys_id_length, exp_count, attr_name_length, attr_ "neptune_query.internal.composition.fetch_metrics.fetch_multiple_series_values" ) as fetch_multiple_series_values, ): - get_client.return_value = None fetch_experiment_sys_attrs.return_value = iter([util.Page(experiments)]) fetch_attribute_definitions_single_filter.side_effect = lambda **kwargs: iter([util.Page(attributes)]) fetch_multiple_series_values.return_value = {}