Skip to content

Commit 6098087

Browse files
committed
feat: implement pull-state for properly keeping track of layer progress and pull events
1 parent 4d0d09e commit 6098087

File tree

9 files changed

+455
-13
lines changed

9 files changed

+455
-13
lines changed

examples/pull/main.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"fmt"
56
"github.com/distribution/reference"
67
client2 "github.com/docker/docker/client"
78
"github.com/silenium-dev/docker-wrapper/pkg/client"
@@ -17,21 +18,30 @@ func main() {
1718
cli, err := client.NewWithOpts(
1819
client.WithAuthProvider(authProvider),
1920
client.FromEnv,
20-
client.WithDockerOpts(client2.WithTimeout(time.Second*10)),
21+
client.WithDockerOpts(client2.WithTimeout(time.Hour*1)),
2122
)
2223
if err != nil {
2324
panic(err)
2425
}
2526
//ref, err := reference.ParseDockerRef("quay.io/prometheus/node-exporter@sha256:a25fbdaa3e4d03e0d735fd03f231e9a48332ecf40ca209b2f103b1f970d1cde0")
26-
ref, err := reference.ParseDockerRef("confluentinc/cp-kafka:latest")
27+
ref, err := reference.ParseDockerRef("localstack/localstack:latest")
2728
if err != nil {
2829
panic(err)
2930
}
30-
eventChan, err := cli.Pull(context.Background(), ref)
31+
stateChan, err := cli.PullWithState(context.Background(), ref)
3132
if err != nil {
3233
panic(err)
3334
}
34-
for event := range eventChan {
35-
println(event.String())
35+
for state := range stateChan {
36+
print("\033[2J")
37+
fmt.Printf("%s\n", state.Status())
38+
for idx, l := range state.Layers() {
39+
fmt.Printf("%02d [%s]: %s\n", idx, l.Id(), l.Status())
40+
}
3641
}
42+
digest, err := cli.Pull(context.Background(), ref)
43+
if err != nil {
44+
panic(err)
45+
}
46+
fmt.Printf("Digest: %s\n", digest)
3747
}

pkg/client/pull.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,25 @@ package client
22

33
import (
44
"context"
5+
"fmt"
56
"github.com/distribution/reference"
67
"github.com/docker/docker/api/types"
78
"github.com/docker/docker/api/types/registry"
9+
"github.com/opencontainers/go-digest"
810
"github.com/silenium-dev/docker-wrapper/pkg/client/pull"
911
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/events"
12+
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/state"
1013
)
1114

12-
func (c *Client) Pull(ctx context.Context, ref reference.Named) (chan events.PullEvent, error) {
15+
func (c *Client) PullWithState(ctx context.Context, ref reference.Named) (chan state.Pull, error) {
16+
eventChan, err := c.PullWithEvents(ctx, ref)
17+
if err != nil {
18+
return nil, err
19+
}
20+
return pull.StateFromStream(ctx, ref, eventChan), nil
21+
}
22+
23+
func (c *Client) PullWithEvents(ctx context.Context, ref reference.Named) (chan events.PullEvent, error) {
1324
var encodedAuth string
1425
var err error
1526
if c.authProvider != nil {
@@ -24,3 +35,22 @@ func (c *Client) Pull(ctx context.Context, ref reference.Named) (chan events.Pul
2435
}
2536
return pull.ParseStream(ctx, reader), nil
2637
}
38+
39+
func (c *Client) Pull(ctx context.Context, ref reference.Named) (digest.Digest, error) {
40+
eventChan, err := c.PullWithEvents(ctx, ref)
41+
if err != nil {
42+
return "", err
43+
}
44+
45+
var digestEvent *events.Digest
46+
for event := range eventChan {
47+
if _, ok := event.(*events.Digest); digestEvent == nil && ok {
48+
digestEvent = event.(*events.Digest)
49+
}
50+
}
51+
if digestEvent == nil {
52+
return "", fmt.Errorf("no digest event received")
53+
}
54+
55+
return digestEvent.Digest, nil
56+
}

pkg/client/pull/events/error.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package events
2+
3+
type LayerError struct {
4+
LayerBase
5+
PullError
6+
}
7+
8+
type PullError struct {
9+
Error string
10+
}
11+
12+
func (e *PullError) String() string {
13+
return e.Error
14+
}

pkg/client/pull/events/event.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type PullEvent interface {
1313
}
1414

1515
type LayerEvent interface {
16+
PullEvent
1617
LayerId() string
1718
}
1819

@@ -47,6 +48,7 @@ func Parse(event base.PullProgressEvent) (PullEvent, error) {
4748
Hide: event.ProgressDetail.HideCounts,
4849
}
4950
progressBase := ProgressBase{layer, progress}
51+
errorEvent := PullError{Error: event.Error}
5052

5153
switch event.Status {
5254
case AlreadyExistsStatus:
@@ -76,6 +78,11 @@ func Parse(event base.PullProgressEvent) (PullEvent, error) {
7678
case PullCompleteStatus:
7779
return &PullComplete{layer}, nil
7880
default:
81+
if event.Error != "" && event.ID != "" {
82+
return &LayerError{layer, errorEvent}, nil
83+
} else if event.Error != "" {
84+
return &errorEvent, nil
85+
}
7986
if strings.HasPrefix(event.Status, "Pulling from") {
8087
return &PullStarted{}, nil
8188
}

pkg/client/pull/events/final.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package events
22

3-
import "fmt"
3+
import (
4+
"fmt"
5+
)
46

57
type DownloadedNewerImage struct {
68
Final

pkg/client/pull/state.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package pull
2+
3+
import (
4+
"context"
5+
"github.com/distribution/reference"
6+
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/events"
7+
"github.com/silenium-dev/docker-wrapper/pkg/client/pull/state"
8+
)
9+
10+
func StateFromStream(ctx context.Context, ref reference.Named, ch chan events.PullEvent) chan state.Pull {
11+
out := make(chan state.Pull)
12+
13+
go processEvents(ctx, ref, ch, out)
14+
15+
return out
16+
}
17+
18+
func processEvents(ctx context.Context, ref reference.Named, ch chan events.PullEvent, out chan state.Pull) {
19+
defer close(out)
20+
var current state.Pull
21+
var err error
22+
for {
23+
select {
24+
case <-ctx.Done():
25+
return
26+
case event, ok := <-ch:
27+
if !ok {
28+
return
29+
}
30+
var next state.Pull
31+
if current == nil {
32+
next, err = state.NewPullState(ref, event)
33+
} else {
34+
next, err = current.Next(event)
35+
}
36+
if err != nil {
37+
panic(err)
38+
}
39+
current = next
40+
out <- current
41+
}
42+
}
43+
}

0 commit comments

Comments
 (0)