Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.parquet.parquetdataformat.fields.core.data.number.LongParquetField;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.opensearch.index.engine.exec.composite.CompositeDataFormatWriter;
import org.opensearch.index.mapper.FieldNamesFieldMapper;
Expand All @@ -24,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
* Utility class for creating Apache Arrow schemas from OpenSearch mapper services.
Expand All @@ -32,6 +34,9 @@
*/
public final class ArrowSchemaBuilder {

private static final FieldType LONG_FIELD_TYPE = new LongParquetField().getFieldType();
private static final ConcurrentHashMap<Long, Schema> SCHEMA_CACHE = new ConcurrentHashMap<>();

// Private constructor to prevent instantiation of utility class
private ArrowSchemaBuilder() {
throw new UnsupportedOperationException("Utility class should not be instantiated");
Expand All @@ -49,13 +54,14 @@ private ArrowSchemaBuilder() {
public static Schema getSchema(final MapperService mapperService, boolean isPrimary) {
Objects.requireNonNull(mapperService, "MapperService cannot be null");

final List<Field> fields = extractFieldsFromMappers(mapperService, isPrimary);

if (fields.isEmpty()) {
throw new IllegalStateException("No valid fields found in mapper service");
}

return new Schema(fields);
long mappingVersion = mapperService.getIndexSettings().getIndexMetadata().getMappingVersion();
return SCHEMA_CACHE.computeIfAbsent(mappingVersion, v -> {
final List<Field> fields = extractFieldsFromMappers(mapperService, isPrimary);
if (fields.isEmpty()) {
throw new IllegalStateException("No valid fields found in mapper service");
}
return new Schema(fields);
});
}

/**
Expand All @@ -79,9 +85,8 @@ private static List<Field> extractFieldsFromMappers(final MapperService mapperSe
}
}

LongParquetField longField = new LongParquetField();
fields.add(new Field(CompositeDataFormatWriter.ROW_ID, longField.getFieldType(), null));
fields.add(new Field(SeqNoFieldMapper.PRIMARY_TERM_NAME, longField.getFieldType(), null));
fields.add(new Field(CompositeDataFormatWriter.ROW_ID, LONG_FIELD_TYPE, null));
fields.add(new Field(SeqNoFieldMapper.PRIMARY_TERM_NAME, LONG_FIELD_TYPE, null));

return fields;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ public class ManagedVSR implements AutoCloseable {
private final BufferAllocator allocator;
private VSRState state;
private final Map<String, Field> fields = new HashMap<>();
private final Map<String, FieldVector> vectorCache = new HashMap<>();


public ManagedVSR(String id, Schema schema, BufferAllocator allocator) {
this.id = id;
this.vsr = VectorSchemaRoot.create(schema, allocator);
this.allocator = allocator;
this.state = VSRState.ACTIVE;
for (Field field : vsr.getSchema().getFields()) {
fields.put(field.getName(), field);
for (FieldVector vector : vsr.getFieldVectors()) {
vectorCache.put(vector.getName(), vector);
}
}

Expand Down Expand Up @@ -76,7 +77,7 @@ public FieldVector getVector(String fieldName) {
if (state != VSRState.ACTIVE) {
throw new IllegalStateException("Cannot access vector in VSR state: " + state + ". VSR must be ACTIVE to access vectors.");
}
return vsr.getVector(fields.get(fieldName));
return vectorCache.get(fieldName);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,37 +47,38 @@
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

public class LuceneCommitEngine implements Committer {
public class LuceneCommitEngine {

private final Logger logger;
private IndexWriter indexWriter;
private final CombinedDeletionPolicy combinedDeletionPolicy;
private final Store store;
private volatile SegmentInfos lastCommittedSegmentInfos;

public LuceneCommitEngine(Store store, TranslogDeletionPolicy translogDeletionPolicy, LongSupplier globalCheckpointSupplier, boolean primaryMode)
public LuceneCommitEngine(
Store store,
CombinedDeletionPolicy combinedDeletionPolicy,
IndexWriter indexWriter)
throws IOException {
this.logger = Loggers.getLogger(LuceneCommitEngine.class, store.shardId());
this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy, null, globalCheckpointSupplier);
IndexWriterConfig indexWriterConfig = new IndexWriterConfig();
indexWriterConfig.setIndexDeletionPolicy(combinedDeletionPolicy);
indexWriterConfig.setMergePolicy(NoMergePolicy.INSTANCE);
this.combinedDeletionPolicy = combinedDeletionPolicy;
this.store = store;
this.indexWriter = indexWriter;
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
if (primaryMode) {
this.indexWriter = new IndexWriter(store.directory(), indexWriterConfig);
}
}

@Override
public synchronized void addLuceneIndexes(List<Segment> segments) throws IOException {

for(Segment segment : segments) {
WriterFileSet wfs = segment.getDFGroupedSearchableFiles().get(LuceneDataFormat.LUCENE.name());
if(wfs == null || wfs.refresh()) continue;

try {
indexWriter.addIndexes(new HardlinkCopyDirectoryWrapper(new NIOFSDirectory(Path.of(wfs.getDirectory()))));
Path writerDir = Path.of(indexWriter.getDirectory().toString()).toAbsolutePath().normalize();
Path segmentDir = Path.of(wfs.getDirectory()).toAbsolutePath().normalize();
if (!writerDir.equals(segmentDir)) {
indexWriter.addIndexes(NIOFSDirectory.open(segmentDir));
}
wfs.setRefreshed();
} catch (IOException e) {
throw new RuntimeException("Not able to copy it to the main writer in commiter: {}", e);
Expand Down Expand Up @@ -110,7 +111,6 @@ public synchronized void addLuceneIndexes(List<Segment> segments) throws IOExcep
}
}

@Override
public synchronized CommitPoint commit(Iterable<Map.Entry<String, String>> commitData, CatalogSnapshot catalogSnapshot) {
indexWriter.setLiveCommitData(commitData);
try {
Expand Down Expand Up @@ -140,19 +140,16 @@ private void refreshLastCommittedSegmentInfos() {
}
}

@Override
public Map<String, String> getLastCommittedData() {
return MapBuilder.<String, String>newMapBuilder().putAll(lastCommittedSegmentInfos.getUserData()).immutableMap();
}

@Override
public CommitStats getCommitStats() {
String segmentId = Base64.getEncoder().encodeToString(lastCommittedSegmentInfos.getId());
// TODO: Implement numDocs
return new CommitStats(lastCommittedSegmentInfos.getUserData(), lastCommittedSegmentInfos.getLastGeneration(), segmentId, 0);
}

@Override
public SafeCommitInfo getSafeCommitInfo() {
return this.combinedDeletionPolicy.getSafeCommitInfo();
}
Expand Down Expand Up @@ -186,9 +183,4 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
throw new EngineException(store.shardId(), "Failed to acquire safe index commit", e);
}
}

@Override
public void close() throws IOException {
IOUtils.close(indexWriter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.index.engine.exec.commit.LuceneCommitEngine;
import org.opensearch.index.engine.exec.composite.CompositeDataFormatWriter;
import org.opensearch.index.engine.exec.composite.CompositeIndexingExecutionEngine;
import org.opensearch.index.engine.exec.lucene.engine.LuceneExecutionEngine;
import org.opensearch.index.engine.exec.merge.CompositeMergeHandler;
import org.opensearch.index.engine.exec.merge.MergeHandler;
import org.opensearch.index.engine.exec.merge.MergeResult;
Expand Down Expand Up @@ -309,10 +310,8 @@ public void onFailure(String reason, Exception ex) {
this.translogManager = translogManagerRef;

// initialize committer and composite indexing execution engine
committerRef = new LuceneCommitEngine(store, translogDeletionPolicy, translogManager::getLastSyncedGlobalCheckpoint, !config().isReadOnlyReplica());
this.compositeEngineCommitter = committerRef;
final AtomicLong lastCommittedWriterGeneration = new AtomicLong(-1);
Map<String, String> lastCommittedData = this.compositeEngineCommitter.getLastCommittedData();
Map<String, String> lastCommittedData = store.readLastCommittedSegmentsInfo().getUserData();
if (lastCommittedData.containsKey(LAST_COMPOSITE_WRITER_GEN_KEY)) {
lastCommittedWriterGeneration.set(Long.parseLong(lastCommittedData.get(LAST_COMPOSITE_WRITER_GEN_KEY)));
}
Expand All @@ -326,6 +325,18 @@ public void onFailure(String reason, Exception ex) {
lastCommittedWriterGeneration.incrementAndGet(),
indexSettings
);
// Find LuceneExecutionEngine from delegates and initialize its commit engine
LuceneExecutionEngine luceneExecutionEngine = this.engine.getDelegates().stream()
.filter(d -> d instanceof LuceneExecutionEngine)
.map(d -> (LuceneExecutionEngine) d)
.findFirst()
.orElseThrow(() -> new EngineCreationFailureException(shardId, "LuceneExecutionEngine not found in delegates", null));
luceneExecutionEngine.initializeCommitEngine(
store, translogDeletionPolicy, translogManager::getLastSyncedGlobalCheckpoint, !config().isReadOnlyReplica()
);
committerRef = luceneExecutionEngine;
this.compositeEngineCommitter = committerRef;

this.catalogSnapshotManager = new CatalogSnapshotManager(this, committerRef, shardPath, deleteUnreferencedFiles);
// How to bring the Dataformat here? Currently, this means only Text and LuceneFormat can be used

Expand Down Expand Up @@ -1412,12 +1423,10 @@ protected void closeNoLock(String reason, CountDownLatch closedLatch) {
@Override
public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
ensureOpen();
if (compositeEngineCommitter instanceof LuceneCommitEngine) {
LuceneCommitEngine luceneCommitEngine = (LuceneCommitEngine) compositeEngineCommitter;
// Delegate to the LuceneCommitEngine's acquireSafeIndexCommit method
return luceneCommitEngine.acquireSafeIndexCommit();
if (compositeEngineCommitter instanceof LuceneExecutionEngine) {
return ((LuceneExecutionEngine) compositeEngineCommitter).acquireSafeIndexCommit();
} else {
throw new EngineException(shardId, "CompositeEngine committer is not a LuceneCommitEngine");
throw new EngineException(shardId, "CompositeEngine committer is not a LuceneExecutionEngine");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,29 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.MergeTrigger;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.util.PrintStreamInfoStream;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.CombinedDeletionPolicy;
import org.opensearch.index.engine.CommitStats;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.SafeCommitInfo;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.DocumentInput;
import org.opensearch.index.engine.exec.EngineRole;
Expand All @@ -33,13 +43,19 @@
import org.opensearch.index.engine.exec.RefreshInput;
import org.opensearch.index.engine.exec.RefreshResult;
import org.opensearch.index.engine.exec.Writer;
import org.opensearch.index.engine.exec.commit.CommitPoint;
import org.opensearch.index.engine.exec.commit.Committer;
import org.opensearch.index.engine.exec.commit.LuceneCommitEngine;
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
import org.opensearch.index.engine.exec.coord.Segment;
import org.opensearch.index.engine.exec.lucene.LuceneDataFormat;
import org.opensearch.index.engine.exec.lucene.fields.LuceneFieldRegistry;
import org.opensearch.index.engine.exec.lucene.writer.LuceneWriter;
import org.opensearch.index.engine.exec.lucene.writer.LuceneWriterCodec;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.nio.file.Files;
Expand All @@ -48,17 +64,19 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

import static org.opensearch.index.engine.exec.composite.CompositeDataFormatWriter.ROW_ID;

public class LuceneExecutionEngine implements IndexingExecutionEngine<LuceneDataFormat> {
public class LuceneExecutionEngine implements IndexingExecutionEngine<LuceneDataFormat>, Committer {

private final MapperService mapperService;
private final ShardPath shardPath;
private final DataFormat dataFormat;
private final EngineConfig engineConfig;
private static final Logger logger = LogManager.getLogger(LuceneExecutionEngine.class);
private final boolean isPrimaryEngine;
private LuceneCommitEngine luceneCommitEngine;

public LuceneExecutionEngine(EngineConfig engineConfig, MapperService mapperService, boolean isPrimaryEngine, ShardPath shardPath, IndexSettings indexSettings, FieldAssignments fieldAssignments) {
this.engineConfig = engineConfig;
Expand All @@ -70,6 +88,35 @@ public LuceneExecutionEngine(EngineConfig engineConfig, MapperService mapperServ
// in POC it's only a secondary engine so we don't need to have all fields in this.
}

/**
 * Initializes the commit-time IndexWriter and LuceneCommitEngine.
 * Must be called before any Committer methods are used.
 *
 * @param translogDeletionPolicy policy handed to the CombinedDeletionPolicy governing commit retention
 * @param globalCheckpointSupplier supplies the last synced global checkpoint for safe-commit selection
 * @param primaryMode when {@code false} no IndexWriter is created (replica shards do not commit),
 *                    so the LuceneCommitEngine receives a {@code null} writer
 * @throws IOException if the IndexWriter or LuceneCommitEngine cannot be created
 */
public void initializeCommitEngine(
    Store store,
    TranslogDeletionPolicy translogDeletionPolicy,
    LongSupplier globalCheckpointSupplier,
    boolean primaryMode
) throws IOException {
    Logger commitLogger = Loggers.getLogger(LuceneCommitEngine.class, store.shardId());
    CombinedDeletionPolicy combinedDeletionPolicy = new CombinedDeletionPolicy(
        commitLogger, translogDeletionPolicy, null, globalCheckpointSupplier
    );

    // TODO: use CustomIndexWriter here
    IndexWriter indexWriter = null;
    if (primaryMode) {
        IndexWriterConfig iwc = new IndexWriterConfig();
        iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
        // merges are driven elsewhere; the commit writer itself must never merge
        iwc.setMergePolicy(NoMergePolicy.INSTANCE);
        iwc.setMergeScheduler(new SerialMergeScheduler());
        iwc.setIndexSort(new Sort(new SortField(ROW_ID, SortField.Type.LONG)));

        indexWriter = new IndexWriter(store.directory(), iwc);
    }
    try {
        // LuceneCommitEngine's constructor performs I/O (reads last committed segment infos);
        // close the writer if that fails so we don't leak it and hold the directory lock.
        this.luceneCommitEngine = new LuceneCommitEngine(store, combinedDeletionPolicy, indexWriter);
    } catch (IOException | RuntimeException e) {
        IOUtils.closeWhileHandlingException(indexWriter);
        throw e;
    }
}

@Override
public List<String> supportedFieldTypes(boolean isPrimaryEngine) {
// Delegate to the static LuceneFieldRegistry — each registered field type is supported
Expand Down Expand Up @@ -171,8 +218,39 @@ public void deleteFiles(Map<String, Collection<String>> filesToDelete) throws IO

}

// --- Committer delegation ---

@Override
public void addLuceneIndexes(List<Segment> segments) throws IOException {
    // Fail fast with a clear message instead of an opaque NPE when the commit
    // engine has not been initialized via initializeCommitEngine().
    if (luceneCommitEngine == null) {
        throw new IllegalStateException("Commit engine not initialized; call initializeCommitEngine() first");
    }
    luceneCommitEngine.addLuceneIndexes(segments);
}

@Override
public void close() throws IOException {
public CommitPoint commit(Iterable<Map.Entry<String, String>> commitData, CatalogSnapshot catalogSnapshot) {
return luceneCommitEngine.commit(commitData, catalogSnapshot);
}

@Override
public Map<String, String> getLastCommittedData() {
    // Fail fast with a clear message instead of an opaque NPE when the commit
    // engine has not been initialized via initializeCommitEngine().
    if (luceneCommitEngine == null) {
        throw new IllegalStateException("Commit engine not initialized; call initializeCommitEngine() first");
    }
    return luceneCommitEngine.getLastCommittedData();
}

@Override
public CommitStats getCommitStats() {
    // Fail fast with a clear message instead of an opaque NPE when the commit
    // engine has not been initialized via initializeCommitEngine().
    if (luceneCommitEngine == null) {
        throw new IllegalStateException("Commit engine not initialized; call initializeCommitEngine() first");
    }
    return luceneCommitEngine.getCommitStats();
}

@Override
public SafeCommitInfo getSafeCommitInfo() {
    // Fail fast with a clear message instead of an opaque NPE when the commit
    // engine has not been initialized via initializeCommitEngine().
    if (luceneCommitEngine == null) {
        throw new IllegalStateException("Commit engine not initialized; call initializeCommitEngine() first");
    }
    return luceneCommitEngine.getSafeCommitInfo();
}

/**
 * Acquires a reference to the last safe index commit; the caller must close
 * the returned GatedCloseable to release the reference.
 *
 * @throws EngineException if the commit cannot be acquired
 */
public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
    // Fail fast with a clear message instead of an opaque NPE when the commit
    // engine has not been initialized via initializeCommitEngine().
    if (luceneCommitEngine == null) {
        throw new IllegalStateException("Commit engine not initialized; call initializeCommitEngine() first");
    }
    return luceneCommitEngine.acquireSafeIndexCommit();
}

@Override
public void close() throws IOException {
    // No-op when the commit engine was never initialized (e.g. replica-only lifecycle).
    if (luceneCommitEngine == null) {
        return;
    }
    // NOTE(review): this acquires a *new* safe-commit reference and immediately releases it;
    // it does not close the IndexWriter owned by LuceneCommitEngine, so the writer (and its
    // directory lock) appear to be leaked on close. Confirm whether LuceneCommitEngine should
    // expose a close() that closes its IndexWriter, as the pre-refactor Committer did.
    IOUtils.close(luceneCommitEngine.acquireSafeIndexCommit());
}
}
Loading
Loading