Skip to content

Commit 8283447

Browse files
committed
comments and hogql
1 parent 81889f4 commit 8283447

File tree

5 files changed

+30
-24
lines changed

5 files changed

+30
-24
lines changed

posthog/clickhouse/migrations/0182_shard_document_embeddings.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,21 @@
44
from products.error_tracking.backend.embedding import (
55
DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL,
66
DOCUMENT_EMBEDDING_WRITABLE,
7-
DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL,
87
DOCUMENT_EMBEDDINGS_MV,
98
DOCUMENT_EMBEDDINGS_MV_SQL,
9+
DOCUMENT_EMBEDDINGS_TABLE_SQL,
1010
DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL,
1111
KAFKA_DOCUMENT_EMBEDDINGS,
1212
KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL,
1313
)
1414

15+
# The only tricky part of this migration is that the behaviour of `DOCUMENT_EMBEDDINGS_TABLE_SQL` has changed -
16+
# it now creates sharded tables rather than replicated tables. We don't drop the replicated table, as in production
17+
# we want to keep the historical data around, at least until we get around to migrating it, but this does mean
18+
# all future "rebuild the world" runs of the migration set will never create that old table, only the new sharded one.
19+
1520
operations = [
16-
# 1. Drop MV
21+
# 1. Drop MV to stop processing messages from kafka
1722
run_sql_with_exceptions(
1823
f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDINGS_MV}",
1924
node_roles=[NodeRole.INGESTION_SMALL],
@@ -23,14 +28,14 @@
2328
f"DROP TABLE IF EXISTS {KAFKA_DOCUMENT_EMBEDDINGS}",
2429
node_roles=[NodeRole.INGESTION_SMALL],
2530
),
26-
# 3. Drop old writable table
31+
# 3. Drop old writable table (but not the old "main" table, since we want to keep the data around)
2732
run_sql_with_exceptions(
2833
f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDING_WRITABLE}",
2934
node_roles=[NodeRole.INGESTION_SMALL],
3035
),
31-
# 4. Create new sharded data table
36+
# 4. Create new sharded data tables (this function used to create "posthog_document_embeddings" directly, but now creates the sharded_ version)
3237
run_sql_with_exceptions(
33-
DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL(),
38+
DOCUMENT_EMBEDDINGS_TABLE_SQL(),
3439
node_roles=[NodeRole.DATA],
3540
sharded=True,
3641
),
@@ -39,7 +44,7 @@
3944
DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL(),
4045
node_roles=[NodeRole.DATA, NodeRole.COORDINATOR],
4146
),
42-
# 6. Create new writable distributed table pointing to sharded table
47+
# 6. Create new writable distributed table pointing to sharded tables
4348
run_sql_with_exceptions(
4449
DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL(),
4550
node_roles=[NodeRole.INGESTION_SMALL],
@@ -49,7 +54,7 @@
4954
KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL(),
5055
node_roles=[NodeRole.INGESTION_SMALL],
5156
),
52-
# 8. Recreate MV writing to writable table (which now writes to sharded table)
57+
# 8. Recreate MV writing to writable table (which now writes to sharded tables)
5358
run_sql_with_exceptions(
5459
DOCUMENT_EMBEDDINGS_MV_SQL(),
5560
node_roles=[NodeRole.INGESTION_SMALL],

posthog/clickhouse/schema.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,8 @@
172172

173173
from products.error_tracking.backend.embedding import (
174174
DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL,
175-
DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL,
176175
DOCUMENT_EMBEDDINGS_MV_SQL,
176+
DOCUMENT_EMBEDDINGS_TABLE_SQL,
177177
DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL,
178178
KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL,
179179
)
@@ -202,7 +202,7 @@
202202
PERSONS_DISTINCT_ID_TABLE_SQL,
203203
PERSON_DISTINCT_ID2_TABLE_SQL,
204204
PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
205-
DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL,
205+
DOCUMENT_EMBEDDINGS_TABLE_SQL,
206206
ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES_TABLE_SQL,
207207
ERROR_TRACKING_FINGERPRINT_EMBEDDINGS_TABLE_SQL,
208208
PLUGIN_LOG_ENTRIES_TABLE_SQL,

posthog/hogql/database/schema/document_embeddings.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
Table,
1414
)
1515

16-
from products.error_tracking.backend.embedding import DOCUMENT_EMBEDDINGS
16+
from products.error_tracking.backend.embedding import DISTRIBUTED_DOCUMENT_EMBEDDINGS
1717

1818
DOCUMENT_EMBEDDINGS_FIELDS: dict[str, FieldOrTable] = {
1919
"team_id": IntegerDatabaseField(name="team_id", nullable=False),
@@ -49,7 +49,7 @@ class RawDocumentEmbeddingsTable(Table):
4949
fields: dict[str, FieldOrTable] = DOCUMENT_EMBEDDINGS_FIELDS
5050

5151
def to_printed_clickhouse(self, context):
52-
return DOCUMENT_EMBEDDINGS
52+
return DISTRIBUTED_DOCUMENT_EMBEDDINGS
5353

5454
def to_printed_hogql(self):
5555
return f"raw_document_embeddings"
@@ -67,7 +67,7 @@ def lazy_select(
6767
return select_from_embeddings_table(table_to_add.fields_accessed)
6868

6969
def to_printed_clickhouse(self, context):
70-
return DOCUMENT_EMBEDDINGS
70+
return DISTRIBUTED_DOCUMENT_EMBEDDINGS
7171

7272
def to_printed_hogql(self):
7373
return "document_embeddings"

posthog/hogql_queries/test/test_document_embeddings_query_runner.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class DocumentEmbeddingRow:
7777
inserted_at: datetime
7878

7979
def _seed_document_embeddings(self) -> list[DocumentEmbeddingRow]:
80-
sync_execute("TRUNCATE TABLE posthog_document_embeddings", flush=False, team_id=self.team.pk)
80+
sync_execute("TRUNCATE TABLE distributed_posthog_document_embeddings", flush=False, team_id=self.team.pk)
8181

8282
fixtures: list[TestDocumentEmbeddingsQueryRunner.DocumentEmbeddingRow] = []
8383
rows: list[tuple] = []
@@ -128,7 +128,7 @@ def _seed_document_embeddings(self) -> list[DocumentEmbeddingRow]:
128128
if rows:
129129
sync_execute(
130130
"""
131-
INSERT INTO posthog_document_embeddings (
131+
INSERT INTO distributed_posthog_document_embeddings (
132132
team_id,
133133
product,
134134
document_type,

products/error_tracking/backend/embedding.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,14 @@
3030
"""
3131

3232

33-
def DOCUMENT_EMBEDDINGS_TABLE_ENGINE():
34-
return ReplacingMergeTree(
35-
SHARDED_DOCUMENT_EMBEDDINGS, ver="inserted_at", replication_scheme=ReplicationScheme.SHARDED
36-
)
33+
# The flow of this table set, as per other sharded tables, is:
34+
# - Kafka table exposes messages from Kafka topic
35+
# - Materialized view reads from Kafka table, writes to writable table, moving the kafka offset
36+
# - Writable table distributes writes to sharded tables
37+
# - Distributed table distributes reads to sharded tables
3738

3839

39-
def DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL():
40+
def DOCUMENT_EMBEDDINGS_TABLE_SQL():
4041
return (
4142
DOCUMENT_EMBEDDINGS_TABLE_BASE_SQL
4243
+ """
@@ -49,7 +50,9 @@ def DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL():
4950
"""
5051
).format(
5152
table_name=SHARDED_DOCUMENT_EMBEDDINGS,
52-
engine=DOCUMENT_EMBEDDINGS_TABLE_ENGINE(),
53+
engine=ReplacingMergeTree(
54+
SHARDED_DOCUMENT_EMBEDDINGS, ver="inserted_at", replication_scheme=ReplicationScheme.SHARDED
55+
),
5356
default_clause=" DEFAULT ''",
5457
extra_fields=f"""
5558
{KAFKA_COLUMNS_WITH_PARTITION}
@@ -58,6 +61,8 @@ def DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL():
5861
)
5962

6063

64+
# The sharding keys of this and the table below are chosen mostly at random - as far as I could tell,
65+
# there isn't much to be gained from trying to get clever here, and it's best just to keep spread even
6166
def DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL():
6267
return DOCUMENT_EMBEDDINGS_TABLE_BASE_SQL.format(
6368
table_name=DISTRIBUTED_DOCUMENT_EMBEDDINGS,
@@ -122,7 +127,3 @@ def DOCUMENT_EMBEDDINGS_MV_SQL(
122127

123128
def TRUNCATE_DOCUMENT_EMBEDDINGS_TABLE_SQL():
124129
return f"TRUNCATE TABLE IF EXISTS {SHARDED_DOCUMENT_EMBEDDINGS} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'"
125-
126-
127-
# Backwards compatibility alias for old migrations (0155, 0174)
128-
DOCUMENT_EMBEDDINGS_TABLE_SQL = DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL

0 commit comments

Comments
 (0)