Skip to content

Commit 2c95397

Browse files
authored
Add BinaryData.isReplayable (Azure#29463)
* Add `BinaryData.isReplayable` * pr feedback.
1 parent c58e01a commit 2c95397

File tree

10 files changed

+159
-0
lines changed

10 files changed

+159
-0
lines changed

sdk/core/azure-core/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### Features Added
66

7+
- Added `BinaryData.isReplayable()` to indicate if multiple consumptions of the content are safe.
78
- Added support for sending synchronous requests using `sendSync` in `HttpPipeline`:
89
- Added `HttpPipelinePolicy.processSync(HttpPipelineCallContext context, HttpPipelineNextSyncPolicy next)` to allow processing policies synchronously.
910
- Added `HttpPipelineSyncPolicy` to represent synchronous `HttpPipelinePolicy`.

sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/BinaryDataContent.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,11 @@ public abstract class BinaryDataContent {
7474
* @return The {@link BinaryDataContent} as a {@code Flux<ByteBuffer>}.
7575
*/
7676
public abstract Flux<ByteBuffer> toFluxByteBuffer();
77+
78+
/**
79+
* Returns a flag indicating whether the content can be repeatedly consumed using all accessors including
80+
* {@link #toStream()} and {@link #toFluxByteBuffer()}
81+
* @return a flag indicating whether the content can be repeatedly consumed.
82+
*/
83+
public abstract boolean isReplayable();
7784
}

sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/ByteArrayContent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,9 @@ public ByteBuffer toByteBuffer() {
6464
public Flux<ByteBuffer> toFluxByteBuffer() {
6565
return Mono.fromSupplier(() -> ByteBuffer.wrap(toBytes()).asReadOnlyBuffer()).flux();
6666
}
67+
68+
@Override
69+
public boolean isReplayable() {
70+
return true;
71+
}
6772
}

sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FileContent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,11 @@ public int getChunkSize() {
195195
return chunkSize;
196196
}
197197

198+
@Override
199+
public boolean isReplayable() {
200+
return true;
201+
}
202+
198203
private byte[] getBytes() {
199204
if (length > MAX_ARRAY_SIZE) {
200205
throw LOGGER.logExceptionAsError(new IllegalStateException(

sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/FluxByteBufferContent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ public Flux<ByteBuffer> toFluxByteBuffer() {
8787
return content;
8888
}
8989

90+
@Override
91+
public boolean isReplayable() {
92+
return false;
93+
}
94+
9095
private byte[] getBytes() {
9196
return FluxUtil.collectBytesInByteBufferStream(content)
9297
// this doesn't seem to be working (newBoundedElastic() didn't work either)

sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/InputStreamContent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,11 @@ public Flux<ByteBuffer> toFluxByteBuffer() {
8080
return FluxUtil.toFluxByteBuffer(this.content, STREAM_READ_SIZE);
8181
}
8282

83+
@Override
84+
public boolean isReplayable() {
85+
return false;
86+
}
87+
8388
private byte[] getBytes() {
8489
try {
8590
ByteArrayOutputStream dataOutputBuffer = new ByteArrayOutputStream();

sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/SerializableContent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ public Flux<ByteBuffer> toFluxByteBuffer() {
7676
return Mono.fromSupplier(() -> ByteBuffer.wrap(toBytes())).flux();
7777
}
7878

79+
@Override
80+
public boolean isReplayable() {
81+
return true;
82+
}
83+
7984
private byte[] getBytes() {
8085
return serializer.serializeToBytes(content);
8186
}

sdk/core/azure-core/src/main/java/com/azure/core/implementation/util/StringContent.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ public Flux<ByteBuffer> toFluxByteBuffer() {
7171
return Mono.fromSupplier(() -> ByteBuffer.wrap(toBytes())).flux();
7272
}
7373

74+
@Override
75+
public boolean isReplayable() {
76+
return true;
77+
}
78+
7479
private byte[] getBytes() {
7580
return this.content.getBytes(StandardCharsets.UTF_8);
7681
}

sdk/core/azure-core/src/main/java/com/azure/core/util/BinaryData.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1459,4 +1459,13 @@ public Flux<ByteBuffer> toFluxByteBuffer() {
14591459
public Long getLength() {
14601460
return content.getLength();
14611461
}
1462+
1463+
/**
1464+
* Returns a flag indicating whether the content can be repeatedly consumed using all accessors including
1465+
* {@link #toStream()} and {@link #toFluxByteBuffer()}
1466+
* @return a flag indicating whether the content can be repeatedly consumed using all accessors.
1467+
*/
1468+
public boolean isReplayable() {
1469+
return content.isReplayable();
1470+
}
14621471
}

sdk/core/azure-core/src/test/java/com/azure/core/util/BinaryDataTest.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@
1010
import com.fasterxml.jackson.databind.ObjectMapper;
1111
import com.fasterxml.jackson.databind.type.TypeFactory;
1212
import org.junit.jupiter.api.Assertions;
13+
import org.junit.jupiter.api.Named;
1314
import org.junit.jupiter.api.Test;
1415
import org.junit.jupiter.params.ParameterizedTest;
16+
import org.junit.jupiter.params.provider.Arguments;
17+
import org.junit.jupiter.params.provider.MethodSource;
1518
import org.junit.jupiter.params.provider.ValueSource;
1619
import reactor.core.publisher.Flux;
1720
import reactor.core.publisher.Mono;
@@ -44,13 +47,15 @@
4447
import java.util.Random;
4548
import java.util.UUID;
4649
import java.util.concurrent.atomic.AtomicInteger;
50+
import java.util.function.Supplier;
4751
import java.util.stream.Stream;
4852

4953
import static com.azure.core.implementation.util.BinaryDataContent.STREAM_READ_SIZE;
5054
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
5155
import static org.junit.jupiter.api.Assertions.assertEquals;
5256
import static org.junit.jupiter.api.Assertions.assertFalse;
5357
import static org.junit.jupiter.api.Assertions.assertThrows;
58+
import static org.junit.jupiter.api.Assertions.assertTrue;
5459
import static org.mockito.ArgumentMatchers.any;
5560
import static org.mockito.ArgumentMatchers.anyLong;
5661
import static org.mockito.Mockito.doThrow;
@@ -622,6 +627,113 @@ public void testFromFileSegment(int size) throws Exception {
622627

623628
}
624629

630+
@ParameterizedTest
631+
@MethodSource("createNonRetryableBinaryData")
632+
public void testNonReplayableContentTypes(Supplier<BinaryData> binaryDataSupplier) throws IOException {
633+
634+
assertFalse(binaryDataSupplier.get().isReplayable());
635+
636+
BinaryData data = binaryDataSupplier.get();
637+
byte[] firstFluxConsumption = FluxUtil.collectBytesInByteBufferStream(data.toFluxByteBuffer()).block();
638+
byte[] secondFluxConsumption = FluxUtil.collectBytesInByteBufferStream(data.toFluxByteBuffer()).block();
639+
640+
data = binaryDataSupplier.get();
641+
byte[] firstStreamConsumption = readInputStream(data.toStream());
642+
byte[] secondStreamConsumption = readInputStream(data.toStream());
643+
644+
// Either flux or stream consumption is not replayable.
645+
assertFalse(
646+
Arrays.equals(firstFluxConsumption, secondFluxConsumption)
647+
&& Arrays.equals(firstStreamConsumption, secondStreamConsumption)
648+
);
649+
}
650+
651+
public static Stream<Arguments> createNonRetryableBinaryData() {
652+
byte[] bytes = new byte[1024];
653+
RANDOM.nextBytes(bytes);
654+
return Stream.of(
655+
Arguments.of(
656+
Named.named("stream",
657+
(Supplier<BinaryData>) () -> BinaryData.fromStream(new ByteArrayInputStream(bytes)))),
658+
Arguments.of(
659+
Named.named("unbuffered flux",
660+
(Supplier<BinaryData>) () -> BinaryData.fromFlux(Flux.just(ByteBuffer.wrap(bytes)), null, false).block()))
661+
);
662+
}
663+
664+
@ParameterizedTest
665+
@MethodSource("createRetryableBinaryData")
666+
public void testReplayableContentTypes(Supplier<BinaryData> binaryDataSupplier) throws IOException {
667+
668+
assertTrue(binaryDataSupplier.get().isReplayable());
669+
670+
// Check toFluxByteBuffer consumption
671+
BinaryData data = binaryDataSupplier.get();
672+
byte[] firstConsumption = FluxUtil.collectBytesInByteBufferStream(data.toFluxByteBuffer()).block();
673+
byte[] secondConsumption = FluxUtil.collectBytesInByteBufferStream(data.toFluxByteBuffer()).block();
674+
assertArrayEquals(firstConsumption, secondConsumption);
675+
676+
// Check toStream consumption
677+
data = binaryDataSupplier.get();
678+
firstConsumption = readInputStream(data.toStream());
679+
secondConsumption = readInputStream(data.toStream());
680+
assertArrayEquals(firstConsumption, secondConsumption);
681+
682+
// Check toByteBuffer consumption
683+
data = binaryDataSupplier.get();
684+
firstConsumption = readByteBuffer(data.toByteBuffer());
685+
secondConsumption = readByteBuffer(data.toByteBuffer());
686+
assertArrayEquals(firstConsumption, secondConsumption);
687+
688+
// Check toBytes consumption
689+
data = binaryDataSupplier.get();
690+
firstConsumption = data.toBytes();
691+
secondConsumption = data.toBytes();
692+
assertArrayEquals(firstConsumption, secondConsumption);
693+
}
694+
695+
public static Stream<Arguments> createRetryableBinaryData() throws IOException {
696+
byte[] bytes = new byte[1024];
697+
RANDOM.nextBytes(bytes);
698+
Path tempFile = Files.createTempFile("retryableData", null);
699+
tempFile.toFile().deleteOnExit();
700+
Files.write(tempFile, bytes);
701+
return Stream.of(
702+
Arguments.of(
703+
Named.named("bytes",
704+
(Supplier<BinaryData>) () -> BinaryData.fromBytes(bytes))),
705+
Arguments.of(
706+
Named.named("string",
707+
(Supplier<BinaryData>) () -> BinaryData.fromString("test string"))),
708+
Arguments.of(
709+
Named.named("object",
710+
(Supplier<BinaryData>) () -> BinaryData.fromObject("\"test string\""))),
711+
Arguments.of(
712+
Named.named("file",
713+
(Supplier<BinaryData>) () -> BinaryData.fromFile(tempFile))),
714+
Arguments.of(
715+
Named.named("buffered flux",
716+
(Supplier<BinaryData>) () -> BinaryData.fromFlux(Flux.just(ByteBuffer.wrap(bytes))).block()))
717+
);
718+
}
719+
720+
private static byte[] readInputStream(InputStream inputStream) throws IOException {
721+
byte[] buffer = new byte[1024];
722+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
723+
int read;
724+
while ((read = inputStream.read(buffer)) >= 0) {
725+
bos.write(buffer, 0, read);
726+
}
727+
return bos.toByteArray();
728+
}
729+
730+
private static byte[] readByteBuffer(ByteBuffer buffer) {
731+
// simplified implementation good enough for testing.
732+
byte[] result = new byte[buffer.remaining()];
733+
buffer.get(result);
734+
return result;
735+
}
736+
625737
public static class MyJsonSerializer implements JsonSerializer {
626738
private final ClientLogger logger = new ClientLogger(MyJsonSerializer.class);
627739
private final ObjectMapper mapper;

0 commit comments

Comments
 (0)