Skip to content

Commit 4e1a0ff

Browse files
authored
Download to file optimizations. (Azure#29965)
* adapt channels. * http response. * stream response. * this works. * fix a bug. * fancy channels. * transferBodyTo with default impl. * dedicated netty and okhttp impl. * throws. * more. * refactor handler. * undo storage for second attempt. * redo storage. * almost there. * byte counting channel with tests. * test the byte counting async channel. * wip * fix this * wip. * test ioutils. * test ioutils. * wip. * use verifier * pr feedback. * hide ctor. * pr feedback. * flaky tests.
1 parent 5e51bf3 commit 4e1a0ff

File tree

24 files changed

+1677
-244
lines changed

24 files changed

+1677
-244
lines changed

sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/implementation/OkHttpAsyncResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import com.azure.core.http.HttpRequest;
77
import com.azure.core.util.BinaryData;
8-
import com.azure.core.util.IOUtils;
8+
import com.azure.core.util.io.IOUtils;
99
import okhttp3.Response;
1010
import okhttp3.ResponseBody;
1111
import reactor.core.publisher.Flux;

sdk/core/azure-core-test/src/main/java/com/azure/core/test/http/HttpClientTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import com.azure.core.util.BinaryData;
1414
import com.azure.core.util.Context;
1515
import com.azure.core.util.Contexts;
16-
import com.azure.core.util.IOUtils;
16+
import com.azure.core.util.io.IOUtils;
1717
import com.azure.core.util.ProgressReporter;
1818
import com.azure.core.util.logging.ClientLogger;
1919
import com.azure.core.util.serializer.ObjectSerializer;

sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
import com.azure.core.util.BinaryData;
4646
import com.azure.core.util.Context;
4747
import com.azure.core.util.FluxUtil;
48-
import com.azure.core.util.IOUtils;
48+
import com.azure.core.util.io.IOUtils;
4949
import org.junit.jupiter.api.Assertions;
5050
import org.junit.jupiter.api.Named;
5151
import org.junit.jupiter.api.Test;

sdk/core/azure-core/src/main/java/com/azure/core/implementation/AsynchronousFileChannelAdapter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public Future<Integer> read(ByteBuffer dst) {
6161
@Override
6262
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
6363
beginOperation(Operation.WRITE);
64+
// We're implementing channel interface here, i.e. we don't have to consume whole buffer in one shot.
65+
// Caller is responsible for that.
6466
fileChannel.write(src, POSITION_ATOMIC_UPDATER.get(this), attachment,
6567
new DelegatingCompletionHandler<>(handler, Operation.WRITE));
6668
}
@@ -69,6 +71,8 @@ public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ?
6971
public Future<Integer> write(ByteBuffer src) {
7072
beginOperation(Operation.WRITE);
7173
CompletableFuture<Integer> future = new CompletableFuture<>();
74+
// We're implementing channel interface here, i.e. we don't have to consume whole buffer in one shot.
75+
// Caller is responsible for that.
7276
fileChannel.write(src, POSITION_ATOMIC_UPDATER.get(this), src,
7377
new DelegatingCompletionHandler<>(future, Operation.WRITE));
7478
return future;
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.implementation;
5+
6+
import com.azure.core.util.ProgressReporter;
7+
8+
import java.io.IOException;
9+
import java.nio.ByteBuffer;
10+
import java.nio.channels.AsynchronousByteChannel;
11+
import java.nio.channels.CompletionHandler;
12+
import java.util.Objects;
13+
import java.util.concurrent.CompletableFuture;
14+
import java.util.concurrent.Future;
15+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
16+
17+
/**
18+
* Count bytes written and read to the target channel.
19+
*/
20+
public class ByteCountingAsynchronousByteChannel implements AsynchronousByteChannel {
21+
22+
private final AsynchronousByteChannel channel;
23+
private final ProgressReporter readProgressReporter;
24+
private final ProgressReporter writeProgressReporter;
25+
26+
private static final AtomicLongFieldUpdater<ByteCountingAsynchronousByteChannel> BYTES_WRITTEN_ATOMIC_UPDATER =
27+
AtomicLongFieldUpdater.newUpdater(ByteCountingAsynchronousByteChannel.class, "bytesWritten");
28+
private volatile long bytesWritten;
29+
private static final AtomicLongFieldUpdater<ByteCountingAsynchronousByteChannel> BYTES_READ_ATOMIC_UPDATER =
30+
AtomicLongFieldUpdater.newUpdater(ByteCountingAsynchronousByteChannel.class, "bytesRead");
31+
private volatile long bytesRead;
32+
33+
public ByteCountingAsynchronousByteChannel(
34+
AsynchronousByteChannel channel,
35+
ProgressReporter readProgressReporter,
36+
ProgressReporter writeProgressReporter) {
37+
this.channel = Objects.requireNonNull(channel, "'channel' must not be null");
38+
this.readProgressReporter = readProgressReporter;
39+
this.writeProgressReporter = writeProgressReporter;
40+
}
41+
42+
@Override
43+
public <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer, ? super A> handler) {
44+
this.channel.read(dst, attachment,
45+
new DelegatingCompletionHandler<A>(handler, BYTES_READ_ATOMIC_UPDATER, readProgressReporter));
46+
}
47+
48+
@Override
49+
public Future<Integer> read(ByteBuffer dst) {
50+
CompletableFuture<Integer> future = new CompletableFuture<>();
51+
channel.read(dst, dst,
52+
new DelegatingCompletionHandler<>(future, BYTES_READ_ATOMIC_UPDATER, readProgressReporter));
53+
return future;
54+
}
55+
56+
@Override
57+
public <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer, ? super A> handler) {
58+
// We're implementing channel interface here, i.e. we don't have to consume whole buffer in one shot.
59+
// Caller is responsible for that.
60+
this.channel.write(src, attachment,
61+
new DelegatingCompletionHandler<A>(handler, BYTES_WRITTEN_ATOMIC_UPDATER, writeProgressReporter));
62+
}
63+
64+
@Override
65+
public Future<Integer> write(ByteBuffer src) {
66+
CompletableFuture<Integer> future = new CompletableFuture<>();
67+
// We're implementing channel interface here, i.e. we don't have to consume whole buffer in one shot.
68+
// Caller is responsible for that.
69+
channel.write(src, src,
70+
new DelegatingCompletionHandler<>(future, BYTES_WRITTEN_ATOMIC_UPDATER, writeProgressReporter));
71+
return future;
72+
}
73+
74+
@Override
75+
public boolean isOpen() {
76+
return channel.isOpen();
77+
}
78+
79+
@Override
80+
public void close() throws IOException {
81+
channel.close();
82+
}
83+
84+
public long getBytesWritten() {
85+
return BYTES_WRITTEN_ATOMIC_UPDATER.get(this);
86+
}
87+
88+
public long getBytesRead() {
89+
return BYTES_READ_ATOMIC_UPDATER.get(this);
90+
}
91+
92+
private final class DelegatingCompletionHandler<T> implements CompletionHandler<Integer, T> {
93+
private final CompletionHandler<Integer, ? super T> handler;
94+
private final CompletableFuture<Integer> future;
95+
private final AtomicLongFieldUpdater<ByteCountingAsynchronousByteChannel> atomicLongFieldUpdater;
96+
private final ProgressReporter progressReporter;
97+
98+
private DelegatingCompletionHandler(
99+
CompletionHandler<Integer, ? super T> handler,
100+
AtomicLongFieldUpdater<ByteCountingAsynchronousByteChannel> atomicLongFieldUpdater,
101+
ProgressReporter progressReporter) {
102+
this.handler = handler;
103+
this.future = null;
104+
this.atomicLongFieldUpdater = atomicLongFieldUpdater;
105+
this.progressReporter = progressReporter;
106+
}
107+
108+
private DelegatingCompletionHandler(
109+
CompletableFuture<Integer> future,
110+
AtomicLongFieldUpdater<ByteCountingAsynchronousByteChannel> atomicLongFieldUpdater,
111+
ProgressReporter progressReporter) {
112+
this.handler = null;
113+
this.future = future;
114+
this.atomicLongFieldUpdater = atomicLongFieldUpdater;
115+
this.progressReporter = progressReporter;
116+
}
117+
118+
@Override
119+
public void completed(Integer result, T attachment) {
120+
if (result > 0) {
121+
atomicLongFieldUpdater.addAndGet(ByteCountingAsynchronousByteChannel.this, result);
122+
if (progressReporter != null) {
123+
progressReporter.reportProgress(result);
124+
}
125+
}
126+
if (handler != null) {
127+
handler.completed(result, attachment);
128+
} else if (future != null) {
129+
future.complete(result);
130+
}
131+
}
132+
133+
@Override
134+
public void failed(Throwable exc, T attachment) {
135+
if (handler != null) {
136+
handler.failed(exc, attachment);
137+
} else if (future != null) {
138+
future.completeExceptionally(exc);
139+
}
140+
}
141+
}
142+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.implementation;
5+
6+
import com.azure.core.util.ProgressReporter;
7+
8+
import java.io.IOException;
9+
import java.nio.ByteBuffer;
10+
import java.nio.channels.WritableByteChannel;
11+
import java.util.Objects;
12+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
13+
14+
/**
15+
* Count bytes written to the target channel.
16+
*/
17+
public class ByteCountingWritableByteChannel implements WritableByteChannel {
18+
19+
private final WritableByteChannel channel;
20+
private final ProgressReporter progressReporter;
21+
22+
private static final AtomicLongFieldUpdater<ByteCountingWritableByteChannel> BYTES_WRITTEN_ATOMIC_UPDATER =
23+
AtomicLongFieldUpdater.newUpdater(ByteCountingWritableByteChannel.class, "bytesWritten");
24+
private volatile long bytesWritten;
25+
26+
public ByteCountingWritableByteChannel(WritableByteChannel channel, ProgressReporter progressReporter) {
27+
this.channel = Objects.requireNonNull(channel, "'channel' must not be null");
28+
this.progressReporter = progressReporter;
29+
}
30+
31+
@Override
32+
public int write(ByteBuffer src) throws IOException {
33+
// We're implementing channel interface here, i.e. we don't have to consume whole buffer in one shot.
34+
// Caller is responsible for that.
35+
int written = channel.write(src);
36+
BYTES_WRITTEN_ATOMIC_UPDATER.addAndGet(this, written);
37+
if (progressReporter != null) {
38+
progressReporter.reportProgress(written);
39+
}
40+
return written;
41+
}
42+
43+
@Override
44+
public boolean isOpen() {
45+
return channel.isOpen();
46+
}
47+
48+
@Override
49+
public void close() throws IOException {
50+
channel.close();
51+
}
52+
53+
public long getBytesWritten() {
54+
return BYTES_WRITTEN_ATOMIC_UPDATER.get(this);
55+
}
56+
}

sdk/core/azure-core/src/main/java/com/azure/core/util/FluxUtil.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.azure.core.implementation.OutputStreamWriteSubscriber;
1212
import com.azure.core.implementation.RetriableDownloadFlux;
1313
import com.azure.core.implementation.TypeUtil;
14+
import com.azure.core.util.io.IOUtils;
1415
import com.azure.core.util.logging.ClientLogger;
1516
import com.azure.core.util.logging.LoggingEventBuilder;
1617
import org.reactivestreams.Subscriber;
@@ -65,6 +66,47 @@ public static boolean isFluxByteBuffer(Type entityType) {
6566
return false;
6667
}
6768

69+
/**
70+
* Adds progress reporting to the provided {@link Flux} of {@link ByteBuffer}.
71+
*
72+
* <p>
73+
* Each {@link ByteBuffer} that's emitted from the {@link Flux} will report {@link ByteBuffer#remaining()}.
74+
* </p>
75+
* <p>
76+
* When {@link Flux} is resubscribed the progress is reset. If the flux is not replayable, resubscribing
77+
* can result in empty or partial data then progress reporting might not be accurate.
78+
* </p>
79+
* <p>
80+
* If {@link ProgressReporter} is not provided, i.e. is {@code null},
81+
* then this method returns unmodified {@link Flux}.
82+
* </p>
83+
*
84+
* @param flux A {@link Flux} to report progress on.
85+
* @param progressReporter Optional {@link ProgressReporter}.
86+
* @return A {@link Flux} that reports progress, or original {@link Flux} if {@link ProgressReporter} is not
87+
* provided.
88+
*/
89+
public static Flux<ByteBuffer> addProgressReporting(Flux<ByteBuffer> flux, ProgressReporter progressReporter) {
90+
if (progressReporter == null) {
91+
return flux;
92+
}
93+
94+
return Mono.just(progressReporter).flatMapMany(reporter -> {
95+
/*
96+
Each time there is a new subscription, we will rewind the progress. This is desirable specifically
97+
for retries, which resubscribe on each try. The first time this flowable is subscribed to, the
98+
reset will be a noop as there will have been no progress made. Subsequent rewinds will work as
99+
expected.
100+
*/
101+
reporter.reset();
102+
103+
/*
104+
Every time we emit some data, report it to the Tracker, which will pass it on to the end user.
105+
*/
106+
return flux.doOnNext(buffer -> reporter.reportProgress(buffer.remaining()));
107+
});
108+
}
109+
68110
/**
69111
* Collects ByteBuffers emitted by a Flux into a byte array.
70112
*

sdk/core/azure-core/src/main/java/com/azure/core/util/IOUtils.java renamed to sdk/core/azure-core/src/main/java/com/azure/core/util/io/IOUtils.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4-
package com.azure.core.util;
4+
package com.azure.core.util.io;
55

6+
import com.azure.core.http.rest.StreamResponse;
67
import com.azure.core.implementation.AsynchronousFileChannelAdapter;
8+
import com.azure.core.implementation.ByteCountingAsynchronousByteChannel;
9+
import com.azure.core.implementation.logging.LoggingKeys;
10+
import com.azure.core.util.ProgressReporter;
711
import com.azure.core.util.logging.ClientLogger;
812
import reactor.core.publisher.Mono;
913
import reactor.core.publisher.MonoSink;
@@ -16,6 +20,7 @@
1620
import java.nio.channels.ReadableByteChannel;
1721
import java.nio.channels.WritableByteChannel;
1822
import java.util.Objects;
23+
import java.util.function.BiFunction;
1924

2025
/**
2126
* Utilities related to IO operations that involve channels, streams, byte transfers.
@@ -119,4 +124,55 @@ public void failed(Throwable e, ByteBuffer attachment) {
119124
sink.success();
120125
}
121126
}
127+
128+
/**
129+
* Transfers the {@link StreamResponse} content to {@link AsynchronousByteChannel}.
130+
* Resumes the transfer in case of errors.
131+
*
132+
* @param targetChannel The destination {@link AsynchronousByteChannel}.
133+
* @param sourceResponse The initial {@link StreamResponse}.
134+
* @param onErrorResume A {@link BiFunction} of {@link Throwable} and {@link Long} which is used to resume
135+
* downloading when an error occurs. The function accepts a {@link Throwable} and offset at the destination
136+
* from beginning of writing at which the error occurred.
137+
* @param progressReporter The {@link ProgressReporter}.
138+
* @param maxRetries The maximum number of times a download can be resumed when an error occurs.
139+
* @return A {@link Mono} which completion indicates successful transfer.
140+
*/
141+
public static Mono<Void> transferStreamResponseToAsynchronousByteChannel(
142+
AsynchronousByteChannel targetChannel,
143+
StreamResponse sourceResponse,
144+
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume,
145+
ProgressReporter progressReporter, int maxRetries) {
146+
147+
return transferStreamResponseToAsynchronousByteChannelHelper(
148+
new ByteCountingAsynchronousByteChannel(targetChannel, null, progressReporter),
149+
sourceResponse, onErrorResume, maxRetries, 0);
150+
}
151+
152+
private static Mono<Void> transferStreamResponseToAsynchronousByteChannelHelper(
153+
ByteCountingAsynchronousByteChannel targetChannel,
154+
StreamResponse response,
155+
BiFunction<Throwable, Long, Mono<StreamResponse>> onErrorResume,
156+
int maxRetries, int retryCount) {
157+
158+
return response.writeValueToAsync(targetChannel)
159+
.doFinally(ignored -> response.close())
160+
.onErrorResume(Exception.class, exception -> {
161+
response.close();
162+
163+
int updatedRetryCount = retryCount + 1;
164+
165+
if (updatedRetryCount > maxRetries) {
166+
LOGGER.atError()
167+
.addKeyValue(LoggingKeys.TRY_COUNT_KEY, retryCount)
168+
.log(() -> "Retry attempts have been exhausted.", exception);
169+
return Mono.error(exception);
170+
}
171+
172+
return onErrorResume.apply(exception, targetChannel.getBytesWritten())
173+
.flatMap(newResponse -> transferStreamResponseToAsynchronousByteChannelHelper(
174+
targetChannel, newResponse,
175+
onErrorResume, maxRetries, updatedRetryCount));
176+
});
177+
}
122178
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* Package containing APIs for IO operations.
6+
*/
7+
package com.azure.core.util.io;

sdk/core/azure-core/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
exports com.azure.core.models;
2626
exports com.azure.core.util;
2727
exports com.azure.core.util.builder;
28+
exports com.azure.core.util.io;
2829
exports com.azure.core.util.logging;
2930
exports com.azure.core.util.paging;
3031
exports com.azure.core.util.polling;

0 commit comments

Comments
 (0)