Skip to content

Commit e23842f

Browse files
committed
fix sca
1 parent 597a2a3 commit e23842f

File tree

3 files changed

+73
-31
lines changed

3 files changed

+73
-31
lines changed

pkg/fileservice/aws_sdk_v2.go

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -476,29 +476,31 @@ func (a *AwsSDKv2) WriteMultipartParallel(
476476

477477
bufPool := sync.Pool{
478478
New: func() any {
479-
return make([]byte, options.PartSize)
479+
buf := make([]byte, options.PartSize)
480+
return &buf
480481
},
481482
}
482483

483-
readChunk := func() (buf []byte, n int, err error) {
484-
raw := bufPool.Get().([]byte)
484+
readChunk := func() (bufPtr *[]byte, buf []byte, n int, err error) {
485+
bufPtr = bufPool.Get().(*[]byte)
486+
raw := *bufPtr
485487
n, err = io.ReadFull(r, raw)
486488
switch {
487489
case errors.Is(err, io.EOF):
488-
bufPool.Put(raw)
489-
return nil, 0, io.EOF
490+
bufPool.Put(bufPtr)
491+
return nil, nil, 0, io.EOF
490492
case errors.Is(err, io.ErrUnexpectedEOF):
491493
err = io.EOF
492-
return raw, n, err
494+
return bufPtr, raw, n, err
493495
case err != nil:
494-
bufPool.Put(raw)
495-
return nil, 0, err
496+
bufPool.Put(bufPtr)
497+
return nil, nil, 0, err
496498
default:
497-
return raw, n, nil
499+
return bufPtr, raw, n, nil
498500
}
499501
}
500502

501-
firstBuf, firstN, err := readChunk()
503+
firstBufPtr, firstBuf, firstN, err := readChunk()
502504
if err != nil && !errors.Is(err, io.EOF) {
503505
return err
504506
}
@@ -508,7 +510,7 @@ func (a *AwsSDKv2) WriteMultipartParallel(
508510
if errors.Is(err, io.EOF) && int64(firstN) < minMultipartPartSize {
509511
data := make([]byte, firstN)
510512
copy(data, firstBuf[:firstN])
511-
bufPool.Put(firstBuf)
513+
bufPool.Put(firstBufPtr)
512514
size := int64(firstN)
513515
return a.Write(ctx, key, bytes.NewReader(data), &size, options.Expire)
514516
}
@@ -521,7 +523,7 @@ func (a *AwsSDKv2) WriteMultipartParallel(
521523
})
522524
}, maxRetryAttemps, IsRetryableError)
523525
if createErr != nil {
524-
bufPool.Put(firstBuf)
526+
bufPool.Put(firstBufPtr)
525527
return createErr
526528
}
527529

@@ -537,9 +539,10 @@ func (a *AwsSDKv2) WriteMultipartParallel(
537539
}()
538540

539541
type partJob struct {
540-
num int32
541-
buf []byte
542-
n int
542+
num int32
543+
buf []byte
544+
bufPtr *[]byte
545+
n int
543546
}
544547

545548
var (
@@ -569,7 +572,9 @@ func (a *AwsSDKv2) WriteMultipartParallel(
569572
defer wg.Done()
570573
for job := range jobCh {
571574
if ctx.Err() != nil {
572-
bufPool.Put(job.buf)
575+
if job.bufPtr != nil {
576+
bufPool.Put(job.bufPtr)
577+
}
573578
continue
574579
}
575580
uploadOutput, uploadErr := DoWithRetry("upload part", func() (*s3.UploadPartOutput, error) {
@@ -583,10 +588,14 @@ func (a *AwsSDKv2) WriteMultipartParallel(
583588
}, maxRetryAttemps, IsRetryableError)
584589
if uploadErr != nil {
585590
setErr(uploadErr)
586-
bufPool.Put(job.buf)
591+
if job.bufPtr != nil {
592+
bufPool.Put(job.bufPtr)
593+
}
587594
continue
588595
}
589-
bufPool.Put(job.buf)
596+
if job.bufPtr != nil {
597+
bufPool.Put(job.bufPtr)
598+
}
590599
partsLock.Lock()
591600
parts = append(parts, types.CompletedPart{
592601
ETag: uploadOutput.ETag,
@@ -604,29 +613,34 @@ func (a *AwsSDKv2) WriteMultipartParallel(
604613
}
605614
}
606615

607-
sendJob := func(buf []byte, n int) bool {
616+
sendJob := func(bufPtr *[]byte, buf []byte, n int) bool {
608617
partNum++
609618
if partNum > maxMultipartParts {
610619
setErr(moerr.NewInternalErrorNoCtxf("too many parts for multipart upload: %d", partNum))
611-
bufPool.Put(buf)
620+
if bufPtr != nil {
621+
bufPool.Put(bufPtr)
622+
}
612623
return false
613624
}
614625
job := partJob{
615-
num: partNum,
616-
buf: buf,
617-
n: n,
626+
num: partNum,
627+
buf: buf,
628+
bufPtr: bufPtr,
629+
n: n,
618630
}
619631
select {
620632
case jobCh <- job:
621633
return true
622634
case <-ctx.Done():
623-
bufPool.Put(buf)
635+
if bufPtr != nil {
636+
bufPool.Put(bufPtr)
637+
}
624638
setErr(ctx.Err())
625639
return false
626640
}
627641
}
628642

629-
if !sendJob(firstBuf, firstN) {
643+
if !sendJob(firstBufPtr, firstBuf, firstN) {
630644
close(jobCh)
631645
wg.Wait()
632646
if firstErr != nil {
@@ -636,24 +650,24 @@ func (a *AwsSDKv2) WriteMultipartParallel(
636650
}
637651

638652
for {
639-
nextBuf, nextN, readErr := readChunk()
653+
nextBufPtr, nextBuf, nextN, readErr := readChunk()
640654
if errors.Is(readErr, io.EOF) && nextN == 0 {
641655
break
642656
}
643657
if readErr != nil && !errors.Is(readErr, io.EOF) {
644658
setErr(readErr)
645-
if nextBuf != nil {
646-
bufPool.Put(nextBuf)
659+
if nextBufPtr != nil {
660+
bufPool.Put(nextBufPtr)
647661
}
648662
break
649663
}
650664
if nextN == 0 {
651-
if nextBuf != nil {
652-
bufPool.Put(nextBuf)
665+
if nextBufPtr != nil {
666+
bufPool.Put(nextBufPtr)
653667
}
654668
break
655669
}
656-
if !sendJob(nextBuf, nextN) {
670+
if !sendJob(nextBufPtr, nextBuf, nextN) {
657671
break
658672
}
659673
if readErr != nil && errors.Is(readErr, io.EOF) {

pkg/fileservice/parallel_sdk_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
// Copyright 2025 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
package fileservice
216

317
import (

pkg/fileservice/parallel_upload_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,17 @@
1+
// Copyright 2025 Matrix Origin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
115
package fileservice
216

317
import (

0 commit comments

Comments
 (0)