-
-
Notifications
You must be signed in to change notification settings - Fork 61
feat(cbrs): load based routing strategy #7519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| import sentry_sdk | ||
|
|
||
| from snuba.configs.configuration import Configuration | ||
| from snuba.web.rpc.storage_routing.routing_strategies.outcomes_based import ( | ||
| OutcomesBasedRoutingStrategy, | ||
| ) | ||
| from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( | ||
| RoutingDecision, | ||
| ) | ||
|
|
||
|
|
||
| class LoadBasedRoutingStrategy(OutcomesBasedRoutingStrategy): | ||
| """ | ||
| If cluster load is under a threshold, ignore recommendations and allow the query to pass through with the tier decided based on outcomes-based routing. | ||
| """ | ||
|
|
||
| def _additional_config_definitions(self) -> list[Configuration]: | ||
| return [ | ||
| Configuration( | ||
| name="pass_through_load_percentage", | ||
| description="If cluster load is below this percentage, allow the query to run regardless of allocation policies", | ||
| value_type=int, | ||
| default=20, | ||
| ), | ||
| Configuration( | ||
| name="pass_through_max_threads", | ||
| description="Max threads to use when allowing the query to pass through under low load", | ||
| value_type=int, | ||
| default=10, | ||
| ), | ||
| ] | ||
|
|
||
| def _update_routing_decision( | ||
| self, | ||
| routing_decision: RoutingDecision, | ||
| ) -> None: | ||
| super()._update_routing_decision(routing_decision) | ||
|
|
||
| load_info = routing_decision.routing_context.cluster_load_info | ||
| if load_info is None: | ||
| return | ||
|
|
||
| pass_through_threshold = int(self.get_config_value("pass_through_load_percentage")) | ||
| pass_through_max_threads = int(self.get_config_value("pass_through_max_threads")) | ||
|
|
||
| if load_info.cluster_load < pass_through_threshold: | ||
| routing_decision.can_run = True | ||
| routing_decision.is_throttled = False | ||
| routing_decision.clickhouse_settings["max_threads"] = pass_through_max_threads | ||
| routing_decision.routing_context.extra_info["load_based_pass_through"] = { | ||
| "threshold": pass_through_threshold, | ||
| "max_threads": pass_through_max_threads, | ||
| } | ||
|
Comment on lines
+46
to
+53
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: New logic incorrectly allows queries to bypass allocation policies when 🔍 Detailed AnalysisWhen 💡 Suggested FixModify the 🤖 Prompt for AI AgentDid we get this right? 👍 / 👎 to inform future reviews. |
||
| sentry_sdk.update_current_span( # pyright: ignore[reportUndefinedVariable] | ||
| attributes={ | ||
| "load_based_pass_through": True, | ||
| "cluster_load": load_info.cluster_load, | ||
| "pass_through_threshold": pass_through_threshold, | ||
| "pass_through_max_threads": pass_through_max_threads, | ||
| } | ||
| ) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| import uuid | ||
| from datetime import UTC, datetime, timedelta | ||
| from unittest.mock import patch | ||
|
|
||
| import pytest | ||
| from google.protobuf.timestamp_pb2 import Timestamp | ||
| from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig | ||
| from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import TraceItemTableRequest | ||
| from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType | ||
|
|
||
| from snuba.configs.configuration import Configuration, ResourceIdentifier | ||
| from snuba.datasets.storages.storage_key import StorageKey | ||
| from snuba.query.allocation_policies import ( | ||
| MAX_THRESHOLD, | ||
| NO_SUGGESTION, | ||
| NO_UNITS, | ||
| AllocationPolicy, | ||
| QueryResultOrError, | ||
| QuotaAllowance, | ||
| ) | ||
| from snuba.utils.metrics.timer import Timer | ||
| from snuba.web.rpc.storage_routing.load_retriever import LoadInfo | ||
| from snuba.web.rpc.storage_routing.routing_strategies.load_based import ( | ||
| LoadBasedRoutingStrategy, | ||
| ) | ||
| from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( | ||
| BaseRoutingStrategy, | ||
| RoutingContext, | ||
| ) | ||
|
|
||
| BASE_TIME = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0) | ||
| _PROJECT_ID = 1 | ||
| _ORG_ID = 1 | ||
|
|
||
|
|
||
| def _get_request_meta(hour_interval: int = 1) -> RequestMeta: | ||
| start = BASE_TIME - timedelta(hours=hour_interval) | ||
| end = BASE_TIME | ||
| return RequestMeta( | ||
| project_ids=[_PROJECT_ID], | ||
| organization_id=_ORG_ID, | ||
| cogs_category="something", | ||
| referrer="something", | ||
| start_timestamp=Timestamp(seconds=int(start.timestamp())), | ||
| end_timestamp=Timestamp(seconds=int(end.timestamp())), | ||
| trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN, | ||
| downsampled_storage_config=DownsampledStorageConfig( | ||
| mode=DownsampledStorageConfig.MODE_NORMAL | ||
| ), | ||
| ) | ||
|
|
||
|
|
||
| @pytest.mark.clickhouse_db | ||
| @pytest.mark.redis_db | ||
| def test_load_based_routing_pass_through_even_if_policies_reject() -> None: | ||
| # policy that always rejects (can_run=False) | ||
| class RejectAllPolicy(AllocationPolicy): | ||
| def _additional_config_definitions(self) -> list[Configuration]: | ||
| return [] | ||
|
|
||
| def _get_quota_allowance( | ||
| self, tenant_ids: dict[str, str | int], query_id: str | ||
| ) -> QuotaAllowance: | ||
| return QuotaAllowance( | ||
| can_run=False, | ||
| max_threads=0, | ||
| explanation={"reason": "reject all"}, | ||
| is_throttled=True, | ||
| throttle_threshold=MAX_THRESHOLD, | ||
| rejection_threshold=MAX_THRESHOLD, | ||
| quota_used=0, | ||
| quota_unit=NO_UNITS, | ||
| suggestion=NO_SUGGESTION, | ||
| ) | ||
|
|
||
| def _update_quota_balance( | ||
| self, | ||
| tenant_ids: dict[str, str | int], | ||
| query_id: str, | ||
| result_or_error: QueryResultOrError, | ||
| ) -> None: | ||
| return | ||
|
|
||
| strategy = LoadBasedRoutingStrategy() | ||
| request = TraceItemTableRequest(meta=_get_request_meta(hour_interval=1)) | ||
| context = RoutingContext( | ||
| in_msg=request, | ||
| timer=Timer("test"), | ||
| query_id=uuid.uuid4().hex, | ||
| ) | ||
|
|
||
| with patch.object( | ||
| BaseRoutingStrategy, | ||
| "get_allocation_policies", | ||
| return_value=[ | ||
| RejectAllPolicy(ResourceIdentifier(StorageKey("doesntmatter")), ["org_id"], {}) | ||
| ], | ||
| ): | ||
| with patch( | ||
| "snuba.web.rpc.storage_routing.routing_strategies.storage_routing.get_cluster_loadinfo", | ||
| return_value=LoadInfo(cluster_load=5.0, concurrent_queries=1), | ||
| ): | ||
| routing_decision = strategy.get_routing_decision(context) | ||
|
|
||
| assert routing_decision.can_run is True | ||
| assert routing_decision.clickhouse_settings.get("max_threads") == 10 | ||
| assert "load_based_pass_through" in routing_decision.routing_context.extra_info | ||
| assert routing_decision.routing_context.cluster_load_info is not None | ||
| assert routing_decision.routing_context.cluster_load_info.cluster_load == 5.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this inheriting from
OutcomesBasedRoutingStrategy? shouldn't it inheritBaseRoutingStrategy?I think it's mixing concerns weirdly to make the load-based routing in any way aware or coupled to outcomes-based routing. Some third entity/module should chain the two together if that's necessary.