indexingExecutionEngine = indexingEngine();
+// //Create Writer
+// ParquetWriter writer = (ParquetWriter) indexingExecutionEngine.createWriter();
+// for (int i=0;i<10;i++) {
+// //Get DocumentInput
+// DocumentInput documentInput = writer.newDocumentInput();
+// ParquetDocumentInput parquetDocumentInput = (ParquetDocumentInput) documentInput;
+// //Populate data
+// DummyDataUtils.populateDocumentInput(parquetDocumentInput);
+// //Write document
+// writer.addDoc(parquetDocumentInput);
+// }
+// writer.flush(null);
+// writer.close();
+// //refresh engine
+// indexingExecutionEngine.refresh(null);
+ }
+
+}
diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/ArrowExport.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/ArrowExport.java
new file mode 100644
index 0000000000000..694df0c4a9f47
--- /dev/null
+++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/ArrowExport.java
@@ -0,0 +1,37 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package com.parquet.parquetdataformat.bridge;
+
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+
+/**
+ * Container for Arrow C Data Interface exports.
+ * Provides a safe wrapper around ArrowArray and ArrowSchema with proper resource management.
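+ *
+ * <p>Illustrative usage (a sketch; {@code arrowArray}, {@code arrowSchema} and {@code fileName}
+ * are assumed to come from the surrounding export code):
+ * <pre>{@code
+ * try (ArrowExport export = new ArrowExport(arrowArray, arrowSchema)) {
+ *     RustBridge.write(fileName, export.getArrayAddress(), export.getSchemaAddress());
+ * } // close() releases both C Data Interface structures
+ * }</pre>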
+ */
+public record ArrowExport(ArrowArray arrowArray, ArrowSchema arrowSchema) implements AutoCloseable {
+
+ public long getArrayAddress() {
+ return arrowArray.memoryAddress();
+ }
+
+ public long getSchemaAddress() {
+ return arrowSchema.memoryAddress();
+ }
+
+ @Override
+ public void close() {
+ if (arrowArray != null) {
+ arrowArray.close();
+ }
+ if (arrowSchema != null) {
+ arrowSchema.close();
+ }
+ }
+}
diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java
new file mode 100644
index 0000000000000..8ef4596395e97
--- /dev/null
+++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java
@@ -0,0 +1,119 @@
+package com.parquet.parquetdataformat.bridge;
+
+import org.opensearch.common.SuppressForbidden;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Locale;
+
+/**
+ * JNI bridge to the native Rust Parquet writer implementation.
+ *
+ * This class provides the interface between Java and the native Rust library
+ * that handles low-level Parquet file operations. It automatically loads the
+ * appropriate native library for the current platform and architecture.
+ *
+ * <p>Supported platforms:
+ *
+ * - Windows (x86, x86_64, aarch64)
+ * - macOS (x86_64, aarch64/arm64)
+ * - Linux (x86, x86_64, aarch64)
+ *
+ *
+ * The native library is extracted from resources and loaded as a temporary file,
+ * which is automatically cleaned up on JVM shutdown.
+ *
+ * <p>The writer-facing native methods operate on Arrow C Data Interface pointers and
+ * report failures by throwing {@link IOException}.
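+ *
+ * <p>Illustrative write path (a sketch; {@code fileName} and {@code export} are assumed to be
+ * an output path and an {@link ArrowExport} produced elsewhere):
+ * <pre>{@code
+ * RustBridge.createWriter(fileName, export.getSchemaAddress());
+ * RustBridge.write(fileName, export.getArrayAddress(), export.getSchemaAddress());
+ * RustBridge.flushToDisk(fileName);
+ * RustBridge.closeWriter(fileName);
+ * }</pre>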
+ */
+public class RustBridge {
+
+ static {
+ try {
+ loadNativeLibrary();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to load native Rust library", e);
+ }
+ }
+
+ @SuppressForbidden(reason = "Need to create temp files")
+ private static void loadNativeLibrary() {
+
+ String LIB_NAME = "parquet_dataformat_jni";
+ String os = System.getProperty("os.name").toLowerCase(Locale.ROOT);
+ String arch = System.getProperty("os.arch").toLowerCase(Locale.ROOT);
+
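+ // Map the detected OS and architecture onto the bundled resource layout:
+ // /native/<os>/<arch>/lib<LIB_NAME><extension>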
+ String osDir = os.contains("win") ? "windows" :
+ os.contains("mac") ? "macos" : "linux";
+ String archDir = arch.contains("aarch64") || arch.contains("arm64") ? "aarch64" :
+ arch.contains("64") ? "x86_64" : "x86";
+
+ String extension = os.contains("win") ? ".dll" :
+ os.contains("mac") ? ".dylib" : ".so";
+
+ String resourcePath = String.format(Locale.ROOT, "/native/%s/%s/lib%s%s", osDir, archDir, LIB_NAME, extension);
+
+ try (InputStream is = RustBridge.class.getResourceAsStream(resourcePath)) {
+ if (is == null) {
+ throw new UnsatisfiedLinkError("Native library not found in resources: " + resourcePath);
+ }
+
+ Path tempFile = Files.createTempFile("lib" + LIB_NAME, extension);
+
+ // Register deletion hook on JVM shutdown
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ try {
+ Files.deleteIfExists(tempFile);
+ } catch (IOException ignored) {}
+ }));
+
+ Files.copy(is, tempFile, StandardCopyOption.REPLACE_EXISTING);
+
+ System.load(tempFile.toAbsolutePath().toString());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to load native library from resources", e);
+ }
+ }
+
+ // Native writer lifecycle methods; validation happens on the Rust side and failures surface as IOException
+ public static native void createWriter(String file, long schemaAddress) throws IOException;
+ public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
+ public static native void closeWriter(String file) throws IOException;
+ public static native void flushToDisk(String file) throws IOException;
+
+ // State and metrics methods handled on Rust side
+ public static native boolean writerExists(String file);
+ public static native long getWriteCount(String file);
+ public static native long getTotalRows(String file);
+ public static native String[] getActiveWriters();
+
+ // Validation helpers that could be implemented natively for better performance
+ public static boolean isValidFileName(String fileName) {
+ return fileName != null && !fileName.trim().isEmpty();
+ }
+
+ public static boolean isValidMemoryAddress(long address) {
+ return address != 0;
+ }
+
+
+ // DATAFUSION specific native methods starts here
+
+ // Record batch and streaming related methods
+ public static native String nativeNextBatch(long streamPtr);
+
+ public static native void nativeCloseStream(long streamPtr);
+
+
+ // Native method declarations - these will be implemented in the JNI library
+ public static native void nativeRegisterDirectory(String tableName, String directoryPath, String[] files, long runtimeId);
+
+ public static native long nativeCreateSessionContext(String[] configKeys, String[] configValues);
+
+ public static native long nativeExecuteSubstraitQuery(long sessionContextPtr, byte[] substraitPlan);
+
+ public static native void nativeCloseSessionContext(long sessionContextPtr);
+}
diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/converter/FieldTypeConverter.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/converter/FieldTypeConverter.java
new file mode 100644
index 0000000000000..b4ace7c4b1953
--- /dev/null
+++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/converter/FieldTypeConverter.java
@@ -0,0 +1,135 @@
+package com.parquet.parquetdataformat.converter;
+
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.lucene.search.Query;
+import org.opensearch.index.mapper.MappedFieldType;
+import org.opensearch.index.mapper.TextSearchInfo;
+import org.opensearch.index.mapper.ValueFetcher;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class for converting between OpenSearch field types and Arrow/Parquet types.
+ *
+ * <p>This converter provides bidirectional mapping between OpenSearch's field type system
+ * and Apache Arrow's type system, which serves as the bridge to Parquet data representation.
+ * It handles the complete conversion pipeline from OpenSearch indexed data to columnar
+ * Parquet storage format.
+ *
+ * <p>Supported type conversions:
+ *
+ * - OpenSearch numeric types (long, integer, short, byte, double, float) → Arrow Int/FloatingPoint
+ * - OpenSearch boolean → Arrow Bool
+ * - OpenSearch date → Arrow Timestamp
+ * - OpenSearch text/keyword → Arrow Utf8
+ *
+ *
+ * The converter also provides reverse mapping capabilities to reconstruct OpenSearch
+ * field types from Arrow types, enabling proper schema reconstruction during read operations.
+ *
+ * <p>All conversion methods are static and thread-safe, making them suitable for concurrent
+ * use across multiple writer instances.
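+ *
+ * <p>Illustrative round trip (a sketch; {@code longFieldType} is assumed to be a MappedFieldType
+ * whose {@code typeName()} is "long"):
+ * <pre>{@code
+ * ArrowType arrow = FieldTypeConverter.convertToParquetFieldType(longFieldType).getType();
+ * // arrow is a signed 64-bit ArrowType.Int
+ * MappedFieldType roundTripped = FieldTypeConverter.convertToMappedFieldType("price", arrow);
+ * // roundTripped.typeName() is "long"
+ * }</pre>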
+ */
+public class FieldTypeConverter {
+
+ public static Map<FieldType, Object> convertToArrowFieldMap(MappedFieldType mappedFieldType, Object value) {
+ Map<FieldType, Object> fieldMap = new HashMap<>();
+ FieldType arrowFieldType = convertToArrowFieldType(mappedFieldType);
+ fieldMap.put(arrowFieldType, value);
+ return fieldMap;
+ }
+
+ public static FieldType convertToArrowFieldType(MappedFieldType mappedFieldType) {
+ ArrowType arrowType = getArrowType(mappedFieldType.typeName());
+ return new FieldType(true, arrowType, null);
+ }
+
+ public static ParquetFieldType convertToParquetFieldType(MappedFieldType mappedFieldType) {
+ ArrowType arrowType = getArrowType(mappedFieldType.typeName());
+ return new ParquetFieldType(mappedFieldType.name(), arrowType);
+ }
+
+ public static MappedFieldType convertToMappedFieldType(String name, ArrowType arrowType) {
+ String opensearchType = getOpenSearchType(arrowType);
+ return new MockMappedFieldType(name, opensearchType);
+ }
+
+ private static ArrowType getArrowType(String opensearchType) {
+ switch (opensearchType) {
+ case "long":
+ return new ArrowType.Int(64, true);
+ case "integer":
+ return new ArrowType.Int(32, true);
+ case "short":
+ return new ArrowType.Int(16, true);
+ case "byte":
+ return new ArrowType.Int(8, true);
+ case "double":
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
+ case "float":
+ return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
+ case "boolean":
+ return new ArrowType.Bool();
+ case "date":
+ return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+ default:
+ return new ArrowType.Utf8();
+ }
+ }
+
+ private static String getOpenSearchType(ArrowType arrowType) {
+ switch (arrowType) {
+ case ArrowType.Int intType -> {
+ return switch (intType.getBitWidth()) {
+ case 8 -> "byte";
+ case 16 -> "short";
+ case 32 -> "integer";
+ case 64 -> "long";
+ default -> "integer";
+ };
+ }
+ case ArrowType.FloatingPoint fpType -> {
+ return fpType.getPrecision() == FloatingPointPrecision.DOUBLE ? "double" : "float";
+ }
+ case ArrowType.Bool bool -> {
+ return "boolean";
+ }
+ case ArrowType.Timestamp timestamp -> {
+ return "date";
+ }
+ case null, default -> {
+ return "text";
+ }
+ }
+ }
+
+ private static class MockMappedFieldType extends MappedFieldType {
+ private final String type;
+
+ public MockMappedFieldType(String name, String type) {
+ super(name, true, false, false, TextSearchInfo.NONE, null);
+ this.type = type;
+ }
+
+ @Override
+ public String typeName() {
+ return type;
+ }
+
+ @Override
+ public ValueFetcher valueFetcher(org.opensearch.index.query.QueryShardContext context,
+ org.opensearch.search.lookup.SearchLookup searchLookup,
+ String format) {
+ return null;
+ }
+
+ @Override
+ public Query termQuery(Object value, org.opensearch.index.query.QueryShardContext context) {
+ return null;
+ }
+ }
+}
diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/converter/ParquetFieldType.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/converter/ParquetFieldType.java
new file mode 100644
index 0000000000000..84f1b9a4bedd2
--- /dev/null
+++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/converter/ParquetFieldType.java
@@ -0,0 +1,48 @@
+package com.parquet.parquetdataformat.converter;
+
+import org.apache.arrow.vector.types.pojo.ArrowType;
+
+/**
+ * Represents a field type for Parquet-based document fields.
+ *
+ * This class encapsulates the field name and Arrow type information
+ * required for proper type mapping between OpenSearch fields and Parquet
+ * column definitions. It serves as the intermediate representation used
+ * throughout the Parquet processing pipeline.
+ *
+ * <p>The Arrow type system provides a rich set of data types that can
+ * accurately represent various field types from OpenSearch, ensuring
+ * proper data serialization and deserialization.
+ *
+ * <p>Key features:
+ *
+ * - Field name preservation for schema mapping
+ * - Arrow type integration for precise data representation
+ * - Simple mutable structure for field definition building
+ *
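+ * <p>For example (a sketch):
+ * <pre>{@code
+ * ParquetFieldType salary = new ParquetFieldType("salary", new ArrowType.Int(32, true));
+ * salary.getName(); // "salary"
+ * }</pre>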
+ */
+public class ParquetFieldType {
+ private String name;
+ private ArrowType type;
+
+ public ParquetFieldType(String name, ArrowType type) {
+ this.name = name;
+ this.type = type;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public ArrowType getType() {
+ return type;
+ }
+
+ public void setType(ArrowType type) {
+ this.type = type;
+ }
+}
diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/DummyDataUtils.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/DummyDataUtils.java
new file mode 100644
index 0000000000000..0d6c2519d463a
--- /dev/null
+++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/DummyDataUtils.java
@@ -0,0 +1,60 @@
+package com.parquet.parquetdataformat.engine;
+
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.opensearch.common.SuppressForbidden;
+import org.opensearch.index.engine.exec.DocumentInput;
+import org.opensearch.index.mapper.MappedFieldType;
+import com.parquet.parquetdataformat.converter.FieldTypeConverter;
+
+import java.util.Arrays;
+import java.util.Random;
+
+@SuppressForbidden(reason = "Uses java.util.Random to generate dummy data")
+public class DummyDataUtils {
+ public static Schema getSchema() {
+ // Build a small, fixed test schema: id, name, designation, salary
+ return new Schema(Arrays.asList(
+ Field.notNullable(ID, new ArrowType.Int(32, true)),
+ Field.nullable(NAME, new ArrowType.Utf8()),
+ Field.nullable(DESIGNATION, new ArrowType.Utf8()),
+ Field.nullable(SALARY, new ArrowType.Int(32, true))
+ ));
+ }
+
+ public static void populateDocumentInput(DocumentInput<?> documentInput) {
+ MappedFieldType idField = FieldTypeConverter.convertToMappedFieldType(ID, new ArrowType.Int(32, true));
+ documentInput.addField(idField, generateRandomId());
+ MappedFieldType nameField = FieldTypeConverter.convertToMappedFieldType(NAME, new ArrowType.Utf8());
+ documentInput.addField(nameField, generateRandomName());
+ MappedFieldType designationField = FieldTypeConverter.convertToMappedFieldType(DESIGNATION, new ArrowType.Utf8());
+ documentInput.addField(designationField, generateRandomDesignation());
+ MappedFieldType salaryField = FieldTypeConverter.convertToMappedFieldType(SALARY, new ArrowType.Int(32, true));
+ documentInput.addField(salaryField, random.nextInt(100000));
+ }
+
+ private static final String ID = "id";
+ private static final String NAME = "name";
+ private static final String DESIGNATION = "designation";
+ private static final String SALARY = "salary";
+ private static final String INCREMENT = "increment";
+ private static final Random random = new Random();
+ private static final String[] NAMES = {"John Doe", "Jane Smith", "Alice Johnson", "Bob Wilson", "Carol Brown"};
+ private static final String[] DESIGNATIONS = {"Software Engineer", "Senior Developer", "Team Lead", "Manager", "Architect"};
+
+ private static int generateRandomId() {
+ return random.nextInt(1000000);
+ }
+
+ private static String generateRandomName() {
+ return NAMES[random.nextInt(NAMES.length)];
+ }
+
+ private static String generateRandomDesignation() {
+ return DESIGNATIONS[random.nextInt(DESIGNATIONS.length)];
+ }
+
+
+}
diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetDataFormat.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetDataFormat.java
new file mode 100644
index 0000000000000..240a33c10531e
--- /dev/null
+++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetDataFormat.java
@@ -0,0 +1,58 @@
+package com.parquet.parquetdataformat.engine;
+
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.index.engine.exec.DataFormat;
+
+/**
+ * Data format implementation for Parquet-based document storage.
+ *
+ * This class integrates with OpenSearch's DataFormat interface to provide
+ * Parquet file format support within the OpenSearch indexing pipeline. It
+ * defines the configuration and behavior for the "parquet" data format.
+ *
+ * <p>The implementation provides hooks for:
+ *
+ * - Data format specific settings configuration
+ * - Cluster-level settings management
+ * - Store configuration for Parquet-specific optimizations
+ * - Format identification through the "parquet" name
+ *
+ *
+ * This class serves as the entry point for registering Parquet format
+ * capabilities with OpenSearch's execution engine framework, allowing
+ * the system to recognize and utilize Parquet-based storage operations.
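+ *
+ * <p>Typical access is through the shared singleton (a sketch):
+ * <pre>{@code
+ * DataFormat format = ParquetDataFormat.PARQUET_DATA_FORMAT;
+ * String name = format.name(); // "parquet"
+ * }</pre>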
+ */
+public class ParquetDataFormat implements DataFormat {
+ @Override
+ public Setting<Settings> dataFormatSettings() {
+ return null;
+ }
+
+ @Override
+ public Setting<Settings> clusterLeveldataFormatSettings() {
+ return null;
+ }
+
+ @Override
+ public String name() {
+ return "parquet";
+ }
+
+ @Override
+ public void configureStore() {
+
+ }
+
+ public static ParquetDataFormat PARQUET_DATA_FORMAT = new ParquetDataFormat();
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof ParquetDataFormat;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+}
diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetExecutionEngine.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetExecutionEngine.java
new file mode 100644
index 0000000000000..4778d21f51452
--- /dev/null
+++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetExecutionEngine.java
@@ -0,0 +1,86 @@
+package com.parquet.parquetdataformat.engine;
+
+import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
+import com.parquet.parquetdataformat.writer.ParquetWriter;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.opensearch.index.engine.exec.DataFormat;
+import org.opensearch.index.engine.exec.IndexingExecutionEngine;
+import org.opensearch.index.engine.exec.RefreshInput;
+import org.opensearch.index.engine.exec.RefreshResult;
+import org.opensearch.index.engine.exec.Writer;
+import org.opensearch.index.engine.exec.WriterFileSet;
+import org.opensearch.index.shard.ShardPath;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static com.parquet.parquetdataformat.engine.ParquetDataFormat.PARQUET_DATA_FORMAT;
+
+/**
+ * Main execution engine for Parquet-based indexing operations in OpenSearch.
+ *
+ * This engine implements OpenSearch's IndexingExecutionEngine interface to provide
+ * Parquet file generation capabilities within the indexing pipeline. It manages the
+ * lifecycle of Parquet writers and coordinates the overall document processing workflow.
+ *
+ * <p>Key responsibilities:
+ *
+ * - Writer creation with unique file naming and Arrow schema integration
+ * - Schema-based field type support and validation
+ * - Refresh operations for completing indexing cycles
+ * - Integration with the broader Parquet data format ecosystem
+ *
+ *
+ * <p>Parquet file names follow the pattern "parquet_file_generation_N.parquet", where N is the
+ * writer generation passed to {@link #createWriter(long)}, keeping output files unique across
+ * writer generations.
+ *
+ * <p>Each writer instance created by this engine is configured with:
+ *
+ * - A unique file name for output isolation
+ * - The Arrow schema provided during engine construction
+ * - Full access to the Parquet processing pipeline via {@link ParquetWriter}
+ *
+ * <p>The engine is designed to work with {@link ParquetDocumentInput} for document
+ * processing and integrates seamlessly with OpenSearch's execution framework.
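+ *
+ * <p>End-to-end sketch (mirrors the commented-out demo elsewhere in this change; document
+ * population and the refresh input are elided):
+ * <pre>{@code
+ * ParquetWriter writer = (ParquetWriter) engine.createWriter(generation);
+ * writer.addDoc(documentInput);
+ * writer.flush(null);
+ * writer.close();
+ * engine.refresh(refreshInput);
+ * }</pre>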
+ */
+public class ParquetExecutionEngine implements IndexingExecutionEngine {
+
+ public static final String FILE_NAME_PREFIX = "parquet_file_generation";
+ private final Supplier<Schema> schema;
+ private final List<WriterFileSet> filesWrittenAlready = new ArrayList<>();
+ private final ShardPath shardPath;
+
+ public ParquetExecutionEngine(Supplier<Schema> schema, ShardPath shardPath) {
+ this.schema = schema;
+ this.shardPath = shardPath;
+ }
+
+ @Override
+ public List supportedFieldTypes() {
+ return List.of();
+ }
+
+ @Override
+ public Writer createWriter(long writerGeneration) throws IOException {
+ String fileName = Path.of(shardPath.getDataPath().toString(), FILE_NAME_PREFIX + "_" + writerGeneration + ".parquet").toString();
+ return new ParquetWriter(fileName, schema.get(), writerGeneration);
+ }
+
+ @Override
+ public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
+ RefreshResult refreshResult = new RefreshResult();
+ filesWrittenAlready.addAll(refreshInput.getWriterFiles());
+ refreshResult.add(PARQUET_DATA_FORMAT, filesWrittenAlready);
+ return refreshResult;
+ }
+
+ @Override
+ public DataFormat getDataFormat() {
+ return new ParquetDataFormat();
+ }
+}
diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/read/ParquetDataSourceCodec.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/read/ParquetDataSourceCodec.java
new file mode 100644
index 0000000000000..f20a9bae06ea2
--- /dev/null
+++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/read/ParquetDataSourceCodec.java
@@ -0,0 +1,143 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package com.parquet.parquetdataformat.engine.read;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.vectorized.execution.search.DataFormat;
+import org.opensearch.vectorized.execution.search.spi.DataSourceCodec;
+import org.opensearch.vectorized.execution.search.spi.RecordBatchStream;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.parquet.parquetdataformat.bridge.RustBridge.nativeCloseSessionContext;
+import static com.parquet.parquetdataformat.bridge.RustBridge.nativeCreateSessionContext;
+import static com.parquet.parquetdataformat.bridge.RustBridge.nativeExecuteSubstraitQuery;
+import static com.parquet.parquetdataformat.bridge.RustBridge.nativeRegisterDirectory;
+
+/**
+ * {@link DataSourceCodec} implementation for Parquet files.
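+ *
+ * <p>Query flow sketch (assumes a runtime id, a directory of Parquet files, and a serialized
+ * Substrait plan produced elsewhere):
+ * <pre>{@code
+ * ParquetDataSourceCodec codec = new ParquetDataSourceCodec();
+ * codec.registerDirectory(directoryPath, fileNames, runtimeId).join();
+ * long sessionId = codec.createSessionContext(runtimeId).join();
+ * RecordBatchStream stream = codec.executeSubstraitQuery(sessionId, substraitPlanBytes).join();
+ * // ... consume the stream ...
+ * codec.closeSessionContext(sessionId).join();
+ * }</pre>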
+ */
+public class ParquetDataSourceCodec implements DataSourceCodec {
+
+ private static final Logger logger = LogManager.getLogger(ParquetDataSourceCodec.class);
+ private static final AtomicLong runtimeIdGenerator = new AtomicLong(0);
+ private static final AtomicLong sessionIdGenerator = new AtomicLong(0);
+ private final ConcurrentHashMap<Long, Long> sessionContexts = new ConcurrentHashMap<>();
+
+ // JNI library loading
+ static {
+ try {
+ //JniLibraryLoader.loadLibrary();
+ logger.info("DataFusion JNI bridge initialized; native library loading is handled by RustBridge");
+ } catch (Exception e) {
+ logger.error("Failed to load DataFusion JNI library", e);
+ throw new RuntimeException("Failed to initialize DataFusion JNI library", e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> registerDirectory(String directoryPath, List<String> fileNames, long runtimeId) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ logger.debug("Registering directory: {} with {} files", directoryPath, fileNames.size());
+
+ // Convert file names to arrays for JNI
+ String[] fileArray = fileNames.toArray(new String[0]);
+
+ // Call native method to register directory
+ nativeRegisterDirectory("csv_table", directoryPath, fileArray, runtimeId);
+ return null;
+ } catch (Exception e) {
+ logger.error("Failed to register directory: " + directoryPath, e);
+ throw new CompletionException("Failed to register directory", e);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Long> createSessionContext(long globalRuntimeEnvId) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ long sessionId = sessionIdGenerator.incrementAndGet();
+ logger.debug("Creating session context with ID: {} for runtime: {}", sessionId, globalRuntimeEnvId);
+
+ // Default configuration
+ String[] configKeys = { "batch_size", "target_partitions" };
+ String[] configValues = { "1024", "4" };
+
+ // Create native session context
+ long nativeContextPtr = nativeCreateSessionContext(configKeys, configValues);
+ sessionContexts.put(sessionId, nativeContextPtr);
+
+ logger.info("Created session context with ID: {}", sessionId);
+ return sessionId;
+ } catch (Exception e) {
+ logger.error("Failed to create session context for runtime: " + globalRuntimeEnvId, e);
+ throw new CompletionException("Failed to create session context", e);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<RecordBatchStream> executeSubstraitQuery(long sessionContextId, byte[] substraitPlanBytes) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ logger.debug("Executing Substrait query for session: {}", sessionContextId);
+
+ Long nativeContextPtr = sessionContexts.get(sessionContextId);
+ if (nativeContextPtr == null) {
+ throw new IllegalArgumentException("Invalid session context ID: " + sessionContextId);
+ }
+
+ // Execute query and get native stream pointer
+ long nativeStreamPtr = nativeExecuteSubstraitQuery(nativeContextPtr, substraitPlanBytes);
+
+ // Create Java wrapper for the native stream
+ RecordBatchStream stream = new ParquetRecordBatchStream(nativeStreamPtr);
+
+ logger.info("Successfully executed Substrait query for session: {}", sessionContextId);
+ return stream;
+ } catch (Exception e) {
+ logger.error("Failed to execute Substrait query for session: " + sessionContextId, e);
+ throw new CompletionException("Failed to execute Substrait query", e);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> closeSessionContext(long sessionContextId) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ logger.debug("Closing session context: {}", sessionContextId);
+
+ Long nativeContextPtr = sessionContexts.remove(sessionContextId);
+ if (nativeContextPtr != null) {
+ nativeCloseSessionContext(nativeContextPtr);
+ logger.info("Successfully closed session context: {}", sessionContextId);
+ } else {
+ logger.warn("Session context not found: {}", sessionContextId);
+ }
+
+ return null;
+ } catch (Exception e) {
+ logger.error("Failed to close session context: " + sessionContextId, e);
+ throw new CompletionException("Failed to close session context", e);
+ }
+ });
+ }
+
+ public DataFormat getDataFormat() {
+ return DataFormat.CSV;
+ }
+}
diff --git a/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/read/ParquetRecordBatchStream.java b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/read/ParquetRecordBatchStream.java
new file mode 100644
index 0000000000000..3c23e4fd9d1b5
--- /dev/null
+++ b/modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/read/ParquetRecordBatchStream.java
@@ -0,0 +1,117 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package com.parquet.parquetdataformat.engine.read;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.vectorized.execution.search.spi.RecordBatchStream;
+
+import java.util.concurrent.CompletableFuture;
+
+import static com.parquet.parquetdataformat.bridge.RustBridge.nativeCloseStream;
+import static com.parquet.parquetdataformat.bridge.RustBridge.nativeNextBatch;
+
+/**
+ * TODO: nothing here is Parquet-specific; consider moving this to a shared library.
+ * Native implementation of RecordBatchStream that wraps a JNI stream pointer.
+ * This class provides a Java interface over native DataFusion record batches.
+ */
+public class ParquetRecordBatchStream implements RecordBatchStream {
+
+ private static final Logger logger = LogManager.getLogger(ParquetRecordBatchStream.class);
+
+ private final long nativeStreamPtr;
+ private volatile boolean closed = false;
+ private volatile boolean hasNextCached = false;
+ private volatile boolean hasNextValue = false;
+
+ /**
+ * Creates a new ParquetRecordBatchStream wrapping the given native stream pointer.
+ *
+ * @param nativeStreamPtr Pointer to the native DataFusion RecordBatch stream
+ */
+ public ParquetRecordBatchStream(long nativeStreamPtr) {
+ if (nativeStreamPtr == 0) {
+ throw new IllegalArgumentException("Invalid native stream pointer");
+ }
+ this.nativeStreamPtr = nativeStreamPtr;
+ logger.debug("Created ParquetRecordBatchStream with pointer: {}", nativeStreamPtr);
+ }
+
+ @Override
+ public Object getSchema() {
+ return "ParquetSchema"; // Placeholder
+ }
+
+ @Override
+ public CompletableFuture