Skip to content

Commit 7064034

Browse files
authored
chore: refactor BlobAppendableUpload, so it follows the lifecycle of BlobWriteSession (googleapis#3036)
This allows access to the resulting BlobInfo, even if not finalized. The returned BlobInfo will have generation and a client side updated `size` matching the number of persisted bytes the client knows has been ack'd by GCS. Add closeAction option to BlobAppendableUploadConfig allowing configuration of finalization on close behavior.
1 parent 386d783 commit 7064034

13 files changed

+555
-271
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUpload.java

Lines changed: 102 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.BetaApi;
2121
import com.google.api.core.InternalExtensionOnly;
22+
import com.google.cloud.storage.BlobAppendableUploadConfig.CloseAction;
2223
import com.google.cloud.storage.Storage.BlobWriteOption;
2324
import java.io.IOException;
2425
import java.nio.ByteBuffer;
2526
import java.nio.channels.WritableByteChannel;
27+
import java.util.concurrent.TimeUnit;
2628

2729
/**
2830
* Interface representing those methods which can be used to write to and interact with an
@@ -32,37 +34,120 @@
3234
*/
3335
@BetaApi
3436
@InternalExtensionOnly
35-
public interface BlobAppendableUpload extends AutoCloseable, WritableByteChannel {
37+
public interface BlobAppendableUpload extends BlobWriteSession {
3638

3739
/**
38-
* Write some bytes to the appendable session. Whether a flush happens will depend on how many
39-
* bytes have been written prior, how many bytes are being written now and what {@link
40-
* BlobAppendableUploadConfig} was provided when creating the {@link BlobAppendableUpload}.
40+
* Open the {@link AppendableUploadWriteableByteChannel AppendableUploadWriteableByteChannel} for
41+
* this session.
4142
*
42-
* <p>This method can block the invoking thread in order to ensure written bytes are acknowledged
43-
* by Google Cloud Storage.
43+
* <p>A session may only be {@code open}ed once. If multiple calls to open are made, an illegal
44+
* state exception will be thrown
4445
*
45-
* @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
46+
* <p>The returned {@code AppendableUploadWriteableByteChannel} can throw IOExceptions from any of
47+
* its usual methods. Any {@link IOException} thrown can have a cause of a {@link
48+
* StorageException}. However, not all {@code IOExceptions} will have {@code StorageException}s.
49+
*
50+
* @throws IOException When creating the {@link AppendableUploadWriteableByteChannel} if an
51+
* unrecoverable underlying IOException occurs it can be rethrown
52+
* @throws IllegalStateException if open is called more than once
53+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
4654
*/
4755
@Override
48-
int write(ByteBuffer src) throws IOException;
56+
AppendableUploadWriteableByteChannel open() throws IOException;
4957

5058
/**
51-
* Close this instance to further {@link #write(ByteBuffer)}ing. This will close any underlying
52-
* stream and release any releasable resources once out of scope.
59+
* Return an {@link ApiFuture}{@code <BlobInfo>} which will represent the state of the object in
60+
* Google Cloud Storage.
61+
*
62+
* <p>This future will not resolve until:
63+
*
64+
* <ol>
65+
* <li>The object is successfully finalized in Google Cloud Storage by calling {@link
66+
* AppendableUploadWriteableByteChannel#finalizeAndClose()
67+
* AppendableUploadWriteableByteChannel#finalizeAndClose()}
68+
* <li>This session is detached from the upload without finalizing by calling {@link
69+
* AppendableUploadWriteableByteChannel#closeWithoutFinalizing()
70+
* AppendableUploadWriteableByteChannel#closeWithoutFinalizing()}
71+
* <li>The session is closed by calling {@link AppendableUploadWriteableByteChannel#close()
72+
* AppendableUploadWriteableByteChannel#close()}
73+
* <li>A terminal failure occurs, the terminal failure will become the exception result
74+
* </ol>
75+
*
76+
* <p><i>NOTICE:</i> Some fields may not be populated unless finalization has completed.
77+
*
78+
* <p>If a terminal failure is encountered, calling either {@link ApiFuture#get()} or {@link
79+
* ApiFuture#get(long, TimeUnit)} will result in an {@link
80+
* java.util.concurrent.ExecutionException} with the cause.
5381
*
54-
* <p>{@link #finalizeUpload()} can be called after this method, but it will not carry any bytes
55-
* with it.
82+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
5683
*/
5784
@Override
58-
void close() throws IOException;
85+
ApiFuture<BlobInfo> getResult();
5986

6087
/**
61-
* Finalize the appendable upload, close any underlying stream and release any releasable
62-
* resources once out of scope.
88+
* The {@link WritableByteChannel} returned from {@link BlobAppendableUpload#open()}.
6389
*
64-
* <p>Once this method is called, and returns no more writes to the object will be allowed by GCS.
90+
* <p>This interface allows writing bytes to an Appendable Upload, and provides methods to close
91+
* this channel -- optionally finalizing the upload.
92+
*
93+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
6594
*/
6695
@BetaApi
67-
ApiFuture<BlobInfo> finalizeUpload() throws IOException;
96+
@InternalExtensionOnly
97+
interface AppendableUploadWriteableByteChannel extends WritableByteChannel {
98+
99+
/**
100+
* Finalize the upload and close this instance to further {@link #write(ByteBuffer)}ing. This
101+
* will close any underlying stream and release any releasable resources once out of scope.
102+
*
103+
* <p>Once this method is called, and returns no more writes to the object will be allowed by
104+
* GCS.
105+
*
106+
* <p>This method and {@link #close()} are mutually exclusive. If one of the other methods are
107+
* called before this method, this method will be a no-op.
108+
*
109+
* @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
110+
* @see BlobAppendableUploadConfig.CloseAction#FINALIZE_WHEN_CLOSING
111+
* @see BlobAppendableUploadConfig#getCloseAction()
112+
* @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
113+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
114+
*/
115+
@BetaApi
116+
void finalizeAndClose() throws IOException;
117+
118+
/**
119+
* Close this instance to further {@link #write(ByteBuffer)}ing without finalizing the upload.
120+
* This will close any underlying stream and release any releasable resources once out of scope.
121+
*
122+
* <p>This method, {@link AppendableUploadWriteableByteChannel#finalizeAndClose()} and {@link
123+
* AppendableUploadWriteableByteChannel#close()} are mutually exclusive. If one of the other
124+
* methods are called before this method, this method will be a no-op.
125+
*
126+
* @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
127+
* @see BlobAppendableUploadConfig.CloseAction#CLOSE_WITHOUT_FINALIZING
128+
* @see BlobAppendableUploadConfig#getCloseAction()
129+
* @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
130+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
131+
*/
132+
@BetaApi
133+
void closeWithoutFinalizing() throws IOException;
134+
135+
/**
136+
* Close this instance to further {@link #write(ByteBuffer)}ing.
137+
*
138+
* <p>Whether the upload is finalized during this depends on the {@link
139+
* BlobAppendableUploadConfig#getCloseAction()} provided to create the {@link
140+
* BlobAppendableUpload}. If {@link BlobAppendableUploadConfig#getCloseAction()}{@code ==
141+
* }{@link CloseAction#FINALIZE_WHEN_CLOSING}, {@link #finalizeAndClose()} will be called. If
142+
* {@link BlobAppendableUploadConfig#getCloseAction()}{@code == }{@link
143+
* CloseAction#CLOSE_WITHOUT_FINALIZING}, {@link #closeWithoutFinalizing()} will be called.
144+
*
145+
* @see Storage#blobAppendableUpload(BlobInfo, BlobAppendableUploadConfig, BlobWriteOption...)
146+
* @see BlobAppendableUploadConfig#getCloseAction()
147+
* @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
148+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
149+
*/
150+
@BetaApi
151+
void close() throws IOException;
152+
}
68153
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobAppendableUploadConfig.java

Lines changed: 138 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,21 @@
1919
import static com.google.cloud.storage.ByteSizeConstants._256KiB;
2020
import static java.util.Objects.requireNonNull;
2121

22+
import com.google.api.core.ApiFutures;
2223
import com.google.api.core.BetaApi;
2324
import com.google.api.core.InternalApi;
25+
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
26+
import com.google.api.gax.rpc.AbortedException;
27+
import com.google.api.gax.rpc.ApiException;
28+
import com.google.cloud.storage.BlobAppendableUpload.AppendableUploadWriteableByteChannel;
29+
import com.google.cloud.storage.BlobAppendableUploadImpl.AppendableObjectBufferedWritableByteChannel;
2430
import com.google.cloud.storage.Storage.BlobWriteOption;
2531
import com.google.cloud.storage.TransportCompatibility.Transport;
32+
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
33+
import com.google.cloud.storage.UnifiedOpts.Opts;
34+
import com.google.storage.v2.BidiWriteObjectRequest;
35+
import com.google.storage.v2.BidiWriteObjectResponse;
36+
import com.google.storage.v2.Object;
2637
import javax.annotation.concurrent.Immutable;
2738

2839
/**
@@ -39,14 +50,20 @@
3950
public final class BlobAppendableUploadConfig {
4051

4152
private static final BlobAppendableUploadConfig INSTANCE =
42-
new BlobAppendableUploadConfig(FlushPolicy.minFlushSize(_256KiB), Hasher.enabled());
53+
new BlobAppendableUploadConfig(
54+
FlushPolicy.minFlushSize(_256KiB),
55+
Hasher.enabled(),
56+
CloseAction.CLOSE_WITHOUT_FINALIZING);
4357

4458
private final FlushPolicy flushPolicy;
4559
private final Hasher hasher;
60+
private final CloseAction closeAction;
4661

47-
private BlobAppendableUploadConfig(FlushPolicy flushPolicy, Hasher hasher) {
62+
private BlobAppendableUploadConfig(
63+
FlushPolicy flushPolicy, Hasher hasher, CloseAction closeAction) {
4864
this.flushPolicy = flushPolicy;
4965
this.hasher = hasher;
66+
this.closeAction = closeAction;
5067
}
5168

5269
/**
@@ -77,7 +94,37 @@ public BlobAppendableUploadConfig withFlushPolicy(FlushPolicy flushPolicy) {
7794
if (this.flushPolicy.equals(flushPolicy)) {
7895
return this;
7996
}
80-
return new BlobAppendableUploadConfig(flushPolicy, hasher);
97+
return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction);
98+
}
99+
100+
/**
101+
* The {@link CloseAction} which will dictate the behavior of {@link
102+
* AppendableUploadWriteableByteChannel#close()}.
103+
*
104+
* <p><i>Default:</i> {@link CloseAction#CLOSE_WITHOUT_FINALIZING}
105+
*
106+
* @see #withCloseAction(CloseAction)
107+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
108+
*/
109+
@BetaApi
110+
public CloseAction getCloseAction() {
111+
return closeAction;
112+
}
113+
114+
/**
115+
* Return an instance with the {@code CloseAction} set to be the specified value. <i>Default:</i>
116+
* {@link CloseAction#CLOSE_WITHOUT_FINALIZING}
117+
*
118+
* @see #getCloseAction()
119+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
120+
*/
121+
@BetaApi
122+
public BlobAppendableUploadConfig withCloseAction(CloseAction closeAction) {
123+
requireNonNull(closeAction, "closeAction must be non null");
124+
if (this.closeAction == closeAction) {
125+
return this;
126+
}
127+
return new BlobAppendableUploadConfig(flushPolicy, hasher, closeAction);
81128
}
82129

83130
/**
@@ -108,7 +155,8 @@ BlobAppendableUploadConfig withCrc32cValidationEnabled(boolean enabled) {
108155
} else if (!enabled && Hasher.noop().equals(hasher)) {
109156
return this;
110157
}
111-
return new BlobAppendableUploadConfig(flushPolicy, enabled ? Hasher.enabled() : Hasher.noop());
158+
return new BlobAppendableUploadConfig(
159+
flushPolicy, enabled ? Hasher.enabled() : Hasher.noop(), closeAction);
112160
}
113161

114162
/** Never to be made public until {@link Hasher} is public */
@@ -125,6 +173,7 @@ Hasher getHasher() {
125173
* <pre>{@code
126174
* BlobAppendableUploadConfig.of()
127175
* .withFlushPolicy(FlushPolicy.minFlushSize(256 * 1024))
176+
* .withCloseAction(CloseAction.CLOSE_WITHOUT_FINALIZING)
128177
* }</pre>
129178
*
130179
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
@@ -134,4 +183,89 @@ Hasher getHasher() {
134183
public static BlobAppendableUploadConfig of() {
135184
return INSTANCE;
136185
}
186+
187+
/**
188+
* Enum providing the possible actions which can be taken during the {@link
189+
* AppendableUploadWriteableByteChannel#close()} call.
190+
*
191+
* @see AppendableUploadWriteableByteChannel#close()
192+
* @see BlobAppendableUploadConfig#withCloseAction(CloseAction)
193+
* @see BlobAppendableUploadConfig#getCloseAction()
194+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
195+
*/
196+
@BetaApi
197+
public enum CloseAction {
198+
/**
199+
* Designate that when {@link AppendableUploadWriteableByteChannel#close()} is called, the
200+
* appendable upload should be finalized.
201+
*
202+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
203+
* @see AppendableUploadWriteableByteChannel#finalizeAndClose()
204+
*/
205+
@BetaApi
206+
FINALIZE_WHEN_CLOSING,
207+
/**
208+
* Designate that when {@link AppendableUploadWriteableByteChannel#close()} is called, the
209+
* appendable upload should NOT be finalized, allowing for takeover by another session or
210+
* client.
211+
*
212+
* @since 2.51.0 This new api is in preview and is subject to breaking changes.
213+
* @see AppendableUploadWriteableByteChannel#closeWithoutFinalizing()
214+
*/
215+
@BetaApi
216+
CLOSE_WITHOUT_FINALIZING
217+
}
218+
219+
BlobAppendableUpload create(GrpcStorageImpl storage, BlobInfo info, Opts<ObjectTargetOpt> opts) {
220+
boolean takeOver = info.getGeneration() != null;
221+
BidiWriteObjectRequest req =
222+
takeOver
223+
? storage.getBidiWriteObjectRequestForTakeover(info, opts)
224+
: storage.getBidiWriteObjectRequest(info, opts);
225+
226+
BidiAppendableWrite baw = new BidiAppendableWrite(req, takeOver);
227+
228+
WritableByteChannelSession<AppendableObjectBufferedWritableByteChannel, BidiWriteObjectResponse>
229+
build =
230+
ResumableMedia.gapic()
231+
.write()
232+
.bidiByteChannel(storage.storageClient.bidiWriteObjectCallable())
233+
.setHasher(this.getHasher())
234+
.setByteStringStrategy(ByteStringStrategy.copy())
235+
.appendable()
236+
.withRetryConfig(
237+
storage.retrier.withAlg(
238+
new BasicResultRetryAlgorithm<Object>() {
239+
@Override
240+
public boolean shouldRetry(
241+
Throwable previousThrowable, Object previousResponse) {
242+
// TODO: remove this later once the redirects are not handled by the
243+
// retry loop
244+
ApiException apiEx = null;
245+
if (previousThrowable instanceof StorageException) {
246+
StorageException se = (StorageException) previousThrowable;
247+
Throwable cause = se.getCause();
248+
if (cause instanceof ApiException) {
249+
apiEx = (ApiException) cause;
250+
}
251+
}
252+
if (apiEx instanceof AbortedException) {
253+
return true;
254+
}
255+
return storage
256+
.retryAlgorithmManager
257+
.idempotent()
258+
.shouldRetry(previousThrowable, null);
259+
}
260+
}))
261+
.buffered(this.getFlushPolicy())
262+
.setStartAsync(ApiFutures.immediateFuture(baw))
263+
.setGetCallable(storage.storageClient.getObjectCallable())
264+
.setFinalizeOnClose(this.closeAction == CloseAction.FINALIZE_WHEN_CLOSING)
265+
.build();
266+
267+
return new BlobAppendableUploadImpl(
268+
new DefaultBlobWriteSessionConfig.DecoratedWritableByteChannelSession<>(
269+
build, BidiBlobWriteSessionConfig.Factory.WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER));
270+
}
137271
}

0 commit comments

Comments
 (0)