
Conversation

@gouhongshen (Contributor) commented Dec 9, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #23236

What this PR does / why we need it:

Parallel writing of objects to remote storage.


PR Type

Enhancement


Description

  • Implement parallel multipart upload support for S3 and QCloud storage backends

  • Add ParallelMultipartWriter interface with configurable part size and concurrency (see the sketch after this list)

  • Integrate parallel uploads into S3FS write path with policy-based control flags

  • Enhance export pipeline with context-aware cancellation handling
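
A minimal sketch of the interface described above, reconstructed from the signatures that appear later in this review; the `Expire` field type and the package name are assumptions, not confirmed by the diff.

```go
package fileservice

import (
	"context"
	"io"
	"time"
)

// ParallelMultipartOption configures one parallel multipart upload.
type ParallelMultipartOption struct {
	PartSize    int64      // bytes per part; the PR defaults to 64MB
	Concurrency int        // worker count; the PR defaults to runtime.NumCPU()
	Expire      *time.Time // assumed type: optional object expiry
}

// ParallelMultipartWriter is implemented by backends that can upload
// parts concurrently (AwsSDKv2 and QCloudSDK in this PR).
type ParallelMultipartWriter interface {
	SupportsParallelMultipart() bool
	WriteMultipartParallel(
		ctx context.Context,
		key string,
		r io.Reader,
		sizeHint *int64,
		opt *ParallelMultipartOption,
	) error
}
```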


Diagram Walkthrough

```mermaid
flowchart LR
  IOVector["IOVector with<br/>DisableParallel/ForceParallel"] -->|write| S3FS["S3FS.write()"]
  S3FS -->|check support| PMW["ParallelMultipartWriter"]
  PMW -->|parallel upload| AWS["AwsSDKv2<br/>WriteMultipartParallel"]
  PMW -->|parallel upload| QCloud["QCloudSDK<br/>WriteMultipartParallel"]
  AWS -->|buffer pool| Workers["Worker Pool<br/>runtime.NumCPU()"]
  QCloud -->|buffer pool| Workers
  Workers -->|upload parts| Storage["Remote Storage"]
  Export["Export Pipeline"] -->|context aware| Cancel["Cancellation Handling"]
```

File Walkthrough

Relevant files — Enhancement (11 files):

| File | Description | Changes |
|------|-------------|---------|
| object_storage.go | Define parallel multipart upload interface and utilities | +75/-0 |
| aws_sdk_v2.go | Implement parallel multipart upload for AWS S3 | +250/-0 |
| qcloud_sdk.go | Implement parallel multipart upload for QCloud COS | +252/-0 |
| object_storage_http_trace.go | Add parallel multipart tracing support | +19/-0 |
| object_storage_metrics.go | Add parallel multipart metrics support | +17/-0 |
| object_storage_semaphore.go | Add parallel multipart semaphore support | +18/-0 |
| file_service.go | Add parallel upload control flags to IOVector | +5/-0 |
| policy.go | Add parallel write policy flags | +2/-0 |
| s3_fs.go | Integrate parallel multipart uploads into write path | +18/-2 |
| data_branch.go | Add context-aware cancellation in CSV write pipeline | +6/-0 |
| export.go | Improve export pipeline with context cancellation handling | +22/-3 |

@gouhongshen marked this pull request as ready for review December 9, 2025 14:21
@matrix-meow added the size/L label (denotes a PR that changes 500–999 lines) Dec 9, 2025
qodo-code-review bot commented Dec 9, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Resource exhaustion

Description: The global parallel upload worker pool is sized at runtime.NumCPU() with no per-upload bound on submitted tasks, so excessive concurrent uploads can exhaust the shared pool's capacity across the process (a denial-of-service risk).
object_storage.go [41-54]

Referred Code
	parallelUploadPoolOnce sync.Once
	parallelUploadPool     *ants.Pool
)

func getParallelUploadPool() *ants.Pool {
	parallelUploadPoolOnce.Do(func() {
		pool, err := ants.NewPool(runtime.NumCPU())
		if err != nil {
			panic(err)
		}
		parallelUploadPool = pool
	})
	return parallelUploadPool
}
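
A sketch of one possible mitigation for this finding (not the PR's code; the helper name and the limit are hypothetical): bound in-flight part uploads process-wide with a weighted semaphore so a burst of concurrent uploads cannot monopolize the shared pool.

```go
package fileservice

import (
	"context"

	"golang.org/x/sync/semaphore"
)

// partSem caps in-flight part uploads across the whole process.
// The limit of 64 is an arbitrary illustrative value.
var partSem = semaphore.NewWeighted(64)

// uploadPartBounded is a hypothetical wrapper: it waits for capacity
// (or context cancellation) before running the actual part upload.
func uploadPartBounded(ctx context.Context, upload func() error) error {
	if err := partSem.Acquire(ctx, 1); err != nil {
		return err // cancelled while waiting for capacity
	}
	defer partSem.Release(1)
	return upload()
}
```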
Memory exhaustion

Description: Multipart uploads buffer entire parts in memory via sync.Pool with default 64MB part size
and concurrency tied to CPU cores, potentially allowing large memory usage spikes under
load.
aws_sdk_v2.go [454-695]

Referred Code
func (a *AwsSDKv2) WriteMultipartParallel(
	ctx context.Context,
	key string,
	r io.Reader,
	sizeHint *int64,
	opt *ParallelMultipartOption,
) (err error) {
	defer wrapSizeMismatchErr(&err)

	options := normalizeParallelOption(opt)
	if sizeHint != nil && *sizeHint < minMultipartPartSize {
		return a.Write(ctx, key, r, sizeHint, options.Expire)
	}
	if sizeHint != nil {
		expectedParts := (*sizeHint + options.PartSize - 1) / options.PartSize
		if expectedParts > maxMultipartParts {
			return moerr.NewInternalErrorNoCtxf("too many parts for multipart upload: %d", expectedParts)
		}
	}

	ctx, cancel := context.WithCancel(ctx)


 ... (clipped 221 lines)
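
To make the concern concrete, a back-of-envelope sketch assuming the 64MB default part size cited above and CPU-count concurrency:

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	const partSize = 64 << 20 // assumed 64 MiB default part size
	concurrency := runtime.NumCPU()
	// Each in-flight part pins a full buffer, so a 16-core host can hold
	// roughly 16 × 64 MiB ≈ 1 GiB per concurrent upload in the worst case.
	fmt.Printf("worst case ≈ %d MiB in flight per upload\n",
		int64(concurrency)*partSize>>20)
}
```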
Memory exhaustion

Description: QCloud parallel multipart upload also allocates 64MB buffers per part with CPU-based
concurrency, risking high memory consumption under heavy parallel exports.
qcloud_sdk.go [290-533]

Referred Code
func (a *QCloudSDK) WriteMultipartParallel(
	ctx context.Context,
	key string,
	r io.Reader,
	sizeHint *int64,
	opt *ParallelMultipartOption,
) (err error) {
	defer wrapSizeMismatchErr(&err)

	options := normalizeParallelOption(opt)
	if sizeHint != nil && *sizeHint < minMultipartPartSize {
		return a.Write(ctx, key, r, sizeHint, options.Expire)
	}
	if sizeHint != nil {
		expectedParts := (*sizeHint + options.PartSize - 1) / options.PartSize
		if expectedParts > maxMultipartParts {
			return moerr.NewInternalErrorNoCtxf("too many parts for multipart upload: %d", expectedParts)
		}
	}

	ctx, cancel := context.WithCancel(ctx)


 ... (clipped 223 lines)
Ticket Compliance
🟡 🎫 #23236
  🟢 Support multipart uploads correctly so small files use a single PUT and larger files are uploaded in compliant part sizes.
  • Improve reliability of the export pipeline by handling cancellation and downstream writer errors to avoid deadlocks.
  • Apply the fix on the 3.0-dev branch for the S3/QCloud storage backends used by stage:// URLs.
  • Prevent 400 EntityTooSmall errors when diff dump/export writes very small objects to S3/QCloud stages.
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed


Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status: Passed


Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed


Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed


Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status: Missing auditing — the new parallel multipart upload path performs critical write operations without audit logging of the action, user context, or outcome.

Referred Code
if pmw, ok := s.storage.(ParallelMultipartWriter); ok && pmw.SupportsParallelMultipart() &&
	!disableParallel &&
	(forceParallel || size == nil || *size >= minMultipartPartSize) {
	opt := &ParallelMultipartOption{
		PartSize:    defaultParallelMultipartPartSize,
		Concurrency: runtime.NumCPU(),
		Expire:      expire,
	}
	if err := pmw.WriteMultipartParallel(ctx, key, reader, size, opt); err != nil {
		return 0, err
	}
} else {
	if err := s.storage.Write(ctx, key, reader, size, expire); err != nil {
		return 0, err
	}
}
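
A purely illustrative sketch of the audit point this check asks for, using the standard library's slog (the project's real logger may differ; the helper name and field names are assumptions):

```go
package fileservice

import (
	"context"
	"log/slog"
)

// auditParallelWrite records what was written, with what settings, and
// the outcome. User/tenant identity would come from ctx in a real system.
func auditParallelWrite(ctx context.Context, key string, partSize int64, concurrency int, err error) {
	slog.InfoContext(ctx, "parallel multipart upload",
		slog.String("key", key),
		slog.Int64("partSize", partSize),
		slog.Int("concurrency", concurrency),
		slog.Any("error", err),
	)
}
```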


Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status: Concurrency limits — the global worker pool and default concurrency are introduced without an explicit per-tenant or per-request upper bound, which may risk resource exhaustion under multi-tenant load.

Referred Code
func getParallelUploadPool() *ants.Pool {
	parallelUploadPoolOnce.Do(func() {
		pool, err := ants.NewPool(runtime.NumCPU())
		if err != nil {
			panic(err)
		}
		parallelUploadPool = pool
	})
	return parallelUploadPool
}


Compliance status legend: 🟢 Fully compliant · 🟡 Partially compliant · 🔴 Not compliant · ⚪ Requires further human verification · 🏷️ Compliance label

qodo-code-review bot commented Dec 9, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

High-level
Abstract the duplicated parallel upload logic

The parallel multipart upload logic is duplicated in aws_sdk_v2.go and
qcloud_sdk.go. This should be refactored into a generic helper function that
accepts an interface for SDK-specific operations to reduce code duplication and
improve maintainability.

Examples:

pkg/fileservice/aws_sdk_v2.go [454-695]
func (a *AwsSDKv2) WriteMultipartParallel(
	ctx context.Context,
	key string,
	r io.Reader,
	sizeHint *int64,
	opt *ParallelMultipartOption,
) (err error) {
	defer wrapSizeMismatchErr(&err)

	options := normalizeParallelOption(opt)

 ... (clipped 232 lines)
pkg/fileservice/qcloud_sdk.go [290-533]
func (a *QCloudSDK) WriteMultipartParallel(
	ctx context.Context,
	key string,
	r io.Reader,
	sizeHint *int64,
	opt *ParallelMultipartOption,
) (err error) {
	defer wrapSizeMismatchErr(&err)

	options := normalizeParallelOption(opt)

 ... (clipped 234 lines)

Solution Walkthrough:

Before:

// In aws_sdk_v2.go
func (a *AwsSDKv2) WriteMultipartParallel(...) error {
    // ... setup, buffer pool, readChunk ...
    output, _ := a.client.CreateMultipartUpload(...)
    defer a.client.AbortMultipartUpload(...)
    // ... worker pool, job channel, waitgroup ...
    startWorker := func() {
        // ... loop over job channel ...
        uploadOutput, _ := a.client.UploadPart(...)
        // ... handle result, append to parts slice ...
    }
    // ... read loop, send jobs, wait for workers ...
    _, err = a.client.CompleteMultipartUpload(...)
    return err
}

// In qcloud_sdk.go
func (a *QCloudSDK) WriteMultipartParallel(...) error {
    // ... IDENTICAL setup, buffer pool, readChunk ...
    output, _ := a.client.Object.InitiateMultipartUpload(...)
    defer a.client.Object.AbortMultipartUpload(...)
    // ... IDENTICAL worker pool, job channel, waitgroup ...
    startWorker := func() {
        // ... loop over job channel ...
        resp, _ := a.client.Object.UploadPart(...)
        // ... handle result, append to parts slice ...
    }
    // ... IDENTICAL read loop, send jobs, wait for workers ...
    _, err = a.client.Object.CompleteMultipartUpload(...)
    return err
}

After:

// In a new helper file
type MultipartUploader interface {
    Initiate(...) (uploadID any, err error)
    UploadPart(...) (partInfo any, err error)
    Complete(...) error
    Abort(...) error
}

func GenericWriteMultipartParallel(uploader MultipartUploader, ...) error {
    // ... setup, buffer pool, readChunk ...
    uploadID, _ := uploader.Initiate(...)
    defer uploader.Abort(...)
    // ... worker pool, job channel, waitgroup ...
    startWorker := func() {
        // ... loop over job channel ...
        partInfo, _ := uploader.UploadPart(...)
        // ... handle result, append to parts slice ...
    }
    // ... read loop, send jobs, wait for workers ...
    err = uploader.Complete(...)
    return err
}

// In aws_sdk_v2.go and qcloud_sdk.go
func (a *AwsSDKv2) WriteMultipartParallel(...) error {
    // adapter that implements MultipartUploader for aws
    awsUploader := newAwsUploaderAdapter(a.client, ...)
    return GenericWriteMultipartParallel(awsUploader, ...)
}
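
A side benefit of this adapter approach: the shared read-loop/worker logic becomes unit-testable against a fake MultipartUploader, with no dependency on either SDK.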
Suggestion importance[1-10]: 9


Why: The suggestion correctly identifies significant code duplication in the core logic of the PR for AwsSDKv2 and QCloudSDK, and proposes a valid abstraction that would greatly improve code quality and maintainability.

Impact: High
Possible issue
Use background context for abort

Use a new background context for the AbortMultipartUpload call to ensure it can
execute even if the primary operation's context is cancelled.

pkg/fileservice/aws_sdk_v2.go [528-537]

 	defer func() {
 		if err != nil {
+			// use a new context to ensure abort is called
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
+			defer cancel()
 			_, abortErr := a.client.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
 				Bucket:   ptrTo(a.bucket),
 				Key:      ptrTo(key),
 				UploadId: output.UploadId,
 			})
 			err = errors.Join(err, abortErr)
 		}
 	}()
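
Since Go 1.21, `context.WithoutCancel` offers an alternative that detaches cancellation while keeping the parent context's values (useful if the abort call relies on request-scoped metadata such as trace IDs). A sketch, not part of the suggestion above:

```go
// Derive an abort context that survives the parent's cancellation but
// still carries its values, with its own 30-second deadline.
abortCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 30*time.Second)
defer cancel()
```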
Suggestion importance[1-10]: 9


Why: The suggestion correctly identifies a critical bug where a cancelled context prevents the cleanup of failed multipart uploads, which could lead to orphaned data and increased storage costs.

Impact: High
Ensure context cancellation on timeout

Call cancelCtx() when the inputCtx is done to ensure all spawned goroutines are
properly terminated.

pkg/frontend/data_branch.go [2049-2058]

 		case <-inputCtx.Done():
 			err = errors.Join(err, inputCtx.Err())
 			stop = true
+			cancelCtx()
 		case e := <-writerErr:
 			if e != nil {
 				err = errors.Join(err, e)
 			}
 			stop = true
 			cancelCtx()
 		case e, ok := <-errChan:
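A variant worth considering: calling `defer cancelCtx()` immediately after the context is created covers every exit path, including the `inputCtx.Done()` branch, without repeating the call in each case arm.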
Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies that failing to call cancelCtx() when inputCtx is done can lead to dangling goroutines, fixing a potential resource leak and improving shutdown behavior.

Impact: Medium


Labels: kind/enhancement · Review effort 4/5 · size/XXL (denotes a PR that changes 2000+ lines)