feat(smart-autocomplete): add materialized view for smart autocomplete (#6859)
As per the design [here](https://www.notion.so/sentry/Smart-autocomplete-18a8b10e4b5d80a9967aca8f8fd7a598#18a8b10e4b5d807aa61dd3d44c1d09c1), create a view to store co-occurring attribute keys and values.

### Goal of this PR

The goal of this PR is to run the migration and observe the impact on the ClickHouse cluster once such a view exists (see the monitoring sketch after this message), namely:

* Disk usage growth
* Merge time
* CPU load
* Consumer throughput

Key things to note:

* This uses the existing spans table. Once the new trace_item table is created, I will write a migration that drops this materialized view and adds a new one populated from that table.

### Local Testing

I inserted data into this view and verified that rows with equivalent tags and value arrays do get merged into one row, ignoring the numerical values (see the dedup sketch after the migration code).

### Reverting

If the view produces too much load on the system, the snuba migrations tool will allow us to reverse this migration and drop the materialized view.
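A minimal sketch (not part of the commit) of system-table queries that could track the first two goals above, disk usage and merge time, assuming access to ClickHouse's `system.parts` and an enabled `system.part_log`; the table name matches the migration below:

```sql
-- Disk footprint and part count of the view's target table.
SELECT sum(bytes_on_disk) AS disk_bytes, count() AS parts
FROM system.parts
WHERE table = 'eap_trace_item_attrs_local' AND active;

-- Merge durations for that table (requires system.part_log to be enabled).
SELECT avg(duration_ms) AS avg_merge_ms, max(duration_ms) AS max_merge_ms
FROM system.part_log
WHERE table = 'eap_trace_item_attrs_local'
  AND event_type = 'MergeParts'
  AND event_date >= today() - 7;
```

CPU load and consumer throughput are easier to read off host-level and consumer metrics than off ClickHouse system tables.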
1 parent 9ad86f4 commit 2e1d381

1 file changed: 171 additions, 0 deletions
@@ -0,0 +1,171 @@
```python
from typing import Any, List, Sequence

from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation
from snuba.utils.schemas import Array, Column, ColumnType, Date, Map, String, UInt

storage_set_name = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
local_table_name = "eap_items_1_local"
dist_table_name = "eap_items_1_dist"
num_attr_buckets = 20

# One attribute column per source value type. String attributes keep their
# values; bool/int64/float64 attributes are reduced to arrays of key names.
_TYPES: dict[str, ColumnType[Any]] = {
    "string": Map(String(), String()),
    "bool": Array(String()),
    "int64": Array(String()),
    "float64": Array(String()),
}

_attr_columns = [
    Column(f"attrs_{type_name}", type_spec) for type_name, type_spec in _TYPES.items()
]

columns: List[Column[Modifiers]] = [
    Column("project_id", UInt(64)),
    Column("item_type", String()),
    Column("date", Date(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))),
    Column("retention_days", UInt(16)),
    *_attr_columns,
    Column("key_val_hash", UInt(64)),
]

# The source spans table buckets numeric attributes into `num_attr_buckets`
# maps; only their key names are carried into the view.
_attr_num_names = ", ".join(
    [f"mapKeys(attr_num_{i})" for i in range(num_attr_buckets)]
)

MV_QUERY = f"""
SELECT
    project_id,
    'span' AS item_type, -- alias so the literal lands in the item_type column
    toDate(_sort_timestamp) AS date,
    retention_days,
    mapConcat(attr_str_0, attr_str_1, attr_str_2, attr_str_3, attr_str_4, attr_str_5, attr_str_6, attr_str_7, attr_str_8, attr_str_9, attr_str_10, attr_str_11, attr_str_12, attr_str_13, attr_str_14, attr_str_15, attr_str_16, attr_str_17, attr_str_18, attr_str_19) AS attrs_string, -- `attrs_string` Map(String, String)
    array() AS attrs_bool, -- bool (empty: spans only carry string and numeric attrs)
    array() AS attrs_int64, -- int64 (empty, as above)
    arrayConcat({{_attr_num_names}}) AS attrs_float64, -- float key names only
    -- a hash of all the attribute key/val pairs of the item in sorted order;
    -- this lets us deduplicate rows with merges
    cityHash64(mapSort(
        mapConcat(
            mapApply((k, v) -> (k, ''), attr_num_0),
            mapApply((k, v) -> (k, ''), attr_num_1),
            mapApply((k, v) -> (k, ''), attr_num_2),
            mapApply((k, v) -> (k, ''), attr_num_3),
            mapApply((k, v) -> (k, ''), attr_num_4),
            mapApply((k, v) -> (k, ''), attr_num_5),
            mapApply((k, v) -> (k, ''), attr_num_6),
            mapApply((k, v) -> (k, ''), attr_num_7),
            mapApply((k, v) -> (k, ''), attr_num_8),
            mapApply((k, v) -> (k, ''), attr_num_9),
            mapApply((k, v) -> (k, ''), attr_num_10),
            mapApply((k, v) -> (k, ''), attr_num_11),
            mapApply((k, v) -> (k, ''), attr_num_12),
            mapApply((k, v) -> (k, ''), attr_num_13),
            mapApply((k, v) -> (k, ''), attr_num_14),
            mapApply((k, v) -> (k, ''), attr_num_15),
            mapApply((k, v) -> (k, ''), attr_num_16),
            mapApply((k, v) -> (k, ''), attr_num_17),
            mapApply((k, v) -> (k, ''), attr_num_18),
            mapApply((k, v) -> (k, ''), attr_num_19),
            attr_str_0,
            attr_str_1,
            attr_str_2,
            attr_str_3,
            attr_str_4,
            attr_str_5,
            attr_str_6,
            attr_str_7,
            attr_str_8,
            attr_str_9,
            attr_str_10,
            attr_str_11,
            attr_str_12,
            attr_str_13,
            attr_str_14,
            attr_str_15,
            attr_str_16,
            attr_str_17,
            attr_str_18,
            attr_str_19
        )
    )) AS key_val_hash
FROM eap_spans_2_local
"""
```
```python
class Migration(migration.ClickhouseNodeMigration):
    blocking = False
    storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
    granularity = "8192"

    local_table_name = "eap_trace_item_attrs_local"
    dist_table_name = "eap_trace_item_attrs_dist"
    mv_name = "eap_trace_item_attrs_mv"

    def forwards_ops(self) -> Sequence[SqlOperation]:
        create_table_ops = [
            operations.CreateTable(
                storage_set=self.storage_set_key,
                table_name=self.local_table_name,
                engine=table_engines.ReplacingMergeTree(
                    storage_set=self.storage_set_key,
                    primary_key="(project_id, date, key_val_hash)",
                    order_by="(project_id, date, key_val_hash)",
                    partition_by="(retention_days, toMonday(date))",
                    ttl="date + toIntervalDay(retention_days)",
                ),
                columns=columns,
                target=OperationTarget.LOCAL,
            ),
            operations.CreateTable(
                storage_set=self.storage_set_key,
                table_name=self.dist_table_name,
                engine=table_engines.Distributed(
                    local_table_name=self.local_table_name,
                    sharding_key=None,
                ),
                columns=columns,
                target=OperationTarget.DISTRIBUTED,
            ),
        ]

        materialized_view_ops: list[SqlOperation] = [
            operations.CreateMaterializedView(
                storage_set=self.storage_set_key,
                view_name=self.mv_name,
                columns=columns,
                destination_table_name=self.local_table_name,
                target=OperationTarget.LOCAL,
                query=MV_QUERY,
            ),
        ]

        # Tables must exist before the view that writes into them.
        return create_table_ops + materialized_view_ops

    def backwards_ops(self) -> Sequence[SqlOperation]:
        # Reverse order: drop the view first so nothing writes into the
        # tables while they are being dropped.
        return [
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.mv_name,
                target=OperationTarget.LOCAL,
            ),
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.local_table_name,
                target=OperationTarget.LOCAL,
            ),
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.dist_table_name,
                target=OperationTarget.DISTRIBUTED,
            ),
        ]
```
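To make the Local Testing note concrete: a minimal sketch of the dedup-by-merge behavior, assuming a scratch ClickHouse instance. The toy table `attrs_demo` is hypothetical; it only mirrors the engine and ORDER BY of the real target table, and the attribute names are invented.

```sql
CREATE TABLE attrs_demo
(
    project_id    UInt64,
    date          Date,
    attrs_string  Map(String, String),
    attrs_float64 Array(String),
    key_val_hash  UInt64
)
ENGINE = ReplacingMergeTree
ORDER BY (project_id, date, key_val_hash);

-- Two spans whose numeric *values* differ (duration 100 vs 250) reduce to
-- identical rows, because only the numeric key names survive.
INSERT INTO attrs_demo VALUES
    (1, '2025-01-01', map('span.op', 'db'), ['duration'],
     cityHash64(mapSort(map('duration', '', 'span.op', 'db')))),
    (1, '2025-01-01', map('span.op', 'db'), ['duration'],
     cityHash64(mapSort(map('duration', '', 'span.op', 'db'))));

OPTIMIZE TABLE attrs_demo FINAL;  -- force the background merge locally

SELECT count() FROM attrs_demo;  -- 1: the duplicates were collapsed
```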

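For context on how the view could serve smart autocomplete (no such query ships in this commit): once equivalent rows are merged, co-occurring keys can be read off any row containing a key the user has already typed. A sketch against the distributed table, with `http.method` as an invented example key:

```sql
SELECT DISTINCT arrayJoin(
    arrayConcat(mapKeys(attrs_string), attrs_float64)
) AS suggested_key
FROM eap_trace_item_attrs_dist
WHERE project_id = 1
  AND date >= today() - 7
  AND mapContains(attrs_string, 'http.method')
LIMIT 20
```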