Skip to content

Commit 28e6bd5

Browse files
authored
Merge pull request #34261 from ggevay/frontend-peek-copy-to
Frontend peek sequencing -- `COPY TO`
2 parents eda01f4 + 3e08e37 commit 28e6bd5

File tree

8 files changed

+416
-132
lines changed

8 files changed

+416
-132
lines changed

src/adapter/src/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1016,7 +1016,8 @@ impl SessionClient {
10161016
| Command::GetComputeInstanceClient { .. }
10171017
| Command::GetOracle { .. }
10181018
| Command::DetermineRealTimeRecentTimestamp { .. }
1019-
| Command::ExecuteSlowPathPeek { .. } => {}
1019+
| Command::ExecuteSlowPathPeek { .. }
1020+
| Command::ExecuteCopyTo { .. } => {}
10201021
};
10211022
cmd
10221023
});

src/adapter/src/command.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use crate::session::{EndTransactionAction, RowBatchStream, Session};
5252
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
5353
use crate::util::Transmittable;
5454
use crate::webhook::AppendWebhookResponse;
55-
use crate::{AdapterNotice, AppendWebhookError};
55+
use crate::{AdapterNotice, AppendWebhookError, optimize};
5656

5757
#[derive(Debug)]
5858
pub struct CatalogSnapshot {
@@ -199,6 +199,15 @@ pub enum Command {
199199
max_query_result_size: Option<u64>,
200200
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
201201
},
202+
203+
ExecuteCopyTo {
204+
global_lir_plan: Box<optimize::copy_to::GlobalLirPlan>,
205+
compute_instance: ComputeInstanceId,
206+
target_replica: Option<ReplicaId>,
207+
source_ids: BTreeSet<GlobalId>,
208+
conn_id: ConnectionId,
209+
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
210+
},
202211
}
203212

204213
impl Command {
@@ -222,7 +231,8 @@ impl Command {
222231
| Command::GetComputeInstanceClient { .. }
223232
| Command::GetOracle { .. }
224233
| Command::DetermineRealTimeRecentTimestamp { .. }
225-
| Command::ExecuteSlowPathPeek { .. } => None,
234+
| Command::ExecuteSlowPathPeek { .. }
235+
| Command::ExecuteCopyTo { .. } => None,
226236
}
227237
}
228238

@@ -246,7 +256,8 @@ impl Command {
246256
| Command::GetComputeInstanceClient { .. }
247257
| Command::GetOracle { .. }
248258
| Command::DetermineRealTimeRecentTimestamp { .. }
249-
| Command::ExecuteSlowPathPeek { .. } => None,
259+
| Command::ExecuteSlowPathPeek { .. }
260+
| Command::ExecuteCopyTo { .. } => None,
250261
}
251262
}
252263
}

src/adapter/src/coord.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ impl Message {
367367
"determine-real-time-recent-timestamp"
368368
}
369369
Command::ExecuteSlowPathPeek { .. } => "execute-slow-path-peek",
370+
Command::ExecuteCopyTo { .. } => "execute-copy-to",
370371
},
371372
Message::ControllerReady {
372373
controller: ControllerReadiness::Compute,

src/adapter/src/coord/command_handler.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,28 @@ impl Coordinator {
344344

345345
let _ = tx.send(result);
346346
}
347+
348+
Command::ExecuteCopyTo {
349+
global_lir_plan,
350+
compute_instance,
351+
target_replica,
352+
source_ids,
353+
conn_id,
354+
tx,
355+
} => {
356+
// implement_copy_to spawns a background task that sends the response
357+
// through tx when the COPY TO completes (or immediately if setup fails).
358+
// We just call it and let it handle all response sending.
359+
self.implement_copy_to(
360+
*global_lir_plan,
361+
compute_instance,
362+
target_replica,
363+
source_ids,
364+
conn_id,
365+
tx,
366+
)
367+
.await;
368+
}
347369
}
348370
}
349371
.instrument(debug_span!("handle_command"))

src/adapter/src/coord/peek.rs

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use mz_compute_client::protocol::command::PeekTarget;
2828
use mz_compute_client::protocol::response::PeekResponse;
2929
use mz_compute_types::ComputeInstanceId;
3030
use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31+
use mz_compute_types::sinks::ComputeSinkConnection;
3132
use mz_controller_types::ClusterId;
3233
use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
3334
use mz_expr::row::RowCollection;
@@ -37,6 +38,7 @@ use mz_expr::{
3738
};
3839
use mz_ore::cast::CastFrom;
3940
use mz_ore::str::{StrExt, separated};
41+
use mz_ore::task;
4042
use mz_ore::tracing::OpenTelemetryContext;
4143
use mz_persist_client::Schemas;
4244
use mz_persist_types::codec_impls::UnitSchema;
@@ -49,10 +51,13 @@ use mz_repr::{
4951
use mz_storage_types::sources::SourceData;
5052
use serde::{Deserialize, Serialize};
5153
use timely::progress::{Antichain, Timestamp};
54+
use tokio::sync::oneshot;
55+
use tracing::{Instrument, Span};
5256
use uuid::Uuid;
5357

58+
use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
5459
use crate::coord::timestamp_selection::TimestampDetermination;
55-
use crate::optimize::OptimizerError;
60+
use crate::optimize::{self, OptimizerError};
5661
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
5762
use crate::util::ResultExt;
5863
use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};
@@ -1238,6 +1243,117 @@ impl crate::coord::Coordinator {
12381243
.await
12391244
}
12401245

1246+
/// Implements a COPY TO command by validating S3 connection, shipping the dataflow,
1247+
/// and spawning a background task to wait for completion.
1248+
/// This is called from the command handler for ExecuteCopyTo.
1249+
///
1250+
/// This method inlines the logic from peek_copy_to_preflight and peek_copy_to_dataflow
1251+
/// to avoid the complexity of the staging mechanism for the frontend peek sequencing path.
1252+
///
1253+
/// This method does NOT block waiting for completion. Instead, it spawns a background task that
1254+
/// will send the response through the provided tx channel when the COPY TO completes.
1255+
/// All errors (setup or execution) are sent through tx.
1256+
pub(crate) async fn implement_copy_to(
1257+
&mut self,
1258+
global_lir_plan: optimize::copy_to::GlobalLirPlan,
1259+
compute_instance: ComputeInstanceId,
1260+
target_replica: Option<ReplicaId>,
1261+
source_ids: BTreeSet<GlobalId>,
1262+
conn_id: ConnectionId,
1263+
tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
1264+
) {
1265+
// Helper to send error and return early
1266+
let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
1267+
e: AdapterError| {
1268+
let _ = tx.send(Err(e));
1269+
};
1270+
1271+
let sink_id = global_lir_plan.sink_id();
1272+
1273+
// # Inlined from peek_copy_to_preflight
1274+
1275+
let connection_context = self.connection_context().clone();
1276+
let sinks = &global_lir_plan.df_desc().sink_exports;
1277+
1278+
if sinks.len() != 1 {
1279+
send_err(
1280+
tx,
1281+
AdapterError::Internal("expected exactly one copy to s3 sink".into()),
1282+
);
1283+
return;
1284+
}
1285+
let (sink_id_check, sink_desc) = sinks
1286+
.first_key_value()
1287+
.expect("known to be exactly one copy to s3 sink");
1288+
assert_eq!(sink_id, *sink_id_check);
1289+
1290+
match &sink_desc.connection {
1291+
ComputeSinkConnection::CopyToS3Oneshot(conn) => {
1292+
if let Err(e) = mz_storage_types::sinks::s3_oneshot_sink::preflight(
1293+
connection_context,
1294+
&conn.aws_connection,
1295+
&conn.upload_info,
1296+
conn.connection_id,
1297+
sink_id,
1298+
)
1299+
.await
1300+
{
1301+
send_err(tx, e.into());
1302+
return;
1303+
}
1304+
}
1305+
_ => {
1306+
send_err(
1307+
tx,
1308+
AdapterError::Internal("expected copy to s3 oneshot sink".into()),
1309+
);
1310+
return;
1311+
}
1312+
}
1313+
1314+
// # Inlined from peek_copy_to_dataflow
1315+
1316+
let (df_desc, _df_meta) = global_lir_plan.unapply();
1317+
1318+
// Create and register ActiveCopyTo.
1319+
// Note: sink_tx/sink_rx is the channel for the compute sink to notify completion
1320+
// This is different from the command's tx which sends the response to the client
1321+
let (sink_tx, sink_rx) = oneshot::channel();
1322+
let active_copy_to = ActiveCopyTo {
1323+
conn_id: conn_id.clone(),
1324+
tx: sink_tx,
1325+
cluster_id: compute_instance,
1326+
depends_on: source_ids,
1327+
};
1328+
1329+
// Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
1330+
drop(
1331+
self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
1332+
.await,
1333+
);
1334+
1335+
self.ship_dataflow(df_desc, compute_instance, target_replica)
1336+
.await;
1337+
1338+
// Spawn background task to wait for completion
1339+
// We must NOT await sink_rx here directly, as that would block the coordinator's main task
1340+
// from processing the completion message. Instead, we spawn a background task that will
1341+
// send the result through tx when the COPY TO completes.
1342+
let span = Span::current();
1343+
task::spawn(
1344+
|| "copy to completion",
1345+
async move {
1346+
let res = sink_rx.await;
1347+
let result = match res {
1348+
Ok(res) => res,
1349+
Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
1350+
};
1351+
let _ = tx.send(result);
1352+
}
1353+
.instrument(span),
1354+
);
1355+
}
1356+
12411357
/// Constructs an [`ExecuteResponse`] that will send some rows to the
12421358
/// client immediately, as opposed to asking the dataflow layer to send along
12431359
/// the rows after some computation.

src/adapter/src/coord/sequencer.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313
//! Logic for executing a planned SQL query.
1414
1515
use std::collections::BTreeSet;
16+
use std::str::FromStr;
1617
use std::sync::Arc;
1718

1819
use futures::FutureExt;
1920
use futures::future::LocalBoxFuture;
2021
use futures::stream::FuturesOrdered;
22+
use http::Uri;
2123
use inner::return_if_err;
2224
use maplit::btreemap;
2325
use mz_catalog::memory::objects::Cluster;
@@ -28,12 +30,13 @@ use mz_ore::cast::CastFrom;
2830
use mz_ore::tracing::OpenTelemetryContext;
2931
use mz_persist_client::stats::SnapshotPartStats;
3032
use mz_repr::explain::{ExprHumanizerExt, TransientItem};
31-
use mz_repr::{CatalogItemId, Diff, GlobalId, IntoRowIterator, Row, Timestamp};
33+
use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowArena, Timestamp};
3234
use mz_sql::catalog::{CatalogError, SessionCatalog};
3335
use mz_sql::names::ResolvedIds;
3436
use mz_sql::plan::{
3537
self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
36-
CreateSourcePlanBundle, FetchPlan, MutationKind, Params, Plan, PlanKind, RaisePlan,
38+
CreateSourcePlanBundle, FetchPlan, HirScalarExpr, MutationKind, Params, Plan, PlanKind,
39+
RaisePlan,
3740
};
3841
use mz_sql::rbac;
3942
use mz_sql::session::metadata::SessionMetadata;
@@ -50,7 +53,7 @@ use tokio::sync::oneshot;
5053
use tracing::{Instrument, Level, Span, event};
5154

5255
use crate::ExecuteContext;
53-
use crate::catalog::Catalog;
56+
use crate::catalog::{Catalog, CatalogState};
5457
use crate::command::{Command, ExecuteResponse, Response};
5558
use crate::coord::appends::{DeferredOp, DeferredPlan};
5659
use crate::coord::validity::PlanValidity;
@@ -61,6 +64,7 @@ use crate::coord::{
6164
use crate::error::AdapterError;
6265
use crate::explain::insights::PlanInsightsContext;
6366
use crate::notice::AdapterNotice;
67+
use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
6468
use crate::optimize::peek;
6569
use crate::session::{
6670
EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
@@ -1010,6 +1014,39 @@ pub(crate) fn emit_optimizer_notices(
10101014
}
10111015
}
10121016

1017+
/// Evaluates a COPY TO target URI expression and validates it.
1018+
///
1019+
/// This function is shared between the old peek sequencing (sequence_copy_to)
1020+
/// and the new frontend peek sequencing to avoid code duplication.
1021+
pub fn eval_copy_to_uri(
1022+
to: HirScalarExpr,
1023+
session: &Session,
1024+
catalog_state: &CatalogState,
1025+
) -> Result<Uri, AdapterError> {
1026+
let style = ExprPrepStyle::OneShot {
1027+
logical_time: EvalTime::NotAvailable,
1028+
session,
1029+
catalog_state,
1030+
};
1031+
let mut to = to.lower_uncorrelated()?;
1032+
prep_scalar_expr(&mut to, style)?;
1033+
let temp_storage = RowArena::new();
1034+
let evaled = to.eval(&[], &temp_storage)?;
1035+
if evaled == Datum::Null {
1036+
coord_bail!("COPY TO target value can not be null");
1037+
}
1038+
let to_url = match Uri::from_str(evaled.unwrap_str()) {
1039+
Ok(url) => {
1040+
if url.scheme_str() != Some("s3") {
1041+
coord_bail!("only 's3://...' urls are supported as COPY TO target");
1042+
}
1043+
url
1044+
}
1045+
Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
1046+
};
1047+
Ok(to_url)
1048+
}
1049+
10131050
/// Returns a future that will execute EXPLAIN FILTER PUSHDOWN, i.e., compute the filter pushdown
10141051
/// statistics for the given collections with the given MFPs.
10151052
///

0 commit comments

Comments
 (0)