Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 42 additions & 23 deletions components/registry-facade/pkg/registry/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ import (
"github.com/gitpod-io/gitpod/registry-facade/api"
)

var backoffParams = wait.Backoff{
Duration: 100 * time.Millisecond,
Factor: 1.5,
// retrievalBackoffParams defines the backoff parameters for blob retrieval.
// Aiming at ~10 seconds total time for retries
var retrievalBackoffParams = wait.Backoff{
Duration: 1 * time.Second,
Factor: 1.2,
Jitter: 0.2,
Steps: 4,
Steps: 5,
}

func (reg *Registry) handleBlob(ctx context.Context, r *http.Request) http.Handler {
Expand Down Expand Up @@ -213,35 +215,49 @@ func (bh *blobHandler) getBlob(w http.ResponseWriter, r *http.Request) {

func (bh *blobHandler) retrieveFromSource(ctx context.Context, src BlobSource, w http.ResponseWriter, r *http.Request) (handled, dontCache bool, err error) {
log.Debugf("retrieving blob %s from %s", bh.Digest, src.Name())
dontCache, mediaType, url, rc, err := src.GetBlob(ctx, bh.Spec, bh.Digest)
if err != nil {
return false, true, xerrors.Errorf("cannnot fetch the blob from source %s: %v", src.Name(), err)
}
if rc != nil {
defer rc.Close()
}

if url != "" {
http.Redirect(w, r, url, http.StatusPermanentRedirect)
return true, true, nil
}
var n int64
t0 := time.Now()
var body bytes.Buffer
var finalMediaType string

// The entire operation is now inside the backoff loop
err = wait.ExponentialBackoffWithContext(ctx, retrievalBackoffParams, func(ctx context.Context) (done bool, err error) {
// 1. GetBlob is now INSIDE the retry loop
var url string
var rc io.ReadCloser
dontCache, finalMediaType, url, rc, err = src.GetBlob(ctx, bh.Spec, bh.Digest)
if err != nil {
log.WithField("blobSource", src.Name()).WithError(err).Warn("error fetching blob from source, retrying...")
return false, nil
}
if rc != nil {
defer rc.Close()
}

w.Header().Set("Content-Type", mediaType)
if url != "" {
http.Redirect(w, r, url, http.StatusPermanentRedirect)
dontCache = true
return true, nil
}

bp := bufPool.Get().(*[]byte)
defer bufPool.Put(bp)
body.Reset()
bp := bufPool.Get().(*[]byte)
defer bufPool.Put(bp)

var n int64
t0 := time.Now()
err = wait.ExponentialBackoffWithContext(ctx, backoffParams, func(ctx context.Context) (done bool, err error) {
n, err = io.CopyBuffer(w, rc, *bp)
// 2. CopyBuffer is also inside the retry loop
n, err = io.CopyBuffer(&body, rc, *bp)
if err == nil {
return true, nil
}

// Check for retryable errors during copy
if errors.Is(err, syscall.ECONNRESET) || errors.Is(err, syscall.EPIPE) {
log.WithField("blobSource", src.Name()).WithField("baseRef", bh.Spec.BaseRef).WithError(err).Warn("retry get blob because of error")
// TODO(gpl): current error seem to be captured by this - but does it make sense to widen this condition?
log.WithField("blobSource", src.Name()).WithField("baseRef", bh.Spec.BaseRef).WithError(err).Warn("retry get blob because of streaming error")
return false, nil
}

return true, err
})

Expand All @@ -252,6 +268,9 @@ func (bh *blobHandler) retrieveFromSource(ctx context.Context, src BlobSource, w
return false, true, err
}

w.Header().Set("Content-Type", finalMediaType)
w.Write(body.Bytes())

if bh.Metrics != nil {
bh.Metrics.BlobDownloadCounter.WithLabelValues(src.Name(), "true").Inc()
bh.Metrics.BlobDownloadSpeedHist.WithLabelValues(src.Name()).Observe(float64(n) / time.Since(t0).Seconds())
Expand Down
142 changes: 142 additions & 0 deletions components/registry-facade/pkg/registry/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ package registry
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/containerd/containerd/remotes"
Expand All @@ -30,6 +33,9 @@ import (
ma "github.com/multiformats/go-multiaddr"
"github.com/opencontainers/go-digest"
redis "github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"

rfapi "github.com/gitpod-io/gitpod/registry-facade/api"
)
Expand Down Expand Up @@ -250,3 +256,139 @@ func (rw *failFirstResponseWriter) Write(buf []byte) (int, error) {
func (rw *failFirstResponseWriter) WriteHeader(code int) {
rw.code = code
}

// mockBlobSource allows faking BlobSource behavior for tests.
type mockBlobSource struct {
// How many times GetBlob should fail before succeeding.
failCount int
// The error to return on failure.
failError error

// Internal counter for calls.
callCount int
// The data to return on success.
successData string

// Whether to use a reader that fails mid-stream on the first call.
failReaderOnFirstCall bool
// The number of bytes to read successfully before the reader fails.
failAfterBytes int
}

func (m *mockBlobSource) Name() string { return "mock" }
func (m *mockBlobSource) HasBlob(ctx context.Context, details *rfapi.ImageSpec, dgst digest.Digest) bool {
return true
}

func (m *mockBlobSource) GetBlob(ctx context.Context, details *rfapi.ImageSpec, dgst digest.Digest) (dontCache bool, mediaType string, url string, data io.ReadCloser, err error) {
m.callCount++
if m.callCount <= m.failCount {
return false, "", "", nil, m.failError
}

if m.failReaderOnFirstCall && m.callCount == 1 {
return false, "application/octet-stream", "", io.NopCloser(&failingReader{
reader: strings.NewReader(m.successData),
failAfterBytes: m.failAfterBytes,
failError: m.failError,
}), nil
}

return false, "application/octet-stream", "", io.NopCloser(strings.NewReader(m.successData)), nil
}

// failingReader is a reader that fails after a certain point.
type failingReader struct {
reader io.Reader
failAfterBytes int
failError error
bytesRead int
}

func (fr *failingReader) Read(p []byte) (n int, err error) {
if fr.bytesRead >= fr.failAfterBytes {
return 0, fr.failError
}
n, err = fr.reader.Read(p)
if err != nil {
return n, err
}
fr.bytesRead += n
if fr.bytesRead >= fr.failAfterBytes {
// Return the error, but also the bytes read in this call.
return n, fr.failError
}
return n, nil
}

func TestRetrieveFromSource_RetryOnGetBlob(t *testing.T) {
// Arrange
mockSource := &mockBlobSource{
failCount: 2,
failError: errors.New("transient network error"),
successData: "hello world",
}

bh := &blobHandler{
Digest: "sha256:dummy",
Spec: &rfapi.ImageSpec{},
}

// Use short backoff for testing
originalBackoff := retrievalBackoffParams
retrievalBackoffParams = wait.Backoff{
Duration: 1 * time.Millisecond,
Steps: 3,
}
defer func() { retrievalBackoffParams = originalBackoff }()

w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/v2/...", nil)

// Act
handled, dontCache, err := bh.retrieveFromSource(context.Background(), mockSource, w, r)

// Assert
require.NoError(t, err)
assert.True(t, handled)
assert.False(t, dontCache)
assert.Equal(t, "hello world", w.Body.String())
assert.Equal(t, 3, mockSource.callCount, "Expected GetBlob to be called 3 times (2 failures + 1 success)")
}

func TestRetrieveFromSource_RetryOnCopy(t *testing.T) {
// Arrange
mockSource := &mockBlobSource{
failCount: 0, // GetBlob succeeds immediately
failReaderOnFirstCall: true,
failAfterBytes: 5,
failError: syscall.EPIPE,
successData: "hello world",
}

bh := &blobHandler{
Digest: "sha256:dummy",
Spec: &rfapi.ImageSpec{},
}

// Use short backoff for testing
originalBackoff := retrievalBackoffParams
retrievalBackoffParams = wait.Backoff{
Duration: 1 * time.Millisecond,
Steps: 3,
}
defer func() { retrievalBackoffParams = originalBackoff }()

w := httptest.NewRecorder()
r := httptest.NewRequest("GET", "/v2/...", nil)

// Act
handled, dontCache, err := bh.retrieveFromSource(context.Background(), mockSource, w, r)

// Assert
require.NoError(t, err)
assert.True(t, handled)
assert.False(t, dontCache)
assert.Equal(t, "hello world", w.Body.String())
assert.Equal(t, 2, mockSource.callCount, "Expected GetBlob to be called twice (1st succeeds, copy fails, 2nd succeeds)")
}
6 changes: 4 additions & 2 deletions memory-bank/activeContext.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ Building a comprehensive knowledge base of the Gitpod codebase and architecture
- Documented 33 service components and 11 API components
- Enhanced API component documentation with code generation information
- Implemented server readiness probe with database, SpiceDB, and Redis connectivity checks
- **Improved `registry-facade` resilience by implementing a comprehensive retry mechanism for blob retrieval, addressing transient network errors.**

## Next Steps

1. **Component Interactions**: Understand inter-component communication
2. **Development Environment**: Configure local development setup
1. **Monitor `registry-facade`:** Observe the component's behavior with the new retry logic to ensure it correctly handles the previously identified network issues.
2. **Component Interactions**: Understand inter-component communication
3. **Development Environment**: Configure local development setup
3. **Build System**: Gain experience with in-tree and Leeway builds
4. **Component Builds**: Practice building different component types
5. **Initial Tasks**: Identify specific improvement areas
Expand Down
6 changes: 6 additions & 0 deletions memory-bank/components/registry-facade.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ The component acts as an "image layer smuggler," inserting layers into container
- `cmd/run.go`: Implements the main registry service
- `cmd/setup.go`: Handles service setup and configuration
- `pkg/registry/`: Core registry implementation
- `blob.go`: Handles blob retrieval from various sources (local store, IPFS, upstream registries). Contains a resilient retry mechanism to handle transient network errors during both connection and data transfer phases.

## Key Implementation Details

### Blob Retrieval Retry Logic
The `retrieveFromSource` function in `pkg/registry/blob.go` implements an exponential backoff retry mechanism that wraps the entire blob retrieval process. This ensures that transient network errors, such as `TLS handshake timeout` or `connection reset`, that occur during either the initial connection (`GetBlob`) or the data streaming (`io.CopyBuffer`) are retried. This makes the service more resilient to intermittent network issues when fetching blobs from upstream sources like S3.

## Dependencies

Expand Down
6 changes: 6 additions & 0 deletions memory-bank/progress.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ No specific blockers identified yet.

## Recent Progress

### 6/6/2025
- Investigated `registry-facade` 500 errors (CLC-195).
- Analyzed logs and identified TLS handshake timeouts as a root cause.
- Implemented a more resilient retry mechanism in `pkg/registry/blob.go` to handle transient network errors during blob retrieval.
- Updated `registry-facade` component documentation.

### 3/17/2025
- Implemented server readiness probe with database, SpiceDB, and Redis checks
- Created PRD document for the implementation
Expand Down
Loading