HoodieSparkLanceWriter.java
@@ -27,14 +27,15 @@
import org.apache.hudi.storage.StoragePath;

import com.lancedb.lance.spark.arrow.LanceArrowWriter;
+ import lombok.AllArgsConstructor;
+ import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.LanceArrowUtils;
import org.apache.spark.unsafe.types.UTF8String;

import java.io.IOException;
- import java.util.List;
import java.util.function.Function;

import static org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD;
@@ -62,7 +63,6 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
private final UTF8String instantTime;
private final boolean populateMetaFields;
private final Function<Long, String> seqIdGenerator;
- private LanceArrowWriter writer;

/**
* Constructor for Spark Lance writer.
@@ -81,7 +81,7 @@ public HoodieSparkLanceWriter(StoragePath file,
TaskContextSupplier taskContextSupplier,
HoodieStorage storage,
boolean populateMetaFields) throws IOException {
- super(storage, file, DEFAULT_BATCH_SIZE);
+ super(file, DEFAULT_BATCH_SIZE);
this.sparkSchema = sparkSchema;
this.arrowSchema = LanceArrowUtils.toArrowSchema(sparkSchema, DEFAULT_TIMEZONE, true, false);
this.fileName = UTF8String.fromString(file.getName());
@@ -114,40 +114,31 @@ public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOExcept
if (populateMetaFields) {
UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount());
- super.write(row.copy());
+ super.write(row);
} else {
- super.write(row.copy());
+ super.write(row);
}
}

@Override
public void writeRow(String recordKey, InternalRow row) throws IOException {
- super.write(row.copy());
+ super.write(row);
}

@Override
public void writeRow(UTF8String key, InternalRow row) throws IOException {
// Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664)
- super.write(row.copy());
+ super.write(row);
}

@Override
public void writeRow(InternalRow row) throws IOException {
- super.write(row.copy());
+ super.write(row);
}

@Override
- protected void populateVectorSchemaRoot(List<InternalRow> records) {
- if (writer == null) {
- writer = LanceArrowWriter.create(this.root, sparkSchema);
- }
- // Reset writer state from previous batch
- writer.reset();
- for (InternalRow record : records) {
- writer.write(record);
- }
- // Finalize the writer (sets row count)
- writer.finish();
+ protected ArrowWriter<InternalRow> createArrowWriter(VectorSchemaRoot root) {
+ return SparkArrowWriter.of(LanceArrowWriter.create(root, sparkSchema));
}

/**
@@ -184,4 +175,24 @@ protected void updateRecordMetadata(InternalRow row,
row.update(PARTITION_PATH_METADATA_FIELD.ordinal(), UTF8String.fromString(partitionPath));
row.update(FILENAME_METADATA_FIELD.ordinal(), fileName);
}

+ @AllArgsConstructor(staticName = "of")
+ private static class SparkArrowWriter implements ArrowWriter<InternalRow> {
+ private final LanceArrowWriter lanceArrowWriter;
+
+ @Override
+ public void write(InternalRow row) {
+ lanceArrowWriter.write(row);
+ }
+
+ @Override
+ public void reset() {
+ lanceArrowWriter.reset();
+ }
+
+ @Override
+ public void finishBatch() {
+ lanceArrowWriter.finish();
+ }
+ }
}
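
For readers unfamiliar with Lombok: @AllArgsConstructor(staticName = "of") on SparkArrowWriter generates a private all-args constructor plus a public static factory named of. A rough hand-written equivalent (a sketch of what Lombok generates, not code from this PR):

private static class SparkArrowWriter implements ArrowWriter<InternalRow> {
  private final LanceArrowWriter lanceArrowWriter;

  // Generated: the constructor is made private, forcing creation through of(...)
  private SparkArrowWriter(LanceArrowWriter lanceArrowWriter) {
    this.lanceArrowWriter = lanceArrowWriter;
  }

  // Generated: the static factory named by staticName = "of"
  public static SparkArrowWriter of(LanceArrowWriter lanceArrowWriter) {
    return new SparkArrowWriter(lanceArrowWriter);
  }

  // The write/reset/finishBatch overrides delegate to lanceArrowWriter as shown above.
}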
HoodieBaseLanceWriter.java
@@ -21,9 +21,10 @@

import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.memory.HoodieArrowAllocator;
- import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

+ import lombok.AccessLevel;
+ import lombok.Getter;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -33,8 +34,6 @@

import java.io.Closeable;
import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;

/**
* Base class for Hudi Lance file writers supporting different record types.
@@ -52,43 +51,41 @@
@NotThreadSafe
public abstract class HoodieBaseLanceWriter<R> implements Closeable {
/** Memory size for data write operations: 120MB */
- private static final long LANCE_DATA_ALLOCATOR_SIZE = 120 * 1024 * 1024;
+ private static final long LANCE_DATA_ALLOCATOR_SIZE = 120 * 1024 * 1024L;

protected static final int DEFAULT_BATCH_SIZE = 1000;
- protected final HoodieStorage storage;
- protected final StoragePath path;
- protected final BufferAllocator allocator;
- protected final List<R> bufferedRecords;
- protected final int batchSize;
- protected long writtenRecordCount = 0;
- protected VectorSchemaRoot root;
+ private final StoragePath path;
+ private final BufferAllocator allocator;
+ private final int batchSize;
+ @Getter(value = AccessLevel.PROTECTED)
+ private long writtenRecordCount = 0;
+ private int currentBatchSize = 0;
+ private VectorSchemaRoot root;
+ private ArrowWriter<R> arrowWriter;

private LanceFileWriter writer;

/**
* Constructor for base Lance writer.
*
- * @param storage HoodieStorage instance
* @param path Path where Lance file will be written
* @param batchSize Number of records to buffer before flushing to Lance
*/
- protected HoodieBaseLanceWriter(HoodieStorage storage, StoragePath path, int batchSize) {
- this.storage = storage;
+ protected HoodieBaseLanceWriter(StoragePath path, int batchSize) {
this.path = path;
this.allocator = HoodieArrowAllocator.newChildAllocator(
getClass().getSimpleName() + "-data-" + path.getName(), LANCE_DATA_ALLOCATOR_SIZE);
- this.bufferedRecords = new ArrayList<>(batchSize);
this.batchSize = batchSize;
}

/**
- * Populate the VectorSchemaRoot with buffered records.
- * Subclasses must implement type-specific conversion logic.
- * The VectorSchemaRoot field is reused across batches and managed by this base class.
+ * Create and initialize the arrow writer for writing records to VectorSchemaRoot.
+ * Called once during lazy initialization when the first record is written.
*
- * @param records List of records to convert
+ * @param root The VectorSchemaRoot to write into
+ * @return An arrow writer implementation that writes records of type R to the root
*/
- protected abstract void populateVectorSchemaRoot(List<R> records);
+ protected abstract ArrowWriter<R> createArrowWriter(VectorSchemaRoot root);

/**
* Get the Arrow schema for this writer.
@@ -105,23 +102,32 @@ protected HoodieBaseLanceWriter(HoodieStorage storage, StoragePath path, int bat
* @throws IOException if write fails
*/
public void write(R record) throws IOException {
- bufferedRecords.add(record);
+ // Lazy initialization on first write
+ if (writer == null) {
+ initializeWriter();
+ }
+ if (root == null) {
+ root = VectorSchemaRoot.create(getArrowSchema(), allocator);
+ }
+ if (arrowWriter == null) {
+ arrowWriter = createArrowWriter(root);
+ }
+
+ // Reset arrow writer at the start of each new batch
+ if (currentBatchSize == 0) {
+ arrowWriter.reset();
+ }
+
+ arrowWriter.write(record);
+ currentBatchSize++;
writtenRecordCount++;

- if (bufferedRecords.size() >= batchSize) {
+ // Flush when batch is full
+ if (currentBatchSize >= batchSize) {
flushBatch();
}
}

- /**
- * Get the total number of records written so far.
- *
- * @return Number of records written
- */
- public long getWrittenRecordCount() {
- return writtenRecordCount;
- }
-
/**
* Close the writer, flushing any remaining buffered records.
*
@@ -133,8 +139,8 @@ public void close() throws IOException {

// 1. Flush remaining records
try {
- // Flush any remaining buffered records
- if (!bufferedRecords.isEmpty()) {
+ // Flush any remaining records in current batch
+ if (currentBatchSize > 0) {
flushBatch();
}

@@ -196,27 +202,18 @@ public void close() throws IOException {
* Flush buffered records to Lance file.
*/
private void flushBatch() throws IOException {
- if (bufferedRecords.isEmpty()) {
- return;
+ if (currentBatchSize == 0) {
+ return; // Nothing to flush
}

- // Lazy initialization of writer and root
- if (writer == null) {
- initializeWriter();
- }
- if (root == null) {
- root = VectorSchemaRoot.create(getArrowSchema(), allocator);
- }
-
- // Reset root state for new batch
- root.setRowCount(0);
+ // Finalize the arrow writer (sets row count on VectorSchemaRoot)
+ arrowWriter.finishBatch();

- // Populate root with records and write to Lance
- populateVectorSchemaRoot(bufferedRecords);
+ // Write VectorSchemaRoot to Lance file
writer.write(root);

- // Clear buffer
- bufferedRecords.clear();
+ // Reset batch counter for next batch
+ currentBatchSize = 0;
}

/**
@@ -225,4 +222,29 @@ private void flushBatch() throws IOException {
private void initializeWriter() throws IOException {
writer = LanceFileWriter.open(path.toString(), allocator, null);
}

+ /**
+ * Arrow writer interface for writing records of type T to a VectorSchemaRoot.
+ * Each engine can provide its own implementation for converting records from the engine-specific format to Arrow format.
+ * @param <T> the record type
+ */
+ protected interface ArrowWriter<T> {
+ /**
+ * Write a single record to the VectorSchemaRoot.
+ * @param row Record to write
+ * @throws IOException if write fails
+ */
+ void write(T row) throws IOException;
+
+ /**
+ * Finalize the current batch, including setting the row count on the VectorSchemaRoot.
+ * @throws IOException if finalization fails
+ */
+ void finishBatch() throws IOException;
+
+ /**
+ * Reset the writer state for a new batch.
+ */
+ void reset();
+ }
}
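
To make the new template concrete: a subclass supplies the Arrow schema and an ArrowWriter, while the base class owns batching, flushing, and the Lance file lifecycle. Below is a hypothetical minimal binding for plain String records, a sketch only: StringLanceWriter and its single "value" column are illustrative, and it assumes getArrowSchema() is the protected abstract accessor documented above, not code from this PR.

import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.hudi.storage.StoragePath;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

class StringLanceWriter extends HoodieBaseLanceWriter<String> {
  // Single UTF-8 column named "value" (illustrative schema)
  private static final Schema SCHEMA = new Schema(Collections.singletonList(
      Field.nullable("value", ArrowType.Utf8.INSTANCE)));

  StringLanceWriter(StoragePath path) {
    super(path, DEFAULT_BATCH_SIZE);
  }

  @Override
  protected Schema getArrowSchema() {
    return SCHEMA;
  }

  @Override
  protected ArrowWriter<String> createArrowWriter(VectorSchemaRoot root) {
    VarCharVector vector = (VarCharVector) root.getVector("value");
    return new ArrowWriter<String>() {
      private int rowCount = 0;

      @Override
      public void write(String row) {
        // setSafe grows the underlying buffers as needed
        vector.setSafe(rowCount++, row.getBytes(StandardCharsets.UTF_8));
      }

      @Override
      public void finishBatch() {
        // The base class hands the root to LanceFileWriter right after this call
        root.setRowCount(rowCount);
      }

      @Override
      public void reset() {
        // Fresh buffers for the next batch; also zeroes the root's row count
        root.allocateNew();
        rowCount = 0;
      }
    };
  }
}

Like SparkArrowWriter wrapping LanceArrowWriter above, the anonymous ArrowWriter keeps its per-batch state (rowCount) next to the vectors it fills, so the base class never needs to know the engine's record format.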