From fdca51b4cf8b2ca5e3f721a42ded75d079255959 Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Fri, 28 Nov 2025 01:39:15 +0200 Subject: [PATCH 1/5] add distributed embedding table --- .../0182_shard_document_embeddings.py | 67 +++ posthog/clickhouse/schema.py | 8 +- .../test/__snapshots__/test_schema.ambr | 562 ++++-------------- products/error_tracking/backend/embedding.py | 36 +- 4 files changed, 217 insertions(+), 456 deletions(-) create mode 100644 posthog/clickhouse/migrations/0182_shard_document_embeddings.py diff --git a/posthog/clickhouse/migrations/0182_shard_document_embeddings.py b/posthog/clickhouse/migrations/0182_shard_document_embeddings.py new file mode 100644 index 0000000000000..cc9b6be65f432 --- /dev/null +++ b/posthog/clickhouse/migrations/0182_shard_document_embeddings.py @@ -0,0 +1,67 @@ +from posthog.clickhouse.client.connection import NodeRole +from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions + +from products.error_tracking.backend.embedding import ( + DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL, + DOCUMENT_EMBEDDING_WRITABLE, + DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL, + DOCUMENT_EMBEDDINGS_MV, + DOCUMENT_EMBEDDINGS_MV_SQL, + DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL, + KAFKA_DOCUMENT_EMBEDDINGS, + KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL, +) + +# This migration creates the new sharded table structure alongside the existing +# replicated table. The old replicated table (posthog_document_embeddings) is +# left intact so data can be copied from it to the new sharded table. +# +# New tables created: +# - sharded_posthog_document_embeddings: The sharded data table +# - distributed_posthog_document_embeddings: Distributed read table for the sharded data +# +# The MV and writable table are recreated to write to the new sharded table. + +operations = [ + # 1. Drop MV to stop writes + run_sql_with_exceptions( + f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDINGS_MV}", + node_roles=[NodeRole.INGESTION_SMALL], + ), + # 2. Drop Kafka table + run_sql_with_exceptions( + f"DROP TABLE IF EXISTS {KAFKA_DOCUMENT_EMBEDDINGS}", + node_roles=[NodeRole.INGESTION_SMALL], + ), + # 3. Drop old writable distributed table + run_sql_with_exceptions( + f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDING_WRITABLE}", + node_roles=[NodeRole.INGESTION_SMALL], + ), + # 4. Create new sharded data table + run_sql_with_exceptions( + DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL(), + node_roles=[NodeRole.DATA], + sharded=True, + ), + # 5. Create distributed read table for the sharded data + run_sql_with_exceptions( + DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL(), + node_roles=[NodeRole.DATA, NodeRole.COORDINATOR], + ), + # 6. Create new writable distributed table pointing to sharded table + run_sql_with_exceptions( + DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL(), + node_roles=[NodeRole.INGESTION_SMALL], + ), + # 7. Recreate Kafka table + run_sql_with_exceptions( + KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL(), + node_roles=[NodeRole.INGESTION_SMALL], + ), + # 8. Recreate MV writing to writable table (which now writes to sharded table) + run_sql_with_exceptions( + DOCUMENT_EMBEDDINGS_MV_SQL(), + node_roles=[NodeRole.INGESTION_SMALL], + ), +] diff --git a/posthog/clickhouse/schema.py b/posthog/clickhouse/schema.py index 9f64a4d4a3851..fe8abe9975ab2 100644 --- a/posthog/clickhouse/schema.py +++ b/posthog/clickhouse/schema.py @@ -171,8 +171,10 @@ ) from products.error_tracking.backend.embedding import ( + DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL, + DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL, DOCUMENT_EMBEDDINGS_MV_SQL, - DOCUMENT_EMBEDDINGS_TABLE_SQL, + DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL, KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL, ) from products.error_tracking.backend.sql import ( @@ -200,7 +202,7 @@ PERSONS_DISTINCT_ID_TABLE_SQL, PERSON_DISTINCT_ID2_TABLE_SQL, PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL, - DOCUMENT_EMBEDDINGS_TABLE_SQL, + DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL, ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES_TABLE_SQL, ERROR_TRACKING_FINGERPRINT_EMBEDDINGS_TABLE_SQL, PLUGIN_LOG_ENTRIES_TABLE_SQL, @@ -264,6 +266,8 @@ WRITABLE_APP_METRICS2_TABLE_SQL, WRITABLE_ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES_TABLE_SQL, WRITABLE_EVENTS_RECENT_TABLE_SQL, + DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL, + DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL, ) CREATE_KAFKA_TABLE_QUERIES = ( KAFKA_LOG_ENTRIES_TABLE_SQL, diff --git a/posthog/clickhouse/test/__snapshots__/test_schema.ambr b/posthog/clickhouse/test/__snapshots__/test_schema.ambr index 1ca7025ba68c8..6d5a43c574cad 100644 --- a/posthog/clickhouse/test/__snapshots__/test_schema.ambr +++ b/posthog/clickhouse/test/__snapshots__/test_schema.ambr @@ -75,22 +75,6 @@ ''' # --- -# name: test_create_kafka_table_with_different_kafka_host[kafka_behavioral_cohorts_matches] - ''' - - CREATE TABLE IF NOT EXISTS kafka_behavioral_cohorts_matches - ( - team_id Int64, - cohort_id Int64, - evaluation_timestamp DateTime64(6), - person_id UUID, - condition String, - latest_event_is_match UInt8 - ) ENGINE = Kafka(msk_cluster, kafka_topic_list = 'clickhouse_behavioral_cohorts_matches', kafka_group_name = 'clickhouse_behavioral_cohorts_matches', kafka_format = 'JSONEachRow') - SETTINGS kafka_max_block_size = 1000000, kafka_poll_max_batch_size = 100000, kafka_poll_timeout_ms = 1000, kafka_flush_interval_ms = 7500, kafka_skip_broken_messages = 100, kafka_num_consumers = 1 - - ''' -# --- # name: test_create_kafka_table_with_different_kafka_host[kafka_cohort_membership] ''' @@ -575,35 +559,6 @@ ''' # --- -# name: test_create_kafka_table_with_different_kafka_host[kafka_session_replay_events_v2_test] - ''' - - CREATE TABLE IF NOT EXISTS kafka_session_replay_events_v2_test ON CLUSTER 'posthog' - ( - session_id VARCHAR, - team_id Int64, - distinct_id VARCHAR, - first_timestamp DateTime64(6, 'UTC'), - last_timestamp DateTime64(6, 'UTC'), - block_url String, - first_url Nullable(VARCHAR), - urls Array(String), - click_count Int64, - keypress_count Int64, - mouse_activity_count Int64, - active_milliseconds Int64, - console_log_count Int64, - console_warn_count Int64, - console_error_count Int64, - size Int64, - event_count Int64, - message_count Int64, - snapshot_source LowCardinality(Nullable(String)), - snapshot_library Nullable(String), - ) ENGINE = Kafka('test.kafka.broker:9092', 'clickhouse_session_replay_events_v2_test_test', 'group1', 'JSONEachRow') - - ''' -# --- # name: test_create_table_query[app_metrics2] ''' @@ -706,44 +661,6 @@ ''' # --- -# name: test_create_table_query[behavioral_cohorts_matches] - ''' - - CREATE TABLE IF NOT EXISTS behavioral_cohorts_matches - ( - team_id Int64, - cohort_id Int64, - date Date, - person_id UUID, - condition String, - matches SimpleAggregateFunction(sum, UInt64), - latest_event_is_match AggregateFunction(argMax, UInt8, DateTime64(6)) - ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_behavioral_cohorts_matches', sipHash64(person_id)) - - ''' -# --- -# name: test_create_table_query[behavioral_cohorts_matches_mv] - ''' - - CREATE MATERIALIZED VIEW IF NOT EXISTS behavioral_cohorts_matches_mv TO writable_behavioral_cohorts_matches - AS SELECT - team_id, - cohort_id, - toDate(evaluation_timestamp) AS date, - person_id, - condition, - sum(1) AS matches, - argMaxState(latest_event_is_match, evaluation_timestamp) AS latest_event_is_match - FROM kafka_behavioral_cohorts_matches - GROUP BY - team_id, - cohort_id, - date, - person_id, - condition - - ''' -# --- # name: test_create_table_query[channel_definition] ''' @@ -842,6 +759,30 @@ ''' # --- +# name: test_create_table_query[distributed_posthog_document_embeddings] + ''' + + CREATE TABLE IF NOT EXISTS distributed_posthog_document_embeddings + ( + team_id Int64, + product LowCardinality(String), -- Like "error tracking" or "session replay" - basically a bucket, you'd use this to ask clickhouse "what kind of documents do I have embeddings for, related to session replay" + document_type LowCardinality(String), -- The type of document this is an embedding for, e.g. "issue_fingerprint", "session_summary", "task_update" etc. + model_name LowCardinality(String), -- The name of the model used to generate this embedding. Includes embedding dimensionality, appended as e.g. "text-embedding-3-small-1024" + rendering LowCardinality(String), -- How the document was rendered to text, e.g. "with_error_message", "as_html" etc. Use "plain" if it was already text. + document_id String, -- A uuid, a path like "issue/", whatever you like really + timestamp DateTime64(3, 'UTC'), -- This is a user defined timestamp, meant to be the /documents/ creation time (or similar), rather than the time the embedding was created + inserted_at DateTime64(3, 'UTC'), -- When was this embedding inserted (if a duplicate-key row was inserted, for example, this is what we use to choose the winner) + content String DEFAULT '', -- The actual text content that was embedded + embedding Array(Float64) -- The embedding itself + + , _timestamp DateTime + , _offset UInt64 + , _partition UInt64 + + ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_posthog_document_embeddings', cityHash64(document_id)) + + ''' +# --- # name: test_create_table_query[distributed_system_processes] ''' @@ -1397,22 +1338,6 @@ ''' # --- -# name: test_create_table_query[kafka_behavioral_cohorts_matches] - ''' - - CREATE TABLE IF NOT EXISTS kafka_behavioral_cohorts_matches - ( - team_id Int64, - cohort_id Int64, - evaluation_timestamp DateTime64(6), - person_id UUID, - condition String, - latest_event_is_match UInt8 - ) ENGINE = Kafka(msk_cluster, kafka_topic_list = 'clickhouse_behavioral_cohorts_matches', kafka_group_name = 'clickhouse_behavioral_cohorts_matches', kafka_format = 'JSONEachRow') - SETTINGS kafka_max_block_size = 1000000, kafka_poll_max_batch_size = 100000, kafka_poll_timeout_ms = 1000, kafka_flush_interval_ms = 7500, kafka_skip_broken_messages = 100, kafka_num_consumers = 1 - - ''' -# --- # name: test_create_table_query[kafka_cohort_membership] ''' @@ -1897,35 +1822,6 @@ ''' # --- -# name: test_create_table_query[kafka_session_replay_events_v2_test] - ''' - - CREATE TABLE IF NOT EXISTS kafka_session_replay_events_v2_test ON CLUSTER 'posthog' - ( - session_id VARCHAR, - team_id Int64, - distinct_id VARCHAR, - first_timestamp DateTime64(6, 'UTC'), - last_timestamp DateTime64(6, 'UTC'), - block_url String, - first_url Nullable(VARCHAR), - urls Array(String), - click_count Int64, - keypress_count Int64, - mouse_activity_count Int64, - active_milliseconds Int64, - console_log_count Int64, - console_warn_count Int64, - console_error_count Int64, - size Int64, - event_count Int64, - message_count Int64, - snapshot_source LowCardinality(Nullable(String)), - snapshot_library Nullable(String), - ) ENGINE = Kafka('kafka:9092', 'clickhouse_session_replay_events_v2_test_test', 'group1', 'JSONEachRow') - - ''' -# --- # name: test_create_table_query[log_entries] ''' @@ -2390,40 +2286,6 @@ ''' # --- -# name: test_create_table_query[posthog_document_embeddings] - ''' - - CREATE TABLE IF NOT EXISTS posthog_document_embeddings - ( - team_id Int64, - product LowCardinality(String), -- Like "error tracking" or "session replay" - basically a bucket, you'd use this to ask clickhouse "what kind of documents do I have embeddings for, related to session replay" - document_type LowCardinality(String), -- The type of document this is an embedding for, e.g. "issue_fingerprint", "session_summary", "task_update" etc. - model_name LowCardinality(String), -- The name of the model used to generate this embedding. Includes embedding dimensionality, appended as e.g. "text-embedding-3-small-1024" - rendering LowCardinality(String), -- How the document was rendered to text, e.g. "with_error_message", "as_html" etc. Use "plain" if it was already text. - document_id String, -- A uuid, a path like "issue/", whatever you like really - timestamp DateTime64(3, 'UTC'), -- This is a user defined timestamp, meant to be the /documents/ creation time (or similar), rather than the time the embedding was created - inserted_at DateTime64(3, 'UTC'), -- When was this embedding inserted (if a duplicate-key row was inserted, for example, this is what we use to choose the winner) - content String DEFAULT '', -- The actual text content that was embedded - embedding Array(Float64) -- The embedding itself - - - , _timestamp DateTime - , _offset UInt64 - , _partition UInt64 - - , INDEX kafka_timestamp_minmax_posthog_document_embeddings _timestamp TYPE minmax GRANULARITY 3 - - ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_noshard/posthog.posthog_document_embeddings', '{replica}-{shard}', inserted_at) - - -- This index assumes: - -- - people will /always/ provide a date range - -- - "show me documents of type X by any model" will be more common than "show me all documents by model X" - -- - Documents with the same ID whose timestamp is in the same day are the same document, and the later inserted one should be retained - ORDER BY (team_id, toDate(timestamp), product, document_type, model_name, rendering, cityHash64(document_id)) - SETTINGS index_granularity = 512 - - ''' -# --- # name: test_create_table_query[posthog_document_embeddings_mv] ''' @@ -3432,96 +3294,6 @@ ''' # --- -# name: test_create_table_query[session_replay_events_v2_test] - ''' - - CREATE TABLE IF NOT EXISTS session_replay_events_v2_test - ( - session_id VARCHAR, - team_id Int64, - distinct_id VARCHAR, - min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')), - max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')), - block_first_timestamps SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))), - block_last_timestamps SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))), - block_urls SimpleAggregateFunction(groupArrayArray, Array(String)), - first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')), - all_urls SimpleAggregateFunction(groupUniqArrayArray, Array(String)), - click_count SimpleAggregateFunction(sum, Int64), - keypress_count SimpleAggregateFunction(sum, Int64), - mouse_activity_count SimpleAggregateFunction(sum, Int64), - active_milliseconds SimpleAggregateFunction(sum, Int64), - console_log_count SimpleAggregateFunction(sum, Int64), - console_warn_count SimpleAggregateFunction(sum, Int64), - console_error_count SimpleAggregateFunction(sum, Int64), - size SimpleAggregateFunction(sum, Int64), - message_count SimpleAggregateFunction(sum, Int64), - event_count SimpleAggregateFunction(sum, Int64), - snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), - snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), - _timestamp SimpleAggregateFunction(max, DateTime) - ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_session_replay_events_v2_test', sipHash64(distinct_id)) - - ''' -# --- -# name: test_create_table_query[session_replay_events_v2_test_mv] - ''' - - CREATE MATERIALIZED VIEW IF NOT EXISTS session_replay_events_v2_test_mv ON CLUSTER 'posthog' - TO posthog_test.writable_session_replay_events_v2_test ( - `session_id` String, - `team_id` Int64, - `distinct_id` String, - `min_first_timestamp` DateTime64(6, 'UTC'), - `max_last_timestamp` DateTime64(6, 'UTC'), - `block_first_timestamps` SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))), - `block_last_timestamps` SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))), - `block_urls` SimpleAggregateFunction(groupArrayArray, Array(String)), - `first_url` AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), - `all_urls` SimpleAggregateFunction(groupUniqArrayArray, Array(String)), - `click_count` SimpleAggregateFunction(sum, Int64), - `keypress_count` SimpleAggregateFunction(sum, Int64), - `mouse_activity_count` SimpleAggregateFunction(sum, Int64), - `active_milliseconds` SimpleAggregateFunction(sum, Int64), - `console_log_count` SimpleAggregateFunction(sum, Int64), - `console_warn_count` SimpleAggregateFunction(sum, Int64), - `console_error_count` SimpleAggregateFunction(sum, Int64), - `size` SimpleAggregateFunction(sum, Int64), - `message_count` SimpleAggregateFunction(sum, Int64), - `event_count` SimpleAggregateFunction(sum, Int64), - `snapshot_source` AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), - `snapshot_library` AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), - `_timestamp` SimpleAggregateFunction(max, DateTime) - ) - AS SELECT - session_id, - team_id, - distinct_id, - min(first_timestamp) AS min_first_timestamp, - max(last_timestamp) AS max_last_timestamp, - groupArray(first_timestamp) AS block_first_timestamps, - groupArray(last_timestamp) AS block_last_timestamps, - groupArray(block_url) AS block_urls, - argMinState(first_url, first_timestamp) as first_url, - groupUniqArrayArray(urls) AS all_urls, - sum(click_count) AS click_count, - sum(keypress_count) AS keypress_count, - sum(mouse_activity_count) AS mouse_activity_count, - sum(active_milliseconds) AS active_milliseconds, - sum(console_log_count) AS console_log_count, - sum(console_warn_count) AS console_warn_count, - sum(console_error_count) AS console_error_count, - sum(size) AS size, - sum(message_count) AS message_count, - sum(event_count) AS event_count, - argMinState(snapshot_source, first_timestamp) as snapshot_source, - argMinState(snapshot_library, first_timestamp) as snapshot_library, - max(_timestamp) as _timestamp - FROM posthog_test.kafka_session_replay_events_v2_test - GROUP BY session_id, team_id, distinct_id - - ''' -# --- # name: test_create_table_query[sessions] ''' @@ -3696,26 +3468,6 @@ ''' # --- -# name: test_create_table_query[sharded_behavioral_cohorts_matches] - ''' - - CREATE TABLE IF NOT EXISTS sharded_behavioral_cohorts_matches - ( - team_id Int64, - cohort_id Int64, - date Date, - person_id UUID, - condition String, - matches SimpleAggregateFunction(sum, UInt64), - latest_event_is_match AggregateFunction(argMax, UInt8, DateTime64(6)) - ) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.sharded_behavioral_cohorts_matches', '{replica}') - PARTITION BY toYYYYMM(date) - ORDER BY (team_id, cohort_id, condition, date, person_id) - - SETTINGS ttl_only_drop_parts = 1 - - ''' -# --- # name: test_create_table_query[sharded_events] ''' @@ -3972,6 +3724,40 @@ + ''' +# --- +# name: test_create_table_query[sharded_posthog_document_embeddings] + ''' + + CREATE TABLE IF NOT EXISTS sharded_posthog_document_embeddings + ( + team_id Int64, + product LowCardinality(String), -- Like "error tracking" or "session replay" - basically a bucket, you'd use this to ask clickhouse "what kind of documents do I have embeddings for, related to session replay" + document_type LowCardinality(String), -- The type of document this is an embedding for, e.g. "issue_fingerprint", "session_summary", "task_update" etc. + model_name LowCardinality(String), -- The name of the model used to generate this embedding. Includes embedding dimensionality, appended as e.g. "text-embedding-3-small-1024" + rendering LowCardinality(String), -- How the document was rendered to text, e.g. "with_error_message", "as_html" etc. Use "plain" if it was already text. + document_id String, -- A uuid, a path like "issue/", whatever you like really + timestamp DateTime64(3, 'UTC'), -- This is a user defined timestamp, meant to be the /documents/ creation time (or similar), rather than the time the embedding was created + inserted_at DateTime64(3, 'UTC'), -- When was this embedding inserted (if a duplicate-key row was inserted, for example, this is what we use to choose the winner) + content String DEFAULT '', -- The actual text content that was embedded + embedding Array(Float64) -- The embedding itself + + + , _timestamp DateTime + , _offset UInt64 + , _partition UInt64 + + , INDEX kafka_timestamp_minmax_sharded_posthog_document_embeddings _timestamp TYPE minmax GRANULARITY 3 + + ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.sharded_posthog_document_embeddings', '{replica}', inserted_at) + + -- This index assumes: + -- - people will /always/ provide a date range + -- - "show me documents of type X by any model" will be more common than "show me all documents by model X" + -- - Documents with the same ID whose timestamp is in the same day are the same document, and the later inserted one should be retained + ORDER BY (team_id, toDate(timestamp), product, document_type, model_name, rendering, cityHash64(document_id)) + SETTINGS index_granularity = 512 + ''' # --- # name: test_create_table_query[sharded_precalculated_events] @@ -4300,42 +4086,6 @@ ''' # --- -# name: test_create_table_query[sharded_session_replay_events_v2_test] - ''' - - CREATE TABLE IF NOT EXISTS sharded_session_replay_events_v2_test ON CLUSTER 'posthog' - ( - session_id VARCHAR, - team_id Int64, - distinct_id VARCHAR, - min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')), - max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')), - block_first_timestamps SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))), - block_last_timestamps SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))), - block_urls SimpleAggregateFunction(groupArrayArray, Array(String)), - first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')), - all_urls SimpleAggregateFunction(groupUniqArrayArray, Array(String)), - click_count SimpleAggregateFunction(sum, Int64), - keypress_count SimpleAggregateFunction(sum, Int64), - mouse_activity_count SimpleAggregateFunction(sum, Int64), - active_milliseconds SimpleAggregateFunction(sum, Int64), - console_log_count SimpleAggregateFunction(sum, Int64), - console_warn_count SimpleAggregateFunction(sum, Int64), - console_error_count SimpleAggregateFunction(sum, Int64), - size SimpleAggregateFunction(sum, Int64), - message_count SimpleAggregateFunction(sum, Int64), - event_count SimpleAggregateFunction(sum, Int64), - snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), - snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), - _timestamp SimpleAggregateFunction(max, DateTime) - ) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events_v2_test', '{replica}') - - PARTITION BY toYYYYMM(min_first_timestamp) - ORDER BY (toDate(min_first_timestamp), team_id, session_id) - SETTINGS index_granularity=512 - - ''' -# --- # name: test_create_table_query[sharded_sessions] ''' @@ -4882,22 +4632,6 @@ ''' # --- -# name: test_create_table_query[writable_behavioral_cohorts_matches] - ''' - - CREATE TABLE IF NOT EXISTS writable_behavioral_cohorts_matches - ( - team_id Int64, - cohort_id Int64, - date Date, - person_id UUID, - condition String, - matches SimpleAggregateFunction(sum, UInt64), - latest_event_is_match AggregateFunction(argMax, UInt8, DateTime64(6)) - ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_behavioral_cohorts_matches', sipHash64(person_id)) - - ''' -# --- # name: test_create_table_query[writable_cohort_membership] ''' @@ -5188,6 +4922,30 @@ ''' # --- +# name: test_create_table_query[writable_posthog_document_embeddings] + ''' + + CREATE TABLE IF NOT EXISTS writable_posthog_document_embeddings + ( + team_id Int64, + product LowCardinality(String), -- Like "error tracking" or "session replay" - basically a bucket, you'd use this to ask clickhouse "what kind of documents do I have embeddings for, related to session replay" + document_type LowCardinality(String), -- The type of document this is an embedding for, e.g. "issue_fingerprint", "session_summary", "task_update" etc. + model_name LowCardinality(String), -- The name of the model used to generate this embedding. Includes embedding dimensionality, appended as e.g. "text-embedding-3-small-1024" + rendering LowCardinality(String), -- How the document was rendered to text, e.g. "with_error_message", "as_html" etc. Use "plain" if it was already text. + document_id String, -- A uuid, a path like "issue/", whatever you like really + timestamp DateTime64(3, 'UTC'), -- This is a user defined timestamp, meant to be the /documents/ creation time (or similar), rather than the time the embedding was created + inserted_at DateTime64(3, 'UTC'), -- When was this embedding inserted (if a duplicate-key row was inserted, for example, this is what we use to choose the winner) + content String DEFAULT '', -- The actual text content that was embedded + embedding Array(Float64) -- The embedding itself + + , _timestamp DateTime + , _offset UInt64 + , _partition UInt64 + + ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_posthog_document_embeddings', cityHash64(document_id)) + + ''' +# --- # name: test_create_table_query[writable_precalculated_events] ''' @@ -5469,38 +5227,6 @@ ''' # --- -# name: test_create_table_query[writable_session_replay_events_v2_test] - ''' - - CREATE TABLE IF NOT EXISTS writable_session_replay_events_v2_test - ( - session_id VARCHAR, - team_id Int64, - distinct_id VARCHAR, - min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')), - max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')), - block_first_timestamps SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))), - block_last_timestamps SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))), - block_urls SimpleAggregateFunction(groupArrayArray, Array(String)), - first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')), - all_urls SimpleAggregateFunction(groupUniqArrayArray, Array(String)), - click_count SimpleAggregateFunction(sum, Int64), - keypress_count SimpleAggregateFunction(sum, Int64), - mouse_activity_count SimpleAggregateFunction(sum, Int64), - active_milliseconds SimpleAggregateFunction(sum, Int64), - console_log_count SimpleAggregateFunction(sum, Int64), - console_warn_count SimpleAggregateFunction(sum, Int64), - console_error_count SimpleAggregateFunction(sum, Int64), - size SimpleAggregateFunction(sum, Int64), - message_count SimpleAggregateFunction(sum, Int64), - event_count SimpleAggregateFunction(sum, Int64), - snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), - snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), - _timestamp SimpleAggregateFunction(max, DateTime) - ) ENGINE = Distributed('posthog', 'posthog_test', 'sharded_session_replay_events_v2_test', sipHash64(distinct_id)) - - ''' -# --- # name: test_create_table_query[writable_sessions] ''' @@ -6084,40 +5810,6 @@ ''' # --- -# name: test_create_table_query_replicated_and_storage[posthog_document_embeddings] - ''' - - CREATE TABLE IF NOT EXISTS posthog_document_embeddings - ( - team_id Int64, - product LowCardinality(String), -- Like "error tracking" or "session replay" - basically a bucket, you'd use this to ask clickhouse "what kind of documents do I have embeddings for, related to session replay" - document_type LowCardinality(String), -- The type of document this is an embedding for, e.g. "issue_fingerprint", "session_summary", "task_update" etc. - model_name LowCardinality(String), -- The name of the model used to generate this embedding. Includes embedding dimensionality, appended as e.g. "text-embedding-3-small-1024" - rendering LowCardinality(String), -- How the document was rendered to text, e.g. "with_error_message", "as_html" etc. Use "plain" if it was already text. - document_id String, -- A uuid, a path like "issue/", whatever you like really - timestamp DateTime64(3, 'UTC'), -- This is a user defined timestamp, meant to be the /documents/ creation time (or similar), rather than the time the embedding was created - inserted_at DateTime64(3, 'UTC'), -- When was this embedding inserted (if a duplicate-key row was inserted, for example, this is what we use to choose the winner) - content String DEFAULT '', -- The actual text content that was embedded - embedding Array(Float64) -- The embedding itself - - - , _timestamp DateTime - , _offset UInt64 - , _partition UInt64 - - , INDEX kafka_timestamp_minmax_posthog_document_embeddings _timestamp TYPE minmax GRANULARITY 3 - - ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_noshard/posthog.posthog_document_embeddings', '{replica}-{shard}', inserted_at) - - -- This index assumes: - -- - people will /always/ provide a date range - -- - "show me documents of type X by any model" will be more common than "show me all documents by model X" - -- - Documents with the same ID whose timestamp is in the same day are the same document, and the later inserted one should be retained - ORDER BY (team_id, toDate(timestamp), product, document_type, model_name, rendering, cityHash64(document_id)) - SETTINGS index_granularity = 512 - - ''' -# --- # name: test_create_table_query_replicated_and_storage[query_log_archive] ''' @@ -6292,26 +5984,6 @@ ''' # --- -# name: test_create_table_query_replicated_and_storage[sharded_behavioral_cohorts_matches] - ''' - - CREATE TABLE IF NOT EXISTS sharded_behavioral_cohorts_matches - ( - team_id Int64, - cohort_id Int64, - date Date, - person_id UUID, - condition String, - matches SimpleAggregateFunction(sum, UInt64), - latest_event_is_match AggregateFunction(argMax, UInt8, DateTime64(6)) - ) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.sharded_behavioral_cohorts_matches', '{replica}') - PARTITION BY toYYYYMM(date) - ORDER BY (team_id, cohort_id, condition, date, person_id) - - SETTINGS ttl_only_drop_parts = 1 - - ''' -# --- # name: test_create_table_query_replicated_and_storage[sharded_events] ''' @@ -6570,6 +6242,40 @@ ''' # --- +# name: test_create_table_query_replicated_and_storage[sharded_posthog_document_embeddings] + ''' + + CREATE TABLE IF NOT EXISTS sharded_posthog_document_embeddings + ( + team_id Int64, + product LowCardinality(String), -- Like "error tracking" or "session replay" - basically a bucket, you'd use this to ask clickhouse "what kind of documents do I have embeddings for, related to session replay" + document_type LowCardinality(String), -- The type of document this is an embedding for, e.g. "issue_fingerprint", "session_summary", "task_update" etc. + model_name LowCardinality(String), -- The name of the model used to generate this embedding. Includes embedding dimensionality, appended as e.g. "text-embedding-3-small-1024" + rendering LowCardinality(String), -- How the document was rendered to text, e.g. "with_error_message", "as_html" etc. Use "plain" if it was already text. + document_id String, -- A uuid, a path like "issue/", whatever you like really + timestamp DateTime64(3, 'UTC'), -- This is a user defined timestamp, meant to be the /documents/ creation time (or similar), rather than the time the embedding was created + inserted_at DateTime64(3, 'UTC'), -- When was this embedding inserted (if a duplicate-key row was inserted, for example, this is what we use to choose the winner) + content String DEFAULT '', -- The actual text content that was embedded + embedding Array(Float64) -- The embedding itself + + + , _timestamp DateTime + , _offset UInt64 + , _partition UInt64 + + , INDEX kafka_timestamp_minmax_sharded_posthog_document_embeddings _timestamp TYPE minmax GRANULARITY 3 + + ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.sharded_posthog_document_embeddings', '{replica}', inserted_at) + + -- This index assumes: + -- - people will /always/ provide a date range + -- - "show me documents of type X by any model" will be more common than "show me all documents by model X" + -- - Documents with the same ID whose timestamp is in the same day are the same document, and the later inserted one should be retained + ORDER BY (team_id, toDate(timestamp), product, document_type, model_name, rendering, cityHash64(document_id)) + SETTINGS index_granularity = 512 + + ''' +# --- # name: test_create_table_query_replicated_and_storage[sharded_precalculated_events] ''' @@ -6896,42 +6602,6 @@ ''' # --- -# name: test_create_table_query_replicated_and_storage[sharded_session_replay_events_v2_test] - ''' - - CREATE TABLE IF NOT EXISTS sharded_session_replay_events_v2_test ON CLUSTER 'posthog' - ( - session_id VARCHAR, - team_id Int64, - distinct_id VARCHAR, - min_first_timestamp SimpleAggregateFunction(min, DateTime64(6, 'UTC')), - max_last_timestamp SimpleAggregateFunction(max, DateTime64(6, 'UTC')), - block_first_timestamps SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))), - block_last_timestamps SimpleAggregateFunction(groupArrayArray, Array(DateTime64(6, 'UTC'))), - block_urls SimpleAggregateFunction(groupArrayArray, Array(String)), - first_url AggregateFunction(argMin, Nullable(VARCHAR), DateTime64(6, 'UTC')), - all_urls SimpleAggregateFunction(groupUniqArrayArray, Array(String)), - click_count SimpleAggregateFunction(sum, Int64), - keypress_count SimpleAggregateFunction(sum, Int64), - mouse_activity_count SimpleAggregateFunction(sum, Int64), - active_milliseconds SimpleAggregateFunction(sum, Int64), - console_log_count SimpleAggregateFunction(sum, Int64), - console_warn_count SimpleAggregateFunction(sum, Int64), - console_error_count SimpleAggregateFunction(sum, Int64), - size SimpleAggregateFunction(sum, Int64), - message_count SimpleAggregateFunction(sum, Int64), - event_count SimpleAggregateFunction(sum, Int64), - snapshot_source AggregateFunction(argMin, LowCardinality(Nullable(String)), DateTime64(6, 'UTC')), - snapshot_library AggregateFunction(argMin, Nullable(String), DateTime64(6, 'UTC')), - _timestamp SimpleAggregateFunction(max, DateTime) - ) ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_{shard}/posthog.session_replay_events_v2_test', '{replica}') - - PARTITION BY toYYYYMM(min_first_timestamp) - ORDER BY (toDate(min_first_timestamp), team_id, session_id) - SETTINGS index_granularity=512 - - ''' -# --- # name: test_create_table_query_replicated_and_storage[sharded_sessions] ''' diff --git a/products/error_tracking/backend/embedding.py b/products/error_tracking/backend/embedding.py index 5a272c7aa5087..ba6144b75dae9 100644 --- a/products/error_tracking/backend/embedding.py +++ b/products/error_tracking/backend/embedding.py @@ -2,10 +2,12 @@ from posthog.clickhouse.indexes import index_by_kafka_timestamp from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS_WITH_PARTITION, kafka_engine -from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree +from posthog.clickhouse.table_engines import Distributed, ReplacingMergeTree, ReplicationScheme from posthog.kafka_client.topics import KAFKA_DOCUMENT_EMBEDDINGS_TOPIC DOCUMENT_EMBEDDINGS = "posthog_document_embeddings" +SHARDED_DOCUMENT_EMBEDDINGS = f"sharded_{DOCUMENT_EMBEDDINGS}" +DISTRIBUTED_DOCUMENT_EMBEDDINGS = f"distributed_{DOCUMENT_EMBEDDINGS}" DOCUMENT_EMBEDDING_WRITABLE = f"writable_{DOCUMENT_EMBEDDINGS}" KAFKA_DOCUMENT_EMBEDDINGS = f"kafka_{DOCUMENT_EMBEDDINGS}" DOCUMENT_EMBEDDINGS_MV = f"{DOCUMENT_EMBEDDINGS}_mv" @@ -29,10 +31,12 @@ def DOCUMENT_EMBEDDINGS_TABLE_ENGINE(): - return ReplacingMergeTree(DOCUMENT_EMBEDDINGS, ver="inserted_at") + return ReplacingMergeTree( + SHARDED_DOCUMENT_EMBEDDINGS, ver="inserted_at", replication_scheme=ReplicationScheme.SHARDED + ) -def DOCUMENT_EMBEDDINGS_TABLE_SQL(): +def DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL(): return ( DOCUMENT_EMBEDDINGS_TABLE_BASE_SQL + """ @@ -44,22 +48,34 @@ def DOCUMENT_EMBEDDINGS_TABLE_SQL(): SETTINGS index_granularity = 512 """ ).format( - table_name=DOCUMENT_EMBEDDINGS, + table_name=SHARDED_DOCUMENT_EMBEDDINGS, engine=DOCUMENT_EMBEDDINGS_TABLE_ENGINE(), default_clause=" DEFAULT ''", extra_fields=f""" {KAFKA_COLUMNS_WITH_PARTITION} - , {index_by_kafka_timestamp(DOCUMENT_EMBEDDINGS)} + , {index_by_kafka_timestamp(SHARDED_DOCUMENT_EMBEDDINGS)} """, ) +def DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL(): + return DOCUMENT_EMBEDDINGS_TABLE_BASE_SQL.format( + table_name=DISTRIBUTED_DOCUMENT_EMBEDDINGS, + engine=Distributed( + data_table=SHARDED_DOCUMENT_EMBEDDINGS, + sharding_key="cityHash64(document_id)", + ), + default_clause=" DEFAULT ''", + extra_fields=KAFKA_COLUMNS_WITH_PARTITION, + ) + + def DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL(): return DOCUMENT_EMBEDDINGS_TABLE_BASE_SQL.format( table_name=DOCUMENT_EMBEDDING_WRITABLE, engine=Distributed( - data_table=DOCUMENT_EMBEDDINGS, - cluster=settings.CLICKHOUSE_SINGLE_SHARD_CLUSTER, + data_table=SHARDED_DOCUMENT_EMBEDDINGS, + sharding_key="cityHash64(document_id)", ), default_clause=" DEFAULT ''", extra_fields=KAFKA_COLUMNS_WITH_PARTITION, @@ -105,4 +121,8 @@ def DOCUMENT_EMBEDDINGS_MV_SQL( def TRUNCATE_DOCUMENT_EMBEDDINGS_TABLE_SQL(): - return f"TRUNCATE TABLE IF EXISTS {DOCUMENT_EMBEDDINGS} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'" + return f"TRUNCATE TABLE IF EXISTS {SHARDED_DOCUMENT_EMBEDDINGS} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'" + + +# Backwards compatibility alias for old migrations (0155, 0174) +DOCUMENT_EMBEDDINGS_TABLE_SQL = DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL From eb9c534de042e3912fcd87855ea403dfc86e8d9e Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Fri, 28 Nov 2025 02:11:27 +0200 Subject: [PATCH 2/5] upgrade migrations.txt --- ...document_embeddings.py => 0183_shard_document_embeddings.py} | 0 posthog/clickhouse/migrations/max_migration.txt | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename posthog/clickhouse/migrations/{0182_shard_document_embeddings.py => 0183_shard_document_embeddings.py} (100%) diff --git a/posthog/clickhouse/migrations/0182_shard_document_embeddings.py b/posthog/clickhouse/migrations/0183_shard_document_embeddings.py similarity index 100% rename from posthog/clickhouse/migrations/0182_shard_document_embeddings.py rename to posthog/clickhouse/migrations/0183_shard_document_embeddings.py diff --git a/posthog/clickhouse/migrations/max_migration.txt b/posthog/clickhouse/migrations/max_migration.txt index 76250e31d1349..4644d7b12e876 100644 --- a/posthog/clickhouse/migrations/max_migration.txt +++ b/posthog/clickhouse/migrations/max_migration.txt @@ -1 +1 @@ -0182_endpoints_cluster_query_log_prod +0183_shard_document_embeddings From 4220ea389c6d22e013db252fddfe875d113d0f18 Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Fri, 28 Nov 2025 02:12:26 +0200 Subject: [PATCH 3/5] comment --- .../migrations/0183_shard_document_embeddings.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/posthog/clickhouse/migrations/0183_shard_document_embeddings.py b/posthog/clickhouse/migrations/0183_shard_document_embeddings.py index cc9b6be65f432..134ea8632c711 100644 --- a/posthog/clickhouse/migrations/0183_shard_document_embeddings.py +++ b/posthog/clickhouse/migrations/0183_shard_document_embeddings.py @@ -12,18 +12,8 @@ KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL, ) -# This migration creates the new sharded table structure alongside the existing -# replicated table. The old replicated table (posthog_document_embeddings) is -# left intact so data can be copied from it to the new sharded table. -# -# New tables created: -# - sharded_posthog_document_embeddings: The sharded data table -# - distributed_posthog_document_embeddings: Distributed read table for the sharded data -# -# The MV and writable table are recreated to write to the new sharded table. - operations = [ - # 1. Drop MV to stop writes + # 1. Drop MV run_sql_with_exceptions( f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDINGS_MV}", node_roles=[NodeRole.INGESTION_SMALL], @@ -33,7 +23,7 @@ f"DROP TABLE IF EXISTS {KAFKA_DOCUMENT_EMBEDDINGS}", node_roles=[NodeRole.INGESTION_SMALL], ), - # 3. Drop old writable distributed table + # 3. Drop old writable table run_sql_with_exceptions( f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDING_WRITABLE}", node_roles=[NodeRole.INGESTION_SMALL], From 2cdf6646ae03da19f789e447dfdc0178458ba9c8 Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Fri, 28 Nov 2025 17:04:26 +0200 Subject: [PATCH 4/5] comments and hogql --- .../0183_shard_document_embeddings.py | 19 ++++++++++------- posthog/clickhouse/schema.py | 4 ++-- .../database/schema/document_embeddings.py | 6 +++--- .../test_document_embeddings_query_runner.py | 4 ++-- products/error_tracking/backend/embedding.py | 21 ++++++++++--------- 5 files changed, 30 insertions(+), 24 deletions(-) diff --git a/posthog/clickhouse/migrations/0183_shard_document_embeddings.py b/posthog/clickhouse/migrations/0183_shard_document_embeddings.py index 134ea8632c711..3c718a74bfa68 100644 --- a/posthog/clickhouse/migrations/0183_shard_document_embeddings.py +++ b/posthog/clickhouse/migrations/0183_shard_document_embeddings.py @@ -4,16 +4,21 @@ from products.error_tracking.backend.embedding import ( DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL, DOCUMENT_EMBEDDING_WRITABLE, - DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL, DOCUMENT_EMBEDDINGS_MV, DOCUMENT_EMBEDDINGS_MV_SQL, + DOCUMENT_EMBEDDINGS_TABLE_SQL, DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL, KAFKA_DOCUMENT_EMBEDDINGS, KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL, ) +# The only tricky part of this migration is that the behaviour of `DOCUMENT_EMBEDDINGS_TABLE_SQL` has changed - +# it now creates sharded tables rather than replicated tables. We don't drop the replicated table, as in production +# we want to keep the historical data around, at least until we get around to migrating it, but this does mean +# all future "rebuild the world" runs of the migration set will never create that old table, only the new sharded one. + operations = [ - # 1. Drop MV + # 1. Drop MV to stop processing messages from kafka run_sql_with_exceptions( f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDINGS_MV}", node_roles=[NodeRole.INGESTION_SMALL], @@ -23,14 +28,14 @@ f"DROP TABLE IF EXISTS {KAFKA_DOCUMENT_EMBEDDINGS}", node_roles=[NodeRole.INGESTION_SMALL], ), - # 3. Drop old writable table + # 3. Drop old writable table (but not the old "main" table, since we want to keep the data around) run_sql_with_exceptions( f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDING_WRITABLE}", node_roles=[NodeRole.INGESTION_SMALL], ), - # 4. Create new sharded data table + # 4. Create new sharded data tables (this function used to create "posthog_document_embeddings" directly, but now creates the sharded_ version) run_sql_with_exceptions( - DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL(), + DOCUMENT_EMBEDDINGS_TABLE_SQL(), node_roles=[NodeRole.DATA], sharded=True, ), @@ -39,7 +44,7 @@ DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL(), node_roles=[NodeRole.DATA, NodeRole.COORDINATOR], ), - # 6. Create new writable distributed table pointing to sharded table + # 6. Create new writable distributed table pointing to sharded tables run_sql_with_exceptions( DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL], @@ -49,7 +54,7 @@ KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL], ), - # 8. Recreate MV writing to writable table (which now writes to sharded table) + # 8. Recreate MV writing to writable table (which now writes to sharded tables) run_sql_with_exceptions( DOCUMENT_EMBEDDINGS_MV_SQL(), node_roles=[NodeRole.INGESTION_SMALL], diff --git a/posthog/clickhouse/schema.py b/posthog/clickhouse/schema.py index fe8abe9975ab2..74473db31f48d 100644 --- a/posthog/clickhouse/schema.py +++ b/posthog/clickhouse/schema.py @@ -172,8 +172,8 @@ from products.error_tracking.backend.embedding import ( DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL, - DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL, DOCUMENT_EMBEDDINGS_MV_SQL, + DOCUMENT_EMBEDDINGS_TABLE_SQL, DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL, KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL, ) @@ -202,7 +202,7 @@ PERSONS_DISTINCT_ID_TABLE_SQL, PERSON_DISTINCT_ID2_TABLE_SQL, PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL, - DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL, + DOCUMENT_EMBEDDINGS_TABLE_SQL, ERROR_TRACKING_ISSUE_FINGERPRINT_OVERRIDES_TABLE_SQL, ERROR_TRACKING_FINGERPRINT_EMBEDDINGS_TABLE_SQL, PLUGIN_LOG_ENTRIES_TABLE_SQL, diff --git a/posthog/hogql/database/schema/document_embeddings.py b/posthog/hogql/database/schema/document_embeddings.py index 9d725e069ed64..f9d4635ac04e3 100644 --- a/posthog/hogql/database/schema/document_embeddings.py +++ b/posthog/hogql/database/schema/document_embeddings.py @@ -13,7 +13,7 @@ Table, ) -from products.error_tracking.backend.embedding import DOCUMENT_EMBEDDINGS +from products.error_tracking.backend.embedding import DISTRIBUTED_DOCUMENT_EMBEDDINGS DOCUMENT_EMBEDDINGS_FIELDS: dict[str, FieldOrTable] = { "team_id": IntegerDatabaseField(name="team_id", nullable=False), @@ -49,7 +49,7 @@ class RawDocumentEmbeddingsTable(Table): fields: dict[str, FieldOrTable] = DOCUMENT_EMBEDDINGS_FIELDS def to_printed_clickhouse(self, context): - return DOCUMENT_EMBEDDINGS + return DISTRIBUTED_DOCUMENT_EMBEDDINGS def to_printed_hogql(self): return f"raw_document_embeddings" @@ -67,7 +67,7 @@ def lazy_select( return select_from_embeddings_table(table_to_add.fields_accessed) def to_printed_clickhouse(self, context): - return DOCUMENT_EMBEDDINGS + return DISTRIBUTED_DOCUMENT_EMBEDDINGS def to_printed_hogql(self): return "document_embeddings" diff --git a/posthog/hogql_queries/test/test_document_embeddings_query_runner.py b/posthog/hogql_queries/test/test_document_embeddings_query_runner.py index 80f0151ed6bc8..88398fe6869f8 100644 --- a/posthog/hogql_queries/test/test_document_embeddings_query_runner.py +++ b/posthog/hogql_queries/test/test_document_embeddings_query_runner.py @@ -77,7 +77,7 @@ class DocumentEmbeddingRow: inserted_at: datetime def _seed_document_embeddings(self) -> list[DocumentEmbeddingRow]: - sync_execute("TRUNCATE TABLE posthog_document_embeddings", flush=False, team_id=self.team.pk) + sync_execute("TRUNCATE TABLE distributed_posthog_document_embeddings", flush=False, team_id=self.team.pk) fixtures: list[TestDocumentEmbeddingsQueryRunner.DocumentEmbeddingRow] = [] rows: list[tuple] = [] @@ -128,7 +128,7 @@ def _seed_document_embeddings(self) -> list[DocumentEmbeddingRow]: if rows: sync_execute( """ - INSERT INTO posthog_document_embeddings ( + INSERT INTO distributed_posthog_document_embeddings ( team_id, product, document_type, diff --git a/products/error_tracking/backend/embedding.py b/products/error_tracking/backend/embedding.py index ba6144b75dae9..11e5dfd16ef1f 100644 --- a/products/error_tracking/backend/embedding.py +++ b/products/error_tracking/backend/embedding.py @@ -30,13 +30,14 @@ """ -def DOCUMENT_EMBEDDINGS_TABLE_ENGINE(): - return ReplacingMergeTree( - SHARDED_DOCUMENT_EMBEDDINGS, ver="inserted_at", replication_scheme=ReplicationScheme.SHARDED - ) +# The flow of this table set, as per other sharded tables, is: +# - Kafka table exposes messages from Kafka topic +# - Materialized view reads from Kafka table, writes to writable table, moving the kafka offset +# - Writable table distributes writes to sharded tables +# - Distributed table distributes reads to sharded tables -def DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL(): +def DOCUMENT_EMBEDDINGS_TABLE_SQL(): return ( DOCUMENT_EMBEDDINGS_TABLE_BASE_SQL + """ @@ -49,7 +50,9 @@ def DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL(): """ ).format( table_name=SHARDED_DOCUMENT_EMBEDDINGS, - engine=DOCUMENT_EMBEDDINGS_TABLE_ENGINE(), + engine=ReplacingMergeTree( + SHARDED_DOCUMENT_EMBEDDINGS, ver="inserted_at", replication_scheme=ReplicationScheme.SHARDED + ), default_clause=" DEFAULT ''", extra_fields=f""" {KAFKA_COLUMNS_WITH_PARTITION} @@ -58,6 +61,8 @@ def DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL(): ) +# The sharding keys of this and the table below are chosen mostly at random - as far as I could tell, +# there isn't much to be gained from trying to get clever here, and it's best just to keep spread even def DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL(): return DOCUMENT_EMBEDDINGS_TABLE_BASE_SQL.format( table_name=DISTRIBUTED_DOCUMENT_EMBEDDINGS, @@ -122,7 +127,3 @@ def DOCUMENT_EMBEDDINGS_MV_SQL( def TRUNCATE_DOCUMENT_EMBEDDINGS_TABLE_SQL(): return f"TRUNCATE TABLE IF EXISTS {SHARDED_DOCUMENT_EMBEDDINGS} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'" - - -# Backwards compatibility alias for old migrations (0155, 0174) -DOCUMENT_EMBEDDINGS_TABLE_SQL = DOCUMENT_EMBEDDINGS_DATA_TABLE_SQL From ecbfcafd902fe17d436e378eae4c2dc89770c34e Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Fri, 28 Nov 2025 17:14:09 +0200 Subject: [PATCH 5/5] fix table name in old migration --- .../0174_add_content_column_to_document_embeddings.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/posthog/clickhouse/migrations/0174_add_content_column_to_document_embeddings.py b/posthog/clickhouse/migrations/0174_add_content_column_to_document_embeddings.py index 3e631dcf053e9..c3b34732c02a7 100644 --- a/posthog/clickhouse/migrations/0174_add_content_column_to_document_embeddings.py +++ b/posthog/clickhouse/migrations/0174_add_content_column_to_document_embeddings.py @@ -3,11 +3,11 @@ from products.error_tracking.backend.embedding import ( DOCUMENT_EMBEDDING_WRITABLE, - DOCUMENT_EMBEDDINGS, DOCUMENT_EMBEDDINGS_MV, DOCUMENT_EMBEDDINGS_MV_SQL, KAFKA_DOCUMENT_EMBEDDINGS, KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL, + SHARDED_DOCUMENT_EMBEDDINGS, ) ADD_CONTENT_COLUMN_SQL = """ @@ -25,7 +25,7 @@ node_roles=[NodeRole.INGESTION_SMALL], ), run_sql_with_exceptions( - ADD_CONTENT_COLUMN_SQL.format(table_name=DOCUMENT_EMBEDDINGS), + ADD_CONTENT_COLUMN_SQL.format(table_name=SHARDED_DOCUMENT_EMBEDDINGS), node_roles=[NodeRole.DATA, NodeRole.COORDINATOR], sharded=False, is_alter_on_replicated_table=True,