Skip to content

Commit 59be6ae

Browse files
authored
Introducing indexing & deletion strategy planner interfaces (opensearch-project#20585)
Signed-off-by: Shashank Gowri <shnkgo@amazon.com>
1 parent d56fa55 commit 59be6ae

File tree

11 files changed

+1547
-478
lines changed

11 files changed

+1547
-478
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Added TopN selection logic for streaming terms aggregations ([#20481](https://github.com/opensearch-project/OpenSearch/pull/20481))
 - Added support for Intra Segment Search ([#19704](https://github.com/opensearch-project/OpenSearch/pull/19704))
 - Introduce AdditionalCodecs and EnginePlugin::getAdditionalCodecs hook to allow additional Codec registration ([#20411](https://github.com/opensearch-project/OpenSearch/pull/20411))
+- Introduced strategy planner interfaces for indexing and deletion ([#20585](https://github.com/opensearch-project/OpenSearch/pull/20585))
 
 ### Changed
 - Move Randomness from server to libs/common ([#20570](https://github.com/opensearch-project/OpenSearch/pull/20570))
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine;
10+
11+
import org.opensearch.common.lucene.uid.Versions;
12+
import org.opensearch.index.seqno.SequenceNumbers;
13+
14+
import java.util.Objects;
15+
16+
/**
17+
* The deletion strategy
18+
*
19+
* @opensearch.internal
20+
*/
21+
public final class DeletionStrategy extends OperationStrategy {
22+
23+
public final boolean currentlyDeleted;
24+
25+
private DeletionStrategy(
26+
boolean deleteFromEngine,
27+
boolean addStaleOpToEngine,
28+
boolean currentlyDeleted,
29+
long versionOfDeletion,
30+
int reservedDocs,
31+
Engine.DeleteResult earlyResultOnPreflightError
32+
) {
33+
super(deleteFromEngine, addStaleOpToEngine, versionOfDeletion, earlyResultOnPreflightError, reservedDocs);
34+
assert (deleteFromEngine && earlyResultOnPreflightError != null) == false
35+
: "can only delete from engine or have a preflight result but not both."
36+
+ "deleteFromEngine: "
37+
+ deleteFromEngine
38+
+ " earlyResultOnPreFlightError:"
39+
+ earlyResultOnPreflightError;
40+
assert reservedDocs == 0 || deleteFromEngine || addStaleOpToEngine : reservedDocs;
41+
this.currentlyDeleted = currentlyDeleted;
42+
}
43+
44+
static DeletionStrategy skipDueToVersionConflict(VersionConflictEngineException e, long currentVersion, boolean currentlyDeleted) {
45+
final Engine.DeleteResult deleteResult = new Engine.DeleteResult(
46+
e,
47+
currentVersion,
48+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
49+
SequenceNumbers.UNASSIGNED_SEQ_NO,
50+
currentlyDeleted == false
51+
);
52+
return new DeletionStrategy(false, false, currentlyDeleted, Versions.NOT_FOUND, 0, deleteResult);
53+
}
54+
55+
static DeletionStrategy processNormally(boolean currentlyDeleted, long versionOfDeletion, int reservedDocs) {
56+
return new DeletionStrategy(true, false, currentlyDeleted, versionOfDeletion, reservedDocs, null);
57+
58+
}
59+
60+
static DeletionStrategy processButSkipEngine(boolean currentlyDeleted, long versionOfDeletion) {
61+
return new DeletionStrategy(false, false, currentlyDeleted, versionOfDeletion, 0, null);
62+
}
63+
64+
static DeletionStrategy processAsStaleOp(long versionOfDeletion) {
65+
return new DeletionStrategy(false, true, false, versionOfDeletion, 0, null);
66+
}
67+
68+
static DeletionStrategy failAsTooManyDocs(Exception e) {
69+
final Engine.DeleteResult deleteResult = new Engine.DeleteResult(
70+
e,
71+
Versions.NOT_FOUND,
72+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
73+
SequenceNumbers.UNASSIGNED_SEQ_NO,
74+
false
75+
);
76+
return new DeletionStrategy(false, false, false, Versions.NOT_FOUND, 0, deleteResult);
77+
}
78+
79+
@Override
80+
public boolean equals(Object o) {
81+
if (this == o) return true;
82+
if (o == null || getClass() != o.getClass()) return false;
83+
if (!super.equals(o)) return false;
84+
DeletionStrategy that = (DeletionStrategy) o;
85+
return currentlyDeleted == that.currentlyDeleted;
86+
}
87+
88+
@Override
89+
public int hashCode() {
90+
return Objects.hash(super.hashCode(), currentlyDeleted);
91+
}
92+
93+
@Override
94+
public String toString() {
95+
return "DeletionStrategy{"
96+
+ "currentlyDeleted="
97+
+ currentlyDeleted
98+
+ ", executeOpOnEngine="
99+
+ executeOpOnEngine
100+
+ ", addStaleOpToEngine="
101+
+ addStaleOpToEngine
102+
+ ", version="
103+
+ version
104+
+ ", earlyResultOnPreFlightError="
105+
+ earlyResultOnPreFlightError
106+
+ ", reservedDocs="
107+
+ reservedDocs
108+
+ '}';
109+
}
110+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine;
10+
11+
import org.opensearch.common.CheckedBiFunction;
12+
import org.opensearch.common.CheckedFunction;
13+
import org.opensearch.common.lucene.uid.Versions;
14+
import org.opensearch.core.index.shard.ShardId;
15+
import org.opensearch.index.IndexSettings;
16+
import org.opensearch.index.seqno.SequenceNumbers;
17+
18+
import java.io.IOException;
19+
import java.util.function.BiFunction;
20+
import java.util.function.Predicate;
21+
import java.util.function.Supplier;
22+
23+
/**
24+
* Plans execution strategies for deletion operations.
25+
* The planner produces {@link DeletionStrategy} instances that guide the engine's
26+
* execution of delete operations on both primary and replica shards.
27+
*
28+
* @opensearch.internal
29+
*/
30+
public class DeletionStrategyPlanner implements OperationStrategyPlanner<Engine.Delete, DeletionStrategy> {
31+
32+
private final IndexSettings indexSettings;
33+
private final ShardId shardId;
34+
private final Predicate<Engine.Operation> hasBeenProcessedBefore;
35+
private final CheckedFunction<Engine.Operation, OpVsEngineDocStatus, IOException> opVsEngineDocStatusFunction;
36+
private final CheckedBiFunction<Engine.Operation, Boolean, VersionValue, IOException> docVersionSupplier;
37+
private final BiFunction<Engine.Operation, Integer, Exception> tryAcquireInFlightDocs;
38+
private final Supplier<Boolean> incrementVersionLookup;
39+
40+
public DeletionStrategyPlanner(
41+
IndexSettings indexSettings,
42+
ShardId shardId,
43+
Predicate<Engine.Operation> hasBeenProcessedBefore,
44+
CheckedFunction<Engine.Operation, OpVsEngineDocStatus, IOException> opVsEngineDocStatusFunction,
45+
CheckedBiFunction<Engine.Operation, Boolean, VersionValue, IOException> docVersionSupplier,
46+
BiFunction<Engine.Operation, Integer, Exception> tryAcquireInFlightDocs,
47+
Supplier<Boolean> incrementVersionLookup
48+
) {
49+
this.indexSettings = indexSettings;
50+
this.shardId = shardId;
51+
this.hasBeenProcessedBefore = hasBeenProcessedBefore;
52+
this.opVsEngineDocStatusFunction = opVsEngineDocStatusFunction;
53+
this.docVersionSupplier = docVersionSupplier;
54+
this.tryAcquireInFlightDocs = tryAcquireInFlightDocs;
55+
this.incrementVersionLookup = incrementVersionLookup;
56+
}
57+
58+
@Override
59+
public DeletionStrategy planOperationAsPrimary(Engine.Delete delete) throws IOException {
60+
assert delete.origin() == Engine.Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
61+
// resolve operation from external to internal
62+
final VersionValue versionValue = docVersionSupplier.apply(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO);
63+
assert incrementVersionLookup.get();
64+
final long currentVersion;
65+
final boolean currentlyDeleted;
66+
if (versionValue == null) {
67+
currentVersion = Versions.NOT_FOUND;
68+
currentlyDeleted = true;
69+
} else {
70+
currentVersion = versionValue.version;
71+
currentlyDeleted = versionValue.isDelete();
72+
}
73+
final DeletionStrategy plan;
74+
if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && currentlyDeleted) {
75+
final VersionConflictEngineException e = new VersionConflictEngineException(
76+
shardId,
77+
delete.id(),
78+
delete.getIfSeqNo(),
79+
delete.getIfPrimaryTerm(),
80+
SequenceNumbers.UNASSIGNED_SEQ_NO,
81+
SequenceNumbers.UNASSIGNED_PRIMARY_TERM
82+
);
83+
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, true);
84+
} else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
85+
&& (versionValue.seqNo != delete.getIfSeqNo() || versionValue.term != delete.getIfPrimaryTerm())) {
86+
final VersionConflictEngineException e = new VersionConflictEngineException(
87+
shardId,
88+
delete.id(),
89+
delete.getIfSeqNo(),
90+
delete.getIfPrimaryTerm(),
91+
versionValue.seqNo,
92+
versionValue.term
93+
);
94+
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
95+
} else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
96+
final VersionConflictEngineException e = new VersionConflictEngineException(
97+
shardId,
98+
delete,
99+
currentVersion,
100+
currentlyDeleted
101+
);
102+
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
103+
} else {
104+
final Exception reserveError = tryAcquireInFlightDocs.apply(delete, 1);
105+
if (reserveError != null) {
106+
plan = DeletionStrategy.failAsTooManyDocs(reserveError);
107+
} else {
108+
final long versionOfDeletion = delete.versionType().updateVersion(currentVersion, delete.version());
109+
plan = DeletionStrategy.processNormally(currentlyDeleted, versionOfDeletion, 1);
110+
}
111+
}
112+
return plan;
113+
}
114+
115+
@Override
116+
public DeletionStrategy planOperationAsNonPrimary(Engine.Delete delete) throws IOException {
117+
assert assertNonPrimaryOrigin(delete);
118+
final DeletionStrategy plan;
119+
if (hasBeenProcessedBefore.test(delete)) {
120+
// the operation seq# was processed thus this operation was already put into lucene
121+
// this can happen during recovery where older operations are sent from the translog that are already
122+
// part of the lucene commit (either from a peer recovery or a local translog)
123+
// or due to concurrent indexing & recovery. For the former it is important to skip lucene as the operation in
124+
// question may have been deleted in an out of order op that is not replayed.
125+
// See testRecoverFromStoreWithOutOfOrderDelete for an example of local recovery
126+
// See testRecoveryWithOutOfOrderDelete for an example of peer recovery
127+
plan = DeletionStrategy.processButSkipEngine(false, delete.version());
128+
} else {
129+
boolean segRepEnabled = indexSettings.isSegRepEnabledOrRemoteNode();
130+
final OpVsEngineDocStatus opVsLucene = opVsEngineDocStatusFunction.apply(delete);
131+
if (opVsLucene == OpVsEngineDocStatus.OP_STALE_OR_EQUAL) {
132+
if (segRepEnabled) {
133+
// For segrep based indices, we can't completely rely on localCheckpointTracker
134+
// as the preserved checkpoint may not have all the operations present in lucene
135+
// we don't need to index it again as stale op as it would create multiple documents for same seq no
136+
plan = DeletionStrategy.processButSkipEngine(false, delete.version());
137+
} else {
138+
plan = DeletionStrategy.processAsStaleOp(delete.version());
139+
}
140+
} else {
141+
plan = DeletionStrategy.processNormally(opVsLucene == OpVsEngineDocStatus.DOC_NOT_FOUND, delete.version(), 0);
142+
}
143+
}
144+
return plan;
145+
}
146+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine;
10+
11+
import org.opensearch.common.lucene.uid.Versions;
12+
13+
import java.util.Objects;
14+
15+
/**
16+
* The indexing strategy
17+
*
18+
* @opensearch.internal
19+
*/
20+
public final class IndexingStrategy extends OperationStrategy {
21+
22+
public final boolean currentNotFoundOrDeleted;
23+
public final boolean useUpdateDocument;
24+
25+
private IndexingStrategy(
26+
boolean currentNotFoundOrDeleted,
27+
boolean useUpdateDocument,
28+
boolean indexIntoEngine,
29+
boolean addStaleOpToEngine,
30+
long versionForIndexing,
31+
int reservedDocs,
32+
Engine.IndexResult earlyResultOnPreFlightError
33+
) {
34+
super(indexIntoEngine, addStaleOpToEngine, versionForIndexing, earlyResultOnPreFlightError, reservedDocs);
35+
assert useUpdateDocument == false || indexIntoEngine : "use update is set to true, but we're not indexing into engine";
36+
assert (indexIntoEngine && earlyResultOnPreFlightError != null) == false
37+
: "can only index into engine or have a preflight result but not both."
38+
+ "indexIntoEngine: "
39+
+ indexIntoEngine
40+
+ " earlyResultOnPreFlightError:"
41+
+ earlyResultOnPreFlightError;
42+
assert reservedDocs == 0 || indexIntoEngine || addStaleOpToEngine : reservedDocs;
43+
this.currentNotFoundOrDeleted = currentNotFoundOrDeleted;
44+
this.useUpdateDocument = useUpdateDocument;
45+
}
46+
47+
static IndexingStrategy optimizedAppendOnly(long versionForIndexing, int reservedDocs) {
48+
return new IndexingStrategy(true, false, true, false, versionForIndexing, reservedDocs, null);
49+
}
50+
51+
static IndexingStrategy skipDueToVersionConflict(
52+
VersionConflictEngineException e,
53+
boolean currentNotFoundOrDeleted,
54+
long currentVersion
55+
) {
56+
final Engine.IndexResult result = new Engine.IndexResult(e, currentVersion);
57+
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, Versions.NOT_FOUND, 0, result);
58+
}
59+
60+
static IndexingStrategy processNormally(boolean currentNotFoundOrDeleted, long versionForIndexing, int reservedDocs) {
61+
return new IndexingStrategy(
62+
currentNotFoundOrDeleted,
63+
currentNotFoundOrDeleted == false,
64+
true,
65+
false,
66+
versionForIndexing,
67+
reservedDocs,
68+
null
69+
);
70+
}
71+
72+
static IndexingStrategy processButSkipEngine(boolean currentNotFoundOrDeleted, long versionForIndexing) {
73+
return new IndexingStrategy(currentNotFoundOrDeleted, false, false, false, versionForIndexing, 0, null);
74+
}
75+
76+
static IndexingStrategy processAsStaleOp(long versionForIndexing) {
77+
return new IndexingStrategy(false, false, false, true, versionForIndexing, 0, null);
78+
}
79+
80+
static IndexingStrategy failAsTooManyDocs(Exception e) {
81+
final Engine.IndexResult result = new Engine.IndexResult(e, Versions.NOT_FOUND);
82+
return new IndexingStrategy(false, false, false, false, Versions.NOT_FOUND, 0, result);
83+
}
84+
85+
static IndexingStrategy failAsIndexAppendOnly(Engine.IndexResult result, long versionForIndexing, int reservedDocs) {
86+
return new IndexingStrategy(false, false, false, true, versionForIndexing, reservedDocs, result);
87+
}
88+
89+
@Override
90+
public boolean equals(Object o) {
91+
if (this == o) return true;
92+
if (o == null || getClass() != o.getClass()) return false;
93+
if (!super.equals(o)) return false;
94+
IndexingStrategy that = (IndexingStrategy) o;
95+
return currentNotFoundOrDeleted == that.currentNotFoundOrDeleted && useUpdateDocument == that.useUpdateDocument;
96+
}
97+
98+
@Override
99+
public int hashCode() {
100+
return Objects.hash(super.hashCode(), currentNotFoundOrDeleted, useUpdateDocument);
101+
}
102+
103+
@Override
104+
public String toString() {
105+
return "IndexingStrategy{"
106+
+ "currentNotFoundOrDeleted="
107+
+ currentNotFoundOrDeleted
108+
+ ", useUpdateDocument="
109+
+ useUpdateDocument
110+
+ ", executeOpOnEngine="
111+
+ executeOpOnEngine
112+
+ ", addStaleOpToEngine="
113+
+ addStaleOpToEngine
114+
+ ", version="
115+
+ version
116+
+ ", earlyResultOnPreFlightError="
117+
+ earlyResultOnPreFlightError
118+
+ ", reservedDocs="
119+
+ reservedDocs
120+
+ '}';
121+
}
122+
}

0 commit comments

Comments
 (0)