Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3dfc0e1
Add
Tim-Brooks Aug 28, 2025
c94f0c0
Change
Tim-Brooks Sep 2, 2025
0e5ac6f
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 2, 2025
8984571
remove prod source accessors
Tim-Brooks Sep 2, 2025
ffb233b
Change
Tim-Brooks Sep 3, 2025
418638f
Fix
Tim-Brooks Sep 3, 2025
cb21030
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 3, 2025
812e7e0
Change
Tim-Brooks Sep 3, 2025
98772ba
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 3, 2025
5b7b7d5
Change
Tim-Brooks Sep 3, 2025
d0e3b71
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 3, 2025
55853cd
Change
Tim-Brooks Sep 4, 2025
1a3e103
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 4, 2025
7ad6692
Fixes
Tim-Brooks Sep 4, 2025
eab166c
Fixes
Tim-Brooks Sep 4, 2025
7d16bba
Change
Tim-Brooks Sep 4, 2025
dc1f38b
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 4, 2025
3396c64
Change
Tim-Brooks Sep 4, 2025
ba71bab
Fix
Tim-Brooks Sep 5, 2025
b9523f5
Fix
Tim-Brooks Sep 5, 2025
418ae7b
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 5, 2025
247debd
Fix
Tim-Brooks Sep 5, 2025
5dba505
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 5, 2025
3a23c60
Fix
Tim-Brooks Sep 5, 2025
1bbbd06
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 10, 2025
1f8c2eb
Rename
Tim-Brooks Sep 11, 2025
4526fe8
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 11, 2025
20fbc46
Change
Tim-Brooks Sep 11, 2025
2280259
Merge remote-tracking branch 'upstream' into add_source_context
Tim-Brooks Sep 17, 2025
82f4885
[CI] Update transport version definitions
Sep 17, 2025
51e3c26
Merge remote-tracking branch 'upstream/main' into add_source_context
Tim-Brooks Sep 17, 2025
b7e4cc5
Merge remote-tracking branch 'origin/add_source_context' into add_sou…
Tim-Brooks Sep 17, 2025
fefec4a
[CI] Update transport version definitions
Oct 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/
package org.elasticsearch.plugin.noop.action.bulk;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
Expand All @@ -16,6 +17,7 @@
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
Expand All @@ -25,6 +27,7 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.POST;
Expand Down Expand Up @@ -78,10 +81,12 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
request.getRestApiVersion()
);

// The actual bulk request items are mutable during the bulk process so we must create a copy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I found the place where it is not mutating, maybe I musunderstood something? Can you elaborate or point me to where it is now mutating?

Feels like an anti-pattern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

failRequestsWhenPrerequisiteActionFailed. We mutate the request list to remove failed requests.

There is also pretty mutating type of stuff going on in BulkRequestModifier. Although I don't think it rewrites the original requests list.

Feels like an anti-pattern?

I agree. Although I think refactoring the bulk request into an immutable version, introducing some type of builder pattern, etc will be a several hundred if not thousand line PR by itself. The ingest pipeline code, failure store, bulk action code, etc all have a lot of baggage built on an mutable bulk request.

List<DocWriteRequest<?>> toClose = new ArrayList<>(bulkRequest.requests());
// short circuit the call to the transport layer
return channel -> {
BulkRestBuilderListener listener = new BulkRestBuilderListener(channel, request);
listener.onResponse(bulkRequest);
ActionListener.releaseAfter(listener, () -> Releasables.close(toClose)).onResponse(bulkRequest);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
import org.elasticsearch.action.datastreams.GetDataStreamAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexSource;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -89,16 +89,17 @@ public void testShortCircuitFailure() throws Exception {

String coordinatingOnlyNode = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);

AbstractRefCounted refCounted = AbstractRefCounted.of(() -> {});
final ArrayList<IndexSource> contextsToRelease = new ArrayList<>();
IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, coordinatingOnlyNode);
try (IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest()) {

AtomicBoolean nextRequested = new AtomicBoolean(true);
int successfullyStored = 0;
while (nextRequested.get()) {
nextRequested.set(false);
refCounted.incRef();
handler.addItems(List.of(indexRequest(DATA_STREAM_NAME)), refCounted::decRef, () -> nextRequested.set(true));
IndexRequest indexRequest = indexRequest(DATA_STREAM_NAME);
contextsToRelease.add(indexRequest.indexSource());
handler.addItems(List.of(indexRequest), () -> nextRequested.set(true));
successfullyStored++;
}
assertBusy(() -> assertTrue(nextRequested.get()));
Expand All @@ -116,29 +117,34 @@ public void testShortCircuitFailure() throws Exception {
while (primaryPressure.stats().getPrimaryRejections() == primaryRejections) {
while (nextRequested.get()) {
nextRequested.set(false);
refCounted.incRef();
List<DocWriteRequest<?>> requests = new ArrayList<>();
for (int i = 0; i < 20; ++i) {
requests.add(indexRequest(DATA_STREAM_NAME));
IndexRequest indexRequest = indexRequest(DATA_STREAM_NAME);
contextsToRelease.add(indexRequest.indexSource());
requests.add(indexRequest);
}
handler.addItems(requests, refCounted::decRef, () -> nextRequested.set(true));
handler.addItems(requests, () -> nextRequested.set(true));
}
assertBusy(() -> assertTrue(nextRequested.get()));
}
}

while (nextRequested.get()) {
nextRequested.set(false);
refCounted.incRef();
handler.addItems(List.of(indexRequest(DATA_STREAM_NAME)), refCounted::decRef, () -> nextRequested.set(true));
IndexRequest indexRequest = indexRequest(DATA_STREAM_NAME);
contextsToRelease.add(indexRequest.indexSource());
handler.addItems(List.of(indexRequest), () -> nextRequested.set(true));
}

assertBusy(() -> assertTrue(nextRequested.get()));

PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
handler.lastItems(List.of(indexRequest(DATA_STREAM_NAME)), refCounted::decRef, future);
IndexRequest lastRequest = indexRequest(DATA_STREAM_NAME);
contextsToRelease.add(lastRequest.indexSource());
handler.lastItems(List.of(lastRequest), future);

BulkResponse bulkResponse = safeGet(future);
assertThat(contextsToRelease.stream().filter(c -> c.isClosed() == false).count(), equalTo(0L));

for (int i = 0; i < bulkResponse.getItems().length; ++i) {
// the first requests were successful
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,7 @@ public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOExcept
}

public void testBulkMissingBody() throws IOException {
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
request.setJsonEntity("");
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("request body is required"));
sendMissingBody();
}

public void testBulkInvalidIndexNameString() throws IOException {
Expand All @@ -79,16 +75,7 @@ public void testBulkInvalidIndexNameString() throws IOException {
}

public void testBulkRequestBodyImproperlyTerminated() throws IOException {
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
// missing final line of the bulk body. cannot process
request.setJsonEntity(
"{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}"
);
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline"));
sendImproperlyTerminated();
}

public void testBulkRequest() throws IOException {
Expand Down Expand Up @@ -156,6 +143,9 @@ public void testBulkWithIncrementalDisabled() throws IOException {

try {
sendLargeBulk();
sendMalFormedActionLine();
sendImproperlyTerminated();
sendMissingBody();
} finally {
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true));
updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null));
Expand All @@ -177,6 +167,31 @@ public void testMalformedActionLineBulk() throws IOException {
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

sendMalFormedActionLine();
}

private static void sendMissingBody() {
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
request.setJsonEntity("");
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("request body is required"));
}

private static void sendImproperlyTerminated() {
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
// missing final line of the bulk body. cannot process
request.setJsonEntity(
"{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}"
);
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline"));
}

private static void sendMalFormedActionLine() throws IOException {
Request bulkRequest = new Request("POST", "/index_name/_bulk");

final StringBuilder bulk = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,10 @@ public List<Route> routes() {
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
var content = request.requiredContent();
IndexRequest indexRequest = new IndexRequest(".net-new-system-index-primary");
indexRequest.source(content, request.getXContentType());
indexRequest.indexSource().source(content.retain(), request.getXContentType());
indexRequest.id(request.param("id"));
indexRequest.setRefreshPolicy(request.param("refresh"));
return channel -> client.index(indexRequest, ActionListener.withRef(new RestToXContentListener<>(channel), content));
return channel -> client.index(indexRequest, ActionListener.releaseAfter(new RestToXContentListener<>(channel), indexRequest));
}

@Override
Expand Down
Loading