Skip to content

Commit 5c09e55

Browse files
authored
Merge branch 'master' into shashjar/produce-next-offset-to-snuba-commit-log-from-rust-consumer
2 parents 89ca087 + b00ba9d commit 5c09e55

File tree

1 file changed

+75
-0
lines changed

1 file changed

+75
-0
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
from typing import Any, Mapping, Optional
2+
3+
from snuba.clusters.storage_sets import StorageSetKey
4+
from snuba.manual_jobs import Job, JobLogger, JobSpec
5+
from snuba.migrations.groups import get_group_loader
6+
from snuba.migrations.migration import ClickhouseNodeMigration
7+
from snuba.migrations.runner import MigrationKey, Runner
8+
from snuba.migrations.status import Status
9+
10+
11+
class RerunIdempotentMigration(Job):
    """
    Manual job that replays the forward SQL operations of a migration that
    has already run to completion. Intended only for idempotent migrations,
    where re-executing the same DDL/DML is safe.

    Required job parameters:
    - storage_set: a StorageSetKey value (e.g. "events_analytics_platform")
    - migration_id: the migration's ID (e.g. "0001_events_initial")
    """

    def __init__(self, job_spec: JobSpec) -> None:
        # Validate (and stash) parameters before handing off to the base class.
        self.__check_and_store_params(job_spec.params)
        super().__init__(job_spec)

    def __check_and_store_params(self, params: Optional[Mapping[Any, Any]]) -> None:
        # Fail fast if either required parameter is missing or non-string.
        assert params is not None, "storage_set and migration_id parameters required"
        assert params.get("storage_set"), "storage_set required"
        assert params.get("migration_id"), "migration_id required"
        self._storage_set = params["storage_set"]
        self._migration_id = params["migration_id"]
        assert isinstance(self._storage_set, str)
        assert isinstance(self._migration_id, str)

    def execute(self, logger: JobLogger) -> None:
        storage_set_key = StorageSetKey(self._storage_set)

        # Resolve which migration group owns this storage set.
        from snuba.migrations.groups import _STORAGE_SET_TO_MIGRATION_GROUP_MAPPING

        group = _STORAGE_SET_TO_MIGRATION_GROUP_MAPPING.get(storage_set_key)
        assert group is not None, f"No migration group found for storage set {self._storage_set}"

        # Refuse to rerun anything that has not already completed.
        migration_key = MigrationKey(group=group, migration_id=self._migration_id)
        status, _ = Runner().get_status(migration_key)
        assert status == Status.COMPLETED, (
            f"Migration {self._migration_id} in group {group.value} has status "
            f"{status.value}, expected completed"
        )

        # Only ClickHouse SQL migrations expose replayable forward operations.
        migration = get_group_loader(group).load_migration(self._migration_id)
        assert isinstance(migration, ClickhouseNodeMigration), (
            f"Migration {self._migration_id} is not a ClickhouseNodeMigration, "
            f"only SQL migrations are supported"
        )

        # Re-execute each forward operation in order, logging progress as we go.
        ops = migration.forwards_ops()
        total = len(ops)
        logger.info(
            f"{self.job_spec.job_id}: rerunning {total} operations from "
            f"migration {self._migration_id} (group={group.value})"
        )

        for position, operation in enumerate(ops, start=1):
            statement = operation.format_sql()
            logger.info(f"{self.job_spec.job_id}: executing op {position}/{total}: {statement}")
            operation.execute()
            logger.info(f"{self.job_spec.job_id}: op {position}/{total} completed")

        logger.info(
            f"{self.job_spec.job_id}: successfully reran all operations for "
            f"migration {self._migration_id}"
        )

0 commit comments

Comments
 (0)