Skip to content

Commit a649b6f

Browse files
Feature/datafusion (#38)
* Abstracting lucene away: part 1 * initial abstractions to reduce indexing engine coupling * Text backed engine testing --------- Co-authored-by: Mohit Godwani <[email protected]>
1 parent c79e554 commit a649b6f

17 files changed

+599
-19
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine;
10+
11+
import org.opensearch.index.engine.exec.DataFormat;
12+
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
13+
14+
import javax.xml.crypto.Data;
15+
16+
public interface DataFormatPlugin {
17+
18+
<T extends DataFormat> IndexingExecutionEngine<T> indexingEngine();
19+
20+
DataFormat getDataFormat();
21+
}

server/src/main/java/org/opensearch/index/engine/Engine.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@
7979
import org.opensearch.core.common.unit.ByteSizeValue;
8080
import org.opensearch.core.index.shard.ShardId;
8181
import org.opensearch.index.VersionType;
82+
import org.opensearch.index.engine.exec.DocumentInput;
83+
import org.opensearch.index.engine.exec.composite.CompositeDataFormatWriter;
8284
import org.opensearch.index.mapper.IdFieldMapper;
8385
import org.opensearch.index.mapper.Mapping;
8486
import org.opensearch.index.mapper.ParseContext.Document;
@@ -1610,6 +1612,7 @@ public static class Index extends Operation {
16101612
private final boolean isRetry;
16111613
private final long ifSeqNo;
16121614
private final long ifPrimaryTerm;
1615+
public CompositeDataFormatWriter.CompositeDocumentInput documentInput;
16131616

16141617
public Index(
16151618
Term uid,
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine.exec;
10+
11+
public record FileMetadata(DataFormat df, String fileName) { }
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine.exec;
10+
11+
public interface FlushIn {
12+
13+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine.exec;
10+
11+
import org.opensearch.index.mapper.MappedFieldType;
12+
13+
import java.io.IOException;
14+
import java.util.Collection;
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
public interface IndexingExecutionEngine<T extends DataFormat> {
19+
List<String> supportedFieldTypes();
20+
21+
Writer<? extends DocumentInput<?>> createWriter() throws IOException; // A writer responsible for data format vended by this engine.
22+
23+
RefreshResult refresh(RefreshInput refreshInput) throws IOException;
24+
25+
DataFormat getDataFormat();
26+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine.exec;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
14+
public class RefreshInput {
15+
16+
private List<FileMetadata> files;
17+
18+
public RefreshInput() {
19+
this.files = new ArrayList<>();
20+
}
21+
22+
public void add(FileMetadata fileMetadata) {
23+
this.files.add(fileMetadata);
24+
}
25+
26+
public List<FileMetadata> getFiles() {
27+
return files;
28+
}
29+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine.exec;
10+
11+
import java.util.ArrayList;
12+
import java.util.HashMap;
13+
import java.util.List;
14+
import java.util.Map;
15+
16+
public class RefreshResult {
17+
private Map<DataFormat, List<FileMetadata>> refreshedFiles = new HashMap<>();
18+
19+
public RefreshResult() {
20+
21+
}
22+
23+
public void add(DataFormat df, List<FileMetadata> fileMetadata) {
24+
refreshedFiles.computeIfAbsent(df, ddf -> new ArrayList<>()).addAll(fileMetadata);
25+
}
26+
27+
public Map<DataFormat, List<FileMetadata>> getRefreshedFiles() {
28+
return refreshedFiles;
29+
}
30+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine.exec;
10+
11+
public record WriteResult(boolean success, Exception e, long version, long term, long seqNo) {
12+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.engine.exec;
10+
11+
import java.io.IOException;
12+
import java.util.Optional;
13+
14+
public interface Writer<P extends DocumentInput<?>> {
15+
WriteResult addDoc(P d) throws IOException;
16+
17+
FileMetadata flush(FlushIn flushIn) throws IOException;
18+
19+
void sync() throws IOException;
20+
21+
void close();
22+
23+
Optional<FileMetadata> getMetadata();
24+
25+
P newDocumentInput();
26+
}

server/src/main/java/org/opensearch/index/engine/exec/composite/CompositeIndexingExecutionEngine.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.index.engine.exec.composite;
1010

11+
import org.opensearch.index.engine.DataFormatPlugin;
1112
import org.opensearch.index.engine.exec.DataFormat;
1213
import org.opensearch.index.engine.exec.FileMetadata;
1314
import org.opensearch.index.engine.exec.IndexingExecutionEngine;

0 commit comments

Comments
 (0)