diff --git a/cmd/nerdctl/image/image_push.go b/cmd/nerdctl/image/image_push.go index 47104a4b7e5..e0cb4cc8143 100644 --- a/cmd/nerdctl/image/image_push.go +++ b/cmd/nerdctl/image/image_push.go @@ -69,6 +69,17 @@ func PushCommand() *cobra.Command { cmd.Flags().Bool(allowNonDistFlag, false, "Allow pushing images with non-distributable blobs") + // #region connection limit flags + cmd.Flags().Int("max-conns-per-host", 5, "Maximum number of connections per registry host") + cmd.Flags().Int("max-idle-conns", 50, "Maximum number of idle connections") + cmd.Flags().Int("request-timeout", 300, "Request timeout in seconds") + // #endregion + + // #region retry flags + cmd.Flags().Int("max-retries", 3, "Maximum number of retry attempts for 503 errors") + cmd.Flags().Int("retry-initial-delay", 1000, "Initial delay before first retry in milliseconds") + // #endregion + return cmd } @@ -113,6 +124,26 @@ func pushOptions(cmd *cobra.Command) (types.ImagePushOptions, error) { if err != nil { return types.ImagePushOptions{}, err } + maxConnsPerHost, err := cmd.Flags().GetInt("max-conns-per-host") + if err != nil { + return types.ImagePushOptions{}, err + } + maxIdleConns, err := cmd.Flags().GetInt("max-idle-conns") + if err != nil { + return types.ImagePushOptions{}, err + } + requestTimeout, err := cmd.Flags().GetInt("request-timeout") + if err != nil { + return types.ImagePushOptions{}, err + } + maxRetries, err := cmd.Flags().GetInt("max-retries") + if err != nil { + return types.ImagePushOptions{}, err + } + retryInitialDelay, err := cmd.Flags().GetInt("retry-initial-delay") + if err != nil { + return types.ImagePushOptions{}, err + } return types.ImagePushOptions{ GOptions: globalOptions, SignOptions: signOptions, @@ -124,6 +155,11 @@ func pushOptions(cmd *cobra.Command) (types.ImagePushOptions, error) { IpfsAddress: ipfsAddress, Quiet: quiet, AllowNondistributableArtifacts: allowNonDist, + MaxConnsPerHost: maxConnsPerHost, + MaxIdleConns: maxIdleConns, + RequestTimeout: requestTimeout, + MaxRetries: maxRetries, + RetryInitialDelay: retryInitialDelay, Stdout: cmd.OutOrStdout(), }, nil } diff --git a/pkg/api/types/image_types.go b/pkg/api/types/image_types.go index 5ff507ccc7c..38b22051f9a 100644 --- a/pkg/api/types/image_types.go +++ b/pkg/api/types/image_types.go @@ -200,6 +200,16 @@ type ImagePushOptions struct { Quiet bool // AllowNondistributableArtifacts allow pushing non-distributable artifacts AllowNondistributableArtifacts bool + // MaxConnsPerHost maximum number of connections per registry host (default: 5) + MaxConnsPerHost int + // MaxIdleConns maximum number of idle connections (default: 50) + MaxIdleConns int + // RequestTimeout timeout for registry requests in seconds (default: 300) + RequestTimeout int + // MaxRetries maximum number of retry attempts for 503 errors (default: 3) + MaxRetries int + // RetryInitialDelay initial delay before first retry in milliseconds (default: 1000) + RetryInitialDelay int } // RemoteSnapshotterFlags are used for pulling with remote snapshotters diff --git a/pkg/cmd/image/push.go b/pkg/cmd/image/push.go index 8731b0cfc94..95cca5fb4b1 100644 --- a/pkg/cmd/image/push.go +++ b/pkg/cmd/image/push.go @@ -24,6 +24,7 @@ import ( "net/http" "os" "path/filepath" + "time" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -34,7 +35,6 @@ import ( "github.com/containerd/containerd/v2/core/images/converter" "github.com/containerd/containerd/v2/core/remotes" "github.com/containerd/containerd/v2/core/remotes/docker" - dockerconfig "github.com/containerd/containerd/v2/core/remotes/docker/config" "github.com/containerd/containerd/v2/pkg/reference" "github.com/containerd/log" "github.com/containerd/stargz-snapshotter/estargz" @@ -165,17 +165,29 @@ func Push(ctx context.Context, client *containerd.Client, rawRef string, options } dOpts = append(dOpts, dockerconfigresolver.WithHostsDirs(options.GOptions.HostsDir)) - ho, err := dockerconfigresolver.NewHostOptions(ctx, refDomain, dOpts...) - if err != nil { - return err + // Configure connection limits to prevent registry overload (503 errors) + if options.MaxConnsPerHost > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxConnsPerHost(options.MaxConnsPerHost)) } - - resolverOpts := docker.ResolverOptions{ - Tracker: pushTracker, - Hosts: dockerconfig.ConfigureHosts(ctx, *ho), + if options.MaxIdleConns > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxIdleConns(options.MaxIdleConns)) + } + if options.RequestTimeout > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithRequestTimeout(time.Duration(options.RequestTimeout)*time.Second)) } + if options.MaxRetries > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxRetries(options.MaxRetries)) + } + if options.RetryInitialDelay > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithRetryInitialDelay(time.Duration(options.RetryInitialDelay)*time.Millisecond)) + } + // Use the local push tracker for this operation + dOpts = append(dOpts, dockerconfigresolver.WithTracker(pushTracker)) - resolver := docker.NewResolver(resolverOpts) + resolver, err := dockerconfigresolver.New(ctx, refDomain, dOpts...) + if err != nil { + return err + } if err = pushFunc(resolver); err != nil { // In some circumstance (e.g. people just use 80 port to support pure http), the error will contain message like "dial tcp : connection refused" if !errors.Is(err, http.ErrSchemeMismatch) && !errutil.IsErrConnectionRefused(err) { @@ -184,6 +196,22 @@ func Push(ctx context.Context, client *containerd.Client, rawRef string, options if options.GOptions.InsecureRegistry { log.G(ctx).WithError(err).Warnf("server %q does not seem to support HTTPS, falling back to plain HTTP", refDomain) dOpts = append(dOpts, dockerconfigresolver.WithPlainHTTP(true)) + // Apply same connection limits for HTTP fallback + if options.MaxConnsPerHost > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxConnsPerHost(options.MaxConnsPerHost)) + } + if options.MaxIdleConns > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxIdleConns(options.MaxIdleConns)) + } + if options.RequestTimeout > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithRequestTimeout(time.Duration(options.RequestTimeout)*time.Second)) + } + if options.MaxRetries > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithMaxRetries(options.MaxRetries)) + } + if options.RetryInitialDelay > 0 { + dOpts = append(dOpts, dockerconfigresolver.WithRetryInitialDelay(time.Duration(options.RetryInitialDelay)*time.Millisecond)) + } resolver, err = dockerconfigresolver.New(ctx, refDomain, dOpts...) if err != nil { return err diff --git a/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go b/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go index 8577b8e2bc6..0c1ca331533 100644 --- a/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go +++ b/pkg/imgutil/dockerconfigresolver/dockerconfigresolver.go @@ -20,6 +20,14 @@ import ( "context" "crypto/tls" "errors" + "fmt" + "io" + "math" + "net" + "net/http" + "strings" + "sync" + "time" "github.com/containerd/containerd/v2/core/remotes" "github.com/containerd/containerd/v2/core/remotes/docker" @@ -30,11 +38,156 @@ import ( var PushTracker = docker.NewInMemoryTracker() +// Global semaphores per registry host to enforce true concurrency limits +var ( + semaphoresMutex sync.RWMutex + semaphores = make(map[string]chan struct{}) +) + +// getSemaphore returns or creates a semaphore for a given host with the specified limit +func getSemaphore(host string, limit int) chan struct{} { + semaphoresMutex.Lock() + defer semaphoresMutex.Unlock() + + key := fmt.Sprintf("%s:%d", host, limit) + if sem, exists := semaphores[key]; exists { + return sem + } + + // Create a new semaphore with the specified limit + sem := make(chan struct{}, limit) + semaphores[key] = sem + log.L.Debugf("Created semaphore for %s with limit %d", host, limit) + return sem +} + +// semaphoreTransport wraps an http.RoundTripper to enforce true concurrency limits using semaphores +type semaphoreTransport struct { + transport http.RoundTripper + limit int +} + +// RoundTrip implements http.RoundTripper with semaphore-based concurrency limiting +func (st *semaphoreTransport) RoundTrip(req *http.Request) (*http.Response, error) { + if st.limit <= 0 { + // No limit, use underlying transport directly + return st.transport.RoundTrip(req) + } + + host := req.URL.Host + sem := getSemaphore(host, st.limit) + + // Acquire semaphore (blocks if limit reached) + log.L.Debugf("Acquiring semaphore for %s (limit %d)", host, st.limit) + sem <- struct{}{} + defer func() { + <-sem // Release semaphore + log.L.Debugf("Released semaphore for %s", host) + }() + + log.L.Debugf("Acquired semaphore for %s, making request", host) + return st.transport.RoundTrip(req) +} + +// retryTransport wraps an http.RoundTripper to add retry logic for 503 errors +type retryTransport struct { + transport http.RoundTripper + maxRetries int + initialDelay time.Duration +} + +// classifies whether the error should trigger a retry and retruns true or false +// depending on the result +func RoundTripErrorClassifier(resp *http.Response, err error, rt *retryTransport, attempt int) bool { + if resp != nil && resp.StatusCode == http.StatusServiceUnavailable { + log.L.Infof("retryTransport.RoundTrip: Retrying due to 503 Service Unavailable error (attempt %d/%d)", attempt+1, rt.maxRetries) + return true + } else if err != nil { + // Check for specific network errors that warrant a retry + if errors.Is(err, io.EOF) { + log.L.Debugf("retryTransport.RoundTrip: Retrying due to io.EOF error (attempt %d/%d)", attempt+1, rt.maxRetries) + return true + } else if strings.Contains(err.Error(), "connection reset by peer") { + log.L.Debugf("retryTransport.RoundTrip: Retrying due to 'connection reset by peer' error (attempt %d/%d)", attempt+1, rt.maxRetries) + return true + } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + log.L.Debugf("retryTransport.RoundTrip: Retrying due to timeout network error: %v (attempt %d/%d)", netErr, attempt+1, rt.maxRetries) + return true + } else if errors.Is(err, context.DeadlineExceeded) { + log.L.Debugf("retryTransport.RoundTrip: Retrying due to context deadline exceeded error (attempt %d/%d)", attempt+1, rt.maxRetries) + return true + } else if errors.Is(err, context.Canceled) { + log.L.Debugf("retryTransport.RoundTrip: Retrying due to context canceled error (attempt %d/%d)", attempt+1, rt.maxRetries) + return true + } + log.L.Debugf("retryTransport.RoundTrip: Not retrying for non-retryable error: %T %v", err, err) + } + return false +} + +// RoundTrip implements http.RoundTripper with retry logic for 503 Service Unavailable errors +func (rt *retryTransport) RoundTrip(req *http.Request) (*http.Response, error) { + log.L.Infof("retryTransport.RoundTrip: Starting request to %s (maxRetries=%d)", req.URL.Host, rt.maxRetries) + + for attempt := 0; attempt <= rt.maxRetries; attempt++ { + // Clone the request for potential retries + reqClone := req.Clone(req.Context()) + + resp, err := rt.transport.RoundTrip(reqClone) + + statusCode := 0 + if resp != nil { + statusCode = resp.StatusCode + } + log.L.Infof("retryTransport.RoundTrip: attempt %d, err=%v, status=%d", attempt, err, statusCode) + + // Retry logic: retry on 503, EOF, connection reset, or temporary network errors. + // These errors are often transient and can be resolved by a retry. + log.L.Infof("retryTransport.RoundTrip: Evaluating retry conditions - resp=%v, statusCode=%d, StatusServiceUnavailable=%d", resp != nil, statusCode, http.StatusServiceUnavailable) + shouldRetry := RoundTripErrorClassifier(resp, err, rt, attempt) + log.L.Infof("retryTransport.RoundTrip: shouldRetry=%v for attempt %d", shouldRetry, attempt) + if shouldRetry { + // We have a condition that warrants a retry. + if attempt == rt.maxRetries { + log.L.Debugf("Max retries (%d) exceeded for request to %s", rt.maxRetries, req.URL.Host) + return resp, err // Return the last response and error + } + + // Close the response body before retrying. + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + + // Calculate exponential backoff delay: initialDelay * 2^attempt + delay := time.Duration(float64(rt.initialDelay) * math.Pow(2, float64(attempt))) + log.L.Debugf("Request to %s failed, retrying in %v (attempt %d/%d)", + req.URL.Host, delay, attempt+1, rt.maxRetries) + + // Wait before retrying. + time.Sleep(delay) + continue // Continue to the next retry attempt + } + + // If we are here, it means we are not retrying. + log.L.Infof("retryTransport.RoundTrip: Not retrying, returning response (status=%d, err=%v)", statusCode, err) + return resp, err + } + + // This should never be reached, but return error just in case + return nil, fmt.Errorf("unexpected retry logic error") +} + type opts struct { - plainHTTP bool - skipVerifyCerts bool - hostsDirs []string - authCreds AuthCreds + plainHTTP bool + skipVerifyCerts bool + hostsDirs []string + AuthCreds AuthCreds + maxConnsPerHost int + maxIdleConns int + requestTimeout time.Duration + maxRetries int + retryInitialDelay time.Duration + tracker docker.StatusTrackLocker } // Opt for New @@ -64,7 +217,49 @@ func WithHostsDirs(orig []string) Opt { func WithAuthCreds(ac AuthCreds) Opt { return func(o *opts) { - o.authCreds = ac + o.AuthCreds = ac + } +} + +// WithMaxConnsPerHost sets the maximum number of connections per host +func WithMaxConnsPerHost(n int) Opt { + return func(o *opts) { + o.maxConnsPerHost = n + } +} + +// WithMaxIdleConns sets the maximum number of idle connections +func WithMaxIdleConns(n int) Opt { + return func(o *opts) { + o.maxIdleConns = n + } +} + +// WithRequestTimeout sets the request timeout +func WithRequestTimeout(d time.Duration) Opt { + return func(o *opts) { + o.requestTimeout = d + } +} + +// WithMaxRetries sets the maximum number of retry attempts for 503 errors +func WithMaxRetries(n int) Opt { + return func(o *opts) { + o.maxRetries = n + } +} + +// WithRetryInitialDelay sets the initial delay before first retry +func WithRetryInitialDelay(d time.Duration) Opt { + return func(o *opts) { + o.retryInitialDelay = d + } +} + +// WithTracker sets a custom status tracker +func WithTracker(tracker docker.StatusTrackLocker) Opt { + return func(o *opts) { + o.tracker = tracker } } @@ -102,8 +297,8 @@ func NewHostOptions(ctx context.Context, refHostname string, optFuncs ...Opt) (* return dir, nil } - if o.authCreds != nil { - ho.Credentials = o.authCreds + if o.AuthCreds != nil { + ho.Credentials = o.AuthCreds } else { authCreds, err := NewAuthCreds(refHostname) if err != nil { @@ -140,19 +335,106 @@ func NewHostOptions(ctx context.Context, refHostname string, optFuncs ...Opt) (* // $DOCKER_CONFIG defaults to "~/.docker". // // refHostname is like "docker.io". +type customResolver struct { + remotes.Resolver + client *http.Client +} + +func (r *customResolver) Client(ctx context.Context, host string) (*http.Client, error) { + return r.client, nil +} + func New(ctx context.Context, refHostname string, optFuncs ...Opt) (remotes.Resolver, error) { ho, err := NewHostOptions(ctx, refHostname, optFuncs...) if err != nil { return nil, err } + // Configure HTTP client with connection limits to prevent registry overload + var o opts + for _, of := range optFuncs { + of(&o) + } + + // Use custom tracker if provided, otherwise use global PushTracker + tracker := PushTracker + if o.tracker != nil { + tracker = o.tracker + } + + // Build the custom transport chain first + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } + + // Apply connection limits + if o.maxConnsPerHost > 0 { + transport.MaxConnsPerHost = o.maxConnsPerHost + } + if o.maxIdleConns > 0 { + transport.MaxIdleConns = o.maxIdleConns + } + + finalTransport := http.RoundTripper(transport) + + // Wrap transport with semaphore-based concurrency limiting first + if o.maxConnsPerHost > 0 { + finalTransport = &semaphoreTransport{ + transport: finalTransport, + limit: o.maxConnsPerHost, + } + log.L.Debugf("Enabled semaphore-based concurrency limiting: limit=%d", o.maxConnsPerHost) + } + + // Then, wrap with retry logic if retries are configured + if o.maxRetries > 0 { + retryDelay := o.retryInitialDelay + if retryDelay == 0 { + retryDelay = 1000 * time.Millisecond // Default to 1 second + } + finalTransport = &retryTransport{ + transport: finalTransport, + maxRetries: o.maxRetries, + initialDelay: retryDelay, + } + log.L.Infof("Enabled retry logic: maxRetries=%d, initialDelay=%v for %s", o.maxRetries, retryDelay, refHostname) + } + + client := &http.Client{ + Transport: finalTransport, + } + + if o.requestTimeout > 0 { + client.Timeout = o.requestTimeout + } + + // Set up the host options with our custom client via UpdateClient + ho.UpdateClient = func(defaultClient *http.Client) error { + // Replace the default client's transport with our custom retry transport + defaultClient.Transport = finalTransport + if o.requestTimeout > 0 { + defaultClient.Timeout = o.requestTimeout + } + return nil + } + resolverOpts := docker.ResolverOptions{ - Tracker: PushTracker, + Tracker: tracker, Hosts: dockerconfig.ConfigureHosts(ctx, *ho), } resolver := docker.NewResolver(resolverOpts) - return resolver, nil + return &customResolver{ + Resolver: resolver, + client: client, + }, nil } // AuthCreds is for docker.WithAuthCreds