Commit 546257c

local recovery via catalogsnapshot (#19841)
Signed-off-by: bharath-techie <[email protected]>
1 parent f14a910
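
The change replaces per-segment Java serialization of CatalogSnapshot with a Writeable-based encoding, persists the encoded snapshot in the Lucene commit's user data, and reads it back when a shard reopens, so searchable files can be recovered locally after a restart.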

4 files changed: +147 −9 lines changed

server/src/main/java/org/opensearch/index/engine/exec/WriterFileSet.java

Lines changed: 30 additions & 1 deletion
@@ -8,12 +8,17 @@
 
 package org.opensearch.index.engine.exec;
 
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.nio.file.Path;
 import java.util.HashSet;
 import java.util.Set;
 
-public class WriterFileSet implements Serializable {
+public class WriterFileSet implements Serializable, Writeable {
 
     private final String directory;
     private final long writerGeneration;
@@ -25,6 +30,30 @@ public WriterFileSet(Path directory, long writerGeneration) {
         this.directory = directory.toString();
     }
 
+    public WriterFileSet(StreamInput in) throws IOException {
+        this.directory = in.readString();
+        this.writerGeneration = in.readLong();
+
+        int fileCount = in.readVInt();
+        this.files = new HashSet<>(fileCount);
+        for (int i = 0; i < fileCount; i++) {
+            this.files.add(in.readString());
+        }
+    }
+
+    /**
+     * Serialize this WriterFileSet to StreamOutput
+     */
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeString(directory);
+        out.writeLong(writerGeneration);
+        out.writeVInt(files.size());
+        for (String file : files) {
+            out.writeString(file);
+        }
+    }
+
     public void add(String file) {
         this.files.add(file);
     }
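
The new Writeable support makes WriterFileSet round-trippable through OpenSearch's stream abstractions. A minimal sketch of that round trip, assuming the WriterFileSet(Path, long) constructor and add(String) shown above; the directory path and file names are illustrative:

    import org.opensearch.common.io.stream.BytesStreamOutput;
    import org.opensearch.core.common.io.stream.BytesStreamInput;
    import org.opensearch.index.engine.exec.WriterFileSet;

    import java.nio.file.Path;

    public class WriterFileSetRoundTrip {
        public static void main(String[] args) throws Exception {
            WriterFileSet original = new WriterFileSet(Path.of("/tmp/shard0/writer"), 3L);
            original.add("_0.parquet");   // illustrative file names
            original.add("_0.meta");

            try (BytesStreamOutput out = new BytesStreamOutput()) {
                original.writeTo(out);                                  // new writeTo(StreamOutput)
                byte[] bytes = out.bytes().toBytesRef().bytes;          // same pattern the commit uses
                try (BytesStreamInput in = new BytesStreamInput(bytes)) {
                    WriterFileSet restored = new WriterFileSet(in);     // new StreamInput constructor
                    // restored carries the same directory, writer generation, and file set
                }
            }
        }
    }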

server/src/main/java/org/opensearch/index/engine/exec/commit/LuceneCommitEngine.java

Lines changed: 18 additions & 3 deletions
@@ -47,12 +47,27 @@ public void addLuceneIndexes(CatalogSnapshot catalogSnapshot) {
                 throw new RuntimeException(e);
             }
         });
-        Map<String, String> userData = new HashMap<>();
-        catalogSnapshot.getSegments().forEach(segment -> userData.put(String.valueOf(segment.getGeneration()),
-            new String(SerializationUtils.serialize(segment))));
+
+        Map<String, String> userData = null;
+        try {
+            userData = catalogSnapshot.toCommitUserData();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
         indexWriter.setLiveCommitData(userData.entrySet());
     }
 
+    public CatalogSnapshot readCatalogSnapshot() throws IOException {
+        if (indexWriter.getLiveCommitData().iterator().hasNext()) {
+            Map.Entry<String, String> entry = indexWriter.getLiveCommitData().iterator().next();
+            return CatalogSnapshot.fromCommitUserData(entry.getValue());
+        }
+        return null;
+    }
+    public IndexWriter getIndexWriter() {
+        return indexWriter;
+    }
+
     @Override
     public CommitPoint commit(CatalogSnapshot catalogSnapshot) {
         addLuceneIndexes(catalogSnapshot);
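
The snapshot survives restarts because IndexWriter.setLiveCommitData stores these entries in the Lucene commit's user data, and getLiveCommitData returns them once the writer is reopened on the same directory. A hypothetical recovery-time caller, using only the APIs from this diff; the data path is illustrative:

    // Reopen the committer on the shard's durable data path (illustrative location)
    Path dataPath = Path.of("/var/lib/opensearch/shard0");
    LuceneCommitEngine committer = new LuceneCommitEngine(dataPath);

    // Read back the snapshot persisted by the last commit; null on a fresh shard
    CatalogSnapshot recovered = committer.readCatalogSnapshot();
    if (recovered != null) {
        // previously committed, searchable files are available without re-indexing
    }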

server/src/main/java/org/opensearch/index/engine/exec/coord/CatalogSnapshot.java

Lines changed: 86 additions & 2 deletions
@@ -9,24 +9,32 @@
 package org.opensearch.index.engine.exec.coord;
 
 import org.opensearch.common.annotation.ExperimentalApi;
+import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.util.concurrent.AbstractRefCounted;
+import org.opensearch.core.common.io.stream.BytesStreamInput;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
 import org.opensearch.index.engine.exec.FileMetadata;
 import org.opensearch.index.engine.exec.RefreshResult;
 import org.opensearch.index.engine.exec.WriterFileSet;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 @ExperimentalApi
-public class CatalogSnapshot extends AbstractRefCounted {
+public class CatalogSnapshot extends AbstractRefCounted implements Writeable {
 
     private final long id;
     private final Map<String, Collection<WriterFileSet>> dfGroupedSearchableFiles;
+    private static final String CATALOG_SNAPSHOT_KEY = "_catalog_snapshot_";
 
     public CatalogSnapshot(RefreshResult refreshResult, long id) {
         super("catalog_snapshot");
@@ -35,6 +43,60 @@ public CatalogSnapshot(RefreshResult refreshResult, long id) {
         refreshResult.getRefreshedFiles().forEach((dataFormat, writerFiles) -> dfGroupedSearchableFiles.put(dataFormat.name(), writerFiles));
     }
 
+    public CatalogSnapshot(StreamInput in) throws IOException {
+        super("catalog_snapshot");
+        this.id = in.readLong();
+        this.dfGroupedSearchableFiles = new HashMap<>();
+
+        int mapSize = in.readVInt();
+        for (int i = 0; i < mapSize; i++) {
+            String dataFormat = in.readString();
+            int fileSetCount = in.readVInt();
+            List<WriterFileSet> fileSets = new ArrayList<>(fileSetCount);
+            for (int j = 0; j < fileSetCount; j++) {
+                fileSets.add(new WriterFileSet(in));
+            }
+            dfGroupedSearchableFiles.put(dataFormat, fileSets);
+        }
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeLong(id);
+        out.writeVInt(dfGroupedSearchableFiles.size());
+        for (Map.Entry<String, Collection<WriterFileSet>> entry : dfGroupedSearchableFiles.entrySet()) {
+            out.writeString(entry.getKey());
+            out.writeVInt(entry.getValue().size());
+            for (WriterFileSet fileSet : entry.getValue()) {
+                fileSet.writeTo(out);
+            }
+        }
+    }
+
+    public String serializeToString() throws IOException {
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            this.writeTo(out);
+            return Base64.getEncoder().encodeToString(out.bytes().toBytesRef().bytes);
+        }
+    }
+
+    public static CatalogSnapshot deserializeFromString(String serializedData) throws IOException {
+        byte[] bytes = Base64.getDecoder().decode(serializedData);
+        try (BytesStreamInput in = new BytesStreamInput(bytes)) {
+            return new CatalogSnapshot(in);
+        }
+    }
+
+    public Map<String, String> toCommitUserData() throws IOException {
+        Map<String, String> userData = new HashMap<>();
+        userData.put(CATALOG_SNAPSHOT_KEY, serializeToString());
+        return userData;
+    }
+
+    public static CatalogSnapshot fromCommitUserData(String userData) throws IOException {
+        return deserializeFromString(userData);
+    }
+
     public Collection<WriterFileSet> getSearchableFiles(String dataFormat) {
         if (dfGroupedSearchableFiles.containsKey(dataFormat)) {
             return dfGroupedSearchableFiles.get(dataFormat);
@@ -68,7 +130,7 @@ public String toString() {
             '}';
     }
 
-    public static class Segment implements Serializable {
+    public static class Segment implements Serializable, Writeable {
 
         private final long generation;
         private final Map<String, WriterFileSet> dfGroupedSearchableFiles;
@@ -78,6 +140,28 @@ public Segment(long generation) {
             this.generation = generation;
         }
 
+        public Segment(StreamInput in) throws IOException {
+            this.generation = in.readLong();
+            this.dfGroupedSearchableFiles = new HashMap<>();
+
+            int mapSize = in.readVInt();
+            for (int i = 0; i < mapSize; i++) {
+                String dataFormat = in.readString();
+                WriterFileSet fileSet = new WriterFileSet(in);
+                dfGroupedSearchableFiles.put(dataFormat, fileSet);
+            }
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeLong(generation);
+            out.writeVInt(dfGroupedSearchableFiles.size());
+            for (Map.Entry<String, WriterFileSet> entry : dfGroupedSearchableFiles.entrySet()) {
+                out.writeString(entry.getKey());
+                entry.getValue().writeTo(out);
+            }
+        }
+
         public void addSearchableFiles(String dataFormat, WriterFileSet writerFileSetGroup) {
             dfGroupedSearchableFiles.put(dataFormat, writerFileSetGroup);
         }
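
A sketch of the commit-user-data round trip these helpers provide; obtaining a snapshot from a RefreshResult is elided since it depends on engine internals, and the key string mirrors CATALOG_SNAPSHOT_KEY above:

    CatalogSnapshot snapshot = /* produced by a refresh */;

    // Encode: writeTo -> BytesStreamOutput -> Base64, stored under "_catalog_snapshot_"
    Map<String, String> userData = snapshot.toCommitUserData();
    String encoded = userData.get("_catalog_snapshot_");

    // Decode: Base64 -> BytesStreamInput -> CatalogSnapshot(StreamInput)
    CatalogSnapshot restored = CatalogSnapshot.fromCommitUserData(encoded);

Decoding reads exactly the fields writeTo emitted, so any spare capacity in the backing array returned by toBytesRef() is simply never consumed.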

server/src/main/java/org/opensearch/index/engine/exec/coord/CompositeEngine.java

Lines changed: 13 additions & 3 deletions
@@ -8,6 +8,7 @@
 
 package org.opensearch.index.engine.exec.coord;
 
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.lucene.search.ReferenceManager;
 import org.opensearch.common.annotation.ExperimentalApi;
 import org.opensearch.index.engine.CatalogSnapshotAwareRefreshListener;
@@ -48,6 +49,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -67,17 +69,18 @@ public CompositeEngine(MapperService mapperService, PluginsService pluginsServic
         List<SearchEnginePlugin> searchEnginePlugins = pluginsService.filterPlugins(SearchEnginePlugin.class);
         // How to bring the Dataformat here? Currently this means only Text and LuceneFormat can be used
         this.engine = new CompositeIndexingExecutionEngine(mapperService, pluginsService, shardPath, 0);
-        Path committerPath = Files.createTempDirectory("lucene-committer-index");
-        this.compositeEngineCommitter = new LuceneCommitEngine(committerPath);
-
+        this.compositeEngineCommitter = new LuceneCommitEngine(shardPath.getDataPath());
+        this.catalogSnapshot = ((LuceneCommitEngine) this.compositeEngineCommitter).readCatalogSnapshot();
         this.mergeHandler = new ParquetMergeHandler(this, this.engine, this.engine.getDataFormat());
         mergeScheduler = new MergeScheduler(this.mergeHandler, this);
 
         // Refresh here so that catalog snapshot gets initialized
         // TODO : any better way to do this ?
         refresh("start");
+
         // TODO : how to extend this for Lucene ? where engine is a r/w engine
         // Create read specific engines for each format which is associated with shard
+
         for (SearchEnginePlugin searchEnginePlugin : searchEnginePlugins) {
             for (org.opensearch.vectorized.execution.search.DataFormat dataFormat : searchEnginePlugin.getSupportedFormats()) {
                 List<SearchExecEngine<?, ?, ?, ?>> currentSearchEngines = readEngines.getOrDefault(dataFormat, new ArrayList<>());
@@ -99,6 +102,13 @@ public CompositeEngine(MapperService mapperService, PluginsService pluginsServic
                 }
             }
         }
+        catalogSnapshotAwareRefreshListeners.forEach(ref -> {
+            try {
+                ref.afterRefresh(true, catalogSnapshot);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
     }
 
     public SearchExecEngine<?, ?, ?, ?> getReadEngine(org.opensearch.vectorized.execution.search.DataFormat dataFormat) {
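
On construction the committer now opens on the shard's durable data path instead of a temp directory, recovers the previous CatalogSnapshot, and, once the read engines are wired up, replays it to every registered listener. A hypothetical listener, assuming afterRefresh(boolean, CatalogSnapshot) is the interface's only abstract method; the format name is illustrative:

    CatalogSnapshotAwareRefreshListener listener = new CatalogSnapshotAwareRefreshListener() {
        @Override
        public void afterRefresh(boolean didRefresh, CatalogSnapshot snapshot) throws IOException {
            if (snapshot != null) {
                // point this format's read engine at the recovered file sets
                Collection<WriterFileSet> files = snapshot.getSearchableFiles("parquet");
            }
        }
    };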
