Skip to content

Commit 25741cc

Browse files
authored
patch: applied the skipExistingSegments patch for text index (#429)
* inprogress: patch for skipping text index building * patch: applied the skipExistingSegments patch for text index * nit: fixed suggested nits * fix: spotless suggestions * fixed checkstyle issues
1 parent 855c3fe commit 25741cc

File tree

4 files changed

+130
-10
lines changed

4 files changed

+130
-10
lines changed

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.pinot.segment.spi.index.IndexReaderFactory;
3131
import org.apache.pinot.segment.spi.index.IndexService;
3232
import org.apache.pinot.segment.spi.index.IndexType;
33+
import org.apache.pinot.segment.spi.index.StandardIndexes;
3334
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
3435
import org.apache.pinot.segment.spi.store.SegmentDirectory;
3536
import org.slf4j.Logger;
@@ -52,17 +53,36 @@ public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, Colum
5253
}
5354

5455
_readersByIndex = new HashMap<>();
56+
Map<String, Map<String, String>> columnProperties = indexLoadingConfig.getColumnProperties();
5557
for (IndexType<?, ?, ?> indexType : IndexService.getInstance().getAllIndexes()) {
56-
if (segmentReader.hasIndexFor(columnName, indexType)) {
57-
IndexReaderFactory<?> readerProvider = indexType.getReaderFactory();
58-
try {
59-
IndexReader reader = readerProvider.createIndexReader(segmentReader, fieldIndexConfigs, metadata);
60-
if (reader != null) {
61-
_readersByIndex.put(indexType, reader);
62-
}
63-
} catch (IndexReaderConstraintException ex) {
64-
LOGGER.warn("Constraint violation when indexing {} with {} index", columnName, indexType, ex);
58+
boolean hasIndexFor = segmentReader.hasIndexFor(columnName, indexType);
59+
if (!indexType.getId().equals(StandardIndexes.TEXT_ID)) {
60+
// process all index types other than Text Index as-it-is
61+
prepareIndexReader(segmentReader, indexType, fieldIndexConfigs, metadata);
62+
} else if (IndexLoadingConfig.processExistingSegments(columnName, columnProperties) || hasIndexFor) {
63+
// In case of Text Index, process segments only if property allows it OR text index exists on disk
64+
prepareIndexReader(segmentReader, indexType, fieldIndexConfigs, metadata);
65+
} else {
66+
LOGGER.info("skipping index reader for segmentDir: {} for column: {} with skipExistingSegments.",
67+
segmentReader.toSegmentDirectory().getIndexDir().toString(), columnName);
68+
}
69+
}
70+
}
71+
72+
private void prepareIndexReader(SegmentDirectory.Reader segmentReader,
73+
IndexType<?, ?, ?> indexType,
74+
FieldIndexConfigs fieldIndexConfigs,
75+
ColumnMetadata metadata) {
76+
String columnName = metadata.getColumnName();
77+
if (segmentReader.hasIndexFor(columnName, indexType)) {
78+
IndexReaderFactory<?> readerProvider = indexType.getReaderFactory();
79+
try {
80+
IndexReader reader = readerProvider.createIndexReader(segmentReader, fieldIndexConfigs, metadata);
81+
if (reader != null) {
82+
_readersByIndex.put(indexType, reader);
6583
}
84+
} catch (IndexReaderConstraintException | IOException ex) {
85+
LOGGER.warn("Constraint violation when indexing {} with {} index", columnName, indexType, ex);
6686
}
6787
}
6888
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,4 +398,39 @@ public void addKnownColumns(Set<String> columns) {
398398
}
399399
_dirty = true;
400400
}
401+
402+
public Map<String, Map<String, String>> getColumnProperties() {
403+
Map<String, Map<String, String>> columnProperties = new HashMap<>();
404+
List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
405+
if (fieldConfigs != null) {
406+
for (FieldConfig fieldConfig : fieldConfigs) {
407+
columnProperties.put(fieldConfig.getName(), fieldConfig.getProperties());
408+
}
409+
}
410+
return Map.of();
411+
}
412+
413+
/**
414+
* Helper methods to skip processing segments if the property SKIP_EXISTING_SEGMENTS is
415+
* set to true in fieldConfigList.
416+
*
417+
* e.g
418+
* "fieldConfigList":[
419+
* {
420+
* "name":"text_col_1",
421+
* "encodingType":"RAW",
422+
* "indexTypes": ["TEXT"],
423+
* "properties":{"fstType":"lucene", "skipExistingSegments":"true"}
424+
* }
425+
* ]
426+
* */
427+
public static boolean processExistingSegments(String columnName, Map<String, Map<String, String>> columnProperties) {
428+
final String skipExistingSegments = "skipExistingSegments";
429+
if (!columnProperties.containsKey(columnName)
430+
|| columnProperties.get(columnName) == null
431+
|| !columnProperties.get(columnName).containsKey(skipExistingSegments)) {
432+
return true;
433+
}
434+
return !Boolean.parseBoolean(columnProperties.get(columnName).get(skipExistingSegments));
435+
}
401436
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/invertedindex/TextIndexHandler.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,16 @@
3838

3939
import java.io.File;
4040
import java.io.IOException;
41+
import java.util.HashMap;
4142
import java.util.HashSet;
43+
import java.util.List;
4244
import java.util.Map;
4345
import java.util.Set;
4446
import javax.annotation.Nullable;
4547
import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
4648
import org.apache.pinot.segment.local.segment.index.forward.ForwardIndexType;
4749
import org.apache.pinot.segment.local.segment.index.loader.BaseIndexHandler;
50+
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
4851
import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
4952
import org.apache.pinot.segment.spi.ColumnMetadata;
5053
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
@@ -58,6 +61,7 @@
5861
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
5962
import org.apache.pinot.segment.spi.store.SegmentDirectory;
6063
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
64+
import org.apache.pinot.spi.config.table.FieldConfig;
6165
import org.apache.pinot.spi.config.table.TableConfig;
6266
import org.apache.pinot.spi.data.FieldSpec.DataType;
6367
import org.slf4j.Logger;
@@ -86,12 +90,27 @@
8690
public class TextIndexHandler extends BaseIndexHandler {
8791
private static final Logger LOGGER = LoggerFactory.getLogger(TextIndexHandler.class);
8892

93+
private static final String SKIP_EXISTING_SEGMENTS = "skipExistingSegments";
94+
8995
private final Set<String> _columnsToAddIdx;
96+
private Map<String, Map<String, String>> _columnProperties = new HashMap<>();
9097

9198
/**
 * Creates the handler: passes all three arguments to the base handler, records the columns that have
 * the text index enabled in {@code _fieldIndexConfigs}, and loads per-column properties from the
 * table config via {@code prepareColumnProperties}.
 *
 * @param segmentDirectory segment directory passed to the base handler
 * @param fieldIndexConfigs index configs passed to the base handler
 * @param tableConfig table config, may be {@code null}
 */
public TextIndexHandler(SegmentDirectory segmentDirectory, Map<String, FieldIndexConfigs> fieldIndexConfigs,
9299
@Nullable TableConfig tableConfig) {
93100
super(segmentDirectory, fieldIndexConfigs, tableConfig);
94101
_columnsToAddIdx = FieldIndexConfigsUtil.columnsWithIndexEnabled(StandardIndexes.text(), _fieldIndexConfigs);
102+
// Gather field-config properties for later use by shouldCreateTextIndex.
prepareColumnProperties(tableConfig);
103+
}
104+
105+
private void prepareColumnProperties(@Nullable TableConfig tableConfig) {
106+
if (tableConfig != null) {
107+
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
108+
if (fieldConfigList != null) {
109+
for (FieldConfig fieldConfig : fieldConfigList) {
110+
_columnProperties.put(fieldConfig.getName(), fieldConfig.getProperties());
111+
}
112+
}
113+
}
95114
}
96115

97116
@Override
@@ -134,7 +153,10 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter)
134153
for (String column : columnsToAddIdx) {
135154
ColumnMetadata columnMetadata = _segmentDirectory.getSegmentMetadata().getColumnMetadataFor(column);
136155
if (shouldCreateTextIndex(columnMetadata)) {
156+
LOGGER.info("Creating text index from segment: {}, column: {}", segmentName, column);
137157
createTextIndexForColumn(segmentWriter, columnMetadata);
158+
} else {
159+
LOGGER.info("Skipping creation of text index from segment: {}, column: {}", segmentName, column);
138160
}
139161
}
140162
}
@@ -143,7 +165,8 @@ private boolean shouldCreateTextIndex(ColumnMetadata columnMetadata) {
143165
if (columnMetadata != null) {
144166
// Fail fast upon unsupported operations.
145167
checkUnsupportedOperationsForTextIndex(columnMetadata);
146-
return true;
168+
// skip creating text index if SKIP_EXISTING_SEGMENTS is set to true.
169+
return IndexLoadingConfig.processExistingSegments(columnMetadata.getColumnName(), _columnProperties);
147170
}
148171
return false;
149172
}

pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,48 @@ public void testEnableTextIndexOnNewColumnDictEncoded(SegmentVersion segmentVers
680680
false, DataType.STRING, 100000);
681681
}
682682

683+
/**
684+
* Test to check if text index creation skipped if SKIP_EXISTING_SEGMENTS set to true
685+
* @throws Exception
686+
*/
687+
@Test(dataProvider = "bothV1AndV3")
688+
public void testSkipTextIndexCreationOnExistingSegmentForRawColumn(SegmentVersion segmentVersion)
689+
throws Exception {
690+
buildSegment(segmentVersion);
691+
692+
FieldConfig fieldConfig = new FieldConfig(EXISTING_STRING_COL_RAW,
693+
FieldConfig.EncodingType.RAW,
694+
FieldConfig.IndexType.TEXT,
695+
null,
696+
null,
697+
null,
698+
Map.of("skipExistingSegments", "true"));
699+
_fieldConfigMap.put(EXISTING_STRING_COL_RAW, fieldConfig);
700+
runPreProcessor(_schema);
701+
validateIndexDoesNotExist(EXISTING_STRING_COL_RAW, StandardIndexes.text());
702+
}
703+
704+
/**
705+
* Test to check if text index creation skipped if SKIP_EXISTING_SEGMENTS set to false
706+
* @throws Exception
707+
*/
708+
@Test(dataProvider = "bothV1AndV3")
709+
public void testDoNotSkipTextIndexCreationOnExistingSegmentForRawColumn(SegmentVersion segmentVersion)
710+
throws Exception {
711+
buildSegment(segmentVersion);
712+
713+
FieldConfig fieldConfig = new FieldConfig(EXISTING_STRING_COL_RAW,
714+
FieldConfig.EncodingType.RAW,
715+
FieldConfig.IndexType.TEXT,
716+
null,
717+
null,
718+
null,
719+
Map.of("skipExistingSegments", "false"));
720+
_fieldConfigMap.put(EXISTING_STRING_COL_RAW, fieldConfig);
721+
runPreProcessor(_schema);
722+
validateIndexExists(EXISTING_STRING_COL_RAW, StandardIndexes.text());
723+
}
724+
683725
/**
684726
* Test to check text index creation during segment load after text index
685727
* creation is enabled on an existing raw column.

0 commit comments

Comments
 (0)