Skip to content

Commit 4e15267

Browse files
committed
JVMCBC-1705 Some uses of Flux.interval() are not protected by onBackpressureDrop
Motivation
----------
Users in serverless execution environments have seen SDK Flux.interval() operators failing due to lack of downstream requests. This could potentially terminate transaction cleanup.

Modifications
-------------
Replace most calls to Flux.interval() with new method Reactor.safeInterval() that always drops on backpressure.

Add Reactor.unsafeInterval() as an alias for Flux.interval(), and use it in LostCleanupDistributed where overflow from Flux.interval() is expected for correct behavior.

Upgrade to latest Checkstyle. We can do this because a while back we started requiring Java 17 at build time.

Add Checkstyle rule to prohibit dangerous usage of Flux.interval().

Change-Id: I66a799a6957d2b010bf5d2205bf8de082b0cf9f8
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/236256
Tested-by: Build Bot <[email protected]>
Reviewed-by: Graham Pople <[email protected]>
1 parent 3b36c86 commit 4e15267

File tree

9 files changed

+67
-14
lines changed

9 files changed

+67
-14
lines changed

config/checkstyle/checkstyle-basic.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,18 @@
1111
<property name="fileExtensions" value="java,scala"/>
1212
</module>
1313

14+
<module name="RegexpMultiline">
15+
<!-- Match static imports and qualified references. -->
16+
<property name="format" value="Flux\s*\.\s*interval"/>
17+
<property name="message" value="Please use Reactor.safeInterval() instead of Flux.interval()"/>
18+
</module>
19+
20+
<module name="SuppressWithPlainTextCommentFilter">
21+
<property name="offCommentFormat" value="CHECKSTYLE:OFF ([\w\|]+)"/>
22+
<property name="onCommentFormat" value="CHECKSTYLE:ON ([\w\|]+)"/>
23+
<property name="checkFormat" value="$1"/>
24+
</module>
25+
1426
<module name="TreeWalker">
1527
<module name="IllegalImport">
1628
<!-- Prevent unintentional dependency on unbundled Jackson -->

core-io/src/main/java/com/couchbase/client/core/Reactor.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import reactor.core.publisher.Operators;
3131
import reactor.core.publisher.SignalType;
3232
import reactor.core.publisher.Sinks;
33+
import reactor.core.scheduler.Scheduler;
34+
import reactor.core.scheduler.Schedulers;
3335
import reactor.util.context.Context;
3436

3537
import java.time.Duration;
@@ -291,4 +293,39 @@ public static Sinks.EmitResult ignoreIfDone(Sinks.EmitResult result) {
291293
? OK
292294
: result;
293295
}
296+
297+
/**
298+
* Alias for {@link Flux#interval(Duration, Scheduler)} that doesn't trigger a Checkstyle error.
299+
* <p>
300+
* This operation is "unsafe" because if there is backpressure it emits the dreaded OverflowException: "interval doesn't support small downstream requests that replenish slower than the ticks".
301+
*/
302+
@Stability.Internal
303+
public static Flux<Long> unsafeInterval(Duration d, Scheduler scheduler) {
304+
// CHECKSTYLE:OFF RegexpMultiline - Allow Flux.interval
305+
return Flux.interval(d, scheduler);
306+
// CHECKSTYLE:ON RegexpMultiline
307+
}
308+
309+
/**
310+
* Like {@link Flux#interval(Duration, Scheduler)}, but drops signals (instead of exploding) when there is insufficient demand.
311+
* <p>
312+
* Avoids the dreaded OverflowException: "interval doesn't support small downstream requests that replenish slower than the ticks".
313+
*/
314+
@Stability.Internal
315+
public static Flux<Long> safeInterval(Duration period, Scheduler scheduler) {
316+
return safeInterval(period, period, scheduler);
317+
}
318+
319+
/**
320+
* Like {@link Flux#interval(Duration, Duration, Scheduler)}, but drops signals (instead of exploding) when there is insufficient demand.
321+
* <p>
322+
* Avoids the dreaded OverflowException: "interval doesn't support small downstream requests that replenish slower than the ticks".
323+
*/
324+
@Stability.Internal
325+
public static Flux<Long> safeInterval(Duration delay, Duration period, Scheduler scheduler) {
326+
// CHECKSTYLE:OFF RegexpMultiline - Allow Flux.interval
327+
return Flux.interval(delay, period, scheduler)
328+
.onBackpressureDrop();
329+
// CHECKSTYLE:ON RegexpMultiline
330+
}
294331
}

core-io/src/main/java/com/couchbase/client/core/cnc/DefaultEventBus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.couchbase.client.core.util.CbCollections;
2121
import com.couchbase.client.core.util.NanoTimestamp;
2222
import com.couchbase.client.core.util.NativeImageHelper;
23-
import reactor.core.publisher.Flux;
2423
import reactor.core.publisher.Mono;
2524
import reactor.core.scheduler.Scheduler;
2625

@@ -37,6 +36,7 @@
3736
import java.util.concurrent.atomic.AtomicLong;
3837
import java.util.function.Consumer;
3938

39+
import static com.couchbase.client.core.Reactor.safeInterval;
4040
import static com.couchbase.client.core.util.CbThrowables.getStackTraceAsString;
4141

4242
/**
@@ -281,7 +281,7 @@ public Mono<Void> stop(final Duration timeout) {
281281
overflowInfo.clear();
282282
return Mono.empty();
283283
})
284-
.then(Flux.interval(Duration.ofMillis(10), scheduler).takeUntil(i -> !runningThread.isAlive()).then())
284+
.then(safeInterval(Duration.ofMillis(10), scheduler).takeUntil(i -> !runningThread.isAlive()).then())
285285
.timeout(timeout, scheduler);
286286
}
287287

core-io/src/main/java/com/couchbase/client/core/config/DefaultConfigurationProvider.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@
100100

101101
import static com.couchbase.client.core.Reactor.emitFailureHandler;
102102
import static com.couchbase.client.core.Reactor.ignoreIfDone;
103+
import static com.couchbase.client.core.Reactor.safeInterval;
103104
import static com.couchbase.client.core.logging.RedactableArgument.redactMeta;
104105
import static com.couchbase.client.core.logging.RedactableArgument.redactSystem;
105106
import static com.couchbase.client.core.util.CbStrings.nullToEmpty;
@@ -846,8 +847,7 @@ public Flux<TopologyPollingTrigger> topologyPollingTriggers(Duration timerInterv
846847
Scheduler scheduler = core.context().environment().scheduler();
847848

848849
return Flux.merge(
849-
Flux.interval(timerInterval, core.context().environment().scheduler())
850-
.onBackpressureDrop() // otherwise the `interval` operator signals an error
850+
safeInterval(timerInterval, core.context().environment().scheduler())
851851
.map(it -> TopologyPollingTrigger.TIMER),
852852

853853
topologyPollingTriggers.asFlux().publishOn(scheduler)

core-io/src/main/java/com/couchbase/client/core/transaction/cleanup/CoreTransactionsCleanup.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.DelayQueue;
4141
import java.util.concurrent.TimeUnit;
4242

43+
import static com.couchbase.client.core.Reactor.safeInterval;
4344
import static com.couchbase.client.core.cnc.events.transaction.TransactionEvent.DEFAULT_CATEGORY;
4445

4546
/**
@@ -127,7 +128,7 @@ private void runRegularAttemptsCleanupThread() {
127128
LOGGER_REGULAR.debug("Starting background cleanup thread to find transactions from this client");
128129

129130
// Periodically check and drain the cleanupQueue
130-
Flux.interval(Duration.ofMillis(100), core.context().environment().transactionsSchedulers().schedulerCleanup())
131+
safeInterval(Duration.ofMillis(100), core.context().environment().transactionsSchedulers().schedulerCleanup())
131132

132133
.flatMap(v -> {
133134
if (stop) {

core-io/src/main/java/com/couchbase/client/core/transaction/cleanup/LostCleanupDistributed.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@
6464
import java.util.function.Supplier;
6565
import java.util.stream.Collectors;
6666

67+
import static com.couchbase.client.core.Reactor.safeInterval;
68+
import static com.couchbase.client.core.Reactor.unsafeInterval;
6769
import static com.couchbase.client.core.io.CollectionIdentifier.DEFAULT_COLLECTION;
6870
import static com.couchbase.client.core.io.CollectionIdentifier.DEFAULT_SCOPE;
6971
import static com.couchbase.client.core.logging.RedactableArgument.redactMeta;
@@ -358,7 +360,7 @@ Mono<Void> createThreadForCollectionIfNeeded(CollectionIdentifier coll) {
358360
}
359361

360362
private void periodicallyCheckCleanupSet() {
361-
cleanupThreadLauncher = Flux.interval(Duration.ZERO, Duration.ofSeconds(1), core.context().environment().transactionsSchedulers().schedulerCleanup())
363+
cleanupThreadLauncher = safeInterval(Duration.ZERO, Duration.ofSeconds(1), core.context().environment().transactionsSchedulers().schedulerCleanup())
362364
.concatMap(v -> Flux.fromIterable(cleanupSet))
363365
.publishOn(core.context().environment().transactionsSchedulers().schedulerCleanup())
364366
.concatMap(this::createThreadForCollectionIfNeeded)
@@ -439,7 +441,7 @@ private Mono<Void> perCollectionThread(CollectionIdentifier collection) {
439441
// handling the first ATR takes 0.3 seconds, the next should execute 0.7 seconds later.
440442
// Similar if the first ATR takes > 1 second, the next should execute instantly.
441443
return Flux.zip(Flux.fromIterable(atrsHandledByThisClient),
442-
Flux.interval(Duration.ofNanos(checkAtrEveryNNanos)))
444+
unsafeInterval(Duration.ofNanos(checkAtrEveryNNanos), core.context().environment().scheduler()))
443445
.publishOn(core.context().environment().transactionsSchedulers().schedulerCleanup())
444446

445447
// Where the ATR cleanup magic happens

core-io/src/test/java/com/couchbase/client/core/config/refresher/GlobalRefresherTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@
3434
import org.junit.jupiter.api.AfterEach;
3535
import org.junit.jupiter.api.BeforeEach;
3636
import org.junit.jupiter.api.Test;
37-
import reactor.core.publisher.Flux;
37+
import reactor.core.scheduler.Schedulers;
3838

3939
import java.time.Duration;
4040
import java.util.concurrent.atomic.AtomicInteger;
4141

42+
import static com.couchbase.client.core.Reactor.safeInterval;
4243
import static com.couchbase.client.core.util.CbCollections.mapOf;
4344
import static com.couchbase.client.core.util.MockUtil.mockCore;
4445
import static com.couchbase.client.test.Util.waitUntilCondition;
@@ -78,7 +79,7 @@ static ConfigurationProvider mockConfigurationProvider(ClusterConfig clusterConf
7879
ConfigurationProvider provider = mock(ConfigurationProvider.class);
7980
when(provider.config()).thenReturn(clusterConfig);
8081
when(provider.topologyPollingTriggers(any())).
81-
thenReturn(Flux.interval(FAST_CONFIG_POLL_INTERVAL).map(it -> TopologyPollingTrigger.TIMER));
82+
thenReturn(safeInterval(FAST_CONFIG_POLL_INTERVAL, Schedulers.parallel()).map(it -> TopologyPollingTrigger.TIMER));
8283
return provider;
8384
}
8485

core-io/src/test/java/com/couchbase/client/core/config/refresher/KeyValueBucketRefresherTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@
3434
import org.junit.jupiter.api.AfterEach;
3535
import org.junit.jupiter.api.BeforeEach;
3636
import org.junit.jupiter.api.Test;
37-
import reactor.core.publisher.Flux;
37+
import reactor.core.scheduler.Schedulers;
3838

3939
import java.time.Duration;
4040
import java.util.concurrent.atomic.AtomicInteger;
4141

42+
import static com.couchbase.client.core.Reactor.safeInterval;
4243
import static com.couchbase.client.core.util.CbCollections.mapOf;
4344
import static com.couchbase.client.core.util.MockUtil.mockCore;
4445
import static org.mockito.ArgumentMatchers.any;
@@ -83,7 +84,7 @@ void triggersEventIfAllNodesFailedToRefresh() {
8384
ClusterConfig clusterConfig = new ClusterConfig();
8485
when(provider.config()).thenReturn(clusterConfig);
8586
when(provider.topologyPollingTriggers(any()))
86-
.thenReturn(Flux.interval(Duration.ofMillis(10)).map(it -> TopologyPollingTrigger.TIMER));
87+
.thenReturn(safeInterval(Duration.ofMillis(10), Schedulers.parallel()).map(it -> TopologyPollingTrigger.TIMER));
8788

8889
ClusterTopologyWithBucket config = new ClusterTopologyBuilder()
8990
.addNode("foo", node -> node.ports(mapOf(ServiceType.KV, 11210, ServiceType.MANAGER, 8091)))

pom.xml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -349,13 +349,12 @@
349349
<plugin>
350350
<groupId>org.apache.maven.plugins</groupId>
351351
<artifactId>maven-checkstyle-plugin</artifactId>
352-
<version>3.1.2</version>
352+
<version>3.6.0</version>
353353
<dependencies>
354354
<dependency>
355355
<groupId>com.puppycrawl.tools</groupId>
356356
<artifactId>checkstyle</artifactId>
357-
<!-- Last version to support Java 8 is 9.x -->
358-
<version>9.3</version>
357+
<version>12.1.1</version>
359358
</dependency>
360359
</dependencies>
361360
<configuration>

0 commit comments

Comments
 (0)