Skip to content

Commit 05ed43d

Browse files
authored
DAT-226: Use Netty's FastThreadLocal (#196)
This commit also gets rid of Stormpot BlazePool and replaces it with the groups() operator, which showed noticeable throughput improvement.
1 parent 1b23f16 commit 05ed43d

File tree

68 files changed

+246
-257
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+246
-257
lines changed

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- [improvement] DAT-108: Upgrade DSE driver to 1.6.3.
99
- [improvement] DAT-227: Warn that continuous paging is not available when unloading with CL > ONE.
1010
- [new feature] DAT-224: Add support for numeric overflow and rounding.
11+
- [improvement] DAT-226: Use Netty's FastThreadLocal.
1112

1213

1314
### 1.0.0-rc1

connectors/csv/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@
4444
</dependency>
4545

4646
<dependency>
47-
<groupId>com.github.chrisvest</groupId>
48-
<artifactId>stormpot</artifactId>
47+
<groupId>io.netty</groupId>
48+
<artifactId>netty-common</artifactId>
4949
</dependency>
5050

5151
<dependency>

connectors/csv/src/main/java/com/datastax/dsbulk/connectors/csv/CSVConnector.java

Lines changed: 19 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.univocity.parsers.csv.CsvParserSettings;
3030
import com.univocity.parsers.csv.CsvWriter;
3131
import com.univocity.parsers.csv.CsvWriterSettings;
32+
import io.netty.util.concurrent.DefaultThreadFactory;
3233
import java.io.IOException;
3334
import java.io.Reader;
3435
import java.io.UncheckedIOException;
@@ -44,7 +45,9 @@
4445
import java.nio.file.PathMatcher;
4546
import java.nio.file.Paths;
4647
import java.util.Arrays;
47-
import java.util.concurrent.TimeUnit;
48+
import java.util.List;
49+
import java.util.concurrent.CopyOnWriteArrayList;
50+
import java.util.concurrent.ThreadFactory;
4851
import java.util.concurrent.atomic.AtomicInteger;
4952
import java.util.function.Function;
5053
import java.util.function.Supplier;
@@ -58,13 +61,6 @@
5861
import reactor.core.publisher.Signal;
5962
import reactor.core.scheduler.Scheduler;
6063
import reactor.core.scheduler.Schedulers;
61-
import reactor.util.concurrent.Queues;
62-
import stormpot.Allocator;
63-
import stormpot.BlazePool;
64-
import stormpot.Config;
65-
import stormpot.Poolable;
66-
import stormpot.Slot;
67-
import stormpot.Timeout;
6864

6965
/**
7066
* A connector for CSV files.
@@ -115,7 +111,7 @@ public class CSVConnector implements Connector {
115111
private CsvWriterSettings writerSettings;
116112
private AtomicInteger counter;
117113
private Scheduler scheduler;
118-
private BlazePool<PoolableCSVWriter> pool;
114+
private List<CSVWriter> writers;
119115

120116
@Override
121117
public void configure(LoaderConfig settings, boolean read) {
@@ -176,12 +172,12 @@ public void init() throws URISyntaxException, IOException {
176172
}
177173

178174
@Override
179-
public void close() throws InterruptedException {
175+
public void close() {
180176
if (scheduler != null) {
181177
scheduler.dispose();
182178
}
183-
if (pool != null) {
184-
pool.shutdown().await(new Timeout(1, TimeUnit.MINUTES));
179+
if (writers != null) {
180+
writers.forEach(CSVWriter::close);
185181
}
186182
}
187183

@@ -219,38 +215,21 @@ public Supplier<? extends Publisher<Publisher<Record>>> readByResource() {
219215
assert !read;
220216
if (root != null && maxConcurrentFiles > 1) {
221217
return upstream -> {
222-
scheduler = Schedulers.newParallel("csv-connector", maxConcurrentFiles);
223-
Config<PoolableCSVWriter> config =
224-
new Config<PoolableCSVWriter>()
225-
.setSize(maxConcurrentFiles)
226-
.setAllocator(new CSVWriterAllocator());
227-
pool = new BlazePool<>(config);
228-
Timeout timeout = new Timeout(Long.MAX_VALUE, TimeUnit.SECONDS);
218+
ThreadFactory threadFactory = new DefaultThreadFactory("csv-connector");
219+
scheduler = Schedulers.newParallel(maxConcurrentFiles, threadFactory);
220+
writers = new CopyOnWriteArrayList<>();
221+
for (int i = 0; i < maxConcurrentFiles; i++) {
222+
writers.add(new CSVWriter());
223+
}
229224
return Flux.from(upstream)
230-
.window(Queues.SMALL_BUFFER_SIZE)
231225
.parallel(maxConcurrentFiles)
232226
.runOn(scheduler)
233-
.flatMap(
234-
records -> {
235-
try {
236-
PoolableCSVWriter writer = pool.claim(timeout);
237-
if (writer != null) {
238-
try {
239-
return records.transform(writeRecords(writer));
240-
} finally {
241-
writer.release();
242-
}
243-
}
244-
} catch (InterruptedException e) {
245-
Thread.currentThread().interrupt();
246-
}
247-
return Flux.empty();
248-
})
249-
.sequential();
227+
.groups()
228+
.flatMap(records -> records.transform(writeRecords(writers.get(records.key()))));
250229
};
251230
} else {
252231
return upstream -> {
253-
PoolableCSVWriter writer = new PoolableCSVWriter(null);
232+
CSVWriter writer = new CSVWriter();
254233
return Flux.from(upstream).transform(writeRecords(writer)).doOnTerminate(writer::close);
255234
};
256235
}
@@ -396,7 +375,7 @@ private Flux<URL> scanRootDirectory() {
396375
});
397376
}
398377

399-
private Function<Flux<Record>, Flux<Record>> writeRecords(PoolableCSVWriter writer) {
378+
private Function<Flux<Record>, Flux<Record>> writeRecords(CSVWriter writer) {
400379
return upstream ->
401380
upstream
402381
.materialize()
@@ -414,34 +393,11 @@ private Function<Flux<Record>, Flux<Record>> writeRecords(PoolableCSVWriter writ
414393
.dematerialize();
415394
}
416395

417-
private class CSVWriterAllocator implements Allocator<PoolableCSVWriter> {
418-
@Override
419-
public PoolableCSVWriter allocate(Slot slot) {
420-
return new PoolableCSVWriter(slot);
421-
}
422-
423-
@Override
424-
public void deallocate(PoolableCSVWriter writer) {
425-
writer.close();
426-
}
427-
}
428-
429-
private class PoolableCSVWriter implements Poolable {
430-
431-
private final Slot slot;
396+
private class CSVWriter {
432397

433398
private URL url;
434399
private CsvWriter writer;
435400

436-
private PoolableCSVWriter(Slot slot) {
437-
this.slot = slot;
438-
}
439-
440-
@Override
441-
public void release() {
442-
slot.release(this);
443-
}
444-
445401
private void write(Record record) {
446402
try {
447403
if (writer == null) {

connectors/json/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@
4949
</dependency>
5050

5151
<dependency>
52-
<groupId>com.github.chrisvest</groupId>
53-
<artifactId>stormpot</artifactId>
52+
<groupId>io.netty</groupId>
53+
<artifactId>netty-common</artifactId>
5454
</dependency>
5555

5656
<dependency>

connectors/json/src/main/java/com/datastax/dsbulk/connectors/json/JsonConnector.java

Lines changed: 23 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.fasterxml.jackson.core.JsonParseException;
2727
import com.fasterxml.jackson.core.JsonParser;
2828
import com.fasterxml.jackson.core.JsonToken;
29+
import com.fasterxml.jackson.core.TreeNode;
2930
import com.fasterxml.jackson.core.io.SerializedString;
3031
import com.fasterxml.jackson.core.type.TypeReference;
3132
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
@@ -38,6 +39,7 @@
3839
import com.fasterxml.jackson.databind.SerializationFeature;
3940
import com.google.common.base.Suppliers;
4041
import com.typesafe.config.ConfigException;
42+
import io.netty.util.concurrent.DefaultThreadFactory;
4143
import java.io.BufferedReader;
4244
import java.io.IOException;
4345
import java.io.UncheckedIOException;
@@ -53,8 +55,10 @@
5355
import java.nio.file.PathMatcher;
5456
import java.nio.file.Paths;
5557
import java.util.HashMap;
58+
import java.util.List;
5659
import java.util.Map;
57-
import java.util.concurrent.TimeUnit;
60+
import java.util.concurrent.CopyOnWriteArrayList;
61+
import java.util.concurrent.ThreadFactory;
5862
import java.util.concurrent.atomic.AtomicInteger;
5963
import java.util.function.Function;
6064
import java.util.function.Supplier;
@@ -67,13 +71,6 @@
6771
import reactor.core.publisher.Signal;
6872
import reactor.core.scheduler.Scheduler;
6973
import reactor.core.scheduler.Schedulers;
70-
import reactor.util.concurrent.Queues;
71-
import stormpot.Allocator;
72-
import stormpot.BlazePool;
73-
import stormpot.Config;
74-
import stormpot.Poolable;
75-
import stormpot.Slot;
76-
import stormpot.Timeout;
7774

7875
/**
7976
* A connector for Json files.
@@ -133,10 +130,9 @@ enum DocumentMode {
133130
private Map<MapperFeature, Boolean> mapperFeatures;
134131
private Map<SerializationFeature, Boolean> serializationFeatures;
135132
private Map<DeserializationFeature, Boolean> deserializationFeatures;
136-
137133
private boolean prettyPrint;
138134
private Scheduler scheduler;
139-
private BlazePool<PoolableJsonWriter> pool;
135+
private List<JsonWriter> writers;
140136

141137
@Override
142138
public RecordMetadata getRecordMetadata() {
@@ -211,12 +207,12 @@ public void init() throws URISyntaxException, IOException {
211207
}
212208

213209
@Override
214-
public void close() throws InterruptedException {
210+
public void close() {
215211
if (scheduler != null) {
216212
scheduler.dispose();
217213
}
218-
if (pool != null) {
219-
pool.shutdown().await(new Timeout(1, TimeUnit.MINUTES));
214+
if (writers != null) {
215+
writers.forEach(JsonWriter::close);
220216
}
221217
}
222218

@@ -254,38 +250,21 @@ public Supplier<? extends Publisher<Publisher<Record>>> readByResource() {
254250
assert !read;
255251
if (root != null && maxConcurrentFiles > 1) {
256252
return upstream -> {
257-
scheduler = Schedulers.newParallel("json-connector", maxConcurrentFiles);
258-
Config<PoolableJsonWriter> config =
259-
new Config<PoolableJsonWriter>()
260-
.setSize(maxConcurrentFiles)
261-
.setAllocator(new JsonWriterAllocator());
262-
pool = new BlazePool<>(config);
263-
Timeout timeout = new Timeout(Long.MAX_VALUE, TimeUnit.SECONDS);
253+
ThreadFactory threadFactory = new DefaultThreadFactory("json-connector");
254+
scheduler = Schedulers.newParallel(maxConcurrentFiles, threadFactory);
255+
writers = new CopyOnWriteArrayList<>();
256+
for (int i = 0; i < maxConcurrentFiles; i++) {
257+
writers.add(new JsonWriter());
258+
}
264259
return Flux.from(upstream)
265-
.window(Queues.SMALL_BUFFER_SIZE)
266260
.parallel(maxConcurrentFiles)
267261
.runOn(scheduler)
268-
.flatMap(
269-
records -> {
270-
try {
271-
PoolableJsonWriter writer = pool.claim(timeout);
272-
if (writer != null) {
273-
try {
274-
return records.transform(writeRecords(writer));
275-
} finally {
276-
writer.release();
277-
}
278-
}
279-
} catch (InterruptedException e) {
280-
Thread.currentThread().interrupt();
281-
}
282-
return Flux.empty();
283-
})
284-
.sequential();
262+
.groups()
263+
.flatMap(records -> records.transform(writeRecords(writers.get(records.key()))));
285264
};
286265
} else {
287266
return upstream -> {
288-
PoolableJsonWriter writer = new PoolableJsonWriter(null);
267+
JsonWriter writer = new JsonWriter();
289268
return Flux.from(upstream).transform(writeRecords(writer)).doOnTerminate(writer::close);
290269
};
291270
}
@@ -415,7 +394,7 @@ private Flux<URL> scanRootDirectory() {
415394
});
416395
}
417396

418-
private Function<Flux<Record>, Flux<Record>> writeRecords(PoolableJsonWriter writer) {
397+
private Function<Flux<Record>, Flux<Record>> writeRecords(JsonWriter writer) {
419398
return upstream ->
420399
upstream
421400
.materialize()
@@ -433,35 +412,12 @@ private Function<Flux<Record>, Flux<Record>> writeRecords(PoolableJsonWriter wri
433412
.dematerialize();
434413
}
435414

436-
private class JsonWriterAllocator implements Allocator<PoolableJsonWriter> {
437-
@Override
438-
public PoolableJsonWriter allocate(Slot slot) {
439-
return new PoolableJsonWriter(slot);
440-
}
441-
442-
@Override
443-
public void deallocate(PoolableJsonWriter writer) {
444-
writer.close();
445-
}
446-
}
447-
448-
private class PoolableJsonWriter implements Poolable {
449-
450-
private final Slot slot;
415+
private class JsonWriter {
451416

452417
private URL url;
453418
private JsonGenerator writer;
454419
private long currentLine;
455420

456-
private PoolableJsonWriter(Slot slot) {
457-
this.slot = slot;
458-
}
459-
460-
@Override
461-
public void release() {
462-
slot.release(this);
463-
}
464-
465421
private void write(Record record) {
466422
try {
467423
if (writer == null) {
@@ -477,7 +433,7 @@ private void write(Record record) {
477433
writer.writeStartObject();
478434
for (String field : record.fields()) {
479435
writer.writeFieldName(field);
480-
writer.writeObject(record.getFieldValue(field));
436+
writer.writeTree((TreeNode) record.getFieldValue(field));
481437
}
482438
writer.writeEndObject();
483439
currentLine++;
@@ -495,7 +451,8 @@ private void open() throws IOException {
495451
url = getOrCreateDestinationURL();
496452
writer = createJsonWriter(url);
497453
if (mode == DocumentMode.SINGLE_DOCUMENT) {
498-
// do not use writer.writeStartArray(): we need to fool the parser into thinking it's on multi doc mode,
454+
// do not use writer.writeStartArray(): we need to fool the parser into thinking it's on
455+
// multi doc mode,
499456
// to get a better-looking result
500457
writer.writeRaw('[');
501458
writer.writeRaw(System.lineSeparator());

connectors/json/src/test/java/com/datastax/dsbulk/connectors/json/JsonConnectorTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.slf4j.LoggerFactory;
4949
import reactor.core.publisher.Flux;
5050

51-
/** */
5251
@SuppressWarnings("Duplicates")
5352
class JsonConnectorTest {
5453

@@ -198,7 +197,15 @@ void should_write_to_stdout_with_special_encoding() throws Exception {
198197
connector.configure(settings, false);
199198
connector.init();
200199
assertThat(connector.isWriteToStandardOutput()).isTrue();
201-
Flux.<Record>just(new DefaultRecord(null, null, -1, null, "fóô", "bàr", "qïx"))
200+
Flux.<Record>just(
201+
new DefaultRecord(
202+
null,
203+
null,
204+
-1,
205+
null,
206+
factory.textNode("fóô"),
207+
factory.textNode("bàr"),
208+
factory.textNode("qïx")))
202209
.transform(connector.write())
203210
.blockLast();
204211
assertThat(new String(baos.toByteArray(), "ISO-8859-1"))

0 commit comments

Comments
 (0)