Skip to content

Commit ce88111

Browse files
Merge branch 'main' into generalExpressionJoin_v2
2 parents 1af4bb8 + 933354b commit ce88111

File tree

28 files changed

+870
-42
lines changed

28 files changed

+870
-42
lines changed

build-tools-internal/gradle/wrapper/gradle-wrapper.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionSha256Sum=16f2b95838c1ddcf7242b1c39e7bbbb43c842f1f1a1a0dc4959b6d4d68abcac3
4-
distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.0-all.zip
3+
distributionSha256Sum=f86344275d1b194688dd330abf9f6f2344cd02872ffee035f2d1ea2fd60cf7f3
4+
distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.1-all.zip
55
networkTimeout=10000
66
validateDistributionUrl=true
77
zipStoreBase=GRADLE_USER_HOME
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
9.2.0
1+
9.2.1

docs/changelog/138624.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 138624
2+
summary: Handle individual doc parsing failure in bulk request with pipeline
3+
area: Ingest Node
4+
type: bug
5+
issues:
6+
- 138445

gradle/wrapper/gradle-wrapper.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionSha256Sum=16f2b95838c1ddcf7242b1c39e7bbbb43c842f1f1a1a0dc4959b6d4d68abcac3
4-
distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.0-all.zip
3+
distributionSha256Sum=f86344275d1b194688dd330abf9f6f2344cd02872ffee035f2d1ea2fd60cf7f3
4+
distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.1-all.zip
55
networkTimeout=10000
66
validateDistributionUrl=true
77
zipStoreBase=GRADLE_USER_HOME

libs/ssl-config/src/main/java/org/elasticsearch/common/ssl/DerParser.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,12 @@ private int getLength() throws IOException {
138138
int n = derInputStream.read(bytes);
139139
if (n < num) throw new IOException("Invalid DER: length too short");
140140

141-
return new BigInteger(1, bytes).intValue();
141+
int len = new BigInteger(1, bytes).intValue();
142+
if (len < 0) {
143+
throw new IOException("Invalid DER: length larger than max-int");
144+
}
145+
146+
return len;
142147
}
143148

144149
/**

modules/reindex/src/main/java/org/elasticsearch/reindex/ReindexPlugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.settings.Setting;
2020
import org.elasticsearch.common.settings.Settings;
2121
import org.elasticsearch.common.settings.SettingsFilter;
22+
import org.elasticsearch.common.util.FeatureFlag;
2223
import org.elasticsearch.features.NodeFeature;
2324
import org.elasticsearch.index.reindex.BulkByScrollTask;
2425
import org.elasticsearch.index.reindex.DeleteByQueryAction;
@@ -44,6 +45,11 @@ public class ReindexPlugin extends Plugin implements ActionPlugin {
4445

4546
public static final ActionType<ListTasksResponse> RETHROTTLE_ACTION = new ActionType<>("cluster:admin/reindex/rethrottle");
4647

48+
/**
49+
* Whether the feature flag to guard the work to make reindex more resilient while it is under development.
50+
*/
51+
static boolean REINDEX_RESILIENCE_ENABLED = new FeatureFlag("reindex_resilience").isEnabled();
52+
4753
@Override
4854
public List<ActionHandler> getActions() {
4955
return Arrays.asList(

muted-tests.yml

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,6 @@ tests:
6060
- class: org.elasticsearch.xpack.ccr.FollowIndexSecurityIT
6161
method: testCleanShardFollowTaskAfterDeleteFollower
6262
issue: https://github.com/elastic/elasticsearch/issues/120339
63-
- class: org.elasticsearch.xpack.sql.expression.function.scalar.datetime.DateTimeToCharProcessorTests
64-
issue: https://github.com/elastic/elasticsearch/issues/120575
6563
- class: org.elasticsearch.xpack.security.authc.service.ServiceAccountIT
6664
method: testAuthenticateShouldNotFallThroughInCaseOfFailure
6765
issue: https://github.com/elastic/elasticsearch/issues/120902
@@ -438,15 +436,15 @@ tests:
438436
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
439437
method: test {csv-spec:inlinestats.MvMinMvExpand}
440438
issue: https://github.com/elastic/elasticsearch/issues/137679
441-
- class: org.elasticsearch.xpack.esql.optimizer.rules.physical.local.SubstituteRoundToTests
442-
method: testSubqueryWithCountStarAndDateTrunc {default}
443-
issue: https://github.com/elastic/elasticsearch/issues/138601
444439
- class: org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT
445440
method: testWaitForSnapshot
446441
issue: https://github.com/elastic/elasticsearch/issues/138669
447442
- class: org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT
448443
method: test {p0=scroll/20_keep_alive/Max keep alive}
449444
issue: https://github.com/elastic/elasticsearch/issues/138680
445+
- class: org.elasticsearch.xpack.unsignedlong.UnsignedLongSyntheticSourceNativeArrayIntegrationTests
446+
method: testSynthesizeArrayRandom
447+
issue: https://github.com/elastic/elasticsearch/issues/138684
450448

451449
# Examples:
452450
#

plugins/examples/gradle/wrapper/gradle-wrapper.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionSha256Sum=16f2b95838c1ddcf7242b1c39e7bbbb43c842f1f1a1a0dc4959b6d4d68abcac3
4-
distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.0-all.zip
3+
distributionSha256Sum=f86344275d1b194688dd330abf9f6f2344cd02872ffee035f2d1ea2fd60cf7f3
4+
distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.1-all.zip
55
networkTimeout=10000
66
validateDistributionUrl=true
77
zipStoreBase=GRADLE_USER_HOME

server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestClientIT.java

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,92 @@ public void testPipelineProcessorOnFailure() throws Exception {
378378
assertThat(inserted.get("readme"), equalTo("pipeline with id [3] is a bad pipeline"));
379379
}
380380

381+
public void testBulkRequestWithInvalidJsonAndPipeline() throws Exception {
382+
// Test that when a document with invalid JSON is in a bulk request with a pipeline,
383+
// the invalid document fails gracefully without causing the entire bulk request to fail.
384+
// This tests the fix for https://github.com/elastic/elasticsearch/issues/138445
385+
386+
createIndex("test_index");
387+
388+
putJsonPipeline(
389+
"test-pipeline",
390+
(builder, params) -> builder.field("description", "test pipeline")
391+
.startArray("processors")
392+
.startObject()
393+
.startObject("test")
394+
.endObject()
395+
.endObject()
396+
.endArray()
397+
);
398+
399+
// Create a bulk request with valid and invalid documents
400+
BulkRequest bulkRequest = new BulkRequest();
401+
402+
// Valid document
403+
IndexRequest validRequest = new IndexRequest("test_index").id("valid_doc");
404+
validRequest.source("{\"valid\":\"test\"}", XContentType.JSON);
405+
validRequest.setPipeline("test-pipeline");
406+
bulkRequest.add(validRequest);
407+
408+
// Invalid document with missing closing brace
409+
IndexRequest invalidRequest = new IndexRequest("test_index").id("invalid_doc");
410+
invalidRequest.source("{\"invalid\":\"json\"", XContentType.JSON);
411+
invalidRequest.setPipeline("test-pipeline");
412+
bulkRequest.add(invalidRequest);
413+
414+
// Invalid document with duplicate fields
415+
IndexRequest invalidRequest2 = new IndexRequest("test_index").id("invalid_doc2");
416+
invalidRequest2.source("{\"invalid\":\"json\", \"invalid\":\"json\"}", XContentType.JSON);
417+
invalidRequest2.setPipeline("test-pipeline");
418+
bulkRequest.add(invalidRequest2);
419+
420+
// Another valid document
421+
IndexRequest validRequest2 = new IndexRequest("test_index").id("valid_doc2");
422+
validRequest2.source("{\"valid\":\"test2\"}", XContentType.JSON);
423+
validRequest2.setPipeline("test-pipeline");
424+
bulkRequest.add(validRequest2);
425+
426+
BulkResponse response = client().bulk(bulkRequest).actionGet();
427+
428+
// The bulk request should succeed
429+
assertThat(response.hasFailures(), is(true));
430+
assertThat(response.getItems().length, equalTo(4));
431+
432+
// First document should succeed
433+
BulkItemResponse item0 = response.getItems()[0];
434+
assertThat(item0.isFailed(), is(false));
435+
assertThat(item0.getResponse().getId(), equalTo("valid_doc"));
436+
assertThat(item0.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
437+
438+
// Second document should fail
439+
BulkItemResponse item1 = response.getItems()[1];
440+
assertThat(item1.isFailed(), is(true));
441+
assertThat(item1.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
442+
assertThat(item1.getFailure().getCause(), instanceOf(IllegalArgumentException.class));
443+
444+
// Third document should fail
445+
BulkItemResponse item2 = response.getItems()[2];
446+
assertThat(item2.isFailed(), is(true));
447+
assertThat(item2.getFailure().getStatus(), equalTo(org.elasticsearch.rest.RestStatus.BAD_REQUEST));
448+
assertThat(item2.getFailure().getCause(), instanceOf(IllegalArgumentException.class));
449+
450+
// Fourth document should succeed
451+
BulkItemResponse item3 = response.getItems()[3];
452+
assertThat(item3.isFailed(), is(false));
453+
assertThat(item3.getResponse().getId(), equalTo("valid_doc2"));
454+
assertThat(item3.getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
455+
456+
// Verify that the valid documents were indexed
457+
assertThat(client().prepareGet("test_index", "valid_doc").get().isExists(), is(true));
458+
assertThat(client().prepareGet("test_index", "valid_doc2").get().isExists(), is(true));
459+
// Verify that the invalid documents were not indexed
460+
assertThat(client().prepareGet("test_index", "invalid_doc").get().isExists(), is(false));
461+
assertThat(client().prepareGet("test_index", "invalid_doc2").get().isExists(), is(false));
462+
463+
// cleanup
464+
deletePipeline("test-pipeline");
465+
}
466+
381467
public static class ExtendedIngestTestPlugin extends IngestTestPlugin {
382468

383469
@Override

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
5959
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
6060
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
61+
import org.elasticsearch.cluster.routing.allocation.decider.IndexBalanceAllocationDecider;
6162
import org.elasticsearch.cluster.routing.allocation.decider.IndexVersionAllocationDecider;
6263
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
6364
import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
@@ -497,6 +498,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
497498
addAllocationDecider(deciders, new ThrottlingAllocationDecider(clusterSettings));
498499
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(clusterSettings));
499500
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
501+
addAllocationDecider(deciders, new IndexBalanceAllocationDecider(settings, clusterSettings));
500502

501503
clusterPlugins.stream()
502504
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())

0 commit comments

Comments
 (0)