Skip to content

Commit f590c59

Browse files
authored
Added an option to enable dropping of new points written after action queue exhaustion. (#689)
1. Added an option to drop the points being written after BatchProcessor#queue is exhausted. 2. Added an option to plug in a handler which will be invoked if above happens.
1 parent 027b62e commit f590c59

File tree

7 files changed

+228
-11
lines changed

7 files changed

+228
-11
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## 2.20 [unreleased]
44

5+
### Features
6+
- Add an option in `BatchOption` to prevent `InfluxDB#write` from blocking when actions queue is exhausted. [Issue #668](https://github.com/influxdata/influxdb-java/issues/688)
7+
8+
59
## 2.19 [2020-05-18]
610

711
## 2.18 [2020-04-17]

MANUAL.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,18 @@ It's possible to use both approaches at the same time: set a default database us
101101
public static final int DEFAULT_JITTER_INTERVAL_DURATION = 0;
102102
public static final int DEFAULT_BUFFER_LIMIT = 10000;
103103
public static final TimeUnit DEFAULT_PRECISION = TimeUnit.NANOSECONDS;
104+
public static final boolean DEFAULT_DROP_ACTIONS_ON_QUEUE_EXHAUSTION = false;
104105
```
106+
#### Configuring behaviour of batch writes when the action queue exhausts
107+
With batching enabled, the client provides two options on how to deal with **action queue** (where the points are accumulated as a batch) exhaustion.
108+
1. When `dropActionsOnQueueExhaustion` is `false` (default value), `InfluxDB#write` will be blocked till the space is created in the action queue.
109+
2. When `dropActionsOnQueueExhaustion` is `true`, new writes using `InfluxDB#write` will dropped and `droppedActionHandler` will be called.
110+
Example usage:
111+
```Java
112+
influxDB.enableBatch(BatchOptions.DEFAULTS.dropActionsOnQueueExhaustion(true)
113+
.droppedActionHandler((point) -> log.error("Point dropped due to action queue exhaustion.")));
114+
```
115+
105116

106117
#### Configuring the jitter interval for batch writes
107118

src/main/java/org/influxdb/BatchOptions.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import java.util.concurrent.ThreadFactory;
77
import java.util.concurrent.TimeUnit;
88
import java.util.function.BiConsumer;
9+
import java.util.function.Consumer;
910

1011
/**
1112
* BatchOptions are used to configure batching of individual data point writes
@@ -19,6 +20,8 @@ public final class BatchOptions implements Cloneable {
1920
public static final int DEFAULT_JITTER_INTERVAL_DURATION = 0;
2021
public static final int DEFAULT_BUFFER_LIMIT = 10000;
2122
public static final TimeUnit DEFAULT_PRECISION = TimeUnit.NANOSECONDS;
23+
public static final boolean DEFAULT_DROP_ACTIONS_ON_QUEUE_EXHAUSTION = false;
24+
2225

2326
/**
2427
* Default batch options. This class is immutable, each configuration
@@ -32,10 +35,14 @@ public final class BatchOptions implements Cloneable {
3235
private int jitterDuration = DEFAULT_JITTER_INTERVAL_DURATION;
3336
private int bufferLimit = DEFAULT_BUFFER_LIMIT;
3437
private TimeUnit precision = DEFAULT_PRECISION;
38+
private boolean dropActionsOnQueueExhaustion = DEFAULT_DROP_ACTIONS_ON_QUEUE_EXHAUSTION;
39+
private Consumer<Point> droppedActionHandler = (point) -> {
40+
};
3541

3642
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
3743
BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (points, throwable) -> {
3844
};
45+
3946
private InfluxDB.ConsistencyLevel consistency = InfluxDB.ConsistencyLevel.ONE;
4047

4148
private BatchOptions() {
@@ -133,6 +140,33 @@ public BatchOptions precision(final TimeUnit precision) {
133140
return clone;
134141
}
135142

143+
/**
144+
* Set to define the behaviour when the action queue exhausts. If unspecified, will default to false which means
145+
* that the {@link InfluxDB#write(Point)} will be blocked till the space in the queue is created.
146+
* true means that the newer actions being written to the queue will be dropped and
147+
* {@link BatchOptions#droppedActionHandler} will be called.
148+
* @param dropActionsOnQueueExhaustion sets the behavior
149+
* @return the BatchOptions instance to be able to use it in a fluent manner.
150+
*/
151+
public BatchOptions dropActionsOnQueueExhaustion(final boolean dropActionsOnQueueExhaustion) {
152+
BatchOptions clone = getClone();
153+
clone.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
154+
return clone;
155+
}
156+
157+
/**
158+
* Handler to handle dropped actions due to queue actions. This is only valid when
159+
* {@link BatchOptions#dropActionsOnQueueExhaustion} is set to true.
160+
* @param droppedActionHandler to handle action drops on action queue exhaustion.
161+
* @return the BatchOptions instance to be able to use it in a fluent manner.
162+
*/
163+
public BatchOptions droppedActionHandler(final Consumer<Point> droppedActionHandler) {
164+
BatchOptions clone = getClone();
165+
clone.droppedActionHandler = droppedActionHandler;
166+
return clone;
167+
}
168+
169+
136170
/**
137171
* @return actions the number of actions to collect
138172
*/
@@ -190,6 +224,21 @@ public TimeUnit getPrecision() {
190224
return precision;
191225
}
192226

227+
228+
/**
229+
* @return a boolean determining whether to drop actions on action queue exhaustion.
230+
*/
231+
public boolean isDropActionsOnQueueExhaustion() {
232+
return dropActionsOnQueueExhaustion;
233+
}
234+
235+
/**
236+
* @return a consumer function to handle actions drops on action queue exhaustion.
237+
*/
238+
public Consumer<Point> getDroppedActionHandler() {
239+
return droppedActionHandler;
240+
}
241+
193242
private BatchOptions getClone() {
194243
try {
195244
return (BatchOptions) this.clone();

src/main/java/org/influxdb/impl/BatchProcessor.java

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.ThreadFactory;
2020
import java.util.concurrent.TimeUnit;
2121
import java.util.function.BiConsumer;
22+
import java.util.function.Consumer;
2223
import java.util.logging.Level;
2324
import java.util.logging.Logger;
2425

@@ -43,6 +44,8 @@ public final class BatchProcessor {
4344
private final int jitterInterval;
4445
private final TimeUnit precision;
4546
private final BatchWriter batchWriter;
47+
private boolean dropActionsOnQueueExhaustion;
48+
Consumer<Point> droppedActionHandler;
4649

4750
/**
4851
* The Builder to create a BatchProcessor instance.
@@ -62,6 +65,8 @@ public static final class Builder {
6265
private BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (entries, throwable) -> { };
6366
private ConsistencyLevel consistencyLevel;
6467

68+
private boolean dropActionsOnQueueExhaustion;
69+
private Consumer<Point> droppedActionsHandler;
6570
/**
6671
* @param threadFactory
6772
* is optional.
@@ -152,6 +157,37 @@ public Builder exceptionHandler(final BiConsumer<Iterable<Point>, Throwable> han
152157
return this;
153158
}
154159

160+
/**
161+
* To define the behaviour when the action queue exhausts. If unspecified, will default to false which means that
162+
* the {@link InfluxDB#write(Point)} will be blocked till the space in the queue is created.
163+
* true means that the newer actions being written to the queue will dropped and
164+
* {@link BatchProcessor#droppedActionHandler} will be called.
165+
*
166+
* @param dropActionsOnQueueExhaustion
167+
* the dropActionsOnQueueExhaustion
168+
*
169+
* @return this Builder to use it fluent
170+
*/
171+
public Builder dropActionsOnQueueExhaustion(final boolean dropActionsOnQueueExhaustion) {
172+
this.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
173+
return this;
174+
}
175+
176+
/**
177+
* A callback to be used when an actions are dropped on action queue exhaustion.
178+
*
179+
* @param handler
180+
* the handler
181+
*
182+
* @return this Builder to use it fluent
183+
*/
184+
public Builder droppedActionHandler(final Consumer<Point> handler) {
185+
this.droppedActionsHandler = handler;
186+
return this;
187+
}
188+
189+
190+
155191
/**
156192
* Consistency level for batch write.
157193
*
@@ -200,7 +236,7 @@ public BatchProcessor build() {
200236
}
201237
return new BatchProcessor(this.influxDB, batchWriter, this.threadFactory, this.actions, this.flushIntervalUnit,
202238
this.flushInterval, this.jitterInterval, exceptionHandler, this.consistencyLevel,
203-
this.precision);
239+
this.precision, this.dropActionsOnQueueExhaustion, this.droppedActionsHandler);
204240
}
205241
}
206242

@@ -262,7 +298,8 @@ public static Builder builder(final InfluxDB influxDB) {
262298
BatchProcessor(final InfluxDB influxDB, final BatchWriter batchWriter, final ThreadFactory threadFactory,
263299
final int actions, final TimeUnit flushIntervalUnit, final int flushInterval, final int jitterInterval,
264300
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
265-
final ConsistencyLevel consistencyLevel, final TimeUnit precision) {
301+
final ConsistencyLevel consistencyLevel, final TimeUnit precision,
302+
final boolean dropActionsOnQueueExhaustion, final Consumer<Point> droppedActionHandler) {
266303
super();
267304
this.influxDB = influxDB;
268305
this.batchWriter = batchWriter;
@@ -274,6 +311,8 @@ public static Builder builder(final InfluxDB influxDB) {
274311
this.exceptionHandler = exceptionHandler;
275312
this.consistencyLevel = consistencyLevel;
276313
this.precision = precision;
314+
this.dropActionsOnQueueExhaustion = dropActionsOnQueueExhaustion;
315+
this.droppedActionHandler = droppedActionHandler;
277316
if (actions > 1 && actions < Integer.MAX_VALUE) {
278317
this.queue = new LinkedBlockingQueue<>(actions);
279318
} else {
@@ -359,7 +398,14 @@ void write() {
359398
*/
360399
void put(final AbstractBatchEntry batchEntry) {
361400
try {
362-
this.queue.put(batchEntry);
401+
if (this.dropActionsOnQueueExhaustion) {
402+
if (!this.queue.offer(batchEntry)) {
403+
this.droppedActionHandler.accept(batchEntry.getPoint());
404+
return;
405+
}
406+
} else {
407+
this.queue.put(batchEntry);
408+
}
363409
} catch (InterruptedException e) {
364410
throw new RuntimeException(e);
365411
}
@@ -403,4 +449,11 @@ BatchWriter getBatchWriter() {
403449
return batchWriter;
404450
}
405451

452+
public boolean isDropActionsOnQueueExhaustion() {
453+
return dropActionsOnQueueExhaustion;
454+
}
455+
456+
public Consumer<Point> getDroppedActionHandler() {
457+
return droppedActionHandler;
458+
}
406459
}

src/main/java/org/influxdb/impl/InfluxDBImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,8 @@ public InfluxDB enableBatch(final BatchOptions batchOptions) {
310310
.bufferLimit(batchOptions.getBufferLimit())
311311
.consistencyLevel(batchOptions.getConsistency())
312312
.precision(batchOptions.getPrecision())
313+
.dropActionsOnQueueExhaustion(batchOptions.isDropActionsOnQueueExhaustion())
314+
.droppedActionHandler(batchOptions.getDroppedActionHandler())
313315
.build();
314316
this.batchEnabled.set(true);
315317
return this;
@@ -343,13 +345,14 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, final Ti
343345
public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit,
344346
final ThreadFactory threadFactory,
345347
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
346-
enableBatch(actions, flushDuration, 0, flushDurationTimeUnit, threadFactory, exceptionHandler);
348+
enableBatch(actions, flushDuration, 0, flushDurationTimeUnit, threadFactory, exceptionHandler, false, null);
347349
return this;
348350
}
349351

350352
private InfluxDB enableBatch(final int actions, final int flushDuration, final int jitterDuration,
351353
final TimeUnit durationTimeUnit, final ThreadFactory threadFactory,
352-
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler) {
354+
final BiConsumer<Iterable<Point>, Throwable> exceptionHandler,
355+
final boolean dropActionsOnQueueExhaustion, final Consumer<Point> droppedActionHandler) {
353356
if (this.batchEnabled.get()) {
354357
throw new IllegalStateException("BatchProcessing is already enabled.");
355358
}
@@ -360,6 +363,8 @@ private InfluxDB enableBatch(final int actions, final int flushDuration, final i
360363
.interval(flushDuration, jitterDuration, durationTimeUnit)
361364
.threadFactory(threadFactory)
362365
.consistencyLevel(consistency)
366+
.dropActionsOnQueueExhaustion(dropActionsOnQueueExhaustion)
367+
.droppedActionHandler(droppedActionHandler)
363368
.build();
364369
this.batchEnabled.set(true);
365370
return this;

0 commit comments

Comments
 (0)