
Commit f27288e

Fix CriteriaBasedCodec to work with delegate codec

Signed-off-by: RS146BIJAY <rishavsagar4b1@gmail.com>
1 parent: 52b699d

File tree

7 files changed (+352, -51 lines)


server/src/main/java/org/opensearch/index/codec/CriteriaBasedCodec.java

Lines changed: 19 additions & 42 deletions
@@ -9,15 +9,9 @@
 package org.opensearch.index.codec;
 
 import org.apache.lucene.codecs.Codec;
-import org.apache.lucene.codecs.DocValuesFormat;
 import org.apache.lucene.codecs.FilterCodec;
-import org.apache.lucene.codecs.SegmentInfoFormat;
-import org.apache.lucene.codecs.lucene103.Lucene103Codec;
-import org.apache.lucene.index.SegmentInfo;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-
-import java.io.IOException;
+import org.apache.lucene.codecs.PostingsFormat;
+import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat;
 
 /**
  * Filter codec used to attach bucket attributes to segments of child writer.
@@ -27,46 +21,29 @@ public class CriteriaBasedCodec extends FilterCodec {
 
     private final String bucket;
     public static final String BUCKET_NAME = "bucket";
-    private static final String PLACEHOLDER_BUCKET_FOR_PARENT_WRITER = "-2";
-
-    public CriteriaBasedCodec() {
-        super("CriteriaBasedCodec", new Lucene103Codec());
-        bucket = null;
-    }
+    public static final String ATTRIBUTE_BINDING_TARGET_FIELD = "_id";
 
     public CriteriaBasedCodec(Codec delegate, String bucket) {
-        super("CriteriaBasedCodec", delegate);
+        super(delegate.getName(), delegate);
         this.bucket = bucket;
     }
 
     @Override
-    public SegmentInfoFormat segmentInfoFormat() {
-        return new SegmentInfoFormat() {
-            @Override
-            public SegmentInfo read(Directory directory, String segmentName, byte[] segmentID, IOContext context) throws IOException {
-                return delegate.segmentInfoFormat().read(directory, segmentName, segmentID, context);
-            }
-
-            @Override
-            public void write(Directory directory, SegmentInfo info, IOContext ioContext) throws IOException {
-                if (bucket != null) {
-                    // We will set BUCKET_NAME attribute only for child writer where bucket will set.
-                    info.putAttribute(BUCKET_NAME, bucket);
-                } else if (info.getAttribute(BUCKET_NAME) == null) {
-                    // For segment belonging to parent writer, attributes will be set. In case write went to parent
-                    // writer (like for no ops writes or for temporary tombstone entry which is added for deletes/updates
-                    // to sync version across child and parent writers), segments corresponding to those writer does not
-                    // have
-                    info.putAttribute(BUCKET_NAME, PLACEHOLDER_BUCKET_FOR_PARENT_WRITER);
+    public PostingsFormat postingsFormat() {
+        PostingsFormat format = super.postingsFormat();
+        if (format instanceof PerFieldPostingsFormat) {
+            return new PerFieldPostingsFormat() {
+
+                @Override
+                public PostingsFormat getPostingsFormatForField(String field) {
+                    if (field.equals(ATTRIBUTE_BINDING_TARGET_FIELD)) {
+                        return new CriteriaBasedPostingsFormat(((PerFieldPostingsFormat) format).getPostingsFormatForField(field), bucket);
+                    } else {
+                        return ((PerFieldPostingsFormat) format).getPostingsFormatForField(field);
                    }
+                }
                 }
-
-                delegate.segmentInfoFormat().write(directory, info, ioContext);
-            }
-        };
-    }
-
-    @Override
-    public DocValuesFormat docValuesFormat() {
-        return new CriteriaBasedDocValueFormat(bucket);
+            };
+        }
+        return format;
     }
 }
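
For orientation, here is a minimal usage sketch of the rewritten codec. It is not part of this commit: the delegate codec (Codec.getDefault()), the bucket value "3", and the surrounding writer setup are illustrative assumptions; only the CriteriaBasedCodec constructor and the special handling of the "_id" field come from the diff above.

// Illustrative sketch only (not part of this commit). Assumes the default Lucene codec as the
// delegate and an arbitrary bucket id "3"; postings for the "_id" field are then written through
// CriteriaBasedPostingsFormat, while every other format is delegated unchanged.
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.opensearch.index.codec.CriteriaBasedCodec;

public class CriteriaBasedCodecUsageSketch {
    public static void main(String[] args) throws Exception {
        Codec codec = new CriteriaBasedCodec(Codec.getDefault(), "3");
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer()).setCodec(codec);
        try (ByteBuffersDirectory dir = new ByteBuffersDirectory(); IndexWriter writer = new IndexWriter(dir, iwc)) {
            Document doc = new Document();
            doc.add(new StringField("_id", "doc1", Field.Store.YES)); // "_id" is ATTRIBUTE_BINDING_TARGET_FIELD
            writer.addDocument(doc);
            writer.commit();
        }
    }
}

Note that the codec now reports the delegate's name (super(delegate.getName(), delegate)), so nothing codec-specific is recorded at the codec-name level; the criteria information travels with the per-field postings format instead, which is consistent with the SPI registration changes further down.
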
Lines changed: 145 additions & 0 deletions (new file: CriteriaBasedPostingsFormat.java, package org.opensearch.index.codec)
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.codec;

import org.apache.lucene.codecs.FieldsConsumer;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.NormsProducer;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;

import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.opensearch.index.codec.CriteriaBasedCodec.ATTRIBUTE_BINDING_TARGET_FIELD;

/**
 * Postings format that attaches the segment info attribute corresponding to the grouping criteria
 * associated with a segment.
 */
public class CriteriaBasedPostingsFormat extends PostingsFormat {

    public static final String CRITERIA_BASED_CODEC_NAME = "CriteriaBasedCodec99";
    private final PostingsFormat delegatePostingsFormat;
    private final String bucket;
    /** Extension of CAS index to store delegate information. */
    public static final String CAS_FILE_EXTENSION = "cas";
    public static final int VERSION_START = 0;
    public static final int VERSION_CURRENT = VERSION_START;
    public static final String BUCKET_NAME = "bucket";
    private static final String PLACEHOLDER_BUCKET_FOR_PARENT_WRITER = "-2";
    private static final String DELEGATE_CODEC_KEY = "delegate_codec_key";

    /**
     * Creates a new postings format.
     *
     * <p>The provided name will be written into the index segment in some configurations (such as
     * when using a per-field postings format): in such configurations, for the segment to be read
     * this class should be registered with Java's SPI mechanism (registered in META-INF/ of your jar
     * file, etc).
     */
    protected CriteriaBasedPostingsFormat(PostingsFormat delegatePostingsFormat, String bucket) {
        super(CRITERIA_BASED_CODEC_NAME);
        this.delegatePostingsFormat = delegatePostingsFormat;
        this.bucket = bucket;
    }

    // Needed for SPI
    public CriteriaBasedPostingsFormat() {
        this(null, null);
    }

    @Override
    public FieldsConsumer fieldsConsumer(SegmentWriteState state) throws IOException {
        if (delegatePostingsFormat == null) {
            throw new UnsupportedOperationException(
                "Error - " + getClass().getName() + " has been constructed without a choice of PostingsFormat"
            );
        }

        FieldsConsumer fieldsConsumer = delegatePostingsFormat.fieldsConsumer(state);
        return new CriteriaBasedFieldsConsumer(fieldsConsumer, state);
    }

    @Override
    public FieldsProducer fieldsProducer(SegmentReadState state) throws IOException {
        assert state.segmentInfo.getAttribute(DELEGATE_CODEC_KEY) != null;
        return PostingsFormat.forName(state.segmentInfo.getAttribute(DELEGATE_CODEC_KEY)).fieldsProducer(state);
    }

    class CriteriaBasedFieldsConsumer extends FieldsConsumer {
        private final FieldsConsumer delegateFieldsConsumer;
        private SegmentWriteState state;
        private boolean closed;

        public CriteriaBasedFieldsConsumer(FieldsConsumer delegateFieldsConsumer, SegmentWriteState state) {
            this.delegateFieldsConsumer = delegateFieldsConsumer;
            this.state = state;
        }

        @Override
        public void write(Fields fields, NormsProducer norms) throws IOException {
            delegateFieldsConsumer.write(fields, norms);
            FieldInfo idFieldInfo = state.fieldInfos.fieldInfo(ATTRIBUTE_BINDING_TARGET_FIELD);
            if (bucket != null) {
                // We set the BUCKET_NAME attribute only for the child writer, where a bucket is set.
                state.segmentInfo.putAttribute(BUCKET_NAME, bucket);
                if (idFieldInfo != null) {
                    idFieldInfo.putAttribute(BUCKET_NAME, bucket);
                }
            } else if (state.segmentInfo.getAttribute(BUCKET_NAME) == null) {
                // Writes can also land on the parent writer (e.g. no-op writes, or the temporary tombstone
                // entries added for deletes/updates to sync versions across child and parent writers).
                // Segments produced by the parent writer do not have a bucket of their own, so mark them
                // with a placeholder bucket.
                state.segmentInfo.putAttribute(BUCKET_NAME, PLACEHOLDER_BUCKET_FOR_PARENT_WRITER);
                if (idFieldInfo != null) {
                    idFieldInfo.putAttribute(BUCKET_NAME, PLACEHOLDER_BUCKET_FOR_PARENT_WRITER);
                }
            } else if (idFieldInfo != null) {
                idFieldInfo.putAttribute(BUCKET_NAME, state.segmentInfo.getAttribute(BUCKET_NAME));
            }

            state.segmentInfo.putAttribute(DELEGATE_CODEC_KEY, delegatePostingsFormat.getName());
        }

        @Override
        public void merge(MergeState mergeState, NormsProducer norms) throws IOException {
            delegateFieldsConsumer.merge(mergeState, norms);
            Set<String> mergeFieldNames = StreamSupport.stream(mergeState.mergeFieldInfos.spliterator(), false)
                .map(FieldInfo::getName)
                .collect(Collectors.toSet());
            if (mergeFieldNames.contains(ATTRIBUTE_BINDING_TARGET_FIELD) && mergeState.fieldInfos.length > 0) {
                String attribute = mergeState.fieldInfos[0].fieldInfo(ATTRIBUTE_BINDING_TARGET_FIELD).getAttribute(BUCKET_NAME);
                assert attribute != null : "Attribute should not be null during merging segment";
                mergeState.segmentInfo.putAttribute(BUCKET_NAME, attribute);

                mergeState.mergeFieldInfos.fieldInfo(ATTRIBUTE_BINDING_TARGET_FIELD).putAttribute(BUCKET_NAME, attribute);
            }

            mergeState.segmentInfo.putAttribute(DELEGATE_CODEC_KEY, delegatePostingsFormat.getName());
        }

        @Override
        public void close() throws IOException {
            if (closed) {
                return;
            }

            closed = true;
            delegateFieldsConsumer.close();
        }
    }
}
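
Since the bucket is recorded both as a segment attribute and on the "_id" FieldInfo, a downstream consumer could read it back per segment. The sketch below is an assumption about how such a consumer might look, not code from this commit; the field name "_id" and attribute key "bucket" match the constants above.

// Hypothetical reader-side sketch (not part of this commit): print the bucket recorded on the
// "_id" FieldInfo of each segment, as written by CriteriaBasedFieldsConsumer above.
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory;

public class BucketAttributeReaderSketch {
    public static void printBuckets(Directory dir) throws Exception {
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            for (LeafReaderContext leaf : reader.leaves()) {
                FieldInfo idField = leaf.reader().getFieldInfos().fieldInfo("_id");
                String bucket = (idField == null) ? null : idField.getAttribute("bucket");
                System.out.println("segment " + leaf.ord + " -> bucket " + bucket);
            }
        }
    }
}
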
Lines changed: 0 additions & 1 deletion (Codec SPI registration file, META-INF/services/org.apache.lucene.codecs.Codec)
@@ -1,4 +1,3 @@
 org.opensearch.index.codec.composite.composite912.Composite912Codec
 org.opensearch.index.codec.composite.composite103.Composite103Codec
 org.opensearch.index.codec.composite.backward_codecs.composite101.Composite101Codec
-org.opensearch.index.codec.CriteriaBasedCodec
Lines changed: 1 addition & 0 deletions (PostingsFormat SPI registration file, META-INF/services/org.apache.lucene.codecs.PostingsFormat)
@@ -1,2 +1,3 @@
 org.apache.lucene.search.suggest.document.Completion50PostingsFormat
 org.opensearch.index.codec.fuzzy.FuzzyFilterPostingsFormat
+org.opensearch.index.codec.CriteriaBasedPostingsFormat
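
The CriteriaBasedCodec entry is dropped from the Codec SPI list, which is consistent with the class losing its no-arg constructor and now reporting the delegate's name; CriteriaBasedPostingsFormat is registered instead so that Lucene can resolve the name "CriteriaBasedCodec99" written into per-field metadata when a segment is opened. A quick way to sanity-check the registration (illustrative, not part of this commit):

// Hypothetical check, not part of this commit: SPI lookup must resolve the name that
// CriteriaBasedPostingsFormat writes into segments back to its no-arg instance.
import org.apache.lucene.codecs.PostingsFormat;
import org.opensearch.index.codec.CriteriaBasedPostingsFormat;

public class SpiLookupSketch {
    public static void main(String[] args) {
        PostingsFormat pf = PostingsFormat.forName(CriteriaBasedPostingsFormat.CRITERIA_BASED_CODEC_NAME);
        System.out.println(pf.getName()); // expected: CriteriaBasedCodec99
    }
}

At read time this no-arg instance then delegates to the real postings format recorded under DELEGATE_CODEC_KEY in fieldsProducer, as shown in the class above.
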
Lines changed: 134 additions & 0 deletions (new test file: CriteriaBasedPostingsFormatTests.java)
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.index.codec;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MultiTerms;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.BasePostingsFormatTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;

import java.io.IOException;

public class CriteriaBasedPostingsFormatTests extends BasePostingsFormatTestCase {

    private static final String TEST_BUCKET = "test_bucket";
    private Codec criteriaBasedPostingFormat = TestUtil.alwaysPostingsFormat(
        new CriteriaBasedPostingsFormat(TestUtil.getDefaultPostingsFormat(), TEST_BUCKET)
    );

    public void testBasicFunctionality() throws IOException {
        Directory dir = new ByteBuffersDirectory();
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        iwc.setCodec(criteriaBasedPostingFormat);

        try (IndexWriter writer = new IndexWriter(dir, iwc)) {
            for (int i = 0; i < 100; i++) {
                Document doc = new Document();
                doc.add(new StringField("_id", "doc" + i, Field.Store.YES));
                doc.add(new TextField("field", "value" + i, Field.Store.YES));
                writer.addDocument(doc);
            }
        }

        try (IndexReader reader = DirectoryReader.open(dir)) {
            assertEquals(100, reader.numDocs());

            Terms terms = MultiTerms.getTerms(reader, "field");
            assertNotNull(terms);
            TermsEnum termsEnum = terms.iterator();

            int count = 0;
            BytesRef term;
            while ((term = termsEnum.next()) != null) {
                assertTrue(term.utf8ToString().startsWith("value"));
                count++;
            }
            assertEquals(100, count);
        }
    }

    public void testBucketAttributeIsSetOnSegment() throws IOException {
        Directory dir = new ByteBuffersDirectory();
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        iwc.setCodec(criteriaBasedPostingFormat);

        try (IndexWriter writer = new IndexWriter(dir, iwc)) {
            Document doc = new Document();
            doc.add(new StringField("_id", "doc1", Field.Store.YES));
            doc.add(new TextField("content", "test content", Field.Store.YES));
            writer.addDocument(doc);
        }

        SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(dir);
        assertFalse("Should have at least one segment", segmentInfos.asList().isEmpty());

        for (SegmentCommitInfo segmentCommitInfo : segmentInfos) {
            String bucketValue = segmentCommitInfo.info.getAttribute(CriteriaBasedPostingsFormat.BUCKET_NAME);
            assertEquals("Bucket attribute should be set", TEST_BUCKET, bucketValue);
        }
    }

    public void testNullBucketSetsPlaceholder() throws IOException {
        Directory dir = new ByteBuffersDirectory();
        Codec nullBucketCodec = TestUtil.alwaysPostingsFormat(new CriteriaBasedPostingsFormat(TestUtil.getDefaultPostingsFormat(), null));

        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        iwc.setCodec(nullBucketCodec);

        try (IndexWriter writer = new IndexWriter(dir, iwc)) {
            Document doc = new Document();
            doc.add(new StringField("_id", "doc1", Field.Store.YES));
            doc.add(new TextField("content", "test content", Field.Store.YES));
            writer.addDocument(doc);
        }

        SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(dir);
        for (SegmentCommitInfo segmentCommitInfo : segmentInfos) {
            String bucketValue = segmentCommitInfo.info.getAttribute(CriteriaBasedPostingsFormat.BUCKET_NAME);
            assertEquals("Placeholder bucket should be set for null bucket", "-2", bucketValue);
        }
    }

    public void testEmptyIndex() throws IOException {
        Directory dir = new ByteBuffersDirectory();
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        iwc.setCodec(criteriaBasedPostingFormat);

        try (IndexWriter writer = new IndexWriter(dir, iwc)) {
            // Don't add any documents
        }

        try (IndexReader reader = DirectoryReader.open(dir)) {
            assertEquals(0, reader.numDocs());
            Terms terms = MultiTerms.getTerms(reader, "_id");
            assertNull("Terms should be null for empty index", terms);
        }
    }

    @Override
    protected Codec getCodec() {
        return criteriaBasedPostingFormat;
    }
}
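
The tests above assert the segment-level attribute. A follow-on test, sketched here as an assumption rather than part of the commit, could also assert the field-level attribute that the FieldsConsumer puts on the "_id" FieldInfo; it would need two extra imports (org.apache.lucene.index.FieldInfo and org.apache.lucene.index.LeafReaderContext) in the test class.

// Hypothetical additional test for CriteriaBasedPostingsFormatTests (not part of this commit):
// verify the bucket attribute is also attached to the "_id" FieldInfo of every segment.
public void testBucketAttributeIsSetOnIdFieldInfo() throws IOException {
    Directory dir = new ByteBuffersDirectory();
    IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
    iwc.setCodec(criteriaBasedPostingFormat);

    try (IndexWriter writer = new IndexWriter(dir, iwc)) {
        Document doc = new Document();
        doc.add(new StringField("_id", "doc1", Field.Store.YES));
        writer.addDocument(doc);
    }

    try (IndexReader reader = DirectoryReader.open(dir)) {
        for (LeafReaderContext leaf : reader.leaves()) {
            FieldInfo idFieldInfo = leaf.reader().getFieldInfos().fieldInfo("_id");
            assertNotNull("_id field should exist", idFieldInfo);
            assertEquals(
                "Bucket attribute should be set on the _id FieldInfo",
                TEST_BUCKET,
                idFieldInfo.getAttribute(CriteriaBasedPostingsFormat.BUCKET_NAME)
            );
        }
    }
}
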
