Skip to content

Commit 78e085e

Browse files
authored
single threaded duckdb copy (#4140)
Signed-off-by: Onur Satici <[email protected]>
1 parent 66965ff commit 78e085e

File tree

1 file changed

+21
-11
lines changed

1 file changed

+21
-11
lines changed

vortex-duckdb/src/copy.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33

44
use std::fmt::Debug;
55
use std::iter;
6-
use std::sync::Arc;
6+
use std::sync::{Arc, LazyLock};
77

88
use tokio::fs::File;
9-
use tokio::runtime::Handle;
9+
use tokio::runtime::{self, Handle, Runtime};
1010
use tokio::sync::mpsc;
1111
use tokio::sync::mpsc::Sender;
1212
use tokio::task::JoinHandle;
@@ -18,7 +18,6 @@ use vortex::error::{VortexExpect, VortexResult, vortex_err};
1818
use vortex::stream::ArrayStreamAdapter;
1919
use vortex_file::{VortexLayoutStrategy, VortexWriteOptions};
2020

21-
use crate::RUNTIME;
2221
use crate::convert::{data_chunk_to_arrow, from_duckdb_table};
2322
use crate::duckdb::{CopyFunction, DataChunk, LogicalType};
2423

@@ -30,6 +29,13 @@ pub struct BindData {
3029
fields: StructFields,
3130
}
3231

32+
static COPY_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
33+
runtime::Builder::new_current_thread()
34+
.enable_all()
35+
.build()
36+
.vortex_expect("Cannot start runtime")
37+
});
38+
3339
/// Write to a file has two phases, writing data chunks and then closing the file.
3440
/// We use a spawned tokio task to actually compress arrays are write it to disk.
3541
/// Each chunk is pushed into the sink and read from the task.
@@ -69,12 +75,16 @@ impl CopyFunction for VortexCopyFunction {
6975
_init_local: &mut Self::LocalState,
7076
chunk: &mut DataChunk,
7177
) -> VortexResult<()> {
72-
init_global
73-
.sink
74-
.as_ref()
75-
.vortex_expect("sink closed early")
76-
.blocking_send(data_chunk_to_arrow(bind_data.fields.names(), chunk))
77-
.map_err(|e| vortex_err!("send error {}", e.to_string()))?;
78+
let chunk = data_chunk_to_arrow(bind_data.fields.names(), chunk);
79+
COPY_RUNTIME.block_on(async {
80+
init_global
81+
.sink
82+
.as_ref()
83+
.vortex_expect("sink closed early")
84+
.send(chunk)
85+
.await
86+
.map_err(|e| vortex_err!("send error {}", e.to_string()))
87+
})?;
7888

7989
Ok(())
8090
}
@@ -83,7 +93,7 @@ impl CopyFunction for VortexCopyFunction {
8393
_bind_data: &Self::BindData,
8494
init_global: &mut Self::GlobalState,
8595
) -> VortexResult<()> {
86-
RUNTIME.block_on(async {
96+
COPY_RUNTIME.block_on(async {
8797
if let Some(sink) = init_global.sink.take() {
8898
drop(sink)
8999
}
@@ -106,7 +116,7 @@ impl CopyFunction for VortexCopyFunction {
106116
let array_stream =
107117
ArrayStreamAdapter::new(bind_data.dtype.clone(), ReceiverStream::new(rx));
108118

109-
let writer = RUNTIME.spawn(async move {
119+
let writer = COPY_RUNTIME.spawn(async move {
110120
let file = File::create(file_path).await?;
111121
VortexWriteOptions::default()
112122
.with_strategy(VortexLayoutStrategy::with_executor(Arc::new(

0 commit comments

Comments
 (0)