Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions .github/workflows/tests-performance-client-e2e.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
name: Client e2e performance tests

permissions:
  checks: write

on:
  schedule:
    - cron: '0 7 * * *' # Run at 7:00 daily
  workflow_dispatch:
    inputs:
      test_mode:
        description: Test mode (use baseline_discovery to run without timeouts on test cases)
        required: true
        type: choice
        options:
          - normal
          - baseline_discovery
        default: normal
      timeout_tolerance:
        description: Timeout tolerance as a float; e.g. 1.2 means 20% tolerance
        required: false
        default: 1.1

jobs:
  test:
    name: 'Performance tests'
    runs-on: gcp-perf-test-dedicated-big
    container:
      image: python:3.13-trixie
    timeout-minutes: 120
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Install dependencies
        run: |
          chown root:root . &&
          python -m pip install --upgrade pip &&
          pip install -r dev_requirements.txt

      - name: Run performance tests
        env:
          NEPTUNE_PERFORMANCE_LOG_FILE: "test-results/test-client-e2e-performance.log"
          NEPTUNE_PERFORMANCE_TEST_MODE: ${{ inputs.test_mode || 'normal' }}
          # Honor the workflow_dispatch input; fall back to 1.1 for scheduled
          # runs, where `inputs` is empty. (Previously hardcoded to 1.1, which
          # made the timeout_tolerance input a no-op.)
          NEPTUNE_PERFORMANCE_TEST_TOLERANCE_FACTOR: ${{ inputs.timeout_tolerance || '1.1' }}
        run: |
          pytest --junitxml="test-results/test-client-e2e-performance.xml" --log-cli-level=INFO --durations=0 tests/performance_e2e

      # Upload the log even when the test step fails — that is precisely when
      # the log is needed for diagnosis.
      - uses: actions/upload-artifact@v4
        if: always()
        with:
          name: test-client-e2e-performance.log
          path: test-results/test-client-e2e-performance.log

      - name: Report
        uses: mikepenz/action-junit-report@v5
        if: always()
        with:
          check_name: 'Performance tests report'
          report_paths: "./test-results/test-client-e2e-performance.xml"
          detailed_summary: true
          verbose_summary: true
          include_passed: true
          include_time_in_summary: true
          resolve_ignore_classname: true
4 changes: 4 additions & 0 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ pytest-timeout
pytest-xdist
icecream
git+https://github.com/neptune-ai/neptune-client-scale.git@main#egg=neptune-scale
fastapi==0.116.2
uvicorn==0.35.0
humanize==4.13.0
colorlog==6.9.0
hypothesis==6.139.2
4 changes: 2 additions & 2 deletions src/neptune_query/internal/composition/fetch_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import pandas as pd
from neptune_api.client import AuthenticatedClient

from .. import client as _client
from .. import identifiers
from ..client import get_client
from ..composition import (
concurrency,
type_inference,
Expand Down Expand Up @@ -74,7 +74,7 @@ def fetch_metrics(
restricted_attributes = validation.restrict_attribute_filter_type(attributes, type_in={"float_series"})

valid_context = validate_context(context or get_context())
client = get_client(context=valid_context)
client = _client.get_client(context=valid_context)
Comment on lines 26 to 77
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert? Unrelated?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related & required. I'm currently mocking get_client in perf tests in order to inject the x-perf-request header.


with (
concurrency.create_thread_pool_executor() as executor,
Expand Down
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
"""
Endpoint for handling multiple float series values requests.
"""
import hashlib
import json
import math
import random
from typing import (
Final,
Optional,
)

from fastapi import (
APIRouter,
Request,
Response,
)
from neptune_api.models.float_time_series_values_request import FloatTimeSeriesValuesRequest
from neptune_api.proto.protobuf_v4plus.neptune_pb.api.v1.model.series_values_pb2 import (
ProtoFloatSeriesValuesResponseDTO,
)

from tests.performance_e2e.backend.middleware.read_perf_config_middleware import PERF_REQUEST_CONFIG_ATTRIBUTE_NAME
from tests.performance_e2e.backend.perf_request import FloatTimeSeriesValuesConfig
from tests.performance_e2e.backend.utils.exceptions import MalformedRequestError
from tests.performance_e2e.backend.utils.hashing_utils import hash_to_uniform_0_1
from tests.performance_e2e.backend.utils.logging import (
map_unset_to_none,
setup_logger,
)
from tests.performance_e2e.backend.utils.metrics import RequestMetrics
from tests.performance_e2e.backend.utils.timing import Timer

# Path without /api prefix since we're mounting under /api in the main app
GET_MULTIPLE_FLOAT_SERIES_VALUES_ENDPOINT_PATH: Final[str] = "/leaderboard/v1/proto/attributes/series/float"
# Path used for configuration matching (with /api prefix for backward compatibility)
GET_MULTIPLE_FLOAT_SERIES_VALUES_CONFIG_PATH: Final[str] = "/api" + GET_MULTIPLE_FLOAT_SERIES_VALUES_ENDPOINT_PATH

logger = setup_logger("get_multiple_float_series_values")

router = APIRouter()


def _compute_series_cardinality(
    config: FloatTimeSeriesValuesConfig,
    experiment_id: str,
    attribute_def: str,
) -> int:
    """Derive a deterministic point count for one series.

    The count depends only on the series identity (experiment, attribute)
    and the configured seed, so repeated requests for the same series
    always see the same cardinality.

    Args:
        config: Endpoint configuration.
        experiment_id: Experiment ID.
        attribute_def: Attribute definition path.

    Returns:
        Number of points to generate for this series.

    Raises:
        ValueError: If the cardinality policy is unknown or its required
            parameters are missing from the configuration.
    """
    identity_draw = hash_to_uniform_0_1(experiment_id, attribute_def, config.seed)
    policy = config.series_cardinality_policy

    if policy == "uniform":
        if not config.series_cardinality_uniform_range:
            raise ValueError("Missing uniform range for uniform cardinality policy")
        low, high = config.series_cardinality_uniform_range
        # Map the [0, 1] hash draw onto the inclusive integer range [low, high].
        return low + math.floor(identity_draw * (high - low + 1))

    if policy == "bucketed":
        if not config.series_cardinality_buckets:
            raise ValueError("Missing buckets for bucketed cardinality policy")
        # Normalize probabilities so they sum to 1, then walk the CDF and pick
        # the first bucket whose cumulative probability covers the draw.
        weight_total = sum(weight for weight, _ in config.series_cardinality_buckets)
        cumulative = 0
        bucket_points = None
        for weight, points in config.series_cardinality_buckets:
            cumulative += weight / weight_total
            bucket_points = points
            if identity_draw <= cumulative:
                return int(points)
        # Floating-point rounding can leave the cumulative sum fractionally
        # below the draw; fall back to the last bucket.
        return int(bucket_points)

    raise ValueError(f"Unknown cardinality policy: {policy}")


def _generate_series_values(
series_cardinality: int, after_step: Optional[float], max_values: int
) -> list[tuple[float, float, float]]: # (timestamp in seconds, step, value)
"""Generate time series values for a specific series.

Returns:
List of (timestamp, step, value) tuples
"""

initial_step = 1 if after_step is None else (after_step + 1)
initial_timestamp = 1600000000 # some time in the year 2020
total_remaining_steps = series_cardinality - (initial_step - 1)
steps_in_current_request = min(total_remaining_steps, max_values)
max_step = initial_step + steps_in_current_request - 1
step_range = range(int(initial_step), int(max_step) + 1)
return [(initial_timestamp + step, step, random.uniform(-1e6, 1e6)) for step in step_range]


def _build_float_series_response(
    parsed_request: FloatTimeSeriesValuesRequest,
    endpoint_config: FloatTimeSeriesValuesConfig,
) -> ProtoFloatSeriesValuesResponseDTO:
    """Assemble the protobuf response for every requested series.

    Every requested series gets an entry in the response (so request IDs
    round-trip even for missing series), but points are generated only for
    series deemed to "exist" under the configured existence probability.

    Args:
        parsed_request: Parsed request object.
        endpoint_config: Endpoint configuration.

    Returns:
        Protobuf response object.
    """
    response = ProtoFloatSeriesValuesResponseDTO()

    for series_request in parsed_request.requests:
        # Identity of this series: experiment holder + attribute path.
        experiment_id = series_request.series.holder.identifier
        attribute_name = series_request.series.attribute
        resume_after = map_unset_to_none(series_request.after_step)

        # Deterministic existence check: hash the series identity together
        # with the perf-config seed into [0, 1] and compare to the
        # configured probability. MD5 digests are 128 bits wide.
        digest = hashlib.md5(
            f"{experiment_id}:{attribute_name}:{endpoint_config.seed}".encode()
        ).hexdigest()
        existence_draw = int(digest, 16) / (2**128 - 1)

        # Always add an entry so the request ID is echoed back, even when
        # the series turns out not to exist.
        series_entry = response.series.add()
        series_entry.requestId = series_request.request_id

        # "Non-existent" series stay empty in the response.
        if existence_draw > endpoint_config.existence_probability:
            continue

        # Determine the series size, then generate at most one page of
        # points, bounded by the caller's per-series limit.
        series_points = _generate_series_values(
            series_cardinality=_compute_series_cardinality(
                endpoint_config, experiment_id, attribute_name
            ),
            after_step=resume_after,
            max_values=parsed_request.per_series_points_limit,
        )

        for timestamp_s, step, value in series_points:
            point = series_entry.series.values.add()
            point.timestamp_millis = int(timestamp_s * 1000)  # seconds -> ms
            point.step = step
            point.value = value
            point.is_preview = False
            point.completion_ratio = 1.0

    return response


@router.post(GET_MULTIPLE_FLOAT_SERIES_VALUES_ENDPOINT_PATH)
async def get_multiple_float_series_values(request: Request) -> Response:
    """Handle requests for multiple float series values.

    Parses the JSON body into a FloatTimeSeriesValuesRequest, looks up the
    per-request performance configuration injected by middleware (from the
    X-Perf-Request header), and returns a synthetic protobuf payload built
    by _build_float_series_response. Parse/generation timings and payload
    size are recorded on the request-scoped metrics object.

    Returns:
        HTTP 200 with the serialized protobuf on success; HTTP 400 with an
        empty protobuf body when the perf configuration is missing or
        invalid; HTTP 500 with an empty protobuf body on any other error.
    """
    # Per-request metrics object, installed on request.state by middleware
    # upstream of this handler.
    metrics: RequestMetrics = request.state.metrics

    try:
        # Parse request body
        with Timer() as parsing_timer:
            raw_body = await request.body()
            request_dict = json.loads(raw_body)
            parsed_request = FloatTimeSeriesValuesRequest.from_dict(request_dict)

        # Get the configuration from middleware
        perf_config = getattr(request.state, PERF_REQUEST_CONFIG_ATTRIBUTE_NAME, None)
        if not perf_config:
            logger.error("No performance_e2e configuration found")
            raise MalformedRequestError("Missing or invalid X-Perf-Request header")

        # Get endpoint-specific configuration using the config path (with /api prefix)
        endpoint_config = perf_config.get_endpoint_config(GET_MULTIPLE_FLOAT_SERIES_VALUES_CONFIG_PATH, "POST")
        if not endpoint_config or not isinstance(endpoint_config, FloatTimeSeriesValuesConfig):
            logger.warning("Missing configuration for this endpoint")
            raise MalformedRequestError("Missing configuration for this endpoint")

        metrics.parse_time_ms = parsing_timer.time_ms

        # Log request details
        logger.info(
            f"Processing: num_series={len(parsed_request.requests)} "
            f"per_series_limit={parsed_request.per_series_points_limit} "
            f"existence_probability={endpoint_config.existence_probability} "
            f"cardinality_policy={endpoint_config.series_cardinality_policy} "
            f"seed={endpoint_config.seed} "
            f"cardinality_uniform_range={endpoint_config.series_cardinality_uniform_range} "
            f"cardinality_buckets={endpoint_config.series_cardinality_buckets}"
        )

        # Generate and return response
        with Timer() as data_generation_timer:
            # Build the response using the extracted function
            response = _build_float_series_response(parsed_request, endpoint_config)

            # Serialize the response
            response_bytes = response.SerializeToString()

        metrics.generation_time_ms = data_generation_timer.time_ms
        metrics.returned_payload_size_bytes = len(response_bytes)

        logger.info(
            f"Generated response with {len(response.series)} series "
            f"({sum(1 if s.series.values else 0 for s in response.series)} non-empty), "
            f"{sum(len(s.series.values) for s in response.series)} total points "
            f"in {data_generation_timer.time_ms:.2f}ms, "
            f"size={metrics.returned_payload_size_bytes} bytes"
        )

        return Response(content=response_bytes, media_type="application/x-protobuf")

    except MalformedRequestError as exc:
        # Client-side configuration problem: answer 400 with an empty (but
        # still decodable) protobuf body.
        logger.error(f"Invalid request configuration: {str(exc)}")
        return Response(
            status_code=400,
            content=ProtoFloatSeriesValuesResponseDTO().SerializeToString(),
            media_type="application/x-protobuf",
        )
    except Exception as exc:
        # Catch-all boundary: log with traceback and return a decodable 500
        # so clients never receive a non-protobuf error page.
        logger.exception(f"Unhandled exception during request processing: {exc}")
        return Response(
            content=ProtoFloatSeriesValuesResponseDTO().SerializeToString(),
            media_type="application/x-protobuf",
            status_code=500,
        )
Loading
Loading