1515 */
1616package org .springframework .data .r2dbc .core ;
1717
18+ import static io .netty .buffer .ByteBufUtil .*;
1819import static org .assertj .core .api .Assertions .*;
1920import static org .springframework .data .relational .core .query .Criteria .*;
2021
22+ import io .netty .buffer .ByteBufUtil ;
23+ import io .netty .buffer .Unpooled ;
2124import io .r2dbc .postgresql .PostgresqlConnectionConfiguration ;
2225import io .r2dbc .postgresql .PostgresqlConnectionFactory ;
2326import io .r2dbc .postgresql .codec .Box ;
3033import io .r2dbc .postgresql .codec .Point ;
3134import io .r2dbc .postgresql .codec .Polygon ;
3235import io .r2dbc .postgresql .extension .CodecRegistrar ;
36+ import io .r2dbc .spi .Blob ;
3337import io .r2dbc .spi .ConnectionFactory ;
3438import lombok .AllArgsConstructor ;
3539import lombok .Data ;
40+ import reactor .core .publisher .Flux ;
41+ import reactor .core .publisher .Mono ;
3642import reactor .test .StepVerifier ;
3743
44+ import java .nio .ByteBuffer ;
45+ import java .nio .charset .StandardCharsets ;
3846import java .time .Duration ;
3947import java .util .Collections ;
4048import java .util .List ;
49+ import java .util .concurrent .CompletableFuture ;
4150
4251import javax .sql .DataSource ;
4352
4453import org .junit .jupiter .api .BeforeEach ;
4554import org .junit .jupiter .api .Test ;
4655import org .junit .jupiter .api .extension .RegisterExtension ;
47-
4856import org .springframework .dao .DataAccessException ;
4957import org .springframework .data .annotation .Id ;
5058import org .springframework .data .r2dbc .convert .EnumWriteSupport ;
@@ -81,6 +89,13 @@ void before() {
8189 + "primitive_array INT[]," //
8290 + "multidimensional_array INT[]," //
8391 + "collection_array INT[][])" );
92+
93+ template .execute ("DROP TABLE IF EXISTS with_blobs" );
94+ template .execute ("CREATE TABLE with_blobs (" //
95+ + "id serial PRIMARY KEY," //
96+ + "byte_array bytea," //
97+ + "byte_buffer bytea," //
98+ + "byte_blob bytea)" );
8499 }
85100
86101 @ Test // gh-411
@@ -198,9 +213,9 @@ void shouldReadAndWriteInterval() {
198213
199214 template .execute ("DROP TABLE IF EXISTS with_interval" );
200215 template .execute ("CREATE TABLE with_interval (" //
201- + "id serial PRIMARY KEY," //
202- + "interval INTERVAL" //
203- + ")" );
216+ + "id serial PRIMARY KEY," //
217+ + "interval INTERVAL" //
218+ + ")" );
204219
205220 R2dbcEntityTemplate template = new R2dbcEntityTemplate (client ,
206221 new DefaultReactiveDataAccessStrategy (PostgresDialect .INSTANCE ));
@@ -213,6 +228,62 @@ void shouldReadAndWriteInterval() {
213228 }).verifyComplete ();
214229 }
215230
231+ @ Test // gh-1408
232+ void shouldReadAndWriteBlobs () {
233+
234+ R2dbcEntityTemplate template = new R2dbcEntityTemplate (client ,
235+ new DefaultReactiveDataAccessStrategy (PostgresDialect .INSTANCE ));
236+
237+ WithBlobs withBlobs = new WithBlobs ();
238+ byte [] content = "123ä" .getBytes (StandardCharsets .UTF_8 );
239+
240+ withBlobs .byteArray = content ;
241+ withBlobs .byteBuffer = ByteBuffer .wrap (content );
242+ withBlobs .byteBlob = Blob .from (Mono .just (ByteBuffer .wrap (content )));
243+
244+ template .insert (withBlobs ) //
245+ .as (StepVerifier ::create ) //
246+ .expectNextCount (1 ) //
247+ .verifyComplete ();
248+
249+ template .selectOne (Query .empty (), WithBlobs .class ) //
250+ .flatMap (it -> {
251+ return Flux .from (it .byteBlob .stream ()).last ().map (blob -> {
252+ it .byteBlob = Blob .from (Mono .just (blob ));
253+ return it ;
254+ });
255+ }).as (StepVerifier ::create ) //
256+ .consumeNextWith (actual -> {
257+
258+ CompletableFuture <byte []> cf = Mono .from (actual .byteBlob .stream ()).map (Unpooled ::wrappedBuffer )
259+ .map (ByteBufUtil ::getBytes ).toFuture ();
260+ assertThat (actual .getByteArray ()).isEqualTo (content );
261+ assertThat (getBytes (Unpooled .wrappedBuffer (actual .getByteBuffer ()))).isEqualTo (content );
262+ assertThat (cf .join ()).isEqualTo (content );
263+ }).verifyComplete ();
264+
265+ template .selectOne (Query .empty (), WithBlobs .class )
266+ .doOnNext (it -> it .byteArray = "foo" .getBytes (StandardCharsets .UTF_8 )).flatMap (template ::update ) //
267+ .as (StepVerifier ::create ) //
268+ .expectNextCount (1 ).verifyComplete ();
269+
270+ template .selectOne (Query .empty (), WithBlobs .class ) //
271+ .flatMap (it -> {
272+ return Flux .from (it .byteBlob .stream ()).last ().map (blob -> {
273+ it .byteBlob = Blob .from (Mono .just (blob ));
274+ return it ;
275+ });
276+ }).as (StepVerifier ::create ) //
277+ .consumeNextWith (actual -> {
278+
279+ CompletableFuture <byte []> cf = Mono .from (actual .byteBlob .stream ()).map (Unpooled ::wrappedBuffer )
280+ .map (ByteBufUtil ::getBytes ).toFuture ();
281+ assertThat (actual .getByteArray ()).isEqualTo ("foo" .getBytes (StandardCharsets .UTF_8 ));
282+ assertThat (getBytes (Unpooled .wrappedBuffer (actual .getByteBuffer ()))).isEqualTo (content );
283+ assertThat (cf .join ()).isEqualTo (content );
284+ }).verifyComplete ();
285+ }
286+
216287 @ Data
217288 @ AllArgsConstructor
218289 static class EntityWithEnum {
@@ -260,4 +331,16 @@ static class EntityWithInterval {
260331
261332 }
262333
334+ @ Data
335+ @ Table ("with_blobs" )
336+ static class WithBlobs {
337+
338+ @ Id Integer id ;
339+
340+ byte [] byteArray ;
341+ ByteBuffer byteBuffer ;
342+ Blob byteBlob ;
343+
344+ }
345+
263346}
0 commit comments