Skip to content

Conversation

phananh1010
Copy link
Owner

@phananh1010 phananh1010 commented Aug 28, 2025

User description

Patch from BASE=acdc6a427bf07c47da3fe4f145cd844eab8be5ee to HEAD=8332d368ace691794562b5e0977a8ecfddd874a5. If partial, see *.rej in working tree.


PR Type

Bug fix, Tests


Description

  • Force failure store rollover on write to true when indices list is empty

  • Add rolling upgrade test for failure store on existing data streams

  • Update test expectations for failure rollover behavior

  • Fix data stream serialization logic for failure store rollover flag


Diagram Walkthrough

flowchart LR
  A["Empty Failure Indices"] --> B["Force Rollover On Write = true"]
  B --> C["Data Stream Constructor"]
  B --> D["Serialization Logic"]
  B --> E["Test Updates"]
  F["Rolling Upgrade Test"] --> G["Failure Store Migration"]
Loading

File Walkthrough

Relevant files
Tests
FailureStoreUpgradeIT.java
Add failure store rolling upgrade integration test             

qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/FailureStoreUpgradeIT.java

  • Add comprehensive rolling upgrade test for failure store functionality
  • Test data stream migration from pre-failure store versions
  • Verify failure store behavior during cluster upgrades
  • Include test scenarios for enabling failure store on existing streams
+229/-0 
IndexAbstractionTests.java
Update IndexAbstractionTests for rollover behavior             

server/src/test/java/org/elasticsearch/cluster/metadata/IndexAbstractionTests.java

  • Update test to set rollover on write based on replication and failure
    indices state
  • Ensure consistent behavior between replicated flag and failure store
    configuration
+7/-2     
ProjectMetadataTests.java
Update ProjectMetadataTests rollover expectations               

server/src/test/java/org/elasticsearch/cluster/metadata/ProjectMetadataTests.java

  • Change expected failure_rollover_on_write value from false to true in
    test assertions
  • Update test expectations to match new default behavior
+2/-2     
DataStreamTestHelper.java
Update DataStreamTestHelper rollover logic                             

test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java

  • Set rollover on write based on replication status and failure store
    emptiness
  • Update random instance generation to use consistent rollover logic
  • Ensure test helper matches production behavior
+6/-2     
Bug fix
DataStream.java
Fix failure store rollover logic in DataStream                     

server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java

  • Force rollover on write to true when failure indices are empty and not
    replicated
  • Update constructor logic for failure store rollover behavior
  • Modify serialization to handle rollover flag correctly during
    read/write
  • Add backward compatibility for older transport versions
+19/-3   
Documentation
133347.yaml
Add changelog entry for rollover fix                                         

docs/changelog/133347.yaml

  • Add changelog entry documenting the bug fix
  • Reference issue 133176 for context
+6/-0     
Miscellaneous
pr-133347.patch
Complete patch file for upstream changes                                 

pr-133347.patch

  • Complete patch file containing all changes from the upstream PR
  • Includes diff format of all modifications made
+368/-0 

Copy link

qodo-merge-pro bot commented Aug 28, 2025

PR Reviewer Guide 🔍

(Review updated until commit 6ea18be)

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Backcompat Logic

Validate the conditional forcing of failure store rollover_on_write during deserialization and construction does not inadvertently enable rollover for replicated data streams or those with pre-existing failure indices across mixed-version clusters.

    ? readIndices(in)
    : List.<Index>of();
var failureIndicesBuilder = DataStreamIndices.failureIndicesBuilder(failureIndices);
backingIndicesBuilder.setRolloverOnWrite(in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0) ? in.readBoolean() : false);
if (in.getTransportVersion().onOrAfter(DataStream.ADDED_AUTO_SHARDING_EVENT_VERSION)) {
    backingIndicesBuilder.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
    // Read the rollover on write flag from the stream, but force it on if the failure indices are empty and we're not replicating
    boolean failureStoreRolloverOnWrite = in.readBoolean() || (replicated == false && failureIndices.isEmpty());
    failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite)
        .setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
} else {
    // If we are reading from an older version that does not have these fields, just default
    // to a reasonable value for rollover on write for the failure store
    boolean failureStoreRolloverOnWrite = replicated == false && failureIndices.isEmpty();
    failureIndicesBuilder.setRolloverOnWrite(failureStoreRolloverOnWrite);
}
Parser Args Assumptions

Confirm the boolean logic combining parser args for rollover_on_write with replicated/indices checks matches the parser’s argument ordering and nullability guarantees to avoid misinterpreting args[5], args[13], args[14].

    BACKING_INDEX_PREFIX,
    (List<Index>) args[1],
    args[10] != null && (boolean) args[10],
    (DataStreamAutoShardingEvent) args[11]
),
new DataStreamIndices(
    FAILURE_STORE_PREFIX,
    args[13] != null ? (List<Index>) args[13] : List.of(),
    // If replicated (args[5]) is null or exists and is false, and the failure index list (args[13]) is null or
    // exists and is empty, then force the rollover on write field to true. If none of those conditions are met,
    // then use the rollover on write value (args[14]) present in the parser.
    ((args[5] == null || ((boolean) args[5] == false)) && (args[13] == null || ((List<Index>) args[13]).isEmpty()))
        || (args[14] != null && (boolean) args[14]),
    (DataStreamAutoShardingEvent) args[15]
)
Test Determinism

The test branches on randomBoolean() to initialize the failure store; ensure this randomness does not cause flakiness (e.g., timing, race with rollover) and that both branches are stable under slow CI.

    int expectedFailureDocuments = 0;
    if (randomBoolean()) {
        // Index a failure via a mapping exception
        indexDoc(dataStreamName, INVALID_DOC, true);
        expectedFailureDocuments = 1;
    } else {
        // Manually rollover failure store to force initialization
        var failureStoreRolloverRequest = new Request("POST", "/" + failureStoreName + "/_rollover");
        assertOK(client().performRequest(failureStoreRolloverRequest));
    }

    // Ensure correct initialized failure store state
    dataStreams = getDataStream(dataStreamName);
    assertThat(ObjectPath.evaluate(dataStreams, "data_streams"), hasSize(1));
    assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo(dataStreamName));
    assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store"), notNullValue());
    assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.enabled"), equalTo(true));
    assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices"), is(not(empty())));
    assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.rollover_on_write"), equalTo(false));

    String failureIndexName = ObjectPath.evaluate(dataStreams, "data_streams.0.failure_store.indices.0.index_name");
    assertThat(failureIndexName, dataStreamIndexEqualTo(dataStreamName, 2, true));

    assertDocCount(client(), dataStreamName, 6);
    assertDocCount(client(), failureStoreName, expectedFailureDocuments);
}

@phananh1010
Copy link
Owner Author

/review

Copy link

Persistent review updated to latest commit 6ea18be

Copy link

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Use NDJSON content type for bulk

Set the correct NDJSON content type for bulk requests to prevent 415/400 errors
on stricter servers. Explicitly add the application/x-ndjson header when sending
bulk payloads.

qa/rolling-upgrade/src/javaRestTest/java/org/elasticsearch/upgrades/FailureStoreUpgradeIT.java [203-217]

 private static void executeBulk(String dataStreamName) throws IOException {
     var bulkRequest = new Request("POST", "/" + dataStreamName + "/_bulk");
     bulkRequest.setJsonEntity(BULK.replace("$now", formatInstant(Instant.now())));
     bulkRequest.addParameter("refresh", "true");
+    // Ensure the correct content type for NDJSON bulk requests
+    bulkRequest.setOptions(org.elasticsearch.client.RequestOptions.DEFAULT.toBuilder()
+        .addHeader("Content-Type", "application/x-ndjson")
+        .build());
     Response response = null;
     try {
         response = client().performRequest(bulkRequest);
     } catch (ResponseException re) {
         response = re.getResponse();
     }
     assertNotNull(response);
     var responseBody = entityAsMap(response);
     assertOK(response);
     assertThat("errors in response:\n " + responseBody, responseBody.get("errors"), equalTo(false));
 }
  • Apply / Chat
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly points out that bulk requests should use the application/x-ndjson content type, improving the robustness of the new test code.

Low
  • More

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant