Skip to content

Commit 3eb3a70

Browse files
jakobmoellerdevdynamic-ArchuUmang2608
authored
feat(transfer): Optimized approach for OCM transfer by implementing a concurrent worker pool (#1676)
<!-- markdownlint-disable MD041 --> #### What this PR does / why we need it Introduces a new experimental transfer mode that can be set with a max worker attribute. Default is still sequential. Concurrent transfer is enabled as soon as workers > 1 #### Which issue(s) this PR is related to <!-- Usage: `Related to #<issue number>`, or `Related to (paste link of issue)`. --> fix open-component-model/ocm-project#740 --------- Signed-off-by: Jakob Möller <contact@jakob-moeller.com> Co-authored-by: Archanaa <96463933+dynamic-archu@users.noreply.github.com> Co-authored-by: Umang2608 <67495145+umang2608@users.noreply.github.com>
1 parent b8191ec commit 3eb3a70

File tree

20 files changed

+1007
-219
lines changed

20 files changed

+1007
-219
lines changed

api/oci/cpi/support/flavor_manifest.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,11 @@ func (m *ManifestAccess) AddLayer(blob cpi.BlobAccess, d *artdesc.Descriptor) (i
133133
manifest.Layers = append(manifest.Layers, *d)
134134
return len(manifest.Layers) - 1, nil
135135
}
136+
137+
func (m *ManifestAccess) Modify(f func(manifest *artdesc.Manifest) error) error {
138+
m.lock.Lock()
139+
defer m.lock.Unlock()
140+
desc := m.GetDescriptor()
141+
142+
return f(desc)
143+
}

api/oci/extensions/repositories/ocireg/blobs.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,47 +29,51 @@ type blobContainer struct {
2929
}
3030

3131
type BlobContainers struct {
32-
lock sync.Mutex
3332
cache accessio.BlobCache
3433
fetcher oras.Fetcher
3534
pusher oras.Pusher
36-
mimes map[string]BlobContainer
35+
36+
mimes sync.Map // map[string]BlobContainer
3737
}
3838

3939
func NewBlobContainers(ctx cpi.Context, fetcher remotes.Fetcher, pusher oras.Pusher) *BlobContainers {
4040
return &BlobContainers{
4141
cache: cacheattr.Get(ctx),
4242
fetcher: fetcher,
4343
pusher: pusher,
44-
mimes: map[string]BlobContainer{},
4544
}
4645
}
4746

4847
func (c *BlobContainers) Get(mime string) (BlobContainer, error) {
49-
c.lock.Lock()
50-
defer c.lock.Unlock()
51-
52-
found := c.mimes[mime]
53-
if found == nil {
54-
container, err := NewBlobContainer(c.cache, mime, c.fetcher, c.pusher)
55-
if err != nil {
56-
return nil, err
57-
}
58-
c.mimes[mime] = container
59-
60-
return container, nil
48+
// Fast path: load existing
49+
if v, ok := c.mimes.Load(mime); ok {
50+
return v.(BlobContainer), nil
51+
}
52+
53+
// Slow path: need to create a new one
54+
newBC, err := NewBlobContainer(c.cache, mime, c.fetcher, c.pusher)
55+
if err != nil {
56+
return nil, err
6157
}
6258

63-
return found, nil
59+
// Try to publish it. Another goroutine may win the race.
60+
actual, loaded := c.mimes.LoadOrStore(mime, newBC)
61+
if loaded {
62+
// We lost the race — discard our new instance
63+
return actual.(BlobContainer), newBC.Unref()
64+
}
65+
66+
return newBC, nil
6467
}
6568

6669
func (c *BlobContainers) Release() error {
67-
c.lock.Lock()
68-
defer c.lock.Unlock()
6970
list := errors.ErrListf("releasing mime block caches")
70-
for _, b := range c.mimes {
71-
list.Add(b.Unref())
72-
}
71+
72+
c.mimes.Range(func(_, value any) bool {
73+
list.Add(value.(BlobContainer).Unref())
74+
return true
75+
})
76+
7377
return list.Result()
7478
}
7579

api/oci/extensions/repositories/ocireg/utils.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io"
77
"sync"
8+
"time"
89

910
"github.com/containerd/containerd/remotes"
1011
"github.com/containerd/errdefs"
@@ -92,7 +93,12 @@ func pushData(ctx context.Context, p oras.Pusher, desc artdesc.Descriptor, data
9293
desc.Size = -1
9394
}
9495

95-
logging.Logger().Debug("*** push blob", "mediatype", desc.MediaType, "digest", desc.Digest, "key", key)
96+
logger := logging.Logger().WithValues("mediatype", desc.MediaType, "digest", desc.Digest, "key", key)
97+
logger.Debug("oci push blob")
98+
start := time.Now()
99+
defer func() {
100+
logger.Debug("oci push blob done", "elapsed", time.Since(start))
101+
}()
96102
if err := p.Push(ctx, desc, data); err != nil {
97103
if errdefs.IsAlreadyExists(err) {
98104
logging.Logger().Debug("blob already exists", "mediatype", desc.MediaType, "digest", desc.Digest)

api/oci/internal/repository.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ type ManifestAccess interface {
122122

123123
AddBlob(BlobAccess) error
124124
AddLayer(BlobAccess, *artdesc.Descriptor) (int, error)
125+
Modify(func(manifest *artdesc.Manifest) error) error
125126
SetConfigBlob(blob BlobAccess, d *artdesc.Descriptor) error
126127
}
127128

api/ocm/extensions/attrs/init.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
_ "ocm.software/ocm/api/ocm/extensions/attrs/hashattr"
66
_ "ocm.software/ocm/api/ocm/extensions/attrs/keepblobattr"
77
_ "ocm.software/ocm/api/ocm/extensions/attrs/mapocirepoattr"
8+
_ "ocm.software/ocm/api/ocm/extensions/attrs/maxworkersattr"
89
_ "ocm.software/ocm/api/ocm/extensions/attrs/ociuploadattr"
910
_ "ocm.software/ocm/api/ocm/extensions/attrs/plugincacheattr"
1011
_ "ocm.software/ocm/api/ocm/extensions/attrs/plugindirattr"
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
package maxworkersattr
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"runtime"
7+
"strconv"
8+
"sync"
9+
10+
"ocm.software/ocm/api/datacontext"
11+
ocmlog "ocm.software/ocm/api/utils/logging"
12+
rtruntime "ocm.software/ocm/api/utils/runtime"
13+
)
14+
15+
var realm = ocmlog.SubRealm("api/ocm/extensions/attrs/maxworkersattr")
16+
17+
const (
18+
// TransferWorkersEnvVar defines the environment variable that configures
19+
// the maximum number of concurrent transfer workers.
20+
//
21+
// If set to a positive integer value, that number of concurrent workers is used.
22+
// If set to "auto", the number of logical CPU cores on the system is used.
23+
// If unset, the attribute value (if any) or the default of SingleWorker (1) is used.
24+
TransferWorkersEnvVar = "OCM_TRANSFER_WORKER_COUNT"
25+
26+
// ATTR_KEY is the globally unique key under which this attribute is registered
27+
// in the OCM data context. It follows the ocm.software naming convention.
28+
ATTR_KEY = "ocm.software/ocm/api/ocm/extensions/attrs/maxworkers"
29+
30+
// ATTR_SHORT is the short alias of the attribute, suitable for CLI or YAML use.
31+
ATTR_SHORT = "maxworkers"
32+
33+
// SingleWorker is the default number of workers (1) used when no configuration
34+
// is provided. This mode guarantees deterministic ordering of operations.
35+
SingleWorker uint = 1
36+
37+
// AutomaticWorkersBasedOnCPU is the string literal used to indicate that the number of workers
38+
// should be automatically determined based on the number of logical CPU cores.
39+
AutomaticWorkersBasedOnCPU = "auto"
40+
)
41+
42+
func init() {
43+
if err := datacontext.RegisterAttributeType(ATTR_KEY, AttributeType{}, ATTR_SHORT); err != nil {
44+
panic(err)
45+
}
46+
}
47+
48+
// AttributeType implements the datacontext.AttributeType interface for
49+
// the `maxworkers` attribute. It controls the maximum concurrency used
50+
// during resource and source transfer operations.
51+
type AttributeType struct{}
52+
53+
// Name returns the globally unique key for this attribute.
54+
func (a AttributeType) Name() string { return ATTR_KEY }
55+
56+
// Description provides extended docs for this attribute.
57+
func (a AttributeType) Description() string {
58+
return `
59+
*integer* or *"auto"*
60+
Specifies the maximum number of concurrent workers to use for resource and source,
61+
as well as reference transfer operations.
62+
63+
Supported values:
64+
- A positive integer: use exactly that number of workers.
65+
- The string "auto": automatically use the number of logical CPU cores.
66+
- Zero or omitted: fall back to single-worker mode (1). This is the default.
67+
This mode guarantees deterministic ordering of operations.
68+
69+
Precedence:
70+
1. Attribute set in the current OCM context.
71+
2. Environment variable OCM_TRANSFER_WORKER_COUNT.
72+
3. Default value (1).
73+
74+
WARNING: This is an experimental feature and may cause unexpected behavior
75+
depending on workload concurrency. Values above 1 may result in non-deterministic
76+
transfer ordering.
77+
`
78+
}
79+
80+
// Encode converts the attribute's Go value into its marshaled representation.
81+
// It supports uint, int, and string ("auto") forms.
82+
func (a AttributeType) Encode(v interface{}, m rtruntime.Marshaler) ([]byte, error) {
83+
switch val := v.(type) {
84+
case uint:
85+
return m.Marshal(val)
86+
case int:
87+
if val < 0 {
88+
return nil, fmt.Errorf("negative integer for %s not allowed", ATTR_SHORT)
89+
}
90+
return m.Marshal(uint(val))
91+
case string:
92+
if val != AutomaticWorkersBasedOnCPU {
93+
return nil, fmt.Errorf("invalid string value for %s: %q", ATTR_SHORT, val)
94+
}
95+
return m.Marshal(val)
96+
default:
97+
return nil, fmt.Errorf("unsupported type %T for %s", v, ATTR_SHORT)
98+
}
99+
}
100+
101+
// Decode converts marshaled bytes back into Go form (either uint or "auto").
102+
func (a AttributeType) Decode(data []byte, unmarshaller rtruntime.Unmarshaler) (interface{}, error) {
103+
// Try uint first (e.g., `6`)
104+
var value uint
105+
if err := unmarshaller.Unmarshal(data, &value); err == nil {
106+
return value, nil
107+
}
108+
109+
// Try string next (e.g., `"auto"` or `"6"`)
110+
var s string
111+
if err := unmarshaller.Unmarshal(data, &s); err == nil {
112+
switch s {
113+
case AutomaticWorkersBasedOnCPU:
114+
return s, nil
115+
default:
116+
if parsedVal, err := strconv.ParseUint(s, 10, 32); err == nil {
117+
return uint(parsedVal), nil
118+
}
119+
return nil, fmt.Errorf("invalid string value for %s: %q", ATTR_SHORT, s)
120+
}
121+
}
122+
123+
return nil, fmt.Errorf("failed to decode %s", ATTR_SHORT)
124+
}
125+
126+
////////////////////////////////////////////////////////////////////////////////
127+
128+
// Get returns the resolved number of concurrent transfer workers from the context.
129+
// Resolution order:
130+
// 1. Attribute value (ctx)
131+
// 2. Environment variable OCM_TRANSFER_WORKER_COUNT
132+
// 3. Default SingleWorker (1)
133+
//
134+
// The resolver only auto-detects CPUs if the value is exactly "auto".
135+
// Any 0 resolves to SingleWorker.
136+
func Get(ctx datacontext.Context) (uint, error) {
137+
val := SingleWorker
138+
var err error
139+
140+
if attribute := ctx.GetAttributes().GetAttribute(ATTR_KEY); attribute != nil {
141+
val, err = resolveWorkers(attribute)
142+
} else if env, ok := os.LookupEnv(TransferWorkersEnvVar); ok {
143+
val, err = resolveWorkers(env)
144+
}
145+
146+
if err != nil {
147+
return 0, err
148+
}
149+
150+
if val > SingleWorker {
151+
warnUnstableOnce.Do(func() {
152+
ctx.Logger(realm).Warn("attribute is set to more than 1 worker, this may cause unexpected behavior")
153+
})
154+
}
155+
156+
// 3) Default
157+
return val, nil
158+
}
159+
160+
// warnUnstableOnce ensures we only log only one warning if the attribute is retrieved multiple times.
161+
var warnUnstableOnce sync.Once
162+
163+
// Set stores the attribute after validation via the unified resolver.
164+
// Accepts uint, int>=0, or the string "auto".
165+
func Set(ctx datacontext.Context, workers any) error {
166+
val, err := resolveWorkers(workers)
167+
if err != nil {
168+
return err
169+
}
170+
return ctx.GetAttributes().SetAttribute(ATTR_KEY, val)
171+
}
172+
173+
////////////////////////////////////////////////////////////////////////////////
174+
175+
// resolveWorkers normalizes all supported input forms into a concrete uint.
176+
// Supported forms:
177+
// - uint, int >= 0
178+
// - string "auto" → runtime.NumCPU()
179+
// - numeric string (e.g. "4") → parsed value
180+
// - 0 → SingleWorker
181+
func resolveWorkers(v any) (uint, error) {
182+
switch t := v.(type) {
183+
case nil:
184+
return SingleWorker, nil
185+
186+
case uint:
187+
if t == 0 {
188+
return SingleWorker, nil
189+
}
190+
return t, nil
191+
192+
case int:
193+
if t < 0 {
194+
return 0, fmt.Errorf("%s cannot be negative", ATTR_SHORT)
195+
}
196+
if t == 0 {
197+
return SingleWorker, nil
198+
}
199+
return uint(t), nil
200+
201+
case string:
202+
if t == AutomaticWorkersBasedOnCPU {
203+
n := runtime.NumCPU()
204+
if n <= 0 {
205+
return SingleWorker, nil
206+
}
207+
return uint(n), nil
208+
}
209+
// Try numeric string conversion
210+
if parsed, err := strconv.ParseUint(t, 10, 32); err == nil {
211+
if parsed == 0 {
212+
return SingleWorker, nil
213+
}
214+
return uint(parsed), nil
215+
}
216+
return 0, fmt.Errorf("invalid string value for %s: %q", ATTR_SHORT, t)
217+
218+
default:
219+
return 0, fmt.Errorf("unexpected %s type %T", ATTR_SHORT, v)
220+
}
221+
}

0 commit comments

Comments
 (0)