diff --git a/dataset/CMakeLists.txt b/dataset/CMakeLists.txt
index 348850c3be..ddbafe07ff 100644
--- a/dataset/CMakeLists.txt
+++ b/dataset/CMakeLists.txt
@@ -25,6 +25,7 @@ add_jar(arrow_java_jni_dataset_jar
     src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
     src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
     src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
+    src/main/java/org/apache/arrow/dataset/file/ParquetWriterProperties.java
     src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
     src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
     src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
diff --git a/dataset/pom.xml b/dataset/pom.xml
index 6e56d555b7..d5ed9a0b57 100644
--- a/dataset/pom.xml
+++ b/dataset/pom.xml
@@ -172,6 +172,7 @@ under the License.
       <resource>
         <directory>${arrow.cpp.build.dir}</directory>
         <includes>
          <include>**/*arrow_dataset_jni.*</include>
+          <include>**/*arrow_cdata_jni.*</include>
        </includes>
      </resource>
diff --git a/dataset/src/main/cpp/jni_wrapper.cc b/dataset/src/main/cpp/jni_wrapper.cc
index e8087648eb..035315ccf7 100644
--- a/dataset/src/main/cpp/jni_wrapper.cc
+++ b/dataset/src/main/cpp/jni_wrapper.cc
@@ -26,6 +26,7 @@
 #include "arrow/compute/initialize.h"
 #include "arrow/dataset/api.h"
 #include "arrow/dataset/file_base.h"
+#include "arrow/dataset/file_parquet.h"
 #ifdef ARROW_CSV
 #include "arrow/dataset/file_csv.h"
 #endif
@@ -36,6 +37,11 @@
 #include "arrow/engine/substrait/relation.h"
 #include "arrow/ipc/api.h"
 #include "arrow/util/iterator.h"
+#include "arrow/util/compression.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/file_writer.h"
+#include "parquet/stream_writer.h"
+#include "arrow/io/file.h"
 #include "jni_util.h"
 #include "org_apache_arrow_dataset_file_JniWrapper.h"
 #include "org_apache_arrow_dataset_jni_JniWrapper.h"
@@ -231,6 +237,253 @@ class DisposableScannerAdaptor {
   }
 };
 
+// Adapter that wraps a Java OutputStream as an Arrow OutputStream
+class JavaOutputStreamAdapter : public arrow::io::OutputStream {
+ public:
+  JavaOutputStreamAdapter(JNIEnv* env, jobject java_output_stream)
+      : java_output_stream_(env->NewGlobalRef(java_output_stream)), position_(0) {
+    JavaVM* vm;
+    env->GetJavaVM(&vm);
+    vm_ = vm;
+
+    // Cache method IDs; they remain valid for the lifetime of the class.
+    JNIEnv* current_env = GetEnv();
+    if (current_env) {
+      jclass output_stream_class = current_env->GetObjectClass(java_output_stream);
+      write_method_ = current_env->GetMethodID(output_stream_class, "write", "([BII)V");
+      flush_method_ = current_env->GetMethodID(output_stream_class, "flush", "()V");
+      close_method_ = current_env->GetMethodID(output_stream_class, "close", "()V");
+      current_env->DeleteLocalRef(output_stream_class);
+    }
+  }
+
+  ~JavaOutputStreamAdapter() override {
+    JNIEnv* env = GetEnv();
+    if (env && java_output_stream_) {
+      env->DeleteGlobalRef(java_output_stream_);
+    }
+  }
+
+  arrow::Status Close() override {
+    JNIEnv* env = GetEnv();
+    if (!env) {
+      return arrow::Status::IOError("Failed to get JNI environment");
+    }
+    if (java_output_stream_) {
+      env->CallVoidMethod(java_output_stream_, close_method_);
+      RETURN_NOT_OK(CheckJniException(env));
+      env->DeleteGlobalRef(java_output_stream_);
+      java_output_stream_ = nullptr;
+    }
+    return arrow::Status::OK();
+  }
+
+  bool closed() const override { return java_output_stream_ == nullptr; }
+
+  arrow::Result<int64_t> Tell() const override { return position_; }
+
+  arrow::Status Write(const void* data, int64_t nbytes) override {
+    JNIEnv* env = GetEnv();
+    if (!env) {
+      return arrow::Status::IOError("Failed to get JNI environment");
+    }
+    if (!java_output_stream_) {
+      return arrow::Status::IOError("OutputStream is closed");
+    }
+
+    // Create byte array
+    jbyteArray byte_array = env->NewByteArray(static_cast<jsize>(nbytes));
+    if (byte_array == nullptr) {
+      return arrow::Status::OutOfMemory("Failed to allocate byte array");
+    }
+
+    // Copy data to byte array
+    env->SetByteArrayRegion(byte_array, 0, static_cast<jsize>(nbytes),
+                            reinterpret_cast<const jbyte*>(data));
+
+    // Call Java write method
+    env->CallVoidMethod(java_output_stream_, write_method_, byte_array, 0,
+                        static_cast<jint>(nbytes));
+    env->DeleteLocalRef(byte_array);
+
+    RETURN_NOT_OK(CheckJniException(env));
+    position_ += nbytes;
+    return arrow::Status::OK();
+  }
+
+  arrow::Status Flush() override {
+    JNIEnv* env = GetEnv();
+    if (!env) {
+      return arrow::Status::IOError("Failed to get JNI environment");
+    }
+    if (!java_output_stream_) {
+      return arrow::Status::IOError("OutputStream is closed");
+    }
+    env->CallVoidMethod(java_output_stream_, flush_method_);
+    return CheckJniException(env);
+  }
+
+ private:
+  JNIEnv* GetEnv() const {
+    JNIEnv* env;
+    if (vm_->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
+      return nullptr;
+    }
+    return env;
+  }
+
+  arrow::Status CheckJniException(JNIEnv* env) {
+    if (env->ExceptionCheck()) {
+      jthrowable exception = env->ExceptionOccurred();
+      env->ExceptionClear();
+      std::string error_msg = "Java exception occurred in OutputStream";
+      // Try to get the exception message
+      jclass exception_class = env->GetObjectClass(exception);
+      jmethodID get_message_method =
+          env->GetMethodID(exception_class, "toString", "()Ljava/lang/String;");
+      if (get_message_method) {
+        jstring message = (jstring)env->CallObjectMethod(exception, get_message_method);
+        if (message) {
+          const char* msg_chars = env->GetStringUTFChars(message, nullptr);
+          error_msg = std::string(msg_chars);
+          env->ReleaseStringUTFChars(message, msg_chars);
+          env->DeleteLocalRef(message);
+        }
+      }
+      env->DeleteLocalRef(exception);
+      env->DeleteLocalRef(exception_class);
+      return arrow::Status::IOError(error_msg);
+    }
+    return arrow::Status::OK();
+  }
+
+  JavaVM* vm_;
+  jobject java_output_stream_;
+  jmethodID write_method_;
+  jmethodID flush_method_;
+  jmethodID close_method_;
+  int64_t position_;
+};
+
+struct ParquetWriterHolder {
+  std::unique_ptr<parquet::arrow::FileWriter> writer;
+  std::shared_ptr<arrow::io::OutputStream> output_stream;
+  std::shared_ptr<arrow::Schema> schema;
+};
+
+// Helper function to build WriterProperties from a Java ParquetWriterProperties object
+std::shared_ptr<parquet::WriterProperties> BuildWriterProperties(JNIEnv* env,
+                                                                 jobject java_properties) {
+  parquet::WriterProperties::Builder builder;
+
+  if (java_properties == nullptr) {
+    return builder.build();
+  }
+
+  jclass props_class = env->GetObjectClass(java_properties);
+
+  // Get maxRowGroupLength
+  jmethodID get_max_row_group_method =
+      env->GetMethodID(props_class, "getMaxRowGroupLength", "()J");
+  if (get_max_row_group_method) {
+    jlong max_row_group = env->CallLongMethod(java_properties, get_max_row_group_method);
+    if (max_row_group > 0) {
+      builder.max_row_group_length(static_cast<int64_t>(max_row_group));
+    }
+  }
+
+  // Get writeBatchSize
+  jmethodID get_write_batch_size_method =
+      env->GetMethodID(props_class, "getWriteBatchSize", "()J");
+  if (get_write_batch_size_method) {
+    jlong write_batch_size = env->CallLongMethod(java_properties, get_write_batch_size_method);
+    if (write_batch_size > 0) {
+      builder.write_batch_size(static_cast<int64_t>(write_batch_size));
+    }
+  }
+
+  // Get dataPageSize
+  jmethodID get_data_page_size_method =
+      env->GetMethodID(props_class, "getDataPageSize", "()J");
+  if (get_data_page_size_method) {
+    jlong data_page_size = env->CallLongMethod(java_properties, get_data_page_size_method);
+    if (data_page_size > 0) {
+      builder.data_pagesize(static_cast<int64_t>(data_page_size));
+    }
+  }
+
+  // Get compressionCodec
+  jmethodID get_compression_codec_method =
+      env->GetMethodID(props_class, "getCompressionCodec", "()Ljava/lang/String;");
+  if (get_compression_codec_method) {
+    jstring codec_str =
+        (jstring)env->CallObjectMethod(java_properties, get_compression_codec_method);
+    if (codec_str != nullptr) {
+      std::string codec_name = arrow::dataset::jni::JStringToCString(env, codec_str);
+      // Use Arrow's Codec::GetCompressionType to parse the compression name
+      auto arrow_compression_result = arrow::util::Codec::GetCompressionType(codec_name);
+      if (arrow_compression_result.ok()) {
+        // parquet::WriterProperties::Builder accepts arrow::Compression::type directly
+        arrow::Compression::type compression = arrow_compression_result.ValueOrDie();
+        // Set compression for all columns
+        builder.compression(compression);
+      } else {
+        // If parsing fails, keep the default (UNCOMPRESSED) so unsupported codec
+        // names do not break the writer.
+      }
+      env->DeleteLocalRef(codec_str);
+    }
+  }
+
+  // Get compressionLevel
+  jmethodID get_compression_level_method =
+      env->GetMethodID(props_class, "getCompressionLevel", "()I");
+  if (get_compression_level_method) {
+    jint comp_level = env->CallIntMethod(java_properties, get_compression_level_method);
+    if (comp_level > 0) {
+      builder.compression_level(comp_level);
+    }
+  }
+
+  // Get writePageIndex
+  jmethodID get_write_page_index_method =
+      env->GetMethodID(props_class, "getWritePageIndex", "()Z");
+  if (get_write_page_index_method) {
+    jboolean write_index = env->CallBooleanMethod(java_properties, get_write_page_index_method);
+    if (write_index) {
+      builder.enable_write_page_index();
+    }
+  }
+
+  env->DeleteLocalRef(props_class);
+  return builder.build();
+}
+
+// Helper function to build ArrowWriterProperties
+std::shared_ptr<parquet::ArrowWriterProperties> BuildArrowWriterProperties(
+    JNIEnv* env, jobject java_properties) {
+  parquet::ArrowWriterProperties::Builder builder;
+
+  if (java_properties == nullptr) {
+    return builder.build();
+  }
+
+  jclass props_class = env->GetObjectClass(java_properties);
+
+  // Get useThreads (for ArrowWriterProperties)
+  jmethodID get_use_threads_method = env->GetMethodID(props_class, "getUseThreads", "()Z");
+  if (get_use_threads_method) {
+    jboolean use_threads = env->CallBooleanMethod(java_properties, get_use_threads_method);
+    builder.set_use_threads(use_threads);
+  }
+
+  env->DeleteLocalRef(props_class);
+  return builder.build();
+}
+
 arrow::Result<std::shared_ptr<arrow::Schema>> SchemaFromColumnNames(
     const std::shared_ptr<arrow::Schema>& input,
     const std::vector<std::string>& column_names) {
@@ -1019,3 +1272,87 @@ JNIEXPORT void JNICALL
   JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader_out, arrow_stream_out));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_file_JniWrapper
+ * Method:    nativeCreateParquetWriter
+ * Signature: (Ljava/io/OutputStream;JLorg/apache/arrow/dataset/file/ParquetWriterProperties;)J
+ */
+JNIEXPORT jlong JNICALL
+Java_org_apache_arrow_dataset_file_JniWrapper_nativeCreateParquetWriter(
+    JNIEnv* env, jobject, jobject java_output_stream, jlong arrow_schema_ptr,
+    jobject java_properties) {
+  JNI_METHOD_START
+  // Import the Schema from the Arrow C Data Interface
+  auto* c_schema = reinterpret_cast<struct ArrowSchema*>(arrow_schema_ptr);
+  std::shared_ptr<arrow::Schema> schema = JniGetOrThrow(arrow::ImportSchema(c_schema));
+
+  // Create an output stream adapter from the Java OutputStream
+  auto output_stream = std::make_shared<JavaOutputStreamAdapter>(env, java_output_stream);
+
+  // Build Parquet writer properties from the Java configuration
+  std::shared_ptr<parquet::WriterProperties> writer_props =
+      BuildWriterProperties(env, java_properties);
+  std::shared_ptr<parquet::ArrowWriterProperties> arrow_writer_props =
+      BuildArrowWriterProperties(env, java_properties);
+
+  // Open the Parquet file writer
+  auto writer_result = parquet::arrow::FileWriter::Open(
+      *schema, arrow::default_memory_pool(), output_stream, writer_props, arrow_writer_props);
+  std::unique_ptr<parquet::arrow::FileWriter> writer = JniGetOrThrow(std::move(writer_result));
+
+  // Wrap the writer in a holder and return a native reference to it
+  auto holder = std::make_shared<ParquetWriterHolder>();
+  holder->writer = std::move(writer);
+  holder->output_stream = output_stream;
+  holder->schema = schema;
+  return CreateNativeRef(holder);
+  JNI_METHOD_END(0L)
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_file_JniWrapper
+ * Method:    nativeWriteParquetBatch
+ * Signature: (JJ)I
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_arrow_dataset_file_JniWrapper_nativeWriteParquetBatch(
+    JNIEnv* env, jobject, jlong native_ptr, jlong arrow_array_ptr) {
+  JNI_METHOD_START
+  auto holder = RetrieveNativeInstance<ParquetWriterHolder>(native_ptr);
+  if (!holder->writer) {
+    JniThrow("ParquetWriter is already closed");
+  }
+
+  // Import the RecordBatch from the Arrow C Data Interface
+  auto* c_array = reinterpret_cast<struct ArrowArray*>(arrow_array_ptr);
+  std::shared_ptr<arrow::RecordBatch> batch =
+      JniGetOrThrow(arrow::ImportRecordBatch(c_array, holder->schema));
+
+  // Write the RecordBatch
+  JniAssertOkOrThrow(holder->writer->WriteRecordBatch(*batch));
+  return 1;  // Success
+  JNI_METHOD_END(0)
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_file_JniWrapper
+ * Method:    nativeCloseParquetWriter
+ * Signature: (J)I
+ */
+JNIEXPORT jint JNICALL
+Java_org_apache_arrow_dataset_file_JniWrapper_nativeCloseParquetWriter(
+    JNIEnv* env, jobject, jlong native_ptr) {
+  JNI_METHOD_START
+  auto holder = RetrieveNativeInstance<ParquetWriterHolder>(native_ptr);
+  if (holder->writer) {
+    JniAssertOkOrThrow(holder->writer->Close());
+    holder->writer.reset();
+  }
+  // Close the output stream
+  if (holder->output_stream) {
+    JniAssertOkOrThrow(holder->output_stream->Close());
+  }
+  ReleaseNativeRef<ParquetWriterHolder>(native_ptr);
+  return 1;  // Success
+  JNI_METHOD_END(0)
+}
diff --git a/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
index d2f842f99e..1b149224f0 100644
--- a/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
+++ b/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
@@ -77,4 +77,32 @@ public native void writeFromScannerToFile(
       String[] partitionColumns,
       int maxPartitions,
       String baseNameTemplate);
+
+  /**
+   * Create a Parquet writer instance.
+   *
+   * @param outputStream the Java OutputStream to write Parquet data to
+   * @param arrowSchemaPtr the native pointer to ArrowSchema
+   * @param properties optional writer properties (can be null for defaults)
+   * @return the native pointer of the ParquetWriterHolder instance
+   */
+  public native long nativeCreateParquetWriter(
+      java.io.OutputStream outputStream, long arrowSchemaPtr, ParquetWriterProperties properties);
+
+  /**
+   * Write a RecordBatch to the Parquet file.
+   *
+   * @param nativePtr the native pointer of the ParquetWriterHolder instance
+   * @param arrowArrayPtr the native pointer to ArrowArray
+   * @return 1 on success, 0 on failure
+   */
+  public native int nativeWriteParquetBatch(long nativePtr, long arrowArrayPtr);
+
+  /**
+   * Close a Parquet writer instance.
+   *
+   * @param nativePtr the native pointer of the ParquetWriterHolder instance
+   * @return 1 on success, 0 on failure
+   */
+  public native int nativeCloseParquetWriter(long nativePtr);
 }
diff --git a/dataset/src/main/java/org/apache/arrow/dataset/file/ParquetWriter.java b/dataset/src/main/java/org/apache/arrow/dataset/file/ParquetWriter.java
new file mode 100644
index 0000000000..60c91c621e
--- /dev/null
+++ b/dataset/src/main/java/org/apache/arrow/dataset/file/ParquetWriter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.file;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.ArrowSchema;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/** JNI-based Parquet writer. Provides methods to write a VectorSchemaRoot to Parquet files. */
+public class ParquetWriter implements AutoCloseable {
+
+  private static final JniWrapper jni = JniWrapper.get();
+
+  private final BufferAllocator allocator;
+  private final ArrowSchema arrowSchema;
+  private long nativePtr;
+  private boolean closed = false;
+
+  /**
+   * Create a new ParquetWriter with default properties.
+   *
+   * @param outputStream the Java OutputStream to write Parquet data to
+   * @param schema the Arrow Schema
+   * @throws IOException if initialization fails
+   */
+  public ParquetWriter(OutputStream outputStream, org.apache.arrow.vector.types.pojo.Schema schema)
+      throws IOException {
+    this(outputStream, schema, null);
+  }
+
+  /**
+   * Create a new ParquetWriter with custom properties.
+   *
+   * @param outputStream the Java OutputStream to write Parquet data to
+   * @param schema the Arrow Schema
+   * @param properties optional writer properties (can be null for defaults)
+   * @throws IOException if initialization fails
+   */
+  public ParquetWriter(
+      OutputStream outputStream,
+      org.apache.arrow.vector.types.pojo.Schema schema,
+      ParquetWriterProperties properties)
+      throws IOException {
+    this.allocator = new RootAllocator();
+    ArrowSchema arrowSchemaLocal;
+
+    try {
+      // Convert the Java Schema to an Arrow C Data Interface schema
+      arrowSchemaLocal = ArrowSchema.allocateNew(this.allocator);
+      this.arrowSchema = arrowSchemaLocal;
+      Data.exportSchema(this.allocator, schema, null, arrowSchemaLocal);
+    } catch (Exception e) {
+      this.close();
+      throw new IOException("Failed to convert schema to ArrowSchema: " + e.getMessage(), e);
+    }
+
+    long ptr =
+        jni.nativeCreateParquetWriter(outputStream, arrowSchema.memoryAddress(), properties);
+
+    if (ptr == 0) {
+      this.close();
+      throw new IOException("Failed to create ParquetWriter");
+    }
+    this.nativePtr = ptr;
+  }
+
+  /**
+   * Write a VectorSchemaRoot to the Parquet file.
+   *
+   * @param root VectorSchemaRoot object (contains data and schema)
+   * @throws IOException if the write fails
+   */
+  public void write(VectorSchemaRoot root) throws IOException {
+    if (closed) {
+      throw new IOException("ParquetWriter is already closed");
+    }
+
+    // Use the VectorSchemaRoot's own allocator (taken from its first vector) to avoid
+    // allocator mismatch issues when exporting.
+    BufferAllocator alloc =
+        root.getFieldVectors().isEmpty()
+            ? allocator
+            : root.getFieldVectors().get(0).getAllocator();
+
+    // Check that nativePtr is valid before exporting
+    if (nativePtr == 0) {
+      throw new IOException(
+          "ParquetWriter native pointer is invalid (0) - writer may be closed or not initialized");
+    }
+
+    try (ArrowArray arrowArray = ArrowArray.allocateNew(alloc)) {
+      // Export the VectorSchemaRoot to the C Data Interface. This creates references to
+      // the data in the VectorSchemaRoot and must use the same allocator as the root.
+      Data.exportVectorSchemaRoot(alloc, root, null, arrowArray, null);
+
+      // Get the memory address (must be called after export)
+      long arrayPtr = arrowArray.memoryAddress();
+
+      // Check that arrayPtr is valid
+      if (arrayPtr == 0) {
+        throw new IOException(
+            "Failed to get ArrowArray memory address (returned 0). "
+                + "This may indicate that exportVectorSchemaRoot failed or ArrowArray is invalid. "
+                + "VectorSchemaRoot row count: "
+                + root.getRowCount());
+      }
+
+      int result = jni.nativeWriteParquetBatch(nativePtr, arrayPtr);
+      arrowArray.release();
+      // Check for failures reported by the native side
+      if (result == 0) {
+        throw new IOException("Failed to write RecordBatch (native method returned 0)");
+      }
+    }
+  }
+
+  /**
+   * Close the ParquetWriter.
+   *
+   * @throws IOException if close fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }
+
+    if (nativePtr != 0) {
+      int result = jni.nativeCloseParquetWriter(nativePtr);
+      nativePtr = 0;
+      if (result == 0) {
+        throw new IOException("Failed to close ParquetWriter");
+      }
+    }
+    if (arrowSchema != null) {
+      arrowSchema.release();
+      arrowSchema.close();
+    }
+    if (allocator != null) {
+      allocator.close();
+    }
+
+    closed = true;
+  }
+}
diff --git a/dataset/src/main/java/org/apache/arrow/dataset/file/ParquetWriterProperties.java b/dataset/src/main/java/org/apache/arrow/dataset/file/ParquetWriterProperties.java
new file mode 100644
index 0000000000..323d01f07e
--- /dev/null
+++ b/dataset/src/main/java/org/apache/arrow/dataset/file/ParquetWriterProperties.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.file;
+
+/**
+ * Configuration properties for the Parquet writer. Allows customization of Parquet file writing
+ * behavior. Use {@link Builder} to create instances.
+ */
+public class ParquetWriterProperties {
+
+  private final long writeBatchSize;
+  private final long maxRowGroupLength;
+  private final long dataPageSize;
+  private final int compressionLevel;
+  private final String compressionCodec;
+  private final boolean writePageIndex;
+  private final boolean useThreads;
+
+  private ParquetWriterProperties(Builder builder) {
+    this.writeBatchSize = builder.writeBatchSize;
+    this.maxRowGroupLength = builder.maxRowGroupLength;
+    this.dataPageSize = builder.dataPageSize;
+    this.compressionLevel = builder.compressionLevel;
+    this.compressionCodec = builder.compressionCodec;
+    this.writePageIndex = builder.writePageIndex;
+    this.useThreads = builder.useThreads;
+  }
+
+  /**
+   * Get the write batch size (number of rows per batch).
+   *
+   * @return write batch size, or -1 if not set
+   */
+  public long getWriteBatchSize() {
+    return writeBatchSize;
+  }
+
+  /**
+   * Create a new Builder for ParquetWriterProperties.
+   *
+   * @return a new Builder instance
+   */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Get the maximum row group length.
+   *
+   * @return maximum row group length, or -1 if not set
+   */
+  public long getMaxRowGroupLength() {
+    return maxRowGroupLength;
+  }
+
+  /**
+   * Get the data page size.
+   *
+   * @return data page size, or -1 if not set
+   */
+  public long getDataPageSize() {
+    return dataPageSize;
+  }
+
+  /**
+   * Get the compression level.
+   *
+   * @return compression level, or -1 if not set
+   */
+  public int getCompressionLevel() {
+    return compressionLevel;
+  }
+
+  /**
+   * Get the compression codec.
+   *
+   * @return compression codec name, or null if not set
+   */
+  public String getCompressionCodec() {
+    return compressionCodec;
+  }
+
+  /**
+   * Get whether page index writing is enabled.
+   *
+   * @return true if page index writing is enabled
+   */
+  public boolean getWritePageIndex() {
+    return writePageIndex;
+  }
+
+  /**
+   * Get whether multi-threading is enabled.
+   *
+   * @return true if multi-threading is enabled
+   */
+  public boolean getUseThreads() {
+    return useThreads;
+  }
+
+  /** Builder for creating ParquetWriterProperties instances. */
+  public static class Builder {
+    private long maxRowGroupLength = -1;
+    private long dataPageSize = -1;
+    private int compressionLevel = -1;
+    private String compressionCodec = null;
+    private boolean writePageIndex = false;
+    private boolean useThreads = false;
+    private long writeBatchSize = -1;
+
+    private Builder() {}
+
+    /**
+     * Set the write batch size (number of rows per batch).
+     *
+     * @param writeBatchSize number of rows per batch
+     * @return this builder for method chaining
+     */
+    public Builder writeBatchSize(long writeBatchSize) {
+      this.writeBatchSize = writeBatchSize;
+      return this;
+    }
+
+    /**
+     * Set the maximum row group length (in rows).
+     *
+     * @param maxRowGroupLength maximum number of rows per row group
+     * @return this builder for method chaining
+     */
+    public Builder maxRowGroupLength(long maxRowGroupLength) {
+      this.maxRowGroupLength = maxRowGroupLength;
+      return this;
+    }
+
+    /**
+     * Set the data page size (in bytes).
+     *
+     * @param dataPageSize data page size in bytes
+     * @return this builder for method chaining
+     */
+    public Builder dataPageSize(long dataPageSize) {
+      this.dataPageSize = dataPageSize;
+      return this;
+    }
+
+    /**
+     * Set the compression level.
+     *
+     * @param compressionLevel compression level (typically 1-9)
+     * @return this builder for method chaining
+     */
+    public Builder compressionLevel(int compressionLevel) {
+      this.compressionLevel = compressionLevel;
+      return this;
+    }
+
+    /**
+     * Set the compression codec.
+     *
+     * @param compressionCodec compression codec name (e.g., "SNAPPY", "GZIP", "LZ4", "ZSTD")
+     * @return this builder for method chaining
+     */
+    public Builder compressionCodec(String compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    /**
+     * Enable or disable page index writing.
+     *
+     * @param writePageIndex whether to write the page index
+     * @return this builder for method chaining
+     */
+    public Builder writePageIndex(boolean writePageIndex) {
+      this.writePageIndex = writePageIndex;
+      return this;
+    }
+
+    /**
+     * Enable or disable multi-threading for column writing.
+     *
+     * @param useThreads whether to use threads for column writing
+     * @return this builder for method chaining
+     */
+    public Builder useThreads(boolean useThreads) {
+      this.useThreads = useThreads;
+      return this;
+    }
+
+    /**
+     * Build a ParquetWriterProperties instance with the configured values.
+     *
+     * @return a new ParquetWriterProperties instance
+     */
+    public ParquetWriterProperties build() {
+      return new ParquetWriterProperties(this);
+    }
+  }
+}
diff --git a/dataset/src/test/java/org/apache/arrow/dataset/file/TestParquetWriter.java b/dataset/src/test/java/org/apache/arrow/dataset/file/TestParquetWriter.java
new file mode 100644
index 0000000000..4a6de441cd
--- /dev/null
+++ b/dataset/src/test/java/org/apache/arrow/dataset/file/TestParquetWriter.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.file;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestParquetWriter extends TestDataset {
+
+  @TempDir public File TMP;
+
+  @Test
+  void testParquetWriter() throws Exception {
+    String parquetFilePath =
+        Paths.get(TMP.getAbsolutePath(), "testParquetWriter.parquet").toString();
+    List<Field> fields =
+        Arrays.asList(
+            Field.nullable("id", new ArrowType.Int(32, true)),
+            Field.nullable("name", new ArrowType.Utf8()));
+    Schema arrowSchema = new Schema(fields);
+
+    int[] ids = new int[] {1, 2, 3, 4, 5};
+    String[] names = new String[] {"Alice", "Bob", "Charlie", "Diana", "Eve"};
+
+    // Write the Parquet file
+    try (FileOutputStream fos = new FileOutputStream(parquetFilePath);
+        ParquetWriter writer = new ParquetWriter(fos, arrowSchema);
+        VectorSchemaRoot vectorSchemaRoot = createData(rootAllocator(), arrowSchema, ids, names)) {
+      writer.write(vectorSchemaRoot);
+    }
+
+    // Verify the file exists and is not empty
+    File parquetFile = new File(parquetFilePath);
+    assertTrue(parquetFile.exists(), "Parquet file should exist");
+
+    // Read back and verify the Parquet file content
+    FileSystemDatasetFactory factory =
+        new FileSystemDatasetFactory(
+            rootAllocator(),
+            NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET,
+            parquetFile.toURI().toString());
+    ScanOptions options = new ScanOptions(100);
+    Schema readSchema = factory.inspect();
+
+    // Verify the schema
+    assertEquals(arrowSchema, readSchema);
+    List<ArrowRecordBatch> batches = collectResultFromFactory(factory, options);
+    assertEquals(1, batches.size());
+    ArrowRecordBatch batch = batches.get(0);
+    try (VectorSchemaRoot vsr = VectorSchemaRoot.create(readSchema, rootAllocator())) {
+      VectorLoader loader = new VectorLoader(vsr);
+      loader.load(batch);
+
+      IntVector idVector = (IntVector) vsr.getVector("id");
+      VarCharVector nameVector = (VarCharVector) vsr.getVector("name");
+      for (int rowIndex = 0; rowIndex < batch.getLength(); rowIndex++) {
+        int actualId = idVector.get(rowIndex);
+        String actualName = nameVector.getObject(rowIndex).toString();
+        // Compare against the expected values for this row
+        int expectedId = ids[rowIndex];
+        String expectedName = names[rowIndex];
+        assertEquals(expectedId, actualId, "ID mismatch at row " + rowIndex);
+        assertEquals(expectedName, actualName, "Name mismatch at row " + rowIndex);
+      }
+    }
+
+    AutoCloseables.close(factory);
+    AutoCloseables.close(batches);
+  }
+
+  private static VectorSchemaRoot createData(
+      BufferAllocator allocator, Schema schema, int[] ids, String[] names) {
+    // Create a VectorSchemaRoot from the schema
+    VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
+    // Allocate space for the vectors
+    root.allocateNew();
+
+    // Get the field vectors
+    IntVector idVector = (IntVector) root.getVector("id");
+    VarCharVector nameVector = (VarCharVector) root.getVector("name");
+
+    // Write data to the vectors
+    for (int i = 0; i < ids.length; i++) {
+      idVector.setSafe(i, ids[i]);
+      nameVector.setSafe(i, names[i].getBytes());
+    }
+
+    // Set the row count
+    root.setRowCount(ids.length);
+
+    return root;
+  }
+}
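
For reference, a minimal usage sketch of the Java API added by this patch, exercising ParquetWriterProperties (which the test above does not cover). The class name, file path, row-group size, and sample data below are illustrative assumptions, not part of the change:

// Hypothetical example, not part of the patch.
import java.io.FileOutputStream;
import java.util.Collections;
import org.apache.arrow.dataset.file.ParquetWriter;
import org.apache.arrow.dataset.file.ParquetWriterProperties;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

public class ParquetWriterExample {
  public static void main(String[] args) throws Exception {
    Schema schema =
        new Schema(Collections.singletonList(Field.nullable("id", new ArrowType.Int(32, true))));
    // Codec names are parsed natively via arrow::util::Codec::GetCompressionType.
    ParquetWriterProperties props =
        ParquetWriterProperties.builder()
            .compressionCodec("ZSTD")
            .maxRowGroupLength(1024) // rows per row group (illustrative value)
            .build();
    try (RootAllocator allocator = new RootAllocator();
        VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
        FileOutputStream out = new FileOutputStream("/tmp/example.parquet");
        ParquetWriter writer = new ParquetWriter(out, schema, props)) {
      IntVector id = (IntVector) root.getVector("id");
      id.allocateNew(3);
      for (int i = 0; i < 3; i++) {
        id.setSafe(i, i);
      }
      root.setRowCount(3);
      // Each write() call exports the root through the C Data Interface and
      // appends one record batch to the Parquet file.
      writer.write(root);
    }
  }
}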