Skip to content

Commit 2894842

Browse files
authored
snowflake filesystem (#39)
1 parent 3eb2744 commit 2894842

21 files changed

+2629
-27
lines changed

plugins/nf-snowflake/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,3 @@ dependencies {
8787
test {
8888
useJUnitPlatform()
8989
}
90-

plugins/nf-snowflake/src/main/nextflow/snowflake/SnowflakeCacheFactory.groovy

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import nextflow.exception.AbortOperationException
1010
import nextflow.plugin.Priority
1111
import nextflow.cache.CacheDB
1212
import nextflow.cache.CacheFactory
13+
import nextflow.cache.DefaultCacheStore
1314

1415
/**
1516
* Implements the cloud cache factory
@@ -27,11 +28,17 @@ class SnowflakeCacheFactory extends CacheFactory {
2728
if( !uniqueId ) throw new AbortOperationException("Missing cache `uuid`")
2829
if( !runName ) throw new AbortOperationException("Missing cache `runName`")
2930

30-
// Defer validation of SNOWFLAKE_CACHE_PATH until the cache is actually used
31+
// Use SNOWFLAKE_CACHE_PATH if set, otherwise fall back to DefaultCacheStore
3132
final String cachePathEnv = System.getenv("SNOWFLAKE_CACHE_PATH")
32-
final Path basePath = cachePathEnv ? Paths.get(cachePathEnv) : null
3333

34-
final store = new SnowflakeCacheStore(uniqueId, runName, basePath)
35-
return new CacheDB(store)
34+
if( cachePathEnv ) {
35+
final Path basePath = Paths.get(cachePathEnv)
36+
final store = new SnowflakeCacheStore(uniqueId, runName, basePath)
37+
return new CacheDB(store)
38+
}
39+
else {
40+
final store = new DefaultCacheStore(uniqueId, runName, home)
41+
return new CacheDB(store)
42+
}
3643
}
3744
}

plugins/nf-snowflake/src/main/nextflow/snowflake/SnowflakeExecutor.groovy

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,24 +85,31 @@ class SnowflakeExecutor extends Executor implements ExtensionPoint {
8585
}
8686

8787
/**
88-
* Copy local bin directory to remote mounted workdir
88+
* Copy local bin directory to Snowflake stage
8989
*/
9090
protected void uploadBinDir() {
9191
if( session.binDir && !session.binDir.empty() && !session.disableRemoteBinDir ) {
9292
// Use session run name for directory isolation
9393
final String runId = session.runName
94-
final Path targetDir = Paths.get("/mnt/workdir", runId, "bin")
95-
96-
// Create target directory
94+
final Path workDirPath = session.workDir
95+
96+
log.debug("Uploading bin directory to Snowflake stage: ${workDirPath.toUriString()}")
97+
98+
// Upload to snowflake://stage/STAGE_NAME/<runId>/bin
99+
final Path targetDir = workDirPath.resolve(runId).resolve("bin")
100+
101+
// Create target directory (implicit in Snowflake)
97102
Files.createDirectories(targetDir)
98-
99-
// Copy all files from bin directory using standard file I/O
103+
104+
// Copy all files from bin directory to stage
100105
Files.list(session.binDir).forEach { Path source ->
101106
final Path target = targetDir.resolve(source.getFileName())
102107
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING)
108+
log.debug("Uploaded bin file: ${source.getFileName()} -> ${target.toUriString()}")
103109
}
104-
110+
105111
remoteBinDir = targetDir
112+
log.debug("Bin directory uploaded to: ${targetDir.toUriString()}")
106113
}
107114
}
108115

@@ -113,9 +120,40 @@ class SnowflakeExecutor extends Executor implements ExtensionPoint {
113120
protected void register() {
114121
super.register()
115122
snowflakeConfig = session.config.navigate("snowflake") as Map
123+
124+
// Validate that workDir uses snowflake:// scheme
125+
validateWorkDir()
126+
116127
uploadBinDir()
117128
}
118129

130+
/**
131+
* Validate that the work directory uses the snowflake:// scheme
132+
* @throws IllegalArgumentException if workDir doesn't use snowflake:// scheme
133+
*/
134+
private void validateWorkDir() {
135+
final Path workDirPath = session.workDir
136+
if (!workDirPath) {
137+
throw new IllegalArgumentException(
138+
"Work directory is not configured. " +
139+
"For Snowflake executor, you must specify a work directory using the snowflake:// scheme, " +
140+
"e.g., workDir = 'snowflake://stage/MY_STAGE/work'"
141+
)
142+
}
143+
144+
final String workDirStr = workDirPath.toUriString()
145+
if (!SnowflakeUri.isSnowflakeStageUri(workDirStr)) {
146+
throw new IllegalArgumentException(
147+
"Invalid work directory for Snowflake executor: ${workDirStr}\n" +
148+
"The work directory must use the snowflake:// scheme pointing to a Snowflake internal stage.\n" +
149+
"Example: workDir = 'snowflake://stage/MY_STAGE/work'\n" +
150+
"Current workDir: ${workDirStr}"
151+
)
152+
}
153+
154+
log.debug("Using Snowflake stage as work directory: ${workDirStr}")
155+
}
156+
119157
@Override
120158
boolean isContainerNative() {
121159
return true

plugins/nf-snowflake/src/main/nextflow/snowflake/SnowflakeFileCopyStrategy.groovy

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
11
package nextflow.snowflake
22

33
import groovy.transform.CompileStatic
4+
import groovy.util.logging.Slf4j
45
import nextflow.processor.TaskBean
56
import nextflow.executor.SimpleFileCopyStrategy
67
import java.nio.file.Path
78

89
/**
910
* File copy strategy for Snowflake executor with remote bin directory support
1011
*
12+
* Note: Path translation is handled by SnowflakeWrapperBuilder.create() which
13+
* pre-translates all paths in the TaskBean before this strategy is created.
14+
* This class only needs to handle bin directory setup.
15+
*
1116
* @author Hongye Yu
1217
*/
18+
@Slf4j
1319
@CompileStatic
1420
class SnowflakeFileCopyStrategy extends SimpleFileCopyStrategy {
1521

@@ -22,21 +28,32 @@ class SnowflakeFileCopyStrategy extends SimpleFileCopyStrategy {
2228

2329
/**
2430
* Override to prepend remote bin directory setup script
31+
*
32+
* The remoteBinDir is a snowflake:// path, so we need to translate it
33+
* to the container mount path for the script.
2534
*/
2635
@Override
2736
String getEnvScript(Map environment, boolean container) {
28-
if( remoteBinDir == null )
37+
if( remoteBinDir == null ) {
38+
log.debug("No remote bin directory configured")
2939
return super.getEnvScript(environment, container)
40+
}
41+
42+
// Translate bin directory path from snowflake:// to /mnt/stage/...
43+
String translatedBinDir = SnowflakeUri.translateToMount(remoteBinDir).toUriString()
44+
log.debug("Remote bin directory: ${remoteBinDir} -> ${translatedBinDir}")
3045

3146
final script = """\
3247
# Copy and setup remote bin directory
3348
NXF_BIN=\$(mktemp -d)
34-
cp -r ${remoteBinDir}/* \$NXF_BIN/
35-
chmod +x \$NXF_BIN/*
49+
cp -r ${translatedBinDir}/* \$NXF_BIN/ 2>&1 || echo "[DEBUG] Failed to copy bin files" >&2
50+
chmod +x \$NXF_BIN/* 2>&1
3651
export PATH=\$NXF_BIN:\$PATH
3752
""".stripIndent()
3853

39-
return script + super.getEnvScript(environment, container)
54+
// Get parent env script and handle null return value
55+
def parentScript = super.getEnvScript(environment, container)
56+
return script + (parentScript ?: '')
4057
}
4158
}
4259

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2021, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package nextflow.snowflake
18+
19+
import java.nio.file.Path
20+
import java.nio.file.spi.FileSystemProvider
21+
22+
import groovy.transform.CompileStatic
23+
import groovy.util.logging.Slf4j
24+
import nextflow.file.FileSystemPathFactory
25+
import nextflow.snowflake.nio.SnowflakePath
26+
import nextflow.snowflake.nio.SnowflakeFileSystemProvider
27+
28+
/**
29+
* Implements FileSystemPathFactory for Snowflake stages
30+
*
31+
* @author Hongye Yu
32+
*/
33+
@Slf4j
34+
@CompileStatic
35+
class SnowflakePathFactory extends FileSystemPathFactory {
36+
37+
@Override
38+
protected Path parseUri(String uri) {
39+
// Use centralized URI checking
40+
if (!SnowflakeUri.isSnowflakeStageUri(uri)) {
41+
return null
42+
}
43+
44+
try {
45+
URI parsedUri = new URI(uri)
46+
47+
// Find the Snowflake FileSystemProvider
48+
SnowflakeFileSystemProvider provider = FileSystemProvider.installedProviders()
49+
.find { it instanceof SnowflakeFileSystemProvider } as SnowflakeFileSystemProvider
50+
51+
if (!provider) {
52+
log.warn("Snowflake FileSystemProvider not found")
53+
return null
54+
}
55+
56+
return provider.getPath(parsedUri)
57+
}
58+
catch (Exception e) {
59+
log.warn("Failed to parse Snowflake URI: ${uri}", e)
60+
return null
61+
}
62+
}
63+
64+
@Override
65+
protected String toUriString(Path path) {
66+
if (!(path instanceof SnowflakePath)) {
67+
return null
68+
}
69+
70+
SnowflakePath snowPath = (SnowflakePath) path
71+
// Handle relative paths - return null to let default handler manage them
72+
if (!snowPath.isAbsolute()) {
73+
return null
74+
}
75+
return snowPath.toUri().toString()
76+
}
77+
78+
@Override
79+
protected String getBashLib(Path path) {
80+
// No special bash library needed for Snowflake
81+
return null
82+
}
83+
84+
@Override
85+
protected String getUploadCmd(String source, Path target) {
86+
if (!(target instanceof SnowflakePath)) {
87+
return null
88+
}
89+
90+
SnowflakePath snowPath = (SnowflakePath) target
91+
String stageRef = snowPath.toStageReference()
92+
return "PUT file://${source} @${stageRef}"
93+
}
94+
}

plugins/nf-snowflake/src/main/nextflow/snowflake/SnowflakePlugin.groovy

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@
1717
package nextflow.snowflake
1818

1919
import groovy.transform.CompileStatic
20+
import groovy.util.logging.Slf4j
21+
import nextflow.file.FileHelper
2022
import nextflow.plugin.BasePlugin
23+
import nextflow.snowflake.nio.SnowflakeFileSystemProvider
2124
import org.pf4j.PluginWrapper
2225

2326
/**
24-
* Implements the Hello plugins entry point
27+
* Nextflow plugin for Snowflake extensions
2528
*
2629
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
30+
* @author Hongye Yu
2731
*/
32+
@Slf4j
2833
@CompileStatic
2934
class SnowflakePlugin extends BasePlugin {
3035

@@ -39,5 +44,8 @@ class SnowflakePlugin extends BasePlugin {
3944
// Configure Snowflake JDBC to use SLF4J instead of java.util.logging
4045
// This redirects Snowflake JDBC logs to Nextflow's logging system
4146
System.setProperty('net.snowflake.jdbc.loggerImpl', 'net.snowflake.client.log.SLF4JLogger')
47+
48+
// Register the Snowflake FileSystemProvider
49+
FileHelper.getOrInstallProvider(SnowflakeFileSystemProvider)
4250
}
4351
}

plugins/nf-snowflake/src/main/nextflow/snowflake/SnowflakeTaskHandler.groovy

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,20 @@ class SnowflakeTaskHandler extends TaskHandler {
156156
this.connection = this.connectionPool.getConnection()
157157
this.statement = connection.createStatement()
158158

159+
// Create TaskBean and set defaults
159160
final TaskBean taskBean = new TaskBean(task)
160161
if (taskBean.scratch == null) {
161162
taskBean.scratch = scratchDir
162163
}
163-
final SnowflakeFileCopyStrategy fileCopyStrategy = new SnowflakeFileCopyStrategy(taskBean, executor)
164-
final BashWrapperBuilder builder = new BashWrapperBuilder(taskBean, fileCopyStrategy)
164+
165+
// Always enable stats collection to ensure .command.trace is generated
166+
// Nextflow core always attempts to read this file during task finalization
167+
taskBean.statsEnabled = true
168+
169+
// Use factory method to create wrapper builder (follows Fusion pattern)
170+
// This will translate snowflake:// paths to container mount paths
171+
// and create the SnowflakeFileCopyStrategy with bin directory support
172+
final SnowflakeWrapperBuilder builder = SnowflakeWrapperBuilder.create(taskBean, executor)
165173
builder.build()
166174

167175
final String spec = buildJobServiceSpec()
@@ -228,10 +236,25 @@ from specification
228236

229237
final String workDir = executor.getWorkDir().toUriString()
230238

231-
final String workDirStageEnv = System.getenv("workDirStage")
232-
final String workDirStage = workDirStageEnv != null ? workDirStageEnv :
233-
executor.snowflakeConfig.get("workDirStage")
234-
result.addWorkDirMount(workDir, String.format("%s/", workDirStage))
239+
// Check if workDir uses snowflake:// scheme
240+
if (SnowflakeUri.isSnowflakeStageUri(workDir)) {
241+
// Extract stage name using centralized parsing
242+
String stageName = SnowflakeUri.extractStageName(workDir)
243+
244+
// Mount the stage to /mnt/stage/<lowercase_stage_name>
245+
String mountPath = "/mnt/stage/${stageName.toLowerCase()}"
246+
result.addWorkDirMount(mountPath, stageName)
247+
248+
log.debug("Mounting Snowflake stage @${stageName} to ${mountPath}")
249+
} else {
250+
// Legacy behavior: use workDirStage config
251+
final String workDirStageEnv = System.getenv("workDirStage")
252+
final String workDirStage = workDirStageEnv != null ? workDirStageEnv :
253+
executor.snowflakeConfig.get("workDirStage")
254+
if (workDirStage) {
255+
result.addWorkDirMount(workDir, String.format("%s/", workDirStage))
256+
}
257+
}
235258

236259
result.addLocalVolume(scratchDir)
237260

@@ -312,7 +335,11 @@ from specification
312335

313336
private static List<String> classicSubmitCli(TaskRun task) {
314337
final result = new ArrayList(BashWrapperBuilder.BASH)
315-
result.add("${Escape.path(task.workDir)}/${TaskRun.CMD_RUN}".toString())
338+
339+
// Translate workDir if it's a snowflake:// path
340+
String workDirPath = SnowflakeUri.translateToMount(task.workDir).toUriString()
341+
result.add("${Escape.path(workDirPath)}/${TaskRun.CMD_RUN}".toString())
342+
316343
return result
317344
}
318345
}

0 commit comments

Comments
 (0)