|
33 | 33 |
|
34 | 34 | import java.nio.ByteBuffer; |
35 | 35 | import java.util.Optional; |
36 | | -import java.util.concurrent.atomic.AtomicLong; |
37 | 36 |
|
38 | 37 | import static com.couchbase.client.core.Reactor.emitFailureHandler; |
39 | 38 |
|
@@ -79,9 +78,17 @@ public abstract class BaseChunkResponseParser<H extends ChunkHeader, ROW extends |
79 | 78 | private Sinks.Many<ROW> rowSink; |
80 | 79 |
|
81 | 80 | /** |
82 | | - * Holds the currently user-requested rows for backpressure handling. |
| 81 | + * For backpressure, a running total of the number of rows requested by the subscriber, |
| 82 | + * minus the number of rows emitted. A negative value indicates a surplus of available rows. |
| 83 | + * <p> |
| 84 | + * A value of {@code Long.MAX_VALUE} indicates backpressure is disabled. |
83 | 85 | */ |
84 | | - private final AtomicLong requested = new AtomicLong(0); |
| 86 | + private long demand = 0; |
| 87 | + |
| 88 | + /** |
| 89 | + * Guards access to `demand` and channel autoRead setting. |
| 90 | + */ |
| 91 | + private final Object autoReadLock = new Object(); |
85 | 92 |
|
86 | 93 | /** |
87 | 94 | * Subclass implements this to return the "meat" of the decoding, the chunk parser. |
@@ -174,23 +181,58 @@ public void initialize(final ChannelConfig channelConfig) { |
174 | 181 | parser = parserBuilder().build(); |
175 | 182 | this.channelConfig = channelConfig; |
176 | 183 | this.trailer = Sinks.one(); |
177 | | - this.requested.set(0); |
| 184 | + synchronized (autoReadLock) { |
| 185 | + this.demand = 0; |
| 186 | + } |
178 | 187 |
|
179 | 188 | this.rowSink = Sinks.many().unicast().onBackpressureBuffer(); |
180 | 189 | this.rows = rowSink |
181 | 190 | .asFlux() |
182 | 191 | .doOnRequest(v -> { |
183 | | - requested.addAndGet(v); |
184 | | - if (!channelConfig.isAutoRead()) { |
185 | | - channelConfig.setAutoRead(true); |
| 192 | + synchronized (autoReadLock) { |
| 193 | + if (demand == Long.MAX_VALUE) { |
| 194 | + return; |
| 195 | + } |
| 196 | + |
| 197 | + try { |
| 198 | + demand = Math.addExact(demand, v); |
| 199 | + |
| 200 | + } catch (ArithmeticException e) { |
| 201 | + // They asked for it, so open the floodgates. |
| 202 | + demand = Long.MAX_VALUE; |
| 203 | + channelConfig.setAutoRead(true); |
| 204 | + return; |
| 205 | + } |
| 206 | + |
| 207 | + if (demand > 0 && !channelConfig.isAutoRead()) { |
| 208 | + channelConfig.setAutoRead(true); |
| 209 | + } |
186 | 210 | } |
187 | 211 | }) |
188 | | - .doOnTerminate(() -> channelConfig.setAutoRead(true)) |
189 | | - .doOnCancel(() -> channelConfig.setAutoRead(true)) |
| 212 | + .doOnTerminate(this::readRemainingRowsWithoutBackpressure) |
| 213 | + .doOnCancel(this::readRemainingRowsWithoutBackpressure) |
190 | 214 | .publish() |
191 | 215 | .refCount(); |
192 | 216 | } |
193 | 217 |
|
| 218 | + /** |
| 219 | + * Allow the remaining result rows to be consumed, even in the absence of demand. |
| 220 | + * Must not be called before the row subscription is terminated or cancelled. |
| 221 | + * <p> |
| 222 | + * From a memory usage perspective, this is safe because future attempts to emit |
| 223 | + * rows into the sink fail instead of buffering the items. |
| 224 | + * <p> |
| 225 | + * Do this because the current API contract requires publishing metadata |
| 226 | + * even if the row sink terminates or is cancelled. Otherwise, we would |
| 227 | + * probably be better off simply closing the HTTP connection. |
| 228 | + */ |
| 229 | + private void readRemainingRowsWithoutBackpressure() { |
| 230 | + synchronized (autoReadLock) { |
| 231 | + demand = Long.MAX_VALUE; |
| 232 | + channelConfig.setAutoRead(true); |
| 233 | + } |
| 234 | + } |
| 235 | + |
194 | 236 | @Override |
195 | 237 | public Flux<ROW> rows() { |
196 | 238 | return rows; |
@@ -241,9 +283,17 @@ public Optional<CouchbaseException> decodingFailure() { |
241 | 283 | */ |
242 | 284 | protected void emitRow(final ROW row) { |
243 | 285 | rowSink.emitNext(row, emitFailureHandler()); |
244 | | - requested.decrementAndGet(); |
245 | | - if (requested.get() <= 0 && channelConfig.isAutoRead() && rowSink.currentSubscriberCount() > 0) { |
246 | | - channelConfig.setAutoRead(false); |
| 286 | + |
| 287 | + synchronized (autoReadLock) { |
| 288 | + if (demand == Long.MAX_VALUE) { |
| 289 | + return; |
| 290 | + } |
| 291 | + |
| 292 | + demand--; |
| 293 | + |
| 294 | + if (demand <= 0 && channelConfig.isAutoRead()) { |
| 295 | + channelConfig.setAutoRead(false); |
| 296 | + } |
247 | 297 | } |
248 | 298 | } |
249 | 299 |
|
|
0 commit comments