Skip to content

Commit 3396c64

Browse files
committed
Change
1 parent dc1f38b commit 3396c64

File tree

12 files changed

+284
-28
lines changed

12 files changed

+284
-28
lines changed
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.bulk;
11+
12+
import org.elasticsearch.action.index.IndexRequest;
13+
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.common.bytes.BytesArray;
15+
import org.elasticsearch.common.bytes.BytesReference;
16+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
17+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
18+
import org.elasticsearch.ingest.IngestClientIT;
19+
import org.elasticsearch.plugins.Plugin;
20+
import org.elasticsearch.test.ESIntegTestCase;
21+
import org.elasticsearch.threadpool.ThreadPool;
22+
import org.elasticsearch.xcontent.XContentType;
23+
24+
import java.io.IOException;
25+
import java.util.Collection;
26+
import java.util.List;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.CyclicBarrier;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.concurrent.locks.LockSupport;
32+
33+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
34+
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
35+
36+
public class BulkSourceReleaseIT extends ESIntegTestCase {
37+
38+
@Override
39+
protected Collection<Class<? extends Plugin>> nodePlugins() {
40+
return List.of(IngestClientIT.ExtendedIngestTestPlugin.class);
41+
}
42+
43+
public void testBulkSourceReleaseWhenIngestReplacesSource() throws Exception {
44+
String index = "test1";
45+
createIndex(index);
46+
47+
String pipelineId = "pipeline_id";
48+
putPipeline(pipelineId);
49+
50+
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class);
51+
52+
ReleasableBytesReference originalBytes = new ReleasableBytesReference(new BytesArray("{\"field\": \"value\"}"), () -> {});
53+
54+
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
55+
IndexRequest indexRequest = new IndexRequest();
56+
indexRequest.index(index);
57+
indexRequest.sourceContext().source(originalBytes, XContentType.JSON);
58+
indexRequest.setPipeline(pipelineId);
59+
60+
CountDownLatch blockLatch = new CountDownLatch(1);
61+
blockWritePool(internalCluster().getInstance(ThreadPool.class), blockLatch);
62+
63+
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
64+
65+
try {
66+
handler.lastItems(List.of(indexRequest), future);
67+
assertBusy(() -> assertFalse(originalBytes.hasReferences()));
68+
} finally {
69+
blockLatch.countDown();
70+
}
71+
72+
BulkResponse bulkResponse = safeGet(future);
73+
assertNoFailures(bulkResponse);
74+
}
75+
76+
public void testBytesReferencedByTwoSourcesNotReleasedIfOnlyOneIngestPipeline() throws Exception {
77+
String index = "test1";
78+
createIndex(index);
79+
80+
String pipelineId = "pipeline_id";
81+
putPipeline(pipelineId);
82+
83+
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class);
84+
85+
ReleasableBytesReference originalBytes = new ReleasableBytesReference(
86+
new BytesArray("{\"field\": \"value1\"}{\"field\": \"value2\"}"),
87+
() -> {}
88+
);
89+
int splitPoint = originalBytes.indexOf((byte) '}', 0) + 1;
90+
91+
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
92+
IndexRequest indexRequest = new IndexRequest();
93+
indexRequest.index(index);
94+
indexRequest.sourceContext().source(originalBytes.retainedSlice(0, splitPoint), XContentType.JSON);
95+
indexRequest.setPipeline(pipelineId);
96+
97+
IndexRequest indexRequestNoIngest = new IndexRequest();
98+
indexRequestNoIngest.index(index);
99+
indexRequestNoIngest.sourceContext()
100+
.source(originalBytes.retainedSlice(splitPoint, originalBytes.length() - splitPoint), XContentType.JSON);
101+
102+
originalBytes.decRef();
103+
assertTrue(originalBytes.hasReferences());
104+
105+
CountDownLatch blockLatch = new CountDownLatch(1);
106+
blockWritePool(internalCluster().getInstance(ThreadPool.class), blockLatch);
107+
108+
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
109+
try {
110+
handler.lastItems(List.of(indexRequest, indexRequestNoIngest), future);
111+
112+
// Pause briefly to allow bytes to theoretically be released after ingest processing
113+
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
114+
115+
assertTrue(originalBytes.hasReferences());
116+
} finally {
117+
blockLatch.countDown();
118+
}
119+
120+
blockLatch.countDown();
121+
122+
BulkResponse bulkResponse = safeGet(future);
123+
assertNoFailures(bulkResponse);
124+
}
125+
126+
public void testSomeReferencesCanBeReleasedWhileOthersRetained() throws Exception {
127+
String index = "test1";
128+
createIndex(index);
129+
130+
String pipelineId = "pipeline_id";
131+
putPipeline(pipelineId);
132+
133+
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class);
134+
135+
ReleasableBytesReference releasedBytes = new ReleasableBytesReference(new BytesArray("{\"field\": \"value1\"}"), () -> {});
136+
ReleasableBytesReference retainedBytes = new ReleasableBytesReference(
137+
new BytesArray("{\"field\": \"value2\"}{\"field\": \"value3\"}"),
138+
() -> {}
139+
);
140+
int splitPoint = retainedBytes.indexOf((byte) '}', 0) + 1;
141+
142+
IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
143+
IndexRequest indexRequest1 = new IndexRequest();
144+
indexRequest1.index(index);
145+
indexRequest1.sourceContext().source(releasedBytes, XContentType.JSON);
146+
indexRequest1.setPipeline(pipelineId);
147+
148+
IndexRequest indexRequest2 = new IndexRequest();
149+
indexRequest2.index(index);
150+
indexRequest2.sourceContext().source(retainedBytes.retainedSlice(0, splitPoint), XContentType.JSON);
151+
indexRequest2.setPipeline(pipelineId);
152+
153+
IndexRequest indexRequestNoIngest = new IndexRequest();
154+
indexRequestNoIngest.index(index);
155+
indexRequestNoIngest.sourceContext()
156+
.source(retainedBytes.retainedSlice(splitPoint, retainedBytes.length() - splitPoint), XContentType.JSON);
157+
158+
retainedBytes.decRef();
159+
assertTrue(retainedBytes.hasReferences());
160+
161+
CountDownLatch blockLatch = new CountDownLatch(1);
162+
blockWritePool(internalCluster().getInstance(ThreadPool.class), blockLatch);
163+
164+
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
165+
try {
166+
handler.lastItems(List.of(indexRequest2, indexRequest1, indexRequestNoIngest), future);
167+
168+
assertBusy(() -> assertFalse(releasedBytes.hasReferences()));
169+
170+
assertTrue(retainedBytes.hasReferences());
171+
} finally {
172+
blockLatch.countDown();
173+
}
174+
175+
blockLatch.countDown();
176+
177+
BulkResponse bulkResponse = safeGet(future);
178+
assertNoFailures(bulkResponse);
179+
}
180+
181+
private static void putPipeline(String pipelineId) throws IOException {
182+
BytesReference pipelineSource = BytesReference.bytes(
183+
jsonBuilder().startObject()
184+
.field("description", "my_pipeline")
185+
.startArray("processors")
186+
.startObject()
187+
.startObject("test")
188+
.endObject()
189+
.endObject()
190+
.endArray()
191+
.endObject()
192+
);
193+
putJsonPipeline(pipelineId, pipelineSource);
194+
}
195+
196+
private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
197+
final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
198+
final var startBarrier = new CyclicBarrier(threadCount + 1);
199+
final var blockingTask = new AbstractRunnable() {
200+
@Override
201+
public void onFailure(Exception e) {
202+
fail(e);
203+
}
204+
205+
@Override
206+
protected void doRun() {
207+
safeAwait(startBarrier);
208+
safeAwait(finishLatch);
209+
}
210+
211+
@Override
212+
public boolean isForceExecution() {
213+
return true;
214+
}
215+
};
216+
for (int i = 0; i < threadCount; i++) {
217+
threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(blockingTask);
218+
}
219+
safeAwait(startBarrier);
220+
}
221+
222+
private static void fillWriteQueue(ThreadPool threadPool) {
223+
final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles());
224+
final var queueFilled = new AtomicBoolean(false);
225+
final var queueFillingTask = new AbstractRunnable() {
226+
@Override
227+
public void onFailure(Exception e) {
228+
fail(e);
229+
}
230+
231+
@Override
232+
protected void doRun() {
233+
assertTrue("thread pool not blocked", queueFilled.get());
234+
}
235+
236+
@Override
237+
public boolean isForceExecution() {
238+
return true;
239+
}
240+
};
241+
for (int i = 0; i < queueSize; i++) {
242+
threadPool.executor(ThreadPool.Names.WRITE).execute(queueFillingTask);
243+
}
244+
queueFilled.set(true);
245+
}
246+
}

server/src/internalClusterTest/java/org/elasticsearch/rest/action/document/RestBulkActionIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.hamcrest.Matchers.not;
2626

2727
public class RestBulkActionIT extends ESIntegTestCase {
28+
2829
@Override
2930
protected boolean addMockHttpTransport() {
3031
return false;

server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ static boolean executeBulkItemRequest(
410410
XContentMeteringParserDecorator meteringParserDecorator = documentParsingProvider.newMeteringParserDecorator(request);
411411
final SourceToParse sourceToParse = new SourceToParse(
412412
request.id(),
413-
request.sourceContext().bytes(),
413+
request.source(),
414414
request.getContentType(),
415415
request.routing(),
416416
request.getDynamicTemplates(),
@@ -609,7 +609,7 @@ private static BulkItemResponse processUpdateResponse(
609609
);
610610

611611
if (updateRequest.fetchSource() != null && updateRequest.fetchSource().fetchSource()) {
612-
final BytesReference indexSourceAsBytes = updateIndexRequest.sourceContext().bytes();
612+
final BytesReference indexSourceAsBytes = updateIndexRequest.source();
613613
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(
614614
indexSourceAsBytes,
615615
true,
@@ -741,7 +741,7 @@ private static Engine.Result performOpOnReplica(
741741
final IndexRequest indexRequest = (IndexRequest) docWriteRequest;
742742
final SourceToParse sourceToParse = new SourceToParse(
743743
indexRequest.id(),
744-
indexRequest.sourceContext().bytes(),
744+
indexRequest.source(),
745745
indexRequest.getContentType(),
746746
indexRequest.routing()
747747
);

server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ protected void doInternalExecute(
159159
request.id(),
160160
request.index(),
161161
request.version(),
162-
request.sourceContext().bytes(),
162+
request.source(),
163163
request.getContentType(),
164164
request.getExecutedPipelines(),
165165
validationResult.ignoredFields,
@@ -201,7 +201,7 @@ private ValidationResult validateMappings(
201201
) {
202202
final SourceToParse sourceToParse = new SourceToParse(
203203
request.id(),
204-
request.sourceContext().bytes(),
204+
request.source(),
205205
request.getContentType(),
206206
request.routing(),
207207
request.getDynamicTemplates(),

server/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
226226
case CREATED -> {
227227
IndexRequest upsertRequest = result.action();
228228
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
229-
final BytesReference upsertSourceBytes = upsertRequest.sourceContext().bytes();
229+
final BytesReference upsertSourceBytes = upsertRequest.source();
230230
client.bulk(
231231
toSingleItemBulkRequest(upsertRequest),
232232
unwrappingSingleItemBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> {
@@ -269,7 +269,7 @@ protected void shardOperation(final UpdateRequest request, final ActionListener<
269269
case UPDATED -> {
270270
IndexRequest indexRequest = result.action();
271271
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
272-
final BytesReference indexSourceBytes = indexRequest.sourceContext().bytes();
272+
final BytesReference indexSourceBytes = indexRequest.source();
273273
client.bulk(
274274
toSingleItemBulkRequest(indexRequest),
275275
unwrappingSingleItemBulkResponse(ActionListener.<DocWriteResponse>wrap(response -> {

server/src/main/java/org/elasticsearch/action/update/UpdateRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -759,7 +759,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
759759
XContentParser parser = XContentHelper.createParser(
760760
NamedXContentRegistry.EMPTY,
761761
LoggingDeprecationHandler.INSTANCE,
762-
doc.sourceContext().bytes(),
762+
doc.source(),
763763
xContentType
764764
)
765765
) {
@@ -782,7 +782,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
782782
XContentParser parser = XContentHelper.createParser(
783783
NamedXContentRegistry.EMPTY,
784784
LoggingDeprecationHandler.INSTANCE,
785-
upsertRequest.sourceContext().bytes(),
785+
upsertRequest.source(),
786786
xContentType
787787
)
788788
) {

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1738,7 +1738,7 @@ public Index getWriteIndex(IndexRequest request, ProjectMetadata project) {
17381738
if (rawTimestamp != null) {
17391739
timestamp = getTimeStampFromRaw(rawTimestamp);
17401740
} else {
1741-
timestamp = getTimestampFromParser(request.sourceContext().bytes(), request.getContentType());
1741+
timestamp = getTimestampFromParser(request.source(), request.getContentType());
17421742
}
17431743
timestamp = getCanonicalTimestampBound(timestamp);
17441744
Index result = selectTimeSeriesWriteIndex(timestamp, project);

server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void testIncrementalParsing() throws IOException {
8787
deleteRequests::add
8888
);
8989

90-
ReleasableBytesReference request = ReleasableBytesReference.wrap(new BytesArray("""
90+
ReleasableBytesReference request = new ReleasableBytesReference(new BytesArray("""
9191
{ "index":{ "_id": "bar", "pipeline": "foo" } }
9292
{ "field": "value"}
9393
{ "index":{ "require_alias": false } }
@@ -98,7 +98,7 @@ public void testIncrementalParsing() throws IOException {
9898
{ "index": { } }
9999
{ "field": "value"}
100100
{ "delete":{ "_id": "bop" } }
101-
"""));
101+
"""), () -> {});
102102

103103
int consumed = 0;
104104
for (int i = 0; i < request.length() - 1; ++i) {
@@ -107,9 +107,21 @@ public void testIncrementalParsing() throws IOException {
107107
consumed += incrementalParser.parse(request.slice(consumed, request.length() - consumed), true);
108108
assertThat(consumed, equalTo(request.length()));
109109

110+
request.decRef();
111+
112+
// 3 Index request retaining
113+
assertTrue(request.hasReferences());
114+
110115
assertThat(indexRequests.size(), equalTo(3));
111116
assertThat(updateRequests.size(), equalTo(1));
112117
assertThat(deleteRequests.size(), equalTo(2));
118+
119+
for (DocWriteRequest<?> req : indexRequests) {
120+
req.close();
121+
}
122+
123+
// Deletes and updates do not retain (upsert source is copied out opposed to sliced)
124+
assertFalse(request.hasReferences());
113125
}
114126

115127
public void testIndexRequest() throws IOException {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public MonitoringBulkRequest add(
100100
if (MonitoringIndex.from(indexRequest.index()) != MonitoringIndex.TIMESTAMPED) {
101101
return;
102102
}
103-
final BytesReference source = indexRequest.sourceContext().bytes();
103+
final BytesReference source = indexRequest.source();
104104
if (source.length() == 0) {
105105
throw new IllegalArgumentException(
106106
"source is missing for monitoring document [" + indexRequest.index() + "][" + type + "][" + indexRequest.id() + "]"

0 commit comments

Comments
 (0)