diff --git a/internal/layout.go b/internal/layout.go index c3472290..d8b9eee0 100644 --- a/internal/layout.go +++ b/internal/layout.go @@ -18,22 +18,32 @@ package internal import "path" -// deckhouse repo structure -// root-segment: -// root-segment/install: -// root-segment/install-standalone: -// root-segment/release-channel: -// root-segment/modules/: -// root-segment/modules//releases: -// root-segment/modules//extra/: +// deckhouse repo structure (relative to root path like registry.deckhouse.io/deckhouse/fe) +// +// Platform: +// +// : - Deckhouse main image +// /release-channel: - Release channel metadata +// /install: - Installer image +// /install-standalone: - Standalone installer +// +// Security: +// +// /security/: - Security databases (trivy-db, trivy-bdu, etc.) +// +// Modules: +// +// /modules/: - Module main image +// /modules//release: - Module release channel metadata +// /modules//extra/: - Module extra images const ( InstallSegment = "install" InstallStandaloneSegment = "install-standalone" ReleaseChannelSegment = "release-channel" - ModulesSegment = "modules" - ModulesExtraSegment = "extra" - ModulesReleasesSegment = "releases" + ModulesSegment = "modules" + ModulesReleaseSegment = "release" + ModulesExtraSegment = "extra" SecuritySegment = "security" @@ -49,9 +59,9 @@ var pathByMirrorType = map[MirrorType]string{ MirrorTypeDeckhouseInstallStandalone: InstallStandaloneSegment, MirrorTypeDeckhouseReleaseChannels: ReleaseChannelSegment, - MirrorTypeModules: ModulesSegment, - MirrorTypeModulesReleaseChannels: ModulesReleasesSegment, - MirrorTypeModulesExtra: ModulesExtraSegment, + // Module paths are relative to modules// directory + MirrorTypeModules: "", // Module main image at root of module dir + MirrorTypeModulesReleaseChannels: ModulesReleaseSegment, // modules//release MirrorTypeSecurity: SecuritySegment, MirrorTypeSecurityTrivyDBSegment: path.Join(SecuritySegment, SecurityTrivyDBSegment), diff --git a/internal/mirror.go b/internal/mirror.go index 456f0c92..3e87f9e8 100644 --- a/internal/mirror.go +++ b/internal/mirror.go @@ -25,7 +25,6 @@ const ( MirrorTypeDeckhouseReleaseChannels MirrorTypeModules MirrorTypeModulesReleaseChannels - MirrorTypeModulesExtra MirrorTypeSecurity MirrorTypeSecurityTrivyDBSegment MirrorTypeSecurityTrivyBDUSegment diff --git a/internal/mirror/cmd/pull/flags/flags.go b/internal/mirror/cmd/pull/flags/flags.go index e0aacd0b..c5487349 100644 --- a/internal/mirror/cmd/pull/flags/flags.go +++ b/internal/mirror/cmd/pull/flags/flags.go @@ -55,8 +55,9 @@ var ( SourceRegistryPassword string DeckhouseLicenseToken string - DoGOSTDigest bool - NoPullResume bool + DoGOSTDigest bool + NoPullResume bool + IgnoreSuspend bool NoPlatform bool NoSecurityDB bool @@ -161,6 +162,12 @@ module-name@=v1.3.0+stable → exact tag match: include only v1.3.0 and and publ false, "Do not continue last unfinished pull operation and start from scratch.", ) + flagSet.BoolVar( + &IgnoreSuspend, + "ignore-suspend", + false, + "Ignore suspended release channels and continue mirroring. Use with caution.", + ) flagSet.BoolVar( &NoPlatform, "no-platform", diff --git a/internal/mirror/cmd/pull/pull.go b/internal/mirror/cmd/pull/pull.go index e7b5de70..56d4ce7f 100644 --- a/internal/mirror/cmd/pull/pull.go +++ b/internal/mirror/cmd/pull/pull.go @@ -23,8 +23,10 @@ import ( "fmt" "log/slog" "os" + "os/signal" "path" "path/filepath" + "syscall" "time" "github.com/Masterminds/semver/v3" @@ -102,11 +104,23 @@ func NewCommand() *cobra.Command { } func pull(cmd *cobra.Command, _ []string) error { + // Set up graceful cancellation on Ctrl+C + parentCtx := cmd.Context() + if parentCtx == nil { + parentCtx = context.Background() + } + ctx, cancel := signal.NotifyContext(parentCtx, syscall.SIGINT, syscall.SIGTERM) + defer cancel() + puller := NewPuller(cmd) puller.logger.Infof("d8 version: %s", version.Version) - if err := puller.Execute(cmd.Context()); err != nil { + if err := puller.Execute(ctx); err != nil { + if errors.Is(err, context.Canceled) { + puller.logger.WarnLn("Operation cancelled by user") + return nil + } return ErrPullFailed } @@ -164,6 +178,7 @@ func buildPullParams(logger params.Logger) *params.PullParams { SkipSecurityDatabases: pullflags.NoSecurityDB, SkipModules: pullflags.NoModules, OnlyExtraImages: pullflags.OnlyExtraImages, + IgnoreSuspend: pullflags.IgnoreSuspend, DeckhouseTag: pullflags.DeckhouseTag, SinceVersion: pullflags.SinceVersion, } @@ -276,6 +291,7 @@ func (p *Puller) Execute(ctx context.Context) error { SkipSecurity: pullflags.NoSecurityDB, SkipModules: pullflags.NoModules, OnlyExtraImages: pullflags.OnlyExtraImages, + IgnoreSuspend: pullflags.IgnoreSuspend, ModuleFilter: filter, BundleDir: pullflags.ImagesBundlePath, BundleChunkSize: pullflags.ImagesBundleChunkSizeGB * 1000 * 1000 * 1000, diff --git a/internal/mirror/cmd/push/push.go b/internal/mirror/cmd/push/push.go index a2fb674a..af3a0452 100644 --- a/internal/mirror/cmd/push/push.go +++ b/internal/mirror/cmd/push/push.go @@ -23,9 +23,11 @@ import ( "io" "log/slog" "os" + "os/signal" "path" "path/filepath" "strings" + "syscall" "time" "github.com/google/go-containerregistry/pkg/authn" @@ -36,6 +38,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/registry" regclient "github.com/deckhouse/deckhouse/pkg/registry/client" + "github.com/deckhouse/deckhouse-cli/internal/mirror" "github.com/deckhouse/deckhouse-cli/internal/mirror/chunked" "github.com/deckhouse/deckhouse-cli/internal/mirror/operations" "github.com/deckhouse/deckhouse-cli/internal/version" @@ -188,7 +191,7 @@ func pushStaticPackages(pushParams *params.PushParams, logger params.Logger, cli } if err = pkg.Close(); err != nil { - logger.Warnf("Could not close bundle package %s: %w", pkgName, err) + logger.Warnf("Could not close bundle package %s: %v", pkgName, err) } } return nil @@ -289,6 +292,11 @@ func (p *Pusher) Execute() error { return err } + // Use new push service when NEW_PULL env is set + if os.Getenv("NEW_PULL") == "true" { + return p.executeNewPush() + } + if err := p.pushStaticPackages(); err != nil { return err } @@ -300,6 +308,63 @@ func (p *Pusher) Execute() error { return nil } +// executeNewPush runs the push using the push service. +// This service expects the bundle to have the exact same structure as the registry: +// - Each OCI layout's relative path becomes its registry segment +// - Works with unified bundles where pull saved the structure as-is +func (p *Pusher) executeNewPush() error { + // Set up graceful cancellation on Ctrl+C + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + logger := dkplog.NewNop() + + if log.DebugLogLevel() >= 3 { + logger = dkplog.NewLogger(dkplog.WithLevel(slog.LevelDebug)) + } + + // Create registry client + clientOpts := ®client.Options{ + Insecure: p.pushParams.Insecure, + TLSSkipVerify: p.pushParams.SkipTLSVerification, + Logger: logger, + } + + if p.pushParams.RegistryAuth != nil { + clientOpts.Auth = p.pushParams.RegistryAuth + } + + var client registry.Client + client = regclient.NewClientWithOptions(p.pushParams.RegistryHost, clientOpts) + + // Scope to the registry path + if p.pushParams.RegistryPath != "" { + client = client.WithSegment(p.pushParams.RegistryPath) + } + + svc := mirror.NewPushService( + client, + &mirror.PushServiceOptions{ + BundleDir: p.pushParams.BundleDir, + WorkingDir: p.pushParams.WorkingDir, + }, + logger.Named("push"), + p.logger.(*log.SLogger), + ) + + err := svc.Push(ctx) + if err != nil { + // Handle context cancellation gracefully + if errors.Is(err, context.Canceled) { + p.logger.WarnLn("Operation cancelled by user") + return nil + } + return err + } + + return nil +} + // validateRegistryAccess validates access to the registry func (p *Pusher) validateRegistryAccess() error { p.logger.InfoLn("Validating registry access") diff --git a/internal/mirror/modules/layout.go b/internal/mirror/modules/layout.go index d818cf58..2fc853a4 100644 --- a/internal/mirror/modules/layout.go +++ b/internal/mirror/modules/layout.go @@ -117,15 +117,20 @@ type ImageLayouts struct { platform v1.Platform workingDir string - Modules *regimage.ImageLayout + // Modules is the main module image layout (modules//) + Modules *regimage.ImageLayout + // ModulesReleaseChannels is the release channel layout (modules//release/) ModulesReleaseChannels *regimage.ImageLayout - ModulesExtra *regimage.ImageLayout + // ExtraImages holds layouts for each extra image (modules//extra//) + // Key is the extra image name (e.g., "scanner", "enforcer") + ExtraImages map[string]*regimage.ImageLayout } func NewImageLayouts(rootFolder string) *ImageLayouts { l := &ImageLayouts{ - workingDir: rootFolder, - platform: v1.Platform{Architecture: "amd64", OS: "linux"}, + workingDir: rootFolder, + platform: v1.Platform{Architecture: "amd64", OS: "linux"}, + ExtraImages: make(map[string]*regimage.ImageLayout), } return l @@ -144,8 +149,6 @@ func (l *ImageLayouts) setLayoutByMirrorType(rootFolder string, mirrorType inter l.Modules = layout case internal.MirrorTypeModulesReleaseChannels: l.ModulesReleaseChannels = layout - case internal.MirrorTypeModulesExtra: - l.ModulesExtra = layout default: return fmt.Errorf("wrong mirror type in modules image layout: %v", mirrorType) } @@ -153,6 +156,24 @@ func (l *ImageLayouts) setLayoutByMirrorType(rootFolder string, mirrorType inter return nil } +// GetOrCreateExtraLayout returns or creates a layout for a specific extra image. +// Extra images are stored under: modules//extra// +func (l *ImageLayouts) GetOrCreateExtraLayout(extraName string) (*regimage.ImageLayout, error) { + if existing, ok := l.ExtraImages[extraName]; ok { + return existing, nil + } + + // Create layout at modules//extra// + layoutPath := filepath.Join(l.workingDir, "extra", extraName) + layout, err := regimage.NewImageLayout(layoutPath) + if err != nil { + return nil, fmt.Errorf("create extra image layout for %s: %w", extraName, err) + } + + l.ExtraImages[extraName] = layout + return layout, nil +} + // AsList returns a list of layout.Path's in it. Undefined path's are not included in the list. func (l *ImageLayouts) AsList() []layout.Path { paths := make([]layout.Path, 0) @@ -162,8 +183,11 @@ func (l *ImageLayouts) AsList() []layout.Path { if l.ModulesReleaseChannels != nil { paths = append(paths, l.ModulesReleaseChannels.Path()) } - if l.ModulesExtra != nil { - paths = append(paths, l.ModulesExtra.Path()) + // Add all extra image layouts + for _, extraLayout := range l.ExtraImages { + if extraLayout != nil { + paths = append(paths, extraLayout.Path()) + } } return paths } diff --git a/internal/mirror/modules/modules.go b/internal/mirror/modules/modules.go index 220870f1..f6dfc5f0 100644 --- a/internal/mirror/modules/modules.go +++ b/internal/mirror/modules/modules.go @@ -25,6 +25,7 @@ import ( "io" "os" "path/filepath" + "regexp" "strings" "time" @@ -323,26 +324,88 @@ func (svc *Service) pullSingleModule(ctx context.Context, module moduleData) err return fmt.Errorf("pull module images: %w", err) } } + + // Also pull release images with version tags (modules//release:v1.x.x) + // These are in addition to channel tags (alpha, beta, etc.) + if len(moduleVersions) > 0 { + releaseVersionSet := make(map[string]*puller.ImageMeta) + for _, version := range moduleVersions { + releaseVersionSet[svc.rootURL+"/modules/"+module.name+"/release:"+version] = nil + downloadList.ModuleReleaseChannels[svc.rootURL+"/modules/"+module.name+"/release:"+version] = nil + } + + config := puller.PullConfig{ + Name: module.name + " release versions", + ImageSet: releaseVersionSet, + Layout: svc.layout.Module(module.name).ModulesReleaseChannels, + AllowMissingTags: true, + GetterService: svc.modulesService.Module(module.name).ReleaseChannels(), + } + + if err := svc.pullerService.PullImages(ctx, config); err != nil { + svc.logger.Debug(fmt.Sprintf("Failed to pull release version images for %s: %v", module.name, err)) + // Don't fail - version release images may not exist for all versions + } + } + + // Extract and pull internal digest images from module versions (images_digests.json) + // These are internal images that module uses at runtime + digestImages := svc.extractInternalDigestImages(ctx, module.name, moduleVersions) + if len(digestImages) > 0 { + // Add digest images to download list + digestImageSet := make(map[string]*puller.ImageMeta) + for _, digestRef := range digestImages { + digestImageSet[digestRef] = nil + downloadList.Module[digestRef] = nil + } + + config := puller.PullConfig{ + Name: module.name + " internal images", + ImageSet: digestImageSet, + Layout: svc.layout.Module(module.name).Modules, + AllowMissingTags: true, + GetterService: svc.modulesService.Module(module.name), + } + + if err := svc.pullerService.PullImages(ctx, config); err != nil { + svc.logger.Debug(fmt.Sprintf("Failed to pull internal digest images for %s: %v", module.name, err)) + // Don't fail on missing internal images, just log warning + } + } } // Extract and pull extra images from module versions - extraImages := svc.findExtraImages(ctx, module.name, moduleVersions) + // Each extra image gets its own layout: modules//extra// + extraImagesByName := svc.findExtraImages(ctx, module.name, moduleVersions) + + for extraName, images := range extraImagesByName { + if len(images) == 0 { + continue + } + + // Get or create layout for this extra image + extraLayout, err := svc.layout.Module(module.name).GetOrCreateExtraLayout(extraName) + if err != nil { + return fmt.Errorf("create layout for extra image %s: %w", extraName, err) + } - if len(extraImages) > 0 { - for img := range extraImages { - downloadList.ModuleExtra[img] = nil + // Build image set for this extra + imageSet := make(map[string]*puller.ImageMeta) + for _, img := range images { + imageSet[img.FullRef] = nil + downloadList.ModuleExtra[img.FullRef] = nil } config := puller.PullConfig{ - Name: module.name + " extra", - ImageSet: downloadList.ModuleExtra, - Layout: svc.layout.Module(module.name).ModulesExtra, + Name: module.name + "/" + extraName, + ImageSet: imageSet, + Layout: extraLayout, AllowMissingTags: true, - GetterService: svc.modulesService.Module(module.name).Extra(), + GetterService: svc.modulesService.Module(module.name).ExtraImage(extraName), } if err := svc.pullerService.PullImages(ctx, config); err != nil { - return fmt.Errorf("pull extra images: %w", err) + return fmt.Errorf("pull extra image %s: %w", extraName, err) } } @@ -369,9 +432,13 @@ func (svc *Service) extractVersionsFromReleaseChannels(ctx context.Context, modu svc.logger.Debug(fmt.Sprintf("Failed to extract version.json for %s/%s: %v", moduleName, channel, err)) continue } - if versionJSON.Version != "" { - versions = append(versions, "v"+versionJSON.Version) + version := versionJSON.Version + // Ensure version has "v" prefix (some may already have it) + if !strings.HasPrefix(version, "v") { + version = "v" + version + } + versions = append(versions, version) } } @@ -407,9 +474,22 @@ func extractVersionJSON(img interface{ Extract() io.ReadCloser }) (*versionJSON, } } -// findExtraImages finds extra images from module images -func (svc *Service) findExtraImages(ctx context.Context, moduleName string, versions []string) map[string]struct{} { - extraImages := make(map[string]struct{}) +// extraImageInfo holds information about an extra image to pull +type extraImageInfo struct { + // Name is the extra image name (e.g., "scanner", "enforcer") + Name string + // Tag is the image tag + Tag string + // FullRef is the full image reference for pulling + FullRef string +} + +// findExtraImages finds extra images from module images. +// Returns a map where key is extra image name, value is list of image refs to pull. +// Extra images are stored under: modules//extra/: +func (svc *Service) findExtraImages(ctx context.Context, moduleName string, versions []string) map[string][]extraImageInfo { + // Map of extra-name -> list of images to pull + extraImages := make(map[string][]extraImageInfo) for _, version := range versions { // Skip digest references @@ -448,8 +528,14 @@ func (svc *Service) findExtraImages(ctx context.Context, moduleName string, vers continue } + // Extra images go under: modules//extra/: fullImagePath := svc.rootURL + "/modules/" + moduleName + "/extra/" + imageName + ":" + imageTag - extraImages[fullImagePath] = struct{}{} + + extraImages[imageName] = append(extraImages[imageName], extraImageInfo{ + Name: imageName, + Tag: imageTag, + FullRef: fullImagePath, + }) } } @@ -481,6 +567,90 @@ func extractExtraImagesJSON(img interface{ Extract() io.ReadCloser }) (map[strin } } +// digestRegex matches sha256 digests in images_digests.json +var digestRegex = regexp.MustCompile(`sha256:[a-f0-9]{64}`) + +// extractImagesDigestsJSON extracts images_digests.json from module image +// and returns list of sha256 digests. These are internal images that module uses at runtime. +func extractImagesDigestsJSON(img interface{ Extract() io.ReadCloser }) ([]string, error) { + rc := img.Extract() + defer rc.Close() + + tr := tar.NewReader(rc) + for { + hdr, err := tr.Next() + if err == io.EOF { + return nil, fmt.Errorf("images_digests.json not found in image") + } + if err != nil { + return nil, err + } + + if hdr.Name == "images_digests.json" { + data, err := io.ReadAll(tr) + if err != nil { + return nil, fmt.Errorf("read images_digests.json: %w", err) + } + // Extract all sha256:... digests from JSON file + digests := digestRegex.FindAllString(string(data), -1) + return digests, nil + } + } +} + +// extractInternalDigestImages extracts internal digest images from module versions. +// It reads images_digests.json from each module version image and returns +// list of image references in format "repo@sha256:..." which will be pulled +// and stored with tag = hex part of digest. +func (svc *Service) extractInternalDigestImages(ctx context.Context, moduleName string, versions []string) []string { + seenDigests := make(map[string]struct{}) + var digestRefs []string + + moduleRepo := svc.rootURL + "/modules/" + moduleName + + for _, version := range versions { + // Skip digest references + if strings.Contains(version, "@sha256:") { + continue + } + + tag := version + if strings.Contains(version, ":") { + parts := strings.SplitN(version, ":", 2) + tag = parts[1] + } + + img, err := svc.modulesService.Module(moduleName).GetImage(ctx, tag) + if err != nil { + svc.logger.Debug(fmt.Sprintf("Failed to get module image %s:%s for digest extraction: %v", moduleName, tag, err)) + continue + } + + // Extract images_digests.json + digests, err := extractImagesDigestsJSON(img) + if err != nil { + svc.logger.Debug(fmt.Sprintf("No images_digests.json in %s:%s: %v", moduleName, tag, err)) + continue + } + + svc.logger.Debug(fmt.Sprintf("Found %d internal digests in %s:%s", len(digests), moduleName, tag)) + + for _, digest := range digests { + if _, seen := seenDigests[digest]; seen { + continue + } + seenDigests[digest] = struct{}{} + + // Create reference in format repo@sha256:... + // When pulled, the tag will be the hex part (after last ":") + digestRef := moduleRepo + "@" + digest + digestRefs = append(digestRefs, digestRef) + } + } + + return digestRefs +} + // pullVexImages finds and pulls VEX attestation images for module images func (svc *Service) pullVexImages(ctx context.Context, moduleName string, downloadList *ImageDownloadList) { allImages := make([]string, 0) @@ -492,6 +662,8 @@ func (svc *Service) pullVexImages(ctx context.Context, moduleName string, downlo allImages = append(allImages, img) } + // Find VEX images and add to a separate set for pulling + vexImageSet := make(map[string]*puller.ImageMeta) for _, img := range allImages { vexImageName, err := svc.findVexImage(ctx, moduleName, img) if err != nil { @@ -500,9 +672,26 @@ func (svc *Service) pullVexImages(ctx context.Context, moduleName string, downlo } if vexImageName != "" { svc.logger.Debug(fmt.Sprintf("Found VEX image: %s", vexImageName)) + vexImageSet[vexImageName] = nil downloadList.Module[vexImageName] = nil } } + + // Pull VEX images if any found + if len(vexImageSet) > 0 { + config := puller.PullConfig{ + Name: moduleName + " VEX images", + ImageSet: vexImageSet, + Layout: svc.layout.Module(moduleName).Modules, + AllowMissingTags: true, // VEX images may not exist + GetterService: svc.modulesService.Module(moduleName), + } + + if err := svc.pullerService.PullImages(ctx, config); err != nil { + svc.logger.Debug(fmt.Sprintf("Failed to pull VEX images for %s: %v", moduleName, err)) + // Don't fail on VEX image pull errors + } + } } // findVexImage checks if a VEX attestation image exists for the given image @@ -593,9 +782,11 @@ func (svc *Service) packModules(modules []moduleData) error { pkg = f } - // Pack from the module's working directory + // Pack from the module's working directory with prefix to create correct registry structure. + // This ensures the tar contains paths like "modules//index.json" instead of just "index.json". moduleDir := filepath.Join(svc.layout.workingDir, module.name) - if err := bundle.Pack(context.Background(), moduleDir, pkg); err != nil { + tarPrefix := filepath.Join("modules", module.name) + if err := bundle.PackWithPrefix(context.Background(), moduleDir, tarPrefix, pkg); err != nil { return fmt.Errorf("pack module %s: %w", pkgName, err) } @@ -652,10 +843,11 @@ func createOCIImageLayoutsForModule( ) (*ImageLayouts, error) { layouts := NewImageLayouts(rootFolder) + // Only create layouts for main module and release channels. + // Extra image layouts are created dynamically when extra images are discovered. mirrorTypes := []internal.MirrorType{ internal.MirrorTypeModules, internal.MirrorTypeModulesReleaseChannels, - internal.MirrorTypeModulesExtra, } for _, mtype := range mirrorTypes { diff --git a/internal/mirror/platform/layout.go b/internal/mirror/platform/layout.go index 00907045..36f7618c 100644 --- a/internal/mirror/platform/layout.go +++ b/internal/mirror/platform/layout.go @@ -56,6 +56,8 @@ func (l *ImageDownloadList) FillDeckhouseImages(deckhouseVersions []string) { l.Deckhouse[l.rootURL+":"+version] = nil l.DeckhouseInstall[path.Join(l.rootURL, internal.InstallSegment)+":"+version] = nil l.DeckhouseInstallStandalone[path.Join(l.rootURL, internal.InstallStandaloneSegment)+":"+version] = nil + // Also add version tags to release-channel (e.g., release-channel:v1.74.0) + l.DeckhouseReleaseChannel[path.Join(l.rootURL, internal.ReleaseChannelSegment)+":"+version] = nil } } diff --git a/internal/mirror/platform/platform.go b/internal/mirror/platform/platform.go index 100231e3..2cfca9b6 100644 --- a/internal/mirror/platform/platform.go +++ b/internal/mirror/platform/platform.go @@ -56,6 +56,8 @@ type Options struct { BundleDir string // BundleChunkSize is the max size of bundle chunks in bytes (0 = no chunking) BundleChunkSize int64 + // IgnoreSuspend allows mirroring even if release channels are suspended + IgnoreSuspend bool } type Service struct { @@ -278,9 +280,9 @@ func (svc *Service) getReleaseChannelVersionFromRegistry(ctx context.Context, re return nil, fmt.Errorf("cannot get %s release channel version.json: %w", releaseChannel, err) } - // if meta.Suspend { - // return nil, fmt.Errorf("source registry contains suspended release channel %q, try again later", releaseChannel) - // } + if meta.Suspend && !svc.options.IgnoreSuspend { + return nil, fmt.Errorf("source registry contains suspended release channel %q, try again later (use --ignore-suspend to override)", releaseChannel) + } ver, err := semver.NewVersion(meta.Version) if err != nil { @@ -299,11 +301,8 @@ func (svc *Service) getReleaseChannelVersionFromRegistry(ctx context.Context, re svc.userLogger.Debugf("image reference: %s@%s", imageMeta, digest.String()) - err = svc.layout.DeckhouseReleaseChannel.AddImage(image, imageMeta.GetTagReference()) - if err != nil { - return nil, fmt.Errorf("append %s release channel image to layout: %w", releaseChannel, err) - } - + // Don't add to layout here - pullDeckhouseReleaseChannels will add it + // Just record in downloadList for later pull svc.downloadList.DeckhouseReleaseChannel[imageMeta.GetTagReference()] = puller.NewImageMeta(meta.Version, imageMeta.GetTagReference(), &digest) return ver, nil diff --git a/internal/mirror/pull.go b/internal/mirror/pull.go index 4a1a74ef..0c9ab239 100644 --- a/internal/mirror/pull.go +++ b/internal/mirror/pull.go @@ -40,6 +40,8 @@ type PullServiceOptions struct { SkipModules bool // OnlyExtraImages pulls only extra images for modules (without main module images) OnlyExtraImages bool + // IgnoreSuspend allows mirroring even if release channels are suspended + IgnoreSuspend bool // ModuleFilter is the filter for module selection (whitelist/blacklist) ModuleFilter *libmodules.Filter // BundleDir is the directory to store the bundle @@ -88,6 +90,7 @@ func NewPullService( TargetTag: targetTag, BundleDir: options.BundleDir, BundleChunkSize: options.BundleChunkSize, + IgnoreSuspend: options.IgnoreSuspend, }, logger, userLogger, diff --git a/internal/mirror/puller/puller.go b/internal/mirror/puller/puller.go index f85b8b00..e940db91 100644 --- a/internal/mirror/puller/puller.go +++ b/internal/mirror/puller/puller.go @@ -19,8 +19,11 @@ package puller import ( "context" "fmt" + "strings" "time" + v1 "github.com/google/go-containerregistry/pkg/v1" + dkplog "github.com/deckhouse/deckhouse/pkg/log" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" @@ -58,6 +61,28 @@ func (ps *PullerService) PullImages(ctx context.Context, config PullConfig) erro _, tag := SplitImageRefByRepoAndTag(image) + // Check if this is a digest reference (repo@sha256:abc...) + // For digest references, we already know the digest - it's in the reference itself + if strings.Contains(image, "@sha256:") { + // Extract digest from reference + digestStr := image[strings.Index(image, "@sha256:")+1:] // "sha256:abc..." + + digest, err := v1.NewHash(digestStr) + if err != nil { + ps.userLogger.Debugf("failed to parse digest from %s: %v", image, err) + + if config.AllowMissingTags { + continue + } + + return fmt.Errorf("parse digest from reference %s: %w", image, err) + } + + config.ImageSet[image] = NewImageMeta(tag, image, &digest) + + continue + } + digest, err := config.GetterService.GetDigest(ctx, tag) if err != nil { if config.AllowMissingTags { @@ -69,6 +94,7 @@ func (ps *PullerService) PullImages(ctx context.Context, config PullConfig) erro config.ImageSet[image] = NewImageMeta(tag, image, digest) } + ps.userLogger.InfoLn("All required " + config.Name + " meta are pulled!") if err := ps.PullImageSet(ctx, config.ImageSet, config.Layout, config.GetterService.GetImage); err != nil { diff --git a/internal/mirror/puller/types.go b/internal/mirror/puller/types.go index f5f4b974..2e85c7da 100644 --- a/internal/mirror/puller/types.go +++ b/internal/mirror/puller/types.go @@ -70,10 +70,5 @@ func SplitImageRefByRepoAndTag(imageReferenceString string) (string, string) { repo := imageReferenceString[:splitIndex] tag := imageReferenceString[splitIndex+1:] - if strings.HasSuffix(repo, "@sha256") { - repo = strings.TrimSuffix(repo, "@sha256") - tag = "@sha256:" + tag - } - return repo, tag } diff --git a/internal/mirror/push.go b/internal/mirror/push.go index 7f4c7430..37fc1e23 100644 --- a/internal/mirror/push.go +++ b/internal/mirror/push.go @@ -17,23 +17,383 @@ limitations under the License. package mirror import ( + "context" + "fmt" + "io" + "io/fs" + "log/slog" + "os" + "path/filepath" + "slices" + "strings" + + "github.com/google/go-containerregistry/pkg/v1/layout" + "github.com/google/go-containerregistry/pkg/v1/random" + dkplog "github.com/deckhouse/deckhouse/pkg/log" + "github.com/deckhouse/deckhouse/pkg/registry" + "github.com/deckhouse/deckhouse-cli/internal" + "github.com/deckhouse/deckhouse-cli/internal/mirror/chunked" + "github.com/deckhouse/deckhouse-cli/internal/mirror/pusher" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/bundle" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" - registryservice "github.com/deckhouse/deckhouse-cli/pkg/registry/service" ) -type PushService struct { - registryService registryservice.Service +const ( + dirPermissions = 0755 +) + +// PushServiceOptions contains configuration options for PushService +type PushServiceOptions struct { + // BundleDir is the directory containing the bundle to push + BundleDir string + // WorkingDir is the temporary directory for unpacking bundles + WorkingDir string +} +// PushService handles pushing OCI layouts to registry. +// It treats the layout structure as the source of truth - the relative path of each layout +// becomes the registry segment directly. +// +// Expected layout structure (after unpack): +// +// / +// ├── index.json # Deckhouse main images +// ├── blobs/ +// ├── install/ # Deckhouse Install +// │ ├── index.json +// │ └── blobs/ +// ├── install-standalone/ # Deckhouse Standalone Install +// ├── release-channel/ # Deckhouse release channels +// ├── security/ # Security databases +// │ ├── trivy-db/ +// │ ├── trivy-bdu/ +// │ ├── trivy-java-db/ +// │ └── trivy-checks/ +// └── modules/ # Modules +// └── / +// ├── index.json +// ├── release/ +// └── / +type PushService struct { + client registry.Client + options *PushServiceOptions + pusher *pusher.Service logger *dkplog.Logger userLogger *log.SLogger } -func NewPushService(registryService registryservice.Service, logger *dkplog.Logger, userLogger *log.SLogger) *PushService { +// NewPushService creates a new PushService +func NewPushService( + client registry.Client, + options *PushServiceOptions, + logger *dkplog.Logger, + userLogger *log.SLogger, +) *PushService { + if options == nil { + options = &PushServiceOptions{} + } + return &PushService{ - registryService: registryService, - logger: logger, - userLogger: userLogger, + client: client, + options: options, + pusher: pusher.NewService(logger, userLogger), + logger: logger, + userLogger: userLogger, + } +} + +// Push uploads all OCI layouts from the bundle to the registry. +// It unpacks all packages into a unified directory and pushes each layout +// using its relative path as the registry segment. +// +// The key principle: no path transformations. Whatever path the layout has +// in the unpacked directory becomes its path in the registry. +func (svc *PushService) Push(ctx context.Context) error { + // Create unified directory for unpacking + dirPath := filepath.Join(svc.options.WorkingDir, "unified") + if err := os.MkdirAll(dirPath, dirPermissions); err != nil { + return fmt.Errorf("create unified directory: %w", err) + } + defer func() { + if err := os.RemoveAll(dirPath); err != nil { + svc.logger.Warn("Failed to cleanup unified directory", + slog.String("path", dirPath), + slog.Any("error", err)) + } + }() + + // Unpack all packages into unified structure + if err := svc.unpackAllPackages(ctx, dirPath); err != nil { + return fmt.Errorf("unpack packages: %w", err) + } + + // Push all layouts recursively + if err := svc.userLogger.Process("Push to registry", func() error { + return svc.pushAllLayouts(ctx, dirPath) + }); err != nil { + return err + } + + // Create modules index (deckhouse/modules: tags for discovery) + return svc.userLogger.Process("Create modules index", func() error { + return svc.createModulesIndex(ctx, dirPath) + }) +} + +// unpackAllPackages unpacks all tar packages from bundle directory into unified directory. +// All packages are unpacked to the same root - the structure inside each tar +// should already have the correct paths. +func (svc *PushService) unpackAllPackages(ctx context.Context, dirPath string) error { + entries, err := os.ReadDir(svc.options.BundleDir) + if err != nil { + return fmt.Errorf("read bundle dir: %w", err) + } + + packages := svc.findPackages(entries) + if len(packages) == 0 { + return fmt.Errorf("no packages found in bundle directory") + } + + svc.userLogger.Infof("Found %d packages to unpack", len(packages)) + + for _, pkgName := range packages { + if err := svc.unpackPackage(ctx, dirPath, pkgName); err != nil { + // Log warning but continue with other packages + svc.userLogger.Warnf("Failed to unpack %s: %v", pkgName, err) + } + } + + return nil +} + +// findPackages finds all package names (without .tar extension) in the bundle directory. +// It handles both regular .tar files and chunked packages (.tar.chunk000). +func (svc *PushService) findPackages(entries []os.DirEntry) []string { + packagesSet := make(map[string]struct{}) + + for _, entry := range entries { + name := entry.Name() + + // Handle regular tar files + if strings.HasSuffix(name, ".tar") { + pkgName := strings.TrimSuffix(name, ".tar") + packagesSet[pkgName] = struct{}{} + continue + } + + // Handle chunked files (e.g., "platform.tar.chunk000") + if idx := strings.Index(name, ".tar.chunk"); idx != -1 { + pkgName := name[:idx] + packagesSet[pkgName] = struct{}{} + } + } + + packages := make([]string, 0, len(packagesSet)) + for pkg := range packagesSet { + packages = append(packages, pkg) + } + slices.Sort(packages) + + return packages +} + +// unpackPackage unpacks a single package to the unified directory. +func (svc *PushService) unpackPackage(ctx context.Context, dirPath, pkgName string) error { + return svc.userLogger.Process(fmt.Sprintf("Unpack %s", pkgName), func() error { + pkg, err := svc.openPackage(pkgName) + if err != nil { + return fmt.Errorf("open package: %w", err) + } + defer pkg.Close() + + // Unpack directly to unified directory - no path transformations + if err := bundle.Unpack(ctx, pkg, dirPath); err != nil { + return fmt.Errorf("unpack: %w", err) + } + + return nil + }) +} + +// openPackage opens a package file, trying .tar first, then chunked format. +func (svc *PushService) openPackage(pkgName string) (io.ReadCloser, error) { + tarPath := filepath.Join(svc.options.BundleDir, pkgName+".tar") + + pkg, err := os.Open(tarPath) + if err == nil { + return pkg, nil + } + + if !os.IsNotExist(err) { + return nil, fmt.Errorf("open %s: %w", tarPath, err) } + + // Try chunked format + return chunked.Open(svc.options.BundleDir, pkgName+".tar") +} + +// pushAllLayouts recursively walks the directory and pushes each OCI layout found. +// The relative path from root becomes the registry segment. +func (svc *PushService) pushAllLayouts(ctx context.Context, rootDir string) error { + layouts, err := svc.findLayouts(rootDir) + if err != nil { + return fmt.Errorf("scan layouts: %w", err) + } + + if len(layouts) == 0 { + svc.userLogger.InfoLn("No layouts to push") + return nil + } + + svc.userLogger.Infof("Found %d layouts to push", len(layouts)) + + for _, layoutDir := range layouts { + if err := ctx.Err(); err != nil { + return err + } + + if err := svc.pushSingleLayout(ctx, rootDir, layoutDir); err != nil { + return err + } + } + + return nil +} + +// findLayouts finds all OCI layouts in the directory by looking for index.json files. +func (svc *PushService) findLayouts(rootDir string) ([]string, error) { + var layouts []string + + err := filepath.WalkDir(rootDir, func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() || d.Name() != "index.json" { + return nil + } + layouts = append(layouts, filepath.Dir(path)) + return nil + }) + if err != nil { + return nil, err + } + + slices.Sort(layouts) + return layouts, nil +} + +// pushSingleLayout pushes a single OCI layout to the registry. +func (svc *PushService) pushSingleLayout(ctx context.Context, rootDir, layoutDir string) error { + // Check if layout has any images + hasImages, err := svc.layoutHasImages(layoutDir) + if err != nil { + svc.logger.Warn("Failed to check layout", + slog.String("path", layoutDir), + slog.Any("error", err)) + return nil + } + if !hasImages { + return nil + } + + // Build registry segment from relative path + relPath, _ := filepath.Rel(rootDir, layoutDir) + segment := "" + if relPath != "." { + segment = relPath + } + + // Create client with appropriate segments + targetClient := svc.client + if segment != "" { + // Apply each path component as a segment + for _, seg := range strings.Split(segment, string(os.PathSeparator)) { + targetClient = targetClient.WithSegment(seg) + } + } + + svc.userLogger.Infof("Pushing %s", targetClient.GetRegistry()) + + if err := svc.pusher.PushLayout(ctx, layout.Path(layoutDir), targetClient); err != nil { + return fmt.Errorf("push layout %q: %w", relPath, err) + } + + return nil +} + +// layoutHasImages checks if an OCI layout has any images to push. +func (svc *PushService) layoutHasImages(layoutDir string) (bool, error) { + layoutPath := layout.Path(layoutDir) + index, err := layoutPath.ImageIndex() + if err != nil { + return false, fmt.Errorf("read index: %w", err) + } + + indexManifest, err := index.IndexManifest() + if err != nil { + return false, fmt.Errorf("parse index manifest: %w", err) + } + + return len(indexManifest.Manifests) > 0, nil +} + +// createModulesIndex creates the modules index in the registry. +// This pushes a small random image for each module with tag = module name +// to deckhouse/modules repo, enabling module discovery via ListTags. +func (svc *PushService) createModulesIndex(ctx context.Context, rootDir string) error { + modulesDir := filepath.Join(rootDir, internal.ModulesSegment) + + // Check if modules directory exists + entries, err := os.ReadDir(modulesDir) + if err != nil { + if os.IsNotExist(err) { + svc.userLogger.InfoLn("No modules directory found, skipping modules index") + return nil + } + return fmt.Errorf("read modules directory: %w", err) + } + + // Find all module directories + var moduleNames []string + for _, entry := range entries { + if entry.IsDir() { + moduleNames = append(moduleNames, entry.Name()) + } + } + + if len(moduleNames) == 0 { + svc.userLogger.InfoLn("No modules found, skipping modules index") + return nil + } + + slices.Sort(moduleNames) + svc.userLogger.Infof("Creating modules index with %d modules", len(moduleNames)) + + // Get client scoped to modules repo + modulesClient := svc.client.WithSegment(internal.ModulesSegment) + + // Push a small random image for each module with tag = module name + for _, moduleName := range moduleNames { + if err := ctx.Err(); err != nil { + return err + } + + svc.userLogger.Infof("Creating index tag: %s:%s", modulesClient.GetRegistry(), moduleName) + + // Create minimal random image (32 bytes, 1 layer) + img, err := random.Image(32, 1) + if err != nil { + return fmt.Errorf("create random image for module %s: %w", moduleName, err) + } + + // Push with module name as tag + if err := modulesClient.PushImage(ctx, moduleName, img); err != nil { + return fmt.Errorf("push module index tag %s: %w", moduleName, err) + } + } + + svc.userLogger.Infof("Modules index created successfully") + return nil } diff --git a/internal/mirror/pusher/pusher.go b/internal/mirror/pusher/pusher.go index 5a129e13..e093abff 100644 --- a/internal/mirror/pusher/pusher.go +++ b/internal/mirror/pusher/pusher.go @@ -20,99 +20,114 @@ import ( "context" "fmt" "io" + "log/slog" "os" "path/filepath" - "strings" + "time" - "github.com/samber/lo" + "github.com/google/go-containerregistry/pkg/v1/layout" dkplog "github.com/deckhouse/deckhouse/pkg/log" + "github.com/deckhouse/deckhouse/pkg/registry" "github.com/deckhouse/deckhouse-cli/internal/mirror/chunked" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/errorutil" "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/retry" + "github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/retry/task" ) -// PusherService handles the pushing of images to the registry -type PusherService struct { +const ( + pushRetryAttempts = 4 + pushRetryDelay = 3 * time.Second +) + +// Service handles the pushing of images to the registry +type Service struct { logger *dkplog.Logger userLogger *log.SLogger } -// NewPusherService creates a new PusherService -func NewPusherService( - logger *dkplog.Logger, - userLogger *log.SLogger, -) *PusherService { - return &PusherService{ +// NewService creates a new pusher service +func NewService(logger *dkplog.Logger, userLogger *log.SLogger) *Service { + return &Service{ logger: logger, userLogger: userLogger, } } -// PushModules pushes module packages from the bundle directory -func (ps *PusherService) PushModules(_ context.Context, bundleDir string, _ interface{}) error { - bundleContents, err := os.ReadDir(bundleDir) - if err != nil { - return fmt.Errorf("list bundle directory: %w", err) +// PackageExists checks if a package exists (tar or chunked) +func (s *Service) PackageExists(bundleDir, pkgName string) bool { + packagePath := filepath.Join(bundleDir, pkgName+".tar") + if _, err := os.Stat(packagePath); err == nil { + return true } + // Check for chunked package + if _, err := os.Stat(packagePath + ".chunk000"); err == nil { + return true + } + return false +} - modulePackages := lo.Compact(lo.Map(bundleContents, func(item os.DirEntry, _ int) string { - fileExt := filepath.Ext(item.Name()) - pkgName, _, ok := strings.Cut(strings.TrimPrefix(item.Name(), "module-"), ".") - switch { - case !ok: - fallthrough - case fileExt != ".tar" && fileExt != ".chunk": - fallthrough - case !strings.HasPrefix(item.Name(), "module-"): - return "" - } - return pkgName - })) - - successfullyPushedModules := make([]string, 0) - for _, modulePackageName := range modulePackages { - if lo.Contains(successfullyPushedModules, modulePackageName) { - continue - } +// PushLayout pushes all images from an OCI layout to the registry +func (s *Service) PushLayout(ctx context.Context, layoutPath layout.Path, client registry.Client) error { + index, err := layoutPath.ImageIndex() + if err != nil { + return fmt.Errorf("read OCI image index: %w", err) + } - if err = ps.userLogger.Process("Push module: "+modulePackageName, func() error { - pkg, err := ps.openPackage(bundleDir, "module-"+modulePackageName) - if err != nil { - return fmt.Errorf("open package %q: %w", modulePackageName, err) - } - defer pkg.Close() + indexManifest, err := index.IndexManifest() + if err != nil { + return fmt.Errorf("parse OCI image index manifest: %w", err) + } - // Here we would call operations.PushModule, but since we don't have access to it, - // we'll leave this as a placeholder - // if err = operations.PushModule(pushParams, modulePackageName, pkg, client); err != nil { - // return fmt.Errorf("failed to push module %q: %w", modulePackageName, err) - // } + if len(indexManifest.Manifests) == 0 { + return nil + } - ps.userLogger.InfoLn("Module " + modulePackageName + " pushed successfully") + s.userLogger.Infof("Pushing %d images", len(indexManifest.Manifests)) - successfullyPushedModules = append(successfullyPushedModules, modulePackageName) + for i, manifest := range indexManifest.Manifests { + tag := manifest.Annotations["io.deckhouse.image.short_tag"] + if tag == "" { + s.logger.Warn("Skipping image without short_tag annotation", slog.String("digest", manifest.Digest.String())) + continue + } - return nil - }); err != nil { - ps.userLogger.WarnLn(err) + img, err := index.Image(manifest.Digest) + if err != nil { + return fmt.Errorf("read image %s: %w", tag, err) } - } - if len(successfullyPushedModules) > 0 { - ps.userLogger.Infof("Modules pushed: %v", strings.Join(successfullyPushedModules, ", ")) + imageReferenceString := fmt.Sprintf("%s:%s", client.GetRegistry(), tag) + err = retry.RunTask( + ctx, + s.userLogger, + fmt.Sprintf("[%d / %d] Pushing %s", i+1, len(indexManifest.Manifests), imageReferenceString), + task.WithConstantRetries(pushRetryAttempts, pushRetryDelay, func(ctx context.Context) error { + if err := client.PushImage(ctx, tag, img); err != nil { + if errorutil.IsTrivyMediaTypeNotAllowedError(err) { + return fmt.Errorf(errorutil.CustomTrivyMediaTypesWarning) + } + return fmt.Errorf("write %s:%s to registry: %w", client.GetRegistry(), tag, err) + } + return nil + })) + if err != nil { + return fmt.Errorf("push image %s: %w", tag, err) + } } return nil } -// openPackage opens a package file, trying .tar first, then .chunk -func (ps *PusherService) openPackage(bundleDir, pkgName string) (io.ReadCloser, error) { +// OpenPackage opens a package file, trying .tar first, then chunked +func (s *Service) OpenPackage(bundleDir, pkgName string) (io.ReadCloser, error) { p := filepath.Join(bundleDir, pkgName+".tar") pkg, err := os.Open(p) switch { case os.IsNotExist(err): - return ps.openChunkedPackage(bundleDir, pkgName) + return s.openChunkedPackage(bundleDir, pkgName) case err != nil: return nil, fmt.Errorf("read bundle package %s: %w", pkgName, err) } @@ -120,8 +135,7 @@ func (ps *PusherService) openPackage(bundleDir, pkgName string) (io.ReadCloser, return pkg, nil } -// openChunkedPackage opens a chunked package -func (ps *PusherService) openChunkedPackage(bundleDir, pkgName string) (io.ReadCloser, error) { +func (s *Service) openChunkedPackage(bundleDir, pkgName string) (io.ReadCloser, error) { pkg, err := chunked.Open(bundleDir, pkgName+".tar") if err != nil { return nil, fmt.Errorf("open bundle package %q: %w", pkgName, err) diff --git a/internal/mirror/releases/versions.go b/internal/mirror/releases/versions.go index 4c4cbe54..be503698 100644 --- a/internal/mirror/releases/versions.go +++ b/internal/mirror/releases/versions.go @@ -132,7 +132,7 @@ func VersionsToMirror(pullParams *params.PullParams, client registry.Client, tag alphaChannelVersion, found := releaseChannelsVersions[internal.AlphaChannel] if found { - versionsAboveMinimal := parseAndFilterVersionsAboveMinimalAnbBelowAlpha(mirrorFromVersion, tags, alphaChannelVersion) + versionsAboveMinimal := parseAndFilterVersionsAboveMinimalAndBelowAlpha(mirrorFromVersion, tags, alphaChannelVersion) versionsAboveMinimal = FilterOnlyLatestPatches(versionsAboveMinimal) return deduplicateVersions(append(vers, versionsAboveMinimal...)), channels, nil @@ -160,7 +160,7 @@ func getReleasedTagsFromRegistry(pullParams *params.PullParams, client registry. return tags, nil } -func parseAndFilterVersionsAboveMinimalAnbBelowAlpha( +func parseAndFilterVersionsAboveMinimalAndBelowAlpha( minVersion *semver.Version, tags []string, alphaChannelVersion *semver.Version, @@ -222,9 +222,9 @@ func getReleaseChannelVersionFromRegistry(mirrorCtx *params.PullParams, releaseC return nil, fmt.Errorf("cannot find release channel version: %w", err) } - // if releaseInfo.Suspended { - // return nil, fmt.Errorf("cannot mirror Deckhouse: source registry contains suspended release channel %q, try again later", releaseChannel) - // } + if releaseInfo.Suspended && !mirrorCtx.IgnoreSuspend { + return nil, fmt.Errorf("cannot mirror Deckhouse: source registry contains suspended release channel %q, try again later (use --ignore-suspend to override)", releaseChannel) + } ver, err := semver.NewVersion(releaseInfo.Version) if err != nil { diff --git a/internal/mirror/releases/versions_test.go b/internal/mirror/releases/versions_test.go index 70025af5..cf241a45 100644 --- a/internal/mirror/releases/versions_test.go +++ b/internal/mirror/releases/versions_test.go @@ -23,7 +23,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestParseAndFilterVersionsAboveMinimalAnbBelowAlpha(t *testing.T) { +func TestParseAndFilterVersionsAboveMinimalAndBelowAlpha(t *testing.T) { minVersion := semver.MustParse("v1.50.0") alphaVersion := semver.MustParse("v1.60.0") @@ -66,7 +66,7 @@ func TestParseAndFilterVersionsAboveMinimalAnbBelowAlpha(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := parseAndFilterVersionsAboveMinimalAnbBelowAlpha(minVersion, tt.tags, alphaVersion) + result := parseAndFilterVersionsAboveMinimalAndBelowAlpha(minVersion, tt.tags, alphaVersion) resultStrs := make([]string, len(result)) for i, v := range result { @@ -184,14 +184,14 @@ func TestDeduplicateVersions(t *testing.T) { } // Benchmark tests -func BenchmarkParseAndFilterVersionsAboveMinimalAnbBelowAlpha(b *testing.B) { +func BenchmarkParseAndFilterVersionsAboveMinimalAndBelowAlpha(b *testing.B) { minVersion := semver.MustParse("v1.50.0") alphaVersion := semver.MustParse("v1.60.0") tags := []string{"v1.49.0", "v1.50.0", "v1.51.0", "v1.52.0", "v1.59.0", "v1.60.0", "v1.61.0"} b.ResetTimer() for i := 0; i < b.N; i++ { - parseAndFilterVersionsAboveMinimalAnbBelowAlpha(minVersion, tags, alphaVersion) + parseAndFilterVersionsAboveMinimalAndBelowAlpha(minVersion, tags, alphaVersion) } } diff --git a/internal/mirror/security/security.go b/internal/mirror/security/security.go index 6defd517..ab174dff 100644 --- a/internal/mirror/security/security.go +++ b/internal/mirror/security/security.go @@ -77,9 +77,9 @@ func NewService( options = &Options{} } - tmpDir := filepath.Join(workingDir, "security") - - layout, err := createOCIImageLayoutsForSecurity(tmpDir) + // workingDir is the root where we create layouts + // Layouts will be created at workingDir/security/trivy-db, etc. + layout, err := createOCIImageLayoutsForSecurity(workingDir) if err != nil { //TODO: handle error userLogger.Warnf("Create OCI Image Layouts: %v", err) diff --git a/pkg/libmirror/bundle/bundle.go b/pkg/libmirror/bundle/bundle.go index 8b409f03..b3b96faa 100644 --- a/pkg/libmirror/bundle/bundle.go +++ b/pkg/libmirror/bundle/bundle.go @@ -71,8 +71,15 @@ func Unpack(ctx context.Context, source io.Reader, targetPath string) error { } func Pack(ctx context.Context, sourcePath string, sink io.Writer) error { + return PackWithPrefix(ctx, sourcePath, "", sink) +} + +// PackWithPrefix packs directory contents into tar with an optional prefix for all paths. +// For example, PackWithPrefix(ctx, "/tmp/module", "modules/stronghold", sink) will create +// tar entries like "modules/stronghold/index.json" instead of just "index.json". +func PackWithPrefix(ctx context.Context, sourcePath string, prefix string, sink io.Writer) error { tarWriter := tar.NewWriter(sink) - if err := filepath.Walk(sourcePath, packFunc(ctx, sourcePath, tarWriter)); err != nil { + if err := filepath.Walk(sourcePath, packFuncWithPrefix(ctx, sourcePath, prefix, tarWriter)); err != nil { return fmt.Errorf("pack mirrored images into tar: %w", err) } @@ -83,7 +90,7 @@ func Pack(ctx context.Context, sourcePath string, sink io.Writer) error { return nil } -func packFunc(ctx context.Context, pathPrefix string, writer *tar.Writer) filepath.WalkFunc { +func packFuncWithPrefix(ctx context.Context, pathPrefix string, tarPrefix string, writer *tar.Writer) filepath.WalkFunc { unixEpochStart := time.Unix(0, 0) return func(path string, info fs.FileInfo, err error) error { if ctx.Err() != nil { @@ -102,6 +109,10 @@ func packFunc(ctx context.Context, pathPrefix string, writer *tar.Writer) filepa } pathInTar := strings.TrimPrefix(path, pathPrefix+string(os.PathSeparator)) + // Add prefix if specified + if tarPrefix != "" { + pathInTar = tarPrefix + "/" + pathInTar + } err = writer.WriteHeader(&tar.Header{ Typeflag: tar.TypeReg, Format: tar.FormatGNU, diff --git a/pkg/libmirror/layouts/push.go b/pkg/libmirror/layouts/push.go index ab44507f..2563c901 100644 --- a/pkg/libmirror/layouts/push.go +++ b/pkg/libmirror/layouts/push.go @@ -26,7 +26,6 @@ import ( "github.com/google/go-containerregistry/pkg/name" v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/layout" - "github.com/google/go-containerregistry/pkg/v1/remote" "github.com/hashicorp/go-multierror" "github.com/samber/lo" "github.com/samber/lo/parallel" @@ -72,10 +71,7 @@ func PushLayoutToRepoContext( parallelismConfig params.ParallelismConfig, insecure, skipVerifyTLS bool, ) error { - refOpts, remoteOpts := auth.MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS) - if parallelismConfig.Blobs != 0 { - remoteOpts = append(remoteOpts, remote.WithJobs(parallelismConfig.Blobs)) - } + refOpts, _ := auth.MakeRemoteRegistryRequestOptions(authProvider, insecure, skipVerifyTLS) index, err := imagesLayout.ImageIndex() if err != nil { @@ -96,12 +92,17 @@ func PushLayoutToRepoContext( for _, manifestSet := range batches { if parallelismConfig.Images == 1 { - tag := manifestSet[0].Annotations["io.deckhouse.image.short_tag"] - - imageRef := registryRepo + ":" + tag - - logger.Infof("[%d / %d] Pushing image %s", imagesCount, len(indexManifest.Manifests), imageRef) - if err = pushImage(ctx, client, registryRepo, index, manifestSet[0], refOpts, remoteOpts, logger); err != nil { + cfg := &pushImageConfig{ + client: client, + registryRepo: registryRepo, + index: index, + manifest: manifestSet[0], + refOpts: refOpts, + logger: logger, + imageNum: imagesCount, + totalImages: len(indexManifest.Manifests), + } + if err = pushImage(ctx, cfg); err != nil { return fmt.Errorf("Push Image: %w", err) } imagesCount++ @@ -117,8 +118,20 @@ func PushLayoutToRepoContext( errMu := &sync.Mutex{} merr := &multierror.Error{} - parallel.ForEach(manifestSet, func(item v1.Descriptor, _ int) { - if err = pushImage(ctx, client, registryRepo, index, item, refOpts, remoteOpts, logger); err != nil { + currentImagesCount := imagesCount + parallel.ForEach(manifestSet, func(item v1.Descriptor, idx int) { + imageNum := currentImagesCount + idx + cfg := &pushImageConfig{ + client: client, + registryRepo: registryRepo, + index: index, + manifest: item, + refOpts: refOpts, + logger: logger, + imageNum: imageNum, + totalImages: len(indexManifest.Manifests), + } + if err = pushImage(ctx, cfg); err != nil { errMu.Lock() defer errMu.Unlock() merr = multierror.Append(merr, err) @@ -137,53 +150,44 @@ func PushLayoutToRepoContext( return nil } -func pushImage( - ctx context.Context, - client registry.Client, - registryRepo string, - index v1.ImageIndex, - manifest v1.Descriptor, - refOpts []name.Option, - _ []remote.Option, - _ params.Logger, -) error { - tag := manifest.Annotations["io.deckhouse.image.short_tag"] - imageRef := registryRepo + ":" + tag - img, err := index.Image(manifest.Digest) +type pushImageConfig struct { + client registry.Client + registryRepo string + index v1.ImageIndex + manifest v1.Descriptor + refOpts []name.Option + logger params.Logger + imageNum int + totalImages int +} + +func pushImage(ctx context.Context, cfg *pushImageConfig) error { + tag := cfg.manifest.Annotations["io.deckhouse.image.short_tag"] + imageRef := cfg.registryRepo + ":" + tag + img, err := cfg.index.Image(cfg.manifest.Digest) if err != nil { return fmt.Errorf("Read image: %v", err) } - ref, err := name.ParseReference(imageRef, refOpts...) + ref, err := name.ParseReference(imageRef, cfg.refOpts...) if err != nil { return fmt.Errorf("Parse image reference: %v", err) } - err = retry.RunTaskWithContext( - ctx, silentLogger{}, "push", + err = retry.RunTask( + ctx, + cfg.logger, + fmt.Sprintf("[%d / %d] Pushing %s", cfg.imageNum, cfg.totalImages, imageRef), task.WithConstantRetries(4, 3*time.Second, func(ctx context.Context) error { - if err = client.PushImage(ctx, tag, img); err != nil { + if err = cfg.client.PushImage(ctx, tag, img); err != nil { if errorutil.IsTrivyMediaTypeNotAllowedError(err) { return fmt.Errorf(errorutil.CustomTrivyMediaTypesWarning) } return fmt.Errorf("Write %s to registry: %w", ref.String(), err) } return nil - }), - ) + })) if err != nil { return fmt.Errorf("Run push task: %v", err) } return nil } - -type silentLogger struct{} - -var _ params.Logger = silentLogger{} - -func (silentLogger) Debugf(_ string, _ ...interface{}) {} -func (silentLogger) DebugLn(_ ...interface{}) {} -func (silentLogger) Infof(_ string, _ ...interface{}) {} -func (silentLogger) InfoLn(_ ...interface{}) {} -func (silentLogger) Warnf(_ string, _ ...interface{}) {} -func (silentLogger) WarnLn(_ ...interface{}) {} -func (silentLogger) Process(_ string, _ func() error) error { return nil } diff --git a/pkg/libmirror/operations/params/pull.go b/pkg/libmirror/operations/params/pull.go index 0477ce8b..c674840b 100644 --- a/pkg/libmirror/operations/params/pull.go +++ b/pkg/libmirror/operations/params/pull.go @@ -29,6 +29,7 @@ type PullParams struct { SkipSecurityDatabases bool // --no-security-db SkipModules bool // --no-modules OnlyExtraImages bool // --only-extra-images + IgnoreSuspend bool // --ignore-suspend BundleChunkSize int64 // Plain bytes // Only one of those 2 is filled at a single time or none at all. diff --git a/pkg/libmirror/util/retry/retry.go b/pkg/libmirror/util/retry/retry.go index 0521a0b7..cde1f680 100644 --- a/pkg/libmirror/util/retry/retry.go +++ b/pkg/libmirror/util/retry/retry.go @@ -31,10 +31,10 @@ type Task interface { } func RunTask(ctx context.Context, logger params.Logger, name string, task Task) error { - return RunTaskWithContext(ctx, logger, name, task) + return runTaskWithContext(ctx, logger, name, task) } -func RunTaskWithContext(ctx context.Context, logger params.Logger, name string, task Task) error { +func runTaskWithContext(ctx context.Context, logger params.Logger, name string, task Task) error { restarts := uint(0) var lastErr error for restarts < task.MaxRetries() { diff --git a/pkg/registry/service/module_service.go b/pkg/registry/service/module_service.go index 4238c950..57470b67 100644 --- a/pkg/registry/service/module_service.go +++ b/pkg/registry/service/module_service.go @@ -38,6 +38,7 @@ type ModuleService struct { *BasicService moduleReleaseChannels *ModuleReleaseService extra *BasicService + extraImages map[string]*BasicService logger *log.Logger } @@ -50,6 +51,7 @@ func NewModuleService(client registry.Client, logger *log.Logger) *ModuleService BasicService: NewBasicService(moduleServiceName, client, logger), moduleReleaseChannels: NewModuleReleaseService(NewBasicService(moduleReleaseChannelsServiceName, client.WithSegment(moduleReleaseChannelsSegment), logger)), extra: NewBasicService(moduleExtraServiceName, client.WithSegment(moduleExtraSegment), logger), + extraImages: make(map[string]*BasicService), logger: logger, } @@ -63,6 +65,20 @@ func (s *ModuleService) Extra() *BasicService { return s.extra } +// ExtraImage returns a BasicService scoped to a specific extra image (e.g., modules//extra/) +func (s *ModuleService) ExtraImage(extraName string) *BasicService { + if s.extraImages == nil { + s.extraImages = make(map[string]*BasicService) + } + + if _, exists := s.extraImages[extraName]; !exists { + extraClient := s.client.WithSegment(moduleExtraSegment).WithSegment(extraName) + s.extraImages[extraName] = NewBasicService(moduleExtraServiceName+"/"+extraName, extraClient, s.logger) + } + + return s.extraImages[extraName] +} + type ModulesService struct { client registry.Client