Conversation
This might be an interesting one for getting tests green in Nightly. Cluster test:
cluster-clusterd1-1 | 2025-03-28T12:53:48.941559Z thread 'timely:work-1' panicked at src/ore/src/num.rs:649:1: Overflow 9223372036854775807 + 3
(Unfortunately I'm out next week, but ping me if you need some urgent help)
frankmcsherry left a comment:
Everything makes sense to me. It seems generally healthy to have a new type for this, in any case. But also, getting to a place where we can confidently see, and potentially avoid, overflow errors feels very good.
      SubscribeOutput::EnvelopeDebezium { .. } => {}
      SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
-         packer.push(Datum::Int64(diff));
+         packer.push(Datum::Int64(*diff));
This comment will apply in many locations, but this is the first: should we avoid a deref to i64 and ask for explicitness here instead? Or .. perhaps we do that in the future as part of broadening the confidence associated with Diff? Perhaps ergonomically this is the only tractable first step, and .. it's fine to take small steps in this work.
I removed the Deref (and Borrow) implementation. I think it's cleaner this way and forces us to think about type conversion rather than Rust being helpful (or not).
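For illustration, the ergonomic trade-off in this thread could be sketched as follows (names here are hypothetical, not the actual mz_repr API): without a `Deref<Target = i64>` impl, the wrapper cannot silently decay to an `i64`, so every conversion is spelled out at the call site.

```rust
/// Hypothetical diff wrapper without a Deref impl; conversions are explicit.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Diff(i64);

impl Diff {
    /// Explicit conversion back to the underlying integer.
    pub fn into_inner(self) -> i64 {
        self.0
    }
}

impl From<i64> for Diff {
    fn from(n: i64) -> Self {
        Self(n)
    }
}

pub fn push_int64(datums: &mut Vec<i64>, diff: Diff) {
    // With a Deref impl, `*diff` would compile here; without one, the call
    // site must name the conversion, making the type change visible.
    datums.push(diff.into_inner());
}
```

The point of the design is exactly that the compiler now rejects implicit uses of the wrapper as a plain integer, forcing each site to opt in.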
src/adapter/src/coord/peek.rs (outdated)
      let mut results = Vec::new();
      for (row, count) in rows {
-         if count < 0 {
+         if *count < 0 {
Potentially compare to Diff::ZERO, if that makes any sense.
Reading the rest of the file, never mind! It seems like there are similar ergonomic questions, and fidelity to the original source vs. "fix it now" makes sense to me.
Looked through the Persist-related changes, and I believe a delegating `impl Codec64 for Overflowing<i64>` would work:

```rust
impl Codec64 for Overflowing<i64> {
    fn codec_name() -> String {
        // Since we use the identical encoding to the underlying type, it's safe
        // to use the identical codec name.
        i64::codec_name()
    }
    fn encode(&self) -> [u8; 8] {
        i64::encode(&**self)
    }
    fn decode(buf: [u8; 8]) -> Self {
        i64::decode(buf).into()
    }
}
```

My take is that, on balance, that's probably cleaner than having two diff types and converting... and should help make your diff smaller, which is nice. (If that sounds appealing but it's not straightforward for whatever reason, I'd also be happy to take a look at it.)
src/ore/src/num.rs (outdated)
+ /// Handles overflow for [`Overflowing`](super::Overflowing) numbers.
+ #[track_caller]
+ #[inline]
This was mildly surprising to me - I would have guessed we'd want #[cold]/#[inline(never)], since that's what Rust tags on similar error handling paths, but I've got no direct experience one way or the other. What's the reasoning here?
No, you're right, it was a mental blip! It should certainly be cold. Fixing, and thanks for spotting!
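The pattern being agreed on here could look like the following sketch (function names are illustrative, not the actual mz_ore API): the panic path lives in a separate function marked `#[cold]` and `#[inline(never)]`, so the optimizer treats overflow as the unlikely branch and keeps the hot path small, while `#[track_caller]` makes the panic report the caller's location, as in the Nightly failure above.

```rust
/// Cold, never-inlined panic path for overflow, kept out of the hot path.
#[cold]
#[inline(never)]
#[track_caller]
fn overflow_panic(a: i64, b: i64) -> ! {
    panic!("Overflow {a} + {b}")
}

/// Adds two `i64`s, panicking on overflow in release builds, too
/// (plain `i64` addition would silently wrap there).
#[inline]
#[track_caller]
pub fn add_or_panic(a: i64, b: i64) -> i64 {
    match a.checked_add(b) {
        Some(sum) => sum,
        None => overflow_panic(a, b),
    }
}
```

This mirrors what the standard library does for bounds checks and similar error-handling paths: the common case stays inlinable, and the rarely taken panic machinery is compiled out of line.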
The change introduces an overflowing type that checks for overflow in release builds, too. It implements all the traits needed to use it wherever we would normally use a number, including traits from differential-dataflow and a few other external crates.
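The type described above can be sketched as follows (a minimal illustration under assumed names, not the actual mz_ore implementation, which covers many more traits): arithmetic always checks for overflow and panics, even in release builds where plain `i64` would wrap.

```rust
use std::ops::{Add, Neg};

/// Minimal sketch of an overflow-checking diff type.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Overflowing(i64);

impl Overflowing {
    pub const ZERO: Self = Self(0);
    pub const ONE: Self = Self(1);

    pub fn into_inner(self) -> i64 {
        self.0
    }
}

impl From<i64> for Overflowing {
    fn from(n: i64) -> Self {
        Self(n)
    }
}

impl Add for Overflowing {
    type Output = Self;
    #[inline]
    fn add(self, rhs: Self) -> Self {
        // Checked in all build profiles; plain i64 only checks in debug.
        match self.0.checked_add(rhs.0) {
            Some(sum) => Self(sum),
            None => panic!("Overflow {} + {}", self.0, rhs.0),
        }
    }
}

impl Neg for Overflowing {
    type Output = Self;
    fn neg(self) -> Self {
        // i64::MIN has no positive counterpart, so negation can overflow too.
        Self(self.0.checked_neg().expect("overflow negating diff"))
    }
}
```

With a type like this, expressions such as `-Overflowing::ONE` or `Overflowing::ZERO` read naturally at call sites while keeping every arithmetic step checked.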
It uses the overflowing type throughout Materialize where possible. Persist still requires the plain `i64` type, so we introduce a `StorageDiff` type alias and convert to/from the overflowing diff type at the boundary to persist.
This isn't ideal because storage can overflow, too, on consolidation, but changing the type there seems harder and is outside my expertise.
The diff is quite large on account of boilerplate changes: everywhere one previously wrote `1`, one now has to convert the type or write `Diff::ONE` directly.
Signed-off-by: Moritz Hoffmann <mh@materialize.com>
diff --git c/Cargo.lock i/Cargo.lock
index f968b78fce..9f2a2a9ab2 100644
--- c/Cargo.lock
+++ i/Cargo.lock
@@ -6203,11 +6203,14 @@ dependencies = [
"bytes",
"chrono",
"clap",
+ "columnar",
+ "columnation",
"compact_bytes",
"console-subscriber",
"criterion",
"ctor",
"derivative",
+ "differential-dataflow",
"either",
"futures",
"hibitset",
@@ -6221,6 +6224,7 @@ dependencies = [
"mz-ore-proc",
"native-tls",
"num",
+ "num-traits",
"openssl",
"opentelemetry",
"opentelemetry-otlp",
@@ -12252,6 +12256,7 @@ dependencies = [
"syn 2.0.98",
"time",
"time-macros",
+ "timely",
"tokio",
"tokio-postgres",
"tokio-stream",
diff --git c/src/adapter/src/active_compute_sink.rs i/src/adapter/src/active_compute_sink.rs
index 4b861f9abc..2f51af3c95 100644
--- c/src/adapter/src/active_compute_sink.rs
+++ i/src/adapter/src/active_compute_sink.rs
@@ -22,7 +22,7 @@ use mz_expr::compare_columns;
use mz_ore::cast::CastFrom;
use mz_ore::now::EpochMillis;
use mz_repr::adt::numeric;
-use mz_repr::{CatalogItemId, Datum, GlobalId, IntoRowIterator, Row, Timestamp};
+use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, Timestamp};
use mz_sql::plan::SubscribeOutput;
use mz_storage_types::instances::StorageInstanceId;
use timely::progress::Antichain;
@@ -182,10 +182,10 @@ impl ActiveSubscribe {
|(left_time, left_row, left_diff), (right_time, right_row, right_diff)| {
left_time.cmp(right_time).then_with(|| {
let mut left_datums = left_datum_vec.borrow();
- left_datums.extend(&[Datum::Int64(*left_diff)]);
+ left_datums.extend(&[Datum::Int64(**left_diff)]);
left_datums.extend(left_row.iter());
let mut right_datums = right_datum_vec.borrow();
- right_datums.extend(&[Datum::Int64(*right_diff)]);
+ right_datums.extend(&[Datum::Int64(**right_diff)]);
right_datums.extend(right_row.iter());
compare_columns(order_by, &left_datums, &right_datums, || {
left_row.cmp(right_row).then(left_diff.cmp(right_diff))
@@ -238,7 +238,7 @@ impl ActiveSubscribe {
let value_columns = self.arity - order_by_keys.len();
let mut packer = row_buf.packer();
new_rows.push(match &group[..] {
- [(_, row, 1)] => {
+ [(_, row, Diff::ONE)] => {
packer.push(if debezium {
Datum::String("insert")
} else {
@@ -258,9 +258,9 @@ impl ActiveSubscribe {
packer.push(datums[idx]);
}
}
- (start.0, row_buf.clone(), 0)
+ (start.0, row_buf.clone(), Diff::ZERO)
}
- [(_, _, -1)] => {
+ [(_, _, Diff::MINUS_ONE)] => {
packer.push(Datum::String("delete"));
let datums = datum_vec.borrow_with(&start.1);
for column_order in order_by_keys {
@@ -276,9 +276,9 @@ impl ActiveSubscribe {
for _ in 0..self.arity - order_by_keys.len() {
packer.push(Datum::Null);
}
- (start.0, row_buf.clone(), 0)
+ (start.0, row_buf.clone(), Diff::ZERO)
}
- [(_, old_row, -1), (_, row, 1)] => {
+ [(_, old_row, Diff::MINUS_ONE), (_, row, Diff::ONE)] => {
packer.push(Datum::String("upsert"));
let datums = datum_vec.borrow_with(row);
let old_datums = old_datum_vec.borrow_with(old_row);
@@ -298,7 +298,7 @@ impl ActiveSubscribe {
packer.push(datums[idx]);
}
}
- (start.0, row_buf.clone(), 0)
+ (start.0, row_buf.clone(), Diff::ZERO)
}
_ => {
packer.push(Datum::String("key_violation"));
@@ -314,7 +314,7 @@ impl ActiveSubscribe {
for _ in 0..(self.arity - order_by_keys.len()) {
packer.push(Datum::Null);
}
- (start.0, row_buf.clone(), 0)
+ (start.0, row_buf.clone(), Diff::ZERO)
}
});
}
@@ -342,7 +342,7 @@ impl ActiveSubscribe {
SubscribeOutput::EnvelopeUpsert { .. }
| SubscribeOutput::EnvelopeDebezium { .. } => {}
SubscribeOutput::Diffs | SubscribeOutput::WithinTimestampOrderBy { .. } => {
- packer.push(Datum::Int64(diff));
+ packer.push(Datum::Int64(*diff));
}
}
diff --git c/src/adapter/src/catalog/apply.rs i/src/adapter/src/catalog/apply.rs
index 13e962576b..92c412f775 100644
--- c/src/adapter/src/catalog/apply.rs
+++ i/src/adapter/src/catalog/apply.rs
@@ -42,7 +42,7 @@ use mz_ore::{instrument, soft_assert_no_log};
use mz_pgrepr::oid::INVALID_OID;
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
use mz_repr::role_id::RoleId;
-use mz_repr::{CatalogItemId, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
+use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersion, Timestamp, VersionedRelationDesc};
use mz_sql::catalog::CatalogError as SqlCatalogError;
use mz_sql::catalog::{CatalogItem as SqlCatalogItem, CatalogItemType, CatalogSchema, CatalogType};
use mz_sql::names::{
@@ -1319,7 +1319,7 @@ impl CatalogState {
// catalog.
let item_id = entry.id();
state.insert_entry(entry);
- builtin_table_updates.extend(state.pack_item_update(item_id, 1));
+ builtin_table_updates.extend(state.pack_item_update(item_id, Diff::ONE));
}
let mut handles = Vec::new();
@@ -1523,7 +1523,7 @@ impl CatalogState {
builtin_table_updates.extend(
item_ids
.into_iter()
- .flat_map(|id| state.pack_item_update(id, 1)),
+ .flat_map(|id| state.pack_item_update(id, Diff::ONE)),
);
builtin_table_updates
diff --git c/src/adapter/src/catalog/builtin_table_updates.rs i/src/adapter/src/catalog/builtin_table_updates.rs
index 9dcfd4a24c..21342026e8 100644
--- c/src/adapter/src/catalog/builtin_table_updates.rs
+++ i/src/adapter/src/catalog/builtin_table_updates.rs
@@ -1960,7 +1960,7 @@ impl CatalogState {
Datum::Int32(ip.prefix_len().into()),
Datum::String(&format!("{}/{}", addr, ip.prefix_len())),
]);
- Ok(BuiltinTableUpdate::row(id, row, 1))
+ Ok(BuiltinTableUpdate::row(id, row, Diff::ONE))
}
pub fn pack_replica_metric_updates(
@@ -2034,7 +2034,7 @@ impl CatalogState {
disk_bytes.into(),
(*credits_per_hour).into(),
]);
- BuiltinTableUpdate::row(id, row, 1)
+ BuiltinTableUpdate::row(id, row, Diff::ONE)
},
)
.collect();
diff --git c/src/adapter/src/catalog/migrate.rs i/src/adapter/src/catalog/migrate.rs
index bb50f2ba69..8cbfc2e3d7 100644
--- c/src/adapter/src/catalog/migrate.rs
+++ i/src/adapter/src/catalog/migrate.rs
@@ -16,7 +16,7 @@ use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
-use mz_repr::{CatalogItemId, Timestamp};
+use mz_repr::{CatalogItemId, Diff, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::CreateSinkOptionName;
use mz_sql::names::FullItemName;
@@ -83,7 +83,7 @@ where
pub(crate) struct MigrateResult {
pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
- pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, i64)>,
+ pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, Diff)>,
}
/// Migrates all user items and loads them into `state`.
diff --git c/src/adapter/src/catalog/open.rs i/src/adapter/src/catalog/open.rs
index c131218c1f..aaf58b3c68 100644
--- c/src/adapter/src/catalog/open.rs
+++ i/src/adapter/src/catalog/open.rs
@@ -254,7 +254,7 @@ impl Catalog {
let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
differential_dataflow::consolidation::consolidate_updates(&mut updates);
soft_assert_no_log!(
- updates.iter().all(|(_, _, diff)| *diff == 1),
+ updates.iter().all(|(_, _, diff)| **diff == 1),
"consolidated updates should be positive during startup: {updates:?}"
);
@@ -531,7 +531,7 @@ impl Catalog {
mz_sql::func::Func::Scalar(impls) => {
for imp in impls {
builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
- catalog.state.pack_op_update(op, imp.details(), 1),
+ catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
));
}
}
diff --git c/src/adapter/src/catalog/open/builtin_item_migration.rs i/src/adapter/src/catalog/open/builtin_item_migration.rs
index 5c9e4bb4e2..1b55abffc2 100644
--- c/src/adapter/src/catalog/open/builtin_item_migration.rs
+++ i/src/adapter/src/catalog/open/builtin_item_migration.rs
@@ -35,6 +35,7 @@ use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
use mz_sql::catalog::CatalogItem as _;
use mz_storage_client::controller::StorageTxn;
+use mz_storage_types::StorageDiff;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, error};
@@ -199,19 +200,20 @@ async fn migrate_builtin_items_0dt(
"builtin table migration shard for org {organization_id:?} version {build_version:?}"
),
};
- let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, Diff, i64> = persist_client
- .open_critical_since(
- shard_id,
- // TODO: We may need to use a different critical reader
- // id for this if we want to be able to introspect it via SQL.
- PersistClient::CONTROLLER_CRITICAL_SINCE,
- diagnostics.clone(),
- )
- .await
- .expect("invalid usage");
+ let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64> =
+ persist_client
+ .open_critical_since(
+ shard_id,
+ // TODO: We may need to use a different critical reader
+ // id for this if we want to be able to introspect it via SQL.
+ PersistClient::CONTROLLER_CRITICAL_SINCE,
+ diagnostics.clone(),
+ )
+ .await
+ .expect("invalid usage");
let (mut write_handle, mut read_handle): (
- WriteHandle<TableKey, ShardId, Timestamp, Diff>,
- ReadHandle<TableKey, ShardId, Timestamp, Diff>,
+ WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
+ ReadHandle<TableKey, ShardId, Timestamp, StorageDiff>,
) = persist_client
.open(
shard_id,
@@ -223,7 +225,7 @@ async fn migrate_builtin_items_0dt(
.await
.expect("invalid usage");
// Commit an empty write at the minimum timestamp so the shard is always readable.
- const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, Diff)] = &[];
+ const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, StorageDiff)] = &[];
let res = write_handle
.compare_and_append(
EMPTY_UPDATES,
@@ -323,7 +325,7 @@ async fn migrate_builtin_items_0dt(
if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
== Some(&shard_id)
{
- migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1));
+ migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -Diff::ONE));
} else {
migration_shards_to_finalize.insert((table_key, shard_id));
}
@@ -343,7 +345,7 @@ async fn migrate_builtin_items_0dt(
global_id,
build_version: build_version.clone(),
};
- migrated_shard_updates.push(((table_key, shard_id), upper, 1));
+ migrated_shard_updates.push(((table_key, shard_id), upper, Diff::ONE));
}
}
@@ -412,7 +414,7 @@ async fn migrate_builtin_items_0dt(
if !read_only {
let updates: Vec<_> = migration_shards_to_finalize
.into_iter()
- .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1))
+ .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -Diff::ONE))
.collect();
if !updates.is_empty() {
// Ignore any errors, these shards will get cleaned up in the next upgrade.
@@ -437,7 +439,7 @@ async fn migrate_builtin_items_0dt(
}
async fn fetch_upper(
- write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, Diff>,
+ write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
) -> Timestamp {
write_handle
.fetch_recent_upper()
@@ -450,8 +452,8 @@ async fn fetch_upper(
async fn write_to_migration_shard(
updates: Vec<((TableKey, ShardId), Timestamp, Diff)>,
upper: Timestamp,
- write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, Diff>,
- since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, Diff, i64>,
+ write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
+ since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64>,
) -> Result<Timestamp, Error> {
let next_upper = upper.step_forward();
// Lag the shard's upper by 1 to keep it readable.
diff --git c/src/adapter/src/catalog/transact.rs i/src/adapter/src/catalog/transact.rs
index baa90f073e..8e17370a1d 100644
--- c/src/adapter/src/catalog/transact.rs
+++ i/src/adapter/src/catalog/transact.rs
@@ -41,7 +41,7 @@ use mz_persist_types::ShardId;
use mz_repr::adt::mz_acl_item::{merge_mz_acl_items, AclMode, MzAclItem, PrivilegeMap};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::role_id::RoleId;
-use mz_repr::{strconv, CatalogItemId, ColumnName, ColumnType, GlobalId};
+use mz_repr::{strconv, CatalogItemId, ColumnName, ColumnType, Diff, GlobalId};
use mz_sql::ast::RawDataType;
use mz_sql::catalog::{
CatalogDatabase, CatalogError as SqlCatalogError, CatalogItem as SqlCatalogItem, CatalogRole,
@@ -462,7 +462,7 @@ impl Catalog {
self.state().pack_optimizer_notices(
&mut builtin_table_updates,
dropped_notices.iter(),
- -1,
+ -Diff::ONE,
);
}
@@ -2347,7 +2347,7 @@ impl Catalog {
let id = tx.allocate_storage_usage_ids()?;
let metric =
VersionedStorageUsage::new(id, object_id, size_bytes, collection_timestamp);
- let builtin_table_update = state.pack_storage_usage_update(metric, 1);
+ let builtin_table_update = state.pack_storage_usage_update(metric, Diff::ONE);
let builtin_table_update = state.resolve_builtin_table_update(builtin_table_update);
weird_builtin_table_update = Some(builtin_table_update);
}
diff --git c/src/adapter/src/coord.rs i/src/adapter/src/coord.rs
index 84baf4fe35..170c557c7f 100644
--- c/src/adapter/src/coord.rs
+++ i/src/adapter/src/coord.rs
@@ -131,7 +131,7 @@ use mz_repr::explain::{ExplainConfig, ExplainFormat};
use mz_repr::global_id::TransientIdGen;
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::role_id::RoleId;
-use mz_repr::{CatalogItemId, GlobalId, RelationDesc, Timestamp};
+use mz_repr::{CatalogItemId, Diff, GlobalId, RelationDesc, Timestamp};
use mz_secrets::cache::CachingSecretsReader;
use mz_secrets::{SecretsController, SecretsReader};
use mz_sql::ast::{Raw, Statement};
@@ -1843,10 +1843,13 @@ impl Coordinator {
for replica_statuses in self.cluster_replica_statuses.0.values() {
for (replica_id, processes_statuses) in replica_statuses {
for (process_id, status) in processes_statuses {
- let builtin_table_update = self
- .catalog()
- .state()
- .pack_cluster_replica_status_update(*replica_id, *process_id, status, 1);
+ let builtin_table_update =
+ self.catalog().state().pack_cluster_replica_status_update(
+ *replica_id,
+ *process_id,
+ status,
+ Diff::ONE,
+ );
let builtin_table_update = self
.catalog()
.state()
@@ -2018,7 +2021,7 @@ impl Coordinator {
self.catalog().state().pack_optimizer_notices(
&mut builtin_table_updates,
df_meta.optimizer_notices.iter(),
- 1,
+ Diff::ONE,
);
}
@@ -2074,7 +2077,7 @@ impl Coordinator {
self.catalog().state().pack_optimizer_notices(
&mut builtin_table_updates,
df_meta.optimizer_notices.iter(),
- 1,
+ Diff::ONE,
);
}
@@ -2125,7 +2128,7 @@ impl Coordinator {
self.catalog().state().pack_optimizer_notices(
&mut builtin_table_updates,
df_meta.optimizer_notices.iter(),
- 1,
+ Diff::ONE,
);
}
@@ -3807,7 +3810,7 @@ impl Coordinator {
.expect("all collections happen after Jan 1 1970");
if collection_timestamp < cutoff_ts {
debug!("pruning storage event {row:?}");
- let builtin_update = BuiltinTableUpdate::row(item_id, row, -1);
+ let builtin_update = BuiltinTableUpdate::row(item_id, row, -Diff::ONE);
expired.push(builtin_update);
}
}
diff --git c/src/adapter/src/coord/command_handler.rs i/src/adapter/src/coord/command_handler.rs
index 8667f1af99..a80773d76b 100644
--- c/src/adapter/src/coord/command_handler.rs
+++ i/src/adapter/src/coord/command_handler.rs
@@ -27,7 +27,7 @@ use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{instrument, soft_panic_or_log};
use mz_repr::role_id::RoleId;
-use mz_repr::{ScalarType, Timestamp};
+use mz_repr::{Diff, ScalarType, Timestamp};
use mz_sql::ast::{
AlterConnectionAction, AlterConnectionStatement, AlterSourceAction, AstInfo, ConstantVisitor,
CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement, SubscribeStatement,
@@ -270,7 +270,7 @@ impl Coordinator {
authenticated_role: role_id,
deferred_lock: None,
};
- let update = self.catalog().state().pack_session_update(&conn, 1);
+ let update = self.catalog().state().pack_session_update(&conn, Diff::ONE);
let update = self.catalog().state().resolve_builtin_table_update(update);
self.begin_session_for_statement_logging(&conn);
self.active_conns.insert(conn_id.clone(), conn);
@@ -1308,7 +1308,10 @@ impl Coordinator {
// Queue the builtin table update, but do not wait for it to complete. We explicitly do
// this to prevent blocking the Coordinator in the case that a lot of connections are
// closed at once, which occurs regularly in some workflows.
- let update = self.catalog().state().pack_session_update(&conn, -1);
+ let update = self
+ .catalog()
+ .state()
+ .pack_session_update(&conn, -Diff::ONE);
let update = self.catalog().state().resolve_builtin_table_update(update);
let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
diff --git c/src/adapter/src/coord/ddl.rs i/src/adapter/src/coord/ddl.rs
index c10fb527d3..6a0582863c 100644
--- c/src/adapter/src/coord/ddl.rs
+++ i/src/adapter/src/coord/ddl.rs
@@ -34,7 +34,7 @@ use mz_ore::str::StrExt;
use mz_ore::task;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_repr::adt::numeric::Numeric;
-use mz_repr::{CatalogItemId, GlobalId, Timestamp};
+use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
use mz_sql::catalog::{CatalogCluster, CatalogClusterReplica, CatalogSchema};
use mz_sql::names::ResolvedDatabaseSpecifier;
use mz_sql::plan::ConnectionDetails;
@@ -561,7 +561,7 @@ impl Coordinator {
*replica_id,
process_id,
&status,
- -1,
+ -Diff::ONE,
);
let builtin_table_update = catalog
.state()
@@ -573,9 +573,12 @@ impl Coordinator {
let cluster_statuses = cluster_replica_statuses.remove_cluster_statuses(cluster_id);
for (replica_id, replica_statuses) in cluster_statuses {
for (process_id, status) in replica_statuses {
- let builtin_table_update = catalog
- .state()
- .pack_cluster_replica_status_update(replica_id, process_id, &status, -1);
+ let builtin_table_update = catalog.state().pack_cluster_replica_status_update(
+ replica_id,
+ process_id,
+ &status,
+ -Diff::ONE,
+ );
let builtin_table_update = catalog
.state()
.resolve_builtin_table_update(builtin_table_update);
@@ -593,7 +596,7 @@ impl Coordinator {
*replica_id,
*process_id,
status,
- 1,
+ Diff::ONE,
);
let builtin_table_update = catalog
.state()
@@ -621,7 +624,7 @@ impl Coordinator {
replica_id,
*process_id,
status,
- 1,
+ Diff::ONE,
);
let builtin_table_update = catalog
.state()
@@ -867,10 +870,11 @@ impl Coordinator {
{
let mut updates = vec![];
if let Some(metrics) = metrics {
- let retractions = self
- .catalog()
- .state()
- .pack_replica_metric_updates(replica_id, &metrics, -1);
+ let retractions = self.catalog().state().pack_replica_metric_updates(
+ replica_id,
+ &metrics,
+ -Diff::ONE,
+ );
let retractions = self
.catalog()
.state()
diff --git c/src/adapter/src/coord/message_handler.rs i/src/adapter/src/coord/message_handler.rs
index 70a057ded6..1145198782 100644
--- c/src/adapter/src/coord/message_handler.rs
+++ i/src/adapter/src/coord/message_handler.rs
@@ -24,7 +24,7 @@ use mz_ore::option::OptionExt;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, task};
use mz_persist_client::usage::ShardsUsageReferenced;
-use mz_repr::{Datum, Row};
+use mz_repr::{Datum, Diff, Row};
use mz_sql::ast::Statement;
use mz_sql::pure::PurifiedStatement;
use mz_storage_client::controller::IntrospectionType;
@@ -188,7 +188,7 @@ impl Coordinator {
mz_storage_client::controller::IntrospectionType::PrivatelinkConnectionStatusHistory,
events
.into_iter()
- .map(|e| (mz_repr::Row::from(e), 1))
+ .map(|e| (mz_repr::Row::from(e), Diff::ONE))
.collect(),
);
}
@@ -419,14 +419,17 @@ impl Coordinator {
let old = std::mem::replace(m, Some(new.clone()));
if old.as_ref() != Some(&new) {
let retractions = old.map(|old| {
- self.catalog()
- .state()
- .pack_replica_metric_updates(replica_id, &old, -1)
+ self.catalog().state().pack_replica_metric_updates(
+ replica_id,
+ &old,
+ -Diff::ONE,
+ )
});
- let insertions = self
- .catalog()
- .state()
- .pack_replica_metric_updates(replica_id, &new, 1);
+ let insertions = self.catalog().state().pack_replica_metric_updates(
+ replica_id,
+ &new,
+ Diff::ONE,
+ );
let updates = if let Some(retractions) = retractions {
retractions
.into_iter()
@@ -721,7 +724,7 @@ impl Coordinator {
]);
self.controller.storage.append_introspection_updates(
IntrospectionType::ReplicaStatusHistory,
- vec![(row, 1)],
+ vec![(row, Diff::ONE)],
);
}
@@ -735,7 +738,7 @@ impl Coordinator {
event.replica_id,
event.process_id,
old_process_status,
- -1,
+ -Diff::ONE,
);
let builtin_table_retraction = self
.catalog()
@@ -750,7 +753,7 @@ impl Coordinator {
event.replica_id,
event.process_id,
&new_process_status,
- 1,
+ Diff::ONE,
);
let builtin_table_addition = self
.catalog()
diff --git c/src/adapter/src/coord/peek.rs i/src/adapter/src/coord/peek.rs
index 1eb0505373..e975be31aa 100644
--- c/src/adapter/src/coord/peek.rs
+++ i/src/adapter/src/coord/peek.rs
@@ -466,14 +466,14 @@ impl crate::coord::Coordinator {
let mut results = Vec::new();
for (row, count) in rows {
- if count < 0 {
+ if *count < 0 {
Err(EvalError::InvalidParameterValue(
format!("Negative multiplicity in constant result: {}", count).into(),
))?
};
- if count > 0 {
+ if *count > 0 {
let count = usize::cast_from(
- u64::try_from(count).expect("known to be positive from check above"),
+ u64::try_from(*count).expect("known to be positive from check above"),
);
results.push((
row,
@@ -853,9 +853,9 @@ mod tests {
assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);
let mut constant_rows = vec![
- (Row::pack(Some(Datum::String("hello"))), 1),
- (Row::pack(Some(Datum::String("world"))), 2),
- (Row::pack(Some(Datum::String("star"))), 500),
+ (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
+ (Row::pack(Some(Datum::String("world"))), 2.into()),
+ (Row::pack(Some(Datum::String("star"))), 500.into()),
];
let constant_exp1 =
"Constant\n - (\"hello\")\n - ((\"world\") x 2)\n - ((\"star\") x 500)\n";
@@ -866,7 +866,8 @@ mod tests {
),
constant_exp1
);
- constant_rows.extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), 1)));
+ constant_rows
+ .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
let constant_exp2 =
"Constant\n total_rows (diffs absed): 523\n first_rows:\n - (\"hello\")\
\n - ((\"world\") x 2)\n - ((\"star\") x 500)\n - (\"0\")\n - (\"1\")\
diff --git c/src/adapter/src/coord/sequencer.rs i/src/adapter/src/coord/sequencer.rs
index 61e7665e36..6b02c60c27 100644
--- c/src/adapter/src/coord/sequencer.rs
+++ i/src/adapter/src/coord/sequencer.rs
@@ -834,7 +834,7 @@ impl Coordinator {
// If all diffs are positive, the number of affected rows is just the
// sum of all unconsolidated diffs.
for (_, diff) in plan.updates.iter() {
- if *diff < 0 {
+ if **diff < 0 {
all_positive_diffs = false;
break;
}
@@ -848,7 +848,7 @@ impl Coordinator {
// affected rows.
differential_dataflow::consolidation::consolidate(&mut plan.updates);
- affected_rows = 0;
+ affected_rows = Diff::ZERO;
// With retractions, the number of affected rows is not the number
// of rows we see, but the sum of the absolute value of their diffs,
// e.g. if one row is retracted and another is added, the total
diff --git c/src/adapter/src/coord/sequencer/inner.rs i/src/adapter/src/coord/sequencer/inner.rs
index 2bb42793bd..20ee677ddb 100644
--- c/src/adapter/src/coord/sequencer/inner.rs
+++ i/src/adapter/src/coord/sequencer/inner.rs
@@ -3074,20 +3074,20 @@ impl Coordinator {
datums[idx] = new_value;
}
let updated = Row::pack_slice(&datums);
- diffs.push((updated, 1));
+ diffs.push((updated, Diff::ONE));
}
match kind {
// Updates and deletes always remove the
// current row. Updates will also add an
// updated value.
MutationKind::Update | MutationKind::Delete => {
- diffs.push((row.to_owned(), -1))
+ diffs.push((row.to_owned(), -Diff::ONE))
}
- MutationKind::Insert => diffs.push((row.to_owned(), 1)),
+ MutationKind::Insert => diffs.push((row.to_owned(), Diff::ONE)),
}
}
for (row, diff) in &diffs {
- if *diff > 0 {
+ if **diff > 0 {
for (idx, datum) in row.iter().enumerate() {
desc.constraints_met(idx, &datum)?;
}
@@ -3137,7 +3137,7 @@ impl Coordinator {
.as_ref()
.expect("known to be `Ok` from `is_ok()` call above")
{
- if diff < &1 {
+ if **diff < 1 {
continue;
}
let mut returning_row = Row::with_capacity(returning.len());
@@ -3154,7 +3154,7 @@ impl Coordinator {
}
}
}
- let diff = NonZeroI64::try_from(*diff).expect("known to be >= 1");
+ let diff = NonZeroI64::try_from(**diff).expect("known to be >= 1");
let diff = match NonZeroUsize::try_from(diff) {
Ok(diff) => diff,
Err(err) => {
@@ -5158,7 +5158,7 @@ impl Coordinator {
self.catalog().state().pack_optimizer_notices(
&mut builtin_table_updates,
df_meta.optimizer_notices.iter(),
- 1,
+ Diff::ONE,
);
// Save the metainfo.
diff --git c/src/adapter/src/coord/sql.rs i/src/adapter/src/coord/sql.rs
index ad8ed1b14b..848907c0e9 100644
--- c/src/adapter/src/coord/sql.rs
+++ i/src/adapter/src/coord/sql.rs
@@ -12,7 +12,7 @@
use mz_adapter_types::connection::ConnectionId;
use mz_ore::now::EpochMillis;
-use mz_repr::{GlobalId, ScalarType};
+use mz_repr::{Diff, GlobalId, ScalarType};
use mz_sql::names::{Aug, ResolvedIds};
use mz_sql::plan::{Params, StatementDesc};
use mz_sql::session::metadata::SessionMetadata;
@@ -252,10 +252,10 @@ impl Coordinator {
let ret_fut = match &active_sink {
ActiveComputeSink::Subscribe(active_subscribe) => {
- let update = self
- .catalog()
- .state()
- .pack_subscribe_update(id, active_subscribe, 1);
+ let update =
+ self.catalog()
+ .state()
+ .pack_subscribe_update(id, active_subscribe, Diff::ONE);
let update = self.catalog().state().resolve_builtin_table_update(update);
self.metrics
@@ -299,10 +299,11 @@ impl Coordinator {
match &sink {
ActiveComputeSink::Subscribe(active_subscribe) => {
- let update =
- self.catalog()
- .state()
- .pack_subscribe_update(id, active_subscribe, -1);
+ let update = self.catalog().state().pack_subscribe_update(
+ id,
+ active_subscribe,
+ -Diff::ONE,
+ );
let update = self.catalog().state().resolve_builtin_table_update(update);
self.builtin_table_update().blocking(vec![update]).await;
diff --git c/src/adapter/src/coord/statement_logging.rs i/src/adapter/src/coord/statement_logging.rs
index 3f071cfc8e..2a76a6e1a5 100644
--- c/src/adapter/src/coord/statement_logging.rs
+++ i/src/adapter/src/coord/statement_logging.rs
@@ -226,7 +226,7 @@ impl Coordinator {
pub(crate) fn drain_statement_log(&mut self) {
let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
.into_iter()
- .map(|update| (update, 1))
+ .map(|update| (update, Diff::ONE))
.collect();
let (prepared_statement_updates, sql_text_updates) =
std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
@@ -235,7 +235,9 @@ impl Coordinator {
|PreparedStatementEvent {
prepared_statement,
sql_text,
- }| ((prepared_statement, 1), (sql_text, 1)),
+ }| {
+ ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
+ },
)
.unzip::<_, _, Vec<_>, Vec<_>>();
let statement_execution_updates =
@@ -243,7 +245,7 @@ impl Coordinator {
let statement_lifecycle_updates =
std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
.into_iter()
- .map(|update| (update, 1))
+ .map(|update| (update, Diff::ONE))
.collect();
use IntrospectionType::*;
@@ -601,7 +603,7 @@ impl Coordinator {
) -> [(Row, Diff); 2] {
let retraction = Self::pack_statement_began_execution_update(began_record);
let new = Self::pack_full_statement_execution_update(began_record, ended_record);
- [(retraction, -1), (new, 1)]
+ [(retraction, -Diff::ONE), (new, Diff::ONE)]
}
/// Mutate a statement execution record via the given function `f`.
@@ -618,12 +620,12 @@ impl Coordinator {
let retraction = Self::pack_statement_began_execution_update(record);
self.statement_logging
.pending_statement_execution_events
- .push((retraction, -1));
+ .push((retraction, -Diff::ONE));
f(record);
let update = Self::pack_statement_began_execution_update(record);
self.statement_logging
.pending_statement_execution_events
- .push((update, 1));
+ .push((update, Diff::ONE));
}
/// Set the `cluster_id` for a statement, once it's known.
@@ -770,7 +772,7 @@ impl Coordinator {
let mseh_update = Self::pack_statement_began_execution_update(&record);
self.statement_logging
.pending_statement_execution_events
- .push((mseh_update, 1));
+ .push((mseh_update, Diff::ONE));
self.statement_logging
.executions_begun
.insert(ev_id, record);
diff --git c/src/catalog-debug/src/main.rs i/src/catalog-debug/src/main.rs
index 47c042dc8d..3200bb3492 100644
--- c/src/catalog-debug/src/main.rs
+++ i/src/catalog-debug/src/main.rs
@@ -62,6 +62,7 @@ use mz_sql::catalog::EnvironmentId;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::StorageError;
use mz_storage_types::sources::SourceData;
+use mz_storage_types::StorageDiff;
use serde::{Deserialize, Serialize};
use tracing::{error, Instrument};
@@ -394,8 +395,8 @@ async fn dump(
.collect();
let total_count = entries.len();
- let addition_count = entries.iter().filter(|entry| entry.diff == 1).count();
- let retraction_count = entries.iter().filter(|entry| entry.diff == -1).count();
+ let addition_count = entries.iter().filter(|entry| *entry.diff == 1).count();
+ let retraction_count = entries.iter().filter(|entry| *entry.diff == -1).count();
let entries = if stats_only { None } else { Some(entries) };
let dumped_col = DumpedCollection {
total_count,
@@ -702,7 +703,7 @@ async fn upgrade_check(
handle_purpose: "catalog upgrade check".to_string(),
};
let persisted_schema = persist_client
- .latest_schema::<SourceData, (), Timestamp, Diff>(shard_id, diagnostics)
+ .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics)
.await
.expect("invalid persist usage");
// If in the new version a BuiltinTable or BuiltinSource is changed (e.g. a new
diff --git c/src/catalog/src/durable.rs i/src/catalog/src/durable.rs
index e566a1b39b..412c75a2ee 100644
--- c/src/catalog/src/durable.rs
+++ i/src/catalog/src/durable.rs
@@ -398,7 +398,7 @@ impl AuditLogIterator {
.into_iter()
.map(|(kind, ts, diff)| {
assert_eq!(
- diff, 1,
+ *diff, 1,
"audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
);
assert!(
diff --git c/src/catalog/src/durable/objects/state_update.rs i/src/catalog/src/durable/objects/state_update.rs
index 7f00f98772..117afbd8f2 100644
--- c/src/catalog/src/durable/objects/state_update.rs
+++ i/src/catalog/src/durable/objects/state_update.rs
@@ -42,6 +42,7 @@ use mz_repr::adt::jsonb::Jsonb;
use mz_repr::adt::numeric::{Dec, Numeric};
use mz_repr::Diff;
use mz_storage_types::sources::SourceData;
+use mz_storage_types::StorageDiff;
use proptest_derive::Arbitrary;
use tracing::error;
@@ -391,7 +392,7 @@ impl StateUpdateKindJson {
type PersistStateUpdate = (
(Result<SourceData, String>, Result<(), String>),
Timestamp,
- i64,
+ StorageDiff,
);
impl TryFrom<&StateUpdate<StateUpdateKind>> for Option<memory::objects::StateUpdate> {
@@ -998,7 +999,7 @@ impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
StateUpdate {
kind: StateUpdateKindJson::from(key),
ts,
- diff,
+ diff: diff.into(),
}
}
}
diff --git c/src/catalog/src/durable/persist.rs i/src/catalog/src/durable/persist.rs
index b472f1e346..5dbaaa1188 100644
--- c/src/catalog/src/durable/persist.rs
+++ i/src/catalog/src/durable/persist.rs
@@ -39,6 +39,7 @@ use mz_persist_types::codec_impls::UnitSchema;
use mz_proto::{RustType, TryFromProtoError};
use mz_repr::{Diff, RelationDesc, ScalarType};
use mz_storage_types::sources::SourceData;
+use mz_storage_types::StorageDiff;
use sha2::Digest;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use timely::Container;
@@ -263,7 +264,10 @@ impl FenceableToken {
let mut fence_updates = Vec::with_capacity(2);
if let Some(durable_token) = &durable_token {
- fence_updates.push((StateUpdateKind::FenceToken(durable_token.clone()), -1));
+ fence_updates.push((
+ StateUpdateKind::FenceToken(durable_token.clone()),
+ -Diff::ONE,
+ ));
}
let current_deploy_generation = current_deploy_generation
@@ -284,7 +288,10 @@ impl FenceableToken {
epoch: current_epoch,
};
- fence_updates.push((StateUpdateKind::FenceToken(current_token.clone()), 1));
+ fence_updates.push((
+ StateUpdateKind::FenceToken(current_token.clone()),
+ Diff::ONE,
+ ));
let current_fenceable_token = FenceableToken::Unfenced { current_token };
@@ -358,11 +365,11 @@ pub(crate) struct PersistHandle<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> {
/// The [`Mode`] that this catalog was opened in.
pub(crate) mode: Mode,
/// Since handle to control compaction.
- since_handle: SinceHandle<SourceData, (), Timestamp, Diff, i64>,
+ since_handle: SinceHandle<SourceData, (), Timestamp, StorageDiff, i64>,
/// Write handle to persist.
- write_handle: WriteHandle<SourceData, (), Timestamp, Diff>,
+ write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
/// Listener to catalog changes.
- listen: Listen<SourceData, (), Timestamp, Diff>,
+ listen: Listen<SourceData, (), Timestamp, StorageDiff>,
/// Handle for connecting to persist.
persist_client: PersistClient,
/// Catalog shard ID.
@@ -389,7 +396,7 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
/// Increment the version in the catalog upgrade shard to the code's current version.
async fn increment_catalog_upgrade_shard_version(&self, organization_id: Uuid) {
let upgrade_shard_id = shard_id(organization_id, UPGRADE_SEED);
- let mut write_handle: WriteHandle<(), (), Timestamp, Diff> = self
+ let mut write_handle: WriteHandle<(), (), Timestamp, StorageDiff> = self
.persist_client
.open_writer(
upgrade_shard_id,
@@ -402,7 +409,7 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
)
.await
.expect("invalid usage");
- const EMPTY_UPDATES: &[(((), ()), Timestamp, Diff)] = &[];
+ const EMPTY_UPDATES: &[(((), ()), Timestamp, StorageDiff)] = &[];
let mut upper = write_handle.fetch_recent_upper().await.clone();
loop {
let next_upper = upper
@@ -469,10 +476,10 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
})
.collect();
let contains_retraction = parsed_updates.iter().any(|(update, diff)| {
- matches!(update, StateUpdateKind::FenceToken(..)) && *diff == -1
+ matches!(update, StateUpdateKind::FenceToken(..)) && **diff == -1
});
let contains_addition = parsed_updates.iter().any(|(update, diff)| {
- matches!(update, StateUpdateKind::FenceToken(..)) && *diff == 1
+ matches!(update, StateUpdateKind::FenceToken(..)) && **diff == 1
});
let contains_fence = contains_retraction && contains_addition;
Some((contains_fence, updates))
@@ -482,7 +489,7 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
let updates = updates.into_iter().map(|(kind, diff)| {
let kind: StateUpdateKindJson = kind.into();
- ((Into::<SourceData>::into(kind), ()), commit_ts, diff)
+ ((Into::<SourceData>::into(kind), ()), commit_ts, *diff)
});
let next_upper = commit_ts.step_forward();
let res = self
@@ -660,7 +667,7 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
let mut errors = Vec::new();
for (kind, ts, diff) in updates {
- if diff != 1 && diff != -1 {
+ if *diff != 1 && *diff != -1 {
panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
}
@@ -720,7 +727,7 @@ impl<T: TryIntoStateUpdateKind, U: ApplyUpdate<T>> PersistHandle<T, U> {
}
/// Open a read handle to the catalog.
- async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, Diff> {
+ async fn read_handle(&self) -> ReadHandle<SourceData, (), Timestamp, StorageDiff> {
self.persist_client
.open_leased_reader(
self.shard_id,
@@ -758,13 +765,13 @@ impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
{
let key = key.clone();
let value = value.clone();
- if diff == 1 {
+ if *diff == 1 {
let prev = map.insert(key, value);
assert_eq!(
prev, None,
"values must be explicitly retracted before inserting a new value"
);
- } else if diff == -1 {
+ } else if *diff == -1 {
let prev = map.remove(&key);
assert_eq!(
prev,
@@ -778,7 +785,7 @@ impl<U: ApplyUpdate<StateUpdateKind>> PersistHandle<StateUpdateKind, U> {
let mut snapshot = Snapshot::empty();
for (kind, ts, diff) in trace {
let diff = *diff;
- if diff != 1 && diff != -1 {
+ if *diff != 1 && *diff != -1 {
panic!("invalid update in consolidated trace: ({kind:?}, {ts:?}, {diff:?})");
}
@@ -907,14 +914,14 @@ impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
if !update.kind.is_audit_log() && update.kind.is_always_deserializable() {
let kind = TryInto::try_into(&update.kind).expect("kind is known to be deserializable");
match (kind, update.diff) {
- (StateUpdateKind::Config(key, value), 1) => {
+ (StateUpdateKind::Config(key, value), Diff::ONE) => {
let prev = self.configs.insert(key.key, value.value);
assert_eq!(
prev, None,
"values must be explicitly retracted before inserting a new value"
);
}
- (StateUpdateKind::Config(key, value), -1) => {
+ (StateUpdateKind::Config(key, value), Diff::MINUS_ONE) => {
let prev = self.configs.remove(&key.key);
assert_eq!(
prev,
@@ -922,14 +929,14 @@ impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
"retraction does not match existing value"
);
}
- (StateUpdateKind::Setting(key, value), 1) => {
+ (StateUpdateKind::Setting(key, value), Diff::ONE) => {
let prev = self.settings.insert(key.name, value.value);
assert_eq!(
prev, None,
"values must be explicitly retracted before inserting a new value"
);
}
- (StateUpdateKind::Setting(key, value), -1) => {
+ (StateUpdateKind::Setting(key, value), Diff::MINUS_ONE) => {
let prev = self.settings.remove(&key.name);
assert_eq!(
prev,
@@ -937,7 +944,7 @@ impl ApplyUpdate<StateUpdateKindJson> for UnopenedCatalogStateInner {
"retraction does not match existing value"
);
}
- (StateUpdateKind::FenceToken(fence_token), 1) => {
+ (StateUpdateKind::FenceToken(fence_token), Diff::ONE) => {
current_fence_token.maybe_fence(fence_token)?;
}
_ => {}
@@ -1038,7 +1045,7 @@ impl UnopenedPersistCatalogState {
// Commit an empty write at the minimum timestamp so the catalog is always readable.
let upper = {
- const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, Diff)] = &[];
+ const EMPTY_UPDATES: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
let upper = Antichain::from_elem(Timestamp::minimum());
let next_upper = Timestamp::minimum().step_forward();
match write_handle
@@ -1087,7 +1094,7 @@ impl UnopenedPersistCatalogState {
// If the snapshot is not consolidated, and we see multiple epoch values while applying the
// updates, then we might accidentally fence ourselves out.
soft_assert_no_log!(
- snapshot.iter().all(|(_, _, diff)| *diff == 1),
+ snapshot.iter().all(|(_, _, diff)| **diff == 1),
"snapshot should be consolidated: {snapshot:#?}"
);
@@ -1211,7 +1218,7 @@ impl UnopenedPersistCatalogState {
.into_iter()
.partition(|(update, _, _)| update.is_audit_log());
self.snapshot = snapshot;
- let audit_log_count = audit_logs.iter().map(|(_, _, diff)| diff).sum();
+ let audit_log_count = *audit_logs.iter().map(|(_, _, diff)| diff).sum::<Diff>();
let audit_log_handle = AuditLogIterator::new(audit_logs);
// Perform data migrations.
@@ -1568,7 +1575,7 @@ impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
metrics
.collection_entries
.with_label_values(&[&collection_type.to_string()])
- .add(update.diff);
+ .add(*update.diff);
}
{
@@ -1583,8 +1590,8 @@ impl ApplyUpdate<StateUpdateKind> for CatalogStateInner {
match (update.kind, update.diff) {
(StateUpdateKind::AuditLog(_, ()), _) => Ok(None),
// Nothing to due for fence token retractions but wait for the next insertion.
- (StateUpdateKind::FenceToken(_), -1) => Ok(None),
- (StateUpdateKind::FenceToken(token), 1) => {
+ (StateUpdateKind::FenceToken(_), Diff::MINUS_ONE) => Ok(None),
+ (StateUpdateKind::FenceToken(token), Diff::ONE) => {
current_fence_token.maybe_fence(token)?;
Ok(None)
}
@@ -1836,7 +1843,10 @@ fn desc() -> RelationDesc {
/// Generates a timestamp for reading from `read_handle` that is as fresh as possible, given
/// `upper`.
-fn as_of(read_handle: &ReadHandle<SourceData, (), Timestamp, Diff>, upper: Timestamp) -> Timestamp {
+fn as_of(
+ read_handle: &ReadHandle<SourceData, (), Timestamp, StorageDiff>,
+ upper: Timestamp,
+) -> Timestamp {
let since = read_handle.since().clone();
let mut as_of = upper.checked_sub(1).unwrap_or_else(|| {
panic!("catalog persist shard should be initialize, found upper: {upper:?}")
@@ -1879,7 +1889,7 @@ async fn fetch_catalog_upgrade_shard_version(
/// The output is consolidated and sorted by timestamp in ascending order.
#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary(
- read_handle: &mut ReadHandle<SourceData, (), Timestamp, Diff>,
+ read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
as_of: Timestamp,
metrics: &Arc<Metrics>,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator {
@@ -1897,7 +1907,7 @@ async fn snapshot_binary(
/// The output is consolidated and sorted by timestamp in ascending order.
#[mz_ore::instrument(level = "debug")]
async fn snapshot_binary_inner(
- read_handle: &mut ReadHandle<SourceData, (), Timestamp, Diff>,
+ read_handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
as_of: Timestamp,
) -> impl Iterator<Item = StateUpdate<StateUpdateKindJson>> + DoubleEndedIterator {
let snapshot = read_handle
@@ -2009,7 +2019,7 @@ impl UnopenedPersistCatalogState {
.values
.into_iter()
.filter(|((k, _), _, diff)| {
- soft_assert_eq_or_log!(*diff, 1, "trace is consolidated");
+ soft_assert_eq_or_log!(**diff, 1, "trace is consolidated");
&key == k
})
.collect();
@@ -2022,9 +2032,9 @@ impl UnopenedPersistCatalogState {
let mut updates: Vec<_> = prev_values
.into_iter()
- .map(|((k, v), _, _)| (T::update(k, v), -1))
+ .map(|((k, v), _, _)| (T::update(k, v), -Diff::ONE))
.collect();
- updates.push((T::update(key, value), 1));
+ updates.push((T::update(key, value), Diff::ONE));
// We must fence out all other catalogs, if we haven't already, since we are writing.
match self.fenceable_token.generate_unfenced_token(self.mode)? {
Some((fence_updates, current_fenceable_token)) => {
@@ -2071,10 +2081,10 @@ impl UnopenedPersistCatalogState {
.values
.into_iter()
.filter(|((k, _), _, diff)| {
- soft_assert_eq_or_log!(*diff, 1, "trace is consolidated");
+ soft_assert_eq_or_log!(**diff, 1, "trace is consolidated");
&key == k
})
- .map(|((k, v), _, _)| (T::update(k, v), -1))
+ .map(|((k, v), _, _)| (T::update(k, v), -Diff::ONE))
.collect();
// We must fence out all other catalogs, if we haven't already, since we are writing.
diff --git c/src/catalog/src/durable/transaction.rs i/src/catalog/src/durable/transaction.rs
index 9c740ddcff..35a0535fd0 100644
--- c/src/catalog/src/durable/transaction.rs
+++ i/src/catalog/src/durable/transaction.rs
@@ -212,7 +212,7 @@ impl<'a> Transaction<'a> {
pub fn insert_audit_log_events(&mut self, events: impl IntoIterator<Item = VersionedEvent>) {
let events = events
.into_iter()
- .map(|event| (AuditLogKey { event }, 1, self.op_id));
+ .map(|event| (AuditLogKey { event }, Diff::ONE, self.op_id));
self.audit_log_updates.extend(events);
}
@@ -466,7 +466,7 @@ impl<'a> Transaction<'a> {
) -> Result<(), CatalogError> {
let key = ClusterKey { id: cluster_id };
- match self.clusters.update(
+ match *self.clusters.update(
|k, v| {
if *k == key {
let mut value = v.clone();
@@ -494,7 +494,7 @@ impl<'a> Transaction<'a> {
) -> Result<(), CatalogError> {
let key = ClusterReplicaKey { id: replica_id };
- match self.cluster_replicas.update(|k, v| {
+ match *self.cluster_replicas.update(|k, v| {
if *k == key {
let mut value = v.clone();
value.name = replica_to_name.to_string();
@@ -2949,7 +2949,7 @@ where
let pending = self.pending.get(k).map(Vec::as_slice).unwrap_or_default();
let mut updates = Vec::with_capacity(pending.len() + 1);
if let Some(initial) = self.initial.get(k) {
- updates.push((initial, 1));
+ updates.push((initial, Diff::ONE));
}
updates.extend(
pending
@@ -3038,7 +3038,7 @@ where
self.pending.entry(k).or_default().push(TransactionUpdate {
value: v,
ts,
- diff: 1,
+ diff: Diff::ONE,
});
soft_assert_no_log!(self.verify().is_ok());
Ok(())
@@ -3057,24 +3057,24 @@ where
f: F,
ts: Timestamp,
) -> Result<Diff, DurableCatalogError> {
- let mut changed = 0;
+ let mut changed = Diff::ZERO;
let mut keys = BTreeSet::new();
// Keep a copy of pending in case of uniqueness violation.
let pending = self.pending.clone();
self.for_values_mut(|p, k, v| {
if let Some(next) = f(k, v) {
- changed += 1;
+ changed += Diff::ONE;
keys.insert(k.clone());
let updates = p.entry(k.clone()).or_default();
updates.push(TransactionUpdate {
value: v.clone(),
ts,
- diff: -1,
+ diff: -Diff::ONE,
});
updates.push(TransactionUpdate {
value: next,
ts,
- diff: 1,
+ diff: Diff::ONE,
});
}
});
@@ -3148,26 +3148,26 @@ where
entry.push(TransactionUpdate {
value: prev,
ts,
- diff: -1,
+ diff: -Diff::ONE,
});
entry.push(TransactionUpdate {
value: v,
ts,
- diff: 1,
+ diff: Diff::ONE,
});
}
(Some(v), None) => {
entry.push(TransactionUpdate {
value: v,
ts,
- diff: 1,
+ diff: Diff::ONE,
});
}
(None, Some(prev)) => {
entry.push(TransactionUpdate {
value: prev,
ts,
- diff: -1,
+ diff: -Diff::ONE,
});
}
(None, None) => {}
@@ -3211,26 +3211,26 @@ where
entry.push(TransactionUpdate {
value: prev,
ts,
- diff: -1,
+ diff: -Diff::ONE,
});
entry.push(TransactionUpdate {
value: v,
ts,
- diff: 1,
+ diff: Diff::ONE,
});
}
(Some(v), None) => {
entry.push(TransactionUpdate {
value: v,
ts,
- diff: 1,
+ diff: Diff::ONE,
});
}
(None, Some(prev)) => {
entry.push(TransactionUpdate {
value: prev,
ts,
- diff: -1,
+ diff: -Diff::ONE,
});
}
(None, None) => {}
@@ -3266,7 +3266,7 @@ where
p.entry(k.clone()).or_default().push(TransactionUpdate {
value: v.clone(),
ts,
- diff: -1,
+ diff: -Diff::ONE,
});
}
});
@@ -3339,14 +3339,17 @@ mod tests {
}
let mut table: BTreeMap<Vec<u8>, String> = BTreeMap::new();
- fn commit(table: &mut BTreeMap<Vec<u8>, String>, mut pending: Vec<(Vec<u8>, String, i64)>) {
+ fn commit(
+ table: &mut BTreeMap<Vec<u8>, String>,
+ mut pending: Vec<(Vec<u8>, String, Diff)>,
+ ) {
// Sort by diff so that we process retractions first.
pending.sort_by(|a, b| a.2.cmp(&b.2));
for (k, v, diff) in pending {
- if diff == -1 {
+ if *diff == -1 {
let prev = table.remove(&k);
assert_eq!(prev, Some(v));
- } else if diff == 1 {
+ } else if *diff == 1 {
let prev = table.insert(k, v);
assert_eq!(prev, None);
} else {
@@ -3370,7 +3373,7 @@ mod tests {
table_txn
.update(|_k, _v| Some("v3".to_string()), 2)
.unwrap(),
- 1
+ Diff::ONE
);
// Uniqueness violation.
@@ -3399,10 +3402,10 @@ mod tests {
assert_eq!(
pending,
vec![
- (1i64.to_le_bytes().to_vec(), "v1".to_string(), -1),
- (1i64.to_le_bytes().to_vec(), "v3".to_string(), 1),
- (2i64.to_le_bytes().to_vec(), "v2".to_string(), -1),
- (3i64.to_le_bytes().to_vec(), "v4".to_string(), 1),
+ (1i64.to_le_bytes().to_vec(), "v1".to_string(), -Diff::ONE),
+ (1i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
+ (2i64.to_le_bytes().to_vec(), "v2".to_string(), -Diff::ONE),
+ (3i64.to_le_bytes().to_vec(), "v4".to_string(), Diff::ONE),
]
);
commit(&mut table, pending);
@@ -3447,9 +3450,9 @@ mod tests {
assert_eq!(
pending,
vec![
- (1i64.to_le_bytes().to_vec(), "v3".to_string(), -1),
- (1i64.to_le_bytes().to_vec(), "v5".to_string(), 1),
- (5i64.to_le_bytes().to_vec(), "v3".to_string(), 1),
+ (1i64.to_le_bytes().to_vec(), "v3".to_string(), -Diff::ONE),
+ (1i64.to_le_bytes().to_vec(), "v5".to_string(), Diff::ONE),
+ (5i64.to_le_bytes().to_vec(), "v3".to_string(), Diff::ONE),
]
);
commit(&mut table, pending);
@@ -3523,8 +3526,8 @@ mod tests {
assert_eq!(
pending,
vec![
- (1i64.to_le_bytes().to_vec(), "v5".to_string(), -1),
- (3i64.to_le_bytes().to_vec(), "v6".to_string(), 1),
+ (1i64.to_le_bytes().to_vec(), "v5".to_string(), -Diff::ONE),
+ (3i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
]
);
commit(&mut table, pending);
@@ -3577,9 +3580,9 @@ mod tests {
assert_eq!(
pending,
vec![
- (1i64.to_le_bytes().to_vec(), "v6".to_string(), 1),
- (3i64.to_le_bytes().to_vec(), "v6".to_string(), -1),
- (42i64.to_le_bytes().to_vec(), "v7".to_string(), 1),
+ (1i64.to_le_bytes().to_vec(), "v6".to_string(), Diff::ONE),
+ (3i64.to_le_bytes().to_vec(), "v6".to_string(), -Diff::ONE),
+ (42i64.to_le_bytes().to_vec(), "v7".to_string(), Diff::ONE),
]
);
commit(&mut table, pending);
@@ -3631,8 +3634,8 @@ mod tests {
assert_eq!(
pending,
vec![
- (1i64.…
Signed-off-by: Moritz Hoffmann <mh@materialize.com>
3a45c7e to
7ee7dbb
Compare
|
I tried to use … It appears we're using the type name somewhere, and not the codec name. |
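One way around this, sketched here with a toy trait rather than the real `mz_persist_types::Codec64`, is to delegate the wrapper's codec *name* to the inner type, so that shards written under the old `i64` codec name still pass the sanity check:

```rust
/// Toy stand-in for persist's 64-bit codec trait (illustrative only).
trait Codec64Like {
    fn codec_name() -> String;
    fn encode(&self) -> [u8; 8];
}

impl Codec64Like for i64 {
    fn codec_name() -> String {
        "i64".to_string()
    }
    fn encode(&self) -> [u8; 8] {
        self.to_le_bytes()
    }
}

struct Overflowing<T>(T);

impl Codec64Like for Overflowing<i64> {
    // Deliberately NOT the type name `Overflowing<i64>`: reusing the inner
    // codec name keeps the in-memory cache check happy for existing shards.
    fn codec_name() -> String {
        <i64 as Codec64Like>::codec_name()
    }
    fn encode(&self) -> [u8; 8] {
        self.0.encode()
    }
}

fn main() {
    assert_eq!(<Overflowing<i64> as Codec64Like>::codec_name(), "i64");
    assert_eq!(Overflowing(7i64).encode(), 7i64.to_le_bytes());
}
```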
teskje
left a comment
There was a problem hiding this comment.
LGTM. Got some minor comments, mostly about style.
Signed-off-by: Moritz Hoffmann <mh@materialize.com>
4a134b8 to
5e5ebe9
Compare
5e5ebe9 to
203ac3b
Compare
Huh! Totally unexpected to me. Thanks for flagging it. |
|
Ah, I see! That name isn't serialized; it's used to sanity check the in-memory cache. It looks like only some of the references to that shard (from your error, I guess the catalog shard) were switched to the new diff type, and others were still using |
| .expect("known to be `Ok` from `is_ok()` call above") | ||
| { | ||
| if diff < &1 { | ||
| if diff.is_negative() { |
There was a problem hiding this comment.
No, that's wrong! I'll send a PR to fix it. This can cause some rows with diff 0 to survive, but tracing the path of the data further down, it looks as if we're consolidating before writing, so we should remove the rows with the zero diff later on. Still, good to fix!
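The behavior change the comment describes is easy to reproduce in isolation. A minimal sketch (hypothetical helper names, not the Materialize source):

```rust
// The old guard skipped rows with `diff < 1`, i.e. negative AND zero diffs;
// the replacement `is_negative()` lets zero diffs through until a later
// consolidation removes them.

fn kept_by_old_check(diff: i64) -> bool {
    // original: `if diff < &1 { continue; }` -- the row is kept otherwise
    !(diff < 1)
}

fn kept_by_new_check(diff: i64) -> bool {
    // replacement: `if diff.is_negative() { continue; }`
    !diff.is_negative()
}

fn main() {
    // Zero diffs: dropped before, kept now -- the difference the fix restores.
    assert!(!kept_by_old_check(0));
    assert!(kept_by_new_check(0));
    // Negative and positive diffs are treated identically by both checks.
    assert!(!kept_by_old_check(-1) && !kept_by_new_check(-1));
    assert!(kept_by_old_check(1) && kept_by_new_check(1));
}
```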
MaterializeInc#32028 introduced a potential behavior change where we would skip rows with a diff less than 1, and now we skip rows with a negative diff. This change restores the former behavior. Signed-off-by: Moritz Hoffmann <mh@materialize.com>
#32028 introduced a potential behavior change where we would skip rows with a diff less than 1, and now we skip rows with a negative diff. This change restores the former behavior. Reported by @def- in #32028 (review)

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)).
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.

Signed-off-by: Moritz Hoffmann <mh@materialize.com>
Follow up to #32028 to enable panics on overflow in CI.

---------

Signed-off-by: Moritz Hoffmann <mh@materialize.com>
The change introduces an overflowing type whose overflow checks apply in release builds, too. It implements all relevant traits to be usable where we would normally use a number, including differential and a few other traits defined by external crates.
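A minimal sketch of such a wrapper (simplified; the real `mz_ore` type implements far more traits, and its overflow handling is configurable at runtime rather than always panicking):

```rust
use std::ops::{Add, Neg};

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Overflowing<T>(T);

impl Overflowing<i64> {
    pub const ZERO: Self = Self(0);
    pub const ONE: Self = Self(1);
    pub const MINUS_ONE: Self = Self(-1);

    pub fn into_inner(self) -> i64 {
        self.0
    }
}

impl Add for Overflowing<i64> {
    type Output = Self;
    fn add(self, rhs: Self) -> Self {
        // Unlike plain `i64`, check for overflow in release builds too.
        match self.0.checked_add(rhs.0) {
            Some(sum) => Self(sum),
            None => panic!("Overflow {} + {}", self.0, rhs.0),
        }
    }
}

impl Neg for Overflowing<i64> {
    type Output = Self;
    fn neg(self) -> Self {
        Self(-self.0)
    }
}

fn main() {
    type Diff = Overflowing<i64>;
    // The spellings used throughout the PR: `Diff::ONE`, `-Diff::ONE`, `Diff::ZERO`.
    assert_eq!((Diff::ONE + Diff::ONE).into_inner(), 2);
    assert_eq!(-Diff::ONE, Diff::MINUS_ONE);
    assert_eq!(Diff::ONE + Diff::MINUS_ONE, Diff::ZERO);
}
```

The `Neg` impl is what makes the `-Diff::ONE` spelling in the diff work once the `Deref` to `i64` is removed.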
It uses the overflowing type in all parts of Materialize where possible. Persist still requires the old plain `i64` type, so we introduce a `StorageDiff` type alias, and at the boundary to persist convert to/from the overflowing diff type. This isn't ideal because storage can overflow, too, on consolidation, but changing the type there seems harder and is out of my expertise.
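The boundary conversion might look roughly like this (illustrative: `StorageDiff` is a plain `i64` alias per the description, but the `Diff` stand-in and its `From` impls are sketches, not the real types):

```rust
/// Plain diff type that persist continues to use.
type StorageDiff = i64;

/// Stand-in for the overflow-checking wrapper used everywhere else.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Diff(i64);

impl From<StorageDiff> for Diff {
    fn from(d: StorageDiff) -> Self {
        Diff(d)
    }
}

impl From<Diff> for StorageDiff {
    fn from(d: Diff) -> Self {
        d.0
    }
}

fn main() {
    // Reading from persist widens the raw i64 into the checked type
    // (cf. `diff: diff.into()` in the PR)...
    let read: Diff = (3 as StorageDiff).into();
    // ...and writing back unwraps it at the boundary (cf. `commit_ts, *diff`).
    let written: StorageDiff = read.into();
    assert_eq!(written, 3);
}
```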
The diff is quite large to account for boilerplate changes: everywhere one would previously write `1`, one now has to convert the type or mention `Diff::ONE` directly. The first commit adds the overflowing type and changes Materialize to use it, the second adds a feature flag to control the overflow behavior at runtime. By default, overflows are ignored, corresponding to the current behavior.
The `ore_overflowing_behavior` feature flag controls the behavior, and can have one of the following values: `ignore` overflows silently, `soft_panic` panics on overflow if soft assertions are enabled and otherwise warns, and `panic` unconditionally panics on overflow.

### Checklist

- [ ] If this PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
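As a rough standalone sketch of the three flag values described above (hypothetical enum and function names; the real flag lives in `mz_ore`, and `soft_panic` consults the soft-assertion setting rather than always warning):

```rust
#[derive(Clone, Copy, PartialEq)]
enum OverflowingBehavior {
    Ignore,
    SoftPanic,
    Panic,
}

fn add_with_behavior(behavior: OverflowingBehavior, a: i64, b: i64) -> i64 {
    match a.checked_add(b) {
        Some(sum) => sum,
        None => match behavior {
            // `ignore`: wrap silently, matching the previous default behavior.
            OverflowingBehavior::Ignore => a.wrapping_add(b),
            // `soft_panic`: would panic when soft assertions are enabled;
            // here we only warn and wrap to keep the sketch self-contained.
            OverflowingBehavior::SoftPanic => {
                eprintln!("Overflow {a} + {b}");
                a.wrapping_add(b)
            }
            // `panic`: unconditionally abort, as enabled for CI in the follow-up.
            OverflowingBehavior::Panic => panic!("Overflow {a} + {b}"),
        },
    }
}

fn main() {
    // e.g. the overflow from the nightly cluster test: i64::MAX + 3.
    assert_eq!(
        add_with_behavior(OverflowingBehavior::Ignore, i64::MAX, 3),
        i64::MIN + 2
    );
    assert_eq!(add_with_behavior(OverflowingBehavior::SoftPanic, 1, 2), 3);
}
```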