Skip to content

Commit fe6a71f

Browse files
ilopezlunadoringeman
authored andcommitted
Per layer progress (docker#80)
* - renamings - add Layer struct into progress.Message struct * Reporter can receive a Layer on initialization * Reporter per layer * Verify layer progress * use buffered channel to reduce backpressure * Fix test * time based interval to 100 ms (the general threshold for human perception) and size based interval to 1 MB * p.Complete contains the completed bytes so far, meaning we need to keep track of previous p.Complete seen, and update the time based condition to: p.Complete-lastUpdateBytes >= MinBytesForUpdate Otherwise we always emit progress * drop intermediate updates if they're not serviced fast enough * minor: Renaming * clean up
1 parent a452c7e commit fe6a71f

File tree

6 files changed

+292
-67
lines changed

6 files changed

+292
-67
lines changed

pkg/distribution/distribution/client.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,7 @@ func (c *Client) PullModel(ctx context.Context, reference string, progressWriter
168168

169169
// Model doesn't exist in local store or digests don't match, pull from remote
170170

171-
pr := progress.NewProgressReporter(progressWriter, progress.PullMsg)
172-
defer func() {
173-
if err := pr.Wait(); err != nil {
174-
c.log.Warnf("Failed to write progress: %v", err)
175-
}
176-
}()
177-
178-
if err = c.store.Write(remoteModel, []string{reference}, pr.Updates()); err != nil {
171+
if err = c.store.Write(remoteModel, []string{reference}, progressWriter); err != nil {
179172
if writeErr := progress.WriteError(progressWriter, fmt.Sprintf("Error: %s", err.Error())); writeErr != nil {
180173
c.log.Warnf("Failed to write error message: %v", writeErr)
181174
// If we fail to write error message, don't try again

pkg/distribution/distribution/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -424,11 +424,11 @@ func TestClientPullModel(t *testing.T) {
424424
}
425425

426426
// Parse progress output as JSON
427-
var messages []progress.ProgressMessage
427+
var messages []progress.Message
428428
scanner := bufio.NewScanner(&progressBuffer)
429429
for scanner.Scan() {
430430
line := scanner.Text()
431-
var msg progress.ProgressMessage
431+
var msg progress.Message
432432
if err := json.Unmarshal([]byte(line), &msg); err != nil {
433433
t.Fatalf("Failed to parse JSON progress message: %v, line: %s", err, line)
434434
}
@@ -915,7 +915,7 @@ func TestPushProgress(t *testing.T) {
915915
tag := uri.Host + "/some/model/repo:some-tag"
916916

917917
// Create random "model" of a given size
918-
sz := int64(2 * 1024 * 1024) // 2 MB
918+
sz := int64(progress.MinBytesForUpdate)
919919
path, err := randomFile(sz)
920920
if err != nil {
921921
t.Fatalf("Failed to create temp file: %v", err)

pkg/distribution/internal/progress/reporter.go

Lines changed: 78 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,39 @@ import (
66
"io"
77
"time"
88

9-
"github.com/google/go-containerregistry/pkg/v1"
9+
v1 "github.com/google/go-containerregistry/pkg/v1"
1010
)
1111

12-
// ProgressMessage represents a structured message for progress reporting
13-
type ProgressMessage struct {
14-
Type string `json:"type"` // "progress", "success", or "error"
15-
Message string `json:"message"` // Human-readable message
16-
Total uint64 `json:"total"` // Total bytes to transfer
17-
Pulled uint64 `json:"pulled"` // Bytes transferred so far
12+
// UpdateInterval defines how often progress updates should be sent
13+
const UpdateInterval = 100 * time.Millisecond
14+
15+
// MinBytesForUpdate defines the minimum number of bytes that need to be transferred
16+
// before sending a progress update
17+
const MinBytesForUpdate = 1024 * 1024 // 1MB
18+
19+
type Layer struct {
20+
ID string // Layer ID
21+
Size uint64 // Layer size
22+
Current uint64 // Current bytes transferred
23+
}
24+
25+
// Message represents a structured message for progress reporting
26+
type Message struct {
27+
Type string `json:"type"` // "progress", "success", or "error"
28+
Message string `json:"message"` // Human-readable message
29+
Total uint64 `json:"total"` // Deprecated: use Layer.Size
30+
Pulled uint64 `json:"pulled"` // Deprecated: use Layer.Current
31+
Layer Layer `json:"layer,omitempty"` // Current layer information
1832
}
1933

2034
type Reporter struct {
21-
progress chan v1.Update
22-
done chan struct{}
23-
err error
24-
out io.Writer
25-
format progressF
35+
progress chan v1.Update
36+
done chan struct{}
37+
err error
38+
out io.Writer
39+
format progressF
40+
layer v1.Layer
41+
TotalLayers int // Total number of layers
2642
}
2743

2844
type progressF func(update v1.Update) string
@@ -35,12 +51,13 @@ func PushMsg(update v1.Update) string {
3551
return fmt.Sprintf("Uploaded: %.2f MB", float64(update.Complete)/1024/1024)
3652
}
3753

38-
func NewProgressReporter(w io.Writer, msgF progressF) *Reporter {
54+
func NewProgressReporter(w io.Writer, msgF progressF, layer v1.Layer) *Reporter {
3955
return &Reporter{
4056
out: w,
41-
progress: make(chan v1.Update),
57+
progress: make(chan v1.Update, 1),
4258
done: make(chan struct{}),
4359
format: msgF,
60+
layer: layer,
4461
}
4562
}
4663

@@ -58,19 +75,36 @@ func (r *Reporter) Updates() chan<- v1.Update {
5875
go func() {
5976
var lastComplete int64
6077
var lastUpdate time.Time
61-
const updateInterval = 500 * time.Millisecond // Update every 500ms
62-
const minBytesForUpdate = 1024 * 1024 // At least 1MB difference
6378

6479
for p := range r.progress {
6580
if r.out == nil || r.err != nil {
6681
continue // If we fail to write progress, don't try again
6782
}
6883
now := time.Now()
69-
bytesDownloaded := p.Complete - lastComplete
84+
var total int64
85+
var layerID string
86+
if r.layer != nil { // In case of Push there is no layer yet
87+
id, err := r.layer.DiffID()
88+
if err != nil {
89+
r.err = err
90+
continue
91+
}
92+
layerID = id.String()
93+
size, err := r.layer.Size()
94+
if err != nil {
95+
r.err = err
96+
continue
97+
}
98+
total = size
99+
} else {
100+
total = p.Total
101+
}
102+
incrementalBytes := p.Complete - lastComplete
103+
70104
// Only update if enough time has passed or enough bytes downloaded or finished
71-
if now.Sub(lastUpdate) >= updateInterval ||
72-
bytesDownloaded >= minBytesForUpdate {
73-
if err := writeProgress(r.out, r.format(p), safeUint64(p.Total), safeUint64(p.Complete)); err != nil {
105+
if now.Sub(lastUpdate) >= UpdateInterval ||
106+
incrementalBytes >= MinBytesForUpdate {
107+
if err := WriteProgress(r.out, r.format(p), safeUint64(total), safeUint64(p.Complete), layerID); err != nil {
74108
r.err = err
75109
}
76110
lastUpdate = now
@@ -88,41 +122,46 @@ func (r *Reporter) Wait() error {
88122
return r.err
89123
}
90124

91-
// writeProgressMessage writes a JSON-formatted progress message to the writer
92-
func writeProgressMessage(w io.Writer, msg ProgressMessage) error {
93-
if w == nil {
94-
return nil
95-
}
96-
data, err := json.Marshal(msg)
97-
if err != nil {
98-
return err
99-
}
100-
_, err = fmt.Fprintf(w, "%s\n", data)
101-
return err
102-
}
103-
104-
// writeProgress writes a progress update message
105-
func writeProgress(w io.Writer, msg string, total, pulled uint64) error {
106-
return writeProgressMessage(w, ProgressMessage{
125+
// WriteProgress writes a progress update message
126+
func WriteProgress(w io.Writer, msg string, total, current uint64, layerID string) error {
127+
return write(w, Message{
107128
Type: "progress",
108129
Message: msg,
109130
Total: total,
110-
Pulled: pulled,
131+
Pulled: current,
132+
Layer: Layer{
133+
ID: layerID,
134+
Size: total,
135+
Current: current,
136+
},
111137
})
112138
}
113139

114140
// WriteSuccess writes a success message
115141
func WriteSuccess(w io.Writer, message string) error {
116-
return writeProgressMessage(w, ProgressMessage{
142+
return write(w, Message{
117143
Type: "success",
118144
Message: message,
119145
})
120146
}
121147

122148
// WriteError writes an error message
123149
func WriteError(w io.Writer, message string) error {
124-
return writeProgressMessage(w, ProgressMessage{
150+
return write(w, Message{
125151
Type: "error",
126152
Message: message,
127153
})
128154
}
155+
156+
// write writes a JSON-formatted progress message to the writer
157+
func write(w io.Writer, msg Message) error {
158+
if w == nil {
159+
return nil
160+
}
161+
data, err := json.Marshal(msg)
162+
if err != nil {
163+
return err
164+
}
165+
_, err = fmt.Fprintf(w, "%s\n", data)
166+
return err
167+
}

0 commit comments

Comments
 (0)