Skip to content

Commit 7f6d309

Browse files
removing CSV codec and integrating with parquet module
Signed-off-by: bharath-techie <[email protected]>
1 parent 02bd3cc commit 7f6d309

31 files changed

+142
-1472
lines changed

gradle/missing-javadoc.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,6 @@ configure([
165165
project(":plugins:engine-datafusion"), //TODO
166166
project(":server"),
167167
project(":modules:parquet-data-format"),
168-
project(":plugins:dataformat-csv"), //TODO
169168
]) {
170169
project.tasks.withType(MissingJavadocTask) {
171170
isExcluded = true

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/ParquetDataFormatPlugin.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
*/
88
package com.parquet.parquetdataformat;
99

10+
import com.parquet.parquetdataformat.engine.ParquetDataFormat;
1011
import com.parquet.parquetdataformat.fields.ParquetFieldUtil;
12+
import com.parquet.parquetdataformat.engine.read.ParquetDataSourceCodec;
1113
import com.parquet.parquetdataformat.writer.ParquetWriter;
1214
import org.opensearch.index.engine.DataFormatPlugin;
1315
import org.opensearch.index.engine.exec.DataFormat;
@@ -18,8 +20,12 @@
1820
import org.opensearch.plugins.DataSourcePlugin;
1921
import org.opensearch.index.mapper.MapperService;
2022
import org.opensearch.plugins.Plugin;
23+
import org.opensearch.vectorized.execution.search.spi.DataSourceCodec;
2124

2225
import java.io.IOException;
26+
import java.util.HashMap;
27+
import java.util.Map;
28+
import java.util.Optional;
2329

2430
/**
2531
* OpenSearch plugin that provides Parquet data format support for indexing operations.
@@ -58,9 +64,23 @@ public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperSe
5864
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(() -> ParquetFieldUtil.getSchema(mapperService), shardPath);
5965
}
6066

67+
private Class<? extends DataFormat> getDataFormatType() {
68+
return ParquetDataFormat.class;
69+
}
70+
6171
@Override
6272
public DataFormat getDataFormat() {
63-
return null;
73+
return new ParquetDataFormat();
74+
}
75+
76+
@Override
77+
public Optional<Map<org.opensearch.vectorized.execution.search.DataFormat, DataSourceCodec>> getDataSourceCodecs() {
78+
Map<org.opensearch.vectorized.execution.search.DataFormat, DataSourceCodec> codecs = new HashMap<>();
79+
ParquetDataSourceCodec parquetDataSourceCodec = new ParquetDataSourceCodec();
80+
// TODO : version it correctly - similar to lucene codecs?
81+
codecs.put(parquetDataSourceCodec.getDataFormat(), new ParquetDataSourceCodec());
82+
return Optional.of(codecs);
83+
// return Optional.empty();
6484
}
6585

6686
// for testing locally only

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,21 @@
1111

1212
/**
1313
* JNI bridge to the native Rust Parquet writer implementation.
14-
*
14+
*
1515
* <p>This class provides the interface between Java and the native Rust library
1616
* that handles low-level Parquet file operations. It automatically loads the
1717
* appropriate native library for the current platform and architecture.
18-
*
18+
*
1919
* <p>Supported platforms:
2020
* <ul>
2121
* <li>Windows (x86, x86_64, aarch64)</li>
2222
* <li>macOS (x86_64, aarch64/arm64)</li>
2323
* <li>Linux (x86, x86_64, aarch64)</li>
2424
* </ul>
25-
*
25+
*
2626
* <p>The native library is extracted from resources and loaded as a temporary file,
2727
* which is automatically cleaned up on JVM shutdown.
28-
*
28+
*
2929
* <p>All native methods operate on Arrow C Data Interface pointers and return
3030
* integer status codes for error handling.
3131
*/
@@ -83,19 +83,37 @@ private static void loadNativeLibrary() {
8383
public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
8484
public static native void closeWriter(String file) throws IOException;
8585
public static native void flushToDisk(String file) throws IOException;
86-
86+
8787
// State and metrics methods handled on Rust side
8888
public static native boolean writerExists(String file);
8989
public static native long getWriteCount(String file);
9090
public static native long getTotalRows(String file);
9191
public static native String[] getActiveWriters();
92-
92+
9393
// Validation helpers that could be implemented natively for better performance
9494
public static boolean isValidFileName(String fileName) {
9595
return fileName != null && !fileName.trim().isEmpty();
9696
}
97-
97+
9898
public static boolean isValidMemoryAddress(long address) {
9999
return address != 0;
100100
}
101+
102+
103+
// DATAFUSION specific native methods starts here
104+
105+
// Record batch and streaming related methods
106+
public static native String nativeNextBatch(long streamPtr);
107+
108+
public static native void nativeCloseStream(long streamPtr);
109+
110+
111+
// Native method declarations - these will be implemented in the JNI library
112+
public static native void nativeRegisterDirectory(String tableName, String directoryPath, String[] files, long runtimeId);
113+
114+
public static native long nativeCreateSessionContext(String[] configKeys, String[] configValues);
115+
116+
public static native long nativeExecuteSubstraitQuery(long sessionContextPtr, byte[] substraitPlan);
117+
118+
public static native void nativeCloseSessionContext(long sessionContextPtr);
101119
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/read/ParquetDataSourceCodec.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* compatible open source license.
77
*/
88

9-
package com.parquet.parquetdataformat.read;
9+
package com.parquet.parquetdataformat.engine.read;
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
@@ -20,6 +20,11 @@
2020
import java.util.concurrent.ConcurrentHashMap;
2121
import java.util.concurrent.atomic.AtomicLong;
2222

23+
import static com.parquet.parquetdataformat.bridge.RustBridge.nativeCloseSessionContext;
24+
import static com.parquet.parquetdataformat.bridge.RustBridge.nativeCreateSessionContext;
25+
import static com.parquet.parquetdataformat.bridge.RustBridge.nativeExecuteSubstraitQuery;
26+
import static com.parquet.parquetdataformat.bridge.RustBridge.nativeRegisterDirectory;
27+
2328
/**
2429
* Datasource codec implementation for parquet files
2530
*/
@@ -33,7 +38,7 @@ public class ParquetDataSourceCodec implements DataSourceCodec {
3338
// JNI library loading
3439
static {
3540
try {
36-
JniLibraryLoader.loadLibrary();
41+
//JniLibraryLoader.loadLibrary();
3742
logger.info("DataFusion JNI library loaded successfully");
3843
} catch (Exception e) {
3944
logger.error("Failed to load DataFusion JNI library", e);
@@ -135,13 +140,4 @@ public CompletableFuture<Void> closeSessionContext(long sessionContextId) {
135140
public DataFormat getDataFormat() {
136141
return DataFormat.CSV;
137142
}
138-
139-
// Native method declarations - these will be implemented in the JNI library
140-
private static native void nativeRegisterDirectory(String tableName, String directoryPath, String[] files, long runtimeId);
141-
142-
private static native long nativeCreateSessionContext(String[] configKeys, String[] configValues);
143-
144-
private static native long nativeExecuteSubstraitQuery(long sessionContextPtr, byte[] substraitPlan);
145-
146-
private static native void nativeCloseSessionContext(long sessionContextPtr);
147143
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/read/ParquetRecordBatchStream.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,17 @@
66
* compatible open source license.
77
*/
88

9-
package com.parquet.parquetdataformat.read;
9+
package com.parquet.parquetdataformat.engine.read;
1010

1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.vectorized.execution.search.spi.RecordBatchStream;
1414

1515
import java.util.concurrent.CompletableFuture;
1616

17+
import static com.parquet.parquetdataformat.bridge.RustBridge.nativeCloseStream;
18+
import static com.parquet.parquetdataformat.bridge.RustBridge.nativeNextBatch;
19+
1720
/**
1821
* TODO : this need not be here - nothing specific to parquet - move to LIB ?
1922
* Native implementation of RecordBatchStream that wraps a JNI stream pointer.
@@ -111,9 +114,4 @@ public void close() {
111114
}
112115
}
113116
}
114-
115-
// Native method declarations
116-
private static native String nativeNextBatch(long streamPtr);
117-
118-
private static native void nativeCloseStream(long streamPtr);
119117
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/read/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@
1010
* CSV data format implementation for DataFusion integration.
1111
* Provides CSV file reading capabilities through DataFusion query engine.
1212
*/
13-
package com.parquet.parquetdataformat.read;
13+
package com.parquet.parquetdataformat.engine.read;
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
org.opensearch.datafusion.csv.CsvDataSourceCodec
1+
com.parquet.parquetdataformat.engine.read.ParquetDataSourceCodec

modules/parquet-data-format/src/main/rust/Cargo.toml

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,56 @@ name = "parquet_dataformat_jni"
88
crate-type = ["cdylib"]
99

1010
[dependencies]
11-
jni = "0.21.1"
12-
arrow = { version = "53.0.0", features = ["ffi"] }
13-
parquet = "53.0.0"
11+
12+
# DataFusion dependencies
13+
datafusion = "49.0.0"
14+
datafusion-substrait = "49.0.0"
15+
arrow = { version = "54.0.0", features = ["ffi"] }
16+
17+
arrow-array = "54.0.0"
18+
arrow-schema = "54.0.0"
19+
arrow-buffer = "54.0.0"
20+
21+
# JNI dependencies
22+
jni = "0.21"
23+
24+
# Async runtime
25+
tokio = { version = "1.0", features = ["full"] }
26+
futures = "0.3"
27+
futures-util = "0.3"
28+
29+
# Serialization
30+
serde = { version = "1.0", features = ["derive"] }
31+
serde_json = "1.0"
32+
33+
# Error handling
34+
anyhow = "1.0"
35+
thiserror = "1.0"
36+
37+
# Logging
38+
log = "0.4"
39+
40+
# Parquet support
41+
parquet = "54.0.0"
42+
43+
# Object store for file access
44+
object_store = "0.11"
45+
url = "2.0"
46+
47+
# Substrait support
48+
substrait = "0.47"
49+
prost = "0.13"
50+
51+
# Temporary directory support
52+
tempfile = "3.0"
53+
54+
#jni = "0.21.1"
55+
#arrow = { version = "53.0.0", features = ["ffi"] }
56+
#parquet = "53.0.0"
1457
lazy_static = "1.4.0"
1558
dashmap = "7.0.0-rc2"
1659
chrono = "0.4"
60+
61+
62+
[build-dependencies]
63+
cbindgen = "0.27"

modules/parquet-data-format/src/main/rust/src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ impl SessionContextManager {
2828
directory_path: &str,
2929
options: HashMap<String, String>,
3030
) -> Result<u64> {
31-
// Placeholder implementation - would register csv directory as table
31+
// Placeholder implementation - would register parquet directory as table
3232
log::info!("Registering directory: {} at path: {} with options: {:?}",
3333
table_name, directory_path, options);
3434

modules/parquet-data-format/src/main/rust/src/csv_exec.rs

Lines changed: 0 additions & 24 deletions
This file was deleted.

0 commit comments

Comments
 (0)