Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump opensearch-protobufs dependency to 0.24.0 and update transport-grpc module compatibility ([#20059](https://github.com/opensearch-project/OpenSearch/pull/20059))

- Refactor the ShardStats, WarmerStats and IndexingPressureStats class to use the Builder pattern instead of constructors ([#19966](https://github.com/opensearch-project/OpenSearch/pull/19966))
- Handle custom metadata files in subdirectory-store ([#20157](https://github.com/opensearch-project/OpenSearch/pull/20157))

### Fixed
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.plugin.store.subdirectory;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
Expand All @@ -17,6 +18,8 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -110,10 +113,23 @@ private void verifySubdirectoryFilesOnAllNodes(String indexName, int expectedCou

if (Files.exists(subdirectoryPath)) {
try (Directory directory = FSDirectory.open(subdirectoryPath)) {
Set<String> allFiles = new HashSet<>();

// Get segment files
SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(directory);
Collection<String> segmentFiles = segmentInfos.files(true);
if (!segmentFiles.isEmpty()) {
nodeFiles.put(nodeName, new HashSet<>(segmentFiles));
allFiles.addAll(segmentFiles);

// Get non-segment files
String[] filesInDir = directory.listAll();
for (String fileName : filesInDir) {
if (fileName.startsWith(TestEngine.NON_SEGMENT_FILE_PREFIX)) {
allFiles.add(fileName);
}
}

if (!allFiles.isEmpty()) {
nodeFiles.put(nodeName, allFiles);
}
} catch (IOException e) {
// corrupt index or no commit files, skip this node
Expand All @@ -127,12 +143,16 @@ private void verifySubdirectoryFilesOnAllNodes(String indexName, int expectedCou
nodeFiles.size()
);

// Verify all nodes have identical files
// Verify all nodes have identical files (including non-segment files)
if (nodeFiles.size() > 1) {
Set<String> referenceFiles = nodeFiles.values().iterator().next();
for (Map.Entry<String, Set<String>> entry : nodeFiles.entrySet()) {
assertEquals("Node " + entry.getKey() + " should have identical files to other nodes", referenceFiles, entry.getValue());
}

// Additional verification: ensure non-segment files are present
boolean hasNonSegmentFiles = referenceFiles.stream().anyMatch(f -> f.startsWith(TestEngine.NON_SEGMENT_FILE_PREFIX));
assertTrue("Non-segment files should be present in subdirectory", hasNonSegmentFiles);
}
}

Expand Down Expand Up @@ -181,11 +201,13 @@ public Engine newReadWriteEngine(EngineConfig config) {
static class TestEngine extends InternalEngine {

static final String SUBDIRECTORY_NAME = "test_subdirectory";
static final String NON_SEGMENT_FILE_PREFIX = "test_metadata_";

private final Path subdirectoryPath;
private final Directory subdirectoryDirectory;
private final IndexWriter subdirectoryWriter;
private final EngineConfig engineConfig;
private int nonSegmentFileCounter = 0;

TestEngine(EngineConfig config) throws IOException {
super(config);
Expand Down Expand Up @@ -226,11 +248,29 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
// Then commit the subdirectory
try {
subdirectoryWriter.commit();
// Create a non-segment file to test non-segment file recovery
createNonSegmentFile();
} catch (IOException e) {
throw new EngineException(shardId, "Failed to commit subdirectory during flush", e);
}
}

/**
 * Writes a synthetic non-segment file into the subdirectory so that recovery of
 * non-segment (custom metadata) files can be exercised by the test.
 *
 * <p>The file is laid out as a well-formed Lucene file: codec header, payload,
 * then a codec footer carrying the checksum, so that
 * {@code CodecUtil.checksumEntireFile} can validate it later.
 *
 * @throws IOException if the file cannot be created or written
 */
private void createNonSegmentFile() throws IOException {
    // Derive a unique name from the running counter, then advance it.
    final String metadataFileName = NON_SEGMENT_FILE_PREFIX + nonSegmentFileCounter;
    nonSegmentFileCounter++;
    try (IndexOutput out = subdirectoryDirectory.createOutput(metadataFileName, IOContext.DEFAULT)) {
        // Standard Lucene codec header (codec name + version).
        CodecUtil.writeHeader(out, "test_metadata", 0);
        // Arbitrary payload; timestamp keeps content varying between flushes.
        out.writeString("test_data_" + System.currentTimeMillis());
        out.writeLong(nonSegmentFileCounter);
        // Footer appends the checksum expected by CodecUtil.checksumEntireFile.
        CodecUtil.writeFooter(out);
    }
}

@Override
public void close() throws IOException {
subdirectoryWriter.close();
Expand Down Expand Up @@ -299,6 +339,15 @@ public Collection<String> getFileNames() throws IOException {
String relativePath = Path.of(TestEngine.SUBDIRECTORY_NAME, fileName).toString();
allFiles.add(relativePath);
}

// Add non-segment files (test_metadata_*)
String[] allFilesInDir = directory.listAll();
for (String fileName : allFilesInDir) {
if (fileName.startsWith(TestEngine.NON_SEGMENT_FILE_PREFIX)) {
String relativePath = Path.of(TestEngine.SUBDIRECTORY_NAME, fileName).toString();
allFiles.add(relativePath);
}
}
}
}
return allFiles;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
Expand All @@ -19,6 +20,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Version;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.env.ShardLock;
Expand Down Expand Up @@ -122,31 +124,46 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
Map<String, String> commitUserDataBuilder = new HashMap<>(regularMetadata.userData);
totalNumDocs += regularMetadata.numDocs;

// Load subdirectory files metadata from segments_N files in subdirectories
totalNumDocs += this.loadSubdirectoryMetadataFromSegments(commit, builder);
// Load subdirectory files metadata (both segment files and non-segment files like custom metadata file)
totalNumDocs += this.loadSubdirectoryMetadata(commit, builder);

return new MetadataSnapshot(Collections.unmodifiableMap(builder), Collections.unmodifiableMap(commitUserDataBuilder), totalNumDocs);
}

/**
* Load subdirectory file metadata by reading segments_N files from any subdirectories.
* This leverages the same approach as Store.loadMetadata but for files in subdirectories.
* Load subdirectory file metadata by reading segments_N files from any subdirectories,
* and also compute metadata for non-segment files.
*
* @return the total number of documents in all subdirectory segments
*/
private long loadSubdirectoryMetadataFromSegments(IndexCommit commit, Map<String, StoreFileMetadata> builder) throws IOException {
// Find all segments_N files in subdirectories from the commit
Set<String> subdirectorySegmentFiles = new HashSet<>();
private long loadSubdirectoryMetadata(IndexCommit commit, Map<String, StoreFileMetadata> builder) throws IOException {
// Categorize subdirectory files into segment info files (segments_N) and non-segment-info files
Set<String> subdirectorySegmentInfoFiles = new HashSet<>();
Set<String> subdirectoryNonSegmentInfoFiles = new HashSet<>();

for (String fileName : commit.getFileNames()) {
if (Path.of(fileName).getParent() != null && fileName.contains(IndexFileNames.SEGMENTS)) {
subdirectorySegmentFiles.add(fileName);
Path filePath = Path.of(fileName);
// Only process subdirectory files (files with a parent path)
if (filePath.getParent() != null) {
if (fileName.contains(IndexFileNames.SEGMENTS)) {
subdirectorySegmentInfoFiles.add(fileName);
} else {
subdirectoryNonSegmentInfoFiles.add(fileName);
}
}
}

long totalSubdirectoryNumDocs = 0;
// Process each subdirectory segments_N file
for (String segmentsFilePath : subdirectorySegmentFiles) {
totalSubdirectoryNumDocs += this.loadMetadataFromSubdirectorySegmentsFile(segmentsFilePath, builder);
for (String segmentInfoFilePath : subdirectorySegmentInfoFiles) {
totalSubdirectoryNumDocs += this.loadMetadataFromSubdirectorySegmentsFile(segmentInfoFilePath, builder);
}

// Process non-segment files that weren't loaded by segmentInfo
for (String nonSegmentInfoFile : subdirectoryNonSegmentInfoFiles) {
if (!builder.containsKey(nonSegmentInfoFile)) {
computeFileMetadata(nonSegmentInfoFile, builder);
}
}

return totalSubdirectoryNumDocs;
Expand Down Expand Up @@ -213,6 +230,26 @@ private static void loadMetadataFromSegmentInfos(
}
}

/**
* Compute and store metadata for a single file.
*
* @param fileName the relative file path
* @param builder the map to add metadata to
* @throws IOException if reading file fails
*/
private void computeFileMetadata(String fileName, Map<String, StoreFileMetadata> builder) throws IOException {
Path filePath = shardPath().getDataPath().resolve(fileName);
try (Directory dir = FSDirectory.open(filePath.getParent())) {
String localFileName = filePath.getFileName().toString();
try (IndexInput in = dir.openInput(localFileName, IOContext.READONCE)) {
long length = in.length();
String checksum = Store.digestToString(CodecUtil.checksumEntireFile(in));
Version version = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for non-segment info file, we also need lucene version as placeholder?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if you check the constructor of StoreFileMetadata.java, the version is mandatory.

this.writtenBy = Objects.requireNonNull(writtenBy, "writtenBy must not be null");

builder.put(fileName, new StoreFileMetadata(fileName, length, checksum, version, null));
}
}
}

/**
* A Lucene Directory implementation that handles files in subdirectories.
*
Expand Down
Loading