Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions snuba/web/rpc/storage_routing/routing_strategies/load_based.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import sentry_sdk

from snuba.configs.configuration import Configuration
from snuba.web.rpc.storage_routing.routing_strategies.outcomes_based import (
OutcomesBasedRoutingStrategy,
)
from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
RoutingDecision,
)


class LoadBasedRoutingStrategy(OutcomesBasedRoutingStrategy):
"""
If cluster load is under a threshold, ignore recommendations and allow the query to pass through with the tier decided based on outcomes-based routing.
"""
Comment on lines +12 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this inheriting from OutcomesBasedRoutingStrategy? shouldn't it inherit BaseRoutingStrategy?

I think it's mixing concerns weirdly to make the load-based routing in any way aware or coupled to outcomes-based routing. Some third entity/module should chain the two together if that's necessary.


def _additional_config_definitions(self) -> list[Configuration]:
return [
Configuration(
name="pass_through_load_percentage",
description="If cluster load is below this percentage, allow the query to run regardless of allocation policies",
value_type=int,
default=20,
),
Configuration(
name="pass_through_max_threads",
description="Max threads to use when allowing the query to pass through under low load",
value_type=int,
default=10,
),
]

def _update_routing_decision(
self,
routing_decision: RoutingDecision,
) -> None:
super()._update_routing_decision(routing_decision)

load_info = routing_decision.routing_context.cluster_load_info
if load_info is None:
return

pass_through_threshold = int(self.get_config_value("pass_through_load_percentage"))
pass_through_max_threads = int(self.get_config_value("pass_through_max_threads"))

if load_info.cluster_load < pass_through_threshold:
routing_decision.can_run = True
routing_decision.is_throttled = False
routing_decision.clickhouse_settings["max_threads"] = pass_through_max_threads
routing_decision.routing_context.extra_info["load_based_pass_through"] = {
"threshold": pass_through_threshold,
"max_threads": pass_through_max_threads,
}
Comment on lines +46 to +53
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: New logic incorrectly allows queries to bypass allocation policies when cluster_load retrieval fails and returns -1.0.
Severity: CRITICAL | Confidence: 1.00

🔍 Detailed Analysis

When get_cluster_loadinfo() fails to retrieve cluster load information, it returns a LoadInfo object with cluster_load set to -1.0. The new code checks if load_info is None but does not account for this specific sentinel value. As a result, the condition load_info.cluster_load < pass_through_threshold evaluates to True (e.g., -1.0 < 20), causing queries to bypass all allocation policies and run, despite the unavailability of actual load data. This occurs silently without explicit error handling in the new logic.

💡 Suggested Fix

Modify the LoadBasedRoutingStrategy to explicitly check for the -1.0 sentinel value in load_info.cluster_load when determining if load information is available, or ensure load_info is None on failure.

🤖 Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: snuba/web/rpc/storage_routing/routing_strategies/load_based.py#L46-L53

Potential issue: When `get_cluster_loadinfo()` fails to retrieve cluster load
information, it returns a `LoadInfo` object with `cluster_load` set to `-1.0`. The new
code checks `if load_info is None` but does not account for this specific sentinel
value. As a result, the condition `load_info.cluster_load < pass_through_threshold`
evaluates to `True` (e.g., `-1.0 < 20`), causing queries to bypass all allocation
policies and run, despite the unavailability of actual load data. This occurs silently
without explicit error handling in the new logic.

Did we get this right? 👍 / 👎 to inform future reviews.

sentry_sdk.update_current_span( # pyright: ignore[reportUndefinedVariable]
attributes={
"load_based_pass_through": True,
"cluster_load": load_info.cluster_load,
"pass_through_threshold": pass_through_threshold,
"pass_through_max_threads": pass_through_max_threads,
}
)
109 changes: 109 additions & 0 deletions tests/web/rpc/v1/routing_strategies/test_load_based.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import uuid
from datetime import UTC, datetime, timedelta
from unittest.mock import patch

import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import TraceItemTableRequest
from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType

from snuba.configs.configuration import Configuration, ResourceIdentifier
from snuba.datasets.storages.storage_key import StorageKey
from snuba.query.allocation_policies import (
MAX_THRESHOLD,
NO_SUGGESTION,
NO_UNITS,
AllocationPolicy,
QueryResultOrError,
QuotaAllowance,
)
from snuba.utils.metrics.timer import Timer
from snuba.web.rpc.storage_routing.load_retriever import LoadInfo
from snuba.web.rpc.storage_routing.routing_strategies.load_based import (
LoadBasedRoutingStrategy,
)
from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import (
BaseRoutingStrategy,
RoutingContext,
)

BASE_TIME = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
_PROJECT_ID = 1
_ORG_ID = 1


def _get_request_meta(hour_interval: int = 1) -> RequestMeta:
start = BASE_TIME - timedelta(hours=hour_interval)
end = BASE_TIME
return RequestMeta(
project_ids=[_PROJECT_ID],
organization_id=_ORG_ID,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(seconds=int(start.timestamp())),
end_timestamp=Timestamp(seconds=int(end.timestamp())),
trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
downsampled_storage_config=DownsampledStorageConfig(
mode=DownsampledStorageConfig.MODE_NORMAL
),
)


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
def test_load_based_routing_pass_through_even_if_policies_reject() -> None:
# policy that always rejects (can_run=False)
class RejectAllPolicy(AllocationPolicy):
def _additional_config_definitions(self) -> list[Configuration]:
return []

def _get_quota_allowance(
self, tenant_ids: dict[str, str | int], query_id: str
) -> QuotaAllowance:
return QuotaAllowance(
can_run=False,
max_threads=0,
explanation={"reason": "reject all"},
is_throttled=True,
throttle_threshold=MAX_THRESHOLD,
rejection_threshold=MAX_THRESHOLD,
quota_used=0,
quota_unit=NO_UNITS,
suggestion=NO_SUGGESTION,
)

def _update_quota_balance(
self,
tenant_ids: dict[str, str | int],
query_id: str,
result_or_error: QueryResultOrError,
) -> None:
return

strategy = LoadBasedRoutingStrategy()
request = TraceItemTableRequest(meta=_get_request_meta(hour_interval=1))
context = RoutingContext(
in_msg=request,
timer=Timer("test"),
query_id=uuid.uuid4().hex,
)

with patch.object(
BaseRoutingStrategy,
"get_allocation_policies",
return_value=[
RejectAllPolicy(ResourceIdentifier(StorageKey("doesntmatter")), ["org_id"], {})
],
):
with patch(
"snuba.web.rpc.storage_routing.routing_strategies.storage_routing.get_cluster_loadinfo",
return_value=LoadInfo(cluster_load=5.0, concurrent_queries=1),
):
routing_decision = strategy.get_routing_decision(context)

assert routing_decision.can_run is True
assert routing_decision.clickhouse_settings.get("max_threads") == 10
assert "load_based_pass_through" in routing_decision.routing_context.extra_info
assert routing_decision.routing_context.cluster_load_info is not None
assert routing_decision.routing_context.cluster_load_info.cluster_load == 5.0
Loading