Skip to content

Commit ec75306

Browse files
authored
fix append bug (#974)
* fix: hdfs readdir bug * fix append error
1 parent 04a992a commit ec75306

File tree

3 files changed

+210
-21
lines changed

3 files changed

+210
-21
lines changed

pkg/fs/client/ufs/hdfs.go

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -359,12 +359,45 @@ func (fs *hdfsFileSystem) Open(name string, flags uint32, size uint64) (FileHand
359359
fs.Lock()
360360
defer fs.Unlock()
361361

362-
return &hdfsFileHandle{
363-
name: name,
364-
reader: nil,
365-
fs: fs,
366-
writer: nil,
367-
}, nil
362+
// read only
363+
if flag&syscall.O_ACCMODE == syscall.O_RDONLY {
364+
reader, err := fs.client.Open(fs.GetPath(name))
365+
if err != nil {
366+
log.Errorf("hdfs client open err: %v", err)
367+
return nil, err
368+
}
369+
370+
return &hdfsFileHandle{
371+
name: name,
372+
reader: reader,
373+
fs: fs,
374+
writer: nil,
375+
}, nil
376+
}
377+
378+
// append only
379+
380+
if flag&syscall.O_APPEND != 0 {
381+
// hdfs nameNode maybe not release fh, has error: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException
382+
for i := 0; i < 3; i++ {
383+
writer, err := fs.client.Append(fs.GetPath(name))
384+
if err != nil {
385+
if fs.shouldRetry(err) {
386+
time.Sleep(100 * time.Millisecond * time.Duration(i*i))
387+
continue
388+
}
389+
log.Errorf("hdfs client append err: %v", err)
390+
return nil, err
391+
}
392+
return &hdfsFileHandle{
393+
name: name,
394+
reader: nil,
395+
fs: fs,
396+
writer: writer,
397+
}, nil
398+
}
399+
}
400+
return nil, syscall.ENOTSUP
368401
}
369402

370403
func (fs *hdfsFileSystem) Create(name string, flags, mode uint32) (fd FileHandle, err error) {
@@ -455,12 +488,9 @@ var _ FileHandle = &hdfsFileHandle{}
455488
func (fh *hdfsFileHandle) Read(buf []byte, off uint64) (int, error) {
456489
log.Tracef("hdfs read: fh.name[%s], offset[%d]", fh.name, off)
457490
if fh.reader == nil {
458-
reader, err := fh.fs.client.Open(fh.fs.GetPath(fh.name))
459-
if err != nil {
460-
log.Errorf("hdfs client open err: %v", err)
461-
return 0, err
462-
}
463-
fh.reader = reader
491+
err := fmt.Errorf("hdfs read: file[%s] bad file descriptor reader==nil", fh.name)
492+
log.Errorf(err.Error())
493+
return 0, err
464494
}
465495
n, err := fh.reader.ReadAt(buf, int64(off))
466496
if err != nil && err != io.EOF {
@@ -482,11 +512,9 @@ func (fh *hdfsFileHandle) Write(data []byte, off uint64) (uint32, error) {
482512
log.Tracef("hdfs write: fh.name[%s], dataLength[%d], offset[%d], fh[%+v]", fh.name, len(data), off, fh)
483513
var err error
484514
if fh.writer == nil {
485-
fh.writer, err = fh.fs.client.Append(fh.fs.GetPath(fh.name))
486-
if err != nil {
487-
log.Errorf("hdfs client append err: %v", err)
488-
return 0, err
489-
}
515+
err = fmt.Errorf("hdfs write: file[%s] bad file descriptor writer==nil", fh.name)
516+
log.Errorf(err.Error())
517+
return 0, err
490518
}
491519

492520
n, err := fh.writer.Write(data)

pkg/fs/client/ufs/hdfs_test.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"os"
2424
"reflect"
2525
"sync"
26+
"syscall"
2627
"testing"
2728

2829
"github.com/agiledragon/gomonkey/v2"
@@ -262,3 +263,158 @@ func Test_hdfsFileHandle_Write(t *testing.T) {
262263
})
263264
}
264265
}
266+
267+
func Test_hdfsFileSystem_OpenRead(t *testing.T) {
268+
type fields struct {
269+
client *hdfs.Client
270+
subpath string
271+
blockSize int64
272+
replication int
273+
Mutex sync.Mutex
274+
}
275+
type args struct {
276+
name string
277+
flags uint32
278+
size uint64
279+
}
280+
tests := []struct {
281+
name string
282+
fields fields
283+
args args
284+
want FileHandle
285+
wantErr assert.ErrorAssertionFunc
286+
}{
287+
{
288+
name: "want read open err",
289+
fields: fields{
290+
client: &hdfs.Client{},
291+
subpath: "./",
292+
},
293+
args: args{
294+
name: "test",
295+
flags: uint32(1),
296+
},
297+
want: nil,
298+
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
299+
return true
300+
},
301+
},
302+
{
303+
name: "want read open nil",
304+
fields: fields{
305+
client: &hdfs.Client{},
306+
subpath: "./",
307+
},
308+
args: args{
309+
name: "test",
310+
flags: uint32(1),
311+
},
312+
want: nil,
313+
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
314+
return false
315+
},
316+
},
317+
}
318+
319+
var p1 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Open", func(_ *hdfs.Client, name string) (*hdfs.FileReader, error) {
320+
return nil, fmt.Errorf("open fail")
321+
})
322+
defer p1.Reset()
323+
324+
var p4 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfsFileSystem{}), "GetOpenFlags", func(_ *hdfsFileSystem, name string, flags uint32) int {
325+
return syscall.O_RDONLY
326+
})
327+
defer p4.Reset()
328+
329+
for _, tt := range tests {
330+
if tt.name == "want read open nil" {
331+
var p2 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Open", func(_ *hdfs.Client, name string) (*hdfs.FileReader, error) {
332+
return nil, nil
333+
})
334+
defer p2.Reset()
335+
}
336+
t.Run(tt.name, func(t *testing.T) {
337+
fs := &hdfsFileSystem{
338+
client: tt.fields.client,
339+
subpath: tt.fields.subpath,
340+
blockSize: tt.fields.blockSize,
341+
replication: tt.fields.replication,
342+
Mutex: tt.fields.Mutex,
343+
}
344+
got, err := fs.Open(tt.args.name, tt.args.flags, tt.args.size)
345+
if !tt.wantErr(t, err, fmt.Sprintf("Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size)) {
346+
return
347+
}
348+
assert.Equalf(t, tt.want, got, "Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size)
349+
})
350+
}
351+
}
352+
353+
func Test_hdfsFileSystem_Open(t *testing.T) {
354+
type fields struct {
355+
client *hdfs.Client
356+
subpath string
357+
blockSize int64
358+
replication int
359+
Mutex sync.Mutex
360+
}
361+
type args struct {
362+
name string
363+
flags uint32
364+
size uint64
365+
}
366+
tests := []struct {
367+
name string
368+
fields fields
369+
args args
370+
want FileHandle
371+
wantErr assert.ErrorAssertionFunc
372+
}{
373+
{
374+
name: "want retry err",
375+
fields: fields{
376+
client: &hdfs.Client{},
377+
subpath: "./",
378+
},
379+
args: args{
380+
name: "test",
381+
flags: uint32(1),
382+
},
383+
want: nil,
384+
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
385+
return true
386+
},
387+
},
388+
}
389+
390+
var p1 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Open", func(_ *hdfs.Client, name string) (*hdfs.FileReader, error) {
391+
return nil, nil
392+
})
393+
defer p1.Reset()
394+
var p2 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Append", func(_ *hdfs.Client, name string) (*hdfs.FileWriter, error) {
395+
return nil, fmt.Errorf("org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException")
396+
})
397+
defer p2.Reset()
398+
399+
var p4 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfsFileSystem{}), "GetOpenFlags", func(_ *hdfsFileSystem, name string, flags uint32) int {
400+
return syscall.O_WRONLY | syscall.O_APPEND
401+
})
402+
defer p4.Reset()
403+
404+
for _, tt := range tests {
405+
t.Run(tt.name, func(t *testing.T) {
406+
fs := &hdfsFileSystem{
407+
client: tt.fields.client,
408+
subpath: tt.fields.subpath,
409+
blockSize: tt.fields.blockSize,
410+
replication: tt.fields.replication,
411+
Mutex: tt.fields.Mutex,
412+
}
413+
got, err := fs.Open(tt.args.name, tt.args.flags, tt.args.size)
414+
if !tt.wantErr(t, err, fmt.Sprintf("Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size)) {
415+
return
416+
}
417+
assert.Equalf(t, tt.want, got, "Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size)
418+
})
419+
}
420+
}

pkg/fs/client/ufs/s3.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -941,10 +941,15 @@ func (fs *s3FileSystem) Get(name string, flags uint32, off, limit int64) (io.Rea
941941
request.Range = &r
942942
}
943943

944-
response, err := fs.s3.GetObject(request)
945-
if err != nil {
946-
log.Errorf("s3 get: s3.GetObject[%s] off[%d] limit[%d] err: %v ", name, off, limit, err)
947-
return nil, err
944+
var response *s3.GetObjectOutput
945+
var err error
946+
for i := 0; i < 3; i++ {
947+
response, err = fs.s3.GetObject(request)
948+
if err != nil {
949+
log.Errorf("s3 get[%v]: s3.GetObject[%s] off[%d] limit[%d] err: %v ", i, name, off, limit, err)
950+
} else {
951+
break
952+
}
948953
}
949954
return response.Body, err
950955
}

0 commit comments

Comments
 (0)