Skip to content

Commit b12bddb

Browse files
refactor test to reduce code duplication, add clustering validation, fix writer bug
1 parent deb0cee commit b12bddb

File tree

2 files changed

+174
-309
lines changed

2 files changed

+174
-309
lines changed

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,16 @@
1818

1919
package org.apache.hudi.io.storage;
2020

21-
import com.lancedb.lance.spark.arrow.LanceArrowWriter;
22-
import org.apache.arrow.vector.types.pojo.Schema;
2321
import org.apache.hudi.common.engine.TaskContextSupplier;
2422
import org.apache.hudi.common.model.HoodieKey;
2523
import org.apache.hudi.common.model.HoodieRecord;
2624
import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
2725
import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter;
2826
import org.apache.hudi.storage.HoodieStorage;
2927
import org.apache.hudi.storage.StoragePath;
28+
29+
import com.lancedb.lance.spark.arrow.LanceArrowWriter;
30+
import org.apache.arrow.vector.types.pojo.Schema;
3031
import org.apache.spark.sql.catalyst.InternalRow;
3132
import org.apache.spark.sql.types.StructType;
3233
import org.apache.spark.sql.util.LanceArrowUtils;
@@ -113,26 +114,26 @@ public void writeRowWithMetadata(HoodieKey key, InternalRow row) throws IOExcept
113114
if (populateMetaFields) {
114115
UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
115116
updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount());
116-
super.write(row);
117+
super.write(row.copy());
117118
} else {
118-
super.write(row);
119+
super.write(row.copy());
119120
}
120121
}
121122

122123
@Override
123124
public void writeRow(String recordKey, InternalRow row) throws IOException {
124-
super.write(row);
125+
super.write(row.copy());
125126
}
126127

127128
@Override
128129
public void writeRow(UTF8String key, InternalRow row) throws IOException {
129130
// Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664)
130-
super.write(row);
131+
super.write(row.copy());
131132
}
132133

133134
@Override
134135
public void writeRow(InternalRow row) throws IOException {
135-
super.write(row);
136+
super.write(row.copy());
136137
}
137138

138139
@Override

0 commit comments

Comments
 (0)