
Commit 7ca3403

Fred [C] Park committed
feat: new WriteOption config for capturing backpressure data
1 parent dd37956 commit 7ca3403

File tree: 11 files changed, +980 -21 lines

README.md

Lines changed: 2 additions & 0 deletions

@@ -418,6 +418,8 @@ If you have Docker running, but it is not available over localhost (e.g. you are
 - `INFLUXDB_PORT_API`
 - `INFLUXDB_2_IP`
 - `INFLUXDB_2_PORT_API`
+- `INFLUXDB_2_ONBOARDING_IP`
+- `INFLUXDB_2_ONBOARDING_PORT`
 
 ```bash
 $ export INFLUXDB_IP=192.168.99.100

client/README.md

Lines changed: 39 additions & 1 deletion

@@ -611,6 +611,10 @@ The writes are processed in batches which are configurable by `WriteOptions`:
 | **exponentialBase** | the base for the exponential retry delay; the next delay is computed using random exponential backoff as a random value within the interval ``retryInterval * exponentialBase^(attempts-1)`` and ``retryInterval * exponentialBase^(attempts)``. Example for ``retryInterval=5_000, exponentialBase=2, maxRetryDelay=125_000, total=5``: retry delays are randomly distributed values within the ranges ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]`` | 2 |
 | **bufferLimit** | the maximum number of unwritten stored points | 10000 |
 | **backpressureStrategy** | the strategy to deal with buffer overflow | DROP_OLDEST |
+| **captureBackpressureData** | whether to capture affected data points in backpressure events | false |
+| **concatMapPrefetch** | the number of upstream items to prefetch for the concatMapMaybe operator | 2 |
+
+There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api).
 
 #### Backpressure
 Backpressure presents the problem of what to do with a growing backlog of unconsumed data points.

@@ -640,7 +644,41 @@ writeApi.listenEvents(BackpressureEvent.class, value -> {
 });
 ```
 
-There is also a synchronous blocking version of `WriteApi` - [WriteApiBlocking](#writing-data-using-synchronous-blocking-api).
+##### Backpressure Event Data Snapshots
+
+When backpressure occurs, enable `captureBackpressureData` to receive a snapshot of the affected data points from the `BackpressureEvent`. The content of the snapshot depends on the configured backpressure strategy:
+
+- **`DROP_OLDEST`**: the snapshot contains only the data points that will be dropped (the oldest points in the buffer). This lets you log, persist, or otherwise handle the specific data being lost to backpressure.
+
+- **`DROP_LATEST`**: the snapshot contains only the newest data points being added to the buffer, i.e. the most recent data that triggered the backpressure condition.
+
+Logging dropped data points:
+```java
+WriteOptions writeOptions = WriteOptions.builder()
+    .backpressureStrategy(BackpressureOverflowStrategy.DROP_OLDEST)
+    .bufferLimit(1000)
+    .captureBackpressureData(true)
+    .build();
+
+WriteApi writeApi = influxDBClient.getWriteApi(writeOptions);
+
+writeApi.listenEvents(BackpressureEvent.class, backpressureEvent -> {
+    List<String> affectedPoints = backpressureEvent.getBufferedLineProtocol();
+
+    if (backpressureEvent.getReason() == BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES) {
+        logger.warn("Backpressure occurred. Affected {} data points:", affectedPoints.size());
+
+        // For DROP_OLDEST: these are the points that were dropped from the buffer
+        // For DROP_LATEST: these are the newest points that triggered the condition
+        affectedPoints.forEach(point -> logger.debug("Affected point: {}", point));
+
+        // Do something with the affected points, e.g. requeue them for retry
+        requeue(affectedPoints);
+    }
+});
+```
+
+Note: leaving `captureBackpressureData` disabled (the default) avoids the capture overhead when the affected data is not needed.
 
 #### Writing data
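The new `concatMapPrefetch` option documented in the table above is set through the same builder. A minimal sketch (assuming an existing `influxDBClient` instance; the method names are this commit's `WriteOptions` API):

```java
// Raise the prefetch so the next write batches are already requested while the
// current batch is in flight. Default is 2 (DEFAULT_CONCAT_MAP_PREFETCH).
WriteOptions options = WriteOptions.builder()
        .concatMapPrefetch(4)
        .build();

WriteApi writeApi = influxDBClient.getWriteApi(options);
```

A larger prefetch keeps the writer busy between batch completions at the cost of holding more pending batches in memory.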

client/src/main/java/com/influxdb/client/WriteOptions.java

Lines changed: 68 additions & 2 deletions

@@ -43,6 +43,8 @@
  * <li>retryInterval = 5000 ms</li>
  * <li>jitterInterval = 0</li>
  * <li>bufferLimit = 10_000</li>
+ * <li>concatMapPrefetch = 2</li>
+ * <li>captureBackpressureData = false</li>
  * </ul>
  * <p>
  * The default backpressure strategy is {@link BackpressureOverflowStrategy#DROP_OLDEST}.

@@ -62,6 +64,8 @@ public final class WriteOptions implements WriteApi.RetryOptions {
     public static final int DEFAULT_MAX_RETRY_TIME = 180_000;
     public static final int DEFAULT_EXPONENTIAL_BASE = 2;
     public static final int DEFAULT_BUFFER_LIMIT = 10000;
+    public static final int DEFAULT_CONCAT_MAP_PREFETCH = 2;
+    public static final boolean DEFAULT_CAPTURE_BACKPRESSURE_DATA = false;
 
     /**
      * Default configuration with values that are consistent with Telegraf.

@@ -77,8 +81,10 @@ public final class WriteOptions implements WriteApi.RetryOptions {
     private final int maxRetryTime;
     private final int exponentialBase;
     private final int bufferLimit;
+    private final int concatMapPrefetch;
     private final Scheduler writeScheduler;
     private final BackpressureOverflowStrategy backpressureStrategy;
+    private final boolean captureBackpressureData;
 
     /**
      * @return the number of data point to collect in batch

@@ -171,6 +177,17 @@ public int getBufferLimit() {
         return bufferLimit;
     }
 
+    /**
+     * The number of upstream items to prefetch so that fresh items are ready to be mapped when a previous
+     * MaybeSource terminates.
+     *
+     * @return the prefetch value for the concatMapMaybe operator
+     * @see WriteOptions.Builder#concatMapPrefetch(int)
+     */
+    public int getConcatMapPrefetch() {
+        return concatMapPrefetch;
+    }
+
     /**
      * @return The scheduler which is used for write data points.
      * @see WriteOptions.Builder#writeScheduler(Scheduler)

@@ -189,6 +206,14 @@ public BackpressureOverflowStrategy getBackpressureStrategy() {
         return backpressureStrategy;
     }
 
+    /**
+     * @return whether to capture affected data points in backpressure events
+     * @see WriteOptions.Builder#captureBackpressureData(boolean)
+     */
+    public boolean getCaptureBackpressureData() {
+        return captureBackpressureData;
+    }
+
     private WriteOptions(@Nonnull final Builder builder) {
 
         Arguments.checkNotNull(builder, "WriteOptions.Builder");

@@ -202,8 +227,10 @@ private WriteOptions(@Nonnull final Builder builder) {
         maxRetryTime = builder.maxRetryTime;
         exponentialBase = builder.exponentialBase;
         bufferLimit = builder.bufferLimit;
+        concatMapPrefetch = builder.concatMapPrefetch;
         writeScheduler = builder.writeScheduler;
         backpressureStrategy = builder.backpressureStrategy;
+        captureBackpressureData = builder.captureBackpressureData;
     }
 
     /**

@@ -231,8 +258,10 @@ public static class Builder {
         private int maxRetryTime = DEFAULT_MAX_RETRY_TIME;
         private int exponentialBase = DEFAULT_EXPONENTIAL_BASE;
         private int bufferLimit = DEFAULT_BUFFER_LIMIT;
+        private int concatMapPrefetch = DEFAULT_CONCAT_MAP_PREFETCH;
         private Scheduler writeScheduler = Schedulers.newThread();
         private BackpressureOverflowStrategy backpressureStrategy = BackpressureOverflowStrategy.DROP_OLDEST;
+        private boolean captureBackpressureData = DEFAULT_CAPTURE_BACKPRESSURE_DATA;
 
         /**
          * Set the number of data point to collect in batch.

@@ -339,7 +368,9 @@ public Builder maxRetryTime(final int maxRetryTime) {
          */
         @Nonnull
         public Builder exponentialBase(final int exponentialBase) {
-            Arguments.checkPositiveNumber(exponentialBase, "exponentialBase");
+            if (exponentialBase < 2) {
+                throw new IllegalArgumentException("Expecting a number >= 2 for exponentialBase");
+            }
             this.exponentialBase = exponentialBase;
             return this;
         }

@@ -354,11 +385,27 @@ public Builder exponentialBase(final int exponentialBase) {
          */
         @Nonnull
         public Builder bufferLimit(final int bufferLimit) {
-            Arguments.checkNotNegativeNumber(bufferLimit, "bufferLimit");
+            Arguments.checkPositiveNumber(bufferLimit, "bufferLimit");
             this.bufferLimit = bufferLimit;
             return this;
         }
 
+        /**
+         * Set the prefetch value for the concatMapMaybe operator that processes write batches.
+         *
+         * The number of upstream items to prefetch so that fresh items are ready to be mapped when a previous
+         * MaybeSource terminates.
+         *
+         * @param concatMapPrefetch the prefetch value for the concatMapMaybe operator (must be positive)
+         * @return {@code this}
+         */
+        @Nonnull
+        public Builder concatMapPrefetch(final int concatMapPrefetch) {
+            Arguments.checkPositiveNumber(concatMapPrefetch, "concatMapPrefetch");
+            this.concatMapPrefetch = concatMapPrefetch;
+            return this;
+        }
+
         /**
          * Set the scheduler which is used for write data points. It is useful for disabling batch writes or
          * for tuning the performance. Default value is {@link Schedulers#newThread()}.

@@ -389,6 +436,25 @@ public Builder backpressureStrategy(@Nonnull final BackpressureOverflowStrategy
             return this;
         }
 
+        /**
+         * Set whether to capture affected data points in backpressure events.
+         *
+         * When enabled, BackpressureEvent will include the specific line protocol points
+         * that are affected by the backpressure condition:
+         * - For the DROP_OLDEST strategy: points that will be dropped
+         * - For the DROP_LATEST strategy: newest points being added
+         *
+         * Disabling this can improve performance when backpressure data capture is not needed.
+         *
+         * @param captureBackpressureData whether to capture affected data points. Default is false.
+         * @return {@code this}
+         */
+        @Nonnull
+        public Builder captureBackpressureData(final boolean captureBackpressureData) {
+            this.captureBackpressureData = captureBackpressureData;
+            return this;
+        }
+
         /**
          * Build an instance of WriteOptions.
         *
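The `exponentialBase` builder above now rejects values below 2. As a worked illustration of the retry-delay windows documented in client/README.md, here is a standalone sketch; `RetryDelayDemo` and `nextDelay` are hypothetical helpers for illustration, not the client's internal code:

```java
import java.util.concurrent.ThreadLocalRandom;

public class RetryDelayDemo {

    // Computes the n-th retry window documented for WriteOptions:
    // [retryInterval * base^(n-1), retryInterval * base^n], capped at
    // maxRetryDelay, then picks a random value inside it.
    static long nextDelay(long retryInterval, int base, long maxRetryDelay, int attempt) {
        long low = (long) Math.min(retryInterval * Math.pow(base, attempt - 1), maxRetryDelay);
        long high = (long) Math.min(retryInterval * Math.pow(base, attempt), maxRetryDelay);
        return high > low ? ThreadLocalRandom.current().nextLong(low, high) : low;
    }

    public static void main(String[] args) {
        // Matches the README example: retryInterval=5_000, exponentialBase=2,
        // maxRetryDelay=125_000 -> windows [5-10s, 10-20s, 20-40s, 40-80s, 80-125s].
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.printf("attempt %d: %d ms%n", attempt, nextDelay(5_000, 2, 125_000, attempt));
        }
    }
}
```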

client/src/main/java/com/influxdb/client/internal/AbstractWriteClient.java

Lines changed: 7 additions & 4 deletions

@@ -161,12 +161,15 @@ public AbstractWriteClient(@Nonnull final WriteOptions writeOptions,
                 //
                 .lift(new BackpressureBatchesBufferStrategy(
                         writeOptions.getBufferLimit(),
-                        () -> publish(new BackpressureEvent(BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES)),
-                        writeOptions.getBackpressureStrategy()))
+                        bufferedPoints -> publish(new BackpressureEvent(
+                                BackpressureEvent.BackpressureReason.TOO_MUCH_BATCHES, bufferedPoints)),
+                        writeOptions.getBackpressureStrategy(),
+                        writeOptions.getCaptureBackpressureData()))
                 //
-                // Use concat to process batches one by one
+                // Use concat to process batches with configurable prefetch
                 //
-                .concatMapMaybe(new ToWritePointsMaybe(processorScheduler, writeOptions))
+                .concatMapMaybe(new ToWritePointsMaybe(processorScheduler, writeOptions),
+                        writeOptions.getConcatMapPrefetch())
                 .doFinally(() -> finished.set(true))
                 .subscribe(responseNotification -> {
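The `concatMapMaybe(mapper, prefetch)` overload used above is standard RxJava 3. A self-contained sketch of what the prefetch parameter does, with plain integers standing in for the client's `BatchWriteItem` stream:

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;

public class PrefetchDemo {
    public static void main(String[] args) {
        Flowable.range(1, 6)
                .doOnNext(i -> System.out.println("upstream emitted batch " + i))
                // prefetch = 2: while one Maybe is being processed, up to two
                // upstream items are already requested and buffered, so the next
                // batch is ready the moment the previous write completes.
                .concatMapMaybe(i -> Maybe.just("wrote batch " + i), 2)
                .blockingForEach(System.out::println);
    }
}
```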

client/src/main/java/com/influxdb/client/internal/flowable/BackpressureBatchesBufferStrategy.java

Lines changed: 61 additions & 11 deletions

@@ -1,9 +1,14 @@
 package com.influxdb.client.internal.flowable;
 
+import java.util.Collections;
 import java.util.ArrayDeque;
+import java.util.Arrays;
 import java.util.Deque;
+import java.util.List;
+import java.util.stream.Collectors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import com.influxdb.client.internal.AbstractWriteClient;

@@ -13,7 +18,6 @@
 import io.reactivex.rxjava3.core.FlowableSubscriber;
 import io.reactivex.rxjava3.exceptions.Exceptions;
 import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
-import io.reactivex.rxjava3.functions.Action;
 import io.reactivex.rxjava3.internal.operators.flowable.FlowableOnBackpressureBufferStrategy;
 import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
 import io.reactivex.rxjava3.internal.util.BackpressureHelper;

@@ -34,21 +38,32 @@ public final class BackpressureBatchesBufferStrategy implements
 
     final long bufferSize;
 
-    final Action onOverflow;
+    final Consumer<List<String>> onOverflow;
 
     final BackpressureOverflowStrategy strategy;
 
-    public BackpressureBatchesBufferStrategy(long bufferSize, Action onOverflow,
-                                             BackpressureOverflowStrategy strategy) {
+    final boolean captureBackpressureData;
+
+    public BackpressureBatchesBufferStrategy(long bufferSize,
+                                             Consumer<List<String>> onOverflow,
+                                             BackpressureOverflowStrategy strategy) {
+        this(bufferSize, onOverflow, strategy, false);
+    }
+
+    public BackpressureBatchesBufferStrategy(long bufferSize,
+                                             Consumer<List<String>> onOverflow,
+                                             BackpressureOverflowStrategy strategy,
+                                             boolean captureBackpressureData) {
         this.bufferSize = bufferSize;
         this.onOverflow = onOverflow;
         this.strategy = strategy;
+        this.captureBackpressureData = captureBackpressureData;
     }
 
     @Override
     public @NonNull Subscriber<? super AbstractWriteClient.BatchWriteItem> apply(
             @NonNull final Subscriber<? super AbstractWriteClient.BatchWriteItem> subscriber) throws Throwable {
-        return new OnBackpressureBufferStrategySubscriber(subscriber, onOverflow, strategy, bufferSize);
+        return new OnBackpressureBufferStrategySubscriber(subscriber, onOverflow, strategy, bufferSize, captureBackpressureData);
     }
 
     static final class OnBackpressureBufferStrategySubscriber

@@ -59,8 +74,6 @@ static final class OnBackpressureBufferStrategySubscriber
 
         final Subscriber<? super AbstractWriteClient.BatchWriteItem> downstream;
 
-        final Action onOverflow;
-
         final BackpressureOverflowStrategy strategy;
 
         final long bufferSize;

@@ -76,14 +89,20 @@ static final class OnBackpressureBufferStrategySubscriber
         volatile boolean done;
         Throwable error;
 
+        final Consumer<List<String>> onOverflow;
+
+        final boolean captureBackpressureData;
+
         OnBackpressureBufferStrategySubscriber(Subscriber<? super AbstractWriteClient.BatchWriteItem> actual,
-                                               Action onOverflow,
+                                               Consumer<List<String>> onOverflow,
                                                BackpressureOverflowStrategy strategy,
-                                               long bufferSize) {
+                                               long bufferSize,
+                                               boolean captureBackpressureData) {
             this.downstream = actual;
             this.onOverflow = onOverflow;
             this.strategy = strategy;
             this.bufferSize = bufferSize;
+            this.captureBackpressureData = captureBackpressureData;
             this.requested = new AtomicLong();
             this.deque = new ArrayDeque<>();
         }

@@ -107,18 +126,25 @@ public void onNext(AbstractWriteClient.BatchWriteItem t) {
             boolean callOnOverflow = false;
             boolean callError = false;
             Deque<AbstractWriteClient.BatchWriteItem> dq = deque;
+            List<String> overflowSnapshot = null;
             synchronized (dq) {
                 AtomicLong size = new AtomicLong(t.length());
                 dq.forEach(batchWriteItem -> size.addAndGet(batchWriteItem.length()));
                 if (size.get() > bufferSize) {
                     switch (strategy) {
                         case DROP_LATEST:
+                            if (captureBackpressureData) {
+                                overflowSnapshot = captureBatch(t);
+                            }
                            dq.pollLast();
                             dq.offer(t);
                             callOnOverflow = true;
                             break;
                         case DROP_OLDEST:
-                            dq.poll();
+                            AbstractWriteClient.BatchWriteItem droppedBatch = dq.poll();
+                            if (captureBackpressureData && droppedBatch != null) {
+                                overflowSnapshot = captureBatch(droppedBatch);
+                            }
                             dq.offer(t);
                             callOnOverflow = true;
                             break;

@@ -135,7 +161,13 @@ public void onNext(AbstractWriteClient.BatchWriteItem t) {
             if (callOnOverflow) {
                 if (onOverflow != null) {
                     try {
-                        onOverflow.run();
+                        List<String> bufferedPoints;
+                        if (captureBackpressureData && overflowSnapshot != null) {
+                            bufferedPoints = overflowSnapshot;
+                        } else {
+                            bufferedPoints = Collections.emptyList();
+                        }
+                        onOverflow.accept(bufferedPoints);
                     } catch (Throwable ex) {
                         Exceptions.throwIfFatal(ex);
                         upstream.cancel();

@@ -150,6 +182,24 @@ public void onNext(AbstractWriteClient.BatchWriteItem t) {
             }
         }
 
+        /**
+         * Captures a snapshot of a single batch item for overflow handling.
+         *
+         * @param item the batch item to capture
+         * @return list of line protocol points from the item
+         */
+        List<String> captureBatch(AbstractWriteClient.BatchWriteItem item) {
+            String lp = item.toLineProtocol();
+            if (lp == null || lp.isEmpty()) {
+                return Collections.emptyList();
+            }
+
+            return Arrays.stream(lp.split("\n"))
+                    .map(String::trim)
+                    .filter(s -> !s.isEmpty())
+                    .collect(Collectors.toList());
+        }
+
         @Override
         public void onError(Throwable t) {
             if (done) {
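The new `captureBatch` helper boils down to splitting a batch's line protocol on newlines. A standalone sketch of that behavior (`CaptureBatchDemo` is a hypothetical class; a raw string stands in for `AbstractWriteClient.BatchWriteItem`):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CaptureBatchDemo {

    // Mirrors the commit's captureBatch logic on a raw line protocol string:
    // split on newlines, trim, and drop blank entries.
    static List<String> splitLineProtocol(String lp) {
        if (lp == null || lp.isEmpty()) {
            return List.of();
        }
        return Arrays.stream(lp.split("\n"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String lp = "mem,host=a used_percent=21.5 1\n\nmem,host=b used_percent=42.0 2\n";
        // Prints the two points; the blank line and trailing newline are dropped.
        splitLineProtocol(lp).forEach(System.out::println);
    }
}
```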
