diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb
index e21e4bc6bf9..b991f74e764 100644
--- a/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb
+++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/customizations.rb
@@ -24,8 +24,11 @@ module S3
     autoload :ExpressCredentialsProvider, 'aws-sdk-s3/express_credentials_provider'
 
     # s3 access grants auth
-
     autoload :AccessGrantsCredentials, 'aws-sdk-s3/access_grants_credentials'
     autoload :AccessGrantsCredentialsProvider, 'aws-sdk-s3/access_grants_credentials_provider'
+
+    # testing transfer manager
+    autoload :DirectoryUploader, 'aws-sdk-s3/directory_uploader'
+    autoload :TransferManager, 'aws-sdk-s3/transfer_manager'
   end
 end
diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb
new file mode 100644
index 00000000000..2b6e95937a2
--- /dev/null
+++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/directory_uploader.rb
@@ -0,0 +1,143 @@
+# frozen_string_literal: true
+
+require 'find'
+require 'set'
+require 'thread'
+
+module Aws
+  module S3
+    # Uploads the regular files of a local directory to S3 through a pool of
+    # worker threads fed by a single directory-walking producer thread.
+    #
+    # @api private
+    class DirectoryUploader
+      # Worker threads used when `:thread_count` is not given.
+      DEFAULT_THREAD_COUNT = 10
+
+      # Capacity of the queue between the producer and the workers.
+      QUEUE_SIZE = 5 # TODO: random number
+
+      # @param options [Hash]
+      # @option options [Client] :client (Client.new) client used for every upload
+      # @option options [Integer] :thread_count (10) number of upload threads
+      def initialize(options = {})
+        @client = options[:client] || Client.new
+        @thread_count = options[:thread_count] || DEFAULT_THREAD_COUNT
+      end
+
+      # @return [Client]
+      attr_reader :client
+
+      # Uploads every regular file found in `source`; each object key is the
+      # file's path relative to `source`.
+      #
+      # @param source [String] path of an existing directory
+      # @param options [Hash] :recursive, :follow_symlinks, :s3_prefix,
+      #   :s3_delimiter, :filter_callback and :multipart_threshold are consumed
+      #   here; everything else is passed to the FileUploader#upload call
+      # @return [Array] an empty array when every upload succeeded
+      # @raise [ArgumentError] when `source` is not a directory
+      # @raise [StandardError] the first error hit while listing or uploading
+      def upload(source, options = {})
+        raise ArgumentError, 'Invalid directory' unless Dir.exist?(source)
+
+        upload_opts = options.dup
+        @source = source
+        @recursive = upload_opts.delete(:recursive) || false
+        @follow_symlinks = upload_opts.delete(:follow_symlinks) || false
+        @s3_prefix = upload_opts.delete(:s3_prefix) || nil
+        @s3_delimiter = upload_opts.delete(:s3_delimiter) || '/'
+        @filter_callback = upload_opts.delete(:filter_callback) || nil
+
+        uploader = FileUploader.new(multipart_threshold: upload_opts.delete(:multipart_threshold), client: @client)
+
+        queue = SizedQueue.new(QUEUE_SIZE)
+        @disable_queue = false
+        @errors = Queue.new
+        producer = start_producer(queue)
+        workers = Array.new(@thread_count) { start_worker(queue, uploader, upload_opts) }
+        [producer, *workers].each(&:join)
+
+        # TODO: handle failure policies
+        raise @errors.shift unless @errors.empty?
+
+        []
+      end
+
+      private
+
+      # Walks the directory in its own thread. The queue is closed on every exit
+      # path so that workers always wake up and finish, even if the walk raised.
+      def start_producer(queue)
+        Thread.new do
+          @recursive ? stream_recursive_files(queue) : stream_direct_files(queue)
+        rescue StandardError => e
+          abort_upload(e)
+        ensure
+          queue.close
+        end
+      end
+
+      # Uploads queued files until the queue is closed and empty. After a failure
+      # the worker keeps draining the queue (without uploading) so the producer
+      # can never stay blocked on a full queue.
+      def start_worker(queue, uploader, upload_opts)
+        Thread.new do
+          while (file = queue.shift)
+            next if @disable_queue
+
+            begin
+              # TODO: key to consider s3_prefix and custom delimiter
+              uploader.upload(File.join(@source, file), upload_opts.merge(key: file))
+            rescue StandardError => e
+              abort_upload(e)
+            end
+          end
+        end
+      end
+
+      # Records the error and tells the producer and the workers to stop.
+      def abort_upload(error)
+        @errors << error
+        @disable_queue = true
+      end
+
+      # Queues the path, relative to the source directory, of every regular
+      # file found below it.
+      def stream_recursive_files(queue)
+        # Find builds child paths with File.join, so this prefix is the same for
+        # sources given with or without a trailing separator.
+        prefix = File.join(@source, '')
+        visited = Set.new
+        # TODO: add filter callback
+        # NOTE(review): Find.find does not seem to descend into symlinked
+        # directories, so :follow_symlinks only affects symlinked files - confirm.
+        Find.find(@source) do |path|
+          break if @disable_queue
+
+          Find.prune if !@follow_symlinks && File.symlink?(path)
+          # A dangling symlink has no real path; skip it instead of raising.
+          Find.prune unless File.exist?(path)
+          Find.prune unless visited.add?(File.realpath(path))
+
+          # TODO: if non-default s3_delimiter is used, validate here and fail
+          queue << path.delete_prefix(prefix) if File.file?(path)
+        end
+      end
+
+      # Queues the name of every regular file directly inside the source directory.
+      def stream_direct_files(queue)
+        # TODO: add filter callback
+        Dir.each_child(@source) do |entry|
+          break if @disable_queue
+
+          path = File.join(@source, entry)
+          next if !@follow_symlinks && File.symlink?(path)
+
+          # TODO: if non-default s3_delimiter is used, validate here and fail
+          queue << entry if File.file?(path)
+        end
+      end
+    end
+  end
+end
diff --git a/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb
new file mode 100644
index 00000000000..7dc6669d9f3
--- /dev/null
+++ b/gems/aws-sdk-s3/lib/aws-sdk-s3/transfer_manager.rb
@@ -0,0 +1,79 @@
+# frozen_string_literal: true
+
+module Aws
+  module S3
+    # A high-level S3 transfer utility that provides enhanced upload and download
+    # capabilities with automatic multipart handling, progress tracking, and
+    # handling of large files. The following features are supported:
+    #
+    # * upload an S3 object with multipart upload
+    # * download an S3 object with multipart download
+    # * track transfer progress by using progress listener
+    class TransferManager
+      # @param options [Hash]
+      # @option options [Client] :client (Client.new) client shared by all transfers
+      def initialize(options = {})
+        @client = options[:client] || Client.new
+      end
+
+      # @return [Client]
+      attr_reader :client
+
+      # Uploads the file at `source`, yielding the uploader's response when a
+      # block is given.
+      #
+      # @return [true]
+      def upload_file(source, options = {})
+        uploading_options = options.dup
+        uploader = FileUploader.new(multipart_threshold: uploading_options.delete(:multipart_threshold), client: @client)
+        # TODO: wrap with user-agent metric tracking
+        response = uploader.upload(source, uploading_options)
+        yield response if block_given?
+        true
+      end
+
+      # Uploads a stream through `MultipartStreamUploader`; the block is handed
+      # to the uploader.
+      #
+      # @return [true]
+      def upload_stream(options = {}, &block)
+        uploading_options = options.dup
+        uploader = MultipartStreamUploader.new(
+          client: @client,
+          thread_count: uploading_options.delete(:thread_count),
+          tempfile: uploading_options.delete(:tempfile),
+          part_size: uploading_options.delete(:part_size)
+        )
+        # TODO: wrap with user-agent metric tracking
+        uploader.upload(uploading_options, &block)
+        true
+      end
+
+      # Downloads an object to `destination` through `FileDownloader`.
+      #
+      # @return [true]
+      def download_file(destination, options = {})
+        downloader = FileDownloader.new(client: @client)
+        # TODO: wrap with user-agent metric tracking
+        downloader.download(destination, options)
+        true
+      end
+
+      # Uploads the files of the `source` directory through `DirectoryUploader`.
+      #
+      # @return [true]
+      def upload_directory(source, options = {})
+        upload_directory_opts = options.dup
+        directory_uploader = DirectoryUploader.new(
+          client: @client,
+          thread_count: upload_directory_opts.delete(:thread_count)
+        )
+        directory_uploader.upload(source, upload_directory_opts)
+        true # TODO: need to change depending on failure policy set
+      end
+
+      # TODO: not implemented yet
+      def download_directory(source, options = {}); end
+    end
+  end
+end