Skip to content

Commit d9f8a13

Browse files
authored
feat(core): Add storage upload to move away from unified upload protocol (#11508)
1 parent f5007a5 commit d9f8a13

File tree

7 files changed

+381
-9
lines changed

7 files changed

+381
-9
lines changed

google-apis-core/lib/google/apis/core/base_service.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
require 'google/apis/core/api_command'
2020
require 'google/apis/core/batch'
2121
require 'google/apis/core/upload'
22+
require 'google/apis/core/storage_upload'
2223
require 'google/apis/core/download'
2324
require 'google/apis/core/storage_download'
2425
require 'google/apis/options'
@@ -318,6 +319,26 @@ def make_upload_command(method, path, options)
318319
command
319320
end
320321

322+
# Create a new storage upload command.
323+
# This is specifically for storage because we are moving to a new upload protocol.
324+
# Ref: https://cloud.google.com/storage/docs/performing-resumable-uploads
325+
#
326+
# @param [Symbol] method
327+
# HTTP method for uploading. The initial request to initiate a resumable session
328+
# is :post and the subsequent chunks uploaded to the session are :put
329+
# @param [String] path
330+
# Additional path to upload endpoint, appended to API base path
331+
# @param [Hash, Google::Apis::RequestOptions] options
332+
# Request-specific options
333+
# @return [Google::Apis::Core::StorageUploadCommand]
334+
def make_storage_upload_command(method, path, options)
335+
template = Addressable::Template.new(root_url + upload_path + path)
336+
command = StorageUploadCommand.new(method, template, client_version: client_version)
337+
command.options = request_options.merge(options)
338+
apply_command_defaults(command)
339+
command
340+
end
341+
321342
# Create a new download command.
322343
#
323344
# @param [symbol] method

google-apis-core/lib/google/apis/core/batch.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
require 'google/apis/core/multipart'
2929
require 'google/apis/core/http_command'
3030
require 'google/apis/core/upload'
31+
require 'google/apis/core/storage_upload'
3132
require 'google/apis/core/download'
3233
require 'google/apis/core/composite_io'
3334
require 'addressable/uri'
@@ -120,7 +121,7 @@ def prepare!
120121
end
121122

122123
def ensure_valid_command(command)
123-
if command.is_a?(Google::Apis::Core::BaseUploadCommand) || command.is_a?(Google::Apis::Core::DownloadCommand) || command.is_a?(Google::Apis::Core::StorageDownloadCommand)
124+
if command.is_a?(Google::Apis::Core::BaseUploadCommand) || command.is_a?(Google::Apis::Core::DownloadCommand) || command.is_a?(Google::Apis::Core::StorageDownloadCommand) || command.is_a?(Google::Apis::Core::StorageUploadCommand)
124125
fail Google::Apis::ClientError, 'Can not include media requests in batch'
125126
end
126127
fail Google::Apis::ClientError, 'Invalid command object' unless command.is_a?(HttpCommand)

google-apis-core/lib/google/apis/core/http_command.rb

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,17 @@ def initialize(method, url, body: nil)
9898
# @raise [Google::Apis::ServerError] An error occurred on the server and the request can be retried
9999
# @raise [Google::Apis::ClientError] The request is invalid and should not be retried without modification
100100
# @raise [Google::Apis::AuthorizationError] Authorization is required
101-
def execute(client)
101+
def execute(client, &block)
102102
prepare!
103103
opencensus_begin_span
104+
do_retry :execute_once, client, &block
105+
ensure
106+
opencensus_end_span
107+
@http_res = nil
108+
release!
109+
end
110+
111+
def do_retry func, client
104112
begin
105113
Retriable.retriable tries: options.retries + 1,
106114
max_elapsed_time: options.max_elapsed_time,
@@ -115,7 +123,7 @@ def execute(client)
115123
Retriable.retriable tries: auth_tries,
116124
on: [Google::Apis::AuthorizationError, Signet::AuthorizationError, Signet::RemoteServerError, Signet::UnexpectedStatusError],
117125
on_retry: proc { |*| refresh_authorization } do
118-
execute_once(client).tap do |result|
126+
send(func, client).tap do |result|
119127
if block_given?
120128
yield result, nil
121129
end
@@ -129,10 +137,6 @@ def execute(client)
129137
raise e
130138
end
131139
end
132-
ensure
133-
opencensus_end_span
134-
@http_res = nil
135-
release!
136140
end
137141

138142
# Refresh the authorization authorization after a 401 error
@@ -216,7 +220,7 @@ def process_response(status, header, body)
216220
def check_status(status, header = nil, body = nil, message = nil)
217221
# TODO: 304 Not Modified depends on context...
218222
case status
219-
when 200...300
223+
when 200...300, 308
220224
nil
221225
when 301, 302, 303, 307
222226
message ||= sprintf('Redirect to %s', header['Location'])
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
# Copyright 2022 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
require 'google/apis/core/http_command'
16+
require 'google/apis/core/api_command'
17+
require 'google/apis/errors'
18+
require 'tempfile'
19+
require 'mini_mime'
20+
21+
module Google
22+
module Apis
23+
module Core
24+
# Base upload command. Not intended to be used directly
25+
# @private
26+
class StorageUploadCommand < ApiCommand
27+
CONTENT_LENGTH_HEADER = "Content-Length"
28+
CONTENT_TYPE_HEADER = "Content-Type"
29+
UPLOAD_CONTENT_TYPE_HEADER = "X-Upload-Content-Type"
30+
LOCATION_HEADER = "Location"
31+
CONTENT_RANGE_HEADER = "Content-Range"
32+
RESUMABLE = "resumable"
33+
OK_STATUS = 200
34+
CHUNK_SIZE = 8 * 1024 * 1024 # 8 MB
35+
36+
# File name or IO containing the content to upload
37+
# @return [String, File, #read]
38+
attr_accessor :upload_source
39+
40+
# Content type of the upload material
41+
# @return [String]
42+
attr_accessor :upload_content_type
43+
44+
# Content, as UploadIO
45+
# @return [Google::Apis::Core::UploadIO]
46+
attr_accessor :upload_io
47+
48+
# Ensure the content is readable and wrapped in an IO instance.
49+
#
50+
# @return [void]
51+
# @raise [Google::Apis::ClientError] if upload source is invalid
52+
def prepare!
53+
@upload_url = nil
54+
@offset = 0
55+
@upload_incomplete = true
56+
# Prevent the command from populating the body with form encoding, by
57+
# asserting that it already has a body. Form encoding is never used
58+
# by upload requests.
59+
self.body = '' unless self.body
60+
61+
super
62+
if streamable?(upload_source)
63+
self.upload_io = upload_source
64+
@close_io_on_finish = false
65+
elsif self.upload_source.is_a?(String)
66+
self.upload_io = File.new(upload_source, 'r')
67+
if self.upload_content_type.nil?
68+
type = MiniMime.lookup_by_filename(upload_source)
69+
self.upload_content_type = type&.content_type
70+
end
71+
@close_io_on_finish = true
72+
else
73+
fail Google::Apis::ClientError, 'Invalid upload source'
74+
end
75+
end
76+
77+
# Close IO stream when command done. Only closes the stream if it was opened by the command.
78+
def release!
79+
upload_io.close if @close_io_on_finish
80+
end
81+
82+
# Execute the command, retrying as necessary
83+
#
84+
# @param [HTTPClient] client
85+
# HTTP client
86+
# @yield [result, err] Result or error if block supplied
87+
# @return [Object]
88+
# @raise [Google::Apis::ServerError] An error occurred on the server and the request can be retried
89+
# @raise [Google::Apis::ClientError] The request is invalid and should not be retried without modification
90+
# @raise [Google::Apis::AuthorizationError] Authorization is required
91+
def execute(client)
92+
prepare!
93+
opencensus_begin_span
94+
95+
do_retry :initiate_resumable_upload, client
96+
while @upload_incomplete
97+
res = do_retry :send_upload_command, client
98+
end
99+
res
100+
ensure
101+
opencensus_end_span
102+
@http_res = nil
103+
release!
104+
end
105+
106+
def initiate_resumable_upload(client)
107+
logger.debug { sprintf('Intiating resumable upload command to %s', url) }
108+
109+
request_header = header.dup
110+
apply_request_options(request_header)
111+
112+
request_query = query.dup
113+
request_query['uploadType'] = RESUMABLE
114+
115+
request_header[CONTENT_LENGTH_HEADER] = upload_io.size.to_s
116+
request_header[CONTENT_TYPE_HEADER] = JSON_CONTENT_TYPE
117+
request_header[UPLOAD_CONTENT_TYPE_HEADER] = upload_content_type unless upload_content_type.nil?
118+
119+
response = client.post(url.to_s, query: request_query,
120+
body: body,
121+
header: request_header,
122+
follow_redirect: true)
123+
result = process_response(response.status_code, response.header, response.body)
124+
success(result)
125+
rescue => e
126+
error(e, rethrow: true)
127+
end
128+
129+
# Send the actual content
130+
#
131+
# @param [HTTPClient] client
132+
# HTTP client
133+
# @return [HTTP::Message]
134+
# @raise [Google::Apis::ServerError] Unable to send the request
135+
def send_upload_command(client)
136+
logger.debug { sprintf('Sending upload command to %s', @upload_url) }
137+
138+
remaining_content_size = upload_io.size - @offset
139+
current_chunk_size = remaining_content_size < CHUNK_SIZE ? remaining_content_size : CHUNK_SIZE
140+
141+
request_header = header.dup
142+
request_header[CONTENT_RANGE_HEADER] = sprintf('bytes %d-%d/%d', @offset, @offset+current_chunk_size-1, upload_io.size)
143+
request_header[CONTENT_LENGTH_HEADER] = current_chunk_size
144+
chunk_body = upload_io.read(current_chunk_size)
145+
146+
response = client.put(@upload_url, body: chunk_body, header: request_header, follow_redirect: true)
147+
148+
result = process_response(response.status_code, response.header, response.body)
149+
@upload_incomplete = false if response.status_code.eql? OK_STATUS
150+
@offset += current_chunk_size if @upload_incomplete
151+
success(result)
152+
rescue => e
153+
upload_io.pos = @offset
154+
error(e, rethrow: true)
155+
end
156+
157+
# Check the to see if the upload is complete or needs to be resumed.
158+
#
159+
# @param [Integer] status
160+
# HTTP status code of response
161+
# @param [HTTP::Message::Headers] header
162+
# Response headers
163+
# @param [String, #read] body
164+
# Response body
165+
# @return [Object]
166+
# Response object
167+
# @raise [Google::Apis::ServerError] An error occurred on the server and the request can be retried
168+
# @raise [Google::Apis::ClientError] The request is invalid and should not be retried without modification
169+
# @raise [Google::Apis::AuthorizationError] Authorization is required
170+
def process_response(status, header, body)
171+
@upload_url = header[LOCATION_HEADER].first unless header[LOCATION_HEADER].empty?
172+
super(status, header, body)
173+
end
174+
175+
def streamable?(upload_source)
176+
upload_source.is_a?(IO) || upload_source.is_a?(StringIO) || upload_source.is_a?(Tempfile)
177+
end
178+
end
179+
end
180+
end
181+
end

google-apis-core/spec/google/apis/core/service_spec.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,21 @@
202202
include_examples 'with options'
203203
end
204204

205+
context 'when making storage upload commands' do
206+
let(:command) { service.send(:make_storage_upload_command, :post, 'zoo/animals', authorization: 'foo') }
207+
208+
it 'should return the correct command type' do
209+
expect(command).to be_an_instance_of(Google::Apis::Core::StorageUploadCommand)
210+
end
211+
212+
it 'should build a correct URL' do
213+
url = command.url.expand({}).to_s
214+
expect(url).to eql 'https://www.googleapis.com/upload/zoo/animals'
215+
end
216+
217+
include_examples 'with options'
218+
end
219+
205220
context 'with batch' do
206221
before(:example) do
207222
response = <<EOF.gsub(/\n/, "\r\n")

0 commit comments

Comments
 (0)