Skip to content

Commit 5659e5b

Browse files
committed
Skip force-merge if it would be a no-op
We currently add any incoming force-merge request to the force-merge thread pool queue regardless of whether the shard already matches the request (e.g. max number of segments). This PR adds a check in the transport layer that asks the index `Engine` whether a force-merge would be a no-op, and shortcuts the action if so. Before doing the check, we first flush the shard to ensure accuracy. The `Engine` checks whether a force-merge would be a no-op by reading the `SegmentInfos` and calling Lucene's `MergePolicy#findForceMerges` method to determine what set of merge operations would be necessary. Resolves #86013
1 parent dd6484c commit 5659e5b

File tree

7 files changed

+158
-1
lines changed

7 files changed

+158
-1
lines changed

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search.vectors/42_knn_search_bbq_flat.yml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,11 @@ setup:
400400
0.224, 0.203, 0.439, 0.064, 0.246, 0.396, 0.297, 0.242,
401401
0.224, 0.203, 0.439, 0.064, 0.246, 0.396, 0.297, 0.242,
402402
0.028, 0.321, 0.022, 0.009, 0.001 , 0.031, -0.533, 0.45]
403+
# Flush in order to provoke a merge later
404+
- do:
405+
indices.flush:
406+
index: bbq_flat_nested
407+
403408
- do:
404409
index:
405410
index: bbq_flat_nested
@@ -433,6 +438,10 @@ setup:
433438
-0.602, -0.142, 0.11 , 0.192, 0.259, -0.241, 0.181, -0.166,
434439
0.082, 0.107, -0.05 , 0.155, 0.011, 0.161, -0.486, 0.569,
435440
-0.489, 0.901, 0.208, 0.011, 0.209, -0.153, -0.27, -0.013 ]
441+
# Flush in order to provoke a merge later
442+
- do:
443+
indices.flush:
444+
index: bbq_flat_nested
436445

437446
- do:
438447
index:
@@ -449,7 +458,7 @@ setup:
449458
-0.658, -0.03 , 0.276, 0.041, 0.187, -0.331, 0.165, 0.017,
450459
0.171, -0.203, -0.198, 0.115, -0.007, 0.337, -0.444, 0.615,
451460
-0.657, 1.285, 0.2 , -0.062, 0.038, 0.089, -0.068, -0.058 ]
452-
461+
# Flush in order to provoke a merge later
453462
- do:
454463
indices.flush:
455464
index: bbq_flat_nested

server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeIT.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.action.admin.indices.forcemerge;
1111

12+
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
13+
import org.elasticsearch.action.support.WriteRequest;
1214
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
1315
import org.elasticsearch.cluster.ClusterState;
1416
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -20,6 +22,7 @@
2022
import org.elasticsearch.test.ESIntegTestCase;
2123

2224
import java.io.IOException;
25+
import java.util.List;
2326

2427
import static org.hamcrest.Matchers.is;
2528
import static org.hamcrest.Matchers.notNullValue;
@@ -31,6 +34,15 @@ public void testForceMergeUUIDConsistent() throws IOException {
3134
internalCluster().ensureAtLeastNumDataNodes(2);
3235
final String index = "test-index";
3336
createIndex(index, 1, 1);
37+
// Index some documents to ensure more than 1 segment.
38+
int docs = between(10, 100);
39+
for (int i = 0; i < docs; i++) {
40+
prepareIndex(index).setId("" + i).setSource("test", "init").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
41+
if (i % 10 == 0) {
42+
// occasionally flush to create more segments
43+
flush(index);
44+
}
45+
}
3446
ensureGreen(index);
3547
final ClusterState state = clusterService().state();
3648
final IndexRoutingTable indexShardRoutingTables = state.routingTable().getIndicesRouting().get(index);
@@ -68,6 +80,91 @@ public void testForceMergeUUIDConsistent() throws IOException {
6880
final String replicaForceMergeUUID = getForceMergeUUID(replica);
6981
assertThat(replicaForceMergeUUID, notNullValue());
7082
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
83+
84+
// Assert that we have only 1 segment now
85+
final List<ShardSegments> shardSegments = getShardSegments(index);
86+
shardSegments.forEach(ss -> assertThat(ss.getNumberOfSearch(), is(1)));
87+
}
88+
89+
public void testForceMergeNoOpEmptyIndex() throws IOException {
90+
internalCluster().ensureAtLeastNumDataNodes(2);
91+
final String index = "test-index-no-op-empty";
92+
createIndex(index, 1, 1);
93+
ensureGreen(index);
94+
final ClusterState state = clusterService().state();
95+
final IndexRoutingTable indexShardRoutingTables = state.routingTable().getIndicesRouting().get(index);
96+
final IndexShardRoutingTable shardRouting = indexShardRoutingTables.shard(0);
97+
final String primaryNodeId = shardRouting.primaryShard().currentNodeId();
98+
final String replicaNodeId = shardRouting.replicaShards().get(0).currentNodeId();
99+
final Index idx = shardRouting.primaryShard().index();
100+
final IndicesService primaryIndicesService = internalCluster().getInstance(
101+
IndicesService.class,
102+
state.nodes().get(primaryNodeId).getName()
103+
);
104+
final IndicesService replicaIndicesService = internalCluster().getInstance(
105+
IndicesService.class,
106+
state.nodes().get(replicaNodeId).getName()
107+
);
108+
final IndexShard primary = primaryIndicesService.indexService(idx).getShard(0);
109+
final IndexShard replica = replicaIndicesService.indexService(idx).getShard(0);
110+
111+
assertThat(getForceMergeUUID(primary), nullValue());
112+
assertThat(getForceMergeUUID(replica), nullValue());
113+
114+
final BroadcastResponse forceMergeResponse = indicesAdmin().prepareForceMerge(index).setMaxNumSegments(1).get();
115+
116+
assertThat(forceMergeResponse.getFailedShards(), is(0));
117+
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
118+
119+
// Force flush to force a new commit that would contain the force flush UUID before the no-op existed
120+
final BroadcastResponse flushResponse = indicesAdmin().prepareFlush(index).setForce(true).get();
121+
assertThat(flushResponse.getFailedShards(), is(0));
122+
assertThat(flushResponse.getSuccessfulShards(), is(2));
123+
124+
// Assert that no force-merge occurred.
125+
assertThat(getForceMergeUUID(primary), nullValue());
126+
assertThat(getForceMergeUUID(replica), nullValue());
127+
}
128+
129+
public void testForceMergeNoOpOneSegment() throws IOException {
130+
internalCluster().ensureAtLeastNumDataNodes(2);
131+
final String index = "test-index-no-op-one-segment";
132+
createIndex(index, 1, 1);
133+
prepareIndex(index).setId("0").setSource("test", "init").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
134+
ensureGreen(index);
135+
final ClusterState state = clusterService().state();
136+
final IndexRoutingTable indexShardRoutingTables = state.routingTable().getIndicesRouting().get(index);
137+
final IndexShardRoutingTable shardRouting = indexShardRoutingTables.shard(0);
138+
final String primaryNodeId = shardRouting.primaryShard().currentNodeId();
139+
final String replicaNodeId = shardRouting.replicaShards().get(0).currentNodeId();
140+
final Index idx = shardRouting.primaryShard().index();
141+
final IndicesService primaryIndicesService = internalCluster().getInstance(
142+
IndicesService.class,
143+
state.nodes().get(primaryNodeId).getName()
144+
);
145+
final IndicesService replicaIndicesService = internalCluster().getInstance(
146+
IndicesService.class,
147+
state.nodes().get(replicaNodeId).getName()
148+
);
149+
final IndexShard primary = primaryIndicesService.indexService(idx).getShard(0);
150+
final IndexShard replica = replicaIndicesService.indexService(idx).getShard(0);
151+
152+
assertThat(getForceMergeUUID(primary), nullValue());
153+
assertThat(getForceMergeUUID(replica), nullValue());
154+
155+
final BroadcastResponse forceMergeResponse = indicesAdmin().prepareForceMerge(index).setMaxNumSegments(1).get();
156+
157+
assertThat(forceMergeResponse.getFailedShards(), is(0));
158+
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
159+
160+
// Force flush to force a new commit that would contain the force flush UUID before the no-op existed
161+
final BroadcastResponse flushResponse = indicesAdmin().prepareFlush(index).setForce(true).get();
162+
assertThat(flushResponse.getFailedShards(), is(0));
163+
assertThat(flushResponse.getSuccessfulShards(), is(2));
164+
165+
// Assert that no force-merge occurred.
166+
assertThat(getForceMergeUUID(primary), nullValue());
167+
assertThat(getForceMergeUUID(replica), nullValue());
71168
}
72169

73170
private static String getForceMergeUUID(IndexShard indexShard) throws IOException {

server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/TransportForceMergeAction.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.action.admin.indices.forcemerge;
1111

12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
1214
import org.elasticsearch.action.ActionListener;
1315
import org.elasticsearch.action.ActionRunnable;
1416
import org.elasticsearch.action.support.ActionFilters;
@@ -42,6 +44,8 @@ public class TransportForceMergeAction extends TransportBroadcastByNodeAction<
4244
BroadcastResponse,
4345
TransportBroadcastByNodeAction.EmptyResult> {
4446

47+
private static final Logger logger = LogManager.getLogger(TransportForceMergeAction.class);
48+
4549
private final IndicesService indicesService;
4650
private final ThreadPool threadPool;
4751
private final ProjectResolver projectResolver;
@@ -102,6 +106,15 @@ protected void shardOperation(
102106
.getShard(shardRouting.shardId().id());
103107
indexShard.ensureMutable(l.map(unused -> indexShard), false);
104108
}).<EmptyResult>andThen((l, indexShard) -> {
109+
boolean forceMergeIsNoOp = indexShard.withEngineException(engine -> {
110+
engine.flush();
111+
return engine.forceMergeIsNoOp(request.maxNumSegments());
112+
});
113+
if (forceMergeIsNoOp) {
114+
logger.info("---> skipping force merge for shard {} since it is a no-op", indexShard.shardId());
115+
l.onResponse(EmptyResult.INSTANCE);
116+
return;
117+
}
105118
threadPool.executor(ThreadPool.Names.FORCE_MERGE).execute(ActionRunnable.supply(l, () -> {
106119
indexShard.forceMerge(request);
107120
return EmptyResult.INSTANCE;

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1541,6 +1541,8 @@ public final void flush() throws EngineException {
15411541
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, String forceMergeUUID)
15421542
throws EngineException, IOException;
15431543

1544+
public abstract boolean forceMergeIsNoOp(int maxNumSegments) throws IOException;
1545+
15441546
/**
15451547
* Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the
15461548
* lucene index to make sure we have a "fresh" copy of the files to snapshot.

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@
112112
import java.io.IOException;
113113
import java.util.Arrays;
114114
import java.util.Collections;
115+
import java.util.HashMap;
115116
import java.util.List;
116117
import java.util.Locale;
117118
import java.util.Map;
@@ -2543,6 +2544,26 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
25432544
}
25442545
}
25452546

2547+
@Override
2548+
public boolean forceMergeIsNoOp(int maxNumSegments) throws IOException {
2549+
// TODO: is there a way for us to determine no-op with no max num segments?
2550+
if (maxNumSegments <= 0) {
2551+
return false;
2552+
}
2553+
try (var reader = DirectoryReader.open(indexWriter)) {
2554+
final var segmentCommitInfos = SegmentInfos.readCommit(reader.directory(), reader.getIndexCommit().getSegmentsFileName());
2555+
final var segmentsToMerge = new HashMap<SegmentCommitInfo, Boolean>();
2556+
for (int i = 0; i < segmentCommitInfos.size(); i++) {
2557+
segmentsToMerge.put(segmentCommitInfos.info(i), Boolean.TRUE);
2558+
}
2559+
2560+
final var mergeSpecification = indexWriter.getConfig()
2561+
.getMergePolicy()
2562+
.findForcedMerges(segmentCommitInfos, maxNumSegments, segmentsToMerge, indexWriter);
2563+
return mergeSpecification == null || mergeSpecification.merges.isEmpty();
2564+
}
2565+
}
2566+
25462567
private IndexCommitRef acquireIndexCommitRef(final Supplier<IndexCommit> indexCommitSupplier) {
25472568
store.incRef();
25482569
boolean success = false;

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,19 @@ public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDel
478478
}
479479
}
480480

481+
@Override
482+
public boolean forceMergeIsNoOp(int maxNumSegments) throws IOException {
483+
throw new UnsupportedOperationException(
484+
"force merge is not supported on a read-only engine, "
485+
+ "target max number of segments["
486+
+ maxNumSegments
487+
+ "], "
488+
+ "current number of segments["
489+
+ lastCommittedSegmentInfos.size()
490+
+ "]."
491+
);
492+
}
493+
481494
@Override
482495
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
483496
store.incRef();

x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1456,6 +1456,8 @@ private void runFallBehindTest(
14561456
for (int i = 0; i < numDocs; i++) {
14571457
final String source = Strings.format("{\"f\":%d}", i * 2);
14581458
leaderClient().prepareIndex("index1").setId(Integer.toString(i)).setSource(source, XContentType.JSON).get();
1459+
// flush during indexing to create more segments
1460+
leaderClient().admin().indices().flush(new FlushRequest("index1").force(true)).actionGet();
14591461
}
14601462
leaderClient().prepareDelete("index1", "1").get();
14611463
leaderClient().admin().indices().refresh(new RefreshRequest("index1")).actionGet();

0 commit comments

Comments
 (0)