Skip to content

Commit 9d9e275

Browse files
committed
Add Wave containers reports (preview)
This commit adds the ability to create a contaiens report file used by Wave when running a pipeline execution. The contains reports can be used to precisily replicate the pipeline execution with the puntcual list of containers build by Wave. Signed-off-by: Paolo Di Tommaso <[email protected]>
1 parent b75ec44 commit 9d9e275

File tree

10 files changed

+509
-3
lines changed

10 files changed

+509
-3
lines changed

docs/wave.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,3 +196,8 @@ The following configuration options are available:
196196
`wave.strategy`
197197
: The strategy to be used when resolving ambiguous Wave container requirements (default: `'container,dockerfile,conda,spack'`).
198198

199+
`wave.report.enabled` (preview)
200+
: Enable the reporting of the Wave containers used during the pipeline execution (default: `false`, requires version `23.06.0-edge` or later).
201+
202+
`wave.report.file` (preview)
203+
: The name of the containers report file (default: `containers-<timestamp>.config` requires version `23.06.0-edge` or later).
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.seqera.wave.plugin
2+
3+
import java.time.Instant
4+
5+
import groovy.transform.Canonical
6+
import groovy.transform.CompileStatic
7+
/**
8+
* Model a container request record
9+
*
10+
* @author Paolo Di Tommaso <[email protected]>
11+
*/
12+
@Canonical
13+
@CompileStatic
14+
class DescribeContainerResponse {
15+
16+
static class User {
17+
Long id
18+
String userName
19+
String email
20+
}
21+
22+
@Canonical
23+
static class RequestInfo {
24+
final User user
25+
final Long workspaceId
26+
final String containerImage
27+
final ContainerConfig containerConfig
28+
final String platform
29+
final String towerEndpoint
30+
final String fingerprint
31+
final Instant timestamp
32+
final String zoneId
33+
final String ipAddress
34+
}
35+
36+
@Canonical
37+
static class BuildInfo {
38+
final String containerFile
39+
final String condaFile
40+
final String buildRepository
41+
final String cacheRepository
42+
}
43+
44+
@Canonical
45+
static class ContainerInfo {
46+
String image
47+
String digest
48+
}
49+
50+
final String token
51+
final Instant expiration
52+
final RequestInfo request
53+
final BuildInfo build
54+
final ContainerInfo source
55+
final ContainerInfo wave
56+
57+
}

plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveClient.groovy

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,22 @@ import java.net.http.HttpRequest
2222
import java.net.http.HttpResponse
2323
import java.nio.file.Path
2424
import java.time.Duration
25+
import java.time.Instant
2526
import java.time.OffsetDateTime
2627
import java.util.concurrent.Callable
2728
import java.util.concurrent.TimeUnit
29+
import java.util.regex.Pattern
2830

2931
import com.google.common.cache.Cache
3032
import com.google.common.cache.CacheBuilder
3133
import com.google.common.util.concurrent.UncheckedExecutionException
3234
import com.google.gson.Gson
35+
import com.google.gson.GsonBuilder
3336
import com.google.gson.reflect.TypeToken
3437
import groovy.json.JsonOutput
3538
import groovy.transform.CompileStatic
3639
import groovy.transform.Memoized
40+
import io.seqera.wave.plugin.adapter.InstantAdapter
3741
import io.seqera.wave.plugin.config.TowerConfig
3842
import io.seqera.wave.plugin.config.WaveConfig
3943
import io.seqera.wave.plugin.exception.BadResponseException
@@ -59,6 +63,8 @@ class WaveClient {
5963

6064
private static Logger log = LoggerFactory.getLogger(WaveClient)
6165

66+
private static final Pattern CONTAINER_PATH = ~/(\S+)\/wt\/([a-z0-9]+)\/\S+/
67+
6268
private static final List<String> DEFAULT_CONDA_CHANNELS = ['conda-forge','defaults']
6369

6470
private static final String DEFAULT_SPACK_ARCH = 'x86_64'
@@ -89,6 +95,8 @@ class WaveClient {
8995

9096
private List<String> condaChannels
9197

98+
final private String waveRegistry
99+
92100
WaveClient(Session session) {
93101
this.session = session
94102
this.config = new WaveConfig(session.config.wave as Map ?: Collections.emptyMap(), SysEnv.get())
@@ -98,6 +106,7 @@ class WaveClient {
98106
this.condaChannels = session.getCondaConfig()?.getChannels() ?: DEFAULT_CONDA_CHANNELS
99107
log.debug "Wave server endpoint: ${endpoint}"
100108
this.packer = new Packer()
109+
this.waveRegistry = new URI(endpoint).getAuthority()
101110
// create cache
102111
cache = CacheBuilder<String, SubmitContainerTokenResponse>
103112
.newBuilder()
@@ -594,4 +603,61 @@ class WaveClient {
594603
return null
595604
}
596605

606+
String resolveSourceContainer(String container) {
607+
final token = getWaveToken(container)
608+
if( !token )
609+
return container
610+
final resp = fetchContainerInfo(token)
611+
final describe = jsonToDescribeContainerResponse(resp)
612+
return describe.source.digest==describe.wave.digest
613+
? digestImage(describe.source) // when the digest are equals, return the source because it's a stable name
614+
: digestImage(describe.wave) // otherwise returns the wave container name
615+
}
616+
617+
protected String digestImage(DescribeContainerResponse.ContainerInfo info) {
618+
if( !info.digest )
619+
return info.image
620+
final p = info.image.lastIndexOf(':')
621+
return p!=-1
622+
? info.image.substring(0,p) + '@' + info.digest
623+
: info.image + '@' + info.digest
624+
}
625+
626+
protected String getWaveToken(String name) {
627+
if( !name )
628+
return null
629+
final matcher = CONTAINER_PATH.matcher(name)
630+
if( !matcher.find() )
631+
return null
632+
return matcher.group(1)==waveRegistry
633+
? matcher.group(2)
634+
: null
635+
}
636+
637+
@Memoized
638+
protected String fetchContainerInfo(String token) {
639+
final uri = new URI("$endpoint/container-token/$token")
640+
log.trace "Wave request container info: $uri"
641+
final req = HttpRequest.newBuilder()
642+
.uri(uri)
643+
.headers('Content-Type','application/json')
644+
.GET()
645+
.build()
646+
647+
final resp = httpClient.send(req, HttpResponse.BodyHandlers.ofString())
648+
final code = resp.statusCode()
649+
if( code>=200 && code<400 ) {
650+
log.debug "Wave container config info: [$code] ${resp.body()}"
651+
return resp.body()
652+
}
653+
throw new BadResponseException("Unexpected response for \'$uri\': [${resp.statusCode()}] ${resp.body()}")
654+
}
655+
656+
protected DescribeContainerResponse jsonToDescribeContainerResponse(String json) {
657+
final gson = new GsonBuilder()
658+
.registerTypeAdapter(Instant.class, new InstantAdapter())
659+
.create();
660+
final type = new TypeToken<DescribeContainerResponse>(){}.getType()
661+
return gson.fromJson(json, type)
662+
}
597663
}

plugins/nf-wave/src/main/io/seqera/wave/plugin/WaveFactory.groovy

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ class WaveFactory implements TraceObserverFactory {
4343
wave.bundleProjectResources = true
4444
session.disableRemoteBinDir = true
4545
}
46-
return Collections.emptyList()
46+
47+
final observer = new WaveObserver(session)
48+
return wave?.enabled && observer.reportOpts().enabled()
49+
? List.<TraceObserver>of(observer)
50+
: List.<TraceObserver>of()
4751
}
4852
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2020-2022, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.wave.plugin
19+
20+
import java.util.concurrent.ConcurrentHashMap
21+
22+
import groovy.transform.CompileStatic
23+
import io.seqera.wave.plugin.config.ReportOpts
24+
import nextflow.Session
25+
import nextflow.file.FileHelper
26+
import nextflow.processor.TaskHandler
27+
import nextflow.trace.TraceObserver
28+
import nextflow.trace.TraceRecord
29+
/**
30+
*
31+
* @author Paolo Di Tommaso <[email protected]>
32+
*/
33+
@CompileStatic
34+
class WaveObserver implements TraceObserver {
35+
36+
private WaveClient client
37+
38+
private ConcurrentHashMap<String,String> containers = new ConcurrentHashMap<>()
39+
40+
WaveObserver(Session session) {
41+
this.client = new WaveClient(session)
42+
}
43+
44+
protected void apply(TaskHandler handler) {
45+
final process = handler.task.getProcessor().getName()
46+
containers.computeIfAbsent(process, (String it) -> {
47+
final container = handler.task.getContainer()
48+
return client.resolveSourceContainer(container)
49+
})
50+
}
51+
52+
void onProcessComplete(TaskHandler handler, TraceRecord trace){
53+
apply(handler)
54+
}
55+
56+
void onProcessCached(TaskHandler handler, TraceRecord trace){
57+
apply(handler)
58+
}
59+
60+
@Override
61+
void onFlowComplete() {
62+
final result = renderContainersConfig(containers)
63+
// save the report file
64+
FileHelper
65+
.asPath(reportOpts().file())
66+
.text = result.toString()
67+
}
68+
69+
protected String renderContainersConfig(Map<String,String> containers) {
70+
final result = new StringBuilder()
71+
for( Map.Entry<String,String> entry : containers ) {
72+
result.append("process { withName: '${entry.key}' { container='$entry.value' }}\n")
73+
}
74+
return result.toString()
75+
}
76+
77+
ReportOpts reportOpts() {
78+
client.config().reportOpts()
79+
}
80+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2020-2022, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.wave.plugin.adapter
19+
20+
import java.lang.reflect.Type
21+
import java.time.Instant
22+
23+
import com.google.gson.JsonDeserializationContext
24+
import com.google.gson.JsonDeserializer
25+
import com.google.gson.JsonElement
26+
import com.google.gson.JsonParseException
27+
import com.google.gson.JsonPrimitive
28+
import com.google.gson.JsonSerializationContext
29+
import com.google.gson.JsonSerializer
30+
31+
/**
32+
* Gson adapter for java Instant class
33+
*
34+
* @author Paolo Di Tommaso <[email protected]>
35+
*/
36+
class InstantAdapter implements JsonSerializer<Instant>, JsonDeserializer<Instant> {
37+
38+
@Override
39+
JsonElement serialize(Instant instant, Type type, JsonSerializationContext context) {
40+
return new JsonPrimitive(instant.toString());
41+
}
42+
43+
@Override
44+
Instant deserialize(JsonElement json, Type type, JsonDeserializationContext context) throws JsonParseException {
45+
String instantString = json.getAsString();
46+
return Instant.parse(instantString);
47+
}
48+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2020-2022, Seqera Labs
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package io.seqera.wave.plugin.config
19+
20+
import groovy.transform.CompileStatic
21+
import nextflow.trace.TraceHelper
22+
23+
/**
24+
*
25+
* @author Paolo Di Tommaso <[email protected]>
26+
*/
27+
@CompileStatic
28+
class ReportOpts {
29+
30+
final private Boolean enabled
31+
32+
final private String file
33+
34+
ReportOpts(Map opts) {
35+
this.enabled = opts.enabled as Boolean
36+
this.file = opts.file
37+
}
38+
39+
boolean enabled() {
40+
enabled || file != null
41+
}
42+
43+
String file() {
44+
if( file )
45+
return file
46+
return enabled
47+
? "containers-${TraceHelper.launchTimestampFmt()}.config"
48+
: null
49+
}
50+
}

plugins/nf-wave/src/main/io/seqera/wave/plugin/config/WaveConfig.groovy

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class WaveConfig {
4040
final private Boolean bundleProjectResources
4141
final private String buildRepository
4242
final private String cacheRepository
43+
final private ReportOpts reportOpts
4344

4445
WaveConfig(Map opts, Map<String,String> env=System.getenv()) {
4546
this.enabled = opts.enabled
@@ -52,6 +53,7 @@ class WaveConfig {
5253
this.cacheRepository = opts.navigate('build.cacheRepository') as String
5354
this.strategy = parseStrategy(opts.strategy)
5455
this.bundleProjectResources = opts.bundleProjectResources
56+
this.reportOpts = new ReportOpts(opts.report as Map ?: Map.of())
5557
if( !endpoint.startsWith('http://') && !endpoint.startsWith('https://') )
5658
throw new IllegalArgumentException("Endpoint URL should start with 'http:' or 'https:' protocol prefix - offending value: $endpoint")
5759
}
@@ -121,4 +123,6 @@ class WaveConfig {
121123
Duration tokensCacheMaxDuration() {
122124
return tokensCacheMaxDuration
123125
}
126+
127+
ReportOpts reportOpts() { reportOpts }
124128
}

0 commit comments

Comments
 (0)