Skip to content

Commit 3a113e2

Browse files
committed
Use UnicastProcessor in Extended Flow to avoid overflow of outbound writes.
We now use UnicastProcessor to buffer outbound writes when consuming a cursor. Previously, DirectProcessor would fail if netty operator doesn't have demand caused by channel writability switches. [resolves #395] Signed-off-by: Mark Paluch <[email protected]>
1 parent d2c0ff6 commit 3a113e2

File tree

2 files changed

+7
-6
lines changed

2 files changed

+7
-6
lines changed

src/main/java/io/r2dbc/postgresql/ExtendedFlowDelegate.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,12 @@
4343
import io.r2dbc.postgresql.message.frontend.Parse;
4444
import io.r2dbc.postgresql.message.frontend.Sync;
4545
import io.r2dbc.postgresql.util.Operators;
46-
import reactor.core.publisher.DirectProcessor;
4746
import reactor.core.publisher.Flux;
4847
import reactor.core.publisher.FluxSink;
4948
import reactor.core.publisher.Mono;
5049
import reactor.core.publisher.SynchronousSink;
50+
import reactor.core.publisher.UnicastProcessor;
51+
import reactor.util.concurrent.Queues;
5152

5253
import java.util.ArrayList;
5354
import java.util.List;
@@ -163,7 +164,7 @@ private static Flux<BackendMessage> fetchAll(List<FrontendMessage.DirectEncoder>
163164
*/
164165
private static Flux<BackendMessage> fetchCursoredWithSync(List<FrontendMessage.DirectEncoder> messagesToSend, Client client, String portal, int fetchSize) {
165166

166-
DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
167+
UnicastProcessor<FrontendMessage> requestsProcessor = UnicastProcessor.create(Queues.<FrontendMessage>small().get());
167168
FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
168169
AtomicBoolean isCanceled = new AtomicBoolean(false);
169170
AtomicBoolean done = new AtomicBoolean(false);
@@ -226,7 +227,7 @@ private static Flux<BackendMessage> fetchCursoredWithSync(List<FrontendMessage.D
226227
*/
227228
private static Flux<BackendMessage> fetchCursoredWithFlush(List<FrontendMessage.DirectEncoder> messagesToSend, Client client, String portal, int fetchSize) {
228229

229-
DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
230+
UnicastProcessor<FrontendMessage> requestsProcessor = UnicastProcessor.create(Queues.<FrontendMessage>small().get());
230231
FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
231232
AtomicBoolean isCanceled = new AtomicBoolean(false);
232233

src/main/java/io/r2dbc/postgresql/client/ExtendedQueryMessageFlow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.netty.util.ReferenceCounted;
2222
import io.r2dbc.postgresql.message.Format;
2323
import io.r2dbc.postgresql.message.backend.BackendMessage;
24-
import io.r2dbc.postgresql.message.backend.CloseComplete;
2524
import io.r2dbc.postgresql.message.backend.CommandComplete;
2625
import io.r2dbc.postgresql.message.backend.ErrorResponse;
2726
import io.r2dbc.postgresql.message.backend.ParseComplete;
@@ -39,10 +38,11 @@
3938
import io.r2dbc.postgresql.message.frontend.Sync;
4039
import io.r2dbc.postgresql.util.Assert;
4140
import io.r2dbc.postgresql.util.Operators;
42-
import reactor.core.publisher.DirectProcessor;
4341
import reactor.core.publisher.Flux;
4442
import reactor.core.publisher.FluxSink;
4543
import reactor.core.publisher.SynchronousSink;
44+
import reactor.core.publisher.UnicastProcessor;
45+
import reactor.util.concurrent.Queues;
4646

4747
import java.util.Collection;
4848
import java.util.Collections;
@@ -129,7 +129,7 @@ private static Flux<BackendMessage> fetchAll(Flux<FrontendMessage> bindFlow, Cli
129129
*/
130130
private static Flux<BackendMessage> fetchCursored(Flux<FrontendMessage> bindFlow, Client client, String portal, int fetchSize) {
131131

132-
DirectProcessor<FrontendMessage> requestsProcessor = DirectProcessor.create();
132+
UnicastProcessor<FrontendMessage> requestsProcessor = UnicastProcessor.create(Queues.<FrontendMessage>small().get());
133133
FluxSink<FrontendMessage> requestsSink = requestsProcessor.sink();
134134
AtomicBoolean isCanceled = new AtomicBoolean(false);
135135

0 commit comments

Comments
 (0)