Skip to content

Commit 8dfafce

Browse files
committed
more ut coverage
1 parent ca54027 commit 8dfafce

File tree

7 files changed

+561
-0
lines changed

7 files changed

+561
-0
lines changed

pkg/fileservice/get_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"testing"
2020

2121
"github.com/stretchr/testify/assert"
22+
"iter"
2223
)
2324

2425
func TestGetForBackup(t *testing.T) {
@@ -30,3 +31,56 @@ func TestGetForBackup(t *testing.T) {
3031
assert.True(t, ok)
3132
assert.Equal(t, dir, localFS.rootPath)
3233
}
34+
35+
func TestGetForBackupS3Opts(t *testing.T) {
36+
ctx := context.Background()
37+
dir := t.TempDir()
38+
spec := JoinPath("s3-opts,endpoint=disk,bucket="+dir+",prefix=backup-prefix,name=backup", "object")
39+
fs, err := GetForBackup(ctx, spec)
40+
assert.Nil(t, err)
41+
s3fs, ok := fs.(*S3FS)
42+
assert.True(t, ok)
43+
assert.Equal(t, "backup", s3fs.name)
44+
assert.Equal(t, "backup-prefix", s3fs.keyPrefix)
45+
}
46+
47+
type dummyFileService struct{ name string }
48+
49+
func (d dummyFileService) Delete(ctx context.Context, filePaths ...string) error { return nil }
50+
func (d dummyFileService) Name() string { return d.name }
51+
func (d dummyFileService) Read(ctx context.Context, vector *IOVector) error { return nil }
52+
func (d dummyFileService) ReadCache(ctx context.Context, vector *IOVector) error { return nil }
53+
func (d dummyFileService) Write(ctx context.Context, vector IOVector) error { return nil }
54+
func (d dummyFileService) List(ctx context.Context, dirPath string) iter.Seq2[*DirEntry, error] {
55+
return func(yield func(*DirEntry, error) bool) {
56+
yield(&DirEntry{Name: "a"}, nil)
57+
}
58+
}
59+
func (d dummyFileService) StatFile(ctx context.Context, filePath string) (*DirEntry, error) {
60+
return &DirEntry{Name: filePath}, nil
61+
}
62+
func (d dummyFileService) PrefetchFile(ctx context.Context, filePath string) error { return nil }
63+
func (d dummyFileService) Cost() *CostAttr { return nil }
64+
func (d dummyFileService) Close(ctx context.Context) {}
65+
66+
func TestGetFromMappings(t *testing.T) {
67+
fs1 := dummyFileService{name: "first"}
68+
fs2 := dummyFileService{name: "second"}
69+
mapping, err := NewFileServices("first", fs1, fs2)
70+
assert.NoError(t, err)
71+
72+
var res FileService
73+
res, err = Get[FileService](mapping, "second")
74+
assert.NoError(t, err)
75+
assert.Equal(t, "second", res.Name())
76+
77+
_, err = Get[FileService](mapping, "missing")
78+
assert.Error(t, err)
79+
80+
res, err = Get[FileService](fs1, "first")
81+
assert.NoError(t, err)
82+
assert.Equal(t, "first", res.Name())
83+
84+
_, err = Get[FileService](fs1, "other")
85+
assert.Error(t, err)
86+
}

pkg/fileservice/object_storage_arguments_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,61 @@ func TestAWSRegion(t *testing.T) {
185185
assert.NotNil(t, args.validate())
186186
}
187187

188+
func TestSetFromStringParallelMode(t *testing.T) {
189+
var args ObjectStorageArguments
190+
assert.NoError(t, args.SetFromString([]string{"parallel-mode=force"}))
191+
assert.Equal(t, ParallelForce, args.ParallelMode)
192+
193+
args = ObjectStorageArguments{
194+
ParallelMode: ParallelAuto,
195+
}
196+
assert.NoError(t, args.SetFromString([]string{"parallel-mode=unknown"}))
197+
assert.Equal(t, ParallelAuto, args.ParallelMode)
198+
}
199+
200+
func TestObjectStorageArgumentsValidateDefaults(t *testing.T) {
201+
args := ObjectStorageArguments{
202+
Endpoint: "example.com",
203+
}
204+
assert.NoError(t, args.validate())
205+
assert.Equal(t, "https://example.com", args.Endpoint)
206+
assert.Equal(t, "mo-service", args.RoleSessionName)
207+
}
208+
209+
func TestObjectStorageArgumentsShouldLoadDefaultCredentials(t *testing.T) {
210+
t.Setenv("AWS_ACCESS_KEY_ID", "ak")
211+
t.Setenv("AWS_SECRET_ACCESS_KEY", "sk")
212+
args := ObjectStorageArguments{}
213+
assert.True(t, args.shouldLoadDefaultCredentials())
214+
215+
args = ObjectStorageArguments{
216+
NoDefaultCredentials: true,
217+
KeyID: "id",
218+
KeySecret: "secret",
219+
}
220+
assert.False(t, args.shouldLoadDefaultCredentials())
221+
222+
args = ObjectStorageArguments{
223+
NoDefaultCredentials: true,
224+
RoleARN: "arn",
225+
}
226+
assert.True(t, args.shouldLoadDefaultCredentials())
227+
}
228+
229+
func TestObjectStorageArgumentsString(t *testing.T) {
230+
args := ObjectStorageArguments{
231+
Name: "foo",
232+
KeyPrefix: "bar",
233+
Concurrency: 3,
234+
}
235+
s := args.String()
236+
var decoded ObjectStorageArguments
237+
assert.NoError(t, json.Unmarshal([]byte(s), &decoded))
238+
assert.Equal(t, args.Name, decoded.Name)
239+
assert.Equal(t, args.KeyPrefix, decoded.KeyPrefix)
240+
assert.Equal(t, args.Concurrency, decoded.Concurrency)
241+
}
242+
188243
func TestParseHDFSArgs(t *testing.T) {
189244
var args ObjectStorageArguments
190245
if err := args.SetFromString([]string{
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright 2025 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fileservice
16+
17+
import (
18+
"context"
19+
"net/http/httptrace"
20+
"strings"
21+
"testing"
22+
23+
"github.com/matrixorigin/matrixone/pkg/common/moerr"
24+
"github.com/stretchr/testify/require"
25+
)
26+
27+
func TestObjectStorageHTTPTraceWriteMultipartParallel(t *testing.T) {
28+
upstream := &mockParallelObjectStorage{supports: true}
29+
wrapped := newObjectStorageHTTPTrace(upstream)
30+
31+
err := wrapped.WriteMultipartParallel(context.Background(), "key", strings.NewReader("data"), nil, nil)
32+
require.NoError(t, err)
33+
require.NotNil(t, upstream.ctx)
34+
require.NotNil(t, httptrace.ContextClientTrace(upstream.ctx))
35+
require.Equal(t, "key", upstream.key)
36+
}
37+
38+
func TestObjectStorageHTTPTraceWriteMultipartParallelUnsupported(t *testing.T) {
39+
wrapped := newObjectStorageHTTPTrace(dummyObjectStorage{})
40+
41+
err := wrapped.WriteMultipartParallel(context.Background(), "key", strings.NewReader("data"), nil, nil)
42+
require.Error(t, err)
43+
require.True(t, moerr.IsMoErrCode(err, moerr.ErrNotSupported))
44+
}
45+
46+
func TestObjectStorageHTTPTraceDelegates(t *testing.T) {
47+
upstream := &recordingObjectStorage{}
48+
wrapped := newObjectStorageHTTPTrace(upstream)
49+
ctx := context.Background()
50+
51+
require.NoError(t, wrapped.Delete(ctx, "a"))
52+
exists, err := wrapped.Exists(ctx, "b")
53+
require.NoError(t, err)
54+
require.True(t, exists)
55+
iterSeq := wrapped.List(ctx, "c")
56+
var listed []string
57+
iterSeq(func(entry *DirEntry, err error) bool {
58+
require.NoError(t, err)
59+
listed = append(listed, entry.Name)
60+
return true
61+
})
62+
reader, err := wrapped.Read(ctx, "d", nil, nil)
63+
require.NoError(t, err)
64+
defer reader.Close()
65+
buf := make([]byte, 4)
66+
_, _ = reader.Read(buf)
67+
size, err := wrapped.Stat(ctx, "e")
68+
require.NoError(t, err)
69+
require.Equal(t, int64(3), size)
70+
require.NoError(t, wrapped.Write(ctx, "f", strings.NewReader("payload"), nil, nil))
71+
72+
require.Len(t, upstream.calls, 6)
73+
for _, ctx := range upstream.ctxs {
74+
require.NotNil(t, httptrace.ContextClientTrace(ctx))
75+
}
76+
require.ElementsMatch(t, []string{"delete", "exists", "list", "read", "stat", "write"}, upstream.calls)
77+
require.Equal(t, []string{"one"}, listed)
78+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2025 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fileservice
16+
17+
import (
18+
"context"
19+
"strings"
20+
"testing"
21+
22+
"github.com/matrixorigin/matrixone/pkg/common/moerr"
23+
metric "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
24+
"github.com/prometheus/client_golang/prometheus/testutil"
25+
"github.com/stretchr/testify/require"
26+
)
27+
28+
func TestObjectStorageMetricsWriteMultipartParallel(t *testing.T) {
29+
name := t.Name()
30+
upstream := &mockParallelObjectStorage{supports: true}
31+
wrapped := newObjectStorageMetrics(upstream, name)
32+
gauge := metric.FSObjectStorageOperations.WithLabelValues(name, "write")
33+
before := testutil.ToFloat64(gauge)
34+
35+
err := wrapped.WriteMultipartParallel(context.Background(), "key", strings.NewReader("data"), nil, nil)
36+
require.NoError(t, err)
37+
require.Equal(t, before+1, testutil.ToFloat64(gauge))
38+
require.Equal(t, "key", upstream.key)
39+
}
40+
41+
func TestObjectStorageMetricsWriteMultipartParallelNotSupported(t *testing.T) {
42+
name := t.Name()
43+
wrapped := newObjectStorageMetrics(dummyObjectStorage{}, name)
44+
gauge := metric.FSObjectStorageOperations.WithLabelValues(name, "write")
45+
before := testutil.ToFloat64(gauge)
46+
47+
err := wrapped.WriteMultipartParallel(context.Background(), "key", strings.NewReader("data"), nil, nil)
48+
require.Error(t, err)
49+
require.True(t, moerr.IsMoErrCode(err, moerr.ErrNotSupported))
50+
require.Equal(t, before+1, testutil.ToFloat64(gauge))
51+
}
52+
53+
func TestObjectStorageMetricsDelegates(t *testing.T) {
54+
name := t.Name()
55+
upstream := &recordingObjectStorage{}
56+
wrapped := newObjectStorageMetrics(upstream, name)
57+
58+
require.NoError(t, wrapped.Delete(context.Background(), "a"))
59+
_, _ = wrapped.Exists(context.Background(), "b")
60+
seq := wrapped.List(context.Background(), "c")
61+
seq(func(_ *DirEntry, _ error) bool { return true })
62+
rc, err := wrapped.Read(context.Background(), "d", nil, nil)
63+
require.NoError(t, err)
64+
require.NoError(t, rc.Close())
65+
_, _ = wrapped.Stat(context.Background(), "e")
66+
require.NoError(t, wrapped.Write(context.Background(), "f", strings.NewReader("x"), nil, nil))
67+
68+
require.ElementsMatch(t, []string{
69+
"delete", "exists", "list", "read", "stat", "write",
70+
}, upstream.calls)
71+
72+
// gauges incremented
73+
require.True(t, testutil.ToFloat64(metric.FSObjectStorageOperations.WithLabelValues(name, "delete")) >= 1)
74+
require.True(t, testutil.ToFloat64(metric.FSObjectStorageOperations.WithLabelValues(name, "exists")) >= 1)
75+
require.True(t, testutil.ToFloat64(metric.FSObjectStorageOperations.WithLabelValues(name, "list")) >= 1)
76+
require.True(t, testutil.ToFloat64(metric.FSObjectStorageOperations.WithLabelValues(name, "read")) >= 1)
77+
require.True(t, testutil.ToFloat64(metric.FSObjectStorageOperations.WithLabelValues(name, "stat")) >= 1)
78+
require.True(t, testutil.ToFloat64(metric.FSObjectStorageOperations.WithLabelValues(name, "write")) >= 1)
79+
}
80+
81+
func TestObjectStorageMetricsReadCloseDecrementsActive(t *testing.T) {
82+
name := t.Name()
83+
upstream := &recordingObjectStorage{}
84+
wrapped := newObjectStorageMetrics(upstream, name)
85+
active := metric.FSObjectStorageOperations.WithLabelValues(name, "active-read")
86+
before := testutil.ToFloat64(active)
87+
88+
r, err := wrapped.Read(context.Background(), "key", nil, nil)
89+
require.NoError(t, err)
90+
require.Equal(t, before+1, testutil.ToFloat64(active))
91+
require.NoError(t, r.Close())
92+
require.Equal(t, before, testutil.ToFloat64(active))
93+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright 2025 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fileservice
16+
17+
import (
18+
"context"
19+
"testing"
20+
"time"
21+
22+
"github.com/stretchr/testify/require"
23+
)
24+
25+
func TestObjectStorageSemaphoreSerializes(t *testing.T) {
26+
start := make(chan struct{}, 2)
27+
wait := make(chan struct{})
28+
upstream := &blockingObjectStorage{
29+
start: start,
30+
wait: wait,
31+
}
32+
sem := newObjectStorageSemaphore(upstream, 1)
33+
34+
done := make(chan struct{})
35+
go func() {
36+
require.NoError(t, sem.Write(context.Background(), "a", nil, nil, nil))
37+
close(done)
38+
}()
39+
40+
select {
41+
case <-start:
42+
case <-time.After(time.Second):
43+
t.Fatal("first write did not start")
44+
}
45+
46+
startSecond := make(chan struct{})
47+
go func() {
48+
defer close(startSecond)
49+
require.NoError(t, sem.Write(context.Background(), "b", nil, nil, nil))
50+
}()
51+
52+
select {
53+
case <-startSecond:
54+
t.Fatal("second write started before release")
55+
case <-time.After(50 * time.Millisecond):
56+
}
57+
58+
close(wait) // release first
59+
select {
60+
case <-startSecond:
61+
case <-time.After(time.Second):
62+
t.Fatal("second write not started after release")
63+
}
64+
<-done
65+
}
66+
67+
func TestObjectStorageSemaphoreReleasesOnError(t *testing.T) {
68+
start := make(chan struct{}, 1)
69+
wait := make(chan struct{})
70+
upstream := &blockingObjectStorage{
71+
start: start,
72+
wait: wait,
73+
err: context.DeadlineExceeded,
74+
}
75+
sem := newObjectStorageSemaphore(upstream, 1)
76+
77+
err := sem.Write(context.Background(), "a", nil, nil, nil)
78+
require.Error(t, err)
79+
close(wait)
80+
81+
// another call should proceed after the failed one
82+
select {
83+
case start <- struct{}{}:
84+
default:
85+
}
86+
require.NoError(t, sem.Delete(context.Background(), "x"))
87+
}

0 commit comments

Comments
 (0)