6 changes: 4 additions & 2 deletions snuba/migrations/migration.py
@@ -61,11 +61,13 @@ class SquashedMigration(Migration):
 
     def forwards(self, context: Context, dry_run: bool) -> None:
         _migration_id, _logger, update_status = context
-        update_status(Status.COMPLETED)
+        if not dry_run:
+            update_status(Status.COMPLETED)
 
     def backwards(self, context: Context, dry_run: bool) -> None:
         _migration_id, _logger, update_status = context
-        update_status(Status.NOT_STARTED)
+        if not dry_run:
+            update_status(Status.NOT_STARTED)
 
 
 class CodeMigration(Migration, ABC):
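Note on the hunk above: with the dry_run guard in place, a dry run of a squashed migration no longer records a status transition. A minimal sketch of that behaviour, not part of this PR; it assumes Context is the (migration_id, logger, update_status) tuple unpacked in forwards()/backwards(), that the import paths below are correct, and that a bare SquashedMigration subclass (like the stubs added later in this diff) is directly instantiable:

import logging

from snuba.migrations.context import Context
from snuba.migrations.migration import SquashedMigration
from snuba.migrations.status import Status


class SquashedStub(SquashedMigration):
    pass


recorded: list[Status] = []  # collects every status the stub reports
context = Context("0001_squashed_stub", logging.getLogger(__name__), recorded.append)

SquashedStub().forwards(context, dry_run=True)
assert recorded == []  # dry run: status left untouched

SquashedStub().forwards(context, dry_run=False)
assert recorded == [Status.COMPLETED]  # real run: marked as completed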
168 changes: 3 additions & 165 deletions snuba/snuba_migrations/events_analytics_platform/0001_spans.py
@@ -1,167 +1,5 @@
-from typing import List, Sequence
+from snuba.migrations import migration
 
-from snuba.clusters.storage_sets import StorageSetKey
-from snuba.migrations import migration, operations, table_engines
-from snuba.migrations.columns import MigrationModifiers as Modifiers
-from snuba.migrations.operations import AddIndicesData, OperationTarget, SqlOperation
-from snuba.utils.schemas import (
-    UUID,
-    Column,
-    DateTime,
-    DateTime64,
-    Float,
-    Int,
-    Map,
-    String,
-    UInt,
-)
-
-storage_set_name = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
-local_table_name = "eap_spans_local"
-dist_table_name = "eap_spans_dist"
-num_attr_buckets = 20
-
-columns: List[Column[Modifiers]] = [
-    Column("organization_id", UInt(64)),
-    Column("project_id", UInt(64)),
-    Column("service", String(Modifiers(codecs=["ZSTD(1)"]))),
-    Column("trace_id", UUID()),
-    Column("span_id", UInt(64)),
-    Column("parent_span_id", UInt(64, Modifiers(codecs=["ZSTD(1)"]))),
-    Column("segment_id", UInt(64, Modifiers(codecs=["ZSTD(1)"]))),
-    Column("segment_name", String(Modifiers(codecs=["ZSTD(1)"]))),
-    Column("is_segment", UInt(8, Modifiers(codecs=["T64", "ZSTD(1)"]))),
-    Column("_sort_timestamp", DateTime(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))),
-    Column(
-        "start_timestamp",
-        DateTime64(6, modifiers=Modifiers(codecs=["DoubleDelta", "ZSTD(1)"])),
-    ),
-    Column(
-        "end_timestamp",
-        DateTime64(6, modifiers=Modifiers(codecs=["DoubleDelta", "ZSTD(1)"])),
-    ),
-    Column(
-        "duration_ms",
-        UInt(32, modifiers=Modifiers(codecs=["DoubleDelta", "ZSTD(1)"])),
-    ),
-    Column("exclusive_time_ms", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))),
-    Column(
-        "retention_days",
-        UInt(16, modifiers=Modifiers(codecs=["DoubleDelta", "ZSTD(1)"])),
-    ),
-    Column("name", String(modifiers=Modifiers(codecs=["ZSTD(1)"]))),
-    Column("sampling_factor", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))),
-    Column("sampling_weight", Float(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))),
-    Column("sign", Int(8, modifiers=Modifiers(codecs=["DoubleDelta"]))),
-]
-columns.extend(
-    [
-        Column(
-            f"attr_str_{i}",
-            Map(String(), String(), modifiers=Modifiers(codecs=["ZSTD(1)"])),
-        )
-        for i in range(num_attr_buckets)
-    ]
-)
-
-columns.extend(
-    [
-        Column(
-            f"attr_num_{i}",
-            Map(String(), Float(64), modifiers=Modifiers(codecs=["ZSTD(1)"])),
-        )
-        for i in range(num_attr_buckets)
-    ]
-)
-
-indices: Sequence[AddIndicesData] = (
-    [
-        AddIndicesData(
-            name="bf_trace_id",
-            expression="trace_id",
-            type="bloom_filter",
-            granularity=1,
-        )
-    ]
-    + [
-        AddIndicesData(
-            name=f"bf_attr_str_{i}",
-            expression=f"mapKeys(attr_str_{i})",
-            type="bloom_filter",
-            granularity=1,
-        )
-        for i in range(num_attr_buckets)
-    ]
-    + [
-        AddIndicesData(
-            name=f"bf_attr_str_val_{i}",
-            expression=f"mapValues(attr_str_{i})",
-            type="ngrambf_v1(4, 1024, 10, 1)",
-            granularity=1,
-        )
-        for i in range(num_attr_buckets)
-    ]
-    + [
-        AddIndicesData(
-            name=f"bf_attr_num_{i}",
-            expression=f"mapKeys(attr_num_{i})",
-            type="bloom_filter",
-            granularity=1,
-        )
-        for i in range(num_attr_buckets)
-    ]
-)
-
-
-class Migration(migration.ClickhouseNodeMigration):
-    blocking = False
-
-    def forwards_ops(self) -> Sequence[SqlOperation]:
-        res: List[SqlOperation] = [
-            operations.CreateTable(
-                storage_set=storage_set_name,
-                table_name=local_table_name,
-                columns=columns,
-                engine=table_engines.CollapsingMergeTree(
-                    primary_key="(organization_id, _sort_timestamp, trace_id)",
-                    order_by="(organization_id, _sort_timestamp, trace_id, span_id)",
-                    sign_column="sign",
-                    partition_by="(toMonday(_sort_timestamp))",
-                    settings={"index_granularity": "8192"},
-                    storage_set=storage_set_name,
-                    ttl="_sort_timestamp + toIntervalDay(retention_days)",
-                ),
-                target=OperationTarget.LOCAL,
-            ),
-            operations.CreateTable(
-                storage_set=storage_set_name,
-                table_name=dist_table_name,
-                columns=columns,
-                engine=table_engines.Distributed(
-                    local_table_name=local_table_name,
-                    sharding_key="cityHash64(reinterpretAsUInt128(trace_id))",  # sharding keys must be at most 64 bits
-                ),
-                target=OperationTarget.DISTRIBUTED,
-            ),
-            operations.AddIndices(
-                storage_set=storage_set_name,
-                table_name=local_table_name,
-                indices=indices,
-                target=OperationTarget.LOCAL,
-            ),
-        ]
-        return res
-
-    def backwards_ops(self) -> Sequence[SqlOperation]:
-        return [
-            operations.DropTable(
-                storage_set=storage_set_name,
-                table_name=local_table_name,
-                target=OperationTarget.LOCAL,
-            ),
-            operations.DropTable(
-                storage_set=storage_set_name,
-                table_name=dist_table_name,
-                target=OperationTarget.DISTRIBUTED,
-            ),
-        ]
+class Migration(migration.SquashedMigration):
+    pass
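For reference, the schema deleted above stores span attributes in num_attr_buckets (20) pairs of Map columns, attr_str_0..attr_str_19 and attr_num_0..attr_num_19; each string bucket gets a bloom filter on its keys and an ngram index on its values, and each numeric bucket gets a bloom filter on its keys. A rough sketch of how a writer or query builder might route an attribute to its bucket; the crc32 hash here is only an illustrative stand-in, not the hash Snuba actually uses:

import zlib

NUM_ATTR_BUCKETS = 20  # mirrors num_attr_buckets in the deleted migration


def bucket_column(attribute_key: str, value: object) -> str:
    """Name of the map column a span attribute would land in (illustrative)."""
    prefix = "attr_num" if isinstance(value, (int, float)) else "attr_str"
    bucket = zlib.crc32(attribute_key.encode("utf-8")) % NUM_ATTR_BUCKETS
    return f"{prefix}_{bucket}"


print(bucket_column("http.status_code", 200))      # some attr_num_<i> column
print(bucket_column("release", "backend@24.7.0"))  # some attr_str_<i> column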
@@ -1,138 +1,5 @@
-from __future__ import annotations
+from snuba.migrations import migration
 
-from typing import Sequence
-
-from snuba.clickhouse.columns import AggregateFunction, Column, DateTime, String, UInt
-from snuba.clusters.storage_sets import StorageSetKey
-from snuba.migrations import migration, operations, table_engines
-from snuba.migrations.columns import MigrationModifiers as Modifiers
-from snuba.migrations.operations import OperationTarget, SqlOperation
-from snuba.utils.constants import ATTRIBUTE_BUCKETS
-
-META_KEY_QUERY_TEMPLATE = """
-SELECT
-    organization_id,
-    attribute_key,
-    {attribute_value} AS attribute_value,
-    toMonday(start_timestamp) AS timestamp,
-    retention_days,
-    sumState(cast(1, 'UInt64')) AS count
-FROM eap_spans_local
-LEFT ARRAY JOIN
-    arrayConcat({key_columns}) AS attribute_key,
-    arrayConcat({value_columns}) AS attr_value
-GROUP BY
-    organization_id,
-    attribute_key,
-    attribute_value,
-    timestamp,
-    retention_days
-"""
-
-
-class Migration(migration.ClickhouseNodeMigration):
-    """
-    This migration creates a table meant to store just the attributes seen in a particular org.
-    The table is populated by a separate materialized view for each type of attribute.
-    """
-
-    blocking = False
-    storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
-    granularity = "8192"
-
-    value_types = ["str", "num"]
-
-    meta_view_name = "spans_attributes_{attribute_type}_meta_mv"
-    meta_local_table_name = "spans_attributes_meta_local"
-    meta_dist_table_name = "spans_attributes_meta_dist"
-    meta_table_columns: Sequence[Column[Modifiers]] = [
-        Column("organization_id", UInt(64)),
-        Column("attribute_type", String()),
-        Column("attribute_key", String()),
-        Column("attribute_value", String()),
-        Column("timestamp", DateTime(modifiers=Modifiers(codecs=["DoubleDelta"]))),
-        Column("retention_days", UInt(16)),
-        Column("count", AggregateFunction("sum", [UInt(64)])),
-    ]
-
-    def forwards_ops(self) -> Sequence[SqlOperation]:
-        create_table_ops: list[SqlOperation] = [
-            operations.CreateTable(
-                storage_set=self.storage_set_key,
-                table_name=self.meta_local_table_name,
-                engine=table_engines.AggregatingMergeTree(
-                    storage_set=self.storage_set_key,
-                    primary_key="(organization_id, attribute_key)",
-                    order_by="(organization_id, attribute_key, attribute_value, timestamp)",
-                    partition_by="toMonday(timestamp)",
-                    settings={
-                        "index_granularity": self.granularity,
-                        # Since the partitions contain multiple retention periods, need to ensure
-                        # that rows within partitions are dropped
-                        "ttl_only_drop_parts": 0,
-                    },
-                    ttl="timestamp + toIntervalDay(retention_days)",
-                ),
-                columns=self.meta_table_columns,
-                target=OperationTarget.LOCAL,
-            ),
-            operations.CreateTable(
-                storage_set=self.storage_set_key,
-                table_name=self.meta_dist_table_name,
-                engine=table_engines.Distributed(
-                    local_table_name=self.meta_local_table_name, sharding_key=None
-                ),
-                columns=self.meta_table_columns,
-                target=OperationTarget.DISTRIBUTED,
-            ),
-        ]
-
-        materialized_view_ops: list[SqlOperation] = []
-        for value_type in self.value_types:
-            attribute_value = "attr_value" if value_type == "str" else "''"
-
-            key_columns = ",".join(
-                [f"mapKeys(attr_{value_type}_{i})" for i in range(ATTRIBUTE_BUCKETS)]
-            )
-            value_columns = ",".join(
-                [f"mapValues(attr_{value_type}_{i})" for i in range(ATTRIBUTE_BUCKETS)]
-            )
-
-            materialized_view_ops.append(
-                operations.CreateMaterializedView(
-                    storage_set=self.storage_set_key,
-                    view_name=self.meta_view_name.format(attribute_type=value_type),
-                    columns=self.meta_table_columns,
-                    destination_table_name=self.meta_local_table_name,
-                    target=OperationTarget.LOCAL,
-                    query=META_KEY_QUERY_TEMPLATE.format(
-                        attribute_value=attribute_value,
-                        key_columns=key_columns,
-                        value_columns=value_columns,
-                    ),
-                ),
-            )
-
-        return create_table_ops + materialized_view_ops
-
-    def backwards_ops(self) -> Sequence[SqlOperation]:
-        ops: Sequence[SqlOperation] = [
-            operations.DropTable(
-                storage_set=self.storage_set_key,
-                table_name=self.meta_view_name.format(attribute_type=value_type),
-                target=OperationTarget.LOCAL,
-            )
-            for value_type in self.value_types
-        ] + [
-            operations.DropTable(
-                storage_set=self.storage_set_key,
-                table_name=self.meta_local_table_name,
-                target=OperationTarget.LOCAL,
-            ),
-            operations.DropTable(
-                storage_set=self.storage_set_key,
-                table_name=self.meta_dist_table_name,
-                target=OperationTarget.DISTRIBUTED,
-            ),
-        ]
-        return ops
+class Migration(migration.SquashedMigration):
+    pass
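The file above also dropped META_KEY_QUERY_TEMPLATE and the per-type materialized views built from it. To see what the deleted forwards_ops() actually substituted into the template, here is a short sketch using 2 buckets instead of ATTRIBUTE_BUCKETS (the real constant lives in snuba.utils.constants) so the rendered fragment stays readable:

# Reproduces the key/value column lists the deleted forwards_ops() built for the
# "str" materialized view, with 2 buckets for brevity.
buckets = 2  # stand-in for ATTRIBUTE_BUCKETS
value_type = "str"

key_columns = ",".join(f"mapKeys(attr_{value_type}_{i})" for i in range(buckets))
value_columns = ",".join(f"mapValues(attr_{value_type}_{i})" for i in range(buckets))

print(f"arrayConcat({key_columns}) AS attribute_key,")
print(f"arrayConcat({value_columns}) AS attr_value")
# arrayConcat(mapKeys(attr_str_0),mapKeys(attr_str_1)) AS attribute_key,
# arrayConcat(mapValues(attr_str_0),mapValues(attr_str_1)) AS attr_value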
@@ -1,31 +1,5 @@
-from typing import Sequence
+from snuba.migrations import migration
 
-from snuba.clusters.storage_sets import StorageSetKey
-from snuba.migrations import migration, operations
-
-
-class Migration(migration.ClickhouseNodeMigration):
-    blocking = False
-
-    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
-        return [
-            operations.AddIndex(
-                storage_set=StorageSetKey.EVENTS_ANALYTICS_PLATFORM,
-                table_name="eap_spans_local",
-                index_name="bf_project_id",
-                index_expression="project_id",
-                index_type="bloom_filter",
-                granularity=1,
-                target=operations.OperationTarget.LOCAL,
-            ),
-        ]
-
-    def backwards_ops(self) -> Sequence[operations.SqlOperation]:
-        return [
-            operations.DropIndex(
-                storage_set=StorageSetKey.EVENTS_ANALYTICS_PLATFORM,
-                table_name="eap_spans_local",
-                index_name="bf_project_id",
-                target=operations.OperationTarget.LOCAL,
-            ),
-        ]
+class Migration(migration.SquashedMigration):
+    pass
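The index migration above is now a SquashedMigration stub too, so the bf_project_id index is expected to already exist on deployed clusters rather than be re-created. A hypothetical way to confirm that against a local storage node; it assumes the clickhouse-driver package and a ClickHouse version that exposes system.data_skipping_indices, and is not part of this PR or of Snuba's tooling:

# Hypothetical check: lists the data-skipping indices on eap_spans_local so you
# can confirm bf_project_id is present even though the migration that created it
# is now a squashed no-op.
from clickhouse_driver import Client  # assumed to be installed

client = Client(host="localhost")  # point this at a local storage node
rows = client.execute(
    """
    SELECT name, type, expr, granularity
    FROM system.data_skipping_indices
    WHERE table = 'eap_spans_local'
    """
)
for name, index_type, expr, granularity in rows:
    print(f"{name}: {index_type}({expr}) GRANULARITY {granularity}")

assert any(name == "bf_project_id" for name, *_ in rows)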