Skip to content

Commit baf05bc

Browse files
authored
[copy_from] Proper cancelation via CancelOneshotIngestion message (#31136)
This PR fixes the `TODO(cf1)` related to canceling oneshot ingestions. It adds a `StorageCommand::CancelOneshotIngestion` that `reduce`s/compacts away a corresponding `StorageCommand::RunOneshotIngestion`, much like `ComputeCommand::Peek` and `ComputeCommand::CancelPeek`. We send a `StorageCommand::CancelOneshotIngestion` whenever a user has canceled a `COPY FROM` statement, but also the storage controller will send one whenever a `RunOneshotIngestion` command completes. ### Motivation Fix `TODO(cf1)` related to cancelation ### Checklist - [x] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [x] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [x] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [x] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](https://github.com/MaterializeInc/cloud/pull/5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [x] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
1 parent 42253b8 commit baf05bc

File tree

10 files changed

+316
-79
lines changed

10 files changed

+316
-79
lines changed

src/adapter/src/active_compute_sink.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@ use mz_expr::compare_columns;
2222
use mz_ore::cast::CastFrom;
2323
use mz_ore::now::EpochMillis;
2424
use mz_repr::adt::numeric;
25-
use mz_repr::{Datum, GlobalId, IntoRowIterator, Row, Timestamp};
25+
use mz_repr::{CatalogItemId, Datum, GlobalId, IntoRowIterator, Row, Timestamp};
2626
use mz_sql::plan::SubscribeOutput;
27+
use mz_storage_types::instances::StorageInstanceId;
2728
use timely::progress::Antichain;
2829
use tokio::sync::{mpsc, oneshot};
2930
use uuid::Uuid;
3031

3132
use crate::coord::peek::PeekResponseUnary;
32-
use crate::{AdapterError, ExecuteResponse};
33+
use crate::{AdapterError, ExecuteContext, ExecuteResponse};
3334

3435
#[derive(Debug)]
3536
/// A description of an active compute sink from the coordinator's perspective.
@@ -435,3 +436,16 @@ impl ActiveCopyTo {
435436
let _ = self.tx.send(message);
436437
}
437438
}
439+
440+
/// State we keep in the `Coordinator` to track active `COPY FROM` statements.
441+
#[derive(Debug)]
442+
pub(crate) struct ActiveCopyFrom {
443+
/// ID of the ingestion running in clusterd.
444+
pub ingestion_id: uuid::Uuid,
445+
/// The cluster this is currently running on.
446+
pub cluster_id: StorageInstanceId,
447+
/// The table we're currently copying into.
448+
pub table_id: CatalogItemId,
449+
/// Context of the SQL session that ran the statement.
450+
pub ctx: ExecuteContext,
451+
}

src/adapter/src/coord.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ use tracing::{debug, info, info_span, span, warn, Instrument, Level, Span};
171171
use tracing_opentelemetry::OpenTelemetrySpanExt;
172172
use uuid::Uuid;
173173

174-
use crate::active_compute_sink::ActiveComputeSink;
174+
use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyFrom};
175175
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
176176
use crate::client::{Client, Handle};
177177
use crate::command::{Command, ExecuteResponse};
@@ -1668,7 +1668,7 @@ pub struct Coordinator {
16681668
active_webhooks: BTreeMap<CatalogItemId, WebhookAppenderInvalidator>,
16691669
/// A map of active `COPY FROM` statements. The Coordinator waits for `clusterd`
16701670
/// to stage Batches in Persist that we will then link into the shard.
1671-
active_copies: BTreeMap<ConnectionId, ExecuteContext>,
1671+
active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,
16721672

16731673
/// A map from connection ids to a watch channel that is set to `true` if the connection
16741674
/// received a cancel request.

src/adapter/src/coord/ddl.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ impl Coordinator {
207207
let mut cluster_replicas_to_drop = vec![];
208208
let mut compute_sinks_to_drop = BTreeMap::new();
209209
let mut peeks_to_drop = vec![];
210+
let mut copies_to_drop = vec![];
210211
let mut clusters_to_create = vec![];
211212
let mut cluster_replicas_to_create = vec![];
212213
let mut update_tracing_config = false;
@@ -454,6 +455,18 @@ impl Coordinator {
454455
}
455456
}
456457

458+
// Clean up any pending `COPY` statements that rely on dropped relations or clusters.
459+
for (conn_id, pending_copy) in &self.active_copies {
460+
let dropping_table = table_gids_to_drop
461+
.iter()
462+
.any(|(item_id, _gid)| pending_copy.table_id == *item_id);
463+
let dropping_cluster = clusters_to_drop.contains(&pending_copy.cluster_id);
464+
465+
if dropping_table || dropping_cluster {
466+
copies_to_drop.push(conn_id.clone());
467+
}
468+
}
469+
457470
let storage_ids_to_drop = sources_to_drop
458471
.iter()
459472
.map(|(_, gid)| *gid)
@@ -666,6 +679,11 @@ impl Coordinator {
666679
}
667680
}
668681
}
682+
if !copies_to_drop.is_empty() {
683+
for conn_id in copies_to_drop {
684+
self.cancel_pending_copy(&conn_id);
685+
}
686+
}
669687
if !indexes_to_drop.is_empty() {
670688
self.drop_indexes(indexes_to_drop);
671689
}

src/adapter/src/coord/sequencer/inner/copy_from.rs

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use url::Url;
2323
use uuid::Uuid;
2424

2525
use crate::coord::sequencer::inner::return_if_err;
26-
use crate::coord::{Coordinator, TargetCluster};
26+
use crate::coord::{ActiveCopyFrom, Coordinator, TargetCluster};
2727
use crate::optimize::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle};
2828
use crate::session::{TransactionOps, WriteOp};
2929
use crate::{AdapterError, ExecuteContext, ExecuteResponse};
@@ -177,8 +177,16 @@ impl Coordinator {
177177
});
178178
});
179179
// Stash the execute context so we can cancel the COPY.
180-
self.active_copies
181-
.insert(ctx.session().conn_id().clone(), ctx);
180+
let conn_id = ctx.session().conn_id().clone();
181+
self.active_copies.insert(
182+
conn_id,
183+
ActiveCopyFrom {
184+
ingestion_id,
185+
cluster_id,
186+
table_id: id,
187+
ctx,
188+
},
189+
);
182190

183191
let _result = self
184192
.controller
@@ -193,11 +201,20 @@ impl Coordinator {
193201
table_id: CatalogItemId,
194202
batches: Vec<Result<ProtoBatch, String>>,
195203
) {
196-
let Some(mut ctx) = self.active_copies.remove(&conn_id) else {
197-
tracing::warn!(?conn_id, "got response for canceled COPY FROM");
204+
let Some(active_copy) = self.active_copies.remove(&conn_id) else {
205+
// Getting a successful response for a canceled COPY FROM is unexpected.
206+
tracing::warn!(%conn_id, ?batches, "got response for canceled COPY FROM");
198207
return;
199208
};
200209

210+
let ActiveCopyFrom {
211+
ingestion_id,
212+
cluster_id: _,
213+
table_id: _,
214+
mut ctx,
215+
} = active_copy;
216+
tracing::info!(%ingestion_id, num_batches = ?batches.len(), "received batches to append");
217+
201218
let mut all_batches = SmallVec::with_capacity(batches.len());
202219
let mut all_errors = SmallVec::<[String; 1]>::with_capacity(batches.len());
203220
let mut row_count = 0u64;
@@ -248,8 +265,21 @@ impl Coordinator {
248265
/// Cancel any active `COPY FROM` statements/oneshot ingestions.
249266
#[mz_ore::instrument(level = "debug")]
250267
pub(crate) fn cancel_pending_copy(&mut self, conn_id: &ConnectionId) {
251-
// TODO(cf1): Also cancel the dataflow running on clusterd.
252-
if let Some(ctx) = self.active_copies.remove(conn_id) {
268+
if let Some(ActiveCopyFrom {
269+
ingestion_id,
270+
cluster_id: _,
271+
table_id: _,
272+
ctx,
273+
}) = self.active_copies.remove(conn_id)
274+
{
275+
let cancel_result = self
276+
.controller
277+
.storage
278+
.cancel_oneshot_ingestion(ingestion_id);
279+
if let Err(err) = cancel_result {
280+
tracing::error!(?err, "failed to cancel OneshotIngestion");
281+
}
282+
253283
ctx.retire(Err(AdapterError::Canceled));
254284
}
255285
}

src/storage-client/src/client.proto

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,21 @@ message ProtoRunIngestionCommand {
4848
mz_storage_types.sources.ProtoIngestionDescription description = 2;
4949
}
5050

51-
message ProtoRunOneshotIngestionCommand {
51+
message ProtoRunOneshotIngestion {
5252
mz_proto.ProtoU128 ingestion_id = 1;
5353
mz_repr.global_id.ProtoGlobalId collection_id = 2;
5454
mz_storage_types.controller.ProtoCollectionMetadata storage_metadata = 3;
5555
mz_storage_types.oneshot_sources.ProtoOneshotIngestionRequest request = 4;
5656
}
5757

58+
message ProtoRunOneshotIngestionsCommand {
59+
repeated ProtoRunOneshotIngestion ingestions = 1;
60+
}
61+
62+
message ProtoCancelOneshotIngestionsCommand {
63+
repeated mz_proto.ProtoU128 ingestions = 1;
64+
}
65+
5866
message ProtoCreateSources {
5967
repeated ProtoRunIngestionCommand sources = 1;
6068
}
@@ -94,7 +102,8 @@ message ProtoStorageCommand {
94102
google.protobuf.Empty allow_writes = 7;
95103
ProtoRunSinks run_sinks = 4;
96104
mz_storage_types.parameters.ProtoStorageParameters update_configuration = 5;
97-
ProtoRunOneshotIngestionCommand oneshot_ingestion = 10;
105+
ProtoRunOneshotIngestionsCommand run_oneshot_ingestions = 10;
106+
ProtoCancelOneshotIngestionsCommand cancel_oneshot_ingestions = 11;
98107
}
99108
}
100109

src/storage-client/src/client.rs

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use smallvec::SmallVec;
4040
use timely::progress::frontier::{Antichain, MutableAntichain};
4141
use timely::PartialOrder;
4242
use tonic::{Request, Status as TonicStatus, Streaming};
43+
use uuid::Uuid;
4344

4445
use crate::client::proto_storage_server::ProtoStorage;
4546
use crate::metrics::ReplicaMetrics;
@@ -123,18 +124,29 @@ pub enum StorageCommand<T = mz_repr::Timestamp> {
123124
UpdateConfiguration(StorageParameters),
124125
/// Run the enumerated sources, each associated with its identifier.
125126
RunIngestions(Vec<RunIngestionCommand>),
126-
/// Run a dataflow which will ingest data from an external source and only __stage__ it in
127-
/// Persist.
128-
///
129-
/// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
130-
/// responsible for linking the staged data into a shard.
131-
RunOneshotIngestion(RunOneshotIngestionCommand),
132127
/// Enable compaction in storage-managed collections.
133128
///
134129
/// Each entry in the vector names a collection and provides a frontier after which
135130
/// accumulations must be correct.
136131
AllowCompaction(Vec<(GlobalId, Antichain<T>)>),
137132
RunSinks(Vec<RunSinkCommand<T>>),
133+
/// Run a dataflow which will ingest data from an external source and only __stage__ it in
134+
/// Persist.
135+
///
136+
/// Unlike regular ingestions/sources, some other component (e.g. `environmentd`) is
137+
/// responsible for linking the staged data into a shard.
138+
RunOneshotIngestion(Vec<RunOneshotIngestion>),
139+
/// `CancelOneshotIngestion` instructs the replica to cancel the identified oneshot ingestions.
140+
///
141+
/// It is invalid to send a [`CancelOneshotIngestion`] command that references a oneshot
142+
/// ingestion that was not created by a corresponding [`RunOneshotIngestion`] command before.
143+
/// Doing so may cause the replica to exhibit undefined behavior.
144+
///
145+
/// [`CancelOneshotIngestion`]: crate::client::StorageCommand::CancelOneshotIngestion
146+
/// [`RunOneshotIngestion`]: crate::client::StorageCommand::RunOneshotIngestion
147+
CancelOneshotIngestion {
148+
ingestions: Vec<Uuid>,
149+
},
138150
}
139151

140152
impl<T> StorageCommand<T> {
@@ -146,7 +158,8 @@ impl<T> StorageCommand<T> {
146158
| InitializationComplete
147159
| AllowWrites
148160
| UpdateConfiguration(_)
149-
| AllowCompaction(_) => false,
161+
| AllowCompaction(_)
162+
| CancelOneshotIngestion { .. } => false,
150163
// TODO(cf2): multi-replica oneshot ingestions. At the moment returning
151164
// true here means we can't run `COPY FROM` on multi-replica clusters, this
152165
// should be easy enough to support though.
@@ -199,7 +212,7 @@ impl RustType<ProtoRunIngestionCommand> for RunIngestionCommand {
199212

200213
/// A command that starts ingesting the given ingestion description
201214
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
202-
pub struct RunOneshotIngestionCommand {
215+
pub struct RunOneshotIngestion {
203216
/// The ID of the ingestion dataflow.
204217
pub ingestion_id: uuid::Uuid,
205218
/// The ID of collection we'll stage batches for.
@@ -210,30 +223,30 @@ pub struct RunOneshotIngestionCommand {
210223
pub request: OneshotIngestionRequest,
211224
}
212225

213-
impl RustType<ProtoRunOneshotIngestionCommand> for RunOneshotIngestionCommand {
214-
fn into_proto(&self) -> ProtoRunOneshotIngestionCommand {
215-
ProtoRunOneshotIngestionCommand {
226+
impl RustType<ProtoRunOneshotIngestion> for RunOneshotIngestion {
227+
fn into_proto(&self) -> ProtoRunOneshotIngestion {
228+
ProtoRunOneshotIngestion {
216229
ingestion_id: Some(self.ingestion_id.into_proto()),
217230
collection_id: Some(self.collection_id.into_proto()),
218231
storage_metadata: Some(self.collection_meta.into_proto()),
219232
request: Some(self.request.into_proto()),
220233
}
221234
}
222235

223-
fn from_proto(proto: ProtoRunOneshotIngestionCommand) -> Result<Self, TryFromProtoError> {
224-
Ok(RunOneshotIngestionCommand {
236+
fn from_proto(proto: ProtoRunOneshotIngestion) -> Result<Self, TryFromProtoError> {
237+
Ok(RunOneshotIngestion {
225238
ingestion_id: proto
226239
.ingestion_id
227-
.into_rust_if_some("ProtoRunOneshotIngestionCommand::ingestion_id")?,
240+
.into_rust_if_some("ProtoRunOneshotIngestion::ingestion_id")?,
228241
collection_id: proto
229242
.collection_id
230-
.into_rust_if_some("ProtoRunOneshotIngestionCommand::collection_id")?,
243+
.into_rust_if_some("ProtoRunOneshotIngestion::collection_id")?,
231244
collection_meta: proto
232245
.storage_metadata
233-
.into_rust_if_some("ProtoRunOneshotIngestionCommand::storage_metadata")?,
246+
.into_rust_if_some("ProtoRunOneshotIngestion::storage_metadata")?,
234247
request: proto
235248
.request
236-
.into_rust_if_some("ProtoRunOneshotIngestionCommand::request")?,
249+
.into_rust_if_some("ProtoRunOneshotIngestion::request")?,
237250
})
238251
}
239252
}
@@ -300,12 +313,19 @@ impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
300313
StorageCommand::RunIngestions(sources) => CreateSources(ProtoCreateSources {
301314
sources: sources.into_proto(),
302315
}),
303-
StorageCommand::RunOneshotIngestion(oneshot) => {
304-
OneshotIngestion(oneshot.into_proto())
305-
}
306316
StorageCommand::RunSinks(sinks) => RunSinks(ProtoRunSinks {
307317
sinks: sinks.into_proto(),
308318
}),
319+
StorageCommand::RunOneshotIngestion(ingestions) => {
320+
RunOneshotIngestions(ProtoRunOneshotIngestionsCommand {
321+
ingestions: ingestions.iter().map(|cmd| cmd.into_proto()).collect(),
322+
})
323+
}
324+
StorageCommand::CancelOneshotIngestion { ingestions } => {
325+
CancelOneshotIngestions(ProtoCancelOneshotIngestionsCommand {
326+
ingestions: ingestions.iter().map(|uuid| uuid.into_proto()).collect(),
327+
})
328+
}
309329
}),
310330
}
311331
}
@@ -334,8 +354,21 @@ impl RustType<ProtoStorageCommand> for StorageCommand<mz_repr::Timestamp> {
334354
Some(RunSinks(ProtoRunSinks { sinks })) => {
335355
Ok(StorageCommand::RunSinks(sinks.into_rust()?))
336356
}
337-
Some(OneshotIngestion(oneshot)) => {
338-
Ok(StorageCommand::RunOneshotIngestion(oneshot.into_rust()?))
357+
Some(RunOneshotIngestions(oneshot)) => {
358+
let ingestions = oneshot
359+
.ingestions
360+
.into_iter()
361+
.map(|cmd| cmd.into_rust())
362+
.collect::<Result<_, _>>()?;
363+
Ok(StorageCommand::RunOneshotIngestion(ingestions))
364+
}
365+
Some(CancelOneshotIngestions(oneshot)) => {
366+
let ingestions = oneshot
367+
.ingestions
368+
.into_iter()
369+
.map(|uuid| uuid.into_rust())
370+
.collect::<Result<_, _>>()?;
371+
Ok(StorageCommand::CancelOneshotIngestion { ingestions })
339372
}
340373
None => Err(TryFromProtoError::missing_field(
341374
"ProtoStorageCommand::kind",
@@ -802,7 +835,8 @@ where
802835
| StorageCommand::AllowWrites
803836
| StorageCommand::UpdateConfiguration(_)
804837
| StorageCommand::AllowCompaction(_)
805-
| StorageCommand::RunOneshotIngestion(_) => {}
838+
| StorageCommand::RunOneshotIngestion(_)
839+
| StorageCommand::CancelOneshotIngestion { .. } => {}
806840
};
807841
}
808842

src/storage-client/src/controller.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,12 @@ pub trait StorageController: Debug {
498498
result_tx: OneshotResultCallback<ProtoBatch>,
499499
) -> Result<(), StorageError<Self::Timestamp>>;
500500

501+
/// Cancel a oneshot ingestion.
502+
fn cancel_oneshot_ingestion(
503+
&mut self,
504+
ingestion_id: uuid::Uuid,
505+
) -> Result<(), StorageError<Self::Timestamp>>;
506+
501507
/// Alter the sink identified by the given id to match the provided `ExportDescription`.
502508
async fn alter_export(
503509
&mut self,

0 commit comments

Comments (0)