Skip to content

Commit 49f79b0

Browse files
committed
Frontend peek: Split ExecuteCopyTo into 2 commands to not block the main task
1 parent 97a8d11 commit 49f79b0

File tree

7 files changed

+76
-48
lines changed

7 files changed

+76
-48
lines changed

src/adapter/src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1032,6 +1032,7 @@ impl SessionClient {
10321032
| Command::GetTransactionReadHoldsBundle { .. }
10331033
| Command::StoreTransactionReadHolds { .. }
10341034
| Command::ExecuteSlowPathPeek { .. }
1035+
| Command::CopyToPreflight { .. }
10351036
| Command::ExecuteCopyTo { .. }
10361037
| Command::ExecuteSideEffectingFunc { .. }
10371038
| Command::RegisterFrontendPeek { .. }

src/adapter/src/command.rs

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -224,6 +224,18 @@ pub enum Command {
224224
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
225225
},
226226

227+
/// Preflight check for COPY TO S3 operation. This runs the slow S3 operations
228+
/// (loading SDK config, checking bucket path, verifying permissions, uploading sentinel)
229+
/// in a background task to avoid blocking the coordinator.
230+
CopyToPreflight {
231+
/// The S3 connection info needed for preflight checks.
232+
s3_sink_connection: mz_compute_types::sinks::CopyToS3OneshotSinkConnection,
233+
/// The sink ID for logging and S3 key management.
234+
sink_id: GlobalId,
235+
/// Response channel for the preflight result.
236+
tx: oneshot::Sender<Result<(), AdapterError>>,
237+
},
238+
227239
ExecuteCopyTo {
228240
df_desc: Box<DataflowDescription<mz_compute_types::plan::Plan>>,
229241
compute_instance: ComputeInstanceId,
@@ -309,6 +321,7 @@ impl Command {
309321
| Command::GetTransactionReadHoldsBundle { .. }
310322
| Command::StoreTransactionReadHolds { .. }
311323
| Command::ExecuteSlowPathPeek { .. }
324+
| Command::CopyToPreflight { .. }
312325
| Command::ExecuteCopyTo { .. }
313326
| Command::ExecuteSideEffectingFunc { .. }
314327
| Command::RegisterFrontendPeek { .. }
@@ -341,6 +354,7 @@ impl Command {
341354
| Command::GetTransactionReadHoldsBundle { .. }
342355
| Command::StoreTransactionReadHolds { .. }
343356
| Command::ExecuteSlowPathPeek { .. }
357+
| Command::CopyToPreflight { .. }
344358
| Command::ExecuteCopyTo { .. }
345359
| Command::ExecuteSideEffectingFunc { .. }
346360
| Command::RegisterFrontendPeek { .. }

src/adapter/src/coord.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,7 @@ impl Message {
376376
}
377377
Command::StoreTransactionReadHolds { .. } => "store-transaction-read-holds",
378378
Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
379+
Command::CopyToPreflight { .. } => "copy-to-preflight",
379380
Command::ExecuteCopyTo { .. } => "execute-copy-to",
380381
Command::ExecuteSideEffectingFunc { .. } => "execute-side-effecting-func",
381382
Command::RegisterFrontendPeek { .. } => "register-frontend-peek",

src/adapter/src/coord/command_handler.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,28 @@ impl Coordinator {
365365
let _ = tx.send(result);
366366
}
367367

368+
Command::CopyToPreflight {
369+
s3_sink_connection,
370+
sink_id,
371+
tx,
372+
} => {
373+
// Spawn a background task to perform the slow S3 preflight operations.
374+
// This avoids blocking the coordinator's main task.
375+
let connection_context = self.connection_context().clone();
376+
task::spawn(|| "copy_to_preflight", async move {
377+
let result = mz_storage_types::sinks::s3_oneshot_sink::preflight(
378+
connection_context,
379+
&s3_sink_connection.aws_connection,
380+
&s3_sink_connection.upload_info,
381+
s3_sink_connection.connection_id,
382+
sink_id,
383+
)
384+
.await
385+
.map_err(AdapterError::from);
386+
let _ = tx.send(result);
387+
});
388+
}
389+
368390
Command::ExecuteCopyTo {
369391
df_desc,
370392
compute_instance,

src/adapter/src/coord/peek.rs

Lines changed: 5 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ use mz_compute_client::protocol::command::PeekTarget;
2828
use mz_compute_client::protocol::response::PeekResponse;
2929
use mz_compute_types::ComputeInstanceId;
3030
use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31-
use mz_compute_types::sinks::ComputeSinkConnection;
3231
use mz_controller_types::ClusterId;
3332
use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
3433
use mz_expr::row::RowCollection;
@@ -1255,12 +1254,14 @@ impl crate::coord::Coordinator {
12551254
.await
12561255
}
12571256

1258-
/// Implements a COPY TO command by installing peek watch sets, validating S3 connection,
1257+
/// Implements a `COPY TO` command by installing peek watch sets,
12591258
/// shipping the dataflow, and spawning a background task to wait for completion.
12601259
/// This is called from the command handler for ExecuteCopyTo.
12611260
///
1262-
/// This method inlines the logic from peek_copy_to_preflight and peek_copy_to_dataflow
1263-
/// to avoid the complexity of the staging mechanism for the frontend peek sequencing path.
1261+
/// (The S3 preflight check must be completed successfully via the
1262+
/// `CopyToPreflight` command _before_ calling this method. The preflight is
1263+
/// handled separately to avoid blocking the coordinator's main task with
1264+
/// slow S3 network operations.)
12641265
///
12651266
/// This method does NOT block waiting for completion. Instead, it spawns a background task that
12661267
/// will send the response through the provided tx channel when the COPY TO completes.
@@ -1300,49 +1301,6 @@ impl crate::coord::Coordinator {
13001301

13011302
let sink_id = df_desc.sink_id();
13021303

1303-
// # Inlined from peek_copy_to_preflight
1304-
1305-
let connection_context = self.connection_context().clone();
1306-
let sinks = &df_desc.sink_exports;
1307-
1308-
if sinks.len() != 1 {
1309-
send_err(
1310-
tx,
1311-
AdapterError::Internal("expected exactly one copy to s3 sink".into()),
1312-
);
1313-
return;
1314-
}
1315-
let (sink_id_check, sink_desc) = sinks
1316-
.first_key_value()
1317-
.expect("known to be exactly one copy to s3 sink");
1318-
assert_eq!(sink_id, *sink_id_check);
1319-
1320-
match &sink_desc.connection {
1321-
ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1322-
if let Err(e) = mz_storage_types::sinks::s3_oneshot_sink::preflight(
1323-
connection_context,
1324-
&conn.aws_connection,
1325-
&conn.upload_info,
1326-
conn.connection_id,
1327-
sink_id,
1328-
)
1329-
.await
1330-
{
1331-
send_err(tx, e.into());
1332-
return;
1333-
}
1334-
}
1335-
_ => {
1336-
send_err(
1337-
tx,
1338-
AdapterError::Internal("expected copy to s3 oneshot sink".into()),
1339-
);
1340-
return;
1341-
}
1342-
}
1343-
1344-
// # Inlined from peek_copy_to_dataflow
1345-
13461304
// Create and register ActiveCopyTo.
13471305
// Note: sink_tx/sink_rx is the channel for the compute sink to notify completion
13481306
// This is different from the command's tx which sends the response to the client

src/adapter/src/frontend_peek.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1311,6 +1311,38 @@ impl PeekClient {
13111311
&df_meta.optimizer_notices,
13121312
);
13131313

1314+
// Extract S3 sink connection info for preflight check
1315+
let sink_id = df_desc.sink_id();
1316+
let sinks = &df_desc.sink_exports;
1317+
if sinks.len() != 1 {
1318+
return Err(AdapterError::Internal(
1319+
"expected exactly one copy to s3 sink".into(),
1320+
));
1321+
}
1322+
let (_, sink_desc) = sinks
1323+
.first_key_value()
1324+
.expect("known to be exactly one copy to s3 sink");
1325+
let s3_sink_connection = match &sink_desc.connection {
1326+
mz_compute_types::sinks::ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1327+
conn.clone()
1328+
}
1329+
_ => {
1330+
return Err(AdapterError::Internal(
1331+
"expected copy to s3 oneshot sink".into(),
1332+
));
1333+
}
1334+
};
1335+
1336+
// Perform S3 preflight check in background task (via coordinator).
1337+
// This runs slow S3 operations without blocking the coordinator's main task.
1338+
self.call_coordinator(|tx| Command::CopyToPreflight {
1339+
s3_sink_connection,
1340+
sink_id,
1341+
tx,
1342+
})
1343+
.await?;
1344+
1345+
// Preflight succeeded, now execute the actual COPY TO dataflow
13141346
let watch_set = statement_logging_id.map(|logging_id| {
13151347
WatchSetCreation::new(
13161348
logging_id,

0 commit comments

Comments
 (0)