Skip to content

Commit 87c7953

Browse files
committed
Introduce overflowing diff type.
This change introduces an overflowing diff type, which detects overflow in release builds, too. It implements all relevant traits needed to be usable wherever we would normally use a number, including differential traits and a few other traits defined by external crates. The overflowing type is used in all parts of Materialize where possible. Persist still requires the old plain `i64` type, so we introduce a `StorageDiff` type alias and convert to/from the overflowing diff type at the boundary to persist. This isn't ideal, because storage can also overflow on consolidation, but changing the type there seems harder and is outside my expertise. The diff is quite large to account for boilerplate changes: everywhere one previously wrote `1`, one now has to convert the type or write `Diff::ONE` directly. Signed-off-by: Moritz Hoffmann <mh@materialize.com>
1 parent 8375f41 commit 87c7953

File tree

109 files changed

+1606
-858
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

109 files changed

+1606
-858
lines changed

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter/src/active_compute_sink.rs

Lines changed: 11 additions & 11 deletions
Original file line number · Diff line number · Diff line change
@@ -22,7 +22,7 @@ use mz_expr::compare_columns;
2222
use mz_ore::cast::CastFrom;
2323
use mz_ore::now::EpochMillis;
2424
use mz_repr::adt::numeric;
25-
use mz_repr::{CatalogItemId, Datum, GlobalId, IntoRowIterator, Row, Timestamp};
25+
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, Timestamp};
2626
use mz_sql::plan::SubscribeOutput;
2727
use mz_storage_types::instances::StorageInstanceId;
2828
use timely::progress::Antichain;
@@ -182,10 +182,10 @@ impl ActiveSubscribe {
182182
|(left_time, left_row, left_diff), (right_time, right_row, right_diff)| {
183183
left_time.cmp(right_time).then_with(|| {
184184
let mut left_datums = left_datum_vec.borrow();
185-
left_datums.extend(&[Datum::Int64(*left_diff)]);
185+
left_datums.extend(&[Datum::Int64(**left_diff)]);
186186
left_datums.extend(left_row.iter());
187187
let mut right_datums = right_datum_vec.borrow();
188-
right_datums.extend(&[Datum::Int64(*right_diff)]);
188+
right_datums.extend(&[Datum::Int64(**right_diff)]);
189189
right_datums.extend(right_row.iter());
190190
compare_columns(order_by, &left_datums, &right_datums, || {
191191
left_row.cmp(right_row).then(left_diff.cmp(right_diff))
@@ -238,7 +238,7 @@ impl ActiveSubscribe {
238238
let value_columns = self.arity - order_by_keys.len();
239239
let mut packer = row_buf.packer();
240240
new_rows.push(match &group[..] {
241-
[(_, row, 1)] => {
241+
[(_, row, Diff::ONE)] => {
242242
packer.push(if debezium {
243243
Datum::String("insert")
244244
} else {
@@ -258,9 +258,9 @@ impl ActiveSubscribe {
258258
packer.push(datums[idx]);
259259
}
260260
}
261-
(start.0, row_buf.clone(), 0)
261+
(start.0, row_buf.clone(), Diff::ZERO)
262262
}
263-
[(_, _, -1)] => {
263+
[(_, _, Diff::MINUS_ONE)] => {
264264
packer.push(Datum::String("delete"));
265265
let datums = datum_vec.borrow_with(&start.1);
266266
for column_order in order_by_keys {
@@ -276,9 +276,9 @@ impl ActiveSubscribe {
276276
for _ in 0..self.arity - order_by_keys.len() {
277277
packer.push(Datum::Null);
278278
}
279-
(start.0, row_buf.clone(), 0)
279+
(start.0, row_buf.clone(), Diff::ZERO)
280280
}
281-
[(_, old_row, -1), (_, row, 1)] => {
281+
[(_, old_row, Diff::MINUS_ONE), (_, row, Diff::ONE)] => {
282282
packer.push(Datum::String("upsert"));
283283
let datums = datum_vec.borrow_with(row);
284284
let old_datums = old_datum_vec.borrow_with(old_row);
@@ -298,7 +298,7 @@ impl ActiveSubscribe {
298298
packer.push(datums[idx]);
299299
}
300300
}
301-
(start.0, row_buf.clone(), 0)
301+
(start.0, row_buf.clone(), Diff::ZERO)
302302
}
303303
_ => {
304304
packer.push(Datum::String("key_violation"));
@@ -314,7 +314,7 @@ impl ActiveSubscribe {
314314
for _ in 0..(self.arity - order_by_keys.len()) {
315315
packer.push(Datum::Null);
316316
}
317-
(start.0, row_buf.clone(), 0)
317+
(start.0, row_buf.clone(), Diff::ZERO)
318318
}
319319
});
320320
}
@@ -342,7 +342,7 @@ impl ActiveSubscribe {
342342
SubscribeOutput::EnvelopeUpsert { .. }
343343
| SubscribeOutput::EnvelopeDebezium { .. } => {}
344344
SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
345-
packer.push(Datum::Int64(diff));
345+
packer.push(Datum::Int64(*diff));
346346
}
347347
}
348348

src/adapter/src/catalog/apply.rs

Lines changed: 3 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -42,7 +42,7 @@ use mz_ore::{instrument, soft_assert_no_log};
4242
use mz_pgrepr::oid::INVALID_OID;
4343
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
4444
use mz_repr::role_id::RoleId;
45-
use mz_repr::{CatalogItemId, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
45+
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
4646
use mz_sql::catalog::CatalogError as SqlCatalogError;
4747
use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
4848
use mz_sql::names::{
@@ -1319,7 +1319,7 @@ impl CatalogState {
13191319
// catalog.
13201320
let item_id = entry.id();
13211321
state.insert_entry(entry);
1322-
builtin_table_updates.extend(state.pack_item_update(item_id, 1));
1322+
builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
13231323
}
13241324

13251325
let mut handles = Vec::new();
@@ -1523,7 +1523,7 @@ impl CatalogState {
15231523
builtin_table_updates.extend(
15241524
item_ids
15251525
.into_iter()
1526-
.flat_map(|id| state.pack_item_update(id, 1)),
1526+
.flat_map(|id| state.pack_item_update(id, Diff::ONE)),
15271527
);
15281528

15291529
builtin_table_updates

src/adapter/src/catalog/builtin_table_updates.rs

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -1960,7 +1960,7 @@ impl CatalogState {
19601960
Datum::Int32(ip.prefix_len().into()),
19611961
Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
19621962
]);
1963-
Ok(BuiltinTableUpdate::row(id, row, 1))
1963+
Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
19641964
}
19651965

19661966
pub fn pack_replica_metric_updates(
@@ -2034,7 +2034,7 @@ impl CatalogState {
20342034
disk_bytes.into(),
20352035
(*credits_per_hour).into(),
20362036
]);
2037-
BuiltinTableUpdate::row(id, row, 1)
2037+
BuiltinTableUpdate::row(id, row, Diff::ONE)
20382038
},
20392039
)
20402040
.collect();

src/adapter/src/catalog/migrate.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
1616
use mz_ore::collections::CollectionExt;
1717
use mz_ore::now::NowFn;
1818
use mz_persist_types::ShardId;
19-
use mz_repr::{CatalogItemId, Timestamp};
19+
use mz_repr::{CatalogItemId, Diff, Timestamp};
2020
use mz_sql::ast::display::AstDisplay;
2121
use mz_sql::ast::CreateSinkOptionName;
2222
use mz_sql::names::FullItemName;
@@ -83,7 +83,7 @@ where
8383

8484
pub(crate) struct MigrateResult {
8585
pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
86-
pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, i64)>,
86+
pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
8787
}
8888

8989
/// Migrates all user items and loads them into `state`.

src/adapter/src/catalog/open.rs

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -253,7 +253,7 @@ impl Catalog {
253253
let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
254254
differential_dataflow::consolidation::consolidate_updates(&mut updates);
255255
soft_assert_no_log!(
256-
updates.iter().all(|(_, _, diff)| *diff == 1),
256+
updates.iter().all(|(_, _, diff)| **diff == 1),
257257
"consolidated updates should be positive during startup: {updates:?}"
258258
);
259259

@@ -530,7 +530,7 @@ impl Catalog {
530530
mz_sql::func::Func::Scalar(impls) => {
531531
for imp in impls {
532532
builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
533-
catalog.state.pack_op_update(op, imp.details(), 1),
533+
catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
534534
));
535535
}
536536
}

src/adapter/src/catalog/open/builtin_item_migration.rs

Lines changed: 21 additions & 19 deletions
Original file line number · Diff line number · Diff line change
@@ -35,6 +35,7 @@ use mz_persist_types::ShardId;
3535
use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
3636
use mz_sql::catalog::CatalogItem as _;
3737
use mz_storage_client::controller::StorageTxn;
38+
use mz_storage_types::StorageDiff;
3839
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
3940
use tracing::{debug, error};
4041

@@ -199,19 +200,20 @@ async fn migrate_builtin_items_0dt(
199200
"builtin table migration shard for org {organization_id:?} version {build_version:?}"
200201
),
201202
};
202-
let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, Diff, i64> = persist_client
203-
.open_critical_since(
204-
shard_id,
205-
// TODO: We may need to use a different critical reader
206-
// id for this if we want to be able to introspect it via SQL.
207-
PersistClient::CONTROLLER_CRITICAL_SINCE,
208-
diagnostics.clone(),
209-
)
210-
.await
211-
.expect("invalid usage");
203+
let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64> =
204+
persist_client
205+
.open_critical_since(
206+
shard_id,
207+
// TODO: We may need to use a different critical reader
208+
// id for this if we want to be able to introspect it via SQL.
209+
PersistClient::CONTROLLER_CRITICAL_SINCE,
210+
diagnostics.clone(),
211+
)
212+
.await
213+
.expect("invalid usage");
212214
let (mut write_handle, mut read_handle): (
213-
WriteHandle<TableKey, ShardId, Timestamp, Diff>,
214-
ReadHandle<TableKey, ShardId, Timestamp, Diff>,
215+
WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
216+
ReadHandle<TableKey, ShardId, Timestamp, StorageDiff>,
215217
) = persist_client
216218
.open(
217219
shard_id,
@@ -223,7 +225,7 @@ async fn migrate_builtin_items_0dt(
223225
.await
224226
.expect("invalid usage");
225227
// Commit an empty write at the minimum timestamp so the shard is always readable.
226-
const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, Diff)] = &[];
228+
const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, StorageDiff)] = &[];
227229
let res = write_handle
228230
.compare_and_append(
229231
EMPTY_UPDATES,
@@ -323,7 +325,7 @@ async fn migrate_builtin_items_0dt(
323325
if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
324326
== Some(&shard_id)
325327
{
326-
migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1));
328+
migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -Diff::ONE));
327329
} else {
328330
migration_shards_to_finalize.insert((table_key, shard_id));
329331
}
@@ -343,7 +345,7 @@ async fn migrate_builtin_items_0dt(
343345
global_id,
344346
build_version: build_version.clone(),
345347
};
346-
migrated_shard_updates.push(((table_key, shard_id), upper, 1));
348+
migrated_shard_updates.push(((table_key, shard_id), upper, Diff::ONE));
347349
}
348350
}
349351

@@ -412,7 +414,7 @@ async fn migrate_builtin_items_0dt(
412414
if !read_only {
413415
let updates: Vec<_> = migration_shards_to_finalize
414416
.into_iter()
415-
.map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1))
417+
.map(|(table_key, shard_id)| ((table_key, shard_id), upper, -Diff::ONE))
416418
.collect();
417419
if !updates.is_empty() {
418420
// Ignore any errors, these shards will get cleaned up in the next upgrade.
@@ -437,7 +439,7 @@ async fn migrate_builtin_items_0dt(
437439
}
438440

439441
async fn fetch_upper(
440-
write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, Diff>,
442+
write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
441443
) -> Timestamp {
442444
write_handle
443445
.fetch_recent_upper()
@@ -450,8 +452,8 @@ async fn fetch_upper(
450452
async fn write_to_migration_shard(
451453
updates: Vec<((TableKey, ShardId), Timestamp, Diff)>,
452454
upper: Timestamp,
453-
write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, Diff>,
454-
since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, Diff, i64>,
455+
write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
456+
since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64>,
455457
) -> Result<Timestamp, Error> {
456458
let next_upper = upper.step_forward();
457459
// Lag the shard's upper by 1 to keep it readable.

src/adapter/src/catalog/transact.rs

Lines changed: 3 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -41,7 +41,7 @@ use mz_persist_types::ShardId;
4141
use mz_repr::adt::mz_acl_item::{merge_mz_acl_items, AclMode, MzAclItem, PrivilegeMap};
4242
use mz_repr::network_policy_id::NetworkPolicyId;
4343
use mz_repr::role_id::RoleId;
44-
use mz_repr::{strconv, CatalogItemId, ColumnName, ColumnType, GlobalId};
44+
use mz_repr::{strconv, CatalogItemId, ColumnName, ColumnType, Diff, GlobalId};
4545
use mz_sql::ast::RawDataType;
4646
use mz_sql::catalog::{
4747
CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
@@ -462,7 +462,7 @@ impl Catalog {
462462
self.state().pack_optimizer_notices(
463463
&mut builtin_table_updates,
464464
dropped_notices.iter(),
465-
-1,
465+
-Diff::ONE,
466466
);
467467
}
468468

@@ -2347,7 +2347,7 @@ impl Catalog {
23472347
let id = tx.allocate_storage_usage_ids()?;
23482348
let metric =
23492349
VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
2350-
let builtin_table_update = state.pack_storage_usage_update(metric, 1);
2350+
let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
23512351
let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
23522352
weird_builtin_table_update = Some(builtin_table_update);
23532353
}

src/adapter/src/coord.rs

Lines changed: 12 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -131,7 +131,7 @@ use mz_repr::explain::{ExplainConfig, ExplainFormat};
131131
use mz_repr::global_id::TransientIdGen;
132132
use mz_repr::optimize::OptimizerFeatures;
133133
use mz_repr::role_id::RoleId;
134-
use mz_repr::{CatalogItemId, GlobalId, RelationDesc, Timestamp};
134+
use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
135135
use mz_secrets::cache::CachingSecretsReader;
136136
use mz_secrets::{SecretsController, SecretsReader};
137137
use mz_sql::ast::{Raw, Statement};
@@ -1843,10 +1843,13 @@ impl Coordinator {
18431843
for replica_statuses in self.cluster_replica_statuses.0.values() {
18441844
for (replica_id, processes_statuses) in replica_statuses {
18451845
for (process_id, status) in processes_statuses {
1846-
let builtin_table_update = self
1847-
.catalog()
1848-
.state()
1849-
.pack_cluster_replica_status_update(*replica_id, *process_id, status, 1);
1846+
let builtin_table_update =
1847+
self.catalog().state().pack_cluster_replica_status_update(
1848+
*replica_id,
1849+
*process_id,
1850+
status,
1851+
Diff::ONE,
1852+
);
18501853
let builtin_table_update = self
18511854
.catalog()
18521855
.state()
@@ -2018,7 +2021,7 @@ impl Coordinator {
20182021
self.catalog().state().pack_optimizer_notices(
20192022
&mut builtin_table_updates,
20202023
df_meta.optimizer_notices.iter(),
2021-
1,
2024+
Diff::ONE,
20222025
);
20232026
}
20242027

@@ -2074,7 +2077,7 @@ impl Coordinator {
20742077
self.catalog().state().pack_optimizer_notices(
20752078
&mut builtin_table_updates,
20762079
df_meta.optimizer_notices.iter(),
2077-
1,
2080+
Diff::ONE,
20782081
);
20792082
}
20802083

@@ -2125,7 +2128,7 @@ impl Coordinator {
21252128
self.catalog().state().pack_optimizer_notices(
21262129
&mut builtin_table_updates,
21272130
df_meta.optimizer_notices.iter(),
2128-
1,
2131+
Diff::ONE,
21292132
);
21302133
}
21312134

@@ -3807,7 +3810,7 @@ impl Coordinator {
38073810
.expect("all collections happen after Jan 1 1970");
38083811
if collection_timestamp < cutoff_ts {
38093812
debug!("pruning storage event {row:?}");
3810-
let builtin_update = BuiltinTableUpdate::row(item_id, row, -1);
3813+
let builtin_update = BuiltinTableUpdate::row(item_id, row, -Diff::ONE);
38113814
expired.push(builtin_update);
38123815
}
38133816
}

0 commit comments

Comments
 (0)