Skip to content

Commit e91678f

Browse files
authored
catalog: Key migration shard by binary version (#31210)
The builtin migration shard is a persist shard used during version upgrades. It uses the environment's deploy generation as part of the key for the values. The assumption was that two environments with the same deploy generation would always have the same binary version. This assumption would allow all migration steps of two environments with the same deploy generation to be idempotent. This assumption was not correct. Two environments with different binary versions can use the same deploy generation as long as one environment never fully completed a deployment. This is especially bad because the migration shard is written to and read from in read-only mode, before a deployment is complete. This commit updates the key of the builtin migration shard to explicitly use the binary version of environmentd so that the migration steps are idempotent. Fixes #MaterializeInc/database-issues/issues/8917
1 parent c5340e2 commit e91678f

File tree

5 files changed

+58
-26
lines changed

5 files changed

+58
-26
lines changed

src/adapter/src/catalog/migrate.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,23 @@ pub(crate) fn durable_migrate(
208208
Some(EXPR_CACHE_MIGRATION_DONE),
209209
)?;
210210
}
211+
212+
// Migrate the builtin migration shard to a new shard. We're updating the keys to use the explicit
213+
// binary version instead of the deploy generation.
214+
const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
215+
const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
216+
if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
217+
!= Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
218+
{
219+
if let Some(shard_id) = tx.get_builtin_migration_shard() {
220+
tx.mark_shards_as_finalized(btreeset! {shard_id});
221+
tx.set_builtin_migration_shard(ShardId::new())?;
222+
}
223+
tx.set_config(
224+
BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
225+
Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
226+
)?;
227+
}
211228
Ok(())
212229
}
213230

src/adapter/src/catalog/open/builtin_item_migration.rs

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ pub(crate) async fn migrate_builtin_items(
6868
}
6969
BuiltinItemMigrationConfig::ZeroDownTime {
7070
persist_client,
71-
deploy_generation,
7271
read_only,
7372
} => {
7473
migrate_builtin_items_0dt(
@@ -77,7 +76,6 @@ pub(crate) async fn migrate_builtin_items(
7776
local_expr_cache,
7877
persist_client,
7978
migrated_builtins,
80-
deploy_generation,
8179
read_only,
8280
)
8381
.await
@@ -134,22 +132,22 @@ async fn migrate_builtin_items_legacy(
134132
///
135133
/// 1. Each environment has a dedicated persist shard, called the migration shard, that allows
136134
/// environments to durably write down metadata while in read-only mode. The shard is a
137-
/// mapping of `(GlobalId, deploy_generation)` to `ShardId`.
138-
/// 2. Collect the `GlobalId` of all migrated tables for the current deploy generation.
135+
/// mapping of `(GlobalId, build_version)` to `ShardId`.
136+
/// 2. Collect the `GlobalId` of all migrated tables for the current build version.
139137
/// 3. Read in the current contents of the migration shard.
140138
/// 4. Collect all the `ShardId`s from the migration shard that are not at the current
141-
/// `deploy_generation` or are not in the set of migrated tables.
139+
/// `build_version` or are not in the set of migrated tables.
142140
/// a. If they ARE NOT mapped to a `GlobalId` in the storage metadata then they are shards
143141
/// from an incomplete migration. Finalize them and remove them from the migration shard.
144142
/// Note: care must be taken to not remove the shard from the migration shard until we are
145143
/// sure that they will be finalized, otherwise the shard will leak.
146144
/// b. If they ARE mapped to a `GlobalId` in the storage metadata then they are shards from a
147145
/// complete migration. Remove them from the migration shard.
148146
/// 5. Collect all the `GlobalId`s of tables that are migrated, but not in the migration shard
149-
/// for the current deploy generation. Generate new `ShardId`s and add them to the migration
147+
/// for the current build version. Generate new `ShardId`s and add them to the migration
150148
/// shard.
151149
/// 6. At this point the migration shard should only logically contain a mapping of migrated
152-
/// table `GlobalId`s to new `ShardId`s for the current deploy generation. For each of these
150+
/// table `GlobalId`s to new `ShardId`s for the current build version. For each of these
153151
/// `GlobalId`s such that the `ShardId` isn't already in the storage metadata:
154152
/// a. Remove the current `GlobalId` to `ShardId` mapping from the storage metadata.
155153
/// b. Finalize the removed `ShardId`s.
@@ -177,7 +175,6 @@ async fn migrate_builtin_items_0dt(
177175
local_expr_cache: &mut LocalExpressionCache,
178176
persist_client: PersistClient,
179177
migrated_builtins: Vec<CatalogItemId>,
180-
deploy_generation: u64,
181178
read_only: bool,
182179
) -> Result<BuiltinItemMigrationResult, Error> {
183180
assert_eq!(
@@ -186,6 +183,8 @@ async fn migrate_builtin_items_0dt(
186183
"txn must be in savepoint mode when read_only is true, and in writable mode when read_only is false"
187184
);
188185

186+
let build_version = state.config.build_info.semver_version();
187+
189188
// 0. Update durably stored fingerprints.
190189
let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
191190
.map(|builtin| {
@@ -237,7 +236,9 @@ async fn migrate_builtin_items_0dt(
237236
.expect("builtin migration shard should exist for opened catalogs");
238237
let diagnostics = Diagnostics {
239238
shard_name: "builtin_migration".to_string(),
240-
handle_purpose: format!("builtin table migration shard for org {organization_id:?} generation {deploy_generation:?}"),
239+
handle_purpose: format!(
240+
"builtin table migration shard for org {organization_id:?} version {build_version:?}"
241+
),
241242
};
242243
let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, Diff, i64> = persist_client
243244
.open_critical_since(
@@ -348,16 +349,16 @@ async fn migrate_builtin_items_0dt(
348349
txn.get_collection_metadata()
349350
};
350351
for (table_key, shard_id) in global_id_shards.clone() {
351-
if table_key.deploy_generation > deploy_generation {
352+
if table_key.build_version > build_version {
352353
halt!(
353-
"saw deploy generation {}, which is greater than current deploy generation {}",
354-
table_key.deploy_generation,
355-
deploy_generation
354+
"saw build version {}, which is greater than current build version {}",
355+
table_key.build_version,
356+
build_version
356357
);
357358
}
358359

359360
if !migrated_storage_collections.contains(&table_key.global_id)
360-
|| table_key.deploy_generation < deploy_generation
361+
|| table_key.build_version < build_version
361362
{
362363
global_id_shards.remove(&table_key);
363364
if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
@@ -370,7 +371,7 @@ async fn migrate_builtin_items_0dt(
370371
}
371372
}
372373

373-
// 5. Add migrated tables to migration shard for current generation.
374+
// 5. Add migrated tables to migration shard for current build version.
374375
let mut global_id_shards: BTreeMap<_, _> = global_id_shards
375376
.into_iter()
376377
.map(|(table_key, shard_id)| (table_key.global_id, shard_id))
@@ -381,7 +382,7 @@ async fn migrate_builtin_items_0dt(
381382
global_id_shards.insert(global_id, shard_id);
382383
let table_key = TableKey {
383384
global_id,
384-
deploy_generation,
385+
build_version: build_version.clone(),
385386
};
386387
migrated_shard_updates.push(((table_key, shard_id), upper, 1));
387388
}
@@ -541,35 +542,35 @@ mod persist_schema {
541542
use mz_persist_types::stats::NoneStats;
542543
use mz_persist_types::Codec;
543544

544-
#[derive(Debug, Clone, Default, Eq, Ord, PartialEq, PartialOrd)]
545+
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
545546
pub(super) struct TableKey {
546547
pub(super) global_id: u64,
547-
pub(super) deploy_generation: u64,
548+
pub(super) build_version: semver::Version,
548549
}
549550

550551
impl std::fmt::Display for TableKey {
551552
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
552-
write!(f, "{}-{}", self.global_id, self.deploy_generation)
553+
write!(f, "{}-{}", self.global_id, self.build_version)
553554
}
554555
}
555556

556557
impl std::str::FromStr for TableKey {
557558
type Err = String;
558559

559560
fn from_str(s: &str) -> Result<Self, Self::Err> {
560-
let parts: Vec<_> = s.split('-').collect();
561-
let &[global_id, deploy_generation] = parts.as_slice() else {
561+
let parts: Vec<_> = s.splitn(2, '-').collect();
562+
let &[global_id, build_version] = parts.as_slice() else {
562563
return Err(format!("invalid TableKey '{s}'"));
563564
};
564565
let global_id = global_id
565566
.parse()
566567
.map_err(|e: ParseIntError| e.to_string())?;
567-
let deploy_generation = deploy_generation
568+
let build_version = build_version
568569
.parse()
569-
.map_err(|e: ParseIntError| e.to_string())?;
570+
.map_err(|e: semver::Error| e.to_string())?;
570571
Ok(TableKey {
571572
global_id,
572-
deploy_generation,
573+
build_version,
573574
})
574575
}
575576
}
@@ -588,6 +589,15 @@ mod persist_schema {
588589
}
589590
}
590591

592+
impl Default for TableKey {
593+
fn default() -> Self {
594+
Self {
595+
global_id: Default::default(),
596+
build_version: semver::Version::new(0, 0, 0),
597+
}
598+
}
599+
}
600+
591601
impl Codec for TableKey {
592602
type Storage = ();
593603
type Schema = TableKeySchema;

src/adapter/src/coord.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3986,7 +3986,6 @@ pub fn serve(
39863986
let builtin_item_migration_config = if enable_0dt_deployment {
39873987
BuiltinItemMigrationConfig::ZeroDownTime {
39883988
persist_client: persist_client.clone(),
3989-
deploy_generation: controller_config.deploy_generation,
39903989
read_only: read_only_controllers,
39913990
}
39923991
} else {

src/catalog/src/config.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ pub enum BuiltinItemMigrationConfig {
9898
Legacy,
9999
ZeroDownTime {
100100
persist_client: PersistClient,
101-
deploy_generation: u64,
102101
read_only: bool,
103102
},
104103
}

src/catalog/src/durable/transaction.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1831,6 +1831,13 @@ impl<'a> Transaction<'a> {
18311831
.map(|shard_id| shard_id.parse().expect("valid ShardId"))
18321832
}
18331833

1834+
pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
1835+
self.set_setting(
1836+
BUILTIN_MIGRATION_SHARD_KEY.to_string(),
1837+
Some(shard_id.to_string()),
1838+
)
1839+
}
1840+
18341841
pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
18351842
self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
18361843
.map(|shard_id| shard_id.parse().expect("valid ShardId"))

0 commit comments

Comments
 (0)