Skip to content

Commit 1bbd31f

Browse files
committed
Assert that the schemas match at append time
1 parent 234e6ca commit 1bbd31f

File tree

1 file changed

+19
-2
lines changed

1 file changed

+19
-2
lines changed

src/persist-client/src/write.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use differential_dataflow::trace::Description;
1919
use futures::StreamExt;
2020
use futures::stream::FuturesUnordered;
2121
use mz_dyncfg::Config;
22-
use mz_ore::instrument;
2322
use mz_ore::task::RuntimeExt;
23+
use mz_ore::{instrument, soft_panic_or_log};
2424
use mz_persist::location::Blob;
2525
use mz_persist_types::schema::SchemaId;
2626
use mz_persist_types::{Codec, Codec64};
@@ -32,7 +32,7 @@ use timely::PartialOrder;
3232
use timely::order::TotalOrder;
3333
use timely::progress::{Antichain, Timestamp};
3434
use tokio::runtime::Handle;
35-
use tracing::{Instrument, debug_span, info, warn};
35+
use tracing::{Instrument, debug_span, error, info, warn};
3636
use uuid::Uuid;
3737

3838
use crate::batch::{
@@ -568,6 +568,23 @@ where
568568
TODO: Error on very old versions once the leaked blob detector exists."
569569
)
570570
}
571+
fn assert_schema<A: Codec>(writer_schema: &A::Schema, batch_schema: &bytes::Bytes) {
572+
if batch_schema.is_empty() {
573+
// Schema is either trivial or missing!
574+
return;
575+
}
576+
let batch_schema: A::Schema = A::decode_schema(batch_schema);
577+
if *writer_schema != batch_schema {
578+
error!(
579+
?writer_schema,
580+
?batch_schema,
581+
"writer and batch schemas should be identical"
582+
);
583+
soft_panic_or_log!("writer and batch schemas should be identical");
584+
}
585+
}
586+
assert_schema::<K>(&*self.write_schemas.key, &batch.schemas.0);
587+
assert_schema::<V>(&*self.write_schemas.val, &batch.schemas.1);
571588
}
572589

573590
let lower = expected_upper.clone();

0 commit comments

Comments
 (0)