3838import reactor .util .annotation .Nullable ;
3939
4040import java .util .concurrent .atomic .AtomicBoolean ;
41+ import java .util .concurrent .atomic .AtomicReference ;
4142
4243import static io .r2dbc .postgresql .PostgresqlResult .toResult ;
4344
@@ -55,19 +56,31 @@ final class PostgresqlCopyIn {
5556 Mono <Long > copy (String sql , Publisher <? extends Publisher <ByteBuf >> stdin ) {
5657
5758 ExceptionFactory exceptionFactory = ExceptionFactory .withSql (sql );
58-
59+ AtomicReference < CopyData > toReleaseOnError = new AtomicReference <>();
5960 return Flux .from (stdin )
6061 .<FrontendMessage >concatMap (data -> {
6162
6263 CompositeByteBuf composite = this .context .getClient ().getByteBufAllocator ().compositeBuffer ();
6364
6465 return Flux .from (data )
65- .reduce (composite , (l , r ) -> l .addComponent (true , r ))
66+ .reduce (composite , (l , r ) -> {
67+ return l .addComponent (true , r );
68+ })
6669 .map (CopyData ::new )
70+ .doOnNext (toReleaseOnError ::set )
6771 .doOnDiscard (ReferenceCounted .class , ReferenceCountUtil ::release );
6872
6973 }).concatWithValues (CopyDone .INSTANCE ).startWith (new Query (sql ))
70- .as (messages -> copyIn (exceptionFactory , messages ));
74+ .as (messages -> copyIn (exceptionFactory , messages ))
75+ .doFinally (signalType -> {
76+
77+ CopyData copyData = toReleaseOnError .get ();
78+ if (copyData != null ) {
79+ if (copyData .refCnt () > 0 ) {
80+ copyData .release ();
81+ }
82+ }
83+ });
7184 }
7285
7386 private Mono <Long > copyIn (ExceptionFactory exceptionFactory , Flux <FrontendMessage > copyDataMessages ) {
0 commit comments