Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@

from products.error_tracking.backend.embedding import (
DOCUMENT_EMBEDDING_WRITABLE,
DOCUMENT_EMBEDDINGS,
DOCUMENT_EMBEDDINGS_MV,
DOCUMENT_EMBEDDINGS_MV_SQL,
KAFKA_DOCUMENT_EMBEDDINGS,
KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL,
SHARDED_DOCUMENT_EMBEDDINGS,
)

ADD_CONTENT_COLUMN_SQL = """
Expand All @@ -25,7 +25,7 @@
node_roles=[NodeRole.INGESTION_SMALL],
),
run_sql_with_exceptions(
ADD_CONTENT_COLUMN_SQL.format(table_name=DOCUMENT_EMBEDDINGS),
ADD_CONTENT_COLUMN_SQL.format(table_name=SHARDED_DOCUMENT_EMBEDDINGS),
node_roles=[NodeRole.DATA, NodeRole.COORDINATOR],
sharded=False,
is_alter_on_replicated_table=True,
Expand Down
62 changes: 62 additions & 0 deletions posthog/clickhouse/migrations/0183_shard_document_embeddings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from posthog.clickhouse.client.connection import NodeRole
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions

from products.error_tracking.backend.embedding import (
DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL,
DOCUMENT_EMBEDDING_WRITABLE,
DOCUMENT_EMBEDDINGS_MV,
DOCUMENT_EMBEDDINGS_MV_SQL,
DOCUMENT_EMBEDDINGS_TABLE_SQL,
DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL,
KAFKA_DOCUMENT_EMBEDDINGS,
KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL,
)

# The only tricky part of this migration is that the behaviour of `DOCUMENT_EMBEDDINGS_TABLE_SQL` has changed -
# it now creates sharded tables rather than replicated tables. We don't drop the replicated table, as in production
# we want to keep the historical data around, at least until we get around to migrating it, but this does mean
# all future "rebuild the world" runs of the migration set will never create that old table, only the new sharded one.

# Ordered DDL steps: ingestion is stopped first (MV, then Kafka table), the
# write path is rebuilt on the new sharded layout, and ingestion is recreated
# last so no rows are written while the tables are being swapped.
operations = [
    # 1. Drop the materialized view first so no further Kafka messages are
    #    consumed while the underlying tables are rearranged.
    run_sql_with_exceptions(
        f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDINGS_MV}",
        node_roles=[NodeRole.INGESTION_SMALL],
    ),
    # 2. Drop the Kafka engine table (safe now that the MV is gone).
    run_sql_with_exceptions(
        f"DROP TABLE IF EXISTS {KAFKA_DOCUMENT_EMBEDDINGS}",
        node_roles=[NodeRole.INGESTION_SMALL],
    ),
    # 3. Drop the old writable table — but NOT the old "main" table, which is
    #    intentionally kept so historical data survives until it is migrated.
    run_sql_with_exceptions(
        f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDING_WRITABLE}",
        node_roles=[NodeRole.INGESTION_SMALL],
    ),
    # 4. Create the new sharded data tables. NOTE: this helper used to create
    #    "posthog_document_embeddings" directly, but now creates the sharded_
    #    version, so rebuild-the-world runs will never recreate the old table.
    run_sql_with_exceptions(
        DOCUMENT_EMBEDDINGS_TABLE_SQL(),
        node_roles=[NodeRole.DATA],
        sharded=True,
    ),
    # 5. Create the distributed table used for reads across the shards.
    run_sql_with_exceptions(
        DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL(),
        node_roles=[NodeRole.DATA, NodeRole.COORDINATOR],
    ),
    # 6. Create the new writable distributed table pointing at the sharded
    #    tables; this replaces the table dropped in step 3.
    run_sql_with_exceptions(
        DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL(),
        node_roles=[NodeRole.INGESTION_SMALL],
    ),
    # 7. Recreate the Kafka engine table so consumption can resume.
    run_sql_with_exceptions(
        KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL(),
        node_roles=[NodeRole.INGESTION_SMALL],
    ),
    # 8. Recreate the MV last: it writes into the writable table, which now
    #    routes rows to the sharded tables.
    run_sql_with_exceptions(
        DOCUMENT_EMBEDDINGS_MV_SQL(),
        node_roles=[NodeRole.INGESTION_SMALL],
    ),
]
2 changes: 1 addition & 1 deletion posthog/clickhouse/migrations/max_migration.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0182_endpoints_cluster_query_log_prod
0183_shard_document_embeddings
4 changes: 4 additions & 0 deletions posthog/clickhouse/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,10 @@
)

from products.error_tracking.backend.embedding import (
DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL,
DOCUMENT_EMBEDDINGS_MV_SQL,
DOCUMENT_EMBEDDINGS_TABLE_SQL,
DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL,
KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL,
)
from products.error_tracking.backend.sql import (
Expand Down Expand Up @@ -264,6 +266,8 @@
WRITABLE_APP_METRICS2_TABLE_SQL,
WRITABLE_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES_TABLE_SQL,
WRITABLE_EVENTS_RECENT_TABLE_SQL,
DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL,
DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL,
)
CREATE_KAFKA_TABLE_QUERIES = (
KAFKA_LOG_ENTRIES_TABLE_SQL,
Expand Down
Loading
Loading