Conversation

@gouhongshen (Contributor) commented Dec 9, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #23236

What this PR does / why we need it:

Write objects to remote storage in parallel.


PR Type

Enhancement


Description

  • Implement parallel multipart upload support for S3 and QCloud storage backends

  • Add ParallelMultipartWriter interface with configurable part size and concurrency (see the interface sketch after this list)

  • Integrate parallel uploads into S3FS write path with policy-based control flags

  • Enhance export functionality with context-aware cancellation handling
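
As a concrete anchor for the bullets above, here is a minimal sketch of the interface; the method and option names appear elsewhere on this page, but the field types (notably Expire) are assumptions, not copied from the diff:

package fileservice

import (
	"context"
	"io"
	"time"
)

// Sketch only: names are taken from this PR's walkthrough and the bot's
// quoted code; the types are inferred, not from the actual change.
type ParallelMultipartOption struct {
	PartSize    int64      // size of each uploaded part
	Concurrency int        // number of parts uploaded concurrently
	Expire      *time.Time // assumed type; forwarded to the backend
}

type ParallelMultipartWriter interface {
	// SupportsParallelMultipart reports whether the backend can upload
	// parts of a single object concurrently.
	SupportsParallelMultipart() bool
	// WriteMultipartParallel streams r to key, splitting the stream into
	// opt.PartSize chunks and uploading up to opt.Concurrency at a time.
	WriteMultipartParallel(ctx context.Context, key string, r io.Reader, sizeHint *int64, opt *ParallelMultipartOption) error
}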


Diagram Walkthrough

flowchart LR
  A["IOVector with<br/>parallel flags"] -->|"DisableParallel<br/>ForceParallel"| B["S3FS write path"]
  B -->|"SupportsParallelMultipart"| C["ParallelMultipartWriter"]
  C -->|"AWS SDK"| D["AwsSDKv2<br/>WriteMultipartParallel"]
  C -->|"QCloud SDK"| E["QCloudSDK<br/>WriteMultipartParallel"]
  D -->|"Parallel workers<br/>with buffer pool"| F["S3 multipart upload"]
  E -->|"Parallel workers<br/>with buffer pool"| G["COS multipart upload"]

File Walkthrough

Relevant files:

Enhancement (9 files)

  • object_storage.go: Define parallel multipart upload interface and utilities (+75/-0)
  • aws_sdk_v2.go: Implement parallel multipart upload for AWS S3 (+250/-0)
  • qcloud_sdk.go: Implement parallel multipart upload for QCloud COS (+252/-0)
  • s3_fs.go: Integrate parallel uploads into S3FS write path (+18/-2)
  • file_service.go: Add parallel upload control flags to IOVector (+5/-0; see the usage sketch after this list)
  • policy.go: Add policy flags for parallel write control (+2/-0)
  • object_storage_http_trace.go: Add parallel multipart tracing support (+19/-0)
  • object_storage_metrics.go: Add parallel multipart metrics support (+17/-0)
  • object_storage_semaphore.go: Add parallel multipart semaphore support (+18/-0)

Error handling (2 files)

  • data_branch.go: Add writer error handling in CSV export (+6/-0)
  • export.go: Improve context cancellation handling in export (+22/-3)
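
A hypothetical sketch of how the new IOVector controls might be used. DisableParallel and ForceParallel are named in the diagram walkthrough above; the surrounding IOVector and IOEntry fields follow the existing fileservice API, but the placement of the new flags is an assumption:

// Hypothetical usage sketch; the flag names come from the diagram
// walkthrough, their placement on IOVector is assumed.
func writeExport(ctx context.Context, fs fileservice.FileService, data []byte) error {
	vec := fileservice.IOVector{
		FilePath: "export/result.csv",
		Entries: []fileservice.IOEntry{
			{Offset: 0, Size: int64(len(data)), Data: data},
		},
		DisableParallel: false, // when true, always take the single-PUT path
		ForceParallel:   false, // when true, use multipart even for small objects
	}
	return fs.Write(ctx, vec)
}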

qodo-code-review bot commented Dec 9, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Resource exhaustion

Description: The global goroutine pool (ants, sized to runtime.NumCPU()) is initialized once and shared across all parallel uploads. Unbounded task accumulation could starve resources or cause unintended cross-tenant interference; ensure queue limits and backpressure are appropriate (see the sketch after the referred code).
object_storage.go [45-54]

Referred Code
func getParallelUploadPool() *ants.Pool {
	parallelUploadPoolOnce.Do(func() {
		pool, err := ants.NewPool(runtime.NumCPU())
		if err != nil {
			panic(err)
		}
		parallelUploadPool = pool
	})
	return parallelUploadPool
}
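
One way to add the backpressure this check asks for, sketched against the github.com/panjf2000/ants/v2 options (a suggestion, not what the PR does):

// Sketch: cap how many Submit callers may queue behind busy workers.
// With WithMaxBlockingTasks set, Submit blocks until a worker frees up,
// but returns ants.ErrPoolOverload once the blocking queue is full,
// turning unbounded buildup into an explicit error the caller can handle.
pool, err := ants.NewPool(
	runtime.NumCPU(),
	ants.WithMaxBlockingTasks(4*runtime.NumCPU()),
)
if err != nil {
	panic(err)
}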
Orphaned uploads

Description: Multipart part buffers are reused via sync.Pool and failed parts are retried, but on cancellation or partial failure the parts list and upload ID rely on a best-effort abort. If the abort itself fails, orphaned multipart uploads can accumulate and leak storage; consider a cleanup strategy (see the sweep sketch after the referred code).
aws_sdk_v2.go [474-596]

Referred Code
ctx, cancel := context.WithCancel(ctx)
defer cancel()

bufPool := sync.Pool{
	New: func() any {
		return make([]byte, options.PartSize)
	},
}

readChunk := func() (buf []byte, n int, err error) {
	raw := bufPool.Get().([]byte)
	n, err = io.ReadFull(r, raw)
	switch {
	case errors.Is(err, io.EOF):
		bufPool.Put(raw)
		return nil, 0, io.EOF
	case errors.Is(err, io.ErrUnexpectedEOF):
		err = io.EOF
		return raw, n, err
	case err != nil:
		bufPool.Put(raw)


 ... (clipped 102 lines)
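
A minimal sketch of the periodic cleanup this check suggests, using aws-sdk-go-v2 calls (ListMultipartUploads plus the same AbortMultipartUpload the PR already uses); the sweep itself is not part of this change:

import (
	"context"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// sweepStaleUploads aborts multipart uploads older than maxAge.
// Pagination (KeyMarker/UploadIdMarker) is omitted for brevity.
func sweepStaleUploads(ctx context.Context, client *s3.Client, bucket string, maxAge time.Duration) error {
	out, err := client.ListMultipartUploads(ctx, &s3.ListMultipartUploadsInput{
		Bucket: aws.String(bucket),
	})
	if err != nil {
		return err
	}
	for _, u := range out.Uploads {
		if u.Initiated != nil && time.Since(*u.Initiated) > maxAge {
			_, _ = client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
				Bucket:   aws.String(bucket),
				Key:      u.Key,
				UploadId: u.UploadId,
			})
		}
	}
	return nil
}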
Orphaned uploads

Description: As with the S3 path, COS multipart uploads may be orphaned if the abort fails on error or cancellation, potentially leaking storage and consuming quota; add periodic cleanup or server-side lifecycle rules.
qcloud_sdk.go [371-435]

Referred Code
defer func() {
	if err != nil {
		_, _ = a.client.Object.AbortMultipartUpload(ctx, key, output.UploadID)
	}
}()

type partJob struct {
	num int32
	buf []byte
	n   int
}

var (
	partNum   int32
	parts     []cos.Object
	partsLock sync.Mutex
	wg        sync.WaitGroup
	errOnce   sync.Once
	firstErr  error
)



 ... (clipped 44 lines)
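
The clipped portion presumably wires the errOnce/firstErr pair above into the workers; a plausible reading of that pattern, sketched as an assumption rather than the actual diff (uploadOnePart is a hypothetical stand-in):

// First-error-wins: every worker funnels failures through errOnce so
// only the first error is recorded, and cancel() stops the rest early.
setErr := func(e error) {
	errOnce.Do(func() {
		firstErr = e
		cancel() // assumes ctx, cancel := context.WithCancel(ctx), as in the S3 path
	})
}

// inside each worker:
//	if err := uploadOnePart(ctx, job); err != nil { // hypothetical helper
//		setErr(err)
//		return
//	}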
Ticket Compliance
🟡 🎫 #23236

🟢 Compliant:
  • Fix the 400 EntityTooSmall error when diff dump/export writes small objects to a COS/S3-backed stage, by ensuring uploads meet provider multipart/single-part size constraints (S3, for example, rejects non-final multipart parts smaller than 5 MiB with EntityTooSmall).
  • Implement a mechanism to handle small result files gracefully (e.g., fall back to a single PUT) and large files efficiently.
  • Ensure compatibility with both AWS S3 and QCloud COS backends.
  • Improve robustness of the export pipeline to handle cancellations and downstream writer errors without deadlocks.

🔴 Not compliant:
  • Make the change available on the 3.0-dev branch as part of the data export/diff dump path.
  • Validate end-to-end diff dump to stage against an empty table on COS, confirming no EntityTooSmall errors and successful object creation.
  • Run a performance regression check comparing parallel vs. serial uploads across various object sizes.
  • Verify the new policy flags and vector fields do not break existing configuration or APIs.
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status: Passed

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status: Passed

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status: Missing Auditing
The new parallel multipart upload path performs critical write operations without any audit logging recording who initiated the action, the object key, the size, or the outcome (see the sketch after the referred code).

Referred Code
if pmw, ok := s.storage.(ParallelMultipartWriter); ok && pmw.SupportsParallelMultipart() &&
	!disableParallel &&
	(forceParallel || size == nil || *size >= minMultipartPartSize) {
	opt := &ParallelMultipartOption{
		PartSize:    defaultParallelMultipartPartSize,
		Concurrency: runtime.NumCPU(),
		Expire:      expire,
	}
	if err := pmw.WriteMultipartParallel(ctx, key, reader, size, opt); err != nil {
		return 0, err
	}
} else {
	if err := s.storage.Write(ctx, key, reader, size, expire); err != nil {
		return 0, err
	}

Compliance status legend:
🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

qodo-code-review bot commented Dec 9, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

High-level: Abstract the duplicated parallel upload logic

The WriteMultipartParallel function contains over 200 lines of nearly identical,
complex code in both aws_sdk_v2.go and qcloud_sdk.go. This duplicated logic
should be refactored into a single, generic helper to improve maintainability.

Examples:

pkg/fileservice/aws_sdk_v2.go [454-695]
func (a *AwsSDKv2) WriteMultipartParallel(
	ctx context.Context,
	key string,
	r io.Reader,
	sizeHint *int64,
	opt *ParallelMultipartOption,
) (err error) {
	defer wrapSizeMismatchErr(&err)

	options := normalizeParallelOption(opt)

 ... (clipped 232 lines)
pkg/fileservice/qcloud_sdk.go [290-533]
func (a *QCloudSDK) WriteMultipartParallel(
	ctx context.Context,
	key string,
	r io.Reader,
	sizeHint *int64,
	opt *ParallelMultipartOption,
) (err error) {
	defer wrapSizeMismatchErr(&err)

	options := normalizeParallelOption(opt)

 ... (clipped 234 lines)

Solution Walkthrough:

Before:

// In pkg/fileservice/aws_sdk_v2.go
func (a *AwsSDKv2) WriteMultipartParallel(...) error {
    // ... ~240 lines of orchestration logic ...
    // setup pools, channels, workers
    
    // AWS-specific create
    output, _ := a.client.CreateMultipartUpload(...)
    
    // worker loop
    for job := range jobCh {
        // AWS-specific upload part
        uploadOutput, _ := a.client.UploadPart(...)
        parts = append(parts, types.CompletedPart{...})
    }
    
    // AWS-specific complete
    _, err = a.client.CompleteMultipartUpload(...)
}

// In pkg/fileservice/qcloud_sdk.go
func (a *QCloudSDK) WriteMultipartParallel(...) error {
    // ... ~240 lines of IDENTICAL orchestration logic ...
    // setup pools, channels, workers

    // QCloud-specific create
    output, _ := a.client.Object.InitiateMultipartUpload(...)

    // worker loop
    for job := range jobCh {
        // QCloud-specific upload part
        resp, _ := a.client.Object.UploadPart(...)
        parts = append(parts, cos.Object{...})
    }
    
    // QCloud-specific complete
    _, err = a.client.Object.CompleteMultipartUpload(...)
}

After:

// In a new shared file, e.g., pkg/fileservice/parallel_uploader.go
type MultipartUploaderAdapter interface {
    CreateMultipartUpload(ctx, key, expire) (uploadID string, err error)
    UploadPart(ctx, key, uploadID, num, body) (etag string, err error)
    CompleteMultipartUpload(ctx, key, uploadID, parts) error
    AbortMultipartUpload(ctx, key, uploadID) error
}

func GenericWriteMultipartParallel(adapter MultipartUploaderAdapter, ...) error {
    // ... ~240 lines of shared orchestration logic ...
    uploadID, err := adapter.CreateMultipartUpload(...)
    // ...
    for job := range jobCh {
        etag, err := adapter.UploadPart(...)
        // ...
    }
    // ...
    err = adapter.CompleteMultipartUpload(...)
    return err
}

// In pkg/fileservice/aws_sdk_v2.go
func (a *AwsSDKv2) WriteMultipartParallel(...) error {
    adapter := newAwsAdapter(a.client, a.bucket)
    return GenericWriteMultipartParallel(adapter, ...)
}

// In pkg/fileservice/qcloud_sdk.go
func (a *QCloudSDK) WriteMultipartParallel(...) error {
    adapter := newQCloudAdapter(a.client)
    return GenericWriteMultipartParallel(adapter, ...)
}
Suggestion importance[1-10]: 9

Why: The suggestion correctly identifies over 200 lines of duplicated, complex concurrent logic for parallel uploads in aws_sdk_v2.go and qcloud_sdk.go; addressing this is critical for maintainability and correctness.

Impact: High
Possible issue: Use a new context for aborting uploads

For the deferred multipart upload abort, use a new context instead of the
original one to ensure cleanup succeeds even if the original context is
cancelled.

pkg/fileservice/aws_sdk_v2.go [528-537]

 	defer func() {
 		if err != nil {
-			_, abortErr := a.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
+			// Use a new context for aborting to avoid being affected by the original context's cancellation.
+			abortCtx, abortCancel := context.WithTimeout(context.Background(), time.Minute)
+			defer abortCancel()
+			_, abortErr := a.client.AbortMultipartUpload(abortCtx, &s3.AbortMultipartUploadInput{
 				Bucket:   ptrTo(a.bucket),
 				Key:      ptrTo(key),
 				UploadId: output.UploadId,
 			})
 			err = errors.Join(err, abortErr)
 		}
 	}()
Suggestion importance[1-10]: 8

Why: The suggestion correctly identifies that a cancelled context would cause the abort operation to fail, leading to orphaned multipart uploads and unnecessary costs, and it proposes a robust solution.

Impact: Medium

@matrix-meow added the size/XXL label (denotes a PR that changes 2000+ lines) and removed the size/XL label (denotes a PR that changes [1000, 1999] lines) on Dec 10, 2025
@gouhongshen changed the title from 3.0: parallel writing object to remote storage. to 3.0: data branch diff parallel writing object to remote storage. on Dec 10, 2025

Labels

kind/enhancement, Review effort 4/5, size/XXL (denotes a PR that changes 2000+ lines)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants