@@ -49,6 +49,14 @@ class StorageUploadCommand < ApiCommand
49
49
# @return [Integer]
50
50
attr_accessor :upload_chunk_size
51
51
52
+ # Unique upload_id of a resumable upload
53
+ # @return [String]
54
+ attr_accessor :upload_id
55
+
56
+ # Boolean Value to specify is a resumable upload is to be deleted or not
57
+ # @return [Boolean]
58
+ attr_accessor :delete_upload
59
+
52
60
# Ensure the content is readable and wrapped in an IO instance.
53
61
#
54
62
# @return [void]
@@ -61,7 +69,6 @@ def prepare!
61
69
# asserting that it already has a body. Form encoding is never used
62
70
# by upload requests.
63
71
self . body = '' unless self . body
64
-
65
72
super
66
73
if streamable? ( upload_source )
67
74
self . upload_io = upload_source
@@ -73,14 +80,16 @@ def prepare!
73
80
self . upload_content_type = type &.content_type
74
81
end
75
82
@close_io_on_finish = true
83
+ elsif !upload_id . nil? && delete_upload
84
+ @close_io_on_finish = false
76
85
else
77
86
fail Google ::Apis ::ClientError , 'Invalid upload source'
78
87
end
79
88
end
80
89
81
90
# Close IO stream when command done. Only closes the stream if it was opened by the command.
82
91
def release!
83
- upload_io . close if @close_io_on_finish
92
+ upload_io . close if @close_io_on_finish && ! upload_io . nil?
84
93
end
85
94
86
95
# Execute the command, retrying as necessary
@@ -96,8 +105,16 @@ def execute(client)
96
105
prepare!
97
106
opencensus_begin_span
98
107
@upload_chunk_size = options . upload_chunk_size
108
+ if upload_id . nil?
109
+ res = do_retry :initiate_resumable_upload , client
110
+ elsif delete_upload && !upload_id . nil?
111
+ construct_resumable_upload_url upload_id
112
+ res = do_retry :cancel_resumable_upload , client
113
+ else
114
+ construct_resumable_upload_url upload_id
115
+ res = do_retry :reinitiate_resumable_upload , client
116
+ end
99
117
100
- do_retry :initiate_resumable_upload , client
101
118
while @upload_incomplete
102
119
res = do_retry :send_upload_command , client
103
120
end
@@ -131,6 +148,22 @@ def initiate_resumable_upload(client)
131
148
error ( e , rethrow : true )
132
149
end
133
150
151
+ # Reinitiating resumable upload
152
+ def reinitiate_resumable_upload ( client )
153
+ logger . debug { sprintf ( 'Restarting resumable upload command to %s' , url ) }
154
+ check_resumable_upload client
155
+ upload_io . pos = @offset
156
+ end
157
+
158
+ # Making resumable upload url from upload_id
159
+ def construct_resumable_upload_url ( upload_id )
160
+ query_params = query . dup
161
+ query_params [ 'uploadType' ] = RESUMABLE
162
+ query_params [ 'upload_id' ] = upload_id
163
+ resumable_upload_params = query_params . map { |key , value | "#{ key } =#{ value } " } . join ( '&' )
164
+ @upload_url = "#{ url } &#{ resumable_upload_params } "
165
+ end
166
+
134
167
# Send the actual content
135
168
#
136
169
# @param [HTTPClient] client
@@ -160,6 +193,9 @@ def send_upload_command(client)
160
193
@offset += current_chunk_size if @upload_incomplete
161
194
success ( result )
162
195
rescue => e
196
+ logger . warn {
197
+ "error occured please use uploadId-#{ response . headers [ 'X-GUploader-UploadID' ] } to resume your upload"
198
+ } unless response . nil?
163
199
upload_io . pos = @offset
164
200
error ( e , rethrow : true )
165
201
end
@@ -182,6 +218,59 @@ def process_response(status, header, body)
182
218
super ( status , header , body )
183
219
end
184
220
221
+ def check_resumable_upload ( client )
222
+ # Setting up request header
223
+ request_header = header . dup
224
+ request_header [ CONTENT_RANGE_HEADER ] = "bytes */#{ upload_io . size } "
225
+ request_header [ CONTENT_LENGTH_HEADER ] = '0'
226
+ # Initiating call
227
+ response = client . put ( @upload_url , header : request_header , follow_redirect : true )
228
+ handle_resumable_upload_http_response_codes ( response )
229
+ end
230
+
231
+ # Cancel resumable upload
232
+ def cancel_resumable_upload ( client )
233
+ # Setting up request header
234
+ request_header = header . dup
235
+ request_header [ CONTENT_LENGTH_HEADER ] = '0'
236
+ # Initiating call
237
+ response = client . delete ( @upload_url , header : request_header , follow_redirect : true )
238
+ handle_resumable_upload_http_response_codes ( response )
239
+
240
+ if !@upload_incomplete && ( 400 ..499 ) . include? ( response . code . to_i )
241
+ @close_io_on_finish = true
242
+ true # method returns true if upload is successfully cancelled
243
+ else
244
+ logger . debug { sprintf ( "Failed to cancel upload session. Response: #{ response . code } - #{ response . body } " ) }
245
+ end
246
+
247
+ end
248
+
249
+ def handle_resumable_upload_http_response_codes ( response )
250
+ code = response . code . to_i
251
+
252
+ case code
253
+ when 308
254
+ if response . headers [ 'Range' ]
255
+ range = response . headers [ 'Range' ]
256
+ @offset = range . split ( '-' ) . last . to_i + 1
257
+ logger . debug { sprintf ( "Upload is incomplete. Bytes uploaded so far: #{ range } " ) }
258
+ else
259
+ logger . debug { sprintf ( 'No bytes uploaded yet.' ) }
260
+ end
261
+ @upload_incomplete = true
262
+ when 400 ..499
263
+ # Upload is canceled
264
+ @upload_incomplete = false
265
+ when 200 , 201
266
+ # Upload is complete.
267
+ @upload_incomplete = false
268
+ else
269
+ logger . debug { sprintf ( "Unexpected response: #{ response . code } - #{ response . body } " ) }
270
+ @upload_incomplete = true
271
+ end
272
+ end
273
+
185
274
def streamable? ( upload_source )
186
275
upload_source . is_a? ( IO ) || upload_source . is_a? ( StringIO ) || upload_source . is_a? ( Tempfile )
187
276
end
0 commit comments