Skip to content

Commit d90c523

Browse files
authored
Use piped stream to upload stream (#40)
1 parent 2894842 commit d90c523

File tree

2 files changed

+76
-34
lines changed

2 files changed

+76
-34
lines changed

plugins/nf-snowflake/src/main/nextflow/snowflake/nio/SnowflakeFileSystemProvider.groovy

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package nextflow.snowflake.nio
33
import groovy.transform.CompileStatic
44
import groovy.util.logging.Slf4j
55
import nextflow.snowflake.SnowflakeConnectionPool
6+
import nextflow.util.ThreadPoolManager
67

78
import java.nio.channels.SeekableByteChannel
89
import java.nio.file.AccessMode
@@ -37,14 +38,30 @@ import java.util.concurrent.ConcurrentHashMap
3738
class SnowflakeFileSystemProvider extends FileSystemProvider {
3839

3940
private static final String SCHEME = 'snowflake'
40-
41+
4142
private final Map<String, SnowflakeFileSystem> fileSystems = new ConcurrentHashMap<>()
4243
private final SnowflakeStageClient client
44+
private java.util.concurrent.ExecutorService uploadExecutor
4345

4446
/**
 * Builds the provider with a stage client backed by the shared
 * Snowflake connection pool singleton.
 */
SnowflakeFileSystemProvider() {
    client = new SnowflakeStageClient(SnowflakeConnectionPool.getInstance())
}
4749

50+
/**
 * Lazily create the shared upload executor on first use.
 *
 * Invoked when the first output stream is created, at which point a
 * Nextflow session is guaranteed to exist. Synchronized so concurrent
 * stream creations initialize the pool exactly once.
 */
private synchronized void ensureExecutorInitialized() {
    if (uploadExecutor == null) {
        // Use Nextflow's ThreadPoolManager for proper lifecycle management:
        // it registers shutdown callbacks with the session automatically
        uploadExecutor = ThreadPoolManager.create('snowflake-upload')
        log.debug("Created Snowflake upload thread pool using ThreadPoolManager")
    }
}
64+
4865
@Override
4966
String getScheme() {
5067
return SCHEME
@@ -120,7 +137,10 @@ class SnowflakeFileSystemProvider extends FileSystemProvider {
120137
}
121138
}
122139

123-
return new SnowflakeStageOutputStream(client, snowflakePath)
140+
// Lazily initialize executor when first stream is created
141+
ensureExecutorInitialized()
142+
143+
return new SnowflakeStageOutputStream(client, snowflakePath, uploadExecutor)
124144
}
125145

126146
@Override

plugins/nf-snowflake/src/main/nextflow/snowflake/nio/SnowflakeStageOutputStream.groovy

Lines changed: 54 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,15 @@ package nextflow.snowflake.nio
33
import groovy.transform.CompileStatic
44
import groovy.util.logging.Slf4j
55

6+
import java.util.concurrent.Future
7+
import java.util.concurrent.ExecutionException
8+
import java.util.concurrent.ExecutorService
9+
610
/**
711
* OutputStream implementation for writing to Snowflake stages
8-
*
9-
* Buffers data to a temporary file, then uploads to stage on close()
12+
*
13+
* Uses piped streams to enable streaming uploads without buffering entire content in memory.
14+
* Data written to this stream is immediately available to the background upload thread.
1015
*
1116
* @author Hongye Yu
1217
*/
@@ -16,71 +21,88 @@ class SnowflakeStageOutputStream extends OutputStream {
1621

1722
private final SnowflakeStageClient client
1823
private final SnowflakePath path
19-
private OutputStream delegate
20-
private File tempFile
24+
private final PipedOutputStream pipedOutput
25+
private final PipedInputStream pipedInput
26+
private final Future<Long> uploadFuture
2127
private boolean closed = false
2228

/**
 * Creates a streaming output stream for {@code path}.
 *
 * Writes go into a piped stream; a background task submitted to
 * {@code executor} immediately starts draining the pipe and uploading
 * to the stage, so content is never fully buffered in memory.
 *
 * @param client   stage client used to perform the upload
 * @param path     destination stage path
 * @param executor executor running the background upload task
 */
SnowflakeStageOutputStream(SnowflakeStageClient client, SnowflakePath path, ExecutorService executor) {
    this.client = client
    this.path = path

    // 64KB pipe buffer: the JDK default of 1KB is too small and causes
    // excessive blocking handoffs between writer and uploader threads
    this.pipedInput = new PipedInputStream(64 * 1024)
    this.pipedOutput = new PipedOutputStream(pipedInput)

    // Count bytes as the uploader drains the pipe so the Future can report
    // the real transfer size instead of a hard-coded 0
    final long[] bytesRead = new long[1]
    final InputStream countingInput = new FilterInputStream(pipedInput) {
        @Override
        int read() throws IOException {
            int b = super.read()
            if (b >= 0) bytesRead[0]++
            return b
        }

        @Override
        int read(byte[] buf, int off, int len) throws IOException {
            int n = super.read(buf, off, len)
            if (n > 0) bytesRead[0] += n
            return n
        }
    }

    // Start the async upload immediately; it blocks reading the pipe
    // until data is written or EOF is signalled by close()
    this.uploadFuture = executor.submit(new java.util.concurrent.Callable<Long>() {
        @Override
        Long call() throws Exception {
            try {
                log.debug("Background upload started for ${path}")
                client.upload(path, countingInput, -1L) // -1 means size unknown
                log.debug("Background upload completed for ${path}")
                return bytesRead[0]
            } catch (Exception e) {
                log.error("Background upload failed for ${path}", e)
                throw e
            } finally {
                try {
                    pipedInput.close()
                } catch (IOException ignored) {
                    // best-effort cleanup; the pipe is already done either way
                }
            }
        }
    })

    log.debug("Initialized streaming OutputStream for ${path}")
}
3561

3662
/** Writes a single byte into the pipe feeding the background upload. */
@Override
void write(int b) throws IOException {
    // Reject writes after close(), then hand the byte to the pipe
    checkClosed()
    this.pipedOutput.write(b)
}
4167

4268
/** Writes the whole array into the pipe feeding the background upload. */
@Override
void write(byte[] b) throws IOException {
    checkClosed()
    // Equivalent to pipedOutput.write(b): forward the full range
    this.pipedOutput.write(b, 0, b.length)
}
4773

4874
/** Writes {@code len} bytes of {@code b} starting at {@code off} into the pipe. */
@Override
void write(byte[] b, int off, int len) throws IOException {
    // Reject writes after close(), then forward the requested range
    checkClosed()
    this.pipedOutput.write(b, off, len)
}
5379

5480
/** Pushes any buffered bytes through the pipe to the upload thread. */
@Override
void flush() throws IOException {
    checkClosed()
    this.pipedOutput.flush()
}
5985

6086
/**
 * Signals EOF to the background uploader and waits for it to finish.
 *
 * Idempotent: subsequent calls are no-ops.
 *
 * @throws IOException if closing the pipe fails, the wait is interrupted,
 *                     or the background upload failed
 */
@Override
void close() throws IOException {
    if (closed) {
        return
    }
    closed = true

    try {
        // Close the output pipe to signal EOF to the upload thread
        pipedOutput.close()
    } catch (IOException e) {
        // The uploader may never see EOF; cancel it so it doesn't block forever
        uploadFuture.cancel(true)
        throw e
    }

    // Wait for the upload to complete
    try {
        Long bytesUploaded = uploadFuture.get()
        log.debug("Successfully uploaded ${bytesUploaded} bytes to ${path}")
    } catch (InterruptedException e) {
        // Stop the in-flight upload and preserve the interrupt status
        uploadFuture.cancel(true)
        Thread.currentThread().interrupt()
        throw new IOException("Upload interrupted for ${path}", e)
    } catch (ExecutionException e) {
        throw new IOException("Upload failed for ${path}: ${e.cause?.message}", e.cause)
    }
}
86108

0 commit comments

Comments
 (0)