@@ -28,6 +28,7 @@ use mz_compute_client::protocol::command::PeekTarget;
2828use mz_compute_client:: protocol:: response:: PeekResponse ;
2929use mz_compute_types:: ComputeInstanceId ;
3030use mz_compute_types:: dataflows:: { DataflowDescription , IndexImport } ;
31+ use mz_compute_types:: sinks:: ComputeSinkConnection ;
3132use mz_controller_types:: ClusterId ;
3233use mz_expr:: explain:: { HumanizedExplain , HumanizerMode , fmt_text_constant_rows} ;
3334use mz_expr:: row:: RowCollection ;
@@ -37,6 +38,7 @@ use mz_expr::{
3738} ;
3839use mz_ore:: cast:: CastFrom ;
3940use mz_ore:: str:: { StrExt , separated} ;
41+ use mz_ore:: task;
4042use mz_ore:: tracing:: OpenTelemetryContext ;
4143use mz_persist_client:: Schemas ;
4244use mz_persist_types:: codec_impls:: UnitSchema ;
@@ -49,10 +51,13 @@ use mz_repr::{
4951use mz_storage_types:: sources:: SourceData ;
5052use serde:: { Deserialize , Serialize } ;
5153use timely:: progress:: { Antichain , Timestamp } ;
54+ use tokio:: sync:: oneshot;
55+ use tracing:: { Instrument , Span } ;
5256use uuid:: Uuid ;
5357
58+ use crate :: active_compute_sink:: { ActiveComputeSink , ActiveCopyTo } ;
5459use crate :: coord:: timestamp_selection:: TimestampDetermination ;
55- use crate :: optimize:: OptimizerError ;
60+ use crate :: optimize:: { self , OptimizerError } ;
5661use crate :: statement_logging:: { StatementEndedExecutionReason , StatementExecutionStrategy } ;
5762use crate :: util:: ResultExt ;
5863use crate :: { AdapterError , ExecuteContextExtra , ExecuteResponse } ;
@@ -1238,6 +1243,117 @@ impl crate::coord::Coordinator {
12381243 . await
12391244 }
12401245
1246+ /// Implements a COPY TO command by validating S3 connection, shipping the dataflow,
1247+ /// and spawning a background task to wait for completion.
1248+ /// This is called from the command handler for ExecuteCopyTo.
1249+ ///
1250+ /// This method inlines the logic from peek_copy_to_preflight and peek_copy_to_dataflow
1251+ /// to avoid the complexity of the staging mechanism for the frontend peek sequencing path.
1252+ ///
1253+ /// This method does NOT block waiting for completion. Instead, it spawns a background task that
1254+ /// will send the response through the provided tx channel when the COPY TO completes.
1255+ /// All errors (setup or execution) are sent through tx.
1256+ pub ( crate ) async fn implement_copy_to (
1257+ & mut self ,
1258+ global_lir_plan : optimize:: copy_to:: GlobalLirPlan ,
1259+ compute_instance : ComputeInstanceId ,
1260+ target_replica : Option < ReplicaId > ,
1261+ source_ids : BTreeSet < GlobalId > ,
1262+ conn_id : ConnectionId ,
1263+ tx : oneshot:: Sender < Result < ExecuteResponse , AdapterError > > ,
1264+ ) {
1265+ // Helper to send error and return early
1266+ let send_err = |tx : oneshot:: Sender < Result < ExecuteResponse , AdapterError > > ,
1267+ e : AdapterError | {
1268+ let _ = tx. send ( Err ( e) ) ;
1269+ } ;
1270+
1271+ let sink_id = global_lir_plan. sink_id ( ) ;
1272+
1273+ // # Inlined from peek_copy_to_preflight
1274+
1275+ let connection_context = self . connection_context ( ) . clone ( ) ;
1276+ let sinks = & global_lir_plan. df_desc ( ) . sink_exports ;
1277+
1278+ if sinks. len ( ) != 1 {
1279+ send_err (
1280+ tx,
1281+ AdapterError :: Internal ( "expected exactly one copy to s3 sink" . into ( ) ) ,
1282+ ) ;
1283+ return ;
1284+ }
1285+ let ( sink_id_check, sink_desc) = sinks
1286+ . first_key_value ( )
1287+ . expect ( "known to be exactly one copy to s3 sink" ) ;
1288+ assert_eq ! ( sink_id, * sink_id_check) ;
1289+
1290+ match & sink_desc. connection {
1291+ ComputeSinkConnection :: CopyToS3Oneshot ( conn) => {
1292+ if let Err ( e) = mz_storage_types:: sinks:: s3_oneshot_sink:: preflight (
1293+ connection_context,
1294+ & conn. aws_connection ,
1295+ & conn. upload_info ,
1296+ conn. connection_id ,
1297+ sink_id,
1298+ )
1299+ . await
1300+ {
1301+ send_err ( tx, e. into ( ) ) ;
1302+ return ;
1303+ }
1304+ }
1305+ _ => {
1306+ send_err (
1307+ tx,
1308+ AdapterError :: Internal ( "expected copy to s3 oneshot sink" . into ( ) ) ,
1309+ ) ;
1310+ return ;
1311+ }
1312+ }
1313+
1314+ // # Inlined from peek_copy_to_dataflow
1315+
1316+ let ( df_desc, _df_meta) = global_lir_plan. unapply ( ) ;
1317+
1318+ // Create and register ActiveCopyTo.
1319+ // Note: sink_tx/sink_rx is the channel for the compute sink to notify completion
1320+ // This is different from the command's tx which sends the response to the client
1321+ let ( sink_tx, sink_rx) = oneshot:: channel ( ) ;
1322+ let active_copy_to = ActiveCopyTo {
1323+ conn_id : conn_id. clone ( ) ,
1324+ tx : sink_tx,
1325+ cluster_id : compute_instance,
1326+ depends_on : source_ids,
1327+ } ;
1328+
1329+ // Add metadata for the new COPY TO. CopyTo returns a `ready` future, so it is safe to drop.
1330+ drop (
1331+ self . add_active_compute_sink ( sink_id, ActiveComputeSink :: CopyTo ( active_copy_to) )
1332+ . await ,
1333+ ) ;
1334+
1335+ self . ship_dataflow ( df_desc, compute_instance, target_replica)
1336+ . await ;
1337+
1338+ // Spawn background task to wait for completion
1339+ // We must NOT await sink_rx here directly, as that would block the coordinator's main task
1340+ // from processing the completion message. Instead, we spawn a background task that will
1341+ // send the result through tx when the COPY TO completes.
1342+ let span = Span :: current ( ) ;
1343+ task:: spawn (
1344+ || "copy to completion" ,
1345+ async move {
1346+ let res = sink_rx. await ;
1347+ let result = match res {
1348+ Ok ( res) => res,
1349+ Err ( _) => Err ( AdapterError :: Internal ( "copy to sender dropped" . into ( ) ) ) ,
1350+ } ;
1351+ let _ = tx. send ( result) ;
1352+ }
1353+ . instrument ( span) ,
1354+ ) ;
1355+ }
1356+
12411357 /// Constructs an [`ExecuteResponse`] that will send some rows to the
12421358 /// client immediately, as opposed to asking the dataflow layer to send along
12431359 /// the rows after some computation.
0 commit comments