From cbd1e9d73d6ea449c76ac71ea19b33eabb92bfd2 Mon Sep 17 00:00:00 2001 From: Lyn Date: Fri, 29 Sep 2023 11:12:11 -0700 Subject: [PATCH 01/13] feat: Remove sessions datasets, storages, APIs and message processors This has been deprecated for 2 years, it's got to go --- snuba/cli/devserver.py | 12 - .../configuration/sessions/dataset.yaml | 6 - .../configuration/sessions/entities/org.yaml | 42 -- .../sessions/entities/sessions.yaml | 271 ----------- .../sessions/storages/hourly.yaml | 180 -------- .../configuration/sessions/storages/org.yaml | 177 -------- .../configuration/sessions/storages/raw.yaml | 42 -- .../datasets/processors/sessions_processor.py | 80 ---- .../storage_selectors/test_sessions.py | 96 ---- tests/test_sessions_api.py | 419 ------------------ 10 files changed, 1325 deletions(-) delete mode 100644 snuba/datasets/configuration/sessions/dataset.yaml delete mode 100644 snuba/datasets/configuration/sessions/entities/org.yaml delete mode 100644 snuba/datasets/configuration/sessions/entities/sessions.yaml delete mode 100644 snuba/datasets/configuration/sessions/storages/hourly.yaml delete mode 100644 snuba/datasets/configuration/sessions/storages/org.yaml delete mode 100644 snuba/datasets/configuration/sessions/storages/raw.yaml delete mode 100644 snuba/datasets/processors/sessions_processor.py delete mode 100644 tests/datasets/entities/storage_selectors/test_sessions.py delete mode 100644 tests/test_sessions_api.py diff --git a/snuba/cli/devserver.py b/snuba/cli/devserver.py index 4108c7ce056..b6cc5f2c04d 100644 --- a/snuba/cli/devserver.py +++ b/snuba/cli/devserver.py @@ -53,18 +53,6 @@ def devserver(*, bootstrap: bool, workers: bool) -> None: *COMMON_CONSUMER_DEV_OPTIONS, ], ), - ( - "sessions-consumer", - [ - "snuba", - "consumer", - "--auto-offset-reset=latest", - "--no-strict-offset-reset", - "--log-level=debug", - "--storage=sessions_raw", - "--consumer-group=sessions_group", - ], - ), ( "outcomes-consumer", [ diff --git a/snuba/datasets/configuration/sessions/dataset.yaml b/snuba/datasets/configuration/sessions/dataset.yaml deleted file mode 100644 index d0e0b2688bf..00000000000 --- a/snuba/datasets/configuration/sessions/dataset.yaml +++ /dev/null @@ -1,6 +0,0 @@ -version: v1 -kind: dataset -name: sessions - -entities: - - sessions diff --git a/snuba/datasets/configuration/sessions/entities/org.yaml b/snuba/datasets/configuration/sessions/entities/org.yaml deleted file mode 100644 index 587eece0306..00000000000 --- a/snuba/datasets/configuration/sessions/entities/org.yaml +++ /dev/null @@ -1,42 +0,0 @@ -version: v1 -kind: entity -name: org_sessions -schema: - [ - { name: org_id, type: UInt, args: { size: 64 } }, - { name: project_id, type: UInt, args: { size: 64 } }, - { name: started, type: DateTime }, - { name: bucketed_started, type: DateTime }, - ] -required_time_column: started - -storages: - - storage: org_sessions - is_writable: false - -storage_selector: - selector: DefaultQueryStorageSelector - -query_processors: - - processor: BasicFunctionsProcessor - - processor: TimeSeriesProcessor - args: - time_group_columns: - bucketed_started: started - time_parse_columns: - - started - - received - - processor: ReferrerRateLimiterProcessor - - processor: OrganizationRateLimiterProcessor - args: - org_column: org_id - - processor: ProjectReferrerRateLimiter - args: - project_column: project_id - - processor: ProjectRateLimiterProcessor - args: - project_column: project_id - - processor: ResourceQuotaProcessor - args: - project_field: project_id -validators: [] diff 
--git a/snuba/datasets/configuration/sessions/entities/sessions.yaml b/snuba/datasets/configuration/sessions/entities/sessions.yaml deleted file mode 100644 index 86176c87106..00000000000 --- a/snuba/datasets/configuration/sessions/entities/sessions.yaml +++ /dev/null @@ -1,271 +0,0 @@ -version: v1 -kind: entity -name: sessions -schema: - [ - { name: org_id, type: UInt, args: { size: 64 } }, - { name: project_id, type: UInt, args: { size: 64 } }, - { name: started, type: DateTime }, - { name: release, type: String }, - { name: environment, type: String }, - { name: user_agent, type: String }, - { name: os, type: String }, - { - name: duration_quantiles, - type: AggregateFunction, - args: - { - func: "quantilesIf(0.5, 0.9)", - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: duration_avg, - type: AggregateFunction, - args: - { - func: avgIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions, - type: AggregateFunction, - args: - { - func: countIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions_crashed, - type: AggregateFunction, - args: - { - func: countIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_crashed_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions_abnormal, - type: AggregateFunction, - args: - { - func: countIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_abnormal_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions_errored, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_errored_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: users, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: users_crashed, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: users_abnormal, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: users_errored, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { name: bucketed_started, type: DateTime }, - ] -required_time_column: started -storages: -- storage: sessions_hourly - is_writable: false - translation_mappers: - columns: - - mapper: DurationQuantilesHourlyMapper - - mapper: FunctionColumn - args: - col_name: duration_avg - function_name: avgIfMerge - - mapper: PlusFunctionColumns - args: - col_name: sessions - op1_col: sessions - op1_func: countIfMerge - op2_col: sessions_preaggr - op2_func: sumIfMerge - - 
mapper: PlusFunctionColumns - args: - col_name: sessions_crashed - op1_col: sessions_crashed - op1_func: countIfMerge - op2_col: sessions_crashed_preaggr - op2_func: sumIfMerge - - mapper: PlusFunctionColumns - args: - col_name: sessions_abnormal - op1_col: sessions_abnormal - op1_func: countIfMerge - op2_col: sessions_abnormal_preaggr - op2_func: sumIfMerge - - mapper: PlusFunctionColumns - args: - col_name: sessions_errored - op1_col: sessions_errored - op1_func: uniqIfMerge - op2_col: sessions_errored_preaggr - op2_func: sumIfMerge - - mapper: FunctionColumn - args: - col_name: users - function_name: uniqIfMerge - - mapper: FunctionColumn - args: - col_name: users_crashed - function_name: uniqIfMerge - - mapper: FunctionColumn - args: - col_name: users_abnormal - function_name: uniqIfMerge - - mapper: FunctionColumn - args: - col_name: users_errored - function_name: uniqIfMerge -- storage: sessions_raw - is_writable: true - translation_mappers: - columns: - - mapper: DurationQuantilesRawMapper - - mapper: DurationAvgRawMapper - - mapper: SessionsRawNumSessionsMapper - - mapper: SessionsRawCrashedMapper - - mapper: SessionsRawSessionsAbnormalMapper - - mapper: SessionsRawErroredMapper - - mapper: SessionsRawUsersMapper - - mapper: SessionsRawUsersCrashedMapper - - mapper: SessionsRawUsersAbnormalMapper - - mapper: SessionsRawUsersErroredMapper -storage_selector: - selector: SessionsQueryStorageSelector -query_processors: -- processor: BasicFunctionsProcessor -- processor: TimeSeriesProcessor - args: - time_group_columns: - bucketed_started: started - time_parse_columns: - - started - - received -- processor: OrganizationRateLimiterProcessor - args: - org_column: org_id -- processor: ProjectRateLimiterProcessor - args: - project_column: project_id -validators: -- validator: EntityRequiredColumnValidator - args: - required_filter_columns: - - org_id - - project_id -subscription_processors: - - processor: AddColumnCondition - args: - extra_condition_data_key: organization - extra_condition_column: org_id - -subscription_validators: - - validator: AggregationValidator - args: - max_allowed_aggregations: 2 - disallowed_aggregations: - - groupby - - having - - orderby - required_time_column: started diff --git a/snuba/datasets/configuration/sessions/storages/hourly.yaml b/snuba/datasets/configuration/sessions/storages/hourly.yaml deleted file mode 100644 index 5eb94f7660c..00000000000 --- a/snuba/datasets/configuration/sessions/storages/hourly.yaml +++ /dev/null @@ -1,180 +0,0 @@ -version: v1 -kind: readable_storage -name: sessions_hourly -storage: - key: sessions_hourly - set_key: sessions -readiness_state: deprecate -schema: - columns: - [ - { name: org_id, type: UInt, args: { size: 64 } }, - { name: project_id, type: UInt, args: { size: 64 } }, - { name: started, type: DateTime }, - { name: release, type: String }, - { name: environment, type: String }, - { name: user_agent, type: String }, - { name: os, type: String }, - { - name: duration_quantiles, - type: AggregateFunction, - args: - { - func: "quantilesIf(0.5, 0.9)", - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: duration_avg, - type: AggregateFunction, - args: - { - func: avgIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions, - type: AggregateFunction, - args: - { - func: countIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_preaggr, 
- type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions_crashed, - type: AggregateFunction, - args: - { - func: countIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_crashed_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions_abnormal, - type: AggregateFunction, - args: - { - func: countIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_abnormal_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions_errored, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_errored_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: users, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: users_crashed, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: users_abnormal, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: users_errored, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - ] - local_table_name: sessions_hourly_local - dist_table_name: sessions_hourly_dist -query_processors: - - processor: PrewhereProcessor - args: - prewhere_candidates: - - project_id - - org_id - - processor: TableRateLimit -mandatory_condition_checkers: - - condition: OrgIdEnforcer - - condition: ProjectIdEnforcer diff --git a/snuba/datasets/configuration/sessions/storages/org.yaml b/snuba/datasets/configuration/sessions/storages/org.yaml deleted file mode 100644 index b54e5a049d9..00000000000 --- a/snuba/datasets/configuration/sessions/storages/org.yaml +++ /dev/null @@ -1,177 +0,0 @@ -version: v1 -kind: readable_storage -name: org_sessions -storage: - key: org_sessions - set_key: sessions -readiness_state: deprecate -schema: - columns: - [ - { name: org_id, type: UInt, args: { size: 64 } }, - { name: project_id, type: UInt, args: { size: 64 } }, - { name: started, type: DateTime }, - { name: release, type: String }, - { name: environment, type: String }, - { name: user_agent, type: String }, - { name: os, type: String }, - { - name: duration_quantiles, - type: AggregateFunction, - args: - { - func: "quantilesIf(0.5, 0.9)", - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: duration_avg, - type: AggregateFunction, - args: - { - func: avgIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions, - type: AggregateFunction, - args: - { - func: countIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - 
arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions_crashed, - type: AggregateFunction, - args: - { - func: countIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_crashed_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions_abnormal, - type: AggregateFunction, - args: - { - func: countIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_abnormal_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: sessions_errored, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: sessions_errored_preaggr, - type: AggregateFunction, - args: - { - func: sumIf, - arg_types: - [ - { type: UInt, args: { size: 32 } }, - { type: UInt, args: { size: 8 } }, - ], - }, - }, - { - name: users, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: users_crashed, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: users_abnormal, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - { - name: users_errored, - type: AggregateFunction, - args: - { - func: uniqIf, - arg_types: [{ type: UUID }, { type: UInt, args: { size: 8 } }], - }, - }, - ] - local_table_name: sessions_hourly_local - dist_table_name: sessions_hourly_dist -query_processors: - - processor: PrewhereProcessor - args: - prewhere_candidates: - - project_id - - org_id - - processor: TableRateLimit diff --git a/snuba/datasets/configuration/sessions/storages/raw.yaml b/snuba/datasets/configuration/sessions/storages/raw.yaml deleted file mode 100644 index 0c8688f32d0..00000000000 --- a/snuba/datasets/configuration/sessions/storages/raw.yaml +++ /dev/null @@ -1,42 +0,0 @@ -version: v1 -kind: writable_storage -name: sessions_raw -storage: - key: sessions_raw - set_key: sessions -readiness_state: deprecate -schema: - columns: - [ - { name: session_id, type: UUID }, - { name: distinct_id, type: UUID }, - { name: quantity, type: UInt, args: { size: 32 } }, - { name: seq, type: UInt, args: { size: 64 } }, - { name: org_id, type: UInt, args: { size: 64 } }, - { name: project_id, type: UInt, args: { size: 64 } }, - { name: retention_days, type: UInt, args: { size: 16 } }, - { name: duration, type: UInt, args: { size: 32 } }, - { name: status, type: UInt, args: { size: 8 } }, - { name: errors, type: UInt, args: { size: 16 } }, - { name: received, type: DateTime }, - { name: started, type: DateTime }, - { name: release, type: String }, - { name: environment, type: String }, - { name: user_agent, type: String }, - { name: os, type: String }, - ] - local_table_name: sessions_raw_local - dist_table_name: sessions_raw_dist -query_processors: - - processor: MinuteResolutionProcessor - - processor: TableRateLimit -mandatory_condition_checkers: - - condition: OrgIdEnforcer - - condition: ProjectIdEnforcer -stream_loader: - processor: SessionsProcessor - default_topic: ingest-sessions - commit_log_topic: 
snuba-sessions-commit-log - subscription_result_topic: sessions-subscription-results - subscription_scheduler_mode: global - subscription_scheduled_topic: scheduled-subscriptions-sessions diff --git a/snuba/datasets/processors/sessions_processor.py b/snuba/datasets/processors/sessions_processor.py deleted file mode 100644 index 0ea2dd7d8b2..00000000000 --- a/snuba/datasets/processors/sessions_processor.py +++ /dev/null @@ -1,80 +0,0 @@ -import uuid -from datetime import datetime -from typing import Any, Mapping, Optional - -from snuba import environment -from snuba.consumers.types import KafkaMessageMetadata -from snuba.datasets.events_format import enforce_retention -from snuba.datasets.processors import DatasetMessageProcessor -from snuba.processor import ( - MAX_UINT32, - NIL_UUID, - InsertBatch, - ProcessedMessage, - _collapse_uint16, - _collapse_uint32, - _ensure_valid_date, -) -from snuba.utils.metrics.wrapper import MetricsWrapper - -STATUS_MAPPING = { - "ok": 0, - "exited": 1, - "crashed": 2, - "abnormal": 3, - "errored": 4, -} - -metrics = MetricsWrapper(environment.metrics, "sessions.processor") - - -class SessionsProcessor(DatasetMessageProcessor): - def process_message( - self, message: Mapping[str, Any], metadata: KafkaMessageMetadata - ) -> Optional[ProcessedMessage]: - # some old relays accidentally emit rows without release - if message["release"] is None: - return None - if message["duration"] is None: - duration = None - else: - duration = _collapse_uint32(int(message["duration"] * 1000)) - - # since duration is not nullable, the max duration means no duration - if duration is None: - duration = MAX_UINT32 - - errors = _collapse_uint16(message["errors"]) or 0 - quantity = _collapse_uint32(message.get("quantity")) or 1 - - # If a session ends in crashed or abnormal we want to make sure that - # they count as errored too, so we can get the number of health and - # errored sessions correctly. 
- if message["status"] in ("crashed", "abnormal"): - errors = max(errors, 1) - - received = _ensure_valid_date(datetime.utcfromtimestamp(message["received"])) - started = _ensure_valid_date(datetime.utcfromtimestamp(message["started"])) - - if started is None: - metrics.increment("empty_started_date") - if received is None: - metrics.increment("empty_received_date") - - processed = { - "session_id": str(uuid.UUID(message["session_id"])), - "distinct_id": str(uuid.UUID(message.get("distinct_id") or NIL_UUID)), - "quantity": quantity, - "seq": message["seq"], - "org_id": message["org_id"], - "project_id": message["project_id"], - "retention_days": enforce_retention(message["retention_days"], received), - "duration": duration, - "status": STATUS_MAPPING[message["status"]], - "errors": errors, - "received": received if received is not None else datetime.now(), - "started": started if started is not None else datetime.now(), - "release": message["release"], - "environment": message.get("environment") or "", - } - return InsertBatch([processed], None) diff --git a/tests/datasets/entities/storage_selectors/test_sessions.py b/tests/datasets/entities/storage_selectors/test_sessions.py deleted file mode 100644 index 200bee0cc84..00000000000 --- a/tests/datasets/entities/storage_selectors/test_sessions.py +++ /dev/null @@ -1,96 +0,0 @@ -from typing import List - -import pytest - -from snuba.datasets.dataset import Dataset -from snuba.datasets.entities.entity_key import EntityKey -from snuba.datasets.entities.factory import get_entity -from snuba.datasets.entities.storage_selectors import QueryStorageSelector -from snuba.datasets.entities.storage_selectors.sessions import ( - SessionsQueryStorageSelector, -) -from snuba.datasets.factory import get_dataset -from snuba.datasets.storage import ( - EntityStorageConnection, - EntityStorageConnectionNotFound, - Storage, -) -from snuba.datasets.storages.factory import get_storage -from snuba.datasets.storages.storage_key import StorageKey -from snuba.query.logical import Query -from snuba.query.query_settings import SubscriptionQuerySettings -from snuba.query.snql.parser import parse_snql_query - -TEST_CASES = [ - pytest.param( - """ - MATCH (sessions) - SELECT bucketed_started, users - BY bucketed_started - WHERE org_id = 1 - AND project_id IN tuple(1) - AND started >= toDateTime('2022-01-01 01:00:00') - AND started < toDateTime('2022-01-01 01:30:00') - LIMIT 5000 - GRANULARITY 60 - """, - get_dataset("sessions"), - get_entity(EntityKey.SESSIONS).get_all_storage_connections(), - SessionsQueryStorageSelector(), - get_storage(StorageKey.SESSIONS_RAW), - id="Sessions storage selector", - ), - pytest.param( - """ - MATCH (sessions) - SELECT bucketed_started, users - BY bucketed_started - WHERE org_id = 1 - AND project_id IN tuple(1) - AND started >= toDateTime('2022-01-01 01:00:00') - AND started < toDateTime('2022-01-01 03:00:00') - """, - get_dataset("sessions"), - get_entity(EntityKey.SESSIONS).get_all_storage_connections(), - SessionsQueryStorageSelector(), - get_storage(StorageKey.SESSIONS_HOURLY), - id="Sessions storage selector", - ), -] - - -@pytest.mark.parametrize( - "snql_query, dataset, storage_connections, selector, expected_storage", - TEST_CASES, -) -def test_query_storage_selector( - snql_query: str, - dataset: Dataset, - storage_connections: List[EntityStorageConnection], - selector: QueryStorageSelector, - expected_storage: Storage, -) -> None: - query, _ = parse_snql_query(str(snql_query), dataset) - assert isinstance(query, Query) - 
selected_storage = selector.select_storage( - query, SubscriptionQuerySettings(), storage_connections - ) - assert selected_storage.storage == expected_storage - - -def test_assert_raises() -> None: - query, _ = parse_snql_query( - """ - MATCH (events) - SELECT col1 - WHERE project_id IN tuple(2 , 3) - AND timestamp>=toDateTime('2021-01-01') - AND timestamp None: - session_1 = "b3ef3211-58a4-4b36-a9a1-5a55df0d9aae" - session_2 = "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf" - user_1 = "b3ef3211-58a4-4b36-a9a1-5a55df0d9aae" - user_2 = "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf" - - template = { - "duration": MAX_UINT32, - "environment": "production", - "org_id": 1, - "project_id": project_id, - "release": "sentry-test@1.0.0", - "retention_days": settings.DEFAULT_RETENTION_DAYS, - "seq": 0, - "errors": 0, - "received": datetime.now().isoformat(" ", "seconds"), - "started": self.started.replace(tzinfo=None).isoformat(" ", "seconds"), - } - - sessions: Sequence[Mapping[str, Any]] = [ - # individual "exited" session with two updates, a user and errors - {**template, "session_id": session_1, "distinct_id": user_1, "status": 0}, - { - **template, - "session_id": session_1, - "distinct_id": user_1, - "seq": 123, - "status": 2, - "errors": 123, - }, - # individual "exited" session with just one update, no user, no errors - {**template, "session_id": session_2, "status": 1}, - # pre-aggregated "errored" sessions, no user - {**template, "quantity": 9, "status": 4}, - # pre-aggregated "exited" sessions with user - {**template, "quantity": 5, "distinct_id": user_2, "status": 1}, - # pre-aggregated "exited" session - {**template, "quantity": 4, "status": 1}, - ] - - self.storage.get_table_writer().get_batch_writer( - metrics=DummyMetricsBackend(strict=True) - ).write([json.dumps(session).encode("utf-8") for session in sessions]) - - -@pytest.mark.redis_db -@pytest.mark.clickhouse_db -class TestLegacySessionsApi(BaseSessionsMockTest, BaseApiTest): - @pytest.fixture - def test_entity(self) -> Union[str, Tuple[str, str]]: - return "sessions" - - @pytest.fixture - def test_app(self) -> Any: - return self.app - - @pytest.fixture(autouse=True) - def setup_post(self, _build_snql_post_methods: Callable[[str], Any]) -> None: - self.post = _build_snql_post_methods - - @pytest.fixture(scope="class") - def get_project_id(self, request: object) -> Callable[[], int]: - id_iter = itertools.count() - next(id_iter) # skip 0 - return lambda: next(id_iter) - - def setup_method(self, test_method: Any) -> None: - super().setup_method(test_method) - - # values for test data - self.minutes = 180 - self.skew = timedelta(minutes=self.minutes) - self.started = datetime.utcnow().replace( - minute=0, second=0, microsecond=0, tzinfo=pytz.utc - ) - - self.storage = get_writable_storage(StorageKey.SESSIONS_RAW) - - def test_manual_session_aggregation( - self, get_project_id: Callable[[], int] - ) -> None: - project_id = get_project_id() - self.generate_manual_session_events(project_id) - response = self.post( - json.dumps( - { - "dataset": "sessions", - "organization": 1, - "project": project_id, - "selected_columns": [ - "sessions", - "sessions_errored", - "users", - "users_errored", - ], - "from_date": (self.started - self.skew).isoformat(), - "to_date": (self.started + self.skew).isoformat(), - } - ), - ) - data = json.loads(response.data) - assert response.status_code == 200, response.data - assert len(data["data"]) == 1, data - assert data["data"][0]["sessions"] == 20 - assert data["data"][0]["sessions_errored"] == 10 - assert 
data["data"][0]["users"] == 2 - assert data["data"][0]["users_errored"] == 1 - - def generate_session_events(self, project_id: int) -> None: - processor = self.storage.get_table_writer().get_stream_loader().get_processor() - meta = KafkaMessageMetadata( - offset=1, partition=2, timestamp=datetime(1970, 1, 1) - ) - template = { - "session_id": "00000000-0000-0000-0000-000000000000", - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "duration": None, - "environment": "production", - "org_id": 1, - "project_id": project_id, - "release": "sentry-test@1.0.0", - "retention_days": settings.DEFAULT_RETENTION_DAYS, - "seq": 0, - "errors": 0, - "received": datetime.utcnow().timestamp(), - "started": self.started.timestamp(), - } - events = [ - processor.process_message( - { - **template, - "status": "exited", - "duration": 1947.49, - "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", - "started": (self.started + timedelta(minutes=13)).timestamp(), - }, - meta, - ), - processor.process_message( - {**template, "status": "exited", "quantity": 5}, - meta, - ), - processor.process_message( - {**template, "status": "errored", "errors": 1, "quantity": 2}, - meta, - ), - processor.process_message( - { - **template, - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "status": "errored", - "errors": 1, - "quantity": 2, - "started": (self.started + timedelta(minutes=24)).timestamp(), - }, - meta, - ), - ] - filtered = [e for e in events if e] - write_processed_messages(self.storage, filtered) - - def test_session_aggregation(self, get_project_id: Callable[[], int]) -> None: - project_id = get_project_id() - self.generate_session_events(project_id) - response = self.post( - json.dumps( - { - "dataset": "sessions", - "organization": 1, - "project": project_id, - "selected_columns": [ - "sessions", - "sessions_errored", - "users", - "users_errored", - ], - "from_date": (self.started - self.skew).isoformat(), - "to_date": (self.started + self.skew).isoformat(), - } - ), - ) - data = json.loads(response.data) - assert response.status_code == 200, response.data - - assert len(data["data"]) == 1, data - assert data["data"][0]["sessions"] == 10 - assert data["data"][0]["sessions_errored"] == 4 - assert data["data"][0]["users"] == 1 - assert data["data"][0]["users_errored"] == 1 - - # the test sessions have timestamps of `:00`, `:13` and `:24`, so they will - # end up in 3 buckets no matter if we group by minute, 2 minutes or 10 minutes - @pytest.mark.parametrize("granularity", [60, 120, 600]) - def test_session_small_granularity( - self, get_project_id: Callable[[], int], granularity: int - ) -> None: - project_id = get_project_id() - self.generate_session_events(project_id) - response = self.post( - json.dumps( - { - "dataset": "sessions", - "organization": 1, - "project": project_id, - "selected_columns": [ - "bucketed_started", - "sessions", - "sessions_errored", - "users", - "users_errored", - ], - "groupby": ["bucketed_started"], - "orderby": ["bucketed_started"], - "granularity": granularity, - "from_date": (self.started - self.skew).isoformat(), - "to_date": (self.started + self.skew).isoformat(), - } - ), - ) - data = json.loads(response.data) - assert response.status_code == 200, response.data - - assert len(data["data"]) == 3, data - assert data["data"][0]["sessions"] == 7 - assert data["data"][0]["sessions_errored"] == 2 - assert data["data"][0]["users"] == 1 - assert data["data"][0]["users_errored"] == 1 - assert data["data"][1]["sessions"] == 1 - assert 
data["data"][1]["sessions_errored"] == 0 - assert data["data"][1]["users"] == 1 - assert data["data"][1]["users_errored"] == 0 - assert data["data"][2]["sessions"] == 2 - assert data["data"][2]["sessions_errored"] == 2 - assert data["data"][2]["users"] == 1 - assert data["data"][2]["users_errored"] == 1 - - def test_minute_granularity_range(self, get_project_id: Callable[[], int]) -> None: - project_id = get_project_id() - self.generate_session_events(project_id) - response = self.post( - json.dumps( - { - "dataset": "sessions", - "organization": 1, - "project": project_id, - "selected_columns": ["bucketed_started", "sessions"], - "groupby": ["bucketed_started"], - "granularity": 60, - # `skew` is 3h - "from_date": (self.started - self.skew - self.skew).isoformat(), - "to_date": (self.started + self.skew + self.skew).isoformat(), - } - ), - ) - data = json.loads(response.data) - assert response.status_code == 400, response.data - - assert data["error"] == { - "type": "invalid_query", - "message": "Minute-resolution queries are restricted to a 7-hour time window.", - } - - -@pytest.mark.clickhouse_db -@pytest.mark.redis_db -class TestSessionsApi(BaseSessionsMockTest, BaseApiTest): - def post(self, url: str, data: str) -> Any: - return self.app.post(url, data=data, headers={"referer": "test"}) - - def setup_method(self, test_method: Callable[..., Any]) -> None: - super().setup_method(test_method) - self.skew = timedelta(minutes=180) - self.base_time = datetime.utcnow().replace( - minute=0, second=0, microsecond=0 - ) - timedelta(minutes=180) - self.next_time = datetime.utcnow().replace( - minute=0, second=0, microsecond=0 - ) + timedelta(minutes=180) - self.storage = get_writable_storage(StorageKey.SESSIONS_RAW) - - @pytest.fixture(scope="class") - def get_project_id(self, request: object) -> Callable[[], int]: - id_iter = itertools.count() - next(id_iter) # skip 0 - return lambda: next(id_iter) - - def generate_session_events(self, project_id: int) -> None: - processor = self.storage.get_table_writer().get_stream_loader().get_processor() - meta = KafkaMessageMetadata( - offset=1, partition=2, timestamp=datetime(1970, 1, 1) - ) - template = { - "session_id": "00000000-0000-0000-0000-000000000000", - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "duration": None, - "environment": "production", - "org_id": 1, - "project_id": project_id, - "release": "sentry-test@1.0.0", - "retention_days": settings.DEFAULT_RETENTION_DAYS, - "seq": 0, - "errors": 0, - "received": datetime.utcnow().timestamp(), - "started": self.started.timestamp(), - } - events = [ - processor.process_message( - { - **template, - "status": "exited", - "duration": 1947.49, - "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", - "started": (self.started + timedelta(minutes=13)).timestamp(), - }, - meta, - ), - processor.process_message( - {**template, "status": "exited", "quantity": 5}, - meta, - ), - processor.process_message( - {**template, "status": "errored", "errors": 1, "quantity": 2}, - meta, - ), - processor.process_message( - { - **template, - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "status": "errored", - "errors": 1, - "quantity": 2, - "started": (self.started + timedelta(minutes=24)).timestamp(), - }, - meta, - ), - ] - filtered = [e for e in events if e] - write_processed_messages(self.storage, filtered) - - def test_sessions_bucketed_time_columns( - self, get_project_id: Callable[[], int] - ) -> None: - project_id = get_project_id() - self.generate_session_events(project_id) - response = 
self.post(
-            "/sessions/snql",
-            data=json.dumps(
-                {
-                    "query": f"""MATCH (sessions)
-                    SELECT bucketed_started, users_errored, users_crashed, users, users_abnormal
-                    BY bucketed_started
-                    WHERE org_id = 1
-                    AND started >= toDateTime('{self.base_time.isoformat()}')
-                    AND started < toDateTime('{self.next_time.isoformat()}')
-                    AND project_id IN tuple({project_id})
-                    AND project_id IN tuple({project_id})
-                    LIMIT 5000
-                    GRANULARITY 86400
-                    """
-                }
-            ),
-        )
-
-        assert response.status_code == 200
-        result = json.loads(response.data)
-        assert len(result["data"]) > 0
-        assert "bucketed_started" in result["data"][0]
-
-    def test_sessions_bucketed_time_columns_no_granularity(
-        self, get_project_id: Callable[[], int]
-    ) -> None:
-        project_id = get_project_id()
-        self.generate_session_events(project_id)
-        response = self.post(
-            "/sessions/snql",
-            data=json.dumps(
-                {
-                    "query": f"""MATCH (sessions)
-                    SELECT bucketed_started, users_errored, users_crashed, users, users_abnormal
-                    BY bucketed_started
-                    WHERE org_id = 1
-                    AND started >= toDateTime('{self.base_time.isoformat()}')
-                    AND started < toDateTime('{self.next_time.isoformat()}')
-                    AND project_id IN tuple({project_id})
-                    AND project_id IN tuple({project_id})
-                    LIMIT 5000
-                    """
-                }
-            ),
-        )
-
-        assert response.status_code == 200
-        result = json.loads(response.data)
-        assert len(result["data"]) > 0
-        assert "bucketed_started" in result["data"][0]

From 1930708451a5468302441cee325b736fbd04a19c Mon Sep 17 00:00:00 2001
From: Lyn
Date: Fri, 29 Sep 2023 11:33:19 -0700
Subject: [PATCH 02/13] test: Remove sessions processor and API tests

---
 tests/datasets/test_entity_factory.py      |   1 -
 tests/datasets/test_session_processor.py   | 202 ---------------
 tests/datasets/test_sessions_processing.py | 245 ------------------
 tests/query/snql/test_invalid_queries.py   |   2 +-
 tests/query/snql/test_query.py             |   4 +-
 .../test_entity_subscriptions.py           |  46 ----
 tests/test_org_sessions_api.py             | 181 -------------
 7 files changed, 3 insertions(+), 678 deletions(-)
 delete mode 100644 tests/datasets/test_session_processor.py
 delete mode 100644 tests/datasets/test_sessions_processing.py
 delete mode 100644 tests/test_org_sessions_api.py

diff --git a/tests/datasets/test_entity_factory.py b/tests/datasets/test_entity_factory.py
index e94f8988a81..ce3ca6dbf3e 100644
--- a/tests/datasets/test_entity_factory.py
+++ b/tests/datasets/test_entity_factory.py
@@ -18,7 +18,6 @@
     EntityKey.GROUPEDMESSAGE,
     EntityKey.OUTCOMES,
     EntityKey.OUTCOMES_RAW,
-    EntityKey.SESSIONS,
     EntityKey.SEARCH_ISSUES,
     EntityKey.ORG_SESSIONS,
     EntityKey.TRANSACTIONS,
diff --git a/tests/datasets/test_session_processor.py b/tests/datasets/test_session_processor.py
deleted file mode 100644
index 903f2384921..00000000000
--- a/tests/datasets/test_session_processor.py
+++ /dev/null
@@ -1,202 +0,0 @@
-from datetime import datetime, timedelta, timezone
-
-from snuba.consumers.types import KafkaMessageMetadata
-from snuba.datasets.processors.sessions_processor import SessionsProcessor
-from snuba.processor import InsertBatch
-
-
-class TestSessionProcessor:
-    def test_ingest_session_event_max_sample_rate(self) -> None:
-        timestamp = datetime.now(timezone.utc)
-        started = timestamp - timedelta(hours=1)
-
-        payload = {
-            "device_family": "iPhone12,3",
-            "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf",
-            "duration": 1947.49,
-            "environment": "production",
-            "org_id": 1,
-            "os": "iOS",
-            "os_version": "13.3.1",
-            "project_id": 42,
-            "release": "sentry-test@1.0.0",
-            "retention_days": 90,
-            "seq": 42,
-            "errors": 0,
-            "session_id": 
"8333339f-5675-4f89-a9a0-1c935255ab58", - "started": started.timestamp(), - "status": "exited", - "received": timestamp.timestamp(), - } - - meta = KafkaMessageMetadata( - offset=1, partition=2, timestamp=datetime(1970, 1, 1) - ) - assert SessionsProcessor().process_message(payload, meta) == InsertBatch( - [ - { - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "quantity": 1, - "duration": 1947490, - "environment": "production", - "org_id": 1, - "project_id": 42, - "release": "sentry-test@1.0.0", - "retention_days": 90, - "seq": 42, - "errors": 0, - "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", - "started": started.replace(tzinfo=None), - "status": 1, - "received": timestamp.replace(tzinfo=None), - } - ], - None, - ) - - def test_ingest_session_event_abnormal(self) -> None: - timestamp = datetime.now(timezone.utc) - started = timestamp - timedelta(hours=1) - - payload = { - "device_family": "iPhone12,3", - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "duration": 1947.49, - "environment": "production", - "org_id": 1, - "os": "iOS", - "os_version": "13.3.1", - "project_id": 42, - "release": "sentry-test@1.0.0", - "retention_days": 90, - "seq": 42, - "errors": 0, - "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", - "started": started.timestamp(), - "status": "abnormal", - "received": timestamp.timestamp(), - } - - meta = KafkaMessageMetadata( - offset=1, partition=2, timestamp=datetime(1970, 1, 1) - ) - assert SessionsProcessor().process_message(payload, meta) == InsertBatch( - [ - { - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "quantity": 1, - "duration": 1947490, - "environment": "production", - "org_id": 1, - "project_id": 42, - "release": "sentry-test@1.0.0", - "retention_days": 90, - "seq": 42, - # abnormal counts as at least one error - "errors": 1, - "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", - "started": started.replace(tzinfo=None), - "status": 3, - "received": timestamp.replace(tzinfo=None), - } - ], - None, - ) - - def test_ingest_session_event_crashed(self) -> None: - timestamp = datetime.now(timezone.utc) - started = timestamp - timedelta(hours=1) - - payload = { - "device_family": "iPhone12,3", - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "duration": 1947.49, - "environment": "production", - "org_id": 1, - "os": "iOS", - "os_version": "13.3.1", - "project_id": 42, - "release": "sentry-test@1.0.0", - "retention_days": 90, - "seq": 42, - "errors": 0, - "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", - "started": started.timestamp(), - "status": "crashed", - "received": timestamp.timestamp(), - } - - meta = KafkaMessageMetadata( - offset=1, partition=2, timestamp=datetime(1970, 1, 1) - ) - assert SessionsProcessor().process_message(payload, meta) == InsertBatch( - [ - { - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "quantity": 1, - "duration": 1947490, - "environment": "production", - "org_id": 1, - "project_id": 42, - "release": "sentry-test@1.0.0", - "retention_days": 90, - "seq": 42, - # abnormal counts as at least one error - "errors": 1, - "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", - "started": started.replace(tzinfo=None), - "status": 2, - "received": timestamp.replace(tzinfo=None), - } - ], - None, - ) - - def test_retention_days(self) -> None: - timestamp = datetime.now(timezone.utc) - started = timestamp - timedelta(hours=1) - - payload = { - "device_family": "iPhone12,3", - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "duration": 1947.49, - "environment": 
"production", - "org_id": 1, - "os": "iOS", - "os_version": "13.3.1", - "project_id": 42, - "release": "sentry-test@1.0.0", - "retention_days": 120, - "seq": 42, - "errors": 0, - "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", - "started": started.timestamp(), - "status": "crashed", - "received": timestamp.timestamp(), - } - - meta = KafkaMessageMetadata( - offset=1, partition=2, timestamp=datetime(1970, 1, 1) - ) - assert SessionsProcessor().process_message(payload, meta) == InsertBatch( - [ - { - "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", - "quantity": 1, - "duration": 1947490, - "environment": "production", - "org_id": 1, - "project_id": 42, - "release": "sentry-test@1.0.0", - "retention_days": 90, - "seq": 42, - # abnormal counts as at least one error - "errors": 1, - "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", - "started": started.replace(tzinfo=None), - "status": 2, - "received": timestamp.replace(tzinfo=None), - } - ], - None, - ) diff --git a/tests/datasets/test_sessions_processing.py b/tests/datasets/test_sessions_processing.py deleted file mode 100644 index afa3ed68e64..00000000000 --- a/tests/datasets/test_sessions_processing.py +++ /dev/null @@ -1,245 +0,0 @@ -from typing import Any, MutableMapping - -import pytest -from snuba_sdk.legacy import json_to_snql - -from snuba.attribution import get_app_id -from snuba.attribution.attribution_info import AttributionInfo -from snuba.clickhouse.query import Query -from snuba.datasets.entities.entity_key import EntityKey -from snuba.datasets.entities.factory import get_entity -from snuba.datasets.factory import get_dataset -from snuba.datasets.schemas.tables import TableSchema -from snuba.datasets.storages.factory import get_storage -from snuba.datasets.storages.storage_key import StorageKey -from snuba.query import SelectedExpression -from snuba.query.expressions import Column, CurriedFunctionCall, FunctionCall, Literal -from snuba.query.query_settings import ( - HTTPQuerySettings, - QuerySettings, - SubscriptionQuerySettings, -) -from snuba.query.snql.parser import parse_snql_query -from snuba.reader import Reader -from snuba.request import Request -from snuba.web import QueryResult - -sessions_read_schema = get_storage(StorageKey.SESSIONS_HOURLY).get_schema() -sessions_raw_schema = get_storage(StorageKey.SESSIONS_RAW).get_schema() -assert isinstance(sessions_read_schema, TableSchema) -assert isinstance(sessions_raw_schema, TableSchema) - - -@pytest.mark.clickhouse_db -def test_sessions_processing() -> None: - query_body = { - "query": """ - MATCH (sessions) - SELECT duration_quantiles, sessions, users - WHERE org_id = 1 - AND project_id = 1 - AND started >= toDateTime('2020-01-01T12:00:00') - AND started < toDateTime('2020-01-02T12:00:00') - """, - "dataset": "sessions", - } - - sessions = get_dataset("sessions") - sessions_entity = get_entity(EntityKey.SESSIONS) - - query, snql_anonymized = parse_snql_query(query_body["query"], sessions) - request = Request( - id="a", - original_body=query_body, - query=query, - snql_anonymized=snql_anonymized, - query_settings=HTTPQuerySettings(referrer=""), - attribution_info=AttributionInfo( - get_app_id("default"), {"tenant_type": "tenant_id"}, "", None, None, None - ), - ) - - def query_runner( - clickhouse_query: Query, query_settings: QuerySettings, reader: Reader - ) -> QueryResult: - quantiles = tuple( - Literal(None, quant) for quant in [0.5, 0.75, 0.9, 0.95, 0.99, 1] - ) - assert clickhouse_query.get_selected_columns() == [ - SelectedExpression( - 
"duration_quantiles", - CurriedFunctionCall( - "_snuba_duration_quantiles", - FunctionCall( - None, - "quantilesIfMerge", - quantiles, - ), - (Column(None, None, "duration_quantiles"),), - ), - ), - SelectedExpression( - "sessions", - FunctionCall( - "_snuba_sessions", - "plus", - ( - FunctionCall( - None, "countIfMerge", (Column(None, None, "sessions"),) - ), - FunctionCall( - None, - "sumIfMerge", - (Column(None, None, "sessions_preaggr"),), - ), - ), - ), - ), - SelectedExpression( - "users", - FunctionCall( - "_snuba_users", "uniqIfMerge", (Column(None, None, "users"),) - ), - ), - ] - return QueryResult({}, {}) - - sessions_entity.get_query_pipeline_builder().build_execution_pipeline( - request, query_runner - ).execute() - - -selector_tests = [ - pytest.param( - { - "selected_columns": ["sessions", "bucketed_started"], - "groupby": ["bucketed_started"], - "conditions": [ - ["org_id", "=", 1], - ["project_id", "=", 1], - ["started", ">=", "2020-01-01T12:00:00"], - ["started", "<", "2020-01-02T12:00:00"], - ], - }, - False, - sessions_read_schema.get_table_name(), - id="Select hourly by default", - ), - pytest.param( - { - "selected_columns": ["sessions"], - "granularity": 60, - "conditions": [ - ["org_id", "=", 1], - ["project_id", "=", 1], - ["started", ">=", "2020-01-01T12:00:00"], - ["started", "<", "2020-01-02T12:00:00"], - ], - }, - False, - sessions_read_schema.get_table_name(), - id="Select hourly if not grouped by started time", - ), - pytest.param( - { - "selected_columns": ["sessions", "bucketed_started"], - "groupby": ["bucketed_started"], - "granularity": 60, - "conditions": [ - ("org_id", "=", 1), - ("project_id", "=", 1), - ("started", ">=", "2019-09-19T10:00:00"), - ("started", "<", "2019-09-19T12:00:00"), - ], - }, - False, - sessions_raw_schema.get_table_name(), - id="Select raw depending on granularity", - ), - pytest.param( - { - "selected_columns": [], - "aggregations": [ - [ - "if(greater(sessions, 0), divide(sessions_crashed, sessions), null)", - None, - "crash_rate_alert_aggregate", - ] - ], - "conditions": [ - ("org_id", "=", 1), - ("project_id", "=", 1), - ("started", ">=", "2019-09-19T10:00:00"), - ("started", "<", "2019-09-19T11:00:00"), - ], - }, - True, - sessions_raw_schema.get_table_name(), - id="Select raw if its a dataset subscription and time_window is <=1h", - ), - pytest.param( - { - "selected_columns": [], - "aggregations": [ - [ - "if(greater(sessions, 0), divide(sessions_crashed, sessions), null)", - None, - "crash_rate_alert_aggregate", - ] - ], - "conditions": [ - ("org_id", "=", 1), - ("project_id", "=", 1), - ("started", ">=", "2019-09-19T10:00:00"), - ("started", "<", "2019-09-19T12:00:00"), - ], - }, - True, - sessions_read_schema.get_table_name(), - id="Select materialized if its a dataset subscription and time_window > 1h", - ), -] - - -@pytest.mark.parametrize( - "query_body, is_subscription, expected_table", - selector_tests, -) -@pytest.mark.clickhouse_db -def test_select_storage( - query_body: MutableMapping[str, Any], is_subscription: bool, expected_table: str -) -> None: - sessions = get_dataset("sessions") - sessions_entity = get_entity(EntityKey.SESSIONS) - request = json_to_snql(query_body, "sessions") - request.validate() - query, snql_anonymized = parse_snql_query(str(request.query), sessions) - subscription_settings = ( - SubscriptionQuerySettings if is_subscription else HTTPQuerySettings - ) - - request = Request( - id="a", - original_body=request.to_dict(), - query=query, - snql_anonymized=snql_anonymized, - 
query_settings=subscription_settings(referrer=""), - attribution_info=AttributionInfo( - get_app_id("default"), - {"tenant_type": "tenant_id"}, - "blah", - None, - None, - None, - ), - ) - - def query_runner( - clickhouse_query: Query, query_settings: QuerySettings, reader: Reader - ) -> QueryResult: - assert clickhouse_query.get_from_clause().table_name == expected_table - return QueryResult({}, {}) - - sessions_entity.get_query_pipeline_builder().build_execution_pipeline( - request, query_runner - ).execute() diff --git a/tests/query/snql/test_invalid_queries.py b/tests/query/snql/test_invalid_queries.py index 2bc3cd0e63b..3ec53560abf 100644 --- a/tests/query/snql/test_invalid_queries.py +++ b/tests/query/snql/test_invalid_queries.py @@ -99,7 +99,7 @@ def test_failures(query_body: str, message: str) -> None: "contains": (EntityKey.TRANSACTIONS, "event_id"), "assigned": (EntityKey.GROUPASSIGNEE, "group_id"), "bookmark": (EntityKey.GROUPEDMESSAGE, "first_release_id"), - "activity": (EntityKey.SESSIONS, "org_id"), + "activity": (EntityKey.METRICS, "org_id"), } def events_mock(relationship: str) -> Optional[JoinRelationship]: diff --git a/tests/query/snql/test_query.py b/tests/query/snql/test_query.py index 633ec9b9171..5d20149c0f3 100644 --- a/tests/query/snql/test_query.py +++ b/tests/query/snql/test_query.py @@ -1160,8 +1160,8 @@ def build_cond(tn: str) -> str: right_node=IndividualNode( "se", QueryEntity( - EntityKey.SESSIONS, - get_entity(EntityKey.SESSIONS).get_data_model(), + EntityKey.METRICS, + get_entity(EntityKey.METRICS).get_data_model(), ), ), keys=[ diff --git a/tests/subscriptions/entity_subscriptions/test_entity_subscriptions.py b/tests/subscriptions/entity_subscriptions/test_entity_subscriptions.py index 9162a8ec8f1..057ac9a622f 100644 --- a/tests/subscriptions/entity_subscriptions/test_entity_subscriptions.py +++ b/tests/subscriptions/entity_subscriptions/test_entity_subscriptions.py @@ -60,52 +60,6 @@ 5, id="Transactions subscription", ), - pytest.param( - EntityKey.SESSIONS, - Query( - QueryEntity( - EntityKey.SESSIONS, - get_entity(EntityKey.SESSIONS).get_data_model(), - ), - selected_columns=[ - SelectedExpression( - "time", Column("_snuba_timestamp", None, "timestamp") - ), - ], - condition=binary_condition( - "equals", - Column("_snuba_project_id", None, "project_id"), - Literal(None, 1), - ), - ), - {"organization": 1}, - None, - 5, - id="Sessions subscription", - ), - pytest.param( - EntityKey.SESSIONS, - Query( - QueryEntity( - EntityKey.SESSIONS, - get_entity(EntityKey.SESSIONS).get_data_model(), - ), - selected_columns=[ - SelectedExpression( - "time", Column("_snuba_timestamp", None, "timestamp") - ), - ], - condition=binary_condition( - "equals", - Column("_snuba_project_id", None, "project_id"), - Literal(None, 1), - ), - ), - {}, - InvalidQueryException, - 5, - id="Sessions subscription", - ), pytest.param( EntityKey.METRICS_COUNTERS, Query( diff --git a/tests/test_org_sessions_api.py b/tests/test_org_sessions_api.py deleted file mode 100644 index 6bce0546a80..00000000000 --- a/tests/test_org_sessions_api.py +++ /dev/null @@ -1,181 +0,0 @@ -import itertools -from datetime import datetime, timedelta -from typing import Any -from uuid import uuid4 - -import pytest -import pytz -import simplejson as json -from snuba_sdk import Request -from snuba_sdk.column import Column -from snuba_sdk.conditions import Condition, Op -from snuba_sdk.entity import Entity -from snuba_sdk.expressions import Granularity -from snuba_sdk.orderby import Direction, OrderBy -from 
snuba_sdk.query import Query - -from snuba import settings -from snuba.consumers.types import KafkaMessageMetadata -from snuba.datasets.storages.factory import get_writable_storage -from snuba.datasets.storages.storage_key import StorageKey -from tests.base import BaseApiTest -from tests.helpers import write_processed_messages - - -@pytest.mark.clickhouse_db -@pytest.mark.redis_db -class TestOrgSessionsApi(BaseApiTest): - def post(self, url: str, data: str) -> Any: - return self.app.post(url, data=data, headers={"referer": "test"}) - - @pytest.fixture(autouse=True) - def setup_teardown(self, clickhouse_db: None) -> None: - # values for test data - self.started = datetime.utcnow().replace( - minute=0, second=0, microsecond=0, tzinfo=pytz.utc - ) - - self.storage = get_writable_storage(StorageKey.SESSIONS_RAW) - - self.id_iter = itertools.count() - next(self.id_iter) # skip 0 - self.project_id = next(self.id_iter) - self.org_id = next(self.id_iter) - self.project_id2 = next(self.id_iter) - - self.generate_session_events(self.org_id, self.project_id) - self.generate_session_events(self.org_id, self.project_id2) - - def generate_session_events(self, org_id: int, project_id: int) -> None: - processor = self.storage.get_table_writer().get_stream_loader().get_processor() - meta = KafkaMessageMetadata( - offset=1, partition=2, timestamp=datetime(1970, 1, 1) - ) - distinct_id = uuid4().hex - template = { - "session_id": uuid4().hex, - "distinct_id": distinct_id, - "duration": None, - "environment": "production", - "org_id": org_id, - "project_id": project_id, - "release": "sentry-test@1.0.1", - "retention_days": settings.DEFAULT_RETENTION_DAYS, - "seq": 0, - "errors": 0, - "received": datetime.utcnow().timestamp(), - "started": self.started.timestamp(), - } - events = [ - processor.process_message( - { - **template, - "status": "exited", - "duration": 1947.49, - "session_id": uuid4().hex, - "started": (self.started + timedelta(minutes=13)).timestamp(), - }, - meta, - ), - processor.process_message( - {**template, "status": "exited", "quantity": 5}, - meta, - ), - processor.process_message( - {**template, "status": "errored", "errors": 1, "quantity": 2}, - meta, - ), - processor.process_message( - { - **template, - "distinct_id": distinct_id, - "status": "errored", - "errors": 1, - "quantity": 2, - "started": (self.started + timedelta(minutes=24)).timestamp(), - }, - meta, - ), - ] - filtered = [e for e in events if e] - write_processed_messages(self.storage, filtered) - - def test_simple(self) -> None: - query = Query( - match=Entity("org_sessions"), - select=[Column("org_id"), Column("project_id")], - groupby=[Column("org_id"), Column("project_id")], - where=[ - Condition( - Column("started"), Op.GTE, datetime.utcnow() - timedelta(hours=6) - ), - Condition(Column("started"), Op.LT, datetime.utcnow()), - ], - granularity=Granularity(3600), - ) - - request = Request(dataset="sessions", app_id="default", query=query) - response = self.app.post( - "/sessions/snql", - data=json.dumps(request.to_dict()), - ) - data = json.loads(response.data) - assert response.status_code == 200, response.data - assert len(data["data"]) == 2 - assert data["data"][0]["org_id"] == self.org_id - assert data["data"][0]["project_id"] == self.project_id - assert data["data"][1]["org_id"] == self.org_id - assert data["data"][1]["project_id"] == self.project_id2 - - def test_orderby(self) -> None: - self.project_id3 = next(self.id_iter) - self.org_id2 = next(self.id_iter) - self.generate_session_events(self.org_id2, 
self.project_id3)
-
-        query = Query(
-            match=Entity("org_sessions"),
-            select=[Column("org_id"), Column("project_id")],
-            groupby=[Column("org_id"), Column("project_id")],
-            where=[
-                Condition(
-                    Column("started"), Op.GTE, datetime.utcnow() - timedelta(hours=6)
-                ),
-                Condition(Column("started"), Op.LT, datetime.utcnow()),
-            ],
-            granularity=Granularity(3600),
-            orderby=[OrderBy(Column("org_id"), Direction.ASC)],
-        )
-
-        request = Request(dataset="sessions", app_id="default", query=query)
-        response = self.app.post(
-            "/sessions/snql",
-            data=json.dumps(request.to_dict()),
-        )
-        data = json.loads(response.data)
-        assert response.status_code == 200, response.data
-        assert len(data["data"]) == 3
-        assert data["data"][0]["org_id"] == self.org_id
-        assert data["data"][0]["project_id"] == self.project_id
-        assert data["data"][1]["org_id"] == self.org_id
-        assert data["data"][1]["project_id"] == self.project_id2
-        assert data["data"][2]["org_id"] == self.org_id2
-        assert data["data"][2]["project_id"] == self.project_id3
-
-        query = query.set_orderby(
-            [OrderBy(Column("org_id"), Direction.DESC)],
-        )
-
-        request = Request(dataset="sessions", app_id="default", query=query)
-        response = self.app.post(
-            "/sessions/snql",
-            data=json.dumps(request.to_dict()),
-        )
-        data = json.loads(response.data)
-        assert response.status_code == 200, response.data
-        assert len(data["data"]) == 3
-        assert data["data"][0]["org_id"] == self.org_id2
-        assert data["data"][0]["project_id"] == self.project_id3
-        assert data["data"][1]["org_id"] == self.org_id
-        assert data["data"][1]["project_id"] == self.project_id
-        assert data["data"][2]["org_id"] == self.org_id
-        assert data["data"][2]["project_id"] == self.project_id2

From ccef6e3f5ad28c1bb5bc339d70d5353998808888 Mon Sep 17 00:00:00 2001
From: Lyn
Date: Thu, 12 Oct 2023 13:22:45 -0700
Subject: [PATCH 03/13] test: Drop sessions keys from factory tests

---
 tests/datasets/storages/test_storage_factory.py | 3 ---
 tests/datasets/test_entity_factory.py           | 1 -
 2 files changed, 4 deletions(-)

diff --git a/tests/datasets/storages/test_storage_factory.py b/tests/datasets/storages/test_storage_factory.py
index d31898d7d0e..2de2a91d87b 100644
--- a/tests/datasets/storages/test_storage_factory.py
+++ b/tests/datasets/storages/test_storage_factory.py
@@ -23,9 +23,6 @@
     StorageKey.OUTCOMES_RAW,
     StorageKey.OUTCOMES_HOURLY,
     StorageKey.QUERYLOG,
-    StorageKey.SESSIONS_RAW,
-    StorageKey.SESSIONS_HOURLY,
-    StorageKey.ORG_SESSIONS,
     StorageKey.TRANSACTIONS,
     StorageKey.PROFILES,
     StorageKey.FUNCTIONS,
diff --git a/tests/datasets/test_entity_factory.py b/tests/datasets/test_entity_factory.py
index ce3ca6dbf3e..77082ee6cda 100644
--- a/tests/datasets/test_entity_factory.py
+++ b/tests/datasets/test_entity_factory.py
@@ -19,7 +19,6 @@
     EntityKey.OUTCOMES,
     EntityKey.OUTCOMES_RAW,
     EntityKey.SEARCH_ISSUES,
-    EntityKey.ORG_SESSIONS,
     EntityKey.TRANSACTIONS,
     EntityKey.DISCOVER_TRANSACTIONS,
     EntityKey.DISCOVER_EVENTS,

From f343186065aac93efd9eb7d95d42b8ed5a8485b8 Mon Sep 17 00:00:00 2001
From: Lyn
Date: Thu, 12 Oct 2023 19:36:11 -0700
Subject: [PATCH 04/13] test: Use metrics_sets entity in SnQL join tests
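
The sessions entity no longer exists, so the activity edge in the SnQL
join tests needs a right-hand entity that is still registered; metrics_sets
is a stand-in that joins on org_id. After this and the following fixup,
the relationship map mocked in tests/query/snql/test_query.py ends up
roughly as below (a sketch; the dict name is illustrative):

    from snuba.datasets.entities.entity_key import EntityKey

    # (right-hand entity, join key) for each mocked join relationship
    relationships = {
        "contains": (EntityKey.TRANSACTIONS, "event_id"),
        "assigned": (EntityKey.GROUPASSIGNEE, "group_id"),
        "bookmark": (EntityKey.GROUPEDMESSAGE, "first_release_id"),
        "activity": (EntityKey.METRICS_SETS, "org_id"),
    }
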
--- tests/query/snql/test_query.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/query/snql/test_query.py b/tests/query/snql/test_query.py index 5d20149c0f3..847ccca1a11 100644 --- a/tests/query/snql/test_query.py +++ b/tests/query/snql/test_query.py @@ -1160,8 +1160,8 @@ def build_cond(tn: str) -> str: right_node=IndividualNode( "se", QueryEntity( - EntityKey.METRICS, - get_entity(EntityKey.METRICS).get_data_model(), + EntityKey.METRICS_SETS, + get_entity(EntityKey.METRICS_SETS).get_data_model(), ), ), keys=[ From 1ea1b91b32c94f816617b1ce2770e172a7f534e1 Mon Sep 17 00:00:00 2001 From: Lyn Date: Thu, 12 Oct 2023 19:39:18 -0700 Subject: [PATCH 05/13] fix stuff --- tests/query/snql/test_query.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/query/snql/test_query.py b/tests/query/snql/test_query.py index 847ccca1a11..858fd4fb1c1 100644 --- a/tests/query/snql/test_query.py +++ b/tests/query/snql/test_query.py @@ -1140,11 +1140,11 @@ def build_cond(tn: str) -> str: (e: events) -[contains]-> (t: transactions), (e: events) -[assigned]-> (ga: groupassignee), (e: events) -[bookmark]-> (gm: groupedmessage), - (e: events) -[activity]-> (se: sessions) + (e: events) -[activity]-> (se: metrics_sets) SELECT 4-5, e.a, t.b, ga.c, gm.d, se.e WHERE {build_cond('e')} AND {build_cond('t')} AND se.org_id = 1 AND se.project_id = 1 - AND se.started >= toDateTime('2021-01-01') AND se.started < toDateTime('2021-01-02')""", + AND se.timestamp >= toDateTime('2021-01-01') AND se.timestamp < toDateTime('2021-01-02')""", CompositeQuery( from_clause=JoinClause( left_node=JoinClause( @@ -1297,9 +1297,9 @@ def build_cond(tn: str) -> str: binary_condition( "greaterOrEquals", Column( - "_snuba_se.started", + "_snuba_se.timestamp", "se", - "started", + "timestamp", ), Literal( None, @@ -1311,9 +1311,9 @@ def build_cond(tn: str) -> str: binary_condition( "less", Column( - "_snuba_se.started", + "_snuba_se.timestamp", "se", - "started", + "timestamp", ), Literal( None, @@ -1962,7 +1962,7 @@ def test_format_expressions(query_body: str, expected_query: LogicalQuery) -> No "contains": (EntityKey.TRANSACTIONS, "event_id"), "assigned": (EntityKey.GROUPASSIGNEE, "group_id"), "bookmark": (EntityKey.GROUPEDMESSAGE, "first_release_id"), - "activity": (EntityKey.SESSIONS, "org_id"), + "activity": (EntityKey.METRICS_SETS, "org_id"), } def events_mock(relationship: str) -> JoinRelationship: From beaa0d03e23ffb9cc51319d00474fe1977b58ce6 Mon Sep 17 00:00:00 2001 From: Lyn Date: Fri, 13 Oct 2023 12:43:51 -0700 Subject: [PATCH 06/13] more fixes --- tests/admin/test_api.py | 2 +- tests/clusters/test_cluster.py | 1 - tests/datasets/test_dataset_factory.py | 2 -- tests/test_snql_api.py | 45 -------------------------- tests/test_snql_sdk_api.py | 35 -------------------- 5 files changed, 1 insertion(+), 84 deletions(-) diff --git a/tests/admin/test_api.py b/tests/admin/test_api.py index 754fd646509..157bffec3a5 100644 --- a/tests/admin/test_api.py +++ b/tests/admin/test_api.py @@ -341,7 +341,7 @@ def test_snuba_debug_invalid_dataset(admin_api: FlaskClient) -> None: @pytest.mark.redis_db def test_snuba_debug_invalid_query(admin_api: FlaskClient) -> None: response = admin_api.post( - "/snuba_debug", data=json.dumps({"dataset": "sessions", "query": ""}) + "/snuba_debug", data=json.dumps({"dataset": "transactions", "query": ""}) ) assert response.status_code == 400 data = json.loads(response.data) diff --git a/tests/clusters/test_cluster.py b/tests/clusters/test_cluster.py index 
967a5d02d93..7195de8a569 100644 --- a/tests/clusters/test_cluster.py +++ b/tests/clusters/test_cluster.py @@ -19,7 +19,6 @@ "metrics", "migrations", "querylog", - "sessions", } ALL_STORAGE_SETS = { diff --git a/tests/datasets/test_dataset_factory.py b/tests/datasets/test_dataset_factory.py index 5a0183a3306..e3238238d46 100644 --- a/tests/datasets/test_dataset_factory.py +++ b/tests/datasets/test_dataset_factory.py @@ -23,7 +23,6 @@ def test_get_dataset() -> None: "metrics", "outcomes", "outcomes_raw", - "sessions", "transactions", "profiles", "functions", @@ -68,7 +67,6 @@ def test_all_names() -> None: "metrics", "outcomes", "outcomes_raw", - "sessions", "transactions", "profiles", "functions", diff --git a/tests/test_snql_api.py b/tests/test_snql_api.py index dd0061c2314..aa34adbabcb 100644 --- a/tests/test_snql_api.py +++ b/tests/test_snql_api.py @@ -109,29 +109,6 @@ def test_simple_query(self) -> None: } ] - def test_sessions_query(self) -> None: - response = self.post( - "/sessions/snql", - data=json.dumps( - { - "dataset": "sessions", - "query": f"""MATCH (sessions) - SELECT project_id, release BY release, project_id - WHERE project_id IN array({self.project_id}) - AND project_id IN array({self.project_id}) - AND org_id = {self.org_id} - AND started >= toDateTime('2021-01-01T17:05:59.554860') - AND started < toDateTime('2022-01-01T17:06:00.554981') - ORDER BY sessions DESC - LIMIT 100 OFFSET 0""", - } - ), - ) - data = json.loads(response.data) - - assert response.status_code == 200 - assert data["data"] == [] - def test_join_query(self) -> None: response = self.post( "/events/snql", @@ -379,28 +356,6 @@ def test_error_handler(self, pipeline_mock: MagicMock) -> None: assert data["error"]["type"] == "internal_server_error" assert data["error"]["message"] == "stuff" - def test_sessions_with_function_orderby(self) -> None: - response = self.post( - "/sessions/snql", - data=json.dumps( - { - "query": f"""MATCH (sessions) - SELECT project_id, release BY release, project_id - WHERE org_id = {self.org_id} - AND started >= toDateTime('2021-04-05T16:52:48.907628') - AND started < toDateTime('2021-04-06T16:52:49.907666') - AND project_id IN tuple({self.project_id}) - AND project_id IN tuple({self.project_id}) - ORDER BY divide(sessions_crashed, sessions) ASC - LIMIT 21 - OFFSET 0 - """, - "tenant_ids": {"referrer": "r", "organization_id": 123}, - } - ), - ) - assert response.status_code == 200 - def test_arrayjoin(self) -> None: response = self.post( "/events/snql", diff --git a/tests/test_snql_sdk_api.py b/tests/test_snql_sdk_api.py index ee2e50818eb..b0ea89eb0c7 100644 --- a/tests/test_snql_sdk_api.py +++ b/tests/test_snql_sdk_api.py @@ -95,41 +95,6 @@ def test_simple_query(self) -> None: } ] - def test_sessions_query(self) -> None: - query = ( - Query(Entity("sessions")) - .set_select([Column("project_id"), Column("release")]) - .set_groupby([Column("project_id"), Column("release")]) - .set_where( - [ - Condition(Column("project_id"), Op.IN, [self.project_id]), - Condition(Column("org_id"), Op.EQ, self.org_id), - Condition( - Column("started"), - Op.GTE, - datetime(2021, 1, 1, 17, 5, 59, 554860), - ), - Condition( - Column("started"), Op.LT, datetime(2022, 1, 1, 17, 6, 0, 554981) - ), - ] - ) - .set_orderby([OrderBy(Column("sessions"), Direction.DESC)]) - .set_limit(100) - ) - - request = Request( - dataset="sessions", - query=query, - app_id="default", - tenant_ids={"referrer": "r", "organization_id": 123}, - ) - response = self.post("/sessions/snql", data=json.dumps(request.to_dict())) - 
data = json.loads(response.data) - - assert response.status_code == 200 - assert data["data"] == [] - def test_join_query(self) -> None: ev = Entity("events", "ev") gm = Entity("groupedmessage", "gm") From 60995ab4be410cc7989e188cf158ac8a511abeb3 Mon Sep 17 00:00:00 2001 From: Lyn Date: Fri, 13 Oct 2023 14:48:32 -0700 Subject: [PATCH 07/13] try to fix admin tests --- tests/admin/test_api.py | 14 +++++++------- tests/clusters/test_storage_sets.py | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/admin/test_api.py b/tests/admin/test_api.py index 157bffec3a5..b3fe9d3a752 100644 --- a/tests/admin/test_api.py +++ b/tests/admin/test_api.py @@ -355,15 +355,15 @@ def test_snuba_debug_invalid_query(admin_api: FlaskClient) -> None: @pytest.mark.clickhouse_db def test_snuba_debug_valid_query(admin_api: FlaskClient) -> None: snql_query = """ - MATCH (sessions) - SELECT sessions_crashed + MATCH (functions) + SELECT worst WHERE org_id = 100 AND project_id IN tuple(100) AND started >= toDateTime('2022-01-01 00:00:00') AND started < toDateTime('2022-02-01 00:00:00') """ response = admin_api.post( - "/snuba_debug", data=json.dumps({"dataset": "sessions", "query": snql_query}) + "/snuba_debug", data=json.dumps({"dataset": "functions", "query": snql_query}) ) assert response.status_code == 200 data = json.loads(response.data) @@ -375,15 +375,15 @@ def test_snuba_debug_valid_query(admin_api: FlaskClient) -> None: @pytest.mark.clickhouse_db def test_snuba_debug_explain_query(admin_api: FlaskClient) -> None: snql_query = """ - MATCH (sessions) - SELECT sessions_crashed + MATCH (functions) + SELECT worst WHERE org_id = 100 AND project_id IN tuple(100) AND started >= toDateTime('2022-01-01 00:00:00') AND started < toDateTime('2022-02-01 00:00:00') """ response = admin_api.post( - "/snuba_debug", data=json.dumps({"dataset": "sessions", "query": snql_query}) + "/snuba_debug", data=json.dumps({"dataset": "functions", "query": snql_query}) ) assert response.status_code == 200 data = json.loads(response.data) @@ -576,7 +576,7 @@ def test_prod_snql_query_invalid_dataset(admin_api: FlaskClient) -> None: @pytest.mark.redis_db def test_prod_snql_query_invalid_query(admin_api: FlaskClient) -> None: response = admin_api.post( - "/production_snql_query", data=json.dumps({"dataset": "sessions", "query": ""}) + "/production_snql_query", data=json.dumps({"dataset": "functions", "query": ""}) ) assert response.status_code == 400 data = json.loads(response.data) diff --git a/tests/clusters/test_storage_sets.py b/tests/clusters/test_storage_sets.py index 953e3f16f48..970ba06842a 100644 --- a/tests/clusters/test_storage_sets.py +++ b/tests/clusters/test_storage_sets.py @@ -7,12 +7,12 @@ def test_storage_set_combination() -> None: is True ) assert ( - is_valid_storage_set_combination(StorageSetKey.EVENTS, StorageSetKey.SESSIONS) + is_valid_storage_set_combination(StorageSetKey.EVENTS, StorageSetKey.PROFILES) is False ) assert ( is_valid_storage_set_combination( - StorageSetKey.EVENTS, StorageSetKey.CDC, StorageSetKey.SESSIONS + StorageSetKey.EVENTS, StorageSetKey.CDC, StorageSetKey.PROFILES ) is False ) From ac0870478abf694b9be27cb6bab1de315ce849ad Mon Sep 17 00:00:00 2001 From: Lyn Date: Tue, 17 Oct 2023 18:21:43 -0700 Subject: [PATCH 08/13] fix --- tests/admin/test_api.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/tests/admin/test_api.py b/tests/admin/test_api.py index b3fe9d3a752..21dcf308a59 100644 --- a/tests/admin/test_api.py +++ b/tests/admin/test_api.py @@ 
-357,10 +357,9 @@ def test_snuba_debug_valid_query(admin_api: FlaskClient) -> None: snql_query = """ MATCH (functions) SELECT worst - WHERE org_id = 100 - AND project_id IN tuple(100) - AND started >= toDateTime('2022-01-01 00:00:00') - AND started < toDateTime('2022-02-01 00:00:00') + WHERE project_id IN tuple(100) + AND timestamp >= toDateTime('2022-01-01 00:00:00') + AND timestamp < toDateTime('2022-02-01 00:00:00') """ response = admin_api.post( "/snuba_debug", data=json.dumps({"dataset": "functions", "query": snql_query}) @@ -377,10 +376,9 @@ def test_snuba_debug_explain_query(admin_api: FlaskClient) -> None: snql_query = """ MATCH (functions) SELECT worst - WHERE org_id = 100 - AND project_id IN tuple(100) - AND started >= toDateTime('2022-01-01 00:00:00') - AND started < toDateTime('2022-02-01 00:00:00') + WHERE project_id IN tuple(100) + AND timestamp >= toDateTime('2022-01-01 00:00:00') + AND timestamp < toDateTime('2022-02-01 00:00:00') """ response = admin_api.post( "/snuba_debug", data=json.dumps({"dataset": "functions", "query": snql_query}) From 597781ff6e98d5d7b99d2f5245c7d606f9151791 Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 18 Oct 2023 17:14:24 -0700 Subject: [PATCH 09/13] delete file again --- .../configuration/sessions/storages/raw.yaml | 43 ------------------- 1 file changed, 43 deletions(-) delete mode 100644 snuba/datasets/configuration/sessions/storages/raw.yaml diff --git a/snuba/datasets/configuration/sessions/storages/raw.yaml b/snuba/datasets/configuration/sessions/storages/raw.yaml deleted file mode 100644 index c1cb3c8316e..00000000000 --- a/snuba/datasets/configuration/sessions/storages/raw.yaml +++ /dev/null @@ -1,43 +0,0 @@ -version: v1 -kind: writable_storage -name: sessions_raw -storage: - key: sessions_raw - set_key: sessions -readiness_state: deprecate -schema: - columns: - [ - { name: session_id, type: UUID }, - { name: distinct_id, type: UUID }, - { name: quantity, type: UInt, args: { size: 32 } }, - { name: seq, type: UInt, args: { size: 64 } }, - { name: org_id, type: UInt, args: { size: 64 } }, - { name: project_id, type: UInt, args: { size: 64 } }, - { name: retention_days, type: UInt, args: { size: 16 } }, - { name: duration, type: UInt, args: { size: 32 } }, - { name: status, type: UInt, args: { size: 8 } }, - { name: errors, type: UInt, args: { size: 16 } }, - { name: received, type: DateTime }, - { name: started, type: DateTime }, - { name: release, type: String }, - { name: environment, type: String }, - { name: user_agent, type: String }, - { name: os, type: String }, - ] - local_table_name: sessions_raw_local - dist_table_name: sessions_raw_dist -query_processors: - - processor: MinuteResolutionProcessor - - processor: TableRateLimit -mandatory_condition_checkers: - - condition: OrgIdEnforcer - - condition: ProjectIdEnforcer -stream_loader: - processor: SessionsProcessor - default_topic: ingest-sessions - commit_log_topic: snuba-sessions-commit-log - subscription_result_topic: sessions-subscription-results - subscription_scheduler_mode: global - subscription_synchronization_timestamp: orig_message_ts - subscription_scheduled_topic: scheduled-subscriptions-sessions From 01bdab291438525c1865e9a9f0c7f50dfeae89f7 Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 18 Oct 2023 17:27:16 -0700 Subject: [PATCH 10/13] update test --- tests/admin/test_api.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/admin/test_api.py b/tests/admin/test_api.py index 21dcf308a59..31e3e6e760c 100644 --- a/tests/admin/test_api.py +++ 
b/tests/admin/test_api.py
@@ -390,11 +390,6 @@ def test_snuba_debug_explain_query(admin_api: FlaskClient) -> None:
     assert data["explain"]["original_ast"].startswith("SELECT")
 
     expected_steps = [
-        {
-            "category": "entity_processor",
-            "name": "BasicFunctionsProcessor",
-            "type": "query_transform",
-        },
         {
             "category": "storage_planning",
             "name": "mappers",

From ced8d7cd089729fe7512cb6c99d17fc40e5d2cfd Mon Sep 17 00:00:00 2001
From: Lyn
Date: Thu, 19 Oct 2023 13:16:18 -0700
Subject: [PATCH 11/13] test: construct entity keys by value in the invalid-queries join mapping

---
 tests/query/snql/test_invalid_queries.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tests/query/snql/test_invalid_queries.py b/tests/query/snql/test_invalid_queries.py
index 3ec53560abf..fbb4f2c4069 100644
--- a/tests/query/snql/test_invalid_queries.py
+++ b/tests/query/snql/test_invalid_queries.py
@@ -96,10 +96,10 @@ def test_failures(query_body: str, message: str) -> None:
     # TODO: Potentially remove this once entities have actual join relationships
     mapping = {
-        "contains": (EntityKey.TRANSACTIONS, "event_id"),
-        "assigned": (EntityKey.GROUPASSIGNEE, "group_id"),
-        "bookmark": (EntityKey.GROUPEDMESSAGE, "first_release_id"),
-        "activity": (EntityKey.METRICS, "org_id"),
+        "contains": (EntityKey("transactions"), "event_id"),
+        "assigned": (EntityKey("groupassignee"), "group_id"),
+        "bookmark": (EntityKey("groupedmessages"), "first_release_id"),
+        "activity": (EntityKey("metrics"), "org_id"),
     }
 
     def events_mock(relationship: str) -> Optional[JoinRelationship]:

From b14120f35cc134430b003c2962e183a0f5f5c289 Mon Sep 17 00:00:00 2001
From: Lyn
Date: Tue, 24 Oct 2023 13:22:11 -0700
Subject: [PATCH 12/13] test: drop the sessions join relationship from the snql anonymizer test

---
 tests/query/snql/test_snql_anonymizer.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/query/snql/test_snql_anonymizer.py b/tests/query/snql/test_snql_anonymizer.py
index 2247cab1c65..3e38b8185fc 100644
--- a/tests/query/snql/test_snql_anonymizer.py
+++ b/tests/query/snql/test_snql_anonymizer.py
@@ -157,7 +157,6 @@ def test_format_expressions(query_body: str, expected_snql_anonymized: str) -> N
         "contains": (EntityKey.TRANSACTIONS, "event_id"),
         "assigned": (EntityKey.GROUPASSIGNEE, "group_id"),
         "bookmark": (EntityKey.GROUPEDMESSAGE, "first_release_id"),
-        "activity": (EntityKey.SESSIONS, "org_id"),
     }
 
     def events_mock(relationship: str) -> JoinRelationship:

From 38486af1238ebcb8bf75e80d6531a17343d02ec8 Mon Sep 17 00:00:00 2001
From: "getsantry[bot]" <66042841+getsantry[bot]@users.noreply.github.com>
Date: Fri, 31 Oct 2025 16:06:07 +0000
Subject: [PATCH 13/13] [getsentry/action-github-commit] Auto commit

---
 tests/clusters/test_cluster.py      | 16 ++++------------
 tests/clusters/test_storage_sets.py |  7 +------
 2 files changed, 5 insertions(+), 18 deletions(-)

diff --git a/tests/clusters/test_cluster.py b/tests/clusters/test_cluster.py
index 46f45d20dc1..e8ef94dbdb6 100644
--- a/tests/clusters/test_cluster.py
+++ b/tests/clusters/test_cluster.py
@@ -139,9 +139,7 @@ def test_cache_partition() -> None:
         StorageKey("transactions")
     ).get_cluster().get_reader().cache_partition_id == "host_2_cache"
 
-    get_storage(
-        StorageKey("errors")
-    
).get_cluster().get_reader().get_query_settings_prefix() is None + get_storage(StorageKey("errors")).get_cluster().get_reader().get_query_settings_prefix() is None @patch("snuba.settings.CLUSTERS", FULL_CONFIG) @@ -182,9 +178,7 @@ def test_disabled_cluster() -> None: def test_get_local_nodes() -> None: importlib.reload(cluster) with patch.object(ClickhousePool, "execute") as execute: - execute.return_value = ClickhouseResult( - [("host_1", 9000, 1, 1), ("host_2", 9000, 2, 1)] - ) + execute.return_value = ClickhouseResult([("host_1", 9000, 1, 1), ("host_2", 9000, 2, 1)]) local_cluster = get_storage(StorageKey("errors")).get_cluster() assert len(local_cluster.get_local_nodes()) == 1 @@ -278,9 +272,7 @@ def test_sliced_cluster() -> None: assert res_cluster.get_host() == "host_slice" assert res_cluster.get_port() == 9001 - res_cluster_default = cluster.get_cluster( - StorageSetKey.GENERIC_METRICS_DISTRIBUTIONS, 0 - ) + res_cluster_default = cluster.get_cluster(StorageSetKey.GENERIC_METRICS_DISTRIBUTIONS, 0) assert res_cluster_default.is_single_node() == True assert res_cluster_default.get_database() == "default" diff --git a/tests/clusters/test_storage_sets.py b/tests/clusters/test_storage_sets.py index f21bc117378..11df2943c4f 100644 --- a/tests/clusters/test_storage_sets.py +++ b/tests/clusters/test_storage_sets.py @@ -2,9 +2,4 @@ def test_storage_set_combination() -> None: - assert ( - is_valid_storage_set_combination( - StorageSetKey.EVENTS, StorageSetKey.PROFILES - ) - is False - ) \ No newline at end of file + assert is_valid_storage_set_combination(StorageSetKey.EVENTS, StorageSetKey.PROFILES) is False
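
For reference, the storage-set compatibility invariant that the final commit
leaves in place can be exercised as below — a minimal sketch, assuming the
import path used by tests/clusters/test_storage_sets.py, with PROFILES simply
standing in (as it does earlier in this series) for a storage set that may not
be combined with EVENTS:

    # Sketch of the checks the trimmed-down test still covers.
    from snuba.clusters.storage_sets import (
        StorageSetKey,
        is_valid_storage_set_combination,
    )

    def check_combinations() -> None:
        # EVENTS and PROFILES are not a valid combination...
        assert (
            is_valid_storage_set_combination(
                StorageSetKey.EVENTS, StorageSetKey.PROFILES
            )
            is False
        )
        # ...and adding CDC to the mix does not make it valid either.
        assert (
            is_valid_storage_set_combination(
                StorageSetKey.EVENTS, StorageSetKey.CDC, StorageSetKey.PROFILES
            )
            is False
        )
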