
Commit 56cf67f

Make watch set installation fallible; make RegisterFrontendPeek wait for a result
1 parent 8afa02d commit 56cf67f
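
For orientation, a standalone sketch of what "fallible" means here: the controller used to panic (e.g. expect("missing compute dependency")) when asked to watch a collection that had been dropped concurrently, and it now returns a Result that the coordinator converts into an AdapterError. The types below are simplified stand-ins, not the real Materialize definitions (in the commit itself the error is CollectionLookupError from mz_compute_client, converted at the call sites via AdapterError::concurrent_dependency_drop_from_collection_lookup_error).

use std::collections::{BTreeMap, BTreeSet};

type GlobalId = u64;
type Timestamp = u64;
type WatchSetId = u64;

#[derive(Debug)]
struct CollectionLookupError(GlobalId);

struct Controller {
    // Write frontiers of the collections the controller knows about;
    // a missing entry models a dependency that was dropped concurrently.
    frontiers: BTreeMap<GlobalId, Timestamp>,
    next_ws_id: WatchSetId,
}

impl Controller {
    // Before this commit the real method returned a bare WatchSetId and panicked
    // on unknown collections; now installation is fallible.
    fn install_watch_set(
        &mut self,
        objects: BTreeSet<GlobalId>,
        t: Timestamp,
    ) -> Result<WatchSetId, CollectionLookupError> {
        // Look up every frontier first, so the error surfaces before any state is mutated.
        for id in &objects {
            if !self.frontiers.contains_key(id) {
                return Err(CollectionLookupError(*id));
            }
        }
        let ws_id = self.next_ws_id;
        self.next_ws_id += 1;
        // Keep watching only the objects whose frontier has not yet passed `t`.
        let _still_pending: BTreeSet<GlobalId> = objects
            .into_iter()
            .filter(|id| self.frontiers[id] <= t)
            .collect();
        Ok(ws_id)
    }
}

fn main() {
    let mut ctrl = Controller {
        frontiers: BTreeMap::from([(1, 5), (2, 9)]),
        next_ws_id: 0,
    };
    assert!(ctrl.install_watch_set(BTreeSet::from([1, 2]), 7).is_ok());
    // Id 3 does not exist (e.g. it was dropped), so installation errors instead of panicking.
    assert!(ctrl.install_watch_set(BTreeSet::from([3]), 7).is_err());
}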

9 files changed (+80, -40 lines)

src/adapter/src/command.rs

Lines changed: 1 addition & 0 deletions
@@ -240,6 +240,7 @@ pub enum Command {
         is_fast_path: bool,
         /// If statement logging is enabled, contains the info necessary for installing watch sets.
         ids_to_watch: Option<IdsToWatch>,
+        tx: oneshot::Sender<Result<(), AdapterError>>,
     },
 
     /// Statement logging event from frontend peek sequencing.

src/adapter/src/coord.rs

Lines changed: 7 additions & 5 deletions
@@ -104,7 +104,7 @@ use mz_catalog::memory::objects::{
 };
 use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
 use mz_compute_client::as_of_selection;
-use mz_compute_client::controller::error::{DataflowCreationError, InstanceMissing};
+use mz_compute_client::controller::error::{CollectionLookupError, DataflowCreationError, InstanceMissing};
 use mz_compute_types::ComputeInstanceId;
 use mz_compute_types::dataflows::DataflowDescription;
 use mz_compute_types::plan::Plan;
@@ -3764,13 +3764,14 @@ impl Coordinator {
         objects: BTreeSet<GlobalId>,
         t: Timestamp,
         state: WatchSetResponse,
-    ) {
-        let ws_id = self.controller.install_compute_watch_set(objects, t);
+    ) -> Result<(), CollectionLookupError> {
+        let ws_id = self.controller.install_compute_watch_set(objects, t)?;
         self.connection_watch_sets
             .entry(conn_id.clone())
             .or_default()
             .insert(ws_id);
         self.installed_watch_sets.insert(ws_id, (conn_id, state));
+        Ok(())
     }
 
     /// Install a _watch set_ in the controller that is automatically associated with the given
@@ -3782,13 +3783,14 @@ impl Coordinator {
         objects: BTreeSet<GlobalId>,
         t: Timestamp,
         state: WatchSetResponse,
-    ) {
-        let ws_id = self.controller.install_storage_watch_set(objects, t);
+    ) -> Result<(), CollectionLookupError> {
+        let ws_id = self.controller.install_storage_watch_set(objects, t)?;
         self.connection_watch_sets
             .entry(conn_id.clone())
             .or_default()
             .insert(ws_id);
         self.installed_watch_sets.insert(ws_id, (conn_id, state));
+        Ok(())
     }
 
     /// Cancels pending watchsets associated with the provided connection id.

src/adapter/src/coord/command_handler.rs

Lines changed: 12 additions & 15 deletions
@@ -347,10 +347,6 @@ impl Coordinator {
                 tx,
             } => {
                 soft_assert_eq_or_log!(ctx_extra.contents().is_some(), ids_to_watch.is_some());
-                if let (Some(logging_id), Some(ids_to_watch)) = (ctx_extra.contents(), ids_to_watch) {
-                    self.install_peek_watch_sets(conn_id.clone(), logging_id, ids_to_watch);
-                }
-
                 let result = self
                     .implement_slow_path_peek(
                         *dataflow_plan,
@@ -364,9 +360,9 @@
                         max_result_size,
                         max_query_result_size,
                         ctx_extra,
+                        ids_to_watch,
                     )
                     .await;
-
                 let _ = tx.send(result);
             }
 
@@ -381,15 +377,6 @@
                 tx,
             } => {
                 soft_assert_eq_or_log!(ctx_extra.contents().is_some(), ids_to_watch.is_some());
-                if let (Some(logging_id), Some(ids_to_watch)) = (ctx_extra.contents(), ids_to_watch) {
-                    // (The old peek sequencing forgot to do this for COPY TO.)
-                    self.install_peek_watch_sets(conn_id.clone(), logging_id, ids_to_watch);
-                }
-
-                // We retire the execution context immediately, as the coordinator doesn't need
-                // to track the execution for statement logging purposes (the frontend handles that).
-                let _ = ctx_extra.retire();
-
                 // implement_copy_to spawns a background task that sends the response
                 // through tx when the COPY TO completes (or immediately if setup fails).
                 // We just call it and let it handle all response sending.
@@ -399,6 +386,8 @@
                         target_replica,
                         source_ids,
                         conn_id,
+                        ctx_extra,
+                        ids_to_watch,
                         tx,
                     )
                     .await;
@@ -411,6 +400,7 @@
                 ctx_extra,
                 is_fast_path,
                 ids_to_watch,
+                tx,
             } => {
                 self.handle_register_frontend_peek(
                     uuid,
@@ -420,6 +410,7 @@
                     ctx_extra,
                     is_fast_path,
                     ids_to_watch,
+                    tx,
                 );
             }
             Command::FrontendStatementLogging(event) => {
@@ -1877,10 +1868,14 @@
         ctx_extra: ExecuteContextExtra,
         is_fast_path: bool,
         ids_to_watch: Option<IdsToWatch>,
+        tx: oneshot::Sender<Result<(), AdapterError>>,
     ) {
         soft_assert_eq_or_log!(ctx_extra.contents().is_some(), ids_to_watch.is_some());
         if let (Some(logging_id), Some(ids_to_watch)) = (ctx_extra.contents(), ids_to_watch) {
-            self.install_peek_watch_sets(conn_id.clone(), logging_id, ids_to_watch);
+            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), logging_id, ids_to_watch) {
+                let _ = tx.send(Err(AdapterError::concurrent_dependency_drop_from_collection_lookup_error(e, cluster_id.into())));
+                return;
+            }
         }
 
         // Store the peek in pending_peeks for later retrieval when results arrive
@@ -1900,5 +1895,7 @@
             .entry(conn_id)
             .or_default()
             .insert(uuid, cluster_id.into());
+
+        let _ = tx.send(Ok(()));
     }
 }

src/adapter/src/coord/peek.rs

Lines changed: 33 additions & 2 deletions
@@ -56,6 +56,7 @@ use tracing::{Instrument, Span};
 use uuid::Uuid;
 
 use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
+use crate::coord::statement_logging::IdsToWatch;
 use crate::coord::timestamp_selection::TimestampDetermination;
 use crate::optimize::OptimizerError;
 use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
@@ -1211,7 +1212,19 @@ impl crate::coord::Coordinator {
         max_result_size: u64,
         max_query_result_size: Option<u64>,
         mut ctx_extra: ExecuteContextExtra,
+        ids_to_watch: Option<IdsToWatch>,
     ) -> Result<ExecuteResponse, AdapterError> {
+        // Install watch sets for statement lifecycle logging if enabled.
+        if let (Some(logging_id), Some(ids_to_watch)) = (ctx_extra.contents(), ids_to_watch) {
+            self.install_peek_watch_sets(conn_id.clone(), logging_id, ids_to_watch)
+                .map_err(|e| {
+                    AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
+                        e,
+                        compute_instance,
+                    )
+                })?;
+        }
+
         let source_arity = intermediate_result_type.arity();
 
         let planned_peek = PlannedPeek {
@@ -1239,8 +1252,8 @@
         .await
     }
 
-    /// Implements a COPY TO command by validating S3 connection, shipping the dataflow,
-    /// and spawning a background task to wait for completion.
+    /// Implements a COPY TO command by installing peek watch sets, validating S3 connection,
+    /// shipping the dataflow, and spawning a background task to wait for completion.
     /// This is called from the command handler for ExecuteCopyTo.
     ///
     /// This method inlines the logic from peek_copy_to_preflight and peek_copy_to_dataflow
@@ -1256,6 +1269,8 @@
         target_replica: Option<ReplicaId>,
         source_ids: BTreeSet<GlobalId>,
         conn_id: ConnectionId,
+        ctx_extra: ExecuteContextExtra,
+        ids_to_watch: Option<IdsToWatch>,
         tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
     ) {
         // Helper to send error and return early
@@ -1264,6 +1279,22 @@
             let _ = tx.send(Err(e));
         };
 
+        // Install watch sets for statement lifecycle logging if enabled.
+        if let (Some(logging_id), Some(ids_to_watch)) = (ctx_extra.contents(), ids_to_watch) {
+            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), logging_id, ids_to_watch) {
+                let err = AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
+                    e,
+                    compute_instance,
+                );
+                send_err(tx, err);
+                return;
+            }
+        }
+
+        // We retire the execution context immediately, as the coordinator doesn't need
+        // to track the execution for statement logging purposes (the frontend handles that).
+        let _ = ctx_extra.retire();
+
         let sink_id = df_desc.sink_id();
 
         // # Inlined from peek_copy_to_preflight

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 1 addition & 1 deletion
@@ -3301,7 +3301,7 @@ impl Coordinator {
                 plan_validity,
                 read_hold,
             }),
-        );
+        ).expect("plan validity verified above; we are on the coordinator main task, so they couldn't have gone away since then");
     }
 
     #[instrument]

src/adapter/src/coord/sequencer/inner/peek.rs

Lines changed: 1 addition & 1 deletion
@@ -851,7 +851,7 @@ impl Coordinator {
                 &id_bundle,
                 determination.timestamp_context.timestamp_or_default(),
             );
-            self.install_peek_watch_sets(conn_id.clone(), logging_id, ids_to_watch);
+            self.install_peek_watch_sets(conn_id.clone(), logging_id, ids_to_watch).expect("the old peek sequencing re-verifies the dependencies' existence before installing the new watch sets");
         }
 
         let max_result_size = self.catalog().system_config().max_result_size();

src/adapter/src/coord/statement_logging.rs

Lines changed: 5 additions & 3 deletions
@@ -32,6 +32,7 @@ use tokio::time::MissedTickBehavior;
 use tracing::debug;
 use uuid::Uuid;
 use mz_adapter_types::connection::ConnectionId;
+use mz_compute_client::controller::error::CollectionLookupError;
 use crate::coord::{ConnMeta, Coordinator, WatchSetResponse};
 use crate::session::{LifecycleTimestamps, Session};
 use crate::statement_logging::{
@@ -1461,7 +1462,7 @@ impl Coordinator {
         conn_id: ConnectionId,
         logging_id: StatementLoggingId,
         ids_to_watch: IdsToWatch,
-    ) {
+    ) -> Result<(), CollectionLookupError> {
        let IdsToWatch {
            timestamp,
            storage_ids,
@@ -1476,7 +1477,7 @@
                logging_id,
                StatementLifecycleEvent::StorageDependenciesFinished,
            ),
-        );
+        )?;
        self.install_compute_watch_set(
            conn_id,
            compute_ids,
@@ -1485,6 +1486,7 @@
                logging_id,
                StatementLifecycleEvent::ComputeDependenciesFinished,
            ),
-        );
+        )?;
+        Ok(())
     }
 }

src/adapter/src/peek_client.rs

Lines changed: 4 additions & 2 deletions
@@ -343,15 +343,17 @@ impl PeekClient {
 
         // Register coordinator tracking of this peek. This has to complete before issuing the peek.
         let ctx_extra = crate::coord::ExecuteContextExtra::new(statement_logging_id);
-        self.coordinator_client.send(Command::RegisterFrontendPeek {
+        self.call_coordinator(|tx| Command::RegisterFrontendPeek {
             uuid,
             conn_id: conn_id.clone(),
             cluster_id: compute_instance.into(),
             depends_on,
             ctx_extra,
             is_fast_path: true,
             ids_to_watch,
-        });
+            tx,
+        })
+        .await?;
 
         // At this stage we don't know column names for the result because we
         // only know the peek's result type as a bare SqlRelationType.
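
The implementation of call_coordinator is not shown in this diff; presumably it allocates a fresh oneshot channel, lets the closure embed the sender in the command, ships the command to the coordinator, and awaits the reply, which is what makes RegisterFrontendPeek wait for a result. Below is a minimal sketch of that request/response pattern under those assumptions; the names are illustrative and a plain String stands in for AdapterError.

use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
enum Command {
    RegisterFrontendPeek {
        uuid: u64, // stand-in for the real peek metadata
        tx: oneshot::Sender<Result<(), String>>,
    },
}

struct PeekClient {
    coordinator_tx: mpsc::UnboundedSender<Command>,
}

impl PeekClient {
    // Assumed shape of call_coordinator: build the command around a fresh oneshot
    // sender, send it to the coordinator task, then await the coordinator's answer.
    async fn call_coordinator<T, F>(&self, f: F) -> Result<T, String>
    where
        F: FnOnce(oneshot::Sender<Result<T, String>>) -> Command,
    {
        let (tx, rx) = oneshot::channel();
        self.coordinator_tx
            .send(f(tx))
            .map_err(|_| "coordinator hung up".to_string())?;
        // Block the frontend until the coordinator reports success or failure.
        rx.await.map_err(|_| "coordinator dropped the reply".to_string())?
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let (coordinator_tx, mut coordinator_rx) = mpsc::unbounded_channel();

    // A stand-in "coordinator" task: acknowledge every registration.
    tokio::spawn(async move {
        while let Some(Command::RegisterFrontendPeek { uuid: _, tx }) = coordinator_rx.recv().await {
            let _ = tx.send(Ok(()));
        }
    });

    let client = PeekClient { coordinator_tx };
    client
        .call_coordinator(|tx| Command::RegisterFrontendPeek { uuid: 42, tx })
        .await
}

The .await? is the behavioral change: the frontend does not proceed with the peek until the coordinator has confirmed registration, so a failed watch set installation now reaches the client as an error instead of being silently dropped.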

src/controller/src/lib.rs

Lines changed: 16 additions & 11 deletions
@@ -62,6 +62,7 @@ use mz_txn_wal::metrics::Metrics as TxnMetrics;
 use timely::progress::{Antichain, Timestamp};
 use tokio::sync::mpsc;
 use uuid::Uuid;
+use mz_compute_client::controller::error::CollectionLookupError;
 
 pub mod clusters;
 
@@ -372,15 +373,20 @@
         &mut self,
         mut objects: BTreeSet<GlobalId>,
         t: T,
-    ) -> WatchSetId {
+    ) -> Result<WatchSetId, CollectionLookupError> {
        let ws_id = self.watch_set_id_gen.allocate_id();
 
+        // Collect all frontiers first, returning any errors
+        let frontiers: BTreeMap<GlobalId, _> = objects
+            .iter()
+            .map(|id| {
+                self.compute
+                    .collection_frontiers(*id, None)
+                    .map(|f| (*id, f.write_frontier))
+            })
+            .collect::<Result<_, _>>()?;
        objects.retain(|id| {
-            let frontier = self
-                .compute
-                .collection_frontiers(*id, None)
-                .map(|f| f.write_frontier)
-                .expect("missing compute dependency");
+            let frontier = frontiers.get(id).expect("just collected");
            frontier.less_equal(&t)
        });
        if objects.is_empty() {
@@ -395,7 +401,7 @@
            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
        }
 
-        ws_id
+        Ok(ws_id)
     }
 
     /// Install a _watch set_ in the controller.
@@ -410,13 +416,12 @@
         &mut self,
         mut objects: BTreeSet<GlobalId>,
         t: T,
-    ) -> WatchSetId {
+    ) -> Result<WatchSetId, CollectionLookupError> {
        let ws_id = self.watch_set_id_gen.allocate_id();
 
        let uppers = self
            .storage
-            .collections_frontiers(objects.iter().cloned().collect())
-            .expect("missing storage dependencies")
+            .collections_frontiers(objects.iter().cloned().collect())?
            .into_iter()
            .map(|(id, _since, upper)| (id, upper))
            .collect::<BTreeMap<_, _>>();
@@ -436,7 +441,7 @@
            }
            self.unfulfilled_watch_sets.insert(ws_id, (objects, t));
        }
-        ws_id
+        Ok(ws_id)
     }
 
     /// Uninstalls a previously installed WatchSetId. The method is a no-op if the watch set has
