|
| 1 | +from posthog.clickhouse.client.connection import NodeRole |
| 2 | +from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions |
| 3 | + |
| 4 | +from products.error_tracking.backend.embedding import ( |
| 5 | + DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL, |
| 6 | + DOCUMENT_EMBEDDING_WRITABLE, |
| 7 | + DOCUMENT_EMBEDDINGS_MV, |
| 8 | + DOCUMENT_EMBEDDINGS_MV_SQL, |
| 9 | + DOCUMENT_EMBEDDINGS_TABLE_SQL, |
| 10 | + DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL, |
| 11 | + KAFKA_DOCUMENT_EMBEDDINGS, |
| 12 | + KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL, |
| 13 | +) |
| 14 | + |
# The only tricky part of this migration is that the behaviour of `DOCUMENT_EMBEDDINGS_TABLE_SQL` has changed:
# it now creates sharded tables rather than replicated ones. We deliberately do not drop the old replicated
# table, because in production we want to keep its historical data around at least until it has been migrated.
# This does mean that future "rebuild the world" runs of the migration set will only ever create the new
# sharded table, never the old replicated one.
| 19 | + |
# Migration plan: tear the ingestion pipeline down front-to-back, then rebuild it
# bottom-up on top of sharded storage. Statement order matters throughout.
operations = [
    # --- Teardown, in dependency order so nothing keeps writing mid-rebuild ---
    # Drop the MV first: this stops consumption of Kafka messages.
    run_sql_with_exceptions(f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDINGS_MV}", node_roles=[NodeRole.INGESTION_SMALL]),
    # Drop the Kafka engine table.
    run_sql_with_exceptions(f"DROP TABLE IF EXISTS {KAFKA_DOCUMENT_EMBEDDINGS}", node_roles=[NodeRole.INGESTION_SMALL]),
    # Drop the old writable table — but NOT the old "main" table, whose data we keep.
    run_sql_with_exceptions(f"DROP TABLE IF EXISTS {DOCUMENT_EMBEDDING_WRITABLE}", node_roles=[NodeRole.INGESTION_SMALL]),
    # --- Rebuild, storage first ---
    # New sharded data tables (this helper used to create "posthog_document_embeddings"
    # directly; it now creates the sharded_ version).
    run_sql_with_exceptions(DOCUMENT_EMBEDDINGS_TABLE_SQL(), node_roles=[NodeRole.DATA], sharded=True),
    # Distributed read table over the sharded data.
    run_sql_with_exceptions(DISTRIBUTED_DOCUMENT_EMBEDDINGS_TABLE_SQL(), node_roles=[NodeRole.DATA, NodeRole.COORDINATOR]),
    # New writable distributed table pointing at the sharded tables.
    run_sql_with_exceptions(DOCUMENT_EMBEDDINGS_WRITABLE_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]),
    # Recreate the Kafka table.
    run_sql_with_exceptions(KAFKA_DOCUMENT_EMBEDDINGS_TABLE_SQL(), node_roles=[NodeRole.INGESTION_SMALL]),
    # Recreate the MV writing into the writable table (which now feeds the shards),
    # resuming ingestion.
    run_sql_with_exceptions(DOCUMENT_EMBEDDINGS_MV_SQL(), node_roles=[NodeRole.INGESTION_SMALL]),
]
0 commit comments