
Commit 77b320c

bharath-techie and Shashank Gowri authored

Commiter integration and build fixes (#19612)

* Feature/datafusion 4 (#46)

  * Composite document writer pool initial implementation
  * Committer interface and lucene based commit engine implementation
  * Catalog snapshot changes to create segment view during commit

  ---------

  Co-authored-by: Shashank Gowri <[email protected]>

* fix build for commit integration

  Signed-off-by: bharath-techie <[email protected]>

---------

Signed-off-by: bharath-techie <[email protected]>
Co-authored-by: Shashank Gowri <[email protected]>
1 parent 3d06c82 · commit 77b320c

33 files changed: +1051 −274 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -82,5 +82,6 @@ doc-tools/missing-doclet/bin/
 /modules/parquet-data-format/src/main/resources/native/
 /modules/parquet-data-format/jni/target/debug
 
+/modules/parquet-data-format/jni/target/release
 **/Cargo.lock
 /modules/parquet-data-format/jni/

.idea/runConfigurations/Debug_OpenSearch.xml

Lines changed: 14 additions & 10 deletions
Some generated files are not rendered by default.

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetExecutionEngine.java

Lines changed: 13 additions & 11 deletions

@@ -1,16 +1,20 @@
 package com.parquet.parquetdataformat.engine;
 
-import org.apache.arrow.vector.types.pojo.Schema;
-import org.opensearch.index.engine.exec.*;
 import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
 import com.parquet.parquetdataformat.writer.ParquetWriter;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.opensearch.index.engine.exec.DataFormat;
+import org.opensearch.index.engine.exec.IndexingExecutionEngine;
+import org.opensearch.index.engine.exec.RefreshInput;
+import org.opensearch.index.engine.exec.RefreshResult;
+import org.opensearch.index.engine.exec.Writer;
+import org.opensearch.index.engine.exec.WriterFileSet;
 import org.opensearch.index.shard.ShardPath;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
 import static com.parquet.parquetdataformat.engine.ParquetDataFormat.PARQUET_DATA_FORMAT;
@@ -47,13 +51,11 @@
 public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDataFormat> {
 
     public static final String FILE_NAME_PREFIX = "parquet_file_generation";
-    AtomicInteger counter;
-    Supplier<Schema> schema;
-    private final List<FileMetadata> filesWrittenAlready = new ArrayList<>();
+    private final Supplier<Schema> schema;
+    private final List<WriterFileSet> filesWrittenAlready = new ArrayList<>();
     private final ShardPath shardPath;
 
     public ParquetExecutionEngine(Supplier<Schema> schema, ShardPath shardPath) {
-        counter = new AtomicInteger(0);
         this.schema = schema;
         this.shardPath = shardPath;
     }
@@ -64,15 +66,15 @@ public List<String> supportedFieldTypes() {
     }
 
     @Override
-    public Writer<ParquetDocumentInput> createWriter() throws IOException {
-        String fileName = Path.of(shardPath.getDataPath().toString(), FILE_NAME_PREFIX + "_" + counter.getAndIncrement() + ".parquet").toString();
-        return new ParquetWriter(fileName, schema.get());
+    public Writer<ParquetDocumentInput> createWriter(long writerGeneration) throws IOException {
+        String fileName = Path.of(shardPath.getDataPath().toString(), FILE_NAME_PREFIX + "_" + writerGeneration + ".parquet").toString();
+        return new ParquetWriter(fileName, schema.get(), writerGeneration);
     }
 
     @Override
     public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
         RefreshResult refreshResult = new RefreshResult();
-        filesWrittenAlready.addAll(refreshInput.getFiles());
+        filesWrittenAlready.addAll(refreshInput.getWriterFiles());
         refreshResult.add(PARQUET_DATA_FORMAT, filesWrittenAlready);
         return refreshResult;
     }
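
The change above replaces the engine's per-instance AtomicInteger with a caller-supplied writer generation. An illustrative standalone sketch (not part of this commit; class name and paths are hypothetical) of the resulting generation-based file naming:

import java.nio.file.Path;

public class WriterFileNaming {
    static final String FILE_NAME_PREFIX = "parquet_file_generation";

    // Mirrors the naming logic in ParquetExecutionEngine.createWriter(long writerGeneration).
    static String fileFor(Path shardDataPath, long writerGeneration) {
        return Path.of(shardDataPath.toString(), FILE_NAME_PREFIX + "_" + writerGeneration + ".parquet").toString();
    }

    public static void main(String[] args) {
        // e.g. /tmp/shard0/index/parquet_file_generation_7.parquet
        System.out.println(fileFor(Path.of("/tmp/shard0/index"), 7L));
    }
}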

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java

Lines changed: 7 additions & 12 deletions

@@ -8,25 +8,20 @@
 
 package com.parquet.parquetdataformat.vsr;
 
-import com.parquet.parquetdataformat.engine.ParquetDataFormat;
+import com.parquet.parquetdataformat.bridge.ArrowExport;
+import com.parquet.parquetdataformat.bridge.RustBridge;
+import com.parquet.parquetdataformat.memory.MemoryPressureMonitor;
 import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
-import com.parquet.parquetdataformat.bridge.RustBridge;
-import com.parquet.parquetdataformat.bridge.ArrowExport;
-import com.parquet.parquetdataformat.memory.MemoryPressureMonitor;
+import org.opensearch.index.engine.exec.FlushIn;
+import org.opensearch.index.engine.exec.WriteResult;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.opensearch.index.engine.exec.FileMetadata;
-import org.opensearch.index.engine.exec.FlushIn;
-import org.opensearch.index.engine.exec.WriteResult;
-
-import static com.parquet.parquetdataformat.engine.ParquetDataFormat.PARQUET_DATA_FORMAT;
-
 /**
  * Manages VectorSchemaRoot lifecycle with integrated memory management and native call wrappers.
  * Provides a high-level interface for Parquet document operations using managed VSR abstractions.
@@ -113,7 +108,7 @@ public WriteResult addToManagedVSR(ParquetDocumentInput document) throws IOExcep
         }
     }
 
-    public FileMetadata flush(FlushIn flushIn) throws IOException {
+    public String flush(FlushIn flushIn) throws IOException {
         System.out.println("[JAVA] flush called, row count: " + managedVSR.getRowCount());
         try {
             // Only flush if we have data
@@ -136,7 +131,7 @@ public FileMetadata flush(FlushIn flushIn) throws IOException {
             }
             System.out.println("[JAVA] Successfully flushed data");
 
-            return new FileMetadata(PARQUET_DATA_FORMAT, fileName);
+            return fileName;
         } catch (Exception e) {
             System.out.println("[JAVA] ERROR in flush: " + e.getMessage());
             throw new IOException("Failed to flush data: " + e.getMessage(), e);

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/writer/ParquetWriter.java

Lines changed: 16 additions & 11 deletions

@@ -1,14 +1,17 @@
 package com.parquet.parquetdataformat.writer;
 
 import com.parquet.parquetdataformat.vsr.VSRManager;
-import org.opensearch.index.engine.exec.FileMetadata;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.opensearch.index.engine.exec.FileInfos;
 import org.opensearch.index.engine.exec.FlushIn;
 import org.opensearch.index.engine.exec.WriteResult;
 import org.opensearch.index.engine.exec.Writer;
-import org.apache.arrow.vector.types.pojo.Schema;
+import org.opensearch.index.engine.exec.WriterFileSet;
 
 import java.io.IOException;
-import java.util.Optional;
+import java.nio.file.Path;
+
+import static com.parquet.parquetdataformat.engine.ParquetDataFormat.PARQUET_DATA_FORMAT;
 
 /**
  * Parquet file writer implementation that integrates with OpenSearch's Writer interface.
@@ -33,11 +36,13 @@ public class ParquetWriter implements Writer<ParquetDocumentInput> {
     private final String file;
     private final Schema schema;
     private final VSRManager vsrManager;
+    private final long writerGeneration;
 
-    public ParquetWriter(String file, Schema schema) {
+    public ParquetWriter(String file, Schema schema, long writerGeneration) {
         this.file = file;
         this.schema = schema;
         this.vsrManager = new VSRManager(file, schema);
+        this.writerGeneration = writerGeneration;
     }
 
     @Override
@@ -46,8 +51,13 @@ public WriteResult addDoc(ParquetDocumentInput d) throws IOException {
     }
 
     @Override
-    public FileMetadata flush(FlushIn flushIn) throws IOException {
-        return vsrManager.flush(flushIn);
+    public FileInfos flush(FlushIn flushIn) throws IOException {
+        String fileName = vsrManager.flush(flushIn);
+        FileInfos fileInfos = new FileInfos();
+        WriterFileSet writerFileSet = new WriterFileSet(Path.of(fileName).getParent(), writerGeneration);
+        writerFileSet.add(fileName);
+        fileInfos.putWriterFileSet(PARQUET_DATA_FORMAT, writerFileSet);
+        return fileInfos;
     }
 
     @Override
@@ -60,11 +70,6 @@ public void close() {
         vsrManager.close();
     }
 
-    @Override
-    public Optional<FileMetadata> getMetadata() {
-        return Optional.empty();
-    }
-
     @Override
     public ParquetDocumentInput newDocumentInput() {
         // Get a new ManagedVSR from VSRManager for this document input
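
With this change flush() no longer returns a single FileMetadata: files are reported through a FileInfos keyed by data format. A minimal caller-side sketch, assuming only the methods visible in this commit (FileInfos.getWriterFileSet, WriterFileSet.getFiles returning a collection of path strings) and a hypothetical helper class name:

import com.parquet.parquetdataformat.writer.ParquetWriter;
import org.opensearch.index.engine.exec.FileInfos;
import org.opensearch.index.engine.exec.FlushIn;
import org.opensearch.index.engine.exec.WriterFileSet;

import java.io.IOException;

import static com.parquet.parquetdataformat.engine.ParquetDataFormat.PARQUET_DATA_FORMAT;

final class FlushResultLogger {
    // Unpacks the FileInfos returned by ParquetWriter.flush(...) and prints the parquet files it reported.
    static void logFlushedFiles(ParquetWriter writer, FlushIn flushIn) throws IOException {
        FileInfos fileInfos = writer.flush(flushIn);
        fileInfos.getWriterFileSet(PARQUET_DATA_FORMAT)
            .map(WriterFileSet::getFiles)
            .ifPresent(files -> files.forEach(file -> System.out.println("flushed: " + file)));
    }
}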

plugins/engine-datafusion/jni/src/util.rs

Lines changed: 10 additions & 1 deletion

@@ -160,7 +160,16 @@ pub fn throw_exception(env: &mut JNIEnv, message: &str) {
 pub fn create_object_meta_from_filenames(base_path: &str, filenames: Vec<String>) -> Vec<ObjectMeta> {
     filenames.into_iter().map(|filename| {
         let filename = filename.as_str();
-        let full_path = format!("{}/{}", base_path.trim_end_matches('/'), filename);
+
+        // Handle both full paths and relative filenames
+        let full_path = if filename.starts_with('/') || filename.contains(base_path) {
+            // Already a full path
+            filename.to_string()
+        } else {
+            // Just a filename, needs base_path
+            format!("{}/{}", base_path.trim_end_matches('/'), filename)
+        };
+
         let file_size = fs::metadata(&full_path).map(|m| m.len()).unwrap_or(0);
         let modified = fs::metadata(&full_path)
             .and_then(|m| m.modified())

plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionReader.java

Lines changed: 6 additions & 7 deletions

@@ -9,14 +9,12 @@
 package org.opensearch.datafusion.search;
 
 import org.opensearch.datafusion.DataFusionQueryJNI;
-import org.opensearch.index.engine.exec.FileMetadata;
+import org.opensearch.index.engine.exec.WriterFileSet;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.opensearch.datafusion.DataFusionQueryJNI.closeDatafusionReader;
@@ -32,7 +30,7 @@ public class DatafusionReader implements Closeable {
     /**
      * The file metadata collection.
      */
-    public Collection<FileMetadata> files;
+    public Collection<WriterFileSet> files;
     /**
      * The cache pointer.
      */
@@ -44,15 +42,16 @@ public class DatafusionReader implements Closeable {
      * @param directoryPath The directory path
      * @param files The file metadata collection
      */
-    public DatafusionReader(String directoryPath, Collection<FileMetadata> files) {
+    public DatafusionReader(String directoryPath, Collection<WriterFileSet> files) {
         this.directoryPath = directoryPath;
         this.files = files;
         String[] fileNames = new String[0];
         if(files != null) {
             System.out.println("Got the files!!!!!");
-            fileNames = files.stream().map(file -> Path.of(file.fileName()).getFileName().toString()).toArray(String[]::new);
+            fileNames = files.stream()
+                .flatMap(writerFileSet -> writerFileSet.getFiles().stream())
+                .toArray(String[]::new);
         }
-        //String[] fileNames = files.stream().map(file -> Path.of(file.fileName()).getFileName().toString()).toArray(String[]::new);
         System.out.println("File names: " + Arrays.toString(fileNames));
         System.out.println("Directory path: " + directoryPath);
 
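
Because ParquetWriter.flush() records entries in WriterFileSet as full paths, the reader now flattens them directly instead of stripping directories, which lines up with the util.rs change above that accepts both absolute paths and bare filenames. A standalone sketch of that flattening (hypothetical class and paths, using only the WriterFileSet constructor and methods that appear in this commit):

import org.opensearch.index.engine.exec.WriterFileSet;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class FileNameFlattening {
    public static void main(String[] args) {
        WriterFileSet gen0 = new WriterFileSet(Path.of("/tmp/shard0/index"), 0L);
        gen0.add("/tmp/shard0/index/parquet_file_generation_0.parquet");
        Collection<WriterFileSet> files = List.of(gen0);

        // Same flatMap used in the DatafusionReader constructor above.
        String[] fileNames = files.stream()
            .flatMap(writerFileSet -> writerFileSet.getFiles().stream())
            .toArray(String[]::new);

        System.out.println(Arrays.toString(fileNames));
    }
}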

server/build.gradle

Lines changed: 2 additions & 0 deletions

@@ -78,6 +78,8 @@ dependencies {
   compileOnly project(':libs:opensearch-plugin-classloader')
   testRuntimeOnly project(':libs:opensearch-plugin-classloader')
 
+  implementation 'org.apache.commons:commons-lang3:3.17.0'
+
   api libs.bundles.lucene
 
   // utilities

server/src/main/java/org/opensearch/index/engine/exec/FileInfos.java

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.engine.exec;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public final class FileInfos {
+
+    private final Map<DataFormat, WriterFileSet> writerFilesMap;
+
+    public FileInfos() {
+        this.writerFilesMap = new HashMap<>();
+    }
+
+    public Map<DataFormat, WriterFileSet> getWriterFilesMap() {
+        return Collections.unmodifiableMap(writerFilesMap);
+    }
+
+    public void putWriterFileSet(DataFormat format, WriterFileSet writerFileSet) {
+        writerFilesMap.put(format, writerFileSet);
+    }
+
+    public Optional<WriterFileSet> getWriterFileSet(DataFormat format) {
+        return Optional.ofNullable(writerFilesMap.get(format));
+    }
+}
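
FileInfos is a small per-flush container mapping each DataFormat to the WriterFileSet produced for it. A minimal usage sketch (hypothetical class name and paths; the WriterFileSet constructor and methods are those used elsewhere in this commit, and the parquet module's PARQUET_DATA_FORMAT constant is assumed to be on the classpath):

import org.opensearch.index.engine.exec.FileInfos;
import org.opensearch.index.engine.exec.WriterFileSet;

import java.nio.file.Path;

import static com.parquet.parquetdataformat.engine.ParquetDataFormat.PARQUET_DATA_FORMAT;

public class FileInfosExample {
    public static void main(String[] args) {
        FileInfos fileInfos = new FileInfos();

        // One WriterFileSet per data format, keyed by that format.
        WriterFileSet parquetFiles = new WriterFileSet(Path.of("/tmp/shard0/index"), 3L);
        parquetFiles.add("/tmp/shard0/index/parquet_file_generation_3.parquet");
        fileInfos.putWriterFileSet(PARQUET_DATA_FORMAT, parquetFiles);

        // Lookup is Optional-based, so formats that produced no files return Optional.empty().
        fileInfos.getWriterFileSet(PARQUET_DATA_FORMAT)
            .ifPresent(set -> System.out.println("parquet files: " + set.getFiles()));
    }
}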

server/src/main/java/org/opensearch/index/engine/exec/FileMetadata.java

Lines changed: 2 additions & 1 deletion

@@ -8,4 +8,5 @@
 
 package org.opensearch.index.engine.exec;
 
-public record FileMetadata(DataFormat df, String fileName) { }
+public record FileMetadata(String directory, String file) {
+}
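
The record drops its DataFormat component (format association now lives in FileInfos/WriterFileSet) and instead carries a directory plus file name. A tiny usage sketch with hypothetical values, relying only on the record's generated accessors:

FileMetadata meta = new FileMetadata("/tmp/shard0/index", "parquet_file_generation_0.parquet");
System.out.println(meta.directory() + "/" + meta.file());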

0 commit comments
