@@ -38,10 +38,53 @@ public class MockGcsBlobStore {
3838
3939 record BlobVersion (String path , long generation , BytesReference contents ) {}
4040
41- record ResumableUpload (String uploadId , String path , Long ifGenerationMatch , BytesReference contents , boolean completed ) {
41+ record ResumableUpload (
42+ String uploadId ,
43+ String path ,
44+ Long ifGenerationMatch ,
45+ BytesReference contents ,
46+ Integer finalLength ,
47+ boolean completed
48+ ) {
49+
50+ ResumableUpload (String uploadId , String path , Long ifGenerationMatch ) {
51+ this (uploadId , path , ifGenerationMatch , BytesArray .EMPTY , null , false );
52+ }
53+
54+ public ResumableUpload update (BytesReference contents ) {
55+ if (completed ) {
56+ throw new IllegalStateException ("Blob already completed" );
57+ }
58+ return new ResumableUpload (uploadId , path , ifGenerationMatch , contents , null , false );
59+ }
60+
61+ /**
62+ * When we complete, we nullify our reference to the contents to allow it to be collected if it gets overwritten
63+ */
64+ public ResumableUpload complete () {
65+ if (completed ) {
66+ throw new IllegalStateException ("Blob already completed" );
67+ }
68+ return new ResumableUpload (uploadId , path , ifGenerationMatch , null , contents .length (), true );
69+ }
4270
43- public ResumableUpload update (BytesReference contents , boolean completed ) {
44- return new ResumableUpload (uploadId , path , ifGenerationMatch , contents , completed );
71+ public HttpHeaderParser .Range getRange () {
72+ int length = length ();
73+ if (length > 0 ) {
74+ return new HttpHeaderParser .Range (0 , length - 1 );
75+ } else {
76+ return null ;
77+ }
78+ }
79+
80+ public int length () {
81+ if (finalLength != null ) {
82+ return finalLength ;
83+ }
84+ if (contents != null ) {
85+ return contents .length ();
86+ }
87+ return 0 ;
4588 }
4689 }
4790
@@ -93,7 +136,7 @@ BlobVersion updateBlob(String path, Long ifGenerationMatch, BytesReference conte
93136
94137 ResumableUpload createResumableUpload (String path , Long ifGenerationMatch ) {
95138 final String uploadId = UUIDs .randomBase64UUID ();
96- final ResumableUpload value = new ResumableUpload (uploadId , path , ifGenerationMatch , BytesArray . EMPTY , false );
139+ final ResumableUpload value = new ResumableUpload (uploadId , path , ifGenerationMatch );
97140 resumableUploads .put (uploadId , value );
98141 return value ;
99142 }
@@ -114,16 +157,14 @@ UpdateResponse updateResumableUpload(String uploadId, HttpHeaderParser.ContentRa
114157 throw failAndThrow ("Attempted to update a non-existent resumable: " + uid );
115158 }
116159
117- if (contentRange .hasRange () == false ) {
118- // Content-Range: */... is a status check https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
160+ ResumableUpload valueToReturn = existing ;
161+
162+ // Handle the request, a range indicates a chunk of data was submitted
163+ if (contentRange .hasRange ()) {
119164 if (existing .completed ) {
120- updateResponse .set (new UpdateResponse (RestStatus .OK .getStatus (), calculateRangeHeader (blobs .get (existing .path ))));
121- } else {
122- final HttpHeaderParser .Range range = calculateRangeHeader (existing );
123- updateResponse .set (new UpdateResponse (RESUME_INCOMPLETE , range ));
165+ throw failAndThrow ("Attempted to write more to a completed resumable upload" );
124166 }
125- return existing ;
126- } else {
167+
127168 if (contentRange .start () > contentRange .end ()) {
128169 throw failAndThrow ("Invalid content range " + contentRange );
129170 }
@@ -143,29 +184,25 @@ UpdateResponse updateResumableUpload(String uploadId, HttpHeaderParser.ContentRa
143184 existing .contents ,
144185 requestBody .slice (offset , requestBody .length ())
145186 );
146- // We just received the last chunk, update the blob and remove the resumable upload from the map
147- if (contentRange .hasSize () && updatedContent .length () == contentRange .size ()) {
148- updateBlob (existing .path (), existing .ifGenerationMatch , updatedContent );
149- updateResponse .set (new UpdateResponse (RestStatus .OK .getStatus (), null ));
150- return existing .update (BytesArray .EMPTY , true );
151- }
152- final ResumableUpload updated = existing .update (updatedContent , false );
153- updateResponse .set (new UpdateResponse (RESUME_INCOMPLETE , calculateRangeHeader (updated )));
154- return updated ;
187+ valueToReturn = existing .update (updatedContent );
188+ }
189+
190+ // Next we determine the response
191+ if (valueToReturn .completed ) {
192+ updateResponse .set (new UpdateResponse (RestStatus .OK .getStatus (), valueToReturn .getRange ()));
193+ } else if (contentRange .hasSize () && contentRange .size () == valueToReturn .contents .length ()) {
194+ updateBlob (valueToReturn .path (), valueToReturn .ifGenerationMatch (), valueToReturn .contents );
195+ valueToReturn = valueToReturn .complete ();
196+ updateResponse .set (new UpdateResponse (RestStatus .OK .getStatus (), valueToReturn .getRange ()));
197+ } else {
198+ updateResponse .set (new UpdateResponse (RESUME_INCOMPLETE , valueToReturn .getRange ()));
155199 }
200+ return valueToReturn ;
156201 });
157202 assert updateResponse .get () != null : "Should always produce an update response" ;
158203 return updateResponse .get ();
159204 }
160205
161- private static HttpHeaderParser .Range calculateRangeHeader (ResumableUpload resumableUpload ) {
162- return resumableUpload .contents .length () > 0 ? new HttpHeaderParser .Range (0 , resumableUpload .contents .length () - 1 ) : null ;
163- }
164-
165- private static HttpHeaderParser .Range calculateRangeHeader (BlobVersion blob ) {
166- return blob .contents .length () > 0 ? new HttpHeaderParser .Range (0 , blob .contents .length () - 1 ) : null ;
167- }
168-
169206 record UpdateResponse (int statusCode , HttpHeaderParser .Range rangeHeader ) {}
170207
171208 void deleteBlob (String path ) {
0 commit comments