Skip to content

Commit 93d640a

Browse files
author
Mirko Brombin
authored
Merge pull request #17 from Vanilla-OS/feat/async
Feat/async
2 parents 012bfac + 23f2096 commit 93d640a

File tree

4 files changed

+124
-44
lines changed

4 files changed

+124
-44
lines changed

client.go

Lines changed: 82 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"fmt"
1818
"os"
1919
"path/filepath"
20+
"time"
2021

2122
"github.com/containers/buildah/define"
2223
"github.com/containers/buildah/imagebuildah"
@@ -65,61 +66,106 @@ func NewPrometheus(root, graphDriverName string, maxParallelDownloads uint) (*Pr
6566
}, nil
6667
}
6768

68-
/* PullImage pulls an image from a remote registry and stores it in the
69-
* Prometheus store. It returns the manifest of the pulled image and an
70-
* error if any. Note that the 'docker://' prefix is automatically added
71-
* to the imageName to make it compatible with the alltransports.ParseImageName
72-
* method. */
73-
func (p *Prometheus) PullImage(imageName string, dstName string) (*OciManifest, error) {
74-
srcRef, err := alltransports.ParseImageName(fmt.Sprintf("docker://%s", imageName))
69+
// PullImage pulls an image from a remote registry and stores it in the
70+
// Prometheus store. It returns the manifest of the pulled image and an
71+
// error if any. Note that the 'docker://' prefix is automatically added
72+
// to the imageName to make it compatible with the alltransports.ParseImageName
73+
// method.
74+
func (p *Prometheus) PullImage(imageName, dstName string) (*OciManifest, error) {
75+
progressCh := make(chan types.ProgressProperties)
76+
manifestCh := make(chan OciManifest)
77+
78+
defer close(progressCh)
79+
defer close(manifestCh)
80+
81+
err := p.pullImage(imageName, dstName, progressCh, manifestCh)
7582
if err != nil {
7683
return nil, err
7784
}
85+
for {
86+
select {
87+
case report := <-progressCh:
88+
fmt.Printf("%s: %v/%v\n", report.Artifact.Digest.Encoded()[:12], report.Offset, report.Artifact.Size)
89+
case manifest := <-manifestCh:
90+
return &manifest, nil
91+
}
92+
}
93+
}
94+
95+
// PullImageAsync does the same thing as PullImage, but returns right
96+
// after starting the pull process. The user can track progress in the
97+
// background by reading from the `progressCh` channel, which contains
98+
// information about the current blob and its progress. When the pull
99+
// process is done, the image's manifest will be sent via the `manifestCh`
100+
// channel, which indicates the process is done.
101+
//
102+
// NOTE: The user is responsible for closing both channels once the operation
103+
// completes.
104+
func (p *Prometheus) PullImageAsync(imageName, dstName string, progressCh chan types.ProgressProperties, manifestCh chan OciManifest) error {
105+
err := p.pullImage(imageName, dstName, progressCh, manifestCh)
106+
return err
107+
}
108+
109+
func (p *Prometheus) pullImage(imageName, dstName string, progressCh chan types.ProgressProperties, manifestCh chan OciManifest) error {
110+
srcRef, err := alltransports.ParseImageName(fmt.Sprintf("docker://%s", imageName))
111+
if err != nil {
112+
return err
113+
}
78114

79115
destRef, err := storage.Transport.ParseStoreReference(p.Store, dstName)
80116
if err != nil {
81-
return nil, err
117+
return err
82118
}
83119

84120
systemCtx := &types.SystemContext{}
85121
policy, err := signature.DefaultPolicy(systemCtx)
86122
if err != nil {
87-
return nil, err
123+
return err
88124
}
89125

90126
policyCtx, err := signature.NewPolicyContext(policy)
91127
if err != nil {
92-
return nil, err
128+
return err
93129
}
94130

95-
pulledManifestBytes, err := copy.Image(
96-
context.Background(),
97-
policyCtx,
98-
destRef,
99-
srcRef,
100-
&copy.Options{
101-
ReportWriter: os.Stdout,
102-
MaxParallelDownloads: p.Config.MaxParallelDownloads,
103-
},
104-
)
131+
duration, err := time.ParseDuration("100ms")
105132
if err != nil {
106-
return nil, err
107-
}
133+
return err
134+
}
135+
136+
go func() {
137+
pulledManifestBytes, err := copy.Image(
138+
context.Background(),
139+
policyCtx,
140+
destRef,
141+
srcRef,
142+
&copy.Options{
143+
MaxParallelDownloads: p.Config.MaxParallelDownloads,
144+
ProgressInterval: duration,
145+
Progress: progressCh,
146+
},
147+
)
148+
if err != nil {
149+
return
150+
}
108151

109-
var manifest OciManifest
110-
err = json.Unmarshal(pulledManifestBytes, &manifest)
111-
if err != nil {
112-
return nil, err
113-
}
152+
var manifest OciManifest
153+
err = json.Unmarshal(pulledManifestBytes, &manifest)
154+
if err != nil {
155+
return
156+
}
114157

115-
// here we remove the 'sha256:' prefix from the digest, so we don't have
116-
// to deal with it later
117-
manifest.Config.Digest = manifest.Config.Digest[7:]
118-
for i := range manifest.Layers {
119-
manifest.Layers[i].Digest = manifest.Layers[i].Digest[7:]
120-
}
158+
// here we remove the 'sha256:' prefix from the digest, so we don't have
159+
// to deal with it later
160+
manifest.Config.Digest = manifest.Config.Digest[7:]
161+
for i := range manifest.Layers {
162+
manifest.Layers[i].Digest = manifest.Layers[i].Digest[7:]
163+
}
164+
165+
manifestCh <- manifest
166+
}()
121167

122-
return &manifest, nil
168+
return nil
123169
}
124170

125171
/* GetImageByDigest returns an image from the Prometheus store by its digest. */
@@ -185,8 +231,8 @@ func (p *Prometheus) BuildContainerFile(dockerfilePath string, imageName string)
185231
context.Background(),
186232
p.Store,
187233
define.BuildOptions{
188-
Output: imageName,
189-
},
234+
Output: imageName,
235+
},
190236
dockerfilePath,
191237
)
192238
if err != nil {

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package prometheus
22

3-
var version = "0.1.6"
3+
var version = "0.2.0"
44

55
func main() {}

tests/build_containerfile_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package tests
22

33
import (
4-
"io/ioutil"
4+
"os"
55
"testing"
66

77
"github.com/containers/storage/pkg/reexec"
@@ -22,8 +22,8 @@ func TestBuildContainerfile(t *testing.T) {
2222
t.Fatal("prometheus instance is nil")
2323
}
2424

25-
containerfile := []byte("FROM alpine:latest")
26-
err = ioutil.WriteFile("Containerfile", containerfile, 0644)
25+
containerfile := []byte("FROM docker.io/library/alpine:latest")
26+
err = os.WriteFile("Containerfile", containerfile, 0644)
2727
if err != nil {
2828
t.Fatalf("error creating Containerfile: %v", err)
2929
}

tests/pull_image_test.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,37 @@
11
package tests
22

33
import (
4+
"fmt"
5+
"os"
46
"testing"
57

8+
"github.com/containers/image/v5/types"
69
"github.com/containers/storage/pkg/reexec"
710
"github.com/vanilla-os/prometheus"
811
)
912

10-
func TestPullImage(t *testing.T) {
13+
var pmt *prometheus.Prometheus
14+
15+
func TestMain(m *testing.M) {
1116
if reexec.Init() {
1217
return
1318
}
1419

15-
pmt, err := prometheus.NewPrometheus("storage", "vfs", 5)
20+
var err error
21+
pmt, err = prometheus.NewPrometheus("storage", "vfs", 5)
1622
if err != nil {
17-
t.Fatalf("error creating Prometheus instance: %v", err)
23+
panic("error creating Prometheus instance: " + err.Error())
1824
}
1925

2026
if pmt == nil {
21-
t.Fatal("prometheus instance is nil")
27+
panic("prometheus instance is nil")
2228
}
2329

30+
status := m.Run()
31+
os.Exit(status)
32+
}
33+
34+
func TestPullImage(t *testing.T) {
2435
image, err := pmt.PullImage("docker.io/library/alpine:latest", "my-alpine")
2536
if err != nil {
2637
t.Fatalf("error pulling image: %v", err)
@@ -60,3 +71,26 @@ func TestPullImage(t *testing.T) {
6071
}
6172
}
6273
}
74+
75+
func TestPullImageAsync(t *testing.T) {
76+
progressCh := make(chan types.ProgressProperties)
77+
manifestCh := make(chan prometheus.OciManifest)
78+
79+
defer close(progressCh)
80+
defer close(manifestCh)
81+
82+
err := pmt.PullImageAsync("docker.io/library/alpine:latest", "my-alpine", progressCh, manifestCh)
83+
if err != nil {
84+
t.Fatalf("error pulling image: %v", err)
85+
}
86+
87+
for {
88+
select {
89+
case report := <-progressCh:
90+
fmt.Printf("%s: %v/%v\n", report.Artifact.Digest.Encoded()[:12], report.Offset, report.Artifact.Size)
91+
case manifest := <-manifestCh:
92+
fmt.Printf("Got manifest: %v\n", manifest)
93+
return
94+
}
95+
}
96+
}

0 commit comments

Comments
 (0)