Skip to content

Commit fa43bdc

Browse files
committed
update parallel
1 parent 657ab14 commit fa43bdc

File tree

9 files changed

+317
-56
lines changed

9 files changed

+317
-56
lines changed

pkg/fileservice/file_service.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,6 @@ type IOVector struct {
9797

9898
// Caches indicates extra caches to operate on
9999
Caches []IOVectorCache
100-
101-
// DisableParallel controls whether to skip parallel multipart uploads even if supported.
102-
DisableParallel bool
103-
// ForceParallel controls whether to try parallel multipart uploads when possible.
104-
ForceParallel bool
105100
}
106101

107102
type IOEntry struct {

pkg/fileservice/get.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,16 @@ func Get[T any](fs FileService, name string) (res T, err error) {
5353

5454
var NoDefaultCredentialsForETL = os.Getenv("MO_NO_DEFAULT_CREDENTIALS") != ""
5555

56+
func etlParallelMode(ctx context.Context) ParallelMode {
57+
if mode, ok := parallelModeFromContext(ctx); ok {
58+
return mode
59+
}
60+
if mode, ok := parseParallelMode(strings.TrimSpace(os.Getenv("MO_ETL_PARALLEL_MODE"))); ok {
61+
return mode
62+
}
63+
return ParallelOff
64+
}
65+
5666
// GetForETL get or creates a FileService instance for ETL operations
5767
// if service part of path is empty, a LocalETLFS will be created
5868
// if service part of path is not empty, a ETLFileService typed instance will be extracted from fs argument
@@ -110,6 +120,7 @@ func GetForETL(ctx context.Context, fs FileService, path string) (res ETLFileSer
110120
KeySecret: accessSecret,
111121
KeyPrefix: keyPrefix,
112122
Name: name,
123+
ParallelMode: etlParallelMode(ctx),
113124
},
114125
DisabledCacheConfig,
115126
nil,
@@ -143,6 +154,7 @@ func GetForETL(ctx context.Context, fs FileService, path string) (res ETLFileSer
143154
Bucket: bucket,
144155
KeyPrefix: keyPrefix,
145156
Name: name,
157+
ParallelMode: etlParallelMode(ctx),
146158
},
147159
DisabledCacheConfig,
148160
nil,
@@ -157,6 +169,7 @@ func GetForETL(ctx context.Context, fs FileService, path string) (res ETLFileSer
157169
}
158170
args.NoBucketValidation = true
159171
args.IsHDFS = fsPath.Service == "hdfs"
172+
args.ParallelMode = etlParallelMode(ctx)
160173
res, err = NewS3FS(
161174
ctx,
162175
args,
@@ -198,6 +211,7 @@ func GetForETL(ctx context.Context, fs FileService, path string) (res ETLFileSer
198211
KeyPrefix: keyPrefix,
199212
Name: name,
200213
IsMinio: true,
214+
ParallelMode: etlParallelMode(ctx),
201215
},
202216
DisabledCacheConfig,
203217
nil,

pkg/fileservice/object_storage_arguments.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ import (
2828

2929
type ObjectStorageArguments struct {
3030
// misc
31-
Name string `toml:"name"`
32-
KeyPrefix string `toml:"key-prefix"`
33-
SharedConfigProfile string `toml:"shared-config-profile"`
34-
NoDefaultCredentials bool `toml:"no-default-credentials"`
35-
NoBucketValidation bool `toml:"no-bucket-validation"`
36-
Concurrency int64 `toml:"concurrency"`
31+
Name string `toml:"name"`
32+
KeyPrefix string `toml:"key-prefix"`
33+
SharedConfigProfile string `toml:"shared-config-profile"`
34+
NoDefaultCredentials bool `toml:"no-default-credentials"`
35+
NoBucketValidation bool `toml:"no-bucket-validation"`
36+
Concurrency int64 `toml:"concurrency"`
37+
ParallelMode ParallelMode `toml:"parallel-mode"`
3738

3839
// s3
3940
Bucket string `toml:"bucket"`
@@ -101,6 +102,10 @@ func (o *ObjectStorageArguments) SetFromString(arguments []string) error {
101102
if err == nil {
102103
o.Concurrency = n
103104
}
105+
case "parallel-mode", "parallel":
106+
if mode, ok := parseParallelMode(value); ok {
107+
o.ParallelMode = mode
108+
}
104109

105110
case "bucket":
106111
o.Bucket = value

pkg/fileservice/parallel_mode.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright 2025 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fileservice
16+
17+
import (
18+
"context"
19+
"strings"
20+
)
21+
22+
// ParallelMode controls when multipart parallel uploads are used.
23+
type ParallelMode uint8
24+
25+
const (
26+
ParallelOff ParallelMode = iota
27+
ParallelAuto
28+
ParallelForce
29+
)
30+
31+
func parseParallelMode(s string) (ParallelMode, bool) {
32+
switch strings.ToLower(s) {
33+
case "off", "false", "0", "":
34+
return ParallelOff, true
35+
case "auto":
36+
return ParallelAuto, true
37+
case "force", "on", "true", "1":
38+
return ParallelForce, true
39+
default:
40+
return ParallelOff, false
41+
}
42+
}
43+
44+
type parallelModeKey struct{}
45+
46+
// WithParallelMode sets a per-call parallel mode override on context.
47+
func WithParallelMode(ctx context.Context, mode ParallelMode) context.Context {
48+
return context.WithValue(ctx, parallelModeKey{}, mode)
49+
}
50+
51+
// parallelModeFromContext retrieves a parallel mode override if present.
52+
func parallelModeFromContext(ctx context.Context) (ParallelMode, bool) {
53+
if ctx == nil {
54+
return ParallelOff, false
55+
}
56+
if v := ctx.Value(parallelModeKey{}); v != nil {
57+
if mode, ok := v.(ParallelMode); ok {
58+
return mode, true
59+
}
60+
}
61+
return ParallelOff, false
62+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2025 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fileservice
16+
17+
import (
18+
"context"
19+
"testing"
20+
)
21+
22+
func TestParseParallelModeVariants(t *testing.T) {
23+
cases := []struct {
24+
in string
25+
expect ParallelMode
26+
ok bool
27+
}{
28+
{"", ParallelOff, true},
29+
{"off", ParallelOff, true},
30+
{"false", ParallelOff, true},
31+
{"0", ParallelOff, true},
32+
{"auto", ParallelAuto, true},
33+
{"force", ParallelForce, true},
34+
{"on", ParallelForce, true},
35+
{"1", ParallelForce, true},
36+
{"xxx", ParallelOff, false},
37+
}
38+
39+
for _, c := range cases {
40+
mode, ok := parseParallelMode(c.in)
41+
if mode != c.expect || ok != c.ok {
42+
t.Fatalf("input %s got (%v,%v) expect (%v,%v)", c.in, mode, ok, c.expect, c.ok)
43+
}
44+
}
45+
}
46+
47+
func TestWithParallelModeOnContext(t *testing.T) {
48+
base := context.Background()
49+
ctx := WithParallelMode(base, ParallelForce)
50+
mode, ok := parallelModeFromContext(ctx)
51+
if !ok || mode != ParallelForce {
52+
t.Fatalf("expected force override, got %v %v", mode, ok)
53+
}
54+
if _, ok := parallelModeFromContext(base); ok {
55+
t.Fatalf("unexpected mode on base context")
56+
}
57+
}
58+
59+
func TestETLParallelModePriority(t *testing.T) {
60+
t.Setenv("MO_ETL_PARALLEL_MODE", "off")
61+
if m := etlParallelMode(context.Background()); m != ParallelOff {
62+
t.Fatalf("expect off from env, got %v", m)
63+
}
64+
65+
ctx := WithParallelMode(context.Background(), ParallelForce)
66+
if m := etlParallelMode(ctx); m != ParallelForce {
67+
t.Fatalf("ctx override should win, got %v", m)
68+
}
69+
70+
t.Setenv("MO_ETL_PARALLEL_MODE", "auto")
71+
if m := etlParallelMode(context.Background()); m != ParallelAuto {
72+
t.Fatalf("env auto not applied, got %v", m)
73+
}
74+
}

0 commit comments

Comments
 (0)