diff --git a/spring-reactive-modules/spring-webflux/src/main/java/com/baeldung/spring/convert/databuffertomono/DataBufferConverter.java b/spring-reactive-modules/spring-webflux/src/main/java/com/baeldung/spring/convert/databuffertomono/DataBufferConverter.java new file mode 100644 index 000000000000..b458faa81c1a --- /dev/null +++ b/spring-reactive-modules/spring-webflux/src/main/java/com/baeldung/spring/convert/databuffertomono/DataBufferConverter.java @@ -0,0 +1,26 @@ +package com.baeldung.spring.convert.databuffertomono; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class DataBufferConverter { + public Mono toByteArray(Flux data) { + return DataBufferUtils + // Here, we'll join all DataBuffers in the Flux into a single Mono. + .join(data) + .flatMap(dataBuffer -> { + try { + // Next, extract the byte[] from the aggregated DataBuffer manually. + byte[] bytes = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(bytes); + return Mono.just(bytes); + } finally { + // Ensure the final aggregated DataBuffer is released. + DataBufferUtils.release(dataBuffer); + } + }); + } +} diff --git a/spring-reactive-modules/spring-webflux/src/test/java/com/baeldung/spring/convert/databuffertomono/DataBufferConverterTest.java b/spring-reactive-modules/spring-webflux/src/test/java/com/baeldung/spring/convert/databuffertomono/DataBufferConverterTest.java new file mode 100644 index 000000000000..267136488b7d --- /dev/null +++ b/spring-reactive-modules/spring-webflux/src/test/java/com/baeldung/spring/convert/databuffertomono/DataBufferConverterTest.java @@ -0,0 +1,35 @@ +package com.baeldung.spring.convert.databuffertomono; + +import org.junit.jupiter.api.Test; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import reactor.core.publisher.Flux; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +public class DataBufferConverterTest { + + private final DataBufferConverter converter = new DataBufferConverter(); + private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + private final String TEST_CONTENT = "This is a long test string."; + + @Test + void givenFluxOfDataBuffers_whenConvertedToByteArray_thenContentMatches() { + // Setup: First, we'll manually create two DataBuffer chunks for the input Flux + byte[] part1 = "This is a ".getBytes(); + byte[] part2 = "long test string.".getBytes(); + + DataBuffer buffer1 = factory.allocateBuffer(part1.length); + buffer1.write(part1); + + DataBuffer buffer2 = factory.allocateBuffer(part2.length); + buffer2.write(part2); + + Flux sourceFlux = Flux.just(buffer1, buffer2); + + // Act & Assert: Here we perform conversion and block for direct assertion + byte[] resultBytes = converter.toByteArray(sourceFlux).block(); + + byte[] expectedBytes = TEST_CONTENT.getBytes(); + assertArrayEquals(expectedBytes, resultBytes, "The reconstructed byte array should match original"); + } +}