diff --git a/snuba/admin/rpc/rpc_queries.py b/snuba/admin/rpc/rpc_queries.py index f19b9ba58aa..a146843de08 100644 --- a/snuba/admin/rpc/rpc_queries.py +++ b/snuba/admin/rpc/rpc_queries.py @@ -2,6 +2,9 @@ from snuba import settings from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException +from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( + BaseRoutingStrategy, +) def _validate_projects_in_query(project_ids: List[int]) -> None: @@ -30,3 +33,7 @@ def validate_request_meta(request_proto: Any) -> None: project_ids = list(meta.project_ids) _validate_org_ids_in_query(org_id) _validate_projects_in_query(project_ids) + + +def get_routing_strategies() -> List[str]: + return list(BaseRoutingStrategy.all_names()) diff --git a/snuba/admin/static/api_client.tsx b/snuba/admin/static/api_client.tsx index ecc5148d8c2..ba573d27efd 100644 --- a/snuba/admin/static/api_client.tsx +++ b/snuba/admin/static/api_client.tsx @@ -35,7 +35,7 @@ import { CardinalityQueryResult, } from "SnubaAdmin/cardinality_analyzer/types"; -import { AllocationPolicy } from "SnubaAdmin/capacity_management/types"; +import { AllocationPolicy, ConfigurableComponent, Configuration, Entity } from "SnubaAdmin/capacity_management/types"; import { ReplayInstruction, Topic } from "SnubaAdmin/dead_letter_queue/types"; import { AutoReplacementsBypassProjectsData } from "SnubaAdmin/auto_replacements_bypass_projects/types"; @@ -83,18 +83,20 @@ interface Client { getAllMigrationGroups: () => Promise; runMigration: (req: RunMigrationRequest) => Promise; getAllowedTools: () => Promise; + getRoutingStrategies: () => Promise; getStoragesWithAllocationPolicies: () => Promise; - getAllocationPolicies: (storage: string) => Promise; - setAllocationPolicyConfig: ( - storage: string, - policy: string, + getAllocationPolicies: (entity: Entity) => Promise; + getRoutingStrategyConfigs: (strategy_name: string) => Promise; + setConfiguration: ( + entity: Entity, + configurable_component: ConfigurableComponent, key: string, value: string, params: object, ) => Promise; - deleteAllocationPolicyConfig: ( - storage: string, - policy: string, + deleteConfiguration: ( + entity: Entity, + configurable_component: ConfigurableComponent, key: string, params: object, ) => Promise; @@ -445,31 +447,57 @@ function Client(): Client { }).then((resp) => resp.json()); }, + getRoutingStrategies: () => { + const url = baseUrl + "routing_strategies"; + return fetch(url, { + headers: { "Content-Type": "application/json" }, + }).then((resp) => resp.json()); + }, + getStoragesWithAllocationPolicies: () => { const url = baseUrl + "storages_with_allocation_policies"; return fetch(url, { headers: { "Content-Type": "application/json" }, }).then((resp) => resp.json()); }, - getAllocationPolicies: (storage: string) => { + getAllocationPolicies: (entity: Entity) => { const url = - baseUrl + "allocation_policy_configs/" + encodeURIComponent(storage); + baseUrl + "allocation_policy_configs/" + entity.type + "/" + encodeURIComponent(entity.name); return fetch(url, { headers: { "Content-Type": "application/json" }, }).then((resp) => resp.json()); }, - setAllocationPolicyConfig: ( - storage: string, - policy: string, + getRoutingStrategyConfigs: (strategy_name: string) => { + const url = baseUrl + "routing_strategy_configs/" + strategy_name; + return fetch(url, { + headers: { "Content-Type": "application/json" }, + }).then((resp) => resp.json()); + }, + setConfiguration: ( + entity: Entity, + configurable_component: ConfigurableComponent, key: string, value: string, params: object, ) => { - const url = baseUrl + "allocation_policy_config"; + let body: string; + let url: string; + + if (configurable_component.type === "routing_strategy") { + body = JSON.stringify({ strategy: configurable_component.name, key, value, params }); + url = baseUrl + "routing_strategy_config"; + } else if (entity.type === "strategy" && configurable_component.type === "allocation_policy") { + body = JSON.stringify({ strategy: entity.name, policy: configurable_component.name, key, value, params }) + url = baseUrl + "allocation_policy_config_for_strategy"; + } else { + body = JSON.stringify({ storage: entity.name, policy: configurable_component.name, key, value, params }) + url = baseUrl + "allocation_policy_config_for_storage"; + } + return fetch(url, { headers: { "Content-Type": "application/json" }, method: "POST", - body: JSON.stringify({ storage, policy, key, value, params }), + body: body, }).then((res) => { if (res.ok) { return; @@ -481,17 +509,30 @@ function Client(): Client { } }); }, - deleteAllocationPolicyConfig: ( - storage: string, - policy: string, + deleteConfiguration: ( + entity: Entity, + configurable_component: ConfigurableComponent, key: string, params: object, ) => { - const url = baseUrl + "allocation_policy_config"; + let body: string; + let url: string; + + if (configurable_component.type === "routing_strategy") { + body = JSON.stringify({ strategy: configurable_component.name, key, params }); + url = baseUrl + "routing_strategy_config"; + } else if (entity.type === "strategy" && configurable_component.type === "allocation_policy") { + body = JSON.stringify({ strategy: entity.name, policy: configurable_component.name, key, params }) + url = baseUrl + "allocation_policy_config_for_strategy"; + } else { + body = JSON.stringify({ storage: entity.name, policy: configurable_component.name, key, params }) + url = baseUrl + "allocation_policy_config_for_storage"; + } + return fetch(url, { headers: { "Content-Type": "application/json" }, method: "DELETE", - body: JSON.stringify({ storage, policy, key, params }), + body: body, }).then((res) => { if (res.ok) { return; diff --git a/snuba/admin/static/capacity_management/add_config_modal.tsx b/snuba/admin/static/capacity_management/add_config_modal.tsx index 6cc6dad5dcc..128587b8566 100644 --- a/snuba/admin/static/capacity_management/add_config_modal.tsx +++ b/snuba/admin/static/capacity_management/add_config_modal.tsx @@ -3,16 +3,16 @@ import Button from "react-bootstrap/Button"; import Form from "react-bootstrap/Form"; import Modal from "react-bootstrap/Modal"; import { - AllocationPolicyConfig, - AllocationPolicyOptionalConfigDefinition, + Configuration, + OptionalConfigurationDefinition, } from "SnubaAdmin/capacity_management/types"; import FormGroup from "react-bootstrap/FormGroup"; function AddConfigModal(props: { currentlyAdding: boolean; setCurrentlyAdding: (currentlyAdding: boolean) => void; - optionalConfigDefinitions: AllocationPolicyOptionalConfigDefinition[]; - saveConfig: (config: AllocationPolicyConfig) => void; + optionalConfigDefinitions: OptionalConfigurationDefinition[]; + saveConfig: (config: Configuration) => void; }) { const { currentlyAdding, @@ -22,9 +22,9 @@ function AddConfigModal(props: { } = props; const [selectedDefinition, selectDefinition] = - useState(); + useState(); - const [config, buildConfig] = useState({ + const [config, buildConfig] = useState({ name: "", value: "", description: "", diff --git a/snuba/admin/static/capacity_management/allocation_policy.tsx b/snuba/admin/static/capacity_management/allocation_policy.tsx index 166b95283c1..6bdbe3ef54a 100644 --- a/snuba/admin/static/capacity_management/allocation_policy.tsx +++ b/snuba/admin/static/capacity_management/allocation_policy.tsx @@ -3,13 +3,18 @@ import React, { useEffect, useState } from "react"; import { Table, createCustomTableStyles } from "../table"; import { COLORS } from "SnubaAdmin/theme"; import Client from "SnubaAdmin/api_client"; -import { AllocationPolicy, AllocationPolicyConfig } from "SnubaAdmin/capacity_management/types"; +import { + AllocationPolicy, + Configuration, + Entity, + ConfigurableComponent, +} from "SnubaAdmin/capacity_management/types"; import { containerStyle, linkStyle, paragraphStyle } from "SnubaAdmin/capacity_management/styles"; import { getReadonlyRow } from "SnubaAdmin/capacity_management/row_data"; import EditConfigModal from "SnubaAdmin/capacity_management/edit_config_modal"; import AddConfigModal from "SnubaAdmin/capacity_management/add_config_modal"; -function getTableColor(configs: AllocationPolicyConfig[]): string { +function getTableColor(configs: Configuration[]): string { let policyIsActive = false; let policyIsEnforced = false; configs.forEach((config) => { @@ -37,22 +42,22 @@ function getTableColor(configs: AllocationPolicyConfig[]): string { } } -function AllocationPolicyConfigs(props: { +function Configurations(props: { api: Client; - storage: string; - policy: AllocationPolicy; + entity: Entity; + configurable_component: ConfigurableComponent; }) { - const { api, storage, policy } = props; + const { api, entity, configurable_component } = props; - const [configs, setConfigs] = useState([]); + const [configs, setConfigs] = useState([]); useEffect(() => { - policy.configs.sort(); - setConfigs(policy.configs); - }, [policy]); + configurable_component.configs.sort(); + setConfigs(configurable_component.configs); + }, [configurable_component]); const [currentlyEditing, setCurrentlyEditing] = useState(false); - const [currentConfig, setCurrentConfig] = useState({ + const [currentConfig, setCurrentConfig] = useState({ name: "", value: "", description: "", @@ -61,16 +66,16 @@ function AllocationPolicyConfigs(props: { }); const [addingNew, setAddingNew] = useState(false); - function enterEditMode(config: AllocationPolicyConfig) { + function enterEditMode(config: Configuration) { setCurrentlyEditing(true); setCurrentConfig(config); } - function deleteConfig(toDelete: AllocationPolicyConfig) { + function deleteConfig(toDelete: Configuration) { api - .deleteAllocationPolicyConfig( - storage, - policy.policy_name, + .deleteConfiguration( + entity, + configurable_component, toDelete.name, toDelete.params ) @@ -86,11 +91,11 @@ function AllocationPolicyConfigs(props: { }); } - function saveConfig(config: AllocationPolicyConfig) { + function saveConfig(config: Configuration) { api - .setAllocationPolicyConfig( - storage, - policy.policy_name, + .setConfiguration( + entity, + configurable_component, config.name, config.value, config.params @@ -100,7 +105,7 @@ function AllocationPolicyConfigs(props: { }); } - function addConfig(config: AllocationPolicyConfig) { + function addConfig(config: Configuration) { saveConfig(config); setConfigs((prev) => [...prev, config]); } @@ -117,11 +122,11 @@ function AllocationPolicyConfigs(props: {
-

{policy.policy_name}

+

{configurable_component.name}

These are the global configurations.

@@ -169,10 +174,10 @@ function AllocationPolicyConfigs(props: { ])} columnWidths={[3, 3, 2, 5, 1, 1]} customStyles={createCustomTableStyles({ - headerStyle: { backgroundColor: getTableColor(policy.configs) }, + headerStyle: { backgroundColor: getTableColor(configurable_component.configs) }, })} /> - {!addingNew && policy.optional_config_definitions.length != 0 && ( + {!addingNew && configurable_component.optional_config_definitions.length != 0 && ( setAddingNew(true)} style={linkStyle}> add new @@ -183,4 +188,4 @@ function AllocationPolicyConfigs(props: { ); } -export { AllocationPolicyConfigs, getTableColor }; +export { Configurations, getTableColor }; diff --git a/snuba/admin/static/capacity_management/edit_config_modal.tsx b/snuba/admin/static/capacity_management/edit_config_modal.tsx index 84a00b3ac5f..f54cad0284d 100644 --- a/snuba/admin/static/capacity_management/edit_config_modal.tsx +++ b/snuba/admin/static/capacity_management/edit_config_modal.tsx @@ -2,14 +2,14 @@ import React, { useState } from "react"; import Button from "react-bootstrap/Button"; import Form from "react-bootstrap/Form"; import Modal from "react-bootstrap/Modal"; -import { AllocationPolicyConfig } from "SnubaAdmin/capacity_management/types"; +import { Configuration } from "SnubaAdmin/capacity_management/types"; function EditConfigModal(props: { currentlyEditing: boolean; - currentConfig: AllocationPolicyConfig; + currentConfig: Configuration; setCurrentlyEditing: (currentlyEditing: boolean) => void; - deleteConfig: (config: AllocationPolicyConfig) => void; - saveConfig: (config: AllocationPolicyConfig) => void; + deleteConfig: (config: Configuration) => void; + saveConfig: (config: Configuration) => void; }) { const { currentlyEditing, diff --git a/snuba/admin/static/capacity_management/index.tsx b/snuba/admin/static/capacity_management/index.tsx index 68b0455956d..706a0673923 100644 --- a/snuba/admin/static/capacity_management/index.tsx +++ b/snuba/admin/static/capacity_management/index.tsx @@ -1,6 +1,6 @@ import React, { useEffect, useState } from "react"; import Client from "SnubaAdmin/api_client"; -import { AllocationPolicyConfigs } from "SnubaAdmin/capacity_management/allocation_policy"; +import { Configurations } from "SnubaAdmin/capacity_management/allocation_policy"; import { AllocationPolicy } from "SnubaAdmin/capacity_management/types"; import { CustomSelect, getParamFromStorage } from "SnubaAdmin/select"; import { COLORS } from "SnubaAdmin/theme"; @@ -31,7 +31,7 @@ function CapacityManagement(props: { api: Client }) { function loadAllocationPolicies(storage: string) { api - .getAllocationPolicies(storage) + .getAllocationPolicies({ type: "storage", name: storage }) .then((res) => { setAllocationPolicies(res); }) @@ -53,11 +53,11 @@ function CapacityManagement(props: { api: Client }) { Policy Type: {policies[0].query_type.toUpperCase()}

{policies.map((policy: AllocationPolicy) => ( - ))} diff --git a/snuba/admin/static/capacity_management/row_data.tsx b/snuba/admin/static/capacity_management/row_data.tsx index a6add56495e..c78000d994e 100644 --- a/snuba/admin/static/capacity_management/row_data.tsx +++ b/snuba/admin/static/capacity_management/row_data.tsx @@ -1,9 +1,9 @@ import React from "react"; -import { AllocationPolicyConfig, RowData } from "SnubaAdmin/capacity_management/types"; +import { Configuration, RowData } from "SnubaAdmin/capacity_management/types"; import Button from "react-bootstrap/Button"; function getReadonlyRow( - config: AllocationPolicyConfig, + config: Configuration, edit: () => void ): RowData { return { diff --git a/snuba/admin/static/capacity_management/types.tsx b/snuba/admin/static/capacity_management/types.tsx index 27756db6079..7efbc267db8 100644 --- a/snuba/admin/static/capacity_management/types.tsx +++ b/snuba/admin/static/capacity_management/types.tsx @@ -1,13 +1,23 @@ import { ReactNode } from "react"; -type AllocationPolicy = { - policy_name: string; - configs: AllocationPolicyConfig[]; - optional_config_definitions: AllocationPolicyOptionalConfigDefinition[]; + +interface ConfigurableComponent { + type: string; + name: string; + configs: Configuration[]; + optional_config_definitions: OptionalConfigurationDefinition[]; +}; + +interface AllocationPolicy extends ConfigurableComponent { + type: "allocation_policy"; query_type: string; }; -type AllocationPolicyConfig = { +interface RoutingStrategy extends ConfigurableComponent { + type: "routing_strategy"; +} + +type Configuration = { name: string; value: string; description: string; @@ -15,17 +25,17 @@ type AllocationPolicyConfig = { params: object; }; -type AllocationPolicyConfigParams = { +type ConfigurationParams = { name: string; type: string; }; -type AllocationPolicyOptionalConfigDefinition = { +type OptionalConfigurationDefinition = { name: string; type: string; default: string; description: string; - params: AllocationPolicyConfigParams[]; + params: ConfigurationParams[]; }; type RowData = { @@ -37,10 +47,40 @@ type RowData = { edit: ReactNode; }; +interface StorageEntity { + type: "storage"; + name: string; +} + +interface StrategyEntity { + type: "strategy"; + name: string; +} + +type Entity = StorageEntity | StrategyEntity; + +function getEntityName(entity: Entity): string { + return entity.name; +} + +function isStorage(entity: Entity): entity is StorageEntity { + return entity.type === "storage"; +} + +function isStrategy(entity: Entity): entity is StrategyEntity { + return entity.type === "strategy"; +} + export { AllocationPolicy, - AllocationPolicyConfig, - AllocationPolicyOptionalConfigDefinition, - AllocationPolicyConfigParams, + Configuration, + OptionalConfigurationDefinition, + ConfigurationParams, RowData, + Entity, + getEntityName, + isStorage, + isStrategy, + StorageEntity, + StrategyEntity, ConfigurableComponent, RoutingStrategy }; diff --git a/snuba/admin/static/cbrs/index.tsx b/snuba/admin/static/cbrs/index.tsx new file mode 100644 index 00000000000..234ef32960b --- /dev/null +++ b/snuba/admin/static/cbrs/index.tsx @@ -0,0 +1,141 @@ +import React, { useEffect, useState } from "react"; +import Client from "SnubaAdmin/api_client"; +import { Configurations } from "SnubaAdmin/capacity_management/allocation_policy"; +import { AllocationPolicy, Configuration, ConfigurableComponent, RoutingStrategy } from "SnubaAdmin/capacity_management/types"; +import { CustomSelect, getParamFromStorage } from "SnubaAdmin/select"; +import { COLORS } from "SnubaAdmin/theme"; + +function CapacityBasedRoutingSystem(props: { api: Client }) { + const { api } = props; + + const [strategies, setStrategies] = useState([]); + const [selectedStrategy, setStrategy] = useState(); + const [allocationPolicies, setAllocationPolicies] = useState< + AllocationPolicy[] + >([]); + const [strategyConfigs, setStrategyConfigs] = useState([]); + + useEffect(() => { + api.getRoutingStrategies().then((res) => { + setStrategies(res); + const previousStrategy = getParamFromStorage("strategy"); + if (previousStrategy) { + selectStrategy(previousStrategy); + } + }); + }, []); + + function selectStrategy(strategy: string) { + setStrategy(strategy); + loadAllocationPolicies(strategy); + loadStrategyConfigs(strategy); + } + + function loadAllocationPolicies(strategy: string) { + api + .getAllocationPolicies({ type: "strategy", name: strategy }) + .then((res) => { + setAllocationPolicies(res); + }) + .catch((err) => { + window.alert(err); + }); + } + + function loadStrategyConfigs(strategy: string) { + api + .getRoutingStrategyConfigs(strategy) + .then((res) => { + setStrategyConfigs(res); + }) + .catch((err) => { + window.alert(err); + }); + } + + function renderStrategy() { + if (!selectedStrategy) { + return

Strategy not selected.

; + } + if (strategyConfigs.length === 0) { + return

No strategy configurations found.

; + } + + const strategyComponent: RoutingStrategy = { + type: "routing_strategy", + name: selectedStrategy, + configs: strategyConfigs, + optional_config_definitions: [], + }; + + return ( +
+

Strategy Configurations

+ +
+ ); + } + + function renderPolicies(policies: AllocationPolicy[]) { + if (!selectedStrategy) { + return

Strategy not selected.

; + } + if (policies.length == 0) { + return null; + } + return ( +
+

+ Policy Type: {policies[0].query_type.toUpperCase()} +

+ {policies.map((policy: AllocationPolicy) => ( + + ))} +
+ ); + } + + return ( +
+

+ Strategy: + +

+ + {renderStrategy()} + + {renderPolicies( + allocationPolicies.filter((policy) => policy.query_type == "select") + )} + {renderPolicies( + allocationPolicies.filter((policy) => policy.query_type == "delete") + )} +
+ ); +} + +const policyTypeStyle = { + fontSize: 18, + fontWeight: 600, + color: COLORS.HEADER_TEXT, + backgroundColor: COLORS.TEXT_LIGHTER, + maxWidth: "100%", + margin: "10px 0px", + padding: "5px", +}; + +export default CapacityBasedRoutingSystem; diff --git a/snuba/admin/static/data.tsx b/snuba/admin/static/data.tsx index 5f60b6883e2..c3dee04de95 100644 --- a/snuba/admin/static/data.tsx +++ b/snuba/admin/static/data.tsx @@ -18,6 +18,7 @@ import DeleteTool from "SnubaAdmin/delete_tool"; import ViewCustomJobs from "SnubaAdmin/manual_jobs"; import DatabaseClusters from "./database_clusters"; import RpcEndpoints from "SnubaAdmin/rpc_endpoints"; +import CapacityBasedRoutingSystem from "SnubaAdmin/cbrs"; const NAV_ITEMS = [ { id: "overview", display: "🤿 Snuba Admin", component: Welcome }, @@ -112,6 +113,11 @@ const NAV_ITEMS = [ display: "🗂️ Database Clusters", component: DatabaseClusters, }, + { + id: "capacity-based-routing-system", + display: "🔄 Capacity Based Routing System", + component: CapacityBasedRoutingSystem, + }, ]; export { NAV_ITEMS }; diff --git a/snuba/admin/static/tests/capacity_management/add_config_modal.spec.tsx b/snuba/admin/static/tests/capacity_management/add_config_modal.spec.tsx index c17f46b59f4..6a6f9cdda4c 100644 --- a/snuba/admin/static/tests/capacity_management/add_config_modal.spec.tsx +++ b/snuba/admin/static/tests/capacity_management/add_config_modal.spec.tsx @@ -4,8 +4,8 @@ import { act, cleanup, fireEvent, render } from "@testing-library/react"; import React from "react"; import AddConfigModal from "SnubaAdmin/capacity_management/add_config_modal"; import { - AllocationPolicyConfig, - AllocationPolicyOptionalConfigDefinition, + Configuration, + OptionalConfigurationDefinition, } from "SnubaAdmin/capacity_management/types"; describe("Add Config Modal", () => { @@ -23,13 +23,13 @@ describe("Add Config Modal", () => { description: "some config", }; - let expectedConfig: AllocationPolicyConfig = { + let expectedConfig: Configuration = { ...mockedConfig, value: "20", params: { some_param: "30", some_other_param: "test" }, }; - let mockedDefs: AllocationPolicyOptionalConfigDefinition[] = [ + let mockedDefs: OptionalConfigurationDefinition[] = [ { ...mockedConfig, default: "10", diff --git a/snuba/admin/static/tests/capacity_management/allocation_policies.spec.tsx b/snuba/admin/static/tests/capacity_management/allocation_policies.spec.tsx index 7ac07d9ac6f..f12bb8a7059 100644 --- a/snuba/admin/static/tests/capacity_management/allocation_policies.spec.tsx +++ b/snuba/admin/static/tests/capacity_management/allocation_policies.spec.tsx @@ -1,6 +1,6 @@ import Client from "SnubaAdmin/api_client"; -import { AllocationPolicyConfigs } from "SnubaAdmin/capacity_management/allocation_policy"; +import { Configurations } from "SnubaAdmin/capacity_management/allocation_policy"; import { it, expect } from "@jest/globals"; import { AllocationPolicy } from "SnubaAdmin/capacity_management/types"; import { act, fireEvent, render } from "@testing-library/react"; @@ -8,7 +8,8 @@ import React from "react"; it("should populate configs table upon render", async () => { let allocationPolicy: AllocationPolicy = { - policy_name: "some_policy", + type: "allocation_policy", + name: "some_policy", configs: [ { name: "key1", @@ -41,10 +42,10 @@ it("should populate configs table upon render", async () => { }; let { getByText, getByTestId } = render( - ); diff --git a/snuba/admin/static/tests/capacity_management/index.spec.tsx b/snuba/admin/static/tests/capacity_management/index.spec.tsx index 3a10aa832e9..3b0e80d0da5 100644 --- a/snuba/admin/static/tests/capacity_management/index.spec.tsx +++ b/snuba/admin/static/tests/capacity_management/index.spec.tsx @@ -32,7 +32,8 @@ it("should display allocation policy configs once a storage is selected", async let storages = ["storage1", "storage2"]; let allocationPolicies: AllocationPolicy[] = [ { - policy_name: "some_policy", + type: "allocation_policy", + name: "some_policy", configs: [ { name: "key1", @@ -46,7 +47,8 @@ it("should display allocation policy configs once a storage is selected", async query_type: "select", }, { - policy_name: "some_other_policy", + type: "allocation_policy", + name: "some_other_policy", configs: [ { name: "key2", diff --git a/snuba/admin/tool_policies.py b/snuba/admin/tool_policies.py index c6268233f71..4db0422e404 100644 --- a/snuba/admin/tool_policies.py +++ b/snuba/admin/tool_policies.py @@ -28,6 +28,7 @@ class AdminTools(Enum): AUDIT_LOG = "audit-log" KAFKA = "kafka" CAPACITY_MANAGEMENT = "capacity-management" + CAPACITY_BASED_ROUTING_SYSTEM = "capacity-based-routing-system" PRODUCTION_QUERIES = "production-queries" CARDINALITY_ANALYZER = "cardinality-analyzer" SNUBA_EXPLAIN = "snuba-explain" diff --git a/snuba/admin/views.py b/snuba/admin/views.py index 3cf8b0699ea..21d6a2a2a1c 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -46,7 +46,7 @@ get_migration_group_policies, ) from snuba.admin.production_queries.prod_queries import run_mql_query, run_snql_query -from snuba.admin.rpc.rpc_queries import validate_request_meta +from snuba.admin.rpc.rpc_queries import get_routing_strategies, validate_request_meta from snuba.admin.runtime_config import ( ConfigChange, ConfigType, @@ -79,7 +79,7 @@ from snuba.migrations.groups import MigrationGroup from snuba.migrations.runner import MigrationKey, Runner from snuba.migrations.status import Status -from snuba.query.allocation_policies import AllocationPolicy +from snuba.query.allocation_policies import CAPMAN_HASH, AllocationPolicy from snuba.query.exceptions import InvalidQueryException from snuba.query.query_settings import HTTPQuerySettings from snuba.replacers.replacements_and_expiry import ( @@ -96,6 +96,10 @@ deletes_are_enabled, ) from snuba.web.rpc import RPCEndpoint, list_all_endpoint_names, run_rpc_handler +from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( + CBRS_HASH, + BaseRoutingStrategy, +) from snuba.web.views import dataset_query logger = structlog.get_logger().bind(module=__name__) @@ -958,6 +962,16 @@ def snuba_debug() -> Response: explain_cleanup() +@application.route("/routing_strategies") +@check_tool_perms(tools=[AdminTools.CAPACITY_BASED_ROUTING_SYSTEM]) +def routing_strategies() -> Response: + return Response( + json.dumps(get_routing_strategies()), + 200, + {"Content-Type": "application/json"}, + ) + + @application.route("/storages_with_allocation_policies") @check_tool_perms(tools=[AdminTools.CAPACITY_MANAGEMENT]) def storages_with_allocation_policies() -> Response: @@ -968,37 +982,196 @@ def storages_with_allocation_policies() -> Response: ) -@application.route("/allocation_policy_configs/", methods=["GET"]) +def _add_policy_data( + policies: Sequence[AllocationPolicy], query_type: str, data: list[dict[str, Any]] +) -> None: + for policy in policies: + data.append( + { + "type": "allocation_policy", + "name": policy.config_key(), + "configs": policy.get_current_configs(), + "optional_config_definitions": policy.get_optional_config_definitions_json(), + "query_type": query_type, + } + ) + + +@application.route( + "/allocation_policy_configs/storage/", methods=["GET"] +) @check_tool_perms(tools=[AdminTools.CAPACITY_MANAGEMENT]) -def get_allocation_policy_configs(storage_key: str) -> Response: +def get_allocation_policy_configs_of_storage(storage_key: str) -> Response: policies = get_storage(StorageKey(storage_key)).get_allocation_policies() delete_policies = get_storage( StorageKey(storage_key) ).get_delete_allocation_policies() - data = [] + data: list[dict[str, Any]] = [] + _add_policy_data(policies, "select", data) + _add_policy_data(delete_policies, "delete", data) - def add_policy_data(policies: Sequence[AllocationPolicy], query_type: str) -> None: - for policy in policies: - data.append( - { - "policy_name": policy.config_key(), - "configs": policy.get_current_configs(), - "optional_config_definitions": policy.get_optional_config_definitions_json(), - "query_type": query_type, - } - ) + return Response(json.dumps(data), 200, {"Content-Type": "application/json"}) + + +@application.route( + "/allocation_policy_configs/strategy/", methods=["GET"] +) +@check_tool_perms(tools=[AdminTools.CAPACITY_MANAGEMENT]) +def get_allocation_policy_configs_of_routing_strategy(strategy_name: str) -> Response: + + policies = BaseRoutingStrategy.get_from_name( + strategy_name + )().get_allocation_policies() - add_policy_data(policies, "select") - add_policy_data(delete_policies, "delete") + data: list[dict[str, Any]] = [] + _add_policy_data(policies, "select", data) + + print("allocationpolicyyyyy", data[0]) return Response(json.dumps(data), 200, {"Content-Type": "application/json"}) -@application.route("/allocation_policy_config", methods=["POST", "DELETE"]) +@application.route("/routing_strategy_configs/", methods=["GET"]) +@check_tool_perms(tools=[AdminTools.CAPACITY_BASED_ROUTING_SYSTEM]) +def get_routing_strategy_configs(strategy_name: str) -> Response: + from snuba.web.rpc.storage_routing.routing_strategies.storage_routing import ( + RoutingStrategyConfig, + ) + + configs = BaseRoutingStrategy.get_from_name(strategy_name)().get_configurations() + serialized_configs = [ + cast(RoutingStrategyConfig, config).to_config_dict() + for config in configs.values() + ] + print("serialized_configssssssss", serialized_configs) + return Response( + json.dumps(serialized_configs), 200, {"Content-Type": "application/json"} + ) + + +@application.route("/routing_strategy_config", methods=["POST", "DELETE"]) +@check_tool_perms(tools=[AdminTools.CAPACITY_BASED_ROUTING_SYSTEM]) +def set_routing_strategy_config() -> Response: + data = json.loads(request.data) + user = request.headers.get(USER_HEADER_KEY) + + try: + strategy, key = (data["strategy"], data["key"]) + params = data.get("params", {}) + + assert isinstance(strategy, str), "Invalid strategy" + assert isinstance(key, str), "Invalid key" + assert isinstance(params, dict), "Invalid params" + assert key != "", "Key cannot be empty string" + + strategies = list(BaseRoutingStrategy.all_names()) + strategy = next( + (s for s in strategies if s == strategy), + None, + ) + assert strategy is not None, "Strategy not found" + + except (KeyError, AssertionError) as exc: + return Response( + json.dumps({"error": f"Invalid config: {str(exc)}"}), + 400, + {"Content-Type": "application/json"}, + ) + + if request.method == "DELETE": + strategy.delete_config_value(config_key=key, params=params, user=user) + return Response("", 200) + + +@application.route("/allocation_policy_config_for_strategy", methods=["POST", "DELETE"]) +@check_tool_perms(tools=[AdminTools.CAPACITY_MANAGEMENT]) +def set_allocation_policy_config_for_strategy() -> Response: + data = json.loads(request.data) + user = request.headers.get(USER_HEADER_KEY) + + try: + strategy, key, policy_name = (data["strategy"], data["key"], data["policy"]) + + params = data.get("params", {}) + + assert isinstance(strategy, str), "Invalid strategy" + assert isinstance(key, str), "Invalid key" + assert isinstance(params, dict), "Invalid params" + assert key != "", "Key cannot be empty string" + assert isinstance(policy_name, str), "Invalid policy name" + strategy = BaseRoutingStrategy.get_from_name(strategy) + assert strategy is not None, "Strategy not found" + policies = ( + strategy().get_allocation_policies() + + strategy().get_delete_allocation_policies() + ) + policy = next( + (p for p in policies if p.config_key() == policy_name), + None, + ) + assert policy is not None, "Policy not found on strategy" + + except (KeyError, AssertionError) as exc: + return Response( + json.dumps({"error": f"Invalid config: {str(exc)}"}), + 400, + {"Content-Type": "application/json"}, + ) + + if request.method == "DELETE": + policy.delete_config_value( + config_key=key, hash=CBRS_HASH, params=params, user=user + ) + audit_log.record( + user or "", + AuditLogAction.ALLOCATION_POLICY_DELETE, + { + "storage": strategy, + "policy": policy.config_key(), + "key": key, + }, # todo: make audit log handle strategy in addition to storage + notify=True, + ) + return Response("", 200) + elif request.method == "POST": + try: + value = data["value"] + assert isinstance(value, str), "Invalid value" + policy.set_config_value( + config_key=key, value=value, params=params, user=user + ) + audit_log.record( + user or "", + AuditLogAction.ALLOCATION_POLICY_UPDATE, + { + "storage": strategy, # todo: make audit log handle strategy in addition to storage + "policy": policy.config_key(), + "key": key, + "value": value, + "params": str(params), + }, + notify=True, + ) + return Response("", 200) + except (KeyError, AssertionError) as exc: + return Response( + json.dumps({"error": f"Invalid config: {str(exc)}"}), + 400, + {"Content-Type": "application/json"}, + ) + else: + return Response( + json.dumps({"error": "Method not allowed"}), + 405, + {"Content-Type": "application/json"}, + ) + + +@application.route("/allocation_policy_config_for_storage", methods=["POST", "DELETE"]) @check_tool_perms(tools=[AdminTools.CAPACITY_MANAGEMENT]) -def set_allocation_policy_config() -> Response: +def set_allocation_policy_config_for_storage() -> Response: data = json.loads(request.data) user = request.headers.get(USER_HEADER_KEY) @@ -1031,7 +1204,9 @@ def set_allocation_policy_config() -> Response: ) if request.method == "DELETE": - policy.delete_config_value(config_key=key, params=params, user=user) + policy.delete_config_value( + config_key=key, hash=CAPMAN_HASH, params=params, user=user + ) audit_log.record( user or "", AuditLogAction.ALLOCATION_POLICY_DELETE, diff --git a/snuba/query/allocation_policies/__init__.py b/snuba/query/allocation_policies/__init__.py index 505535801fb..78e7767dc95 100644 --- a/snuba/query/allocation_policies/__init__.py +++ b/snuba/query/allocation_policies/__init__.py @@ -8,7 +8,7 @@ from snuba import environment, settings from snuba.datasets.storages.storage_key import StorageKey -from snuba.state import delete_config as delete_runtime_config +from snuba.query.configuration import ConfigurableComponent, Configuration from snuba.state import get_all_configs as get_all_runtime_configs from snuba.state import get_config as get_runtime_config from snuba.state import set_config as set_runtime_config @@ -48,49 +48,7 @@ def __post_init__(self) -> None: @dataclass() -class AllocationPolicyConfig: - name: str - description: str - value_type: type - default: Any - param_types: dict[str, type] = field(default_factory=dict) - - def __post_init__(self) -> None: - if type(self.default) != self.value_type: - raise ValueError( - f"Config item `{self.name}` expects type {self.value_type} got value `{self.default}` of type {type(self.default)}" - ) - - def __to_base_dict(self) -> dict[str, Any]: - return { - "name": self.name, - "type": self.value_type.__name__, - "default": self.default, - "description": self.description, - } - - def to_definition_dict(self) -> dict[str, Any]: - """Returns a dict representation of the definition of a Config.""" - return { - **self.__to_base_dict(), - "params": [ - {"name": param, "type": self.param_types[param].__name__} - for param in self.param_types - ], - } - - def to_config_dict( - self, value: Any = None, params: dict[str, Any] = {} - ) -> dict[str, Any]: - """Returns a dict representation of a live Config.""" - return { - **self.__to_base_dict(), - "value": value if value is not None else self.default, - "params": params, - } - - -class InvalidPolicyConfig(Exception): +class AllocationPolicyConfig(Configuration): pass @@ -187,7 +145,7 @@ def from_args( ) -class AllocationPolicy(ABC, metaclass=RegisteredClass): +class AllocationPolicy(ConfigurableComponent, ABC, metaclass=RegisteredClass): """This class should be the centralized place for policy decisions regarding resource usage of a clickhouse cluster. It is meant to live as a configurable item on a storage. @@ -367,21 +325,11 @@ class AllocationPolicy(ABC, metaclass=RegisteredClass): * Every allocation policy takes a `storage_key` in its init. The storage_key is like a pseudo-tenant. In different environments, storages may be co-located on the same cluster. To facilitate resource sharing, every allocation policy knows which storage_key it is serving. This is used to create unique keys for saving the config values. - See `__build_runtime_config_key()` for more info. + See `_build_runtime_config_key()` for more info. * Reiterating that you should no longer use `snuba.state.{get,set}_config()` for runtime configs for a specific Policy. Refer to the Configurations section of this docstring for more info. """ - # This component builds redis strings that are delimited by dots, commas, colons - # in order to allow those characters to exist in config we replace them with their - # counterparts on write/read. It may be better to just replace our serialization with JSON - # instead of what we're doing but this is where we're at rn 1/10/24 - __KEY_DELIMITERS_TO_ESCAPE_SEQUENCES = { - ".": "__dot_literal__", - ",": "__comma_literal__", - ":": "__colon_literal__", - } - def __init__( self, storage_key: StorageKey, @@ -504,8 +452,11 @@ def __get_overridden_additional_config_defaults( for definition in definitions ] - def additional_config_definitions(self) -> list[AllocationPolicyConfig]: - return self._overridden_additional_config_definitions + def get_default_config_definitions(self) -> list[Configuration]: + return cast(list[Configuration], self._default_config_definitions) + + def get_additional_config_definitions(self) -> list[Configuration]: + return cast(list[Configuration], self._overridden_additional_config_definitions) @abstractmethod def _additional_config_definitions(self) -> list[AllocationPolicyConfig]: @@ -515,19 +466,11 @@ def _additional_config_definitions(self) -> list[AllocationPolicyConfig]: """ pass - def config_definitions(self) -> dict[str, AllocationPolicyConfig]: - """Returns a dictionary of config definitions on this AllocationPolicy.""" - return { - config.name: config - for config in self._default_config_definitions - + self.additional_config_definitions() - } - def get_optional_config_definitions_json(self) -> list[dict[str, Any]]: """Returns a json-like dictionary of optional config definitions on this AllocationPolicy.""" return [ definition.to_definition_dict() - for definition in self.config_definitions().values() + for definition in self.get_configurations().values() if definition.param_types ] @@ -536,12 +479,12 @@ def get_config_value( ) -> Any: """Returns value of a config on this Allocation Policy, or the default if none exists in Redis.""" config_definition = ( - self.__validate_config_params(config_key, params) + self._validate_config_params(config_key, params) if validate - else self.config_definitions()[config_key] + else self.get_configurations()[config_key] ) return get_runtime_config( - key=self.__build_runtime_config_key(config_key, params), + key=self._build_runtime_config_key(config_key, params), default=config_definition.default, config_key=CAPMAN_HASH, ) @@ -554,39 +497,22 @@ def set_config_value( user: str | None = None, ) -> None: """Sets a value of a config on this AllocationPolicy.""" - config_definition = self.__validate_config_params(config_key, params, value) + config_definition = self._validate_config_params(config_key, params, value) # ensure correct type is stored value = config_definition.value_type(value) set_runtime_config( - key=self.__build_runtime_config_key(config_key, params), + key=self._build_runtime_config_key(config_key, params), value=value, user=user, force=True, config_key=CAPMAN_HASH, ) - def delete_config_value( - self, - config_key: str, - params: dict[str, Any] = {}, - user: str | None = None, - ) -> None: - """ - Deletes an instance of an optional config on this AllocationPolicy. - If this function is run on a required config, it resets the value to default instead. - """ - self.__validate_config_params(config_key, params) - delete_runtime_config( - key=self.__build_runtime_config_key(config_key, params), - user=user, - config_key=CAPMAN_HASH, - ) - def get_current_configs(self) -> list[dict[str, Any]]: """Returns a list of live configs with their definitions on this AllocationPolicy.""" runtime_configs = get_all_runtime_configs(CAPMAN_HASH) - definitions = self.config_definitions() + definitions = self.get_configurations() required_configs = set( config_name @@ -618,22 +544,6 @@ def get_current_configs(self) -> list[dict[str, Any]]: return detailed_configs - def __build_runtime_config_key(self, config: str, params: dict[str, Any]) -> str: - """ - Builds a unique key to be used in the actual datastore containing these configs. - - Example return values: - - `"mystorage.MyAllocationPolicy.my_config"` # no params - - `"mystorage.MyAllocationPolicy.my_config.a:1,b:2"` # sorted params - """ - parameters = "." - for param in sorted(list(params.keys())): - param_sanitized = self.__escape_delimiter_chars(param) - value_sanitized = self.__escape_delimiter_chars(params[param]) - parameters += f"{param_sanitized}:{value_sanitized}," - parameters = parameters[:-1] - return f"{self.runtime_config_prefix}.{config}{parameters}" - def __deserialize_runtime_config_key(self, key: str) -> tuple[str, dict[str, Any]]: """ Given a raw runtime config key, deconstructs it into it's AllocationPolicy config @@ -660,95 +570,18 @@ def __deserialize_runtime_config_key(self, key: str) -> tuple[str, dict[str, Any param_value = self.__unescape_delimiter_chars(param_value) params_dict[param_key] = param_value - self.__validate_config_params(config_key=config_key, params=params_dict) + self._validate_config_params(config_key=config_key, params=params_dict) return config_key, params_dict - def __escape_delimiter_chars(self, key: str) -> str: - if not isinstance(key, str): - return key - for ( - delimiter_char, - escape_sequence, - ) in self.__KEY_DELIMITERS_TO_ESCAPE_SEQUENCES.items(): - if escape_sequence in str(key): - raise InvalidPolicyConfig( - f"{escape_sequence} is not a valid string for a policy config" - ) - key = key.replace(delimiter_char, escape_sequence) - return key - def __unescape_delimiter_chars(self, key: str) -> str: for ( delimiter_char, escape_sequence, - ) in self.__KEY_DELIMITERS_TO_ESCAPE_SEQUENCES.items(): + ) in self._KEY_DELIMITERS_TO_ESCAPE_SEQUENCES.items(): key = key.replace(escape_sequence, delimiter_char) return key - def __validate_config_params( - self, config_key: str, params: dict[str, Any], value: Any = None - ) -> AllocationPolicyConfig: - definitions = self.config_definitions() - - class_name = self.__class__.__name__ - - # config doesn't exist - if config_key not in definitions: - raise InvalidPolicyConfig( - f"'{config_key}' is not a valid config for {class_name}!" - ) - - config = definitions[config_key] - - # missing required parameters - if ( - diff := { - key: config.param_types[key].__name__ - for key in config.param_types - if key not in params - } - ) != dict(): - raise InvalidPolicyConfig( - f"'{config_key}' missing required parameters: {diff} for {class_name}!" - ) - - # not an optional config (no parameters) - if params and not config.param_types: - raise InvalidPolicyConfig( - f"'{config_key}' takes no params for {class_name}!" - ) - - # parameters aren't correct types - if params: - for param_name in params: - if not isinstance(params[param_name], config.param_types[param_name]): - try: - # try casting to the right type, eg try int("10") - expected_type = config.param_types[param_name] - params[param_name] = expected_type(params[param_name]) - except Exception: - raise InvalidPolicyConfig( - f"'{config_key}' parameter '{param_name}' needs to be of type" - f" {config.param_types[param_name].__name__} (not {type(params[param_name]).__name__})" - f" for {class_name}!" - ) - - # value isn't correct type - if value is not None: - if not isinstance(value, config.value_type): - try: - # try casting to the right type - config.value_type(value) - except Exception: - raise InvalidPolicyConfig( - f"'{config_key}' value needs to be of type" - f" {config.value_type.__name__} (not {type(value).__name__})" - f" for {class_name}!" - ) - - return config - def get_quota_allowance( self, tenant_ids: dict[str, str | int], query_id: str ) -> QuotaAllowance: @@ -856,6 +689,9 @@ def _update_quota_balance( def storage_key(self) -> StorageKey: return self._storage_key + def component_namespace(self) -> str: + return "allocation_policy" + class PassthroughPolicy(AllocationPolicy): def _additional_config_definitions(self) -> list[AllocationPolicyConfig]: diff --git a/snuba/query/allocation_policies/cross_org.py b/snuba/query/allocation_policies/cross_org.py index dcd80ddee4a..6d5d625c02c 100644 --- a/snuba/query/allocation_policies/cross_org.py +++ b/snuba/query/allocation_policies/cross_org.py @@ -6,13 +6,13 @@ from snuba.datasets.storages.storage_key import StorageKey from snuba.query.allocation_policies import ( AllocationPolicyConfig, - InvalidPolicyConfig, QueryResultOrError, QuotaAllowance, ) from snuba.query.allocation_policies.concurrent_rate_limit import ( BaseConcurrentRateLimitAllocationPolicy, ) +from snuba.query.configuration import InvalidConfig from snuba.redis import RedisClientKey, get_redis_client from snuba.state.rate_limit import RateLimitParameters from snuba.utils.serializable_exception import JsonSerializable @@ -76,7 +76,7 @@ def set_config_value( ): referrer = params.get("referrer", None) if referrer is not None and not self._referrer_is_registered(referrer): - raise InvalidPolicyConfig( + raise InvalidConfig( f"Referrer {referrer} is not registered in the the {self._storage_key.value} yaml. Register it first to be able to override its limits" ) super().set_config_value(config_key, value, params, user) diff --git a/snuba/query/configuration.py b/snuba/query/configuration.py new file mode 100644 index 00000000000..f5e8c5eef76 --- /dev/null +++ b/snuba/query/configuration.py @@ -0,0 +1,218 @@ +from dataclasses import dataclass, field +from typing import Any + +from snuba.state import delete_config as delete_runtime_config + + +class InvalidConfig(Exception): + pass + + +@dataclass() +class Configuration: + name: str + description: str + value_type: type + default: Any + param_types: dict[str, type] = field(default_factory=dict) + + def __post_init__(self) -> None: + if type(self.default) != self.value_type: + raise ValueError( + f"Config item `{self.name}` expects type {self.value_type} got value `{self.default}` of type {type(self.default)}" + ) + + def __to_base_dict(self) -> dict[str, Any]: + return { + "name": self.name, + "type": self.value_type.__name__, + "default": self.default, + "description": self.description, + } + + def to_definition_dict(self) -> dict[str, Any]: + """Returns a dict representation of the definition of a Config.""" + return { + **self.__to_base_dict(), + "params": [ + {"name": param, "type": self.param_types[param].__name__} + for param in self.param_types + ], + } + + def to_config_dict( + self, value: Any = None, params: dict[str, Any] = {} + ) -> dict[str, Any]: + """Returns a dict representation of a live Config.""" + return { + **self.__to_base_dict(), + "value": value if value is not None else self.default, + "params": params, + } + + +class ConfigurableComponent: + + # This component builds redis strings that are delimited by dots, commas, colons + # in order to allow those characters to exist in config we replace them with their + # counterparts on write/read. It may be better to just replace our serialization with JSON + # instead of what we're doing but this is where we're at rn 1/10/24 + _KEY_DELIMITERS_TO_ESCAPE_SEQUENCES = { + ".": "__dot_literal__", + ",": "__comma_literal__", + ":": "__colon_literal__", + } + + def component_name(self) -> str: + # what is this configurable component? + # allocation policy? routing strategy? strategy selector? + return self.__class__.__name__ + + def component_namespace(self) -> str: + # a way to uniquely identify the component + # so that a configuration namespace can + # be created for it + raise NotImplementedError + + def get_default_config_definitions(self) -> list[Configuration]: + """Returns a list of default config definitions for this AllocationPolicy.""" + raise NotImplementedError + + def get_additional_config_definitions(self) -> list[Configuration]: + """Returns a list of additional config definitions for this AllocationPolicy.""" + raise NotImplementedError + + @property + def runtime_config_prefix(self) -> str: + raise NotImplementedError + + def get_configurations(self) -> dict[str, Configuration]: + """Returns a dictionary of config definitions on this AllocationPolicy.""" + return { + config.name: config + for config in self.get_default_config_definitions() + + self.get_additional_config_definitions() + } + + def get_config_value( + self, config_key: str, params: dict[str, Any] = {}, validate: bool = True + ) -> Any: + pass + + def set_config_value( + self, + config_key: str, + value: Any, + params: dict[str, Any] = {}, + user: str | None = None, + ) -> None: + pass + + def _validate_config_params( + self, config_key: str, params: dict[str, Any], value: Any = None + ) -> Configuration: + definitions = self.get_configurations() + + class_name = self.__class__.__name__ + + # config doesn't exist + if config_key not in definitions: + raise InvalidConfig( + f"'{config_key}' is not a valid config for {class_name}!" + ) + + config = definitions[config_key] + + # missing required parameters + if ( + diff := { + key: config.param_types[key].__name__ + for key in config.param_types + if key not in params + } + ) != dict(): + raise InvalidConfig( + f"'{config_key}' missing required parameters: {diff} for {class_name}!" + ) + + # not an optional config (no parameters) + if params and not config.param_types: + raise InvalidConfig(f"'{config_key}' takes no params for {class_name}!") + + # parameters aren't correct types + if params: + for param_name in params: + if not isinstance(params[param_name], config.param_types[param_name]): + try: + # try casting to the right type, eg try int("10") + expected_type = config.param_types[param_name] + params[param_name] = expected_type(params[param_name]) + except Exception: + raise InvalidConfig( + f"'{config_key}' parameter '{param_name}' needs to be of type" + f" {config.param_types[param_name].__name__} (not {type(params[param_name]).__name__})" + f" for {class_name}!" + ) + + # value isn't correct type + if value is not None: + if not isinstance(value, config.value_type): + try: + # try casting to the right type + config.value_type(value) + except Exception: + raise InvalidConfig( + f"'{config_key}' value needs to be of type" + f" {config.value_type.__name__} (not {type(value).__name__})" + f" for {class_name}!" + ) + + return config + + def _escape_delimiter_chars(self, key: str) -> str: + if not isinstance(key, str): + return key + for ( + delimiter_char, + escape_sequence, + ) in self._KEY_DELIMITERS_TO_ESCAPE_SEQUENCES.items(): + if escape_sequence in str(key): + raise InvalidConfig( + f"{escape_sequence} is not a valid string for a config" + ) + key = key.replace(delimiter_char, escape_sequence) + return key + + def _build_runtime_config_key(self, config: str, params: dict[str, Any]) -> str: + """ + Builds a unique key to be used in the actual datastore containing these configs. + + Example return values: + - `"mystorage.MyAllocationPolicy.my_config"` # no params + - `"mystorage.MyAllocationPolicy.my_config.a:1,b:2"` # sorted params + """ + parameters = "." + for param in sorted(list(params.keys())): + param_sanitized = self._escape_delimiter_chars(param) + value_sanitized = self._escape_delimiter_chars(params[param]) + parameters += f"{param_sanitized}:{value_sanitized}," + parameters = parameters[:-1] + return f"{self.runtime_config_prefix}.{config}{parameters}" + + def delete_config_value( + self, + config_key: str, + hash: str, + params: dict[str, Any] = {}, + user: str | None = None, + ) -> None: + """ + Deletes an instance of an optional config on this AllocationPolicy. + If this function is run on a required config, it resets the value to default instead. + """ + self._validate_config_params(config_key, params) + delete_runtime_config( + key=self._build_runtime_config_key(config_key, params), + user=user, + config_key=config_key, + ) diff --git a/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py b/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py index 91b0ad0cc7a..20c8f8dc201 100644 --- a/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py +++ b/snuba/web/rpc/storage_routing/routing_strategies/storage_routing.py @@ -14,7 +14,17 @@ from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import TraceItemTableRequest from snuba import environment, settings, state +from snuba.datasets.storages.storage_key import StorageKey from snuba.downsampled_storage_tiers import Tier +from snuba.query.allocation_policies import AllocationPolicy +from snuba.query.allocation_policies.bytes_scanned_rejecting_policy import ( + BytesScannedRejectingPolicy, +) +from snuba.query.allocation_policies.concurrent_rate_limit import ( + ConcurrentRateLimitAllocationPolicy, +) +from snuba.query.allocation_policies.per_referrer import ReferrerGuardRailPolicy +from snuba.query.configuration import ConfigurableComponent, Configuration from snuba.query.query_settings import HTTPQuerySettings from snuba.state import record_query from snuba.utils.metrics.timer import Timer @@ -23,6 +33,7 @@ from snuba.web import QueryResult from snuba.web.rpc.storage_routing.common import extract_message_meta +CBRS_HASH = "cbrs" _SAMPLING_IN_STORAGE_PREFIX = "sampling_in_storage_" _START_ESTIMATION_MARK = "start_sampling_in_storage_estimation" _END_ESTIMATION_MARK = "end_sampling_in_storage_estimation" @@ -156,7 +167,22 @@ def _construct_hacky_querylog_payload( } -class BaseRoutingStrategy(metaclass=RegisteredClass): +@dataclass() +class RoutingStrategyConfig(Configuration): + pass + + +class BaseRoutingStrategy(ConfigurableComponent, metaclass=RegisteredClass): + def __init__(self) -> None: + self._configurations = [ + RoutingStrategyConfig( + name="max_load", + description="The maximum load we allow the Clickhouse cluster to reach", + value_type=int, + default=100, + ), + ] + @classmethod def config_key(cls) -> str: return cls.__name__ @@ -169,6 +195,12 @@ def metrics(self) -> MetricsWrapper: tags={"routing_strategy_name": self.__class__.__name__}, ) + def component_namespace(self) -> str: + return "routing_strategy" + + def get_configurations(self) -> dict[str, Configuration]: + return {config.name: config for config in self._configurations} + @classmethod def get_from_name(cls, name: str) -> Type["BaseRoutingStrategy"]: return cast("Type[BaseRoutingStrategy]", cls.class_from_name(name)) @@ -182,6 +214,28 @@ def _is_highest_accuracy_mode(self, routing_context: RoutingContext) -> bool: == DownsampledStorageConfig.MODE_HIGHEST_ACCURACY ) + def get_delete_allocation_policies(self) -> list[AllocationPolicy]: + return [] + + def get_allocation_policies(cls) -> list[AllocationPolicy]: + return [ + ConcurrentRateLimitAllocationPolicy( + storage_key=StorageKey("eap_items"), + required_tenant_types=["organization_id", "referrer", "project_id"], + default_config_overrides={"is_enforced": 0}, + ), + ReferrerGuardRailPolicy( + storage_key=StorageKey("eap_items"), + required_tenant_types=["organization_id", "referrer", "project_id"], + default_config_overrides={"is_enforced": 0}, + ), + BytesScannedRejectingPolicy( + storage_key=StorageKey("eap_items"), + required_tenant_types=["organization_id", "referrer", "project_id"], + default_config_overrides={"is_enforced": 0}, + ), + ] + def merge_clickhouse_settings( self, routing_decision: RoutingDecision, @@ -366,6 +420,10 @@ def _emit_routing_mistake(self, routing_decision: RoutingDecision) -> None: }, ) + @property + def runtime_config_prefix(self) -> str: + return self.component_name() + import_submodules_in_directory( os.path.dirname(os.path.realpath(__file__)), diff --git a/tests/admin/test_api.py b/tests/admin/test_api.py index 871f173f751..4e6c6d1a2ec 100644 --- a/tests/admin/test_api.py +++ b/tests/admin/test_api.py @@ -611,7 +611,7 @@ def mock_get_policies() -> list[AllocationPolicy]: "snuba.datasets.storage.ReadableTableStorage.get_allocation_policies", side_effect=mock_get_policies, ): - response = admin_api.get("/allocation_policy_configs/errors") + response = admin_api.get("/allocation_policy_configs/storage/errors") assert response.status_code == 200 assert response.json is not None and len(response.json) == 1 @@ -664,7 +664,7 @@ def mock_record(user: Any, action: Any, data: Any, notify: Any) -> None: assert response.status_code == 200, response.json # make sure an auditlog entry was recorded assert auditlog_records.pop() - response = admin_api.get("/allocation_policy_configs/errors") + response = admin_api.get("/allocation_policy_configs/storage/errors") assert response.status_code == 200 # three policies @@ -706,7 +706,7 @@ def mock_record(user: Any, action: Any, data: Any, notify: Any) -> None: == 200 ) - response = admin_api.get("/allocation_policy_configs/errors") + response = admin_api.get("/allocation_policy_configs/storage/errors") assert response.status_code == 200 assert response.json is not None and len(response.json) == 5 assert {