Skip to content

Commit 8055bc8

Browse files
authored
S3 Executor support (#3302)
1 parent cb73da7 commit 8055bc8

15 files changed

+564
-287
lines changed

gems/aws-sdk-s3/CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
Unreleased Changes
22
------------------
33

4+
* Feature - Add lightweight thread pool executor for multipart `download_file`, `upload_file` and `upload_stream`.
5+
6+
* Feature - Add custom executor support for `Aws::S3::TransferManager`.
7+
48
1.199.1 (2025-09-25)
59
------------------
610

gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ module S3
77
autoload :Encryption, 'aws-sdk-s3/encryption'
88
autoload :EncryptionV2, 'aws-sdk-s3/encryption_v2'
99
autoload :FilePart, 'aws-sdk-s3/file_part'
10+
autoload :DefaultExecutor, 'aws-sdk-s3/default_executor'
1011
autoload :FileUploader, 'aws-sdk-s3/file_uploader'
1112
autoload :FileDownloader, 'aws-sdk-s3/file_downloader'
1213
autoload :LegacySigner, 'aws-sdk-s3/legacy_signer'

gems/aws-sdk-s3/lib/aws-sdk-s3/customizations/object.rb

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -358,8 +358,8 @@ def public_url(options = {})
358358
# {Client#complete_multipart_upload},
359359
# and {Client#upload_part} can be provided.
360360
#
361-
# @option options [Integer] :thread_count (10) The number of parallel
362-
# multipart uploads
361+
# @option options [Integer] :thread_count (10) The number of parallel multipart uploads.
362+
# An additional thread is used internally for task coordination.
363363
#
364364
# @option options [Boolean] :tempfile (false) Normally read data is stored
365365
# in memory when building the parts in order to complete the underlying
@@ -383,19 +383,18 @@ def public_url(options = {})
383383
# @see Client#complete_multipart_upload
384384
# @see Client#upload_part
385385
def upload_stream(options = {}, &block)
386-
uploading_options = options.dup
386+
upload_opts = options.merge(bucket: bucket_name, key: key)
387+
executor = DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count))
387388
uploader = MultipartStreamUploader.new(
388389
client: client,
389-
thread_count: uploading_options.delete(:thread_count),
390-
tempfile: uploading_options.delete(:tempfile),
391-
part_size: uploading_options.delete(:part_size)
390+
executor: executor,
391+
tempfile: upload_opts.delete(:tempfile),
392+
part_size: upload_opts.delete(:part_size)
392393
)
393394
Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do
394-
uploader.upload(
395-
uploading_options.merge(bucket: bucket_name, key: key),
396-
&block
397-
)
395+
uploader.upload(upload_opts, &block)
398396
end
397+
executor.shutdown
399398
true
400399
end
401400
deprecated(:upload_stream, use: 'Aws::S3::TransferManager#upload_stream', version: 'next major version')
@@ -458,12 +457,18 @@ def upload_stream(options = {}, &block)
458457
# @see Client#complete_multipart_upload
459458
# @see Client#upload_part
460459
def upload_file(source, options = {})
461-
uploading_options = options.dup
462-
uploader = FileUploader.new(multipart_threshold: uploading_options.delete(:multipart_threshold), client: client)
460+
upload_opts = options.merge(bucket: bucket_name, key: key)
461+
executor = DefaultExecutor.new(max_threads: upload_opts.delete(:thread_count))
462+
uploader = FileUploader.new(
463+
client: client,
464+
executor: executor,
465+
multipart_threshold: upload_opts.delete(:multipart_threshold)
466+
)
463467
response = Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do
464-
uploader.upload(source, uploading_options.merge(bucket: bucket_name, key: key))
468+
uploader.upload(source, upload_opts)
465469
end
466470
yield response if block_given?
471+
executor.shutdown
467472
true
468473
end
469474
deprecated(:upload_file, use: 'Aws::S3::TransferManager#upload_file', version: 'next major version')
@@ -512,10 +517,6 @@ def upload_file(source, options = {})
512517
#
513518
# @option options [Integer] :thread_count (10) Customize threads used in the multipart download.
514519
#
515-
# @option options [String] :version_id The object version id used to retrieve the object.
516-
#
517-
# @see https://docs.aws.amazon.com/AmazonS3/latest/dev/ObjectVersioning.html ObjectVersioning
518-
#
519520
# @option options [String] :checksum_mode ("ENABLED")
520521
# When `"ENABLED"` and the object has a stored checksum, it will be used to validate the download and will
521522
# raise an `Aws::Errors::ChecksumError` if checksum validation fails. You may provide a `on_checksum_validated`
@@ -539,10 +540,13 @@ def upload_file(source, options = {})
539540
# @see Client#get_object
540541
# @see Client#head_object
541542
def download_file(destination, options = {})
542-
downloader = FileDownloader.new(client: client)
543+
download_opts = options.merge(bucket: bucket_name, key: key)
544+
executor = DefaultExecutor.new(max_threads: download_opts.delete([:thread_count]))
545+
downloader = FileDownloader.new(client: client, executor: executor)
543546
Aws::Plugins::UserAgent.metric('RESOURCE_MODEL') do
544-
downloader.download(destination, options.merge(bucket: bucket_name, key: key))
547+
downloader.download(destination, download_opts)
545548
end
549+
executor.shutdown
546550
true
547551
end
548552
deprecated(:download_file, use: 'Aws::S3::TransferManager#download_file', version: 'next major version')
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# frozen_string_literal: true
2+
3+
module Aws
4+
module S3
5+
# @api private
6+
class DefaultExecutor
7+
DEFAULT_MAX_THREADS = 10
8+
RUNNING = :running
9+
SHUTTING_DOWN = :shutting_down
10+
SHUTDOWN = :shutdown
11+
12+
def initialize(options = {})
13+
@max_threads = options[:max_threads] || DEFAULT_MAX_THREADS
14+
@state = RUNNING
15+
@queue = Queue.new
16+
@pool = []
17+
@mutex = Mutex.new
18+
end
19+
20+
# Submits a task for execution.
21+
# @param [Object] args Variable number of arguments to pass to the block
22+
# @param [Proc] block The block to be executed
23+
# @return [Boolean] Returns true if the task was submitted successfully
24+
def post(*args, &block)
25+
@mutex.synchronize do
26+
raise 'Executor has been shutdown and is no longer accepting tasks' unless @state == RUNNING
27+
28+
@queue << [args, block]
29+
ensure_worker_available
30+
end
31+
true
32+
end
33+
34+
# Immediately terminates all worker threads and clears pending tasks.
35+
# This is a forceful shutdown that doesn't wait for running tasks to complete.
36+
#
37+
# @return [Boolean] true when termination is complete
38+
def kill
39+
@mutex.synchronize do
40+
@state = SHUTDOWN
41+
@pool.each(&:kill)
42+
@pool.clear
43+
@queue.clear
44+
end
45+
true
46+
end
47+
48+
# Gracefully shuts down the executor, optionally with a timeout.
49+
# Stops accepting new tasks and waits for running tasks to complete.
50+
#
51+
# @param timeout [Numeric, nil] Maximum time in seconds to wait for shutdown.
52+
# If nil, waits indefinitely. If timeout expires, remaining threads are killed.
53+
# @return [Boolean] true when shutdown is complete
54+
def shutdown(timeout = nil)
55+
@mutex.synchronize do
56+
return true if @state == SHUTDOWN
57+
58+
@state = SHUTTING_DOWN
59+
@pool.size.times { @queue << :shutdown }
60+
end
61+
62+
if timeout
63+
deadline = Time.now + timeout
64+
@pool.each do |thread|
65+
remaining = deadline - Time.now
66+
break if remaining <= 0
67+
68+
thread.join([remaining, 0].max)
69+
end
70+
@pool.select(&:alive?).each(&:kill)
71+
else
72+
@pool.each(&:join)
73+
end
74+
75+
@mutex.synchronize do
76+
@pool.clear
77+
@state = SHUTDOWN
78+
end
79+
true
80+
end
81+
82+
private
83+
84+
def ensure_worker_available
85+
return unless @state == RUNNING
86+
87+
@pool.select!(&:alive?)
88+
@pool << spawn_worker if @pool.size < @max_threads
89+
end
90+
91+
def spawn_worker
92+
Thread.new do
93+
while (job = @queue.shift)
94+
break if job == :shutdown
95+
96+
args, block = job
97+
block.call(*args)
98+
end
99+
end
100+
end
101+
end
102+
end
103+
end

0 commit comments

Comments
 (0)