diff --git a/src/storage-types/src/sources.rs b/src/storage-types/src/sources.rs index 18e14cfa83076..56f2e1ca5dab7 100644 --- a/src/storage-types/src/sources.rs +++ b/src/storage-types/src/sources.rs @@ -142,6 +142,43 @@ impl IngestionDescription { } } +impl IngestionDescription { + pub fn indexed_source_exports( + &self, + primary_source_id: &GlobalId, + ) -> BTreeMap> { + let mut source_exports = BTreeMap::new(); + // `self.source_exports` contains all source-exports (e.g. subsources & tables) as well as + // the primary source relation. It's not guaranteed that the primary source relation is + // the first element in the map, however it much be set to output 0 to align with + // assumptions in source implementations. This is the case even if the primary + // export will not have any data output for it, since output 0 is the convention + // used for errors that should halt the entire source dataflow. + // TODO: See if we can simplify this to avoid needing to include the primary output + // if no data will be exported to it. This requires refactoring all error output handling. + let mut next_output = 1; + for (id, export) in self.source_exports.iter() { + let ingestion_output = if id == primary_source_id { + 0 + } else { + let idx = next_output; + next_output += 1; + idx + }; + + source_exports.insert( + *id, + IndexedSourceExport { + ingestion_output, + export: export.clone(), + }, + ); + } + + source_exports + } +} + impl AlterCompatible for IngestionDescription { fn alter_compatible( &self, @@ -242,6 +279,14 @@ impl IntoInlineConnection } } +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)] +pub struct IndexedSourceExport { + /// Which output index from the ingestion this export refers to. + pub ingestion_output: usize, + /// The SourceExport + pub export: SourceExport, +} + #[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Arbitrary)] pub struct SourceExport { /// The collection metadata needed to write the exported data diff --git a/src/storage/src/decode.rs b/src/storage/src/decode.rs index 49d56c18c469f..c31fee48501e6 100644 --- a/src/storage/src/decode.rs +++ b/src/storage/src/decode.rs @@ -571,7 +571,7 @@ pub fn render_decode_delimited( let health = transient_errors.map(|err: Rc| { let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None); HealthStatusMessage { - id: None, + index: 0, namespace: if matches!(&*err, CsrConnectError::Ssh(_)) { StatusNamespace::Ssh } else { diff --git a/src/storage/src/healthcheck.rs b/src/storage/src/healthcheck.rs index 8ba30113a70f3..2c62f039d14c9 100644 --- a/src/storage/src/healthcheck.rs +++ b/src/storage/src/healthcheck.rs @@ -212,14 +212,16 @@ impl<'a> From<&'a OverallStatus> for Status { #[derive(Debug)] struct HealthState { + id: GlobalId, healths: PerWorkerHealthStatus, last_reported_status: Option, halt_with: Option<(StatusNamespace, HealthStatusUpdate)>, } impl HealthState { - fn new(worker_count: usize) -> HealthState { + fn new(id: GlobalId, worker_count: usize) -> HealthState { HealthState { + id, healths: PerWorkerHealthStatus { errors_by_worker: vec![Default::default(); worker_count], }, @@ -308,9 +310,10 @@ impl HealthOperator for DefaultWriter { /// A health message consumed by the `health_operator`. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct HealthStatusMessage { - /// The object that this status message is about. When None, it refers to the entire ingestion - /// as a whole. 
When Some, it refers to a specific subsource. - pub id: Option, + /// The index of the object this message describes. + /// + /// Useful for sub-objects like sub-sources. + pub index: usize, /// The namespace of the health update. pub namespace: StatusNamespace, /// The update itself. @@ -336,6 +339,8 @@ pub(crate) fn health_operator<'g, G, P>( object_type: &'static str, // An indexed stream of health updates. Indexes are configured in `configs`. health_stream: &Stream, + // A map of index to collection id that we intend to report on. + configs: BTreeMap, // An impl of `HealthOperator` that configures the output behavior of this operator. health_operator_impl: P, // Whether or not we should actually write namespaced errors in the `details` column. @@ -360,7 +365,7 @@ where } else { // We'll route all the work to a single arbitrary worker; // there's not much to do, and we need a global view. - usize::cast_from(mark_starting.iter().next().hashed()) % worker_count + usize::cast_from(configs.keys().next().hashed()) % worker_count }; let is_active_worker = chosen_worker_id == healthcheck_worker_id; @@ -376,22 +381,20 @@ where ); let button = health_op.build(move |mut _capabilities| async move { - let mut health_states: BTreeMap<_, _> = mark_starting - .iter() - .copied() - .chain([halting_id]) - .map(|id| (id, HealthState::new(worker_count))) + let mut health_states: BTreeMap<_, _> = configs + .into_iter() + .map(|(output_idx, id)| (output_idx, HealthState::new(id, worker_count))) .collect(); // Write the initial starting state to the status shard for all managed objects if is_active_worker { - for (id, state) in health_states.iter_mut() { - if mark_starting.contains(id) { + for state in health_states.values_mut() { + if mark_starting.contains(&state.id) { let status = OverallStatus::Starting; let timestamp = mz_ore::now::to_datetime(now()); health_operator_impl .record_new_status( - *id, + state.id, timestamp, (&status).into(), status.error(), @@ -406,19 +409,21 @@ where } } - let mut outputs_seen = BTreeMap::>::new(); + let mut outputs_seen = BTreeMap::>::new(); while let Some(event) = input.next().await { if let AsyncEvent::Data(_cap, rows) = event { for (worker_id, message) in rows { let HealthStatusMessage { - id, + index: output_index, namespace: ns, update: health_event, } = message; - let id = id.unwrap_or(halting_id); let HealthState { - healths, halt_with, .. - } = match health_states.get_mut(&id) { + id, + healths, + halt_with, + .. + } = match health_states.get_mut(&output_index) { Some(health) => health, // This is a health status update for a sub-object_type that we did not request to // be generated, which means it doesn't have a GlobalId and should not be @@ -429,7 +434,7 @@ where // Its important to track `new_round` per-namespace, so namespaces are reasoned // about in `merge_update` independently. 
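The index-assignment rule in `indexed_source_exports` above is easy to get wrong when the primary relation is not the first entry in the map. A minimal standalone sketch of that rule, using `u64` ids and `&str` payloads as stand-ins for `GlobalId` and `SourceExport`:

```rust
use std::collections::BTreeMap;

// Stand-in for GlobalId; the real exports also carry storage metadata etc.
type FakeGlobalId = u64;

/// Assigns ingestion output indexes the way `indexed_source_exports` does in
/// the patch: the primary source relation is pinned to output 0 (the
/// conventional slot for dataflow-halting errors), and every other export
/// gets the next free index in map-iteration order.
fn assign_output_indexes(
    exports: &BTreeMap<FakeGlobalId, &str>,
    primary_id: FakeGlobalId,
) -> BTreeMap<FakeGlobalId, usize> {
    let mut next_output = 1;
    exports
        .keys()
        .map(|id| {
            let idx = if *id == primary_id {
                0
            } else {
                let idx = next_output;
                next_output += 1;
                idx
            };
            (*id, idx)
        })
        .collect()
}

fn main() {
    // The primary relation (id 7) is not the first key, but still maps to 0.
    let exports = BTreeMap::from([(3, "subsource_a"), (7, "primary"), (9, "subsource_b")]);
    let indexes = assign_output_indexes(&exports, 7);
    assert_eq!(indexes[&7], 0);
    assert_eq!(indexes[&3], 1);
    assert_eq!(indexes[&9], 2);
}
```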
let new_round = outputs_seen - .entry(id) + .entry(output_index) .or_insert_with(BTreeSet::new) .insert(ns.clone()); @@ -449,12 +454,15 @@ where let mut halt_with_outer = None; - while let Some((id, _)) = outputs_seen.pop_first() { + while let Some((output_index, _)) = outputs_seen.pop_first() { let HealthState { + id, healths, last_reported_status, halt_with, - } = health_states.get_mut(&id).expect("known to exist"); + } = health_states + .get_mut(&output_index) + .expect("known to exist"); let new_status = healths.decide_status(); @@ -468,7 +476,7 @@ where let timestamp = mz_ore::now::to_datetime(now()); health_operator_impl .record_new_status( - id, + *id, timestamp, (&new_status).into(), new_status.error(), @@ -483,7 +491,7 @@ where // Set halt with if None. if halt_with_outer.is_none() && halt_with.is_some() { - halt_with_outer = Some((id, halt_with.clone())); + halt_with_outer = Some((*id, halt_with.clone())); } } @@ -613,7 +621,7 @@ mod tests { Update(TestUpdate { worker_id: 1, namespace: StatusNamespace::Generator, - id: None, + input_index: 0, update: HealthStatusUpdate::running(), }), AssertStatus(vec![StatusToAssert { @@ -630,7 +638,7 @@ mod tests { Update(TestUpdate { worker_id: 1, namespace: StatusNamespace::Generator, - id: Some(GlobalId::User(1)), + input_index: 1, update: HealthStatusUpdate::running(), }), AssertStatus(vec![StatusToAssert { @@ -641,7 +649,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Generator, - id: Some(GlobalId::User(1)), + input_index: 1, update: HealthStatusUpdate::stalled("uhoh".to_string(), None), }), AssertStatus(vec![StatusToAssert { @@ -655,7 +663,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Generator, - id: Some(GlobalId::User(1)), + input_index: 1, update: HealthStatusUpdate::running(), }), AssertStatus(vec![StatusToAssert { @@ -695,7 +703,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Generator, - id: Some(GlobalId::User(1)), + input_index: 1, update: HealthStatusUpdate::stalled("uhoh".to_string(), None), }), AssertStatus(vec![StatusToAssert { @@ -734,7 +742,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Generator, - id: None, + input_index: 0, update: HealthStatusUpdate::stalled("uhoh".to_string(), None), }), AssertStatus(vec![StatusToAssert { @@ -747,7 +755,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Kafka, - id: None, + input_index: 0, update: HealthStatusUpdate::stalled("uhoh".to_string(), None), }), AssertStatus(vec![StatusToAssert { @@ -761,7 +769,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Kafka, - id: None, + input_index: 0, update: HealthStatusUpdate::running(), }), AssertStatus(vec![StatusToAssert { @@ -774,7 +782,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Generator, - id: None, + input_index: 0, update: HealthStatusUpdate::running(), }), AssertStatus(vec![StatusToAssert { @@ -810,7 +818,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Ssh, - id: None, + input_index: 0, update: HealthStatusUpdate::stalled("uhoh".to_string(), None), }), AssertStatus(vec![StatusToAssert { @@ -823,7 +831,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Ssh, - id: None, + input_index: 0, update: HealthStatusUpdate::stalled("uhoh2".to_string(), None), }), AssertStatus(vec![StatusToAssert { @@ -836,7 +844,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: 
StatusNamespace::Ssh, - id: None, + input_index: 0, update: HealthStatusUpdate::running(), }), // We haven't starting running yet, as a `Default` namespace hasn't told us. @@ -848,7 +856,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Generator, - id: None, + input_index: 0, update: HealthStatusUpdate::running(), }), AssertStatus(vec![StatusToAssert { @@ -882,7 +890,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Generator, - id: None, + input_index: 0, update: HealthStatusUpdate::stalled( "uhoh".to_string(), Some("hint1".to_string()), @@ -898,7 +906,7 @@ mod tests { Update(TestUpdate { worker_id: 1, namespace: StatusNamespace::Generator, - id: None, + input_index: 0, update: HealthStatusUpdate::stalled( "uhoh2".to_string(), Some("hint2".to_string()), @@ -916,7 +924,7 @@ mod tests { Update(TestUpdate { worker_id: 1, namespace: StatusNamespace::Generator, - id: None, + input_index: 0, update: HealthStatusUpdate::stalled( "uhoh2".to_string(), Some("hint3".to_string()), @@ -934,7 +942,7 @@ mod tests { Update(TestUpdate { worker_id: 0, namespace: StatusNamespace::Generator, - id: None, + input_index: 0, update: HealthStatusUpdate::running(), }), AssertStatus(vec![StatusToAssert { @@ -948,7 +956,7 @@ mod tests { Update(TestUpdate { worker_id: 1, namespace: StatusNamespace::Generator, - id: None, + input_index: 0, update: HealthStatusUpdate::running(), }), AssertStatus(vec![StatusToAssert { @@ -996,7 +1004,7 @@ mod tests { struct TestUpdate { worker_id: u64, namespace: StatusNamespace, - id: Option, + input_index: usize, update: HealthStatusUpdate, } @@ -1103,6 +1111,7 @@ mod tests { *inputs.first_key_value().unwrap().0, "source_test", &input, + inputs.iter().map(|(id, index)| (*index, *id)).collect(), TestWriter { sender: out_tx, input_mapping: inputs, @@ -1182,7 +1191,7 @@ mod tests { capability.as_ref().unwrap(), ( element.worker_id, - element.id, + element.input_index, element.namespace, element.update, ), @@ -1193,7 +1202,7 @@ mod tests { }); let output = output.exchange(|d| d.0).map(|d| HealthStatusMessage { - id: d.1, + index: d.1, namespace: d.2, update: d.3, }); diff --git a/src/storage/src/metrics.rs b/src/storage/src/metrics.rs index 1703d7fa3e136..eec8955682722 100644 --- a/src/storage/src/metrics.rs +++ b/src/storage/src/metrics.rs @@ -119,6 +119,7 @@ impl StorageMetrics { primary_source_id: GlobalId, worker_id: usize, data_shard: &mz_persist_client::ShardId, + output_index: usize, ) -> source::SourcePersistSinkMetrics { source::SourcePersistSinkMetrics::new( &self.source_defs.source_defs, @@ -126,16 +127,18 @@ impl StorageMetrics { primary_source_id, worker_id, data_shard, + output_index, ) } /// Get a `SourceMetrics` for the given id and worker id. pub(crate) fn get_source_metrics( &self, + name: &str, id: GlobalId, worker_id: usize, ) -> source::SourceMetrics { - source::SourceMetrics::new(&self.source_defs.source_defs, id, worker_id) + source::SourceMetrics::new(&self.source_defs.source_defs, name, id, worker_id) } /// Get a `PgMetrics` for the given id. 
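With the `health_operator` changes above, a status message no longer carries an `Option<GlobalId>`; the operator resolves the message's output index through the `configs` map it is handed at build time and silently drops updates for indexes it was never configured to track. A small sketch of that lookup, with `u64` standing in for `GlobalId` and a trimmed-down message type (both names hypothetical):

```rust
use std::collections::BTreeMap;

type FakeGlobalId = u64;

/// Trimmed-down stand-in for `HealthStatusMessage` after this patch:
/// it identifies the object by output index.
struct FakeHealthMessage {
    index: usize,
    update: &'static str,
}

/// Resolves a health message to the collection it describes using the
/// output-index -> collection-id map; unconfigured indexes are ignored,
/// matching the operator's behavior.
fn route(
    configs: &BTreeMap<usize, FakeGlobalId>,
    msg: &FakeHealthMessage,
) -> Option<(FakeGlobalId, &'static str)> {
    configs.get(&msg.index).map(|id| (*id, msg.update))
}

fn main() {
    let configs = BTreeMap::from([(0, 100), (1, 101), (2, 102)]);
    let msg = FakeHealthMessage { index: 1, update: "stalled: uhoh" };
    assert_eq!(route(&configs, &msg), Some((101, "stalled: uhoh")));
    // An index the operator was not configured for is dropped.
    let unknown = FakeHealthMessage { index: 9, update: "running" };
    assert_eq!(route(&configs, &unknown), None);
}
```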
diff --git a/src/storage/src/metrics/source.rs b/src/storage/src/metrics/source.rs index b9fb5034c5728..f7a0d2d82a615 100644 --- a/src/storage/src/metrics/source.rs +++ b/src/storage/src/metrics/source.rs @@ -33,6 +33,7 @@ pub mod postgres; #[derive(Clone, Debug)] pub(crate) struct GeneralSourceMetricDefs { // Source metrics + pub(crate) capability: UIntGaugeVec, pub(crate) resume_upper: IntGaugeVec, pub(crate) commit_upper_ready_times: UIntGaugeVec, pub(crate) commit_upper_accepted_times: UIntGaugeVec, @@ -56,6 +57,11 @@ impl GeneralSourceMetricDefs { Self { // TODO(guswynn): some of these metrics are not clear when subsources are involved, and // should be fixed + capability: registry.register(metric!( + name: "mz_capability", + help: "The current capability for this dataflow.", + var_labels: ["topic", "source_id", "worker_id"], + )), resume_upper: registry.register(metric!( name: "mz_resume_upper", // TODO(guswynn): should this also track the resumption frontier operator? @@ -80,33 +86,33 @@ impl GeneralSourceMetricDefs { progress: registry.register(metric!( name: "mz_source_progress", help: "A timestamp gauge representing forward progess in the data shard", - var_labels: ["source_id", "shard", "worker_id"], + var_labels: ["source_id", "output", "shard", "worker_id"], )), row_inserts: registry.register(metric!( name: "mz_source_row_inserts", help: "A counter representing the actual number of rows being inserted to the data shard", - var_labels: ["source_id", "shard", "worker_id"], + var_labels: ["source_id", "output", "shard", "worker_id"], )), row_retractions: registry.register(metric!( name: "mz_source_row_retractions", help: "A counter representing the actual number of rows being retracted from the data shard", - var_labels: ["source_id", "shard", "worker_id"], + var_labels: ["source_id", "output", "shard", "worker_id"], )), error_inserts: registry.register(metric!( name: "mz_source_error_inserts", help: "A counter representing the actual number of errors being inserted to the data shard", - var_labels: ["source_id", "shard", "worker_id"], + var_labels: ["source_id", "output", "shard", "worker_id"], )), error_retractions: registry.register(metric!( name: "mz_source_error_retractions", help: "A counter representing the actual number of errors being retracted from the data shard", - var_labels: ["source_id", "shard", "worker_id"], + var_labels: ["source_id", "output", "shard", "worker_id"], )), persist_sink_processed_batches: registry.register(metric!( name: "mz_source_processed_batches", help: "A counter representing the number of persist sink batches with actual data \ we have successfully processed.", - var_labels: ["source_id", "shard", "worker_id"], + var_labels: ["source_id", "output", "shard", "worker_id"], )), } } @@ -114,6 +120,8 @@ impl GeneralSourceMetricDefs { /// General metrics about sources that are not specific to the source type pub(crate) struct SourceMetrics { + /// Value of the capability associated with this source + pub(crate) capability: DeleteOnDropGauge<'static, AtomicU64, Vec>, /// The resume_upper for a source. pub(crate) resume_upper: DeleteOnDropGauge<'static, AtomicI64, Vec>, /// The number of ready remap bindings that are held in the reclock commit upper operator. 
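The new `mz_capability` gauge registered above is fed from the source's frontier later in this patch (in `demux_source_exports`, via `inspect_core`). A minimal sketch of that frontier-to-gauge conversion, assuming a totally ordered `u64` timestamp in place of `mz_repr::Timestamp`:

```rust
/// With a totally ordered timestamp, a frontier has either one element (the
/// current capability) or none (the input is closed); any other shape is
/// unreachable, which is exactly how the patch matches on it.
fn capability_gauge_value(frontier: &[u64]) -> u64 {
    match frontier {
        [time] => *time,
        // Closed input: report the maximum representable time.
        [] => u64::MAX,
        _ => unreachable!("totally ordered times have at most one frontier element"),
    }
}

fn main() {
    assert_eq!(capability_gauge_value(&[42]), 42);
    assert_eq!(capability_gauge_value(&[]), u64::MAX);
}
```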
@@ -126,10 +134,17 @@ impl SourceMetrics { /// Initializes source metrics for a given (source_id, worker_id) pub(crate) fn new( defs: &GeneralSourceMetricDefs, + source_name: &str, source_id: GlobalId, worker_id: usize, ) -> SourceMetrics { + let labels = &[ + source_name.to_string(), + source_id.to_string(), + worker_id.to_string(), + ]; SourceMetrics { + capability: defs.capability.get_delete_on_drop_metric(labels.to_vec()), resume_upper: defs .resume_upper .get_delete_on_drop_metric(vec![source_id.to_string()]), @@ -161,31 +176,37 @@ impl SourcePersistSinkMetrics { parent_source_id: GlobalId, worker_id: usize, shard_id: &mz_persist_client::ShardId, + output_index: usize, ) -> SourcePersistSinkMetrics { let shard = shard_id.to_string(); SourcePersistSinkMetrics { progress: defs.progress.get_delete_on_drop_metric(vec![ parent_source_id.to_string(), + output_index.to_string(), shard.clone(), worker_id.to_string(), ]), row_inserts: defs.row_inserts.get_delete_on_drop_metric(vec![ parent_source_id.to_string(), + output_index.to_string(), shard.clone(), worker_id.to_string(), ]), row_retractions: defs.row_retractions.get_delete_on_drop_metric(vec![ parent_source_id.to_string(), + output_index.to_string(), shard.clone(), worker_id.to_string(), ]), error_inserts: defs.error_inserts.get_delete_on_drop_metric(vec![ parent_source_id.to_string(), + output_index.to_string(), shard.clone(), worker_id.to_string(), ]), error_retractions: defs.error_retractions.get_delete_on_drop_metric(vec![ parent_source_id.to_string(), + output_index.to_string(), shard.clone(), worker_id.to_string(), ]), @@ -193,6 +214,7 @@ impl SourcePersistSinkMetrics { .persist_sink_processed_batches .get_delete_on_drop_metric(vec![ parent_source_id.to_string(), + output_index.to_string(), shard, worker_id.to_string(), ]), diff --git a/src/storage/src/render.rs b/src/storage/src/render.rs index aff96cb240bca..7cf5954cf7f18 100644 --- a/src/storage/src/render.rs +++ b/src/storage/src/render.rs @@ -272,7 +272,7 @@ pub fn build_ingestion_dataflow( let base_source_config = RawSourceCreationConfig { name: format!("{}-{}", connection.name(), primary_source_id), id: primary_source_id, - source_exports: description.source_exports.clone(), + source_exports: description.indexed_source_exports(&primary_source_id), timestamp_interval: description.desc.timestamp_interval, worker_id: mz_scope.index(), worker_count: mz_scope.peers(), @@ -297,7 +297,7 @@ pub fn build_ingestion_dataflow( busy_signal: Arc::clone(&busy_signal), }; - let (outputs, source_health, source_tokens) = match connection { + let (mut outputs, source_health, source_tokens) = match connection { GenericSourceConnection::Kafka(c) => crate::render::sources::render_source( mz_scope, &debug_name, @@ -337,38 +337,46 @@ pub fn build_ingestion_dataflow( }; tokens.extend(source_tokens); + let mut health_configs = BTreeMap::new(); + let mut upper_streams = vec![]; let mut health_streams = vec![source_health]; - for (export_id, (ok, err)) in outputs { - let export = &description.source_exports[&export_id]; + let source_exports = description.indexed_source_exports(&primary_source_id); + for (export_id, export) in source_exports { + let (ok, err) = outputs + .get_mut(export.ingestion_output) + .expect("known to exist"); let source_data = ok.map(Ok).concat(&err.map(Err)); let metrics = storage_state.metrics.get_source_persist_sink_metrics( export_id, primary_source_id, worker_id, - &export.storage_metadata.data_shard, + &export.export.storage_metadata.data_shard, + export.ingestion_output, 
); tracing::info!( id = %primary_source_id, - "timely-{worker_id}: persisting export {} of {}", - export_id, - primary_source_id + "timely-{worker_id}: persisting export #{} of {} into {}", + export.ingestion_output, + primary_source_id, + export_id ); let (upper_stream, errors, sink_tokens) = crate::render::persist_sink::render( mz_scope, export_id, - export.storage_metadata.clone(), + export.export.storage_metadata.clone(), source_data, storage_state, metrics, + export.ingestion_output, Arc::clone(&busy_signal), ); upper_streams.push(upper_stream); tokens.extend(sink_tokens); - let sink_health = errors.map(move |err: Rc| { + let sink_health = errors.map(|err: Rc| { let halt_status = HealthStatusUpdate::halting(err.display_with_causes().to_string(), None); HealthStatusMessage { @@ -378,6 +386,7 @@ pub fn build_ingestion_dataflow( } }); health_streams.push(sink_health.leave()); + health_configs.insert(export.ingestion_output, export_id); } mz_scope @@ -399,6 +408,7 @@ pub fn build_ingestion_dataflow( primary_source_id, "source", &health_stream, + health_configs, crate::healthcheck::DefaultWriter { command_tx: Rc::clone(&storage_state.internal_cmd_tx), updates: Rc::clone(&storage_state.object_status_updates), @@ -445,6 +455,13 @@ pub fn build_export_dataflow( crate::render::sinks::render_sink(scope, storage_state, id, &description); tokens.extend(sink_tokens); + let mut health_configs = BTreeMap::new(); + health_configs.insert( + // There is only 1 sink (as opposed to many sub-sources), so we just use a single + // index. + 0, id, + ); + // Note that sinks also have only 1 active worker, which simplifies the work that // `health_operator` has to do internally. let health_token = crate::healthcheck::health_operator( @@ -454,6 +471,7 @@ pub fn build_export_dataflow( id, "sink", &health_stream, + health_configs, crate::healthcheck::DefaultWriter { command_tx: Rc::clone(&storage_state.internal_cmd_tx), updates: Rc::clone(&storage_state.object_status_updates), diff --git a/src/storage/src/render/persist_sink.rs b/src/storage/src/render/persist_sink.rs index d3c42090751b6..3e034b8aa7af1 100644 --- a/src/storage/src/render/persist_sink.rs +++ b/src/storage/src/render/persist_sink.rs @@ -159,6 +159,22 @@ impl AddAssign<&BatchMetrics> for BatchMetrics { } } +impl BatchMetrics { + fn is_empty(&self) -> bool { + let BatchMetrics { + inserts: self_inserts, + retractions: self_retractions, + error_inserts: self_error_inserts, + error_retractions: self_error_retractions, + } = self; + + *self_inserts == 0 + && *self_retractions == 0 + && *self_error_inserts == 0 + && *self_error_retractions == 0 + } +} + /// Manages batches and metrics. 
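`BatchMetrics::is_empty` above deliberately destructures every field by name instead of comparing against `Default::default()`: if a new counter is added later, the destructuring stops compiling and the check has to be revisited. A self-contained sketch of the same idiom (type name hypothetical):

```rust
/// Stand-in for the patch's `BatchMetrics`.
#[derive(Default)]
struct FakeBatchMetrics {
    inserts: u64,
    retractions: u64,
    error_inserts: u64,
    error_retractions: u64,
}

impl FakeBatchMetrics {
    /// Exhaustive destructure: adding a field later makes this a compile error.
    fn is_empty(&self) -> bool {
        let FakeBatchMetrics {
            inserts,
            retractions,
            error_inserts,
            error_retractions,
        } = self;
        *inserts == 0 && *retractions == 0 && *error_inserts == 0 && *error_retractions == 0
    }
}

fn main() {
    assert!(FakeBatchMetrics::default().is_empty());
    assert!(!FakeBatchMetrics { inserts: 1, ..Default::default() }.is_empty());
}
```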
struct BatchBuilderAndMetadata where @@ -281,6 +297,7 @@ pub(crate) fn render( desired_collection: Collection, Diff>, storage_state: &StorageState, metrics: SourcePersistSinkMetrics, + output_index: usize, busy_signal: Arc, ) -> ( Stream, @@ -324,6 +341,7 @@ where &written_batches, persist_clients, storage_state, + output_index, metrics, Arc::clone(&busy_signal), ); @@ -880,6 +898,7 @@ fn append_batches( batches: &Stream>, persist_clients: Arc, storage_state: &StorageState, + output_index: usize, metrics: SourcePersistSinkMetrics, busy_signal: Arc, ) -> ( @@ -1010,6 +1029,18 @@ where let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum()); let mut batches_frontier = Antichain::from_elem(Timestamp::minimum()); + // Pause the source to prevent committing the snapshot, + // if the failpoint is configured + let mut pg_snapshot_pause = false; + (|| { + fail::fail_point!("pg_snapshot_pause", |val| { + pg_snapshot_pause = val.map_or(false, |index| { + let index: usize = index.parse().unwrap(); + index == output_index + }); + }); + })(); + loop { tokio::select! { Some(event) = descriptions_input.next() => { @@ -1143,6 +1174,18 @@ where let mut to_append = batches.iter_mut().map(|b| &mut b.batch).collect::>(); + // We evaluate this above to avoid checking an environment variable + // in a hot loop. Note that we only pause before we emit + // non-empty batches, because we do want to bump the upper + // with empty ones before we start ingesting the snapshot. + // + // This is a fairly complex failure case we need to check + // see `test/cluster/pg-snapshot-partial-failure` for more + // information. + if pg_snapshot_pause && !to_append.is_empty() && !batch_metrics.is_empty() { + futures::future::pending().await + } + let result = { let maybe_err = if *read_only_rx.borrow() { diff --git a/src/storage/src/render/sources.rs b/src/storage/src/render/sources.rs index 05777e53b99d5..279b9160cd5cf 100644 --- a/src/storage/src/render/sources.rs +++ b/src/storage/src/render/sources.rs @@ -11,7 +11,6 @@ //! //! See [`render_source`] for more details. 
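The `pg_snapshot_pause` failpoint above is evaluated once, before the hot batch-appending loop, and only matches when its configured value equals this sink's output index. A sketch of that pattern, assuming the `fail` crate (the same one the patch uses) compiled with its `failpoints` feature; the immediately-invoked closure keeps the macro's early return from leaving the enclosing function:

```rust
/// Returns whether the hypothetical "pg_snapshot_pause" failpoint is set to
/// this output index. Evaluated once so the hot loop never re-reads it.
fn should_pause_for_output(output_index: usize) -> bool {
    let mut pause = false;
    // `fail_point!` early-returns from the enclosing function when active,
    // so confine it to an immediately-invoked closure and only capture the
    // boolean we care about (mirrors the pattern in the patch).
    (|| {
        fail::fail_point!("pg_snapshot_pause", |val| {
            pause = val.map_or(false, |configured| {
                configured.parse::<usize>().ok() == Some(output_index)
            });
        });
    })();
    pause
}

fn main() {
    // With no failpoint configured (e.g. FAILPOINTS unset), this is false.
    assert!(!should_pause_for_output(1));
}
```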
-use std::collections::BTreeMap; use std::iter; use std::sync::Arc; @@ -33,7 +32,6 @@ use mz_timely_util::builder_async::PressOnDropButton; use mz_timely_util::operator::CollectionExt; use mz_timely_util::order::refine_antichain; use serde::{Deserialize, Serialize}; -use timely::container::CapacityContainerBuilder; use timely::dataflow::operators::generic::operator::empty; use timely::dataflow::operators::{Concat, ConnectLoop, Feedback, Leave, Map, OkErr}; use timely::dataflow::scopes::{Child, Scope}; @@ -46,6 +44,9 @@ use crate::source::types::{DecodeResult, SourceOutput, SourceRender}; use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig}; use crate::upsert::UpsertKey; +/// The output index for health streams, used to handle multiplexed streams +pub(crate) type OutputIndex = usize; + /// _Renders_ complete _differential_ [`Collection`]s /// that represent the final source and its errors /// as requested by the original `CREATE SOURCE` statement, @@ -66,13 +67,10 @@ pub fn render_source<'g, G, C>( storage_state: &crate::storage_state::StorageState, base_source_config: RawSourceCreationConfig, ) -> ( - BTreeMap< - GlobalId, - ( - Collection, Row, Diff>, - Collection, DataflowError, Diff>, - ), - >, + Vec<( + Collection, Row, Diff>, + Collection, DataflowError, Diff>, + )>, Stream, Vec, ) @@ -102,7 +100,7 @@ where // Build the _raw_ ok and error sources using `create_raw_source` and the // correct `SourceReader` implementations - let (exports, mut health, source_tokens) = source::create_raw_source( + let (streams, mut health, source_tokens) = source::create_raw_source( scope, storage_state, resume_stream, @@ -113,25 +111,18 @@ where needed_tokens.extend(source_tokens); - let mut outputs = BTreeMap::new(); - for (export_id, export) in exports { - type CB = CapacityContainerBuilder; - let (ok_stream, err_stream) = - export.map_fallible::, CB<_>, _, _, _>("export-demux-ok-err", |r| r); - + let mut outputs = vec![]; + for (export_id, ok_source, err_source, data_config) in streams { // All sources should push their various error streams into this vector, // whose contents will be concatenated and inserted along the collection. 
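Because `render_source` now returns its `(ok, err)` pairs positionally, callers such as `build_ingestion_dataflow` re-associate them with export ids through `ingestion_output`. A standalone sketch of that pairing, with hypothetical stand-in types (`u64` ids and a toy output struct):

```rust
use std::collections::BTreeMap;

type FakeGlobalId = u64;

/// Stand-in for the `(ok, err)` collection pair a rendered output produces.
#[derive(Clone, Debug, PartialEq)]
struct FakeOutput {
    ok_rows: Vec<&'static str>,
    errors: Vec<&'static str>,
}

/// Re-associates positional outputs with export ids via `ingestion_output`,
/// the way the ingestion dataflow builder does after this patch.
fn pair_exports_with_outputs(
    exports: &BTreeMap<FakeGlobalId, usize>, // export id -> ingestion_output
    outputs: &[FakeOutput],
) -> BTreeMap<FakeGlobalId, FakeOutput> {
    exports
        .iter()
        .map(|(id, idx)| (*id, outputs[*idx].clone()))
        .collect()
}

fn main() {
    let outputs = vec![
        FakeOutput { ok_rows: vec![], errors: vec!["decode error"] }, // output 0: primary
        FakeOutput { ok_rows: vec!["row"], errors: vec![] },          // output 1: subsource
    ];
    let exports = BTreeMap::from([(7, 0), (3, 1)]);
    let paired = pair_exports_with_outputs(&exports, &outputs);
    assert_eq!(paired[&3].ok_rows, vec!["row"]);
}
```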
// All subsources include the non-definite errors of the ingestion - let error_collections = vec![err_stream.map(DataflowError::from)]; + let error_collections = vec![err_source.map(DataflowError::from)]; - let data_config = base_source_config.source_exports[&export_id] - .data_config - .clone(); let (ok, err, extra_tokens, health_stream) = render_source_stream( scope, dataflow_debug_name, export_id, - ok_stream, + ok_source, data_config, description.clone(), error_collections, @@ -140,7 +131,7 @@ where starter.clone(), ); needed_tokens.extend(extra_tokens); - outputs.insert(export_id, (ok, err)); + outputs.push((ok, err)); health = health.concat(&health_stream.leave()); } @@ -374,8 +365,8 @@ where ( upsert.leave(), health_update - .map(|(id, update)| HealthStatusMessage { - id, + .map(|(index, update)| HealthStatusMessage { + index, namespace: StatusNamespace::Upsert, update, }) diff --git a/src/storage/src/sink/kafka.rs b/src/storage/src/sink/kafka.rs index 74c6ebd6822bb..703b1c42b3a90 100644 --- a/src/storage/src/sink/kafka.rs +++ b/src/storage/src/sink/kafka.rs @@ -190,7 +190,7 @@ impl> SinkRender for KafkaSinkConnection { ); let running_status = Some(HealthStatusMessage { - id: None, + index: 0, update: HealthStatusUpdate::Running, namespace: StatusNamespace::Kafka, }) @@ -802,7 +802,7 @@ fn sink_collection>( }; HealthStatusMessage { - id: None, + index: 0, update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), hint), namespace: if matches!(*error, ContextCreationError::Ssh(_)) { StatusNamespace::Ssh @@ -1435,7 +1435,7 @@ fn encode_collection( }); let statuses = errors.map(|error| HealthStatusMessage { - id: None, + index: 0, update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None), namespace: StatusNamespace::Kafka, }); diff --git a/src/storage/src/source/generator.rs b/src/storage/src/source/generator.rs index 258ef563ccecf..4f4d0b8248334 100644 --- a/src/storage/src/source/generator.rs +++ b/src/storage/src/source/generator.rs @@ -15,10 +15,8 @@ use std::time::Duration; use differential_dataflow::AsCollection; use futures::StreamExt; -use itertools::Itertools; -use mz_ore::cast::CastFrom; use mz_ore::iter::IteratorExt; -use mz_repr::{Diff, GlobalId, Row}; +use mz_repr::Row; use mz_storage_types::errors::DataflowError; use mz_storage_types::sources::load_generator::{ Event, Generator, KeyValueLoadGenerator, LoadGenerator, LoadGeneratorOutput, @@ -27,8 +25,6 @@ use mz_storage_types::sources::load_generator::{ use mz_storage_types::sources::{MzOffset, SourceExportDetails, SourceTimestamp}; use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton}; use mz_timely_util::containers::stack::AccountedStackBuilder; -use timely::container::CapacityContainerBuilder; -use timely::dataflow::operators::core::Partition; use timely::dataflow::{Scope, Stream}; use timely::progress::Antichain; use tokio::time::{interval_at, Instant}; @@ -140,7 +136,7 @@ impl GeneratorKind { committed_uppers: impl futures::Stream> + 'static, start_signal: impl std::future::Future + 'static, ) -> ( - BTreeMap>>, + StackedCollection)>, Option>, Stream, Stream, @@ -148,17 +144,20 @@ impl GeneratorKind { ) { // figure out which output types from the generator belong to which output indexes let mut output_map = BTreeMap::new(); - for (idx, (_, export)) in config.source_exports.iter().enumerate() { - let output_type = match &export.details { + for (_, export) in config.source_exports.iter() { + let output_type = match 
&export.export.details { SourceExportDetails::LoadGenerator(details) => details.output, // This is an export that doesn't need any data output to it. SourceExportDetails::None => continue, - _ => panic!("unexpected source export details: {:?}", export.details), + _ => panic!( + "unexpected source export details: {:?}", + export.export.details + ), }; output_map .entry(output_type) .or_insert_with(Vec::new) - .push(idx); + .push(export.ingestion_output); } match self { @@ -201,7 +200,7 @@ impl SourceRender for LoadGeneratorSourceConnection { committed_uppers: impl futures::Stream> + 'static, start_signal: impl std::future::Future + 'static, ) -> ( - BTreeMap>>, + StackedCollection)>, Option>, Stream, Stream, @@ -231,7 +230,7 @@ fn render_simple_generator>( committed_uppers: impl futures::Stream> + 'static, output_map: BTreeMap>, ) -> ( - BTreeMap>>, + StackedCollection)>, Option>, Stream, Stream, @@ -240,23 +239,6 @@ fn render_simple_generator>( let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone()); let (data_output, stream) = builder.new_output::>(); - let partition_count = u64::cast_from(config.source_exports.len()); - let data_streams: Vec<_> = stream.partition::, _, _>( - partition_count, - |((output, data), time, diff): &( - (usize, Result), - MzOffset, - Diff, - )| { - let output = u64::cast_from(*output); - (output, (data.clone(), time.clone(), diff.clone())) - }, - ); - let mut data_collections = BTreeMap::new(); - for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) { - data_collections.insert(*id, data_stream.as_collection()); - } - let (health_output, health_stream) = builder.new_output(); let (stats_output, stats_stream) = builder.new_output(); @@ -346,7 +328,7 @@ fn render_simple_generator>( None => continue, }; - let message: Result = Ok(SourceMessage { + let message = Ok(SourceMessage { key: Row::default(), value, metadata: Row::default(), @@ -369,7 +351,7 @@ fn render_simple_generator>( health_output.give( &health_cap, HealthStatusMessage { - id: None, + index: 0, namespace: StatusNamespace::Generator, update: HealthStatusUpdate::running(), }, @@ -435,7 +417,7 @@ fn render_simple_generator>( }); ( - data_collections, + stream.as_collection(), None, health_stream, stats_stream, diff --git a/src/storage/src/source/generator/key_value.rs b/src/storage/src/source/generator/key_value.rs index f5f2904529bc0..60f374efe56e7 100644 --- a/src/storage/src/source/generator/key_value.rs +++ b/src/storage/src/source/generator/key_value.rs @@ -13,10 +13,9 @@ use std::sync::Arc; use differential_dataflow::AsCollection; use futures::stream::StreamExt; -use itertools::Itertools; use mz_ore::cast::CastFrom; use mz_ore::iter::IteratorExt; -use mz_repr::{Datum, Diff, GlobalId, Row}; +use mz_repr::{Datum, Diff, Row}; use mz_storage_types::errors::DataflowError; use mz_storage_types::sources::load_generator::{KeyValueLoadGenerator, LoadGeneratorOutput}; use mz_storage_types::sources::{MzOffset, SourceTimestamp}; @@ -25,7 +24,6 @@ use mz_timely_util::containers::stack::AccountedStackBuilder; use rand::rngs::StdRng; use rand::{RngCore, SeedableRng}; use timely::container::CapacityContainerBuilder; -use timely::dataflow::operators::core::Partition; use timely::dataflow::operators::{Concat, ToStream}; use timely::dataflow::{Scope, Stream}; use timely::progress::Antichain; @@ -43,7 +41,7 @@ pub fn render>( start_signal: impl std::future::Future + 'static, output_map: BTreeMap>, ) -> ( - BTreeMap>>, + StackedCollection)>, Option>, Stream, Stream, @@ -55,23 
+53,6 @@ pub fn render>( let mut builder = AsyncOperatorBuilder::new(config.name.clone(), scope.clone()); let (data_output, stream) = builder.new_output::>(); - let partition_count = u64::cast_from(config.source_exports.len()); - let data_streams: Vec<_> = stream.partition::, _, _>( - partition_count, - |((output, data), time, diff): &( - (usize, Result), - MzOffset, - Diff, - )| { - let output = u64::cast_from(*output); - (output, (data.clone(), time.clone(), diff.clone())) - }, - ); - let mut data_collections = BTreeMap::new(); - for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) { - data_collections.insert(*id, data_stream.as_collection()); - } - let (_progress_output, progress_stream) = builder.new_output::>(); let (stats_output, stats_stream) = builder.new_output::>(); @@ -226,7 +207,7 @@ pub fn render>( }); let status = [HealthStatusMessage { - id: None, + index: 0, namespace: StatusNamespace::Generator, update: HealthStatusUpdate::running(), }] @@ -234,7 +215,7 @@ pub fn render>( let stats_stream = stats_stream.concat(&steady_state_stats_stream); ( - data_collections, + stream.as_collection(), Some(progress_stream), status, stats_stream, diff --git a/src/storage/src/source/kafka.rs b/src/storage/src/source/kafka.rs index 4d8cbe11516ac..f88f1acc23106 100644 --- a/src/storage/src/source/kafka.rs +++ b/src/storage/src/source/kafka.rs @@ -19,7 +19,6 @@ use anyhow::anyhow; use chrono::{DateTime, NaiveDateTime}; use differential_dataflow::{AsCollection, Hashable}; use futures::StreamExt; -use itertools::Itertools; use maplit::btreemap; use mz_kafka_util::client::{ get_partitions, GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, @@ -38,7 +37,9 @@ use mz_storage_types::errors::{ use mz_storage_types::sources::kafka::{ KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound, }; -use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp}; +use mz_storage_types::sources::{ + IndexedSourceExport, MzOffset, SourceExport, SourceExportDetails, SourceTimestamp, +}; use mz_timely_util::antichain::AntichainExt; use mz_timely_util::builder_async::{ Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, @@ -55,7 +56,6 @@ use rdkafka::{ClientContext, Message, TopicPartitionList}; use serde::{Deserialize, Serialize}; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; -use timely::dataflow::operators::core::Partition; use timely::dataflow::operators::{Broadcast, Capability}; use timely::dataflow::{Scope, Stream}; use timely::progress::Antichain; @@ -164,7 +164,6 @@ fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool { } struct SourceOutputInfo { - id: GlobalId, output_index: usize, resume_upper: Antichain, metadata_columns: Vec, @@ -186,7 +185,7 @@ impl SourceRender for KafkaSourceConnection { resume_uppers: impl futures::Stream> + 'static, start_signal: impl std::future::Future + 'static, ) -> ( - BTreeMap>>, + StackedCollection)>, Option>, Stream, Stream, @@ -195,34 +194,11 @@ impl SourceRender for KafkaSourceConnection { ) { let (metadata, probes, metadata_token) = render_metadata_fetcher(scope, self.clone(), config.clone()); - let (data, progress, health, stats, reader_token) = render_reader( - scope, - self, - config.clone(), - resume_uppers, - metadata, - start_signal, - ); - - let partition_count = u64::cast_from(config.source_exports.len()); - let data_streams: Vec<_> = data.inner.partition::, _, _>( - partition_count, - 
|((output, data), time, diff): &( - (usize, Result), - _, - Diff, - )| { - let output = u64::cast_from(*output); - (output, (data.clone(), time.clone(), diff.clone())) - }, - ); - let mut data_collections = BTreeMap::new(); - for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) { - data_collections.insert(*id, data_stream.as_collection()); - } + let (data, progress, health, stats, reader_token) = + render_reader(scope, self, config, resume_uppers, metadata, start_signal); ( - data_collections, + data, Some(progress), health, stats, @@ -261,11 +237,15 @@ fn render_reader>( let mut metadata_input = builder.new_disconnected_input(&metadata_stream.broadcast(), Pipeline); let mut outputs = vec![]; - for (idx, (id, export)) in config.source_exports.iter().enumerate() { - let SourceExport { - details, - storage_metadata: _, - data_config: _, + for (id, export) in &config.source_exports { + let IndexedSourceExport { + ingestion_output, + export: + SourceExport { + details, + storage_metadata: _, + data_config: _, + }, } = export; let resume_upper = Antichain::from_iter( config @@ -290,9 +270,8 @@ fn render_reader>( }; let output = SourceOutputInfo { - id: *id, resume_upper, - output_index: idx, + output_index: *ingestion_output, metadata_columns, }; outputs.push(output); @@ -437,7 +416,7 @@ fn render_reader>( health_output.give( &health_cap, HealthStatusMessage { - id: Some(output.id), + index: output.output_index, namespace: if matches!(e, ContextCreationError::Ssh(_)) { StatusNamespace::Ssh } else { @@ -640,7 +619,7 @@ fn render_reader>( health_output.give( &health_cap, HealthStatusMessage { - id: Some(output.id), + index: output.output_index, namespace, update: HealthStatusUpdate::running(), }, @@ -660,7 +639,7 @@ fn render_reader>( health_output.give( &health_cap, HealthStatusMessage { - id: Some(output.id), + index: output.output_index, namespace: StatusNamespace::Kafka, update, }, @@ -672,7 +651,7 @@ fn render_reader>( health_output.give( &health_cap, HealthStatusMessage { - id: Some(output.id), + index: output.output_index, namespace: StatusNamespace::Ssh, update, }, @@ -715,7 +694,7 @@ fn render_reader>( health_output.give( &health_cap, HealthStatusMessage { - id: Some(output.id), + index: output.output_index, namespace: StatusNamespace::Kafka, update: status, }, @@ -822,7 +801,7 @@ fn render_reader>( health_output.give( &health_cap, HealthStatusMessage { - id: Some(output.id), + index: output.output_index, namespace: StatusNamespace::Kafka, update: status, }, diff --git a/src/storage/src/source/mysql.rs b/src/storage/src/source/mysql.rs index 47ad454359797..b99460fc6b5ff 100644 --- a/src/storage/src/source/mysql.rs +++ b/src/storage/src/source/mysql.rs @@ -50,24 +50,19 @@ //! The error streams from both of those operators are published to the source status and also //! trigger a restart of the dataflow. 
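In the Kafka reader above, connection-level problems are now reported once per configured output index (the loop over `outputs` emits one `HealthStatusMessage` per `output_index`) rather than once per `GlobalId`. A trimmed-down sketch of that fan-out (type names hypothetical):

```rust
/// Stand-in for the post-patch health message: identified by output index.
#[derive(Debug)]
struct FakeHealthStatusMessage {
    index: usize,
    update: String,
}

/// Fans a reader-level error out to every configured output index.
fn broadcast_error(output_indexes: &[usize], error: &str) -> Vec<FakeHealthStatusMessage> {
    output_indexes
        .iter()
        .map(|index| FakeHealthStatusMessage {
            index: *index,
            update: format!("stalled: {error}"),
        })
        .collect()
}

fn main() {
    let msgs = broadcast_error(&[0, 1, 3], "broker unreachable");
    assert_eq!(msgs.len(), 3);
    assert_eq!(msgs[2].index, 3);
}
```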
-use std::collections::BTreeMap; use std::convert::Infallible; use std::fmt; use std::io; use std::rc::Rc; -use differential_dataflow::AsCollection; -use itertools::Itertools; -use mz_ore::cast::CastFrom; use mz_repr::Diff; -use mz_repr::GlobalId; use mz_storage_types::errors::{DataflowError, SourceError}; +use mz_storage_types::sources::IndexedSourceExport; use mz_storage_types::sources::SourceExport; use mz_timely_util::containers::stack::{AccountedStackBuilder, StackWrapper}; use serde::{Deserialize, Serialize}; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pushers::Tee; -use timely::dataflow::operators::core::Partition; use timely::dataflow::operators::{CapabilitySet, Concat, Map, ToStream}; use timely::dataflow::{Scope, Stream}; use timely::progress::Antichain; @@ -108,7 +103,7 @@ impl SourceRender for MySqlSourceConnection { resume_uppers: impl futures::Stream> + 'static, _start_signal: impl std::future::Future + 'static, ) -> ( - BTreeMap>>, + StackedCollection)>, Option>, Stream, Stream, @@ -117,12 +112,19 @@ impl SourceRender for MySqlSourceConnection { ) { // Collect the source outputs that we will be exporting. let mut source_outputs = Vec::new(); - for (idx, (id, export)) in config.source_exports.iter().enumerate() { - let SourceExport { - details, - storage_metadata: _, - data_config: _, - } = export; + for ( + id, + IndexedSourceExport { + ingestion_output, + export: + SourceExport { + details, + storage_metadata: _, + data_config: _, + }, + }, + ) in &config.source_exports + { let details = match details { SourceExportDetails::MySql(details) => details, // This is an export that doesn't need any data output to it. @@ -142,8 +144,8 @@ impl SourceRender for MySqlSourceConnection { ); let name = MySqlTableName::new(&desc.schema_name, &desc.name); source_outputs.push(SourceOutputInfo { - output_index: idx, table_name: name.clone(), + output_index: *ingestion_output, desc, text_columns: details.text_columns.clone(), exclude_columns: details.exclude_columns.clone(), @@ -173,32 +175,14 @@ impl SourceRender for MySqlSourceConnection { ); let (stats_stream, stats_err, probe_stream, stats_token) = - statistics::render(scope.clone(), config.clone(), self, resume_uppers); + statistics::render(scope.clone(), config, self, resume_uppers); let stats_stream = stats_stream.concat(&snapshot_stats); let updates = snapshot_updates.concat(&repl_updates); - let partition_count = u64::cast_from(config.source_exports.len()); - let data_streams: Vec<_> = updates - .inner - .partition::, _, _>( - partition_count, - |((output, data), time, diff): &( - (usize, Result), - _, - Diff, - )| { - let output = u64::cast_from(*output); - (output, (data.clone(), time.clone(), diff.clone())) - }, - ); - let mut data_collections = BTreeMap::new(); - for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) { - data_collections.insert(*id, data_stream.as_collection()); - } let health_init = std::iter::once(HealthStatusMessage { - id: None, + index: 0, namespace: Self::STATUS_NAMESPACE, update: HealthStatusUpdate::Running, }) @@ -222,7 +206,7 @@ impl SourceRender for MySqlSourceConnection { }; HealthStatusMessage { - id: None, + index: 0, namespace: namespace.clone(), update, } @@ -230,7 +214,7 @@ impl SourceRender for MySqlSourceConnection { let health = health_init.concat(&health_errs); ( - data_collections, + updates, Some(uppers), health, stats_stream, diff --git a/src/storage/src/source/postgres.rs b/src/storage/src/source/postgres.rs index 
2541161893a9c..aa62437c7f8fa 100644 --- a/src/storage/src/source/postgres.rs +++ b/src/storage/src/source/postgres.rs @@ -84,25 +84,22 @@ use std::convert::Infallible; use std::rc::Rc; use std::time::Duration; -use differential_dataflow::AsCollection; use itertools::Itertools as _; use mz_expr::{EvalError, MirScalarExpr}; -use mz_ore::cast::CastFrom; use mz_ore::error::ErrorExt; use mz_postgres_util::desc::PostgresTableDesc; use mz_postgres_util::{simple_query_opt, Client, PostgresError}; -use mz_repr::{Datum, Diff, GlobalId, Row}; +use mz_repr::{Datum, Row}; use mz_sql_parser::ast::display::AstDisplay; use mz_sql_parser::ast::Ident; use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails}; use mz_storage_types::sources::postgres::CastType; use mz_storage_types::sources::{ - MzOffset, PostgresSourceConnection, SourceExport, SourceExportDetails, SourceTimestamp, + IndexedSourceExport, MzOffset, PostgresSourceConnection, SourceExport, SourceExportDetails, + SourceTimestamp, }; use mz_timely_util::builder_async::PressOnDropButton; use serde::{Deserialize, Serialize}; -use timely::container::CapacityContainerBuilder; -use timely::dataflow::operators::core::Partition; use timely::dataflow::operators::{Concat, Map, ToStream}; use timely::dataflow::{Scope, Stream}; use timely::progress::Antichain; @@ -130,7 +127,7 @@ impl SourceRender for PostgresSourceConnection { resume_uppers: impl futures::Stream> + 'static, _start_signal: impl std::future::Future + 'static, ) -> ( - BTreeMap>>, + StackedCollection)>, Option>, Stream, Stream, @@ -139,12 +136,19 @@ impl SourceRender for PostgresSourceConnection { ) { // Collect the source outputs that we will be exporting into a per-table map. let mut table_info = BTreeMap::new(); - for (idx, (id, export)) in config.source_exports.iter().enumerate() { - let SourceExport { - details, - storage_metadata: _, - data_config: _, - } = export; + for ( + id, + IndexedSourceExport { + ingestion_output, + export: + SourceExport { + details, + storage_metadata: _, + data_config: _, + }, + }, + ) in &config.source_exports + { let details = match details { SourceExportDetails::Postgres(details) => details, // This is an export that doesn't need any data output to it. 
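Both the MySQL and Postgres collectors now destructure `IndexedSourceExport` directly in the loop pattern and skip exports with no data output. A self-contained sketch of that pattern with simplified stand-in structs (an `Option<&str>` plays the role of `SourceExportDetails`):

```rust
use std::collections::BTreeMap;

// Simplified stand-ins; the real types also carry storage metadata and a
// data config.
struct FakeSourceExport {
    details: Option<&'static str>,
}
struct FakeIndexedSourceExport {
    ingestion_output: usize,
    export: FakeSourceExport,
}

fn main() {
    let source_exports = BTreeMap::from([
        (7u64, FakeIndexedSourceExport { ingestion_output: 0, export: FakeSourceExport { details: None } }),
        (3u64, FakeIndexedSourceExport { ingestion_output: 1, export: FakeSourceExport { details: Some("table_a") } }),
    ]);

    // Destructure the nested struct directly in the loop pattern, skipping
    // exports with nothing to output (`None` stands in for
    // `SourceExportDetails::None`).
    let mut outputs = vec![];
    for (
        id,
        FakeIndexedSourceExport {
            ingestion_output,
            export: FakeSourceExport { details },
        },
    ) in &source_exports
    {
        let Some(details) = details else { continue };
        outputs.push((*id, *ingestion_output, *details));
    }
    assert_eq!(outputs, vec![(3, 1, "table_a")]);
}
```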
@@ -169,7 +173,7 @@ impl SourceRender for PostgresSourceConnection { table_info .entry(output.desc.oid) .or_insert_with(BTreeMap::new) - .insert(idx, output); + .insert(*ingestion_output, output); } let metrics = config.metrics.get_postgres_source_metrics(config.id); @@ -186,7 +190,7 @@ impl SourceRender for PostgresSourceConnection { let (repl_updates, uppers, stats_stream, probe_stream, repl_err, repl_token) = replication::render( scope.clone(), - config.clone(), + config, self, table_info, &rewinds, @@ -198,27 +202,9 @@ impl SourceRender for PostgresSourceConnection { let stats_stream = stats_stream.concat(&snapshot_stats); let updates = snapshot_updates.concat(&repl_updates); - let partition_count = u64::cast_from(config.source_exports.len()); - let data_streams: Vec<_> = updates - .inner - .partition::, _, _>( - partition_count, - |((output, data), time, diff): &( - (usize, Result), - MzOffset, - Diff, - )| { - let output = u64::cast_from(*output); - (output, (data.clone(), time.clone(), diff.clone())) - }, - ); - let mut data_collections = BTreeMap::new(); - for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) { - data_collections.insert(*id, data_stream.as_collection()); - } let init = std::iter::once(HealthStatusMessage { - id: None, + index: 0, namespace: Self::STATUS_NAMESPACE, update: HealthStatusUpdate::Running, }) @@ -246,7 +232,7 @@ impl SourceRender for PostgresSourceConnection { }; HealthStatusMessage { - id: None, + index: 0, namespace: namespace.clone(), update, } @@ -255,7 +241,7 @@ impl SourceRender for PostgresSourceConnection { let health = init.concat(&errs); ( - data_collections, + updates, Some(uppers), health, stats_stream, diff --git a/src/storage/src/source/source_reader_pipeline.rs b/src/storage/src/source/source_reader_pipeline.rs index b5539eca7623c..7d6ebe79e1254 100644 --- a/src/storage/src/source/source_reader_pipeline.rs +++ b/src/storage/src/source/source_reader_pipeline.rs @@ -34,6 +34,7 @@ use std::time::Duration; use differential_dataflow::lattice::Lattice; use differential_dataflow::{AsCollection, Collection, Hashable}; use futures::stream::StreamExt; +use itertools::Itertools; use mz_ore::cast::CastFrom; use mz_ore::collections::CollectionExt; use mz_ore::error::ErrorExt; @@ -44,20 +45,23 @@ use mz_storage_types::configuration::StorageConfiguration; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::dyncfgs; use mz_storage_types::errors::DataflowError; -use mz_storage_types::sources::{SourceConnection, SourceExport, SourceTimestamp}; +use mz_storage_types::sources::{ + IndexedSourceExport, SourceConnection, SourceExportDataConfig, SourceTimestamp, +}; use mz_timely_util::antichain::AntichainExt; use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, }; use mz_timely_util::capture::PusherCapture; +use mz_timely_util::operator::StreamExt as _; use mz_timely_util::reclock::reclock; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::capture::capture::Capture; use timely::dataflow::operators::capture::{Event, EventPusher}; use timely::dataflow::operators::core::Map as _; -use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc; -use timely::dataflow::operators::{Broadcast, CapabilitySet, Concat, Inspect, Leave}; +use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; +use timely::dataflow::operators::{Broadcast, 
CapabilitySet, Concat, Inspect, Leave, Partition}; use timely::dataflow::scopes::Child; use timely::dataflow::{Scope, Stream}; use timely::order::TotalOrder; @@ -84,7 +88,7 @@ pub struct RawSourceCreationConfig { /// The ID of this instantiation of this source. pub id: GlobalId, /// The details of the outputs from this ingestion. - pub source_exports: BTreeMap>, + pub source_exports: BTreeMap>, /// The ID of the worker on which this operator is executing pub worker_id: usize, /// The total count of workers @@ -174,14 +178,12 @@ pub fn create_raw_source<'g, G: Scope, C>( source_connection: C, start_signal: impl std::future::Future + 'static, ) -> ( - BTreeMap< + Vec<( GlobalId, - Collection< - Child<'g, G, mz_repr::Timestamp>, - Result, DataflowError>, - Diff, - >, - >, + Collection, SourceOutput, Diff>, + Collection, DataflowError, Diff>, + SourceExportDataConfig, + )>, Stream, Vec, ) @@ -197,7 +199,11 @@ where watch::channel(MutableAntichain::new_bottom(C::Time::minimum())); let (probed_upper_tx, probed_upper_rx) = watch::channel(None); - let source_metrics = Arc::new(config.metrics.get_source_metrics(id, worker_id)); + let source_metrics = Arc::new( + config + .metrics + .get_source_metrics(&config.name, id, worker_id), + ); let timestamp_desc = source_connection.timestamp_desc(); @@ -221,12 +227,13 @@ where Arc::clone(&source_metrics), ); - let mut reclocked_exports = BTreeMap::new(); + let (reclock_pusher, reclocked) = reclock(&remap_collection, config.as_of.clone()); + + let streams = demux_source_exports(config.clone(), reclocked); let config = config.clone(); - let reclocked_exports2 = &mut reclocked_exports; - let (health, source_tokens) = scope.parent.scoped("SourceTimeDomain", move |scope| { - let (exports, source_upper, health_stream, source_tokens) = source_render_operator( + let (streams, health, source_tokens) = scope.parent.scoped("SourceTimeDomain", move |scope| { + let (source, source_upper, health_stream, source_tokens) = source_render_operator( scope, config.clone(), source_connection, @@ -235,34 +242,30 @@ where start_signal, ); - for (id, export) in exports { - let (reclock_pusher, reclocked) = reclock(&remap_collection, config.as_of.clone()); - export - .inner - .map(move |(result, from_time, diff)| { - let result = match result { - Ok(msg) => Ok(SourceOutput { - key: msg.key.clone(), - value: msg.value.clone(), - metadata: msg.metadata.clone(), - from_time: from_time.clone(), - }), - Err(err) => Err(err.clone()), - }; - (result, from_time.clone(), *diff) - }) - .capture_into(PusherCapture(reclock_pusher)); - reclocked_exports2.insert(id, reclocked); - } + source + .inner + .map(move |((output, result), from_time, diff)| { + let result = match result { + Ok(msg) => Ok(SourceOutput { + key: msg.key.clone(), + value: msg.value.clone(), + metadata: msg.metadata.clone(), + from_time: from_time.clone(), + }), + Err(err) => Err(err.clone()), + }; + ((*output, result), from_time.clone(), *diff) + }) + .capture_into(PusherCapture(reclock_pusher)); source_upper.capture_into(FrontierCapture(ingested_upper_tx)); - (health_stream.leave(), source_tokens) + (streams, health_stream.leave(), source_tokens) }); tokens.extend(source_tokens); - (reclocked_exports, health, tokens) + (streams, health, tokens) } pub struct FrontierCapture(watch::Sender>); @@ -291,7 +294,7 @@ fn source_render_operator( resume_uppers: impl futures::Stream> + 'static, start_signal: impl std::future::Future + 'static, ) -> ( - BTreeMap>>, + StackedCollection)>, Stream, Stream, Vec, @@ -311,8 +314,8 @@ where 
trace!(%upper, "timely-{worker_id} source({source_id}) received resume upper"); }); - let (exports, progress, health, stats, probes, tokens) = - source_connection.render(scope, config.clone(), resume_uppers, start_signal); + let (input_data, progress, health, stats, probes, tokens) = + source_connection.render(scope, config, resume_uppers, start_signal); crate::source::statistics::process_statistics( scope.clone(), @@ -323,106 +326,69 @@ where ); let name = format!("SourceGenericStats({})", source_id); - let mut builder = OperatorBuilderRc::new(name, scope.clone()); - - let (_, derived_progress) = builder.new_output::>(); - let (mut health_output, derived_health) = builder.new_output::>(); - - let mut export_collections = BTreeMap::new(); - let mut export_handles = vec![]; - // Loop invariant: The operator contains export_handles.len() inputs and - // export_handles.len() + 2 outputs - for (id, export) in exports { - // This output is not connected to any of the existing inputs. - let connection = vec![Antichain::new(); export_handles.len()]; - let (export_output, new_export) = - builder.new_output_connection::>(connection); - - // This input's frontier flow into the progress collection and the corresponding output - let mut connection = vec![Antichain::from_elem(Default::default()), Antichain::new()]; - // No frontier implications for other outputs - for _ in 0..export_handles.len() { - connection.push(Antichain::new()); - } - // Standard frontier implication for the corresponding output of this input - connection.push(Antichain::from_elem(Default::default())); - let export_input = builder.new_input_connection(&export.inner, Pipeline, connection); - export_handles.push((id, export_input, export_output)); - let new_export: StackedCollection> = - new_export.as_collection(); - export_collections.insert(id, new_export); - } + let mut builder = AsyncOperatorBuilder::new(name, scope.clone()); + + let (data_output, data) = builder.new_output::>(); + let (progress_output, derived_progress) = builder.new_output::>(); + let mut data_input = builder.new_input_for_many( + &input_data.inner, + Pipeline, + [&data_output, &progress_output], + ); + let (health_output, derived_health) = builder.new_output::>(); - let bytes_read_counter = config.metrics.source_defs.bytes_read.clone(); - let source_metrics = config.metrics.get_source_metrics(config.id, worker_id); + builder.build(move |mut caps| async move { + let health_cap = caps.pop().unwrap(); + drop(caps); - // Compute the overall resume upper to report for the ingestion - let resume_upper = Antichain::from_iter( - config - .resume_uppers - .values() - .flat_map(|f| f.iter().cloned()), - ); - source_metrics - .resume_upper - .set(mz_persist_client::metrics::encode_ts_metric(&resume_upper)); + let mut statuses_by_idx = BTreeMap::new(); - builder.build(move |mut caps| { - let health_cap = caps.remove(1); - move |_frontiers| { - let mut statuses_by_idx = BTreeMap::new(); - let mut health_output = health_output.activate(); - - for (id, input, output) in export_handles.iter_mut() { - while let Some((cap, data)) = input.next() { - for (message, _, _) in data.iter() { - let status = match message { - Ok(_) => HealthStatusUpdate::running(), - // All errors coming into the data stream are definite. - // Downstream consumers of this data will preserve this - // status. 
- Err(ref error) => HealthStatusUpdate::stalled( - error.to_string(), - Some( - "retracting the errored value may resume the source" - .to_string(), - ), - ), - }; - - let statuses: &mut Vec<_> = statuses_by_idx.entry(*id).or_default(); - - let status = HealthStatusMessage { - id: Some(*id), - namespace: C::STATUS_NAMESPACE.clone(), - update: status, - }; - if statuses.last() != Some(&status) { - statuses.push(status); - } + while let Some(event) = data_input.next().await { + let AsyncEvent::Data([cap_data, _cap_progress], mut data) = event else { + continue; + }; + for ((output_index, message), _, _) in data.iter() { + let status = match message { + Ok(_) => HealthStatusUpdate::running(), + // All errors coming into the data stream are definite. + // Downstream consumers of this data will preserve this + // status. + Err(ref error) => HealthStatusUpdate::stalled( + error.to_string(), + Some("retracting the errored value may resume the source".to_string()), + ), + }; + + let statuses: &mut Vec<_> = statuses_by_idx.entry(*output_index).or_default(); + + let status = HealthStatusMessage { + index: *output_index, + namespace: C::STATUS_NAMESPACE.clone(), + update: status, + }; + if statuses.last() != Some(&status) { + statuses.push(status); + } - match message { - Ok(message) => { - source_statistics.inc_messages_received_by(1); - let key_len = u64::cast_from(message.key.byte_len()); - let value_len = u64::cast_from(message.value.byte_len()); - bytes_read_counter.inc_by(key_len + value_len); - source_statistics.inc_bytes_received_by(key_len + value_len); - } - Err(_) => {} - } + match message { + Ok(message) => { + source_statistics.inc_messages_received_by(1); + let key_len = u64::cast_from(message.key.byte_len()); + let value_len = u64::cast_from(message.value.byte_len()); + source_statistics.inc_bytes_received_by(key_len + value_len); } - let mut output = output.activate(); - output.session(&cap).give_container(data); + Err(_) => {} + } + } + data_output.give_container(&cap_data, &mut data); - for statuses in statuses_by_idx.values_mut() { - if statuses.is_empty() { - continue; - } - health_output.session(&health_cap).give_container(statuses); - statuses.clear() - } + for statuses in statuses_by_idx.values_mut() { + if statuses.is_empty() { + continue; } + + health_output.give_container(&health_cap, statuses); + statuses.clear() } } }); @@ -443,7 +409,7 @@ where }); ( - export_collections, + data.as_collection(), progress, health.concat(&derived_health), tokens, @@ -633,6 +599,128 @@ where (remap_stream.as_collection(), button.press_on_drop()) } +/// Demultiplexes a combined stream of all source exports into individual collections per source export +fn demux_source_exports( + config: RawSourceCreationConfig, + input: Collection, DataflowError>), Diff>, +) -> Vec<( + GlobalId, + Collection, Diff>, + Collection, + SourceExportDataConfig, +)> +where + G: Scope, + FromTime: SourceTimestamp, +{ + let RawSourceCreationConfig { + name, + id, + source_exports, + worker_id, + worker_count: _, + timestamp_interval: _, + storage_metadata: _, + as_of: _, + resume_uppers, + source_resume_uppers: _, + metrics, + now_fn: _, + persist_clients: _, + source_statistics: _, + shared_remap_upper: _, + config: _, + remap_collection_id: _, + busy_signal: _, + } = config; + + // TODO(guswynn): expose function + let bytes_read_counter = metrics.source_defs.bytes_read.clone(); + let source_metrics = metrics.get_source_metrics(&name, id, worker_id); + + // Compute the overall resume upper to report for the ingestion 
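The rewritten `source_render_operator` buffers statuses per output index and only forwards a status when it differs from the last one recorded for that index, so the health stream carries transitions rather than a copy of every message. A minimal sketch of that dedup (type name hypothetical):

```rust
use std::collections::BTreeMap;

/// Stand-in for the per-output status buffering in `source_render_operator`.
#[derive(Clone, Debug, PartialEq)]
struct FakeStatus {
    index: usize,
    update: &'static str,
}

/// Suppresses consecutive identical statuses for the same output index.
fn push_status(statuses_by_idx: &mut BTreeMap<usize, Vec<FakeStatus>>, status: FakeStatus) {
    let statuses = statuses_by_idx.entry(status.index).or_default();
    if statuses.last() != Some(&status) {
        statuses.push(status);
    }
}

fn main() {
    let mut statuses_by_idx = BTreeMap::new();
    for update in ["running", "running", "stalled: uhoh", "stalled: uhoh", "running"] {
        push_status(&mut statuses_by_idx, FakeStatus { index: 1, update });
    }
    // Only the transitions survive: running -> stalled -> running.
    let updates: Vec<_> = statuses_by_idx[&1].iter().map(|s| s.update).collect();
    assert_eq!(updates, vec!["running", "stalled: uhoh", "running"]);
}
```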
+ let resume_upper = Antichain::from_iter(resume_uppers.values().flat_map(|f| f.iter().cloned())); + source_metrics + .resume_upper + .set(mz_persist_client::metrics::encode_ts_metric(&resume_upper)); + + let input = input.inner.inspect_core(move |event| match event { + Ok((_, data)) => { + for ((_idx, result), _time, _diff) in data.iter() { + if let Ok(msg) = result { + bytes_read_counter.inc_by(u64::cast_from(msg.key.byte_len())); + bytes_read_counter.inc_by(u64::cast_from(msg.value.byte_len())); + } + } + } + Err([time]) => source_metrics.capability.set(time.into()), + Err([]) => source_metrics + .capability + .set(mz_repr::Timestamp::MAX.into()), + // `mz_repr::Timestamp` is totally ordered and so there can be at most one element in the + // frontier. If this ever changes we need to rethink how we surface this in metrics. We + // will notice when that happens because the `expect()` will fail. + Err(_) => unreachable!("there can be at most one element for totally ordered times"), + }); + + // TODO(petrosagg): output the two streams directly + type CB = CapacityContainerBuilder; + let (ok_muxed_stream, err_muxed_stream) = input.map_fallible::, CB<_>, _, _, _>( + "reclock-demux-ok-err", + |((output, r), ts, diff)| match r { + Ok(ok) => Ok(((output, ok), ts, diff)), + Err(err) => Err(((output, err), ts, diff)), + }, + ); + + let exports_by_index = source_exports + .iter() + .map(|(id, export)| (export.ingestion_output, (*id, &export.export.data_config))) + .collect::>(); + + // We use the output index from the source export to route values to its ok + // and err streams. There is one partition per source export; however, + // source export indices can be non-contiguous, so we need to ensure we have + // at least as many partitions as we reference. + let partition_count = u64::cast_from( + exports_by_index + .keys() + .max() + .expect("source exports must have elements") + + 1, + ); + + let ok_streams: Vec<_> = ok_muxed_stream + .partition(partition_count, |((output, data), time, diff)| { + (u64::cast_from(output), (data, time, diff)) + }) + .into_iter() + .map(|stream| stream.as_collection()) + .collect(); + + let err_streams: Vec<_> = err_muxed_stream + .partition(partition_count, |((output, err), time, diff)| { + (u64::cast_from(output), (err, time, diff)) + }) + .into_iter() + .map(|stream| stream.as_collection()) + .collect(); + + ok_streams + .into_iter() + .zip_eq(err_streams) + .enumerate() + .filter_map(|(idx, (ok_stream, err_stream))| { + // We only want to return streams for partitions with a data config, which + // indicates that they actually have data. The filtered streams were just + // empty partitions for any non-continuous values in the output indexes. + exports_by_index + .get(&idx) + .map(|export| (export.0, ok_stream, err_stream, (*export.1).clone())) + }) + .collect() +} + /// Reclocks an `IntoTime` frontier stream into a `FromTime` frontier stream. This is used for the /// virtual (through persist) feedback edge so that we convert the `IntoTime` resumption frontier /// into the `FromTime` frontier that is used with the source's `OffsetCommiter`. 
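The demuxing above leans on two details worth calling out: the number of partitions must cover the largest output index even when the index space has gaps, and the placeholder partitions created for those gaps are dropped afterwards because no export config refers to them. A minimal sketch of the same sizing-and-filtering idea using plain `Vec`s rather than timely's `partition` operator (the helper name `demux_by_index` and the string export names are hypothetical):

```rust
use std::collections::BTreeMap;

/// A sketch of the index-based demux: size the partition vector by
/// `max_index + 1`, route each record by its output index, then keep only the
/// partitions that a configured export actually refers to.
fn demux_by_index<T: Clone>(
    records: &[(usize, T)],
    exports_by_index: &BTreeMap<usize, &'static str>,
) -> Vec<(&'static str, Vec<T>)> {
    // At least as many partitions as the largest index we reference.
    let partition_count = *exports_by_index
        .keys()
        .max()
        .expect("source exports must have elements")
        + 1;

    let mut partitions: Vec<Vec<T>> = vec![Vec::new(); partition_count];
    for (idx, value) in records {
        partitions[*idx].push(value.clone());
    }

    // Indices without an export were only placeholders for gaps; drop them,
    // mirroring the `filter_map` over `exports_by_index` above.
    partitions
        .into_iter()
        .enumerate()
        .filter_map(|(idx, values)| exports_by_index.get(&idx).map(|name| (*name, values)))
        .collect()
}

fn main() {
    let exports = BTreeMap::from([(0, "primary"), (2, "subsource_two")]);
    let records = [(0, "a"), (2, "b"), (2, "c")];
    let out = demux_by_index(&records, &exports);
    assert_eq!(out, vec![("primary", vec!["a"]), ("subsource_two", vec!["b", "c"])]);
}
```

With exports registered at indices 0 and 2, a record tagged 1 would land in a placeholder partition that the final `filter_map` discards, which is why `partition_count` is derived from the maximum index rather than from the number of exports.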
@@ -652,7 +740,7 @@ where let scope = bindings.scope().clone(); let name = format!("ReclockCommitUpper({id})"); - let mut builder = OperatorBuilderRc::new(name, scope); + let mut builder = OperatorBuilder::new(name, scope); let mut bindings = builder.new_input(&bindings.inner, Pipeline); let _ = builder.new_input(committed_upper, Pipeline); diff --git a/src/storage/src/source/types.rs b/src/storage/src/source/types.rs index 81cb37fa9cdda..cfffe096b8d34 100644 --- a/src/storage/src/source/types.rs +++ b/src/storage/src/source/types.rs @@ -12,7 +12,6 @@ // https://github.com/tokio-rs/prost/issues/237 // #![allow(missing_docs)] -use std::collections::BTreeMap; use std::convert::Infallible; use std::fmt::Debug; use std::future::Future; @@ -21,7 +20,7 @@ use std::sync::Arc; use std::task::{ready, Context, Poll}; use differential_dataflow::Collection; -use mz_repr::{Diff, GlobalId, Row}; +use mz_repr::{Diff, Row}; use mz_storage_types::errors::{DataflowError, DecodeError}; use mz_storage_types::sources::SourceTimestamp; use mz_timely_util::builder_async::PressOnDropButton; @@ -94,7 +93,7 @@ pub trait SourceRender { resume_uppers: impl futures::Stream> + 'static, start_signal: impl std::future::Future + 'static, ) -> ( - BTreeMap>>, + StackedCollection)>, Option>, Stream, Stream, diff --git a/src/storage/src/upsert.rs b/src/storage/src/upsert.rs index 1fb3382c59ecd..2a1ffef8139f7 100644 --- a/src/storage/src/upsert.rs +++ b/src/storage/src/upsert.rs @@ -21,7 +21,7 @@ use futures::StreamExt; use indexmap::map::Entry; use itertools::Itertools; use mz_ore::error::ErrorExt; -use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row}; +use mz_repr::{Datum, DatumVec, Diff, Row}; use mz_rocksdb::ValueIterator; use mz_storage_operators::metrics::BackpressureMetrics; use mz_storage_types::configuration::StorageConfiguration; @@ -44,6 +44,7 @@ use timely::progress::{Antichain, Timestamp}; use crate::healthcheck::HealthStatusUpdate; use crate::metrics::upsert::UpsertMetrics; +use crate::render::sources::OutputIndex; use crate::storage_state::StorageInstanceContext; use crate::upsert_continual_feedback; use autospill::AutoSpillBackend; @@ -215,7 +216,7 @@ pub(crate) fn upsert( backpressure_metrics: Option, ) -> ( Collection, Diff>, - Stream, HealthStatusUpdate)>, + Stream, Stream, PressOnDropButton, ) @@ -402,7 +403,7 @@ fn upsert_operator( snapshot_buffering_max: Option, ) -> ( Collection, Diff>, - Stream, HealthStatusUpdate)>, + Stream, Stream, PressOnDropButton, ) @@ -718,7 +719,7 @@ fn upsert_classic( snapshot_buffering_max: Option, ) -> ( Collection, Diff>, - Stream, HealthStatusUpdate)>, + Stream, Stream, PressOnDropButton, ) @@ -995,8 +996,8 @@ impl UpsertErrorEmitter for ( &mut AsyncOutputHandle< ::Timestamp, - CapacityContainerBuilder, HealthStatusUpdate)>>, - Tee<::Timestamp, Vec<(Option, HealthStatusUpdate)>>, + CapacityContainerBuilder>, + Tee<::Timestamp, Vec<(OutputIndex, HealthStatusUpdate)>>, >, &Capability<::Timestamp>, ) @@ -1012,13 +1013,13 @@ async fn process_upsert_state_error( e: anyhow::Error, health_output: &AsyncOutputHandle< ::Timestamp, - CapacityContainerBuilder, HealthStatusUpdate)>>, - Tee<::Timestamp, Vec<(Option, HealthStatusUpdate)>>, + CapacityContainerBuilder>, + Tee<::Timestamp, Vec<(OutputIndex, HealthStatusUpdate)>>, >, health_cap: &Capability<::Timestamp>, ) { let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None); - health_output.give(health_cap, (None, update)); + health_output.give(health_cap, (0, update)); 
std::future::pending::<()>().await; unreachable!("pending future never returns"); } diff --git a/src/storage/src/upsert_continual_feedback.rs b/src/storage/src/upsert_continual_feedback.rs index a822388a83aee..70be0642baf98 100644 --- a/src/storage/src/upsert_continual_feedback.rs +++ b/src/storage/src/upsert_continual_feedback.rs @@ -22,7 +22,7 @@ use differential_dataflow::{AsCollection, Collection}; use indexmap::map::Entry; use itertools::Itertools; use mz_ore::vec::VecExt; -use mz_repr::{Diff, GlobalId, Row}; +use mz_repr::{Diff, Row}; use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError}; use mz_timely_util::builder_async::{ Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton, @@ -38,6 +38,7 @@ use timely::progress::{Antichain, Timestamp}; use crate::healthcheck::HealthStatusUpdate; use crate::metrics::upsert::UpsertMetrics; +use crate::render::sources::OutputIndex; use crate::upsert::types::UpsertValueAndSize; use crate::upsert::types::{self as upsert_types, ValueMetadata}; use crate::upsert::types::{StateValue, UpsertState, UpsertStateBackend}; @@ -117,7 +118,7 @@ pub fn upsert_inner( snapshot_buffering_max: Option, ) -> ( Collection, Diff>, - Stream, HealthStatusUpdate)>, + Stream, Stream, PressOnDropButton, ) diff --git a/test/cluster/mzcompose.py b/test/cluster/mzcompose.py index 82f48c10d19ee..79b6214e12891 100644 --- a/test/cluster/mzcompose.py +++ b/test/cluster/mzcompose.py @@ -1720,6 +1720,39 @@ def kill_replica_with_delay() -> None: killer.join() +def workflow_pg_snapshot_partial_failure(c: Composition) -> None: + """Test PostgreSQL snapshot partial failure""" + + c.down(destroy_volumes=True) + + with c.override( + # Start postgres for the pg source + Testdrive(no_reset=True), + Clusterd( + name="clusterd1", + environment_extra=["FAILPOINTS=pg_snapshot_pause=return(2)"], + ), + ): + c.up("materialized", "postgres", "clusterd1") + + c.run_testdrive_files("pg-snapshot-partial-failure/01-configure-postgres.td") + c.run_testdrive_files("pg-snapshot-partial-failure/02-create-sources.td") + + c.run_testdrive_files( + "pg-snapshot-partial-failure/03-verify-good-sub-source.td" + ) + + c.kill("clusterd1") + # Restart the storage instance with the failpoint off... + with c.override( + # turn off the failpoint + Clusterd(name="clusterd1") + ): + c.run_testdrive_files("pg-snapshot-partial-failure/04-add-more-data.td") + c.up("clusterd1") + c.run_testdrive_files("pg-snapshot-partial-failure/05-verify-data.td") + + def workflow_test_compute_reconciliation_reuse(c: Composition) -> None: """ Test that compute reconciliation reuses existing dataflows. diff --git a/test/cluster/pg-snapshot-partial-failure/01-configure-postgres.td b/test/cluster/pg-snapshot-partial-failure/01-configure-postgres.td new file mode 100644 index 0000000000000..b1c5bcf0a1ad1 --- /dev/null +++ b/test/cluster/pg-snapshot-partial-failure/01-configure-postgres.td @@ -0,0 +1,28 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
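The workflow above drives the partial-failure scenario through `FAILPOINTS=pg_snapshot_pause=return(2)` on `clusterd1`. Assuming the failpoint is wired up with the `fail` crate (its `fail_point!` macro and the `FAILPOINTS`/`return(...)` action syntax), a rough sketch of how such a hook can gate a snapshot path follows; the function name, the interpretation of the `2` argument as an output index, and the error handling are all illustrative, not the actual Materialize implementation:

```rust
use fail::fail_point;

/// Illustrative stand-in for a check somewhere in a snapshot path.
/// Requires the `fail` crate with its `failpoints` feature enabled.
fn maybe_pause_snapshot(output_index: usize) -> Result<(), String> {
    // With `FAILPOINTS=pg_snapshot_pause=return(2)`, the `return` action fires and
    // the closure receives `Some("2")`; its result becomes this function's return value.
    fail_point!("pg_snapshot_pause", |arg| {
        let paused_output: usize = arg.and_then(|s| s.parse().ok()).unwrap_or_default();
        if output_index == paused_output {
            Err(format!("snapshot paused for output {output_index}"))
        } else {
            Ok(())
        }
    });
    Ok(())
}

fn main() {
    // Equivalent to launching the process with FAILPOINTS=pg_snapshot_pause=return(2).
    fail::cfg("pg_snapshot_pause", "return(2)").unwrap();
    assert!(maybe_pause_snapshot(2).is_err());
    assert!(maybe_pause_snapshot(1).is_ok());
}
```

Restarting `clusterd1` without the environment variable, as the workflow does, is what turns the failpoint off and lets the paused snapshot complete.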
+ +$ postgres-execute connection=postgres://postgres:postgres@postgres +CREATE USER debezium WITH SUPERUSER PASSWORD 'debezium'; +GRANT ALL PRIVILEGES ON DATABASE "postgres" TO debezium; +GRANT ALL PRIVILEGES ON SCHEMA "public" TO debezium; + +CREATE PUBLICATION mz_source; + +$ postgres-execute connection=postgres://postgres:postgres@postgres +DROP TABLE IF EXISTS one; +CREATE TABLE one (f1 INTEGER); +ALTER TABLE one REPLICA IDENTITY FULL; +INSERT INTO one VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10); +ALTER PUBLICATION mz_source ADD TABLE one; + +DROP TABLE IF EXISTS two; +CREATE TABLE two (f1 INTEGER); +ALTER TABLE two REPLICA IDENTITY FULL; +INSERT INTO two VALUES (1), (2), (3), (4), (5), (6), (7), (8), (9), (10); +ALTER PUBLICATION mz_source ADD TABLE two; diff --git a/test/cluster/pg-snapshot-partial-failure/02-create-sources.td b/test/cluster/pg-snapshot-partial-failure/02-create-sources.td new file mode 100644 index 0000000000000..513a667cd67c4 --- /dev/null +++ b/test/cluster/pg-snapshot-partial-failure/02-create-sources.td @@ -0,0 +1,41 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true + +> DROP SOURCE IF EXISTS mz_source CASCADE; +> DROP SECRET IF EXISTS pgpass CASCADE; +> DROP CONNECTION IF EXISTS pg CASCADE; + +> CREATE CLUSTER storage REPLICAS ( + r1 ( + STORAGECTL ADDRESSES ['clusterd1:2100'], + STORAGE ADDRESSES ['clusterd1:2103'], + COMPUTECTL ADDRESSES ['clusterd1:2101'], + COMPUTE ADDRESSES ['clusterd1:2102'] + ) + ) + +> CREATE SECRET pgpass AS 'postgres' +> CREATE CONNECTION pg TO POSTGRES ( + HOST postgres, + DATABASE postgres, + USER postgres, + PASSWORD SECRET pgpass + ) + +> CREATE SOURCE mz_source + IN CLUSTER storage + FROM POSTGRES + CONNECTION pg + (PUBLICATION 'mz_source'); + +> CREATE TABLE one FROM SOURCE mz_source (REFERENCE one); +> CREATE TABLE two FROM SOURCE mz_source (REFERENCE two); diff --git a/test/cluster/pg-snapshot-partial-failure/03-verify-good-sub-source.td b/test/cluster/pg-snapshot-partial-failure/03-verify-good-sub-source.td new file mode 100644 index 0000000000000..8810ffe17d9bb --- /dev/null +++ b/test/cluster/pg-snapshot-partial-failure/03-verify-good-sub-source.td @@ -0,0 +1,25 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# The non-stalled sub-source should work. +> SELECT COUNT(*) FROM one; +10 + +# Here we note that the top-level source is also queryable,... +> SELECT * FROM mz_source + +# We want to verify that the second subsource won't produce a snapshot but we +# can't do anything better other than sleep for while and check that the upper +# remains at 0. 
Given the tiny amount of data, we wait just a few seconds
+# after we have already confirmed that the first subsource has completed its
+# snapshot.
+$ sleep-is-probably-flaky-i-have-justified-my-need-with-a-comment duration=5s
+
+> SELECT write_frontier FROM mz_internal.mz_frontiers f JOIN mz_objects o ON f.object_id = o.id WHERE o.name = 'two';
+0
diff --git a/test/cluster/pg-snapshot-partial-failure/04-add-more-data.td b/test/cluster/pg-snapshot-partial-failure/04-add-more-data.td
new file mode 100644
index 0000000000000..9436a33c8eb73
--- /dev/null
+++ b/test/cluster/pg-snapshot-partial-failure/04-add-more-data.td
@@ -0,0 +1,12 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+#
+$ postgres-execute connection=postgres://postgres:postgres@postgres
+INSERT INTO one VALUES (11);
+INSERT INTO two VALUES (11);
diff --git a/test/cluster/pg-snapshot-partial-failure/05-verify-data.td b/test/cluster/pg-snapshot-partial-failure/05-verify-data.td
new file mode 100644
index 0000000000000..cb34f89b547bd
--- /dev/null
+++ b/test/cluster/pg-snapshot-partial-failure/05-verify-data.td
@@ -0,0 +1,23 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+# The previously stalled sub-source has now completed its snapshot and caught up.
+> SELECT COUNT(*) FROM two;
+11
+
+# The above SELECT should ensure that the persist-sink for `one` has also
+# received the "new" snapshot, ensuring we don't double count, but we sleep a
+# bit longer to make sure we aren't just reading the original snapshot that was
+# committed to `one`.
+> SELECT mz_unsafe.mz_sleep(10)
+
+
+# And the other sub-source must not have double-counted its data.
+> SELECT COUNT(*) FROM one;
+11
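One more note on the source-rendering changes earlier in this diff: health updates are bucketed per output index, and a new status is queued only when it differs from the last one queued for that output (the `statuses.last() != Some(&status)` check). A toy model of that batching, with a stand-in `Status` enum instead of the real `HealthStatusUpdate`:

```rust
use std::collections::BTreeMap;

/// Stand-in for `HealthStatusUpdate` in this sketch.
#[derive(Clone, Debug, PartialEq, Eq)]
enum Status {
    Running,
    Stalled(String),
}

/// Bucket statuses by output index and suppress consecutive duplicates, so a
/// long run of identical updates for one output produces a single message.
fn batch_statuses(updates: &[(usize, Status)]) -> BTreeMap<usize, Vec<Status>> {
    let mut statuses_by_idx: BTreeMap<usize, Vec<Status>> = BTreeMap::new();
    for (output_index, status) in updates {
        let statuses = statuses_by_idx.entry(*output_index).or_default();
        // Only record a status if it differs from the last one queued for this
        // output, mirroring the `statuses.last() != Some(&status)` check.
        if statuses.last() != Some(status) {
            statuses.push(status.clone());
        }
    }
    statuses_by_idx
}

fn main() {
    let updates = [
        (0, Status::Running),
        (0, Status::Running),
        (1, Status::Stalled("decode error".into())),
        (0, Status::Running),
        (1, Status::Stalled("decode error".into())),
    ];
    let batched = batch_statuses(&updates);
    // Output 0 collapses to a single Running; output 1 to a single Stalled.
    assert_eq!(batched[&0], vec![Status::Running]);
    assert_eq!(batched[&1], vec![Status::Stalled("decode error".into())]);
}
```

Collapsing consecutive duplicates this way keeps a steady stream of `running()` updates from flooding the health operator while still letting a change on any single output, such as a stall, through immediately.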