Skip to content

Commit d4acdd1

Browse files
committed
fixup duckdb/python/java
Signed-off-by: Andrew Duffy <[email protected]>
1 parent 2117c5e commit d4acdd1

File tree

3 files changed

+13
-9
lines changed

3 files changed

+13
-9
lines changed

vortex-duckdb/src/copy.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,17 +126,21 @@ impl CopyFunction for VortexCopyFunction {
126126
let array_stream = ArrayStreamAdapter::new(bind_data.dtype.clone(), rx.into_stream());
127127

128128
let handle = SESSION.handle();
129-
let writer = handle.spawn(async move {
129+
let write_task = handle.spawn(async move {
130130
let mut file = async_fs::File::create(file_path).await?;
131-
SESSION.write_options().write(&mut file, array_stream).await
131+
SESSION
132+
.write_options()
133+
.write(&mut file, array_stream)
134+
.await
135+
.map(|(summary, _)| summary)
132136
});
133137

134138
let worker_pool = RUNTIME.new_pool();
135139
worker_pool.set_workers_to_available_parallelism();
136140

137141
Ok(GlobalState {
138142
worker_pool,
139-
write_task: Mutex::new(Some(writer)),
143+
write_task: Mutex::new(Some(write_task)),
140144
sink: Some(sink),
141145
})
142146
}

vortex-jni/src/writer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ pub extern "system" fn Java_dev_vortex_jni_NativeWriterMethods_create(
176176

177177
let (store, _scheme) = make_object_store(&url, &properties)?;
178178
let write_handle = SESSION.handle().spawn(async move {
179-
let mut write = ObjectStoreWriter::new(store, &path).await?;
180-
let summary = SESSION.write_options().write(&mut write, w).await?;
179+
let write = ObjectStoreWriter::new(store, &path).await?;
180+
let (summary, mut write) = SESSION.write_options().write(write, w).await?;
181181
write.shutdown().await?;
182182
Ok(summary)
183183
});

vortex-python/src/io.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,10 @@ pub fn read_url<'py>(
171171
pub fn write(py: Python, iter: PyIntoArrayIterator, path: &str) -> PyResult<()> {
172172
py.detach(|| {
173173
TOKIO_RUNTIME.block_on(async move {
174-
let mut file = File::create(path).await?;
174+
let file = File::create(path).await?;
175175
SESSION
176176
.write_options()
177-
.write(&mut file, iter.into_inner().into_array_stream())
177+
.write(file, iter.into_inner().into_array_stream())
178178
.await
179179
})
180180
})?;
@@ -285,7 +285,7 @@ impl PyVortexWriteOptions {
285285
pub fn write_path(&self, py: Python, iter: PyIntoArrayIterator, path: &str) -> PyResult<()> {
286286
py.detach(|| {
287287
TOKIO_RUNTIME.block_on(async move {
288-
let mut file = File::create(path).await?;
288+
let file = File::create(path).await?;
289289

290290
let mut strategy = WriteStrategyBuilder::new();
291291
if let Some(compressor) = self.compressor.as_ref() {
@@ -295,7 +295,7 @@ impl PyVortexWriteOptions {
295295
SESSION
296296
.write_options()
297297
.with_strategy(strategy.build())
298-
.write(&mut file, iter.into_inner().into_array_stream())
298+
.write(file, iter.into_inner().into_array_stream())
299299
.await
300300
})
301301
})?;

0 commit comments

Comments
 (0)