Skip to content

Commit 9cb3cd1

Browse files
PingLiuPingmeta-codesync[bot]
authored andcommitted
fix: Iceberg commit message when writer rotated (facebookincubator#16365)
Summary: Fixes commit message correctness for multi-file writes by updating Iceberg writer rotation handling and tracking per-file row counts and emitting one commit task per rotated file. Fix parquet writer does not honor smaller values of `max-target-file-size` config. These are the adjustments required after facebookincubator#16077. Pull Request resolved: facebookincubator#16365 Reviewed By: Yuhta Differential Revision: D93157234 Pulled By: jainxrohit fbshipit-source-id: f09cd1a01cb2ee63172f233d9fc8234688037fb6
1 parent 44e10f4 commit 9cb3cd1

File tree

4 files changed

+38
-28
lines changed

4 files changed

+38
-28
lines changed

velox/connectors/hive/HiveDataSink.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,7 @@ void HiveDataSink::write(size_t index, RowVectorPtr input) {
621621
writers_[index]->write(dataInput);
622622
writerInfo_[index]->inputSizeInBytes += dataInput->estimateFlatSize();
623623
writerInfo_[index]->numWrittenRows += dataInput->size();
624+
writerInfo_[index]->currentFileWrittenRows += dataInput->size();
624625

625626
// File rotation is not supported for bucketed tables (require one file per
626627
// bucket with predictable name) or sorted writes (SortingWriter not
@@ -642,12 +643,7 @@ uint64_t HiveDataSink::getCurrentFileBytes(size_t writerIndex) const {
642643
const auto baselineBytes = writerInfo_[writerIndex]->cumulativeWrittenBytes;
643644
// Sanity check: total should always be >= baseline since ioStats is
644645
// never reset and cumulative is a snapshot of rawBytesWritten at rotation.
645-
VELOX_DCHECK_GE(
646-
totalBytes,
647-
baselineBytes,
648-
"rawBytesWritten ({}) < cumulativeWrittenBytes ({})",
649-
totalBytes,
650-
baselineBytes);
646+
VELOX_DCHECK_GE(totalBytes, baselineBytes);
651647
return totalBytes - baselineBytes;
652648
}
653649

@@ -665,6 +661,9 @@ void HiveDataSink::finalizeWriterFile(size_t index) {
665661
fileInfo.writeFileName = info->currentWriteFileName;
666662
fileInfo.targetFileName = info->currentTargetFileName;
667663
fileInfo.fileSize = currentFileBytes;
664+
fileInfo.numRows = info->currentFileWrittenRows;
665+
// Reset for next file.
666+
info->currentFileWrittenRows = 0;
668667
info->writtenFiles.push_back(std::move(fileInfo));
669668

670669
// Update cumulative stats as a snapshot of total stats so far.

velox/connectors/hive/HiveDataSink.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,8 @@ struct HiveFileInfo {
426426
std::string targetFileName;
427427
/// Size of the file in bytes.
428428
uint64_t fileSize{0};
429+
/// Number of rows in the file.
430+
uint64_t numRows{0};
429431
};
430432

431433
struct HiveWriterInfo {
@@ -449,8 +451,11 @@ struct HiveWriterInfo {
449451
const std::shared_ptr<memory::MemoryPool> writerPool;
450452
const std::shared_ptr<memory::MemoryPool> sinkPool;
451453
const std::shared_ptr<memory::MemoryPool> sortPool;
452-
int64_t numWrittenRows = 0;
453-
int64_t inputSizeInBytes = 0;
454+
/// Total rows written by this writer across all files.
455+
uint64_t numWrittenRows = 0;
456+
/// Rows written to the current file; reset to 0 when the file is finalized.
457+
uint64_t currentFileWrittenRows{0};
458+
uint64_t inputSizeInBytes = 0;
454459
/// File sequence number for tracking multiple files written due to size-based
455460
/// splitting. Incremented each time the writer rotates to a new file.
456461
/// Used to generate sequenced file names (e.g., file_1.orc, file_2.orc).

velox/connectors/hive/iceberg/IcebergDataSink.cpp

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -315,30 +315,36 @@ std::vector<std::string> IcebergDataSink::commitMessage() const {
315315
insertTableHandle_);
316316

317317
for (auto i = 0; i < writerInfo_.size(); ++i) {
318-
const auto& info = writerInfo_.at(i);
319-
VELOX_CHECK_NOT_NULL(info);
318+
const auto& writerInfo = writerInfo_.at(i);
319+
VELOX_CHECK_NOT_NULL(writerInfo);
320+
320321
// Following metadata (json format) is consumed by Presto CommitTaskData.
321322
// It contains the minimal subset of metadata.
322323
// TODO: Complete metrics is missing now and this could lead to suboptimal
323324
// query plan, will collect full iceberg metrics in following PR.
324-
// clang-format off
325-
folly::dynamic commitData = folly::dynamic::object(
326-
"path", (fs::path(info->writerParameters.writeDirectory()) /
327-
info->writerParameters.writeFileName()).string())
328-
("fileSizeInBytes", ioStats_.at(i)->rawBytesWritten())
329-
("metrics",
330-
folly::dynamic::object("recordCount", info->numWrittenRows))
331-
("partitionSpecJson",
332-
icebergInsertTableHandle->partitionSpec() ? icebergInsertTableHandle->partitionSpec()->specId : 0)
333-
("fileFormat", "PARQUET")
334-
("content", "DATA");
335-
// clang-format on
336-
if (!commitPartitionValue_.empty() && !commitPartitionValue_[i].isNull()) {
337-
commitData["partitionDataJson"] = folly::toJson(
338-
folly::dynamic::object("partitionValues", commitPartitionValue_[i]));
325+
for (const auto& fileInfo : writerInfo->writtenFiles) {
326+
// clang-format off
327+
folly::dynamic commitData = folly::dynamic::object(
328+
"path", (fs::path(writerInfo->writerParameters.targetDirectory()) /
329+
fileInfo.targetFileName).string())
330+
("fileSizeInBytes", fileInfo.fileSize)
331+
("metrics",
332+
folly::dynamic::object("recordCount", fileInfo.numRows))
333+
("partitionSpecJson",
334+
icebergInsertTableHandle->partitionSpec() ?
335+
icebergInsertTableHandle->partitionSpec()->specId : 0)
336+
("fileFormat", "PARQUET")
337+
("content", "DATA");
338+
// clang-format on
339+
if (!commitPartitionValue_.empty() &&
340+
!commitPartitionValue_[i].isNull()) {
341+
commitData["partitionDataJson"] = folly::toJson(
342+
folly::dynamic::object(
343+
"partitionValues", commitPartitionValue_[i]));
344+
}
345+
auto commitDataJson = folly::toJson(commitData);
346+
commitTasks.push_back(commitDataJson);
339347
}
340-
auto commitDataJson = folly::toJson(commitData);
341-
commitTasks.push_back(commitDataJson);
342348
}
343349
return commitTasks;
344350
}

velox/connectors/hive/iceberg/tests/TransformE2ETest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,14 +280,14 @@ class TransformE2ETest : public test::IcebergTestBase {
280280
const auto dataSink = createDataSinkAndAppendData(
281281
{rowVector}, outputDirectory->getPath(), partitionTransforms);
282282

283+
dataSink->close();
283284
auto commitMessages = dataSink->commitMessage();
284285
VELOX_CHECK_EQ(commitMessages.size(), 1);
285286
auto commitData = folly::parseJson(commitMessages[0]);
286287
auto partitionDataJson =
287288
folly::parseJson(commitData["partitionDataJson"].asString());
288289
auto partitionValues = partitionDataJson["partitionValues"];
289290
VELOX_CHECK_EQ(partitionValues.size(), 1);
290-
dataSink->close();
291291
return partitionValues[0];
292292
}
293293
};

0 commit comments

Comments
 (0)