Skip to content

Commit ac911ed

Browse files
authored
Fix CriteriaBasedCodec to work with delegate codec (opensearch-project#20442)
Signed-off-by: RS146BIJAY <rishavsagar4b1@gmail.com>
1 parent 8134f12 commit ac911ed

File tree

10 files changed

+486
-125
lines changed

10 files changed

+486
-125
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
- Update EncryptedBlobContainer to adhere limits while listing blobs in specific sort order if wrapped blob container supports ([#20514](https://github.com/opensearch-project/OpenSearch/pull/20514))
1919
- [segment replication] Fix segment replication infinite retry due to stale metadata checkpoint ([#20551](https://github.com/opensearch-project/OpenSearch/pull/20551))
2020
- Changing opensearch.cgroups.hierarchy.override causes java.lang.SecurityException exception ([#20565](https://github.com/opensearch-project/OpenSearch/pull/20565))
21+
- Fix CriteriaBasedCodec to work with delegate codec. ([20442](https://github.com/opensearch-project/OpenSearch/pull/20442))
2122

2223
### Dependencies
2324
- Bump `ch.qos.logback:logback-core` and `ch.qos.logback:logback-classic` from 1.5.24 to 1.5.27 ([#20525](https://github.com/opensearch-project/OpenSearch/pull/20525))

server/src/main/java/org/opensearch/index/CriteriaBasedMergePolicy.java

Lines changed: 84 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.index.MergeTrigger;
1414
import org.apache.lucene.index.SegmentCommitInfo;
1515
import org.apache.lucene.index.SegmentInfos;
16+
import org.opensearch.common.CheckedFunction;
1617
import org.opensearch.index.codec.CriteriaBasedCodec;
1718

1819
import java.io.IOException;
@@ -36,47 +37,106 @@ public CriteriaBasedMergePolicy(MergePolicy in) {
3637
this.in = in;
3738
}
3839

39-
/**
40-
* Merges the segments belonging to same group.
41-
*
42-
* @param mergeTrigger the event that triggered the merge
43-
* @param infos the total set of segments in the index
44-
* @param mergeContext the IndexWriter to find the merges on
45-
* @return
46-
* @throws IOException
47-
*/
48-
@Override
49-
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException {
40+
private MergeSpecification findMergesInternal(
41+
SegmentInfos segmentInfos,
42+
MergeContext mergeContext,
43+
CheckedFunction<SegmentInfos, MergeSpecification, IOException> mergeFinderFunction
44+
) throws IOException {
45+
5046
final Set<SegmentCommitInfo> merging = mergeContext.getMergingSegments();
51-
MergeSpecification spec = null;
5247
final Map<String, List<SegmentCommitInfo>> commitInfos = new HashMap<>();
53-
for (SegmentCommitInfo si : infos) {
48+
49+
for (SegmentCommitInfo si : segmentInfos) {
5450
if (merging.contains(si)) {
5551
continue;
5652
}
57-
5853
final String dwptGroupNumber = si.info.getAttribute(CriteriaBasedCodec.BUCKET_NAME);
5954
commitInfos.computeIfAbsent(dwptGroupNumber, k -> new ArrayList<>()).add(si);
6055
}
6156

62-
for (String dwptGroupNumber : commitInfos.keySet()) {
63-
if (commitInfos.get(dwptGroupNumber).size() > 1) {
64-
final SegmentInfos newSIS = new SegmentInfos(infos.getIndexCreatedVersionMajor());
65-
for (SegmentCommitInfo info : commitInfos.get(dwptGroupNumber)) {
66-
newSIS.add(info);
67-
}
57+
MergeSpecification spec = null;
58+
for (Map.Entry<String, List<SegmentCommitInfo>> entry : commitInfos.entrySet()) {
59+
List<SegmentCommitInfo> segments = entry.getValue();
60+
if (segments.size() > 1) {
61+
final SegmentInfos newSIS = new SegmentInfos(segmentInfos.getIndexCreatedVersionMajor());
62+
segments.forEach(newSIS::add);
6863

69-
final MergeSpecification tieredMergePolicySpec = in.findMerges(mergeTrigger, newSIS, mergeContext);
70-
if (tieredMergePolicySpec != null) {
64+
final MergeSpecification delegateSpec = mergeFinderFunction.apply(newSIS);
65+
if (delegateSpec != null) {
7166
if (spec == null) {
7267
spec = new MergeSpecification();
7368
}
74-
75-
spec.merges.addAll(tieredMergePolicySpec.merges);
69+
spec.merges.addAll(delegateSpec.merges);
7670
}
7771
}
7872
}
7973

8074
return spec;
8175
}
76+
77+
/**
78+
* Merges the segments belonging to same group
79+
*
80+
* @param mergeTrigger the event that triggered the merge
81+
* @param infos the total set of segments in the index
82+
* @param mergeContext the IndexWriter to find the merges on
83+
* @return
84+
* @throws IOException
85+
*/
86+
@Override
87+
public MergeSpecification findMerges(MergeTrigger mergeTrigger, SegmentInfos infos, MergeContext mergeContext) throws IOException {
88+
return findMergesInternal(infos, mergeContext, newSIS -> in.findMerges(mergeTrigger, newSIS, mergeContext));
89+
}
90+
91+
/**
92+
* Force merges segments belonging to same group.
93+
*
94+
* @param segmentInfos the total set of segments in the index
95+
* @param maxSegmentCount requested maximum number of segments in the index
96+
* @param segmentsToMerge contains the specific SegmentInfo instances that must be merged away.
97+
* @param mergeContext the MergeContext to find the merges on
98+
* @return
99+
* @throws IOException
100+
*/
101+
@Override
102+
public MergeSpecification findForcedMerges(
103+
SegmentInfos segmentInfos,
104+
int maxSegmentCount,
105+
Map<SegmentCommitInfo, Boolean> segmentsToMerge,
106+
MergeContext mergeContext
107+
) throws IOException {
108+
return findMergesInternal(
109+
segmentInfos,
110+
mergeContext,
111+
newSIS -> in.findForcedMerges(newSIS, maxSegmentCount, segmentsToMerge, mergeContext)
112+
);
113+
}
114+
115+
/**
116+
* Merges segment belonging to same group to expunge deletes.
117+
*
118+
* @param segmentInfos the total set of segments in the index
119+
* @param mergeContext the MergeContext to find the merges on
120+
* @return
121+
* @throws IOException
122+
*/
123+
@Override
124+
public MergeSpecification findForcedDeletesMerges(SegmentInfos segmentInfos, MergeContext mergeContext) throws IOException {
125+
return findMergesInternal(segmentInfos, mergeContext, newSIS -> in.findForcedDeletesMerges(newSIS, mergeContext));
126+
}
127+
128+
/**
129+
* Identifies merges that we want to execute (synchronously) on commit
130+
*
131+
* @param mergeTrigger the event that triggered the merge (COMMIT or GET_READER).
132+
* @param segmentInfos the total set of segments in the index (while preparing the commit)
133+
* @param mergeContext the MergeContext to find the merges on.
134+
* @return
135+
* @throws IOException
136+
*/
137+
@Override
138+
public MergeSpecification findFullFlushMerges(MergeTrigger mergeTrigger, SegmentInfos segmentInfos, MergeContext mergeContext)
139+
throws IOException {
140+
return findMergesInternal(segmentInfos, mergeContext, newSIS -> in.findFullFlushMerges(mergeTrigger, newSIS, mergeContext));
141+
}
82142
}

server/src/main/java/org/opensearch/index/codec/CriteriaBasedCodec.java

Lines changed: 19 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,9 @@
99
package org.opensearch.index.codec;
1010

1111
import org.apache.lucene.codecs.Codec;
12-
import org.apache.lucene.codecs.DocValuesFormat;
1312
import org.apache.lucene.codecs.FilterCodec;
14-
import org.apache.lucene.codecs.SegmentInfoFormat;
15-
import org.apache.lucene.codecs.lucene103.Lucene103Codec;
16-
import org.apache.lucene.index.SegmentInfo;
17-
import org.apache.lucene.store.Directory;
18-
import org.apache.lucene.store.IOContext;
19-
20-
import java.io.IOException;
13+
import org.apache.lucene.codecs.PostingsFormat;
14+
import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat;
2115

2216
/**
2317
* Filter codec used to attach bucket attributes to segments of child writer.
@@ -27,46 +21,29 @@ public class CriteriaBasedCodec extends FilterCodec {
2721

2822
private final String bucket;
2923
public static final String BUCKET_NAME = "bucket";
30-
private static final String PLACEHOLDER_BUCKET_FOR_PARENT_WRITER = "-2";
31-
32-
public CriteriaBasedCodec() {
33-
super("CriteriaBasedCodec", new Lucene103Codec());
34-
bucket = null;
35-
}
24+
public static final String ATTRIBUTE_BINDING_TARGET_FIELD = "_id";
3625

3726
public CriteriaBasedCodec(Codec delegate, String bucket) {
38-
super("CriteriaBasedCodec", delegate);
27+
super(delegate.getName(), delegate);
3928
this.bucket = bucket;
4029
}
4130

4231
@Override
43-
public SegmentInfoFormat segmentInfoFormat() {
44-
return new SegmentInfoFormat() {
45-
@Override
46-
public SegmentInfo read(Directory directory, String segmentName, byte[] segmentID, IOContext context) throws IOException {
47-
return delegate.segmentInfoFormat().read(directory, segmentName, segmentID, context);
48-
}
49-
50-
@Override
51-
public void write(Directory directory, SegmentInfo info, IOContext ioContext) throws IOException {
52-
if (bucket != null) {
53-
// We will set BUCKET_NAME attribute only for child writer where bucket will set.
54-
info.putAttribute(BUCKET_NAME, bucket);
55-
} else if (info.getAttribute(BUCKET_NAME) == null) {
56-
// For segment belonging to parent writer, attributes will be set. In case write went to parent
57-
// writer (like for no ops writes or for temporary tombstone entry which is added for deletes/updates
58-
// to sync version across child and parent writers), segments corresponding to those writer does not
59-
// have
60-
info.putAttribute(BUCKET_NAME, PLACEHOLDER_BUCKET_FOR_PARENT_WRITER);
32+
public PostingsFormat postingsFormat() {
33+
PostingsFormat format = super.postingsFormat();
34+
if (format instanceof PerFieldPostingsFormat) {
35+
return new PerFieldPostingsFormat() {
36+
37+
@Override
38+
public PostingsFormat getPostingsFormatForField(String field) {
39+
if (field.equals(ATTRIBUTE_BINDING_TARGET_FIELD)) {
40+
return new CriteriaBasedPostingsFormat(((PerFieldPostingsFormat) format).getPostingsFormatForField(field), bucket);
41+
} else {
42+
return ((PerFieldPostingsFormat) format).getPostingsFormatForField(field);
43+
}
6144
}
62-
63-
delegate.segmentInfoFormat().write(directory, info, ioContext);
64-
}
65-
};
66-
}
67-
68-
@Override
69-
public DocValuesFormat docValuesFormat() {
70-
return new CriteriaBasedDocValueFormat(bucket);
45+
};
46+
}
47+
return format;
7148
}
7249
}

server/src/main/java/org/opensearch/index/codec/CriteriaBasedDocValueFormat.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

0 commit comments

Comments
 (0)