diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9e2be80e8201c..cf87715f882a5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -72,6 +72,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Bump opensearch-protobufs dependency to 0.24.0 and update transport-grpc module compatibility ([#20059](https://github.com/opensearch-project/OpenSearch/pull/20059))
 - Refactor the ShardStats, WarmerStats and IndexingPressureStats class to use the Builder pattern instead of constructors ([#19966](https://github.com/opensearch-project/OpenSearch/pull/19966))
+- Handle custom metadata files in subdirectory-store ([#20157](https://github.com/opensearch-project/OpenSearch/pull/20157))
 
 ### Fixed
 - Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
diff --git a/modules/store-subdirectory/src/internalClusterTest/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareRecoveryTests.java b/modules/store-subdirectory/src/internalClusterTest/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareRecoveryTests.java
index ccd6ac05ae087..ed68b4f52101a 100644
--- a/modules/store-subdirectory/src/internalClusterTest/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareRecoveryTests.java
+++ b/modules/store-subdirectory/src/internalClusterTest/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareRecoveryTests.java
@@ -8,6 +8,7 @@
 
 package org.opensearch.plugin.store.subdirectory;
 
+import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.StringField;
@@ -17,6 +18,8 @@
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
 import org.opensearch.common.concurrent.GatedCloseable;
 import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Settings;
@@ -110,10 +113,23 @@ private void verifySubdirectoryFilesOnAllNodes(String indexName, int expectedCou
 
             if (Files.exists(subdirectoryPath)) {
                 try (Directory directory = FSDirectory.open(subdirectoryPath)) {
+                    Set<String> allFiles = new HashSet<>();
+
+                    // Get segment files
                     SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(directory);
                     Collection<String> segmentFiles = segmentInfos.files(true);
-                    if (!segmentFiles.isEmpty()) {
-                        nodeFiles.put(nodeName, new HashSet<>(segmentFiles));
+                    allFiles.addAll(segmentFiles);
+
+                    // Get non-segment files
+                    String[] filesInDir = directory.listAll();
+                    for (String fileName : filesInDir) {
+                        if (fileName.startsWith(TestEngine.NON_SEGMENT_FILE_PREFIX)) {
+                            allFiles.add(fileName);
+                        }
+                    }
+
+                    if (!allFiles.isEmpty()) {
+                        nodeFiles.put(nodeName, allFiles);
                     }
                 } catch (IOException e) {
                     // corrupt index or no commit files, skip this node
@@ -127,12 +143,16 @@ private void verifySubdirectoryFilesOnAllNodes(String indexName, int expectedCou
             nodeFiles.size()
         );
 
-        // Verify all nodes have identical files
+        // Verify all nodes have identical files (including non-segment files)
         if (nodeFiles.size() > 1) {
             Set<String> referenceFiles = nodeFiles.values().iterator().next();
             for (Map.Entry<String, Set<String>> entry : nodeFiles.entrySet()) {
                 assertEquals("Node " + entry.getKey() + " should have identical files to other nodes", referenceFiles, entry.getValue());
             }
+
+            // Additional verification: ensure non-segment files are present
+            boolean hasNonSegmentFiles = referenceFiles.stream().anyMatch(f -> f.startsWith(TestEngine.NON_SEGMENT_FILE_PREFIX));
+            assertTrue("Non-segment files should be present in subdirectory", hasNonSegmentFiles);
         }
     }
 
@@ -181,11 +201,13 @@ public Engine newReadWriteEngine(EngineConfig config) {
 
     static class TestEngine extends InternalEngine {
         static final String SUBDIRECTORY_NAME = "test_subdirectory";
+        static final String NON_SEGMENT_FILE_PREFIX = "test_metadata_";
 
         private final Path subdirectoryPath;
         private final Directory subdirectoryDirectory;
         private final IndexWriter subdirectoryWriter;
         private final EngineConfig engineConfig;
+        private int nonSegmentFileCounter = 0;
 
         TestEngine(EngineConfig config) throws IOException {
             super(config);
@@ -226,11 +248,29 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
             // Then commit the subdirectory
             try {
                 subdirectoryWriter.commit();
+                // Create a non-segment file to test non-segment file recovery
+                createNonSegmentFile();
             } catch (IOException e) {
                 throw new EngineException(shardId, "Failed to commit subdirectory during flush", e);
             }
         }
 
+        /**
+         * Creates a non-segment file in the subdirectory to test non-segment file metadata handling
+         */
+        private void createNonSegmentFile() throws IOException {
+            String fileName = NON_SEGMENT_FILE_PREFIX + nonSegmentFileCounter++;
+            try (IndexOutput output = subdirectoryDirectory.createOutput(fileName, IOContext.DEFAULT)) {
+                // Write a proper Lucene file with codec header and footer
+                CodecUtil.writeHeader(output, "test_metadata", 0);
+                // Write some test data
+                output.writeString("test_data_" + System.currentTimeMillis());
+                output.writeLong(nonSegmentFileCounter);
+                // Write codec footer with checksum
+                CodecUtil.writeFooter(output);
+            }
+        }
+
         @Override
         public void close() throws IOException {
             subdirectoryWriter.close();
@@ -299,6 +339,15 @@ public Collection<String> getFileNames() throws IOException {
                     String relativePath = Path.of(TestEngine.SUBDIRECTORY_NAME, fileName).toString();
                     allFiles.add(relativePath);
                 }
+
+                // Add non-segment files (test_metadata_*)
+                String[] allFilesInDir = directory.listAll();
+                for (String fileName : allFilesInDir) {
+                    if (fileName.startsWith(TestEngine.NON_SEGMENT_FILE_PREFIX)) {
+                        String relativePath = Path.of(TestEngine.SUBDIRECTORY_NAME, fileName).toString();
+                        allFiles.add(relativePath);
+                    }
+                }
             }
         }
         return allFiles;
diff --git a/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareStore.java b/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareStore.java
index af8b2c917b953..84fd9a5a97f34 100644
--- a/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareStore.java
+++ b/modules/store-subdirectory/src/main/java/org/opensearch/plugin/store/subdirectory/SubdirectoryAwareStore.java
@@ -10,6 +10,7 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfos;
@@ -19,6 +20,7 @@
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.util.Version;
 import org.opensearch.common.lucene.Lucene;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.env.ShardLock;
@@ -122,31 +124,46 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
         Map<String, String> commitUserDataBuilder = new HashMap<>(regularMetadata.userData);
         totalNumDocs += regularMetadata.numDocs;
 
-        // Load subdirectory files metadata from segments_N files in subdirectories
-        totalNumDocs += this.loadSubdirectoryMetadataFromSegments(commit, builder);
+        // Load subdirectory files metadata (both segment files and non-segment files like custom metadata files)
+        totalNumDocs += this.loadSubdirectoryMetadata(commit, builder);
 
         return new MetadataSnapshot(Collections.unmodifiableMap(builder), Collections.unmodifiableMap(commitUserDataBuilder), totalNumDocs);
     }
 
     /**
-     * Load subdirectory file metadata by reading segments_N files from any subdirectories.
-     * This leverages the same approach as Store.loadMetadata but for files in subdirectories.
+     * Load subdirectory file metadata by reading segments_N files from any subdirectories,
+     * and also compute metadata for non-segment files.
      *
      * @return the total number of documents in all subdirectory segments
      */
-    private long loadSubdirectoryMetadataFromSegments(IndexCommit commit, Map<String, StoreFileMetadata> builder) throws IOException {
-        // Find all segments_N files in subdirectories from the commit
-        Set<String> subdirectorySegmentFiles = new HashSet<>();
+    private long loadSubdirectoryMetadata(IndexCommit commit, Map<String, StoreFileMetadata> builder) throws IOException {
+        // Categorize subdirectory files into segment info files (segments_N) and non-segment-info files
+        Set<String> subdirectorySegmentInfoFiles = new HashSet<>();
+        Set<String> subdirectoryNonSegmentInfoFiles = new HashSet<>();
+
         for (String fileName : commit.getFileNames()) {
-            if (Path.of(fileName).getParent() != null && fileName.contains(IndexFileNames.SEGMENTS)) {
-                subdirectorySegmentFiles.add(fileName);
+            Path filePath = Path.of(fileName);
+            // Only process subdirectory files (files with a parent path)
+            if (filePath.getParent() != null) {
+                if (fileName.contains(IndexFileNames.SEGMENTS)) {
+                    subdirectorySegmentInfoFiles.add(fileName);
+                } else {
+                    subdirectoryNonSegmentInfoFiles.add(fileName);
+                }
             }
         }
 
         long totalSubdirectoryNumDocs = 0;
         // Process each subdirectory segments_N file
-        for (String segmentsFilePath : subdirectorySegmentFiles) {
-            totalSubdirectoryNumDocs += this.loadMetadataFromSubdirectorySegmentsFile(segmentsFilePath, builder);
+        for (String segmentInfoFilePath : subdirectorySegmentInfoFiles) {
+            totalSubdirectoryNumDocs += this.loadMetadataFromSubdirectorySegmentsFile(segmentInfoFilePath, builder);
+        }
+
+        // Process non-segment files that weren't loaded by segmentInfo
+        for (String nonSegmentInfoFile : subdirectoryNonSegmentInfoFiles) {
+            if (!builder.containsKey(nonSegmentInfoFile)) {
+                computeFileMetadata(nonSegmentInfoFile, builder);
+            }
         }
 
         return totalSubdirectoryNumDocs;
@@ -213,6 +230,26 @@ private static void loadMetadataFromSegmentInfos(
         }
     }
 
+    /**
+     * Compute and store metadata for a single file.
+     *
+     * @param fileName the relative file path
+     * @param builder the map to add metadata to
+     * @throws IOException if reading the file fails
+     */
+    private void computeFileMetadata(String fileName, Map<String, StoreFileMetadata> builder) throws IOException {
+        Path filePath = shardPath().getDataPath().resolve(fileName);
+        try (Directory dir = FSDirectory.open(filePath.getParent())) {
+            String localFileName = filePath.getFileName().toString();
+            try (IndexInput in = dir.openInput(localFileName, IOContext.READONCE)) {
+                long length = in.length();
+                String checksum = Store.digestToString(CodecUtil.checksumEntireFile(in));
+                Version version = org.opensearch.Version.CURRENT.minimumIndexCompatibilityVersion().luceneVersion;
+                builder.put(fileName, new StoreFileMetadata(fileName, length, checksum, version, null));
+            }
+        }
+    }
+
     /**
      * A Lucene Directory implementation that handles files in subdirectories.
      *
diff --git a/modules/store-subdirectory/src/test/java/org/opensearch/plugin/store/subdirectory/SubdirectoryStorePluginTests.java b/modules/store-subdirectory/src/test/java/org/opensearch/plugin/store/subdirectory/SubdirectoryStorePluginTests.java
index 68ff237beb023..8a8e62883b7a9 100644
--- a/modules/store-subdirectory/src/test/java/org/opensearch/plugin/store/subdirectory/SubdirectoryStorePluginTests.java
+++ b/modules/store-subdirectory/src/test/java/org/opensearch/plugin/store/subdirectory/SubdirectoryStorePluginTests.java
@@ -8,8 +8,15 @@
 
 package org.opensearch.plugin.store.subdirectory;
 
+import org.apache.lucene.codecs.CodecUtil;
+import org.apache.lucene.index.IndexCommit;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
 import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.BytesRef;
@@ -22,6 +29,7 @@
 import org.opensearch.index.shard.ShardPath;
 import org.opensearch.index.store.FsDirectoryFactory;
 import org.opensearch.index.store.Store;
+import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.index.store.StoreStats;
 import org.opensearch.plugins.IndexStorePlugin;
 import org.opensearch.test.DummyShardLock;
@@ -30,9 +38,11 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -136,4 +146,316 @@ public int numNonExtraFiles(Store store) throws IOException {
         }
         return numNonExtra;
     }
+
+    /**
+     * Helper class to hold test store setup data
+     */
+    private static class TestStoreSetup {
+        final SubdirectoryAwareStore store;
+        final Path path;
+
+        TestStoreSetup(SubdirectoryAwareStore store, Path path) {
+            this.store = store;
+            this.path = path;
+        }
+    }
+
+    /**
+     * Creates a test store setup with all necessary paths and configurations
+     */
+    private TestStoreSetup createTestStoreSetup() throws IOException {
+        final ShardId shardId = new ShardId("index", "_na_", 1);
+        Settings settings = Settings.builder()
+            .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT)
+            .put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(0))
+            .build();
+        Path path = createTempDir().resolve("indices").resolve(shardId.getIndex().getUUID()).resolve(String.valueOf(shardId.id()));
+        Path indexPath = path.resolve("index");
+        Files.createDirectories(indexPath);
+
+        SubdirectoryAwareStore store = new SubdirectoryAwareStore(
+            shardId,
+            IndexSettingsModule.newIndexSettings("index", settings),
+            SubdirectoryStorePluginTests.newFSDirectory(indexPath),
+            new DummyShardLock(shardId),
+            Store.OnClose.EMPTY,
+            new ShardPath(false, path, path, shardId),
+            new FsDirectoryFactory()
+        );
+
+        return new TestStoreSetup(store, path);
+    }
+
+    public void testLoadMetadataWithNonSegmentFiles() throws IOException {
+        TestStoreSetup setup = createTestStoreSetup();
+        SubdirectoryAwareStore store = setup.store;
+
+        try {
+            Path subdirPath = setup.path.resolve("subdir");
+            Files.createDirectories(subdirPath);
+
+            // Write non-segment files with codec headers and footers
+            try (Directory subdir = FSDirectory.open(subdirPath)) {
+                writeTestFileWithCodec(subdir, "metadata_file1.dat", "test_data_1");
+                writeTestFileWithCodec(subdir, "metadata_file2.dat", "test_data_2");
+            }
+
+            // Create a minimal segments file in the main directory
+            try (IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig())) {
+                writer.commit();
+            }
+
+            // Get the committed SegmentInfos from the directory
+            SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(store.directory());
+
+            // Create mock IndexCommit that includes subdirectory files
+            TestIndexCommit testCommit = new TestIndexCommit(segmentInfos, store.directory(), subdirPath, "subdir");
+
+            // Load metadata
+            Store.MetadataSnapshot metadata = store.getMetadata(testCommit);
+
+            // Verify non-segment files are included in metadata
+            assertTrue("Metadata should contain subdir/metadata_file1.dat", metadata.contains("subdir/metadata_file1.dat"));
+            assertTrue("Metadata should contain subdir/metadata_file2.dat", metadata.contains("subdir/metadata_file2.dat"));
+
+            // Verify file metadata properties
+            StoreFileMetadata file1Meta = metadata.get("subdir/metadata_file1.dat");
+            assertNotNull("File metadata should not be null", file1Meta);
+            assertTrue("File length should be greater than 0", file1Meta.length() > 0);
+            assertNotNull("Checksum should not be null", file1Meta.checksum());
+            assertNotNull("Version should not be null", file1Meta.writtenBy());
+
+            StoreFileMetadata file2Meta = metadata.get("subdir/metadata_file2.dat");
+            assertNotNull("File metadata should not be null", file2Meta);
+            assertTrue("File length should be greater than 0", file2Meta.length() > 0);
+            assertNotNull("Checksum should not be null", file2Meta.checksum());
+
+            // Verify different files have different checksums
+            assertFalse("Different files should have different checksums", file1Meta.checksum().equals(file2Meta.checksum()));
+        } finally {
+            deleteContent(store.directory());
+            IOUtils.close(store);
+        }
+    }
+
+    public void testLoadMetadataWithMixedFiles() throws IOException {
+        TestStoreSetup setup = createTestStoreSetup();
+        SubdirectoryAwareStore store = setup.store;
+
+        try {
+            Path subdirPath = setup.path.resolve("subdir");
+            Files.createDirectories(subdirPath);
+
+            // Write segment files
+            try (Directory subdir = FSDirectory.open(subdirPath)) {
+                try (IndexWriter writer = new IndexWriter(subdir, new IndexWriterConfig())) {
+                    writer.commit();
+                }
+                // Write non-segment files
+                writeTestFileWithCodec(subdir, "custom_metadata.dat", "custom_data");
+            }
+
+            // Create a minimal segments file in main directory
+            try (IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig())) {
+                writer.commit();
+            }
+
+            // Get the committed SegmentInfos from the directory
+            SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(store.directory());
+
+            // Create mock IndexCommit that includes subdirectory files
+            TestIndexCommit testCommit = new TestIndexCommit(segmentInfos, store.directory(), subdirPath, "subdir");
+
+            // Load metadata
+            Store.MetadataSnapshot metadata = store.getMetadata(testCommit);
+
+            // Verify segment files are included
+            boolean hasSegmentFile = false;
+            for (String fileName : metadata.asMap().keySet()) {
+                if (fileName.startsWith("subdir/segments_")) {
+                    hasSegmentFile = true;
+                    break;
+                }
+            }
+            assertTrue("Should have segment file in subdirectory", hasSegmentFile);
+
+            // Verify non-segment file is included
+            assertTrue("Metadata should contain custom_metadata.dat", metadata.contains("subdir/custom_metadata.dat"));
+
+            // Verify no duplicate entries
+            Collection<String> fileNames = metadata.asMap().keySet();
+            assertEquals("Should have no duplicate entries", fileNames.size(), fileNames.stream().distinct().count());
+        } finally {
+            deleteContent(store.directory());
+            IOUtils.close(store);
+        }
+    }
+
+    public void testNonSegmentFileChecksumValidation() throws IOException {
+        TestStoreSetup setup = createTestStoreSetup();
+        SubdirectoryAwareStore store = setup.store;
+
+        try {
+            Path subdirPath = setup.path.resolve("subdir");
+            Files.createDirectories(subdirPath);
+
+            // Write test file with codec headers and footers
+            String testContent = "test_content_for_checksum";
+            try (Directory subdir = FSDirectory.open(subdirPath)) {
+                writeTestFileWithCodec(subdir, "checksum_test.dat", testContent);
+
+                // Compute expected checksum directly
+                try (IndexInput in = subdir.openInput("checksum_test.dat", IOContext.READONCE)) {
+                    String expectedChecksum = Store.digestToString(CodecUtil.checksumEntireFile(in));
+
+                    // Create main segments file
+                    try (IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig())) {
+                        writer.commit();
+                    }
+
+                    // Get the committed SegmentInfos from the directory
+                    SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(store.directory());
+
+                    // Create mock IndexCommit
+                    TestIndexCommit testCommit = new TestIndexCommit(segmentInfos, store.directory(), subdirPath, "subdir");
+
+                    // Load metadata
+                    Store.MetadataSnapshot metadata = store.getMetadata(testCommit);
+
+                    // Verify checksum matches
+                    StoreFileMetadata fileMeta = metadata.get("subdir/checksum_test.dat");
+                    assertNotNull("File metadata should exist", fileMeta);
+                    assertEquals("Checksum should match CodecUtil.checksumEntireFile result", expectedChecksum, fileMeta.checksum());
+                }
+
+                // Write another file with different content
+                writeTestFileWithCodec(subdir, "checksum_test2.dat", "different_content");
+
+                // Get the committed SegmentInfos from the directory
+                SegmentInfos segmentInfos2 = SegmentInfos.readLatestCommit(store.directory());
+
+                // Verify different content produces different checksum
+                TestIndexCommit testCommit2 = new TestIndexCommit(segmentInfos2, store.directory(), subdirPath, "subdir");
+                Store.MetadataSnapshot metadata2 = store.getMetadata(testCommit2);
+
+                String checksum1 = metadata2.get("subdir/checksum_test.dat").checksum();
+                String checksum2 = metadata2.get("subdir/checksum_test2.dat").checksum();
+                assertFalse("Different content should produce different checksums", checksum1.equals(checksum2));
+            }
+        } finally {
+            deleteContent(store.directory());
+            IOUtils.close(store);
+        }
+    }
+
+    /**
+     * Helper method to write a test file with proper Lucene codec header and footer
+     */
+    private void writeTestFileWithCodec(Directory dir, String fileName, String content) throws IOException {
+        try (IndexOutput output = dir.createOutput(fileName, IOContext.DEFAULT)) {
+            CodecUtil.writeHeader(output, "test_codec", 0);
+            output.writeString(content);
+            output.writeLong(System.currentTimeMillis());
+            CodecUtil.writeFooter(output);
+        }
+    }
+
+    /**
+     * Mock IndexCommit that wraps SegmentInfos and includes files from subdirectories
+     */
+    static class TestIndexCommit extends IndexCommit {
+        private final SegmentInfos segmentInfos;
+        private final Directory directory;
+        private final List<Path> subdirectoryPaths;
+        private final List<String> subdirectoryNames;
+
+        TestIndexCommit(SegmentInfos segmentInfos, Directory directory, Path subdirectoryPath, String subdirectoryName) {
+            this.segmentInfos = segmentInfos;
+            this.directory = directory;
+            this.subdirectoryPaths = List.of(subdirectoryPath);
+            this.subdirectoryNames = List.of(subdirectoryName);
+        }
+
+        @Override
+        public String getSegmentsFileName() {
+            return segmentInfos.getSegmentsFileName();
+        }
+
+        @Override
+        public Collection<String> getFileNames() throws IOException {
+            Collection<String> allFiles = new ArrayList<>(segmentInfos.files(true));
+
+            for (int i = 0; i < subdirectoryPaths.size(); i++) {
+                Path subdirPath = subdirectoryPaths.get(i);
+                String subdirName = subdirectoryNames.get(i);
+
+                if (Files.exists(subdirPath)) {
+                    try (Directory directory = FSDirectory.open(subdirPath)) {
+                        String[] files = directory.listAll();
+
+                        // Add segment files if they exist
+                        try {
+                            SegmentInfos segmentInfos = SegmentInfos.readLatestCommit(directory);
+                            for (String fileName : segmentInfos.files(true)) {
+                                allFiles.add(subdirName + "/" + fileName);
+                            }
+                        } catch (Exception e) {
+                            // No segment files, that's OK
+                        }
+
+                        // Add non-segment files (exclude test infrastructure and Lucene internal files)
+                        for (String fileName : files) {
+                            if (isCustomMetadataFile(fileName)) {
+                                allFiles.add(subdirName + "/" + fileName);
+                            }
+                        }
+                    }
+                }
+            }
+
+            return allFiles;
+        }
+
+        @Override
+        public Directory getDirectory() {
+            return directory;
+        }
+
+        @Override
+        public void delete() {
+            throw new UnsupportedOperationException("TestIndexCommit does not support deletion");
+        }
+
+        @Override
+        public int getSegmentCount() {
+            return segmentInfos.size();
+        }
+
+        @Override
+        public long getGeneration() {
+            return segmentInfos.getGeneration();
+        }
+
+        @Override
+        public Map<String, String> getUserData() throws IOException {
+            return segmentInfos.getUserData();
+        }
+
+        @Override
+        public boolean isDeleted() {
+            return false;
+        }
+
+        /**
+         * Returns true if the file is a custom metadata file (not a Lucene internal file or test infrastructure file)
+         */
+        private static boolean isCustomMetadataFile(String fileName) {
+            return !fileName.startsWith("segments_")
+                && !fileName.endsWith(".si")
+                && !fileName.startsWith("extra")
+                && !fileName.endsWith(".cfe")
+                && !fileName.endsWith(".cfs")
+                && !fileName.endsWith(".lock");
+        }
+    }
 }