Skip to content

Commit 179241f

Browse files
committed
implement bytes written tracking in LanceLakeWriter
1 parent c6f35e8 commit 179241f

File tree

2 files changed

+22
-1
lines changed

2 files changed

+22
-1
lines changed

fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.lancedb.lance.Fragment;
3030
import com.lancedb.lance.FragmentMetadata;
3131
import com.lancedb.lance.WriteParams;
32+
import com.lancedb.lance.fragment.DataFile;
3233
import org.apache.arrow.memory.BufferAllocator;
3334
import org.apache.arrow.memory.RootAllocator;
3435
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -52,6 +53,7 @@ public class LanceLakeWriter implements LakeWriter<LanceWriteResult> {
5253

5354
private final ShadedArrowBatchWriter arrowWriter;
5455
private final List<FragmentMetadata> allFragments;
56+
private long bytesWritten = -1L;
5557

5658
public LanceLakeWriter(Configuration options, WriterInitContext writerInitContext)
5759
throws IOException {
@@ -117,13 +119,32 @@ private List<FragmentMetadata> flush() throws IOException {
117119
}
118120
}
119121

122+
private long computeBytesWritten(List<FragmentMetadata> fragments) {
123+
long total = 0L;
124+
for (FragmentMetadata fragment : fragments) {
125+
for (DataFile dataFile : fragment.getFiles()) {
126+
Long size = dataFile.getFileSizeBytes();
127+
if (size != null) {
128+
total += size;
129+
}
130+
}
131+
}
132+
return total;
133+
}
134+
120135
@Override
121136
public LanceWriteResult complete() throws IOException {
122137
List<FragmentMetadata> fragments = flush();
123138
allFragments.addAll(fragments);
139+
bytesWritten = computeBytesWritten(allFragments);
124140
return new LanceWriteResult(allFragments);
125141
}
126142

143+
@Override
144+
public long getBytesWritten() {
145+
return bytesWritten;
146+
}
147+
127148
@Override
128149
public void close() throws IOException {
129150
if (arrowWriter != null) {

fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
160160
// serialize/deserialize writeResult
161161
LanceWriteResult lanceWriteResult = lakeWriter.complete();
162162
// Verify that getBytesWritten returns a positive value after complete
163-
assertThat(lakeWriter.getBytesWritten()).isEqualTo(-1L);
163+
assertThat(lakeWriter.getBytesWritten()).isGreaterThan(0L);
164164
byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
165165
lanceWriteResults.add(
166166
writeResultSerializer.deserialize(

0 commit comments

Comments
 (0)