Skip to content

Add interface for the Multi format merge flow#20908

Draft
darjisagar7 wants to merge 1 commit intoopensearch-project:mainfrom
darjisagar7:merge_asbtraction
Draft

Add interface for the Multi format merge flow#20908
darjisagar7 wants to merge 1 commit intoopensearch-project:mainfrom
darjisagar7:merge_asbtraction

Conversation

@darjisagar7
Copy link

Description

[Describe what this change achieves]

#19490

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Sagar Darji <darsaga@amazon.com>
@github-actions
Copy link
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Add OneMerge model and WriterFileSet.getNumRows()

Relevant files:

  • server/src/main/java/org/opensearch/index/engine/dataformat/merge/OneMerge.java
  • server/src/main/java/org/opensearch/index/engine/exec/WriterFileSet.java
  • server/src/test/java/org/opensearch/index/engine/dataformat/merge/MergeTests.java

Sub-PR theme: Add MergeHandler and MergeScheduler abstractions

Relevant files:

  • server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java
  • server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java
  • server/src/test/java/org/opensearch/index/engine/dataformat/merge/MergeTests.java

Sub-PR theme: Refactor MockDataFormat into shared DataFormatTestUtils

Relevant files:

  • server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatTestUtils.java
  • server/src/test/java/org/opensearch/index/engine/dataformat/DataFormatPluginTests.java

⚡ Recommended focus areas for review

Null Logger

The logger field is declared as private static Logger logger and initialized to null. In refreshConfig(), calling logger.info(...) will throw a NullPointerException unless the logger is set externally (as done via reflection in tests). The logger should be initialized inline, e.g., LogManager.getLogger(MergeScheduler.class).

private static Logger logger;
private volatile int maxConcurrentMerges;
private volatile int maxMergeCount;
private final MergeSchedulerConfig mergeSchedulerConfig;

/** true if we should rate-limit writes for each merge */
private boolean doAutoIOThrottle = false;

/** Initial value for IO write rate limit when doAutoIOThrottle is true */
private static final double START_MB_PER_SEC = 20.0;

/** Current IO writes throttle rate */
protected double targetMBPerSec = START_MB_PER_SEC;

/**
 * Creates a new merge scheduler.
 *
 * @param mergeHandler   the handler that selects and executes merges
 * @param shardId        the shard this scheduler is associated with
 * @param indexSettings  the index settings providing merge scheduler configuration
 */
public MergeScheduler(
    MergeHandler mergeHandler,
    ShardId shardId,
    IndexSettings indexSettings
) {
    this.mergeSchedulerConfig = indexSettings.getMergeSchedulerConfig();
    refreshConfig();
}

/**
 * Refreshes the max concurrent merge thread count and max merge count from
 * the current {@link MergeSchedulerConfig}. No-op if the values have not changed.
 */
public synchronized void refreshConfig() {
    int newMaxThreadCount = mergeSchedulerConfig.getMaxThreadCount();
    int newMaxMergeCount = mergeSchedulerConfig.getMaxMergeCount();

    if (newMaxThreadCount == this.maxConcurrentMerges && newMaxMergeCount == this.maxMergeCount) {
        return;
    }

    logger.info(() -> new ParameterizedMessage("Updating from merge scheduler config: maxThreadCount {} -> {}, " +
        "maxMergeCount {} -> {}", this.maxConcurrentMerges, newMaxThreadCount, this.maxMergeCount, newMaxMergeCount));
Empty Implementations

Several methods (updatePendingMerges, registerMerge, onMergeFinished, onMergeFailure, doMerge) have empty or stub bodies. doMerge returns null. These are not abstract, so subclasses won't be forced to implement them, and callers may silently get no-op behavior or null results. Consider making them abstract or throwing UnsupportedOperationException.

public synchronized void updatePendingMerges() {

}

/**
 * Registers a merge to be executed.
 *
 * @param merge the merge to register
 */
public synchronized void registerMerge(OneMerge merge) {

}

/**
 * Returns whether there are any pending merges in the queue.
 *
 * @return {@code true} if there are pending merges
 */
public boolean hasPendingMerges() {
    return !mergingSegments.isEmpty();
}

/**
 * Retrieves and removes the next pending merge from the queue.
 *
 * @return the next merge to execute, or {@code null} if the queue is empty
 */
public synchronized OneMerge getNextMerge() {
    if(mergingSegments.isEmpty()) {
        return null;
    }
    return mergingSegments.removeFirst();
}

/**
 * Callback invoked when a merge completes successfully.
 *
 * @param oneMerge the merge that finished
 */
public synchronized void onMergeFinished(OneMerge oneMerge) {
}

/**
 * Callback invoked when a merge fails.
 *
 * @param oneMerge the merge that failed
 */
public synchronized void onMergeFailure(OneMerge oneMerge) {
}

/**
 * Executes the given merge operation.
 *
 * @param oneMerge the merge to execute
 * @return the result of the merge
 */
public MergeResult doMerge(OneMerge oneMerge) {
    return null;
}
Thread Safety

hasPendingMerges() reads mergingSegments without synchronization, while getNextMerge() is synchronized. This creates a potential TOCTOU race: a caller checking hasPendingMerges() and then calling getNextMerge() may observe inconsistent state. Consider synchronizing hasPendingMerges() as well.

public boolean hasPendingMerges() {
    return !mergingSegments.isEmpty();
}

/**
 * Retrieves and removes the next pending merge from the queue.
 *
 * @return the next merge to execute, or {@code null} if the queue is empty
 */
public synchronized OneMerge getNextMerge() {
    if(mergingSegments.isEmpty()) {
        return null;
    }
    return mergingSegments.removeFirst();
}
Reflection in Tests

The test uses reflection to set the private static logger field in MergeScheduler. This is fragile and indicates a design issue in the production code. The logger should be properly initialized in the class itself rather than requiring test-time injection via reflection.

Field loggerField = MergeScheduler.class.getDeclaredField("logger");
loggerField.setAccessible(true);
loggerField.set(null, LogManager.getLogger(MergeScheduler.class));
Unused Parameter

The constructor accepts MergeHandler mergeHandler and ShardId shardId parameters but neither is stored as a field. The mergeHandler is particularly important since triggerMerges() and forceMerge() are supposed to delegate to it, but currently those methods are empty stubs.

public MergeScheduler(
    MergeHandler mergeHandler,
    ShardId shardId,
    IndexSettings indexSettings
) {
    this.mergeSchedulerConfig = indexSettings.getMergeSchedulerConfig();
    refreshConfig();
}

@github-actions
Copy link
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Initialize static logger field directly

The logger field is declared as a non-final static field and initialized to null,
which will cause a NullPointerException when refreshConfig() is called before the
logger is explicitly set (e.g., outside of tests). It should be initialized directly
using LogManager.getLogger(MergeScheduler.class) to ensure it is always available.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeScheduler.java [31]

-private static Logger logger;
+private static final Logger logger = LogManager.getLogger(MergeScheduler.class);
Suggestion importance[1-10]: 8

__

Why: The logger field is null by default and will cause a NullPointerException when refreshConfig() is called in production code. The test workaround using reflection confirms this is a real bug that needs fixing by initializing it with LogManager.getLogger(MergeScheduler.class).

Medium
Synchronize access to shared collection

The hasPendingMerges() method accesses mergingSegments without synchronization,
while all other methods that access this field are synchronized. This creates a data
race where the result may be stale or inconsistent in a concurrent environment.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/MergeHandler.java [69-71]

-public boolean hasPendingMerges() {
+public synchronized boolean hasPendingMerges() {
     return !mergingSegments.isEmpty();
 }
Suggestion importance[1-10]: 7

__

Why: hasPendingMerges() accesses mergingSegments without synchronization while all other methods accessing this field are synchronized, creating a potential data race in concurrent usage.

Medium
General
Defensively copy list to ensure immutability

Collections.unmodifiableList wraps the original list without copying it, so if the
caller mutates the passed-in list after construction, the internal segmentsToMerge
will reflect those changes. Use List.copyOf (or new ArrayList<>(segmentsToMerge)
wrapped with unmodifiableList) to ensure true immutability.

server/src/main/java/org/opensearch/index/engine/dataformat/merge/OneMerge.java [37-41]

 public OneMerge(List<Segment> segmentsToMerge) {
-    this.segmentsToMerge = Collections.unmodifiableList(segmentsToMerge);
+    this.segmentsToMerge = List.copyOf(segmentsToMerge);
     this.totalSize = calculateTotalSizeInBytes();
     this.totalNumDocs = calculateTotalNumDocs();
 }
Suggestion importance[1-10]: 5

__

Why: Using Collections.unmodifiableList without copying means external mutations to the original list would affect segmentsToMerge. Using List.copyOf provides true defensive immutability, which is a valid correctness improvement.

Low

@github-actions
Copy link
Contributor

❌ Gradle check result for 1d5b2c6: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant