Skip to content

Commit 5800440

Browse files
committed
Add virtual thread backed thread pool semantics limiting concurrency and queueing
1 parent 829fb56 commit 5800440

File tree

36 files changed

+554
-605
lines changed

36 files changed

+554
-605
lines changed

libs/core/src/main/java/org/elasticsearch/core/Streams.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
*/
2323
public class Streams {
2424

25-
private static final ThreadLocal<byte[]> LOCAL_BUFFER = ThreadLocal.withInitial(() -> new byte[8 * 1024]);
25+
private static final ObjectPool<byte[]> LOCAL_BUFFER = ObjectPool.withInitial(() -> new byte[8 * 1024]);
2626

2727
private Streams() {
2828

@@ -63,7 +63,9 @@ public static long copy(final InputStream in, final OutputStream out, byte[] buf
6363
* @see #copy(InputStream, OutputStream, byte[], boolean)
6464
*/
6565
public static long copy(final InputStream in, final OutputStream out, boolean close) throws IOException {
66-
return copy(in, out, LOCAL_BUFFER.get(), close);
66+
try (var pooledBuffer = LOCAL_BUFFER.acquire()) {
67+
return copy(in, out, pooledBuffer.get(), close);
68+
}
6769
}
6870

6971
/**
@@ -77,7 +79,9 @@ public static long copy(final InputStream in, final OutputStream out, byte[] buf
7779
* @see #copy(InputStream, OutputStream, byte[], boolean)
7880
*/
7981
public static long copy(final InputStream in, final OutputStream out) throws IOException {
80-
return copy(in, out, LOCAL_BUFFER.get(), true);
82+
try (var pooledBuffer = LOCAL_BUFFER.acquire()) {
83+
return copy(in, out, pooledBuffer.get(), true);
84+
}
8185
}
8286

8387
/**
@@ -107,17 +111,19 @@ private static int readToHeapBuffer(InputStream input, ByteBuffer buffer, int co
107111

108112
private static int readToDirectBuffer(InputStream input, ByteBuffer b, int count) throws IOException {
109113
int totalRead = 0;
110-
final byte[] buffer = LOCAL_BUFFER.get();
111-
while (totalRead < count) {
112-
final int len = Math.min(count - totalRead, buffer.length);
113-
final int read = input.read(buffer, 0, len);
114-
if (read == -1) {
115-
break;
114+
try (var pooledBuffer = LOCAL_BUFFER.acquire()) {
115+
final byte[] buffer = pooledBuffer.get();
116+
while (totalRead < count) {
117+
final int len = Math.min(count - totalRead, buffer.length);
118+
final int read = input.read(buffer, 0, len);
119+
if (read == -1) {
120+
break;
121+
}
122+
b.put(buffer, 0, read);
123+
totalRead += read;
116124
}
117-
b.put(buffer, 0, read);
118-
totalRead += read;
125+
return totalRead;
119126
}
120-
return totalRead;
121127
}
122128

123129
public static int readFully(InputStream reader, byte[] dest) throws IOException {

libs/lz4/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
module org.elasticsearch.lz4 {
1111
requires org.lz4.java;
12+
requires org.elasticsearch.base;
1213

1314
exports org.elasticsearch.lz4;
1415
}

libs/lz4/src/main/java/org/elasticsearch/lz4/ESLZ4Compressor.java

Lines changed: 150 additions & 144 deletions
Large diffs are not rendered by default.

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/CommunityIdProcessor.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import org.elasticsearch.cluster.metadata.ProjectId;
1313
import org.elasticsearch.common.network.InetAddresses;
14+
import org.elasticsearch.core.ObjectPool;
1415
import org.elasticsearch.ingest.AbstractProcessor;
1516
import org.elasticsearch.ingest.ConfigurationUtils;
1617
import org.elasticsearch.ingest.IngestDocument;
@@ -36,7 +37,7 @@ public final class CommunityIdProcessor extends AbstractProcessor {
3637

3738
public static final String TYPE = "community_id";
3839

39-
private static final ThreadLocal<MessageDigest> MESSAGE_DIGEST = ThreadLocal.withInitial(() -> {
40+
private static final ObjectPool<MessageDigest> MESSAGE_DIGEST = ObjectPool.withInitial(() -> {
4041
try {
4142
return MessageDigest.getInstance("SHA-1");
4243
} catch (NoSuchAlgorithmException e) {
@@ -397,10 +398,13 @@ byte[] toBytes() {
397398
}
398399

399400
String toCommunityId(byte[] seed) {
400-
MessageDigest md = MESSAGE_DIGEST.get();
401-
md.reset();
402-
md.update(seed);
403-
byte[] encodedBytes = Base64.getEncoder().encode(md.digest(toBytes()));
401+
byte[] encodedBytes;
402+
try (var pooledMessageDigest = MESSAGE_DIGEST.acquire()) {
403+
MessageDigest md = pooledMessageDigest.get();
404+
md.reset();
405+
md.update(seed);
406+
encodedBytes = Base64.getEncoder().encode(md.digest(toBytes()));
407+
}
404408
return "1:" + new String(encodedBytes, StandardCharsets.UTF_8);
405409
}
406410
}

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/FingerprintProcessor.java

Lines changed: 59 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.Strings;
1414
import org.elasticsearch.common.hash.Murmur3Hasher;
1515
import org.elasticsearch.common.util.ByteUtils;
16+
import org.elasticsearch.core.ObjectPool;
1617
import org.elasticsearch.ingest.AbstractProcessor;
1718
import org.elasticsearch.ingest.ConfigurationUtils;
1819
import org.elasticsearch.ingest.IngestDocument;
@@ -49,7 +50,7 @@ public final class FingerprintProcessor extends AbstractProcessor {
4950

5051
private final List<String> fields;
5152
private final String targetField;
52-
private final ThreadLocal<Hasher> threadLocalHasher;
53+
private final ObjectPool<Hasher> hasherPool;
5354
private final byte[] salt;
5455
private final boolean ignoreMissing;
5556

@@ -59,77 +60,79 @@ public final class FingerprintProcessor extends AbstractProcessor {
5960
List<String> fields,
6061
String targetField,
6162
byte[] salt,
62-
ThreadLocal<Hasher> threadLocalHasher,
63+
ObjectPool<Hasher> hasherPool,
6364
boolean ignoreMissing
6465
) {
6566
super(tag, description);
6667
this.fields = new ArrayList<>(fields);
6768
this.fields.sort(Comparator.naturalOrder());
6869
this.targetField = targetField;
69-
this.threadLocalHasher = threadLocalHasher;
70+
this.hasherPool = hasherPool;
7071
this.salt = salt;
7172
this.ignoreMissing = ignoreMissing;
7273
}
7374

7475
@Override
7576
@SuppressWarnings("unchecked")
7677
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
77-
Hasher hasher = threadLocalHasher.get();
78-
hasher.reset();
79-
hasher.update(salt);
80-
81-
var values = new Stack<>();
82-
for (int k = fields.size() - 1; k >= 0; k--) {
83-
String field = fields.get(k);
84-
Object value = ingestDocument.getFieldValue(field, Object.class, true);
85-
if (value == null) {
86-
if (ignoreMissing) {
87-
continue;
88-
} else {
89-
throw new IllegalArgumentException("missing field [" + field + "] when calculating fingerprint");
78+
try (var pooledHasher = hasherPool.acquire()) {
79+
Hasher hasher = pooledHasher.get();
80+
hasher.reset();
81+
hasher.update(salt);
82+
83+
var values = new Stack<>();
84+
for (int k = fields.size() - 1; k >= 0; k--) {
85+
String field = fields.get(k);
86+
Object value = ingestDocument.getFieldValue(field, Object.class, true);
87+
if (value == null) {
88+
if (ignoreMissing) {
89+
continue;
90+
} else {
91+
throw new IllegalArgumentException("missing field [" + field + "] when calculating fingerprint");
92+
}
9093
}
94+
values.push(value);
9195
}
92-
values.push(value);
93-
}
9496

95-
if (values.size() > 0) {
96-
// iteratively traverse document fields
97-
while (values.isEmpty() == false) {
98-
var value = values.pop();
99-
if (value instanceof List<?> list) {
100-
for (int k = list.size() - 1; k >= 0; k--) {
101-
values.push(list.get(k));
102-
}
103-
} else if (value instanceof Set) {
104-
@SuppressWarnings("rawtypes")
105-
var set = (Set<Comparable>) value;
106-
// process set entries in consistent order
107-
var setList = new ArrayList<>(set);
108-
setList.sort(Comparator.naturalOrder());
109-
for (int k = setList.size() - 1; k >= 0; k--) {
110-
values.push(setList.get(k));
97+
if (values.size() > 0) {
98+
// iteratively traverse document fields
99+
while (values.isEmpty() == false) {
100+
var value = values.pop();
101+
if (value instanceof List<?> list) {
102+
for (int k = list.size() - 1; k >= 0; k--) {
103+
values.push(list.get(k));
104+
}
105+
} else if (value instanceof Set) {
106+
@SuppressWarnings("rawtypes")
107+
var set = (Set<Comparable>) value;
108+
// process set entries in consistent order
109+
var setList = new ArrayList<>(set);
110+
setList.sort(Comparator.naturalOrder());
111+
for (int k = setList.size() - 1; k >= 0; k--) {
112+
values.push(setList.get(k));
113+
}
114+
} else if (value instanceof Map) {
115+
var map = (Map<String, Object>) value;
116+
// process map entries in consistent order
117+
@SuppressWarnings("rawtypes")
118+
var entryList = new ArrayList<>(map.entrySet());
119+
entryList.sort(Map.Entry.comparingByKey(Comparator.naturalOrder()));
120+
for (int k = entryList.size() - 1; k >= 0; k--) {
121+
values.push(entryList.get(k));
122+
}
123+
} else if (value instanceof Map.Entry<?, ?> entry) {
124+
hasher.update(DELIMITER);
125+
hasher.update(toBytes(entry.getKey()));
126+
values.push(entry.getValue());
127+
} else {
128+
// feed them through digest.update
129+
hasher.update(DELIMITER);
130+
hasher.update(toBytes(value));
111131
}
112-
} else if (value instanceof Map) {
113-
var map = (Map<String, Object>) value;
114-
// process map entries in consistent order
115-
@SuppressWarnings("rawtypes")
116-
var entryList = new ArrayList<>(map.entrySet());
117-
entryList.sort(Map.Entry.comparingByKey(Comparator.naturalOrder()));
118-
for (int k = entryList.size() - 1; k >= 0; k--) {
119-
values.push(entryList.get(k));
120-
}
121-
} else if (value instanceof Map.Entry<?, ?> entry) {
122-
hasher.update(DELIMITER);
123-
hasher.update(toBytes(entry.getKey()));
124-
values.push(entry.getValue());
125-
} else {
126-
// feed them through digest.update
127-
hasher.update(DELIMITER);
128-
hasher.update(toBytes(value));
129132
}
130-
}
131133

132-
ingestDocument.setFieldValue(targetField, Base64.getEncoder().encodeToString(hasher.digest()));
134+
ingestDocument.setFieldValue(targetField, Base64.getEncoder().encodeToString(hasher.digest()));
135+
}
133136
}
134137

135138
return ingestDocument;
@@ -198,8 +201,8 @@ public String getTargetField() {
198201
return targetField;
199202
}
200203

201-
public ThreadLocal<Hasher> getThreadLocalHasher() {
202-
return threadLocalHasher;
204+
public ObjectPool<Hasher> getHasherPool() {
205+
return hasherPool;
203206
}
204207

205208
public byte[] getSalt() {
@@ -253,7 +256,7 @@ public FingerprintProcessor create(
253256
)
254257
);
255258
}
256-
ThreadLocal<Hasher> threadLocalHasher = ThreadLocal.withInitial(() -> {
259+
ObjectPool<Hasher> threadLocalHasher = ObjectPool.withInitial(() -> {
257260
try {
258261
return MessageDigestHasher.getInstance(method);
259262
} catch (NoSuchAlgorithmException e) {

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/FingerprintProcessorFactoryTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ public void testCreate() throws Exception {
5353
assertThat(fingerprintProcessor.getFields(), equalTo(sortedFieldList));
5454
assertThat(fingerprintProcessor.getTargetField(), equalTo(targetField));
5555
assertThat(fingerprintProcessor.getSalt(), equalTo(salt.getBytes(StandardCharsets.UTF_8)));
56-
assertThat(fingerprintProcessor.getThreadLocalHasher().get().getAlgorithm(), equalTo(method));
56+
try (var pooledHasher = fingerprintProcessor.getHasherPool().acquire()) {
57+
assertThat(pooledHasher.get().getAlgorithm(), equalTo(method));
58+
}
5759
assertThat(fingerprintProcessor.isIgnoreMissing(), equalTo(ignoreMissing));
5860
}
5961

@@ -71,7 +73,9 @@ public void testMethod() throws Exception {
7173
FingerprintProcessor fingerprintProcessor = factory.create(null, processorTag, null, config, null);
7274
assertThat(fingerprintProcessor.getTag(), equalTo(processorTag));
7375
assertThat(fingerprintProcessor.getFields(), equalTo(sortedFieldList));
74-
assertThat(fingerprintProcessor.getThreadLocalHasher().get().getAlgorithm(), equalTo(method));
76+
try (var pooledHasher = fingerprintProcessor.getHasherPool().acquire()) {
77+
assertThat(pooledHasher.get().getAlgorithm(), equalTo(method));
78+
}
7579

7680
// invalid method
7781
String invalidMethod = randomValueOtherThanMany(
@@ -120,7 +124,9 @@ public void testDefaults() throws Exception {
120124
assertThat(fingerprintProcessor.getFields(), equalTo(sortedFieldList));
121125
assertThat(fingerprintProcessor.getTargetField(), equalTo(FingerprintProcessor.Factory.DEFAULT_TARGET));
122126
assertThat(fingerprintProcessor.getSalt(), equalTo(new byte[0]));
123-
assertThat(fingerprintProcessor.getThreadLocalHasher().get().getAlgorithm(), equalTo(FingerprintProcessor.Factory.DEFAULT_METHOD));
127+
try (var pooledHasher = fingerprintProcessor.getHasherPool().acquire()) {
128+
assertThat(pooledHasher.get().getAlgorithm(), equalTo(FingerprintProcessor.Factory.DEFAULT_METHOD));
129+
}
124130
assertThat(fingerprintProcessor.isIgnoreMissing(), equalTo(false));
125131
}
126132
}

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/FingerprintProcessorTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.ingest.common;
1111

12+
import org.elasticsearch.core.ObjectPool;
1213
import org.elasticsearch.ingest.TestIngestDocument;
1314
import org.elasticsearch.test.ESTestCase;
1415

@@ -378,14 +379,14 @@ public void testObjectTraversalWithNestedStructures() throws Exception {
378379
}
379380

380381
private void doTestObjectTraversal(Map<String, Object> inputMap, List<String> fields, List<Object> expectedValues) throws Exception {
381-
ThreadLocal<FingerprintProcessor.Hasher> threadLocalHasher = ThreadLocal.withInitial(TestHasher::new);
382+
ObjectPool<FingerprintProcessor.Hasher> hasherPool = ObjectPool.withInitial(TestHasher::new);
382383
FingerprintProcessor fp = new FingerprintProcessor(
383384
FingerprintProcessor.TYPE,
384385
"",
385386
fields,
386387
"fingerprint",
387388
new byte[0],
388-
threadLocalHasher,
389+
hasherPool,
389390
false
390391
);
391392

@@ -397,7 +398,8 @@ private void doTestObjectTraversal(Map<String, Object> inputMap, List<String> fi
397398

398399
var input = TestIngestDocument.withDefaultVersion(inputMap);
399400
var output = fp.execute(input);
400-
var hasher = (TestHasher) threadLocalHasher.get();
401+
402+
var hasher = (TestHasher) hasherPool.acquire().get();
401403
assertThat(hasher.getBytesSeen(), equalTo(expectedBytes));
402404
assertTrue(output.hasField("fingerprint"));
403405
assertThat(output.getFieldValue("fingerprint", String.class), equalTo(Base64.getEncoder().encodeToString(expectedBytes)));

modules/kibana/src/internalClusterTest/java/org/elasticsearch/kibana/KibanaThreadPoolIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
import org.elasticsearch.cluster.metadata.IndexMetadata;
2020
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
2121
import org.elasticsearch.common.settings.Settings;
22+
import org.elasticsearch.common.util.concurrent.EsExecutorService;
2223
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
23-
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
2424
import org.elasticsearch.index.IndexingPressure;
2525
import org.elasticsearch.index.query.QueryBuilders;
2626
import org.elasticsearch.plugins.Plugin;
@@ -248,7 +248,7 @@ private static void fillThreadPoolQueues(String threadPoolName, ThreadPool threa
248248
}
249249

250250
private static void logThreadPoolQueue(String threadPoolName, ThreadPool threadPool) {
251-
if (threadPool.executor(threadPoolName) instanceof EsThreadPoolExecutor tpe) {
251+
if (threadPool.executor(threadPoolName) instanceof EsExecutorService tpe) {
252252
logger.debug("Thread pool details " + threadPoolName + " " + tpe);
253253
logger.debug(Arrays.toString(tpe.getTasks().toArray()));
254254
}

modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/BulkByScrollUsesAllScrollDocumentsAfterConflictsIntegTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.elasticsearch.common.bytes.BytesReference;
2121
import org.elasticsearch.common.settings.Settings;
2222
import org.elasticsearch.common.util.CollectionUtils;
23-
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
23+
import org.elasticsearch.common.util.concurrent.EsExecutorService;
2424
import org.elasticsearch.common.xcontent.XContentHelper;
2525
import org.elasticsearch.index.VersionType;
2626
import org.elasticsearch.index.query.QueryBuilders;
@@ -191,7 +191,7 @@ public void testDeleteByQuery() throws Exception {
191191

192192
final int writeThreads = threadPool.info(ThreadPool.Names.WRITE).getMax();
193193
assertThat(writeThreads, equalTo(1));
194-
final EsThreadPoolExecutor writeThreadPool = (EsThreadPoolExecutor) threadPool.executor(ThreadPool.Names.WRITE);
194+
final EsExecutorService writeThreadPool = (EsExecutorService) threadPool.executor(ThreadPool.Names.WRITE);
195195
final CyclicBarrier barrier = new CyclicBarrier(writeThreads + 1);
196196
final CountDownLatch latch = new CountDownLatch(1);
197197

@@ -232,7 +232,7 @@ public void testDeleteByQuery() throws Exception {
232232
final ActionFuture<BulkResponse> bulkFuture = internalCluster().coordOnlyNodeClient().bulk(conflictingUpdatesBulkRequest);
233233

234234
// Ensure that the concurrent writes are enqueued before the update by query request is sent
235-
assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(1)));
235+
assertBusy(() -> assertThat(writeThreadPool.getCurrentQueueSize(), equalTo(1)));
236236

237237
requestBuilder.source(sourceIndex).maxDocs(maxDocs).abortOnVersionConflict(false);
238238

@@ -247,7 +247,7 @@ public void testDeleteByQuery() throws Exception {
247247
source.setQuery(QueryBuilders.matchAllQuery());
248248
final ActionFuture<BulkByScrollResponse> updateByQueryResponse = requestBuilder.execute();
249249

250-
assertBusy(() -> assertThat(writeThreadPool.getQueue().size(), equalTo(2)));
250+
assertBusy(() -> assertThat(writeThreadPool.getCurrentQueueSize(), equalTo(2)));
251251

252252
// Allow tasks from the write thread to make progress
253253
latch.countDown();

0 commit comments

Comments
 (0)