Skip to content

Commit 635e54a

Browse files
authored
feat(eap): Add a table to store items (#6850)
1 parent d2f4f47 commit 635e54a

File tree

1 file changed

+145
-0
lines changed

1 file changed

+145
-0
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
from typing import List, Sequence
2+
3+
from snuba.clusters.storage_sets import StorageSetKey
4+
from snuba.migrations import migration, operations, table_engines
5+
from snuba.migrations.columns import MigrationModifiers as Modifiers
6+
from snuba.migrations.operations import AddIndicesData, OperationTarget, SqlOperation
7+
from snuba.utils.schemas import (
8+
UUID,
9+
Bool,
10+
Column,
11+
DateTime,
12+
Float,
13+
Int,
14+
Map,
15+
String,
16+
UInt,
17+
)
18+
19+
# Storage set and table identifiers for the EAP items table pair.
storage_set_name = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
local_table_name = "eap_items_1_local"
dist_table_name = "eap_items_1_dist"

# Attribute maps are sharded across this many bucketed columns per value type.
num_attr_buckets = 40

# Fixed (non-bucketed) columns shared by the local and distributed tables.
columns: List[Column[Modifiers]] = [
    Column("organization_id", UInt(64)),
    Column("project_id", UInt(64)),
    Column("item_type", UInt(8)),
    Column("timestamp", DateTime(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))),
    Column("trace_id", UUID()),
    Column("item_id", UInt(128)),
    Column("sampling_weight", UInt(64, modifiers=Modifiers(codecs=["ZSTD(1)"]))),
    Column("retention_days", UInt(16, modifiers=Modifiers(codecs=["T64", "ZSTD(1)"]))),
    Column("attributes_bool", Map(String(), Bool())),
    Column("attributes_int", Map(String(), Int(64))),
]
51+
52+
# One bucketed string-attribute map column per bucket:
# attributes_string_0 .. attributes_string_{num_attr_buckets - 1}.
for bucket in range(num_attr_buckets):
    columns.append(
        Column(
            f"attributes_string_{bucket}",
            Map(String(), String(), modifiers=Modifiers(codecs=["ZSTD(1)"])),
        )
    )
67+
68+
# One bucketed float-attribute map column per bucket:
# attributes_float_0 .. attributes_float_{num_attr_buckets - 1}.
columns.extend(
    Column(
        f"attributes_float_{bucket}",
        Map(String(), Float(64), modifiers=Modifiers(codecs=["ZSTD(1)"])),
    )
    for bucket in range(num_attr_buckets)
)
83+
84+
85+
# Secondary (data-skipping) index: a bloom filter over trace_id, applied to
# the local table only, so lookups by trace can skip granules.
indices: Sequence[AddIndicesData] = [
    AddIndicesData(
        name="bf_trace_id", expression="trace_id", type="bloom_filter", granularity=1
    )
]
93+
94+
95+
class Migration(migration.ClickhouseNodeMigration):
    """Create the eap_items_1 local/dist table pair for the EAP storage set."""

    blocking = False

    def forwards_ops(self) -> Sequence[SqlOperation]:
        """Create the local ReplacingMergeTree table, the distributed table
        that fronts it, and the bloom-filter index on trace_id."""
        create_local = operations.CreateTable(
            storage_set=storage_set_name,
            table_name=local_table_name,
            columns=columns,
            engine=table_engines.ReplacingMergeTree(
                primary_key="(organization_id, project_id, item_type, timestamp)",
                order_by="(organization_id, project_id, item_type, timestamp, trace_id, item_id)",
                partition_by="(retention_days, toMonday(timestamp))",
                settings={"index_granularity": "8192"},
                storage_set=storage_set_name,
                # Rows expire once their retention window has elapsed.
                ttl="timestamp + toIntervalDay(retention_days)",
            ),
            target=OperationTarget.LOCAL,
        )
        create_dist = operations.CreateTable(
            storage_set=storage_set_name,
            table_name=dist_table_name,
            columns=columns,
            engine=table_engines.Distributed(
                local_table_name=local_table_name,
                # Shard by trace so all items of one trace land on one shard.
                sharding_key="cityHash64(reinterpretAsUInt128(trace_id))",
            ),
            target=OperationTarget.DISTRIBUTED,
        )
        add_indices = operations.AddIndices(
            storage_set=storage_set_name,
            table_name=local_table_name,
            indices=indices,
            target=OperationTarget.LOCAL,
        )
        return [create_local, create_dist, add_indices]

    def backwards_ops(self) -> Sequence[SqlOperation]:
        """Drop both tables, reversing forwards_ops."""
        drops: List[SqlOperation] = []
        for table_name, target in (
            (local_table_name, OperationTarget.LOCAL),
            (dist_table_name, OperationTarget.DISTRIBUTED),
        ):
            drops.append(
                operations.DropTable(
                    storage_set=storage_set_name,
                    table_name=table_name,
                    target=target,
                )
            )
        return drops

0 commit comments

Comments
 (0)