Commit 6a4e233

Merge pull request #34140 from bkirwi/batch-match
[persist] Check schemas in batch append
2 parents 360defa + 1bbd31f commit 6a4e233

File tree

6 files changed: +40, -2 lines changed

src/persist-client/build.rs
src/persist-client/src/batch.proto
src/persist-client/src/batch.rs
src/persist-client/src/internal/machine.rs
src/persist-client/src/lib.rs
src/persist-client/src/write.rs

src/persist-client/build.rs

Lines changed: 1 addition & 0 deletions
@@ -52,6 +52,7 @@ fn main() {
         .type_attribute(".", "#[allow(missing_docs)]")
         .btree_map(["."])
         .bytes([
+            ".mz_persist_client.batch.ProtoBatch",
             ".mz_persist_client.internal.diff.ProtoStateFieldDiffs",
             ".mz_persist_client.internal.service.ProtoPushDiff",
             ".mz_persist_client.internal.state.ProtoEncodedSchemas",
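
Context for this one-liner: prost_build's `.bytes([...])` option makes the `bytes` fields of the listed messages come out as `bytes::Bytes` instead of the default `Vec<u8>`, so the encoded schemas added below can be cloned by bumping a refcount rather than copying the buffer. A minimal sketch of the effect; the struct here is illustrative, not the actual generated code:

```rust
use bytes::Bytes;

// Illustrative stand-in for the schema fields of the generated ProtoBatch.
// With ProtoBatch listed under `.bytes([...])`, these fields are `Bytes`
// (cheap, refcounted clones) rather than `Vec<u8>`.
struct ProtoBatchSketch {
    key_schema: Bytes,
    val_schema: Bytes,
}

fn main() {
    let encoded = Bytes::from_static(b"example encoded schema");
    let batch = ProtoBatchSketch {
        key_schema: encoded.clone(), // clones share the same underlying buffer
        val_schema: encoded,
    };
    assert_eq!(batch.key_schema, batch.val_schema);
}
```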

src/persist-client/src/batch.proto

Lines changed: 3 additions & 0 deletions
@@ -21,6 +21,9 @@ import "persist-client/src/internal/state.proto";
 message ProtoBatch {
     string shard_id = 1;
     string version = 2;
+    bytes key_schema = 5;
+    bytes val_schema = 6;
+
     mz_persist_client.internal.state.ProtoHollowBatch batch = 3;
 
     reserved 4;
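
Since tag 4 is reserved, the new fields take tags 5 and 6. Adding fields with fresh tags is wire-compatible: a ProtoBatch encoded before this change simply decodes with empty schema bytes, which is why the check added in write.rs (further down) treats an empty schema as "missing" and skips the comparison. A rough sketch of that behavior under prost, with hypothetical message names:

```rust
use prost::Message;

// Hypothetical messages (not the real ProtoBatch) showing why an older batch
// decodes with *empty* schema bytes: absent `bytes` fields come back as an
// empty buffer rather than an error.
#[derive(Clone, PartialEq, ::prost::Message)]
struct OldBatch {
    #[prost(string, tag = "1")]
    shard_id: String,
}

#[derive(Clone, PartialEq, ::prost::Message)]
struct NewBatch {
    #[prost(string, tag = "1")]
    shard_id: String,
    #[prost(bytes = "bytes", tag = "5")]
    key_schema: ::prost::bytes::Bytes,
    #[prost(bytes = "bytes", tag = "6")]
    val_schema: ::prost::bytes::Bytes,
}

fn main() {
    let old = OldBatch { shard_id: "example-shard".into() };
    let new = NewBatch::decode(old.encode_to_vec().as_slice()).expect("valid proto");
    assert!(new.key_schema.is_empty() && new.val_schema.is_empty());
}
```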

src/persist-client/src/batch.rs

Lines changed: 11 additions & 0 deletions
@@ -81,6 +81,9 @@ pub struct Batch<K, V, T, D> {
     /// The version of Materialize which wrote this batch.
     pub(crate) version: Version,
 
+    /// The encoded schemas of the data in the batch.
+    pub(crate) schemas: (Bytes, Bytes),
+
     /// A handle to the data represented by this batch.
     pub(crate) batch: HollowBatch<T>,
 
@@ -121,13 +124,15 @@ where
         blob: Arc<dyn Blob>,
         shard_metrics: Arc<ShardMetrics>,
         version: Version,
+        schemas: (Bytes, Bytes),
         batch: HollowBatch<T>,
     ) -> Self {
         Self {
             batch_delete_enabled,
             metrics,
             shard_metrics,
             version,
+            schemas,
             batch,
             blob,
             _phantom: PhantomData,

@@ -209,6 +214,8 @@ where
             shard_id: self.shard_metrics.shard_id.into_proto(),
             version: self.version.to_string(),
             batch: Some(self.batch.into_proto()),
+            key_schema: self.schemas.0.clone(),
+            val_schema: self.schemas.1.clone(),
         };
         self.mark_consumed();
         ret

@@ -729,6 +736,10 @@ where
             self.blob,
             shard_metrics,
             self.version,
+            (
+                K::encode_schema(&*self.write_schemas.key),
+                V::encode_schema(&*self.write_schemas.val),
+            ),
             HollowBatch::new(desc, run_parts, total_updates, run_meta, run_splits),
         );
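
The flow here: when a batch is finished, the writer's current schemas are snapshotted via `Codec::encode_schema` and carried alongside the data, and the proto conversion just clones those bytes. The append-time check relies on the encoded form round-tripping back to an equal schema value. A toy sketch of that invariant, with signatures assumed from how the diff calls `encode_schema`/`decode_schema` (not the real mz_persist_types::Codec trait):

```rust
use bytes::Bytes;

// Toy stand-in for the relevant slice of the codec trait: the batch stores
// `encode_schema(writer_schema)` at build time, and the append path decodes
// it back and compares with `==`, so Schema must round-trip and be PartialEq.
trait ToyCodec {
    type Schema: PartialEq + std::fmt::Debug;
    fn encode_schema(schema: &Self::Schema) -> Bytes;
    fn decode_schema(buf: &Bytes) -> Self::Schema;
}

struct ToyString;

impl ToyCodec for ToyString {
    // A hypothetical schema: just a version label, serialized as UTF-8.
    type Schema = String;
    fn encode_schema(schema: &Self::Schema) -> Bytes {
        Bytes::copy_from_slice(schema.as_bytes())
    }
    fn decode_schema(buf: &Bytes) -> Self::Schema {
        String::from_utf8(buf.to_vec()).expect("valid utf8 schema")
    }
}

fn main() {
    let writer_schema = "schema-v2".to_string();
    // What the batch stores alongside its data...
    let stored: Bytes = ToyString::encode_schema(&writer_schema);
    // ...and what the append-time check recovers and compares.
    assert_eq!(ToyString::decode_schema(&stored), writer_schema);
}
```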

src/persist-client/src/internal/machine.rs

Lines changed: 4 additions & 0 deletions
@@ -1538,6 +1538,10 @@ pub mod datadriven {
             Arc::clone(&self.client.blob),
             self.client.metrics.shards.shard(&self.shard_id, "test"),
             self.client.cfg.build_version.clone(),
+            (
+                <String>::encode_schema(&*SCHEMAS.key),
+                <()>::encode_schema(&*SCHEMAS.val),
+            ),
             hollow,
         )
     }

src/persist-client/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -603,6 +603,7 @@ impl PersistClient {
             metrics: Arc::clone(&self.metrics),
             shard_metrics,
             version: Version::parse(&batch.version).expect("valid transmittable batch"),
+            schemas: (batch.key_schema, batch.val_schema),
             batch: batch
                 .batch
                 .into_rust_if_some("ProtoBatch::batch")

src/persist-client/src/write.rs

Lines changed: 20 additions & 2 deletions
@@ -19,8 +19,8 @@ use differential_dataflow::trace::Description;
 use futures::StreamExt;
 use futures::stream::FuturesUnordered;
 use mz_dyncfg::Config;
-use mz_ore::instrument;
 use mz_ore::task::RuntimeExt;
+use mz_ore::{instrument, soft_panic_or_log};
 use mz_persist::location::Blob;
 use mz_persist_types::schema::SchemaId;
 use mz_persist_types::{Codec, Codec64};

@@ -32,7 +32,7 @@ use timely::PartialOrder;
 use timely::order::TotalOrder;
 use timely::progress::{Antichain, Timestamp};
 use tokio::runtime::Handle;
-use tracing::{Instrument, debug_span, info, warn};
+use tracing::{Instrument, debug_span, error, info, warn};
 use uuid::Uuid;
 
 use crate::batch::{

@@ -568,6 +568,23 @@ where
                     TODO: Error on very old versions once the leaked blob detector exists."
                 )
             }
+            fn assert_schema<A: Codec>(writer_schema: &A::Schema, batch_schema: &bytes::Bytes) {
+                if batch_schema.is_empty() {
+                    // Schema is either trivial or missing!
+                    return;
+                }
+                let batch_schema: A::Schema = A::decode_schema(batch_schema);
+                if *writer_schema != batch_schema {
+                    error!(
+                        ?writer_schema,
+                        ?batch_schema,
+                        "writer and batch schemas should be identical"
+                    );
+                    soft_panic_or_log!("writer and batch schemas should be identical");
+                }
+            }
+            assert_schema::<K>(&*self.write_schemas.key, &batch.schemas.0);
+            assert_schema::<V>(&*self.write_schemas.val, &batch.schemas.1);
         }
 
         let lower = expected_upper.clone();

@@ -822,6 +839,7 @@ where
             metrics: Arc::clone(&self.metrics),
             shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
             version: Version::parse(&batch.version).expect("valid transmittable batch"),
+            schemas: (batch.key_schema, batch.val_schema),
             batch: batch
                 .batch
                 .into_rust_if_some("ProtoBatch::batch")
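
One note on failure handling: `soft_panic_or_log!` from mz_ore appears to panic only when soft assertions are enabled (debug/CI style builds) and otherwise just logs, so a schema mismatch is loud in testing without crashing a running environment. Below is a self-contained sketch of the same shape of check, using a hand-rolled stand-in for that macro; the toggle and string-based schema are assumptions for illustration, not mz_ore's or persist's actual implementation:

```rust
use bytes::Bytes;

// Stand-in for mz_ore::soft_panic_or_log!: panic when soft assertions are on
// (here approximated by debug_assertions), otherwise print and keep going.
const SOFT_ASSERTIONS: bool = cfg!(debug_assertions);

macro_rules! soft_panic_or_log {
    ($($arg:tt)*) => {
        if SOFT_ASSERTIONS {
            panic!($($arg)*);
        } else {
            eprintln!($($arg)*);
        }
    };
}

// Mirrors the shape of the new check: an empty schema means "trivial or
// missing" (e.g. a batch from an older writer) and is not treated as an error.
fn check_schema(writer_schema: &str, batch_schema: &Bytes) {
    if batch_schema.is_empty() {
        return;
    }
    let batch_schema = String::from_utf8(batch_schema.to_vec()).expect("utf8 schema");
    if writer_schema != batch_schema {
        soft_panic_or_log!(
            "writer and batch schemas should be identical: {writer_schema} vs {batch_schema}"
        );
    }
}

fn main() {
    check_schema("schema-v2", &Bytes::new()); // missing schema: check skipped
    check_schema("schema-v2", &Bytes::from_static(b"schema-v2")); // match: ok
}
```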
