Skip to content

Commit 3a496fb

Browse files
mbasmanova authored and
meta-codesync[bot] committed
feat: Add stats spec to TableWrite/TableWriteMerge toString output (#16738)
Summary: Pull Request resolved: #16738 Add aggregation step and aggregate names to the detailed toString output of TableWriteNode and TableWriteMergeNode. Example: The detailed output now looks like: - Without stats: -- TableWrite[1][hive, c0, c1, c2] -> rows:BIGINT, ... - With stats: -- TableWrite[1][hive, c0, c1, c2, stats[PARTIAL: a0]] -> rows:BIGINT, ... - Merge with stats: -- TableWriteMerge[3][stats[INTERMEDIATE: a0]] -> rows:BIGINT, ... Reviewed By: xiaoxmeng Differential Revision: D96205345 fbshipit-source-id: 9ee7d331afdcc9acabb8360d3ac71d889534b16d
1 parent 8101254 commit 3a496fb

File tree

3 files changed

+103
-4
lines changed

3 files changed

+103
-4
lines changed

velox/core/PlanNode.cpp

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2772,6 +2772,7 @@ PlanNodePtr LocalMergeNode::create(const folly::dynamic& obj, void* context) {
27722772
std::move(sources));
27732773
}
27742774

2775+
namespace {
27752776
// Validates that grouping keys in 'spec' are present in 'type' and have no
27762777
// duplicates. 'context' is used in error messages (e.g. "written columns",
27772778
// "source output").
@@ -2792,6 +2793,7 @@ void validateGroupingKeys(
27922793
key->name());
27932794
}
27942795
}
2796+
} // namespace
27952797

27962798
TableWriteNode::TableWriteNode(
27972799
const PlanNodeId& id,
@@ -2847,8 +2849,35 @@ TableWriteNode::TableWriteNode(
28472849
expectedType->toString());
28482850
}
28492851

2852+
namespace {
2853+
void addStatsSpecDetails(
2854+
std::stringstream& stream,
2855+
const std::optional<ColumnStatsSpec>& spec) {
2856+
if (!spec.has_value()) {
2857+
return;
2858+
}
2859+
stream << "stats[" << AggregationNode::toName(spec->aggregationStep);
2860+
if (!spec->groupingKeys.empty()) {
2861+
stream << " [";
2862+
addFields(stream, spec->groupingKeys);
2863+
stream << "]";
2864+
}
2865+
stream << ": ";
2866+
for (auto i = 0; i < spec->aggregates.size(); ++i) {
2867+
appendComma(i, stream);
2868+
stream << spec->aggregates[i].call->toString();
2869+
}
2870+
stream << "]";
2871+
}
2872+
} // namespace
2873+
28502874
void TableWriteNode::addDetails(std::stringstream& stream) const {
2851-
stream << insertTableHandle_->connectorInsertTableHandle()->toString();
2875+
stream << insertTableHandle_->connectorId() << ", "
2876+
<< folly::join(", ", columnNames_);
2877+
if (columnStatsSpec_.has_value()) {
2878+
stream << ", ";
2879+
addStatsSpecDetails(stream, columnStatsSpec_);
2880+
}
28522881
}
28532882

28542883
RowTypePtr ColumnStatsSpec::outputType() const {
@@ -2991,7 +3020,9 @@ TableWriteMergeNode::TableWriteMergeNode(
29913020
}
29923021
}
29933022

2994-
void TableWriteMergeNode::addDetails(std::stringstream& /* stream */) const {}
3023+
void TableWriteMergeNode::addDetails(std::stringstream& stream) const {
3024+
addStatsSpecDetails(stream, columnStatsSpec_);
3025+
}
29953026

29963027
folly::dynamic TableWriteMergeNode::serialize() const {
29973028
auto obj = PlanNode::serialize();

velox/exec/tests/PlanNodeToStringTest.cpp

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17+
#include "velox/common/testutil/TempDirectoryPath.h"
1718
#include "velox/connectors/hive/HiveConnector.h"
1819
#include "velox/exec/WindowFunction.h"
1920
#include "velox/exec/tests/utils/PlanBuilder.h"
@@ -988,5 +989,72 @@ TEST_F(PlanNodeToStringTest, markDistinct) {
988989
op->toString(true, false));
989990
}
990991

992+
TEST_F(PlanNodeToStringTest, tableWrite) {
993+
auto outputDir = common::testutil::TempDirectoryPath::create();
994+
995+
// TableWrite without stats.
996+
{
997+
auto plan = PlanBuilder()
998+
.values({data_})
999+
.tableWrite(outputDir->getPath())
1000+
.planNode();
1001+
ASSERT_EQ("-- TableWrite[1]\n", plan->toString());
1002+
ASSERT_EQ(
1003+
"-- TableWrite[1][test-hive, c0, c1, c2] -> rows:BIGINT, fragments:VARBINARY, commitcontext:VARBINARY\n",
1004+
plan->toString(true, false));
1005+
}
1006+
1007+
// TableWrite with stats (no grouping keys) and TableWriteMerge.
1008+
{
1009+
core::TableWriteNodePtr writeNode;
1010+
auto plan = PlanBuilder()
1011+
.values({data_})
1012+
.tableWrite(
1013+
outputDir->getPath(),
1014+
dwio::common::FileFormat::DWRF,
1015+
{"min(c0)"})
1016+
.capturePlanNode(writeNode)
1017+
.localGather()
1018+
.tableWriteMerge()
1019+
.planNode();
1020+
1021+
ASSERT_EQ("-- TableWrite[1]\n", writeNode->toString());
1022+
ASSERT_EQ(
1023+
"-- TableWrite[1][test-hive, c0, c1, c2, stats[PARTIAL: min(ROW[\"c0\"])]] -> rows:BIGINT, fragments:VARBINARY, commitcontext:VARBINARY, a0:SMALLINT\n",
1024+
writeNode->toString(true, false));
1025+
1026+
ASSERT_EQ("-- TableWriteMerge[3]\n", plan->toString());
1027+
ASSERT_EQ(
1028+
"-- TableWriteMerge[3][stats[INTERMEDIATE: min(\"a0\")]] -> rows:BIGINT, fragments:VARBINARY, commitcontext:VARBINARY, a0:SMALLINT\n",
1029+
plan->toString(true, false));
1030+
}
1031+
1032+
// TableWrite with stats and grouping keys (partitioned table).
1033+
{
1034+
core::TableWriteNodePtr writeNode;
1035+
auto plan = PlanBuilder()
1036+
.values({data_})
1037+
.tableWrite(
1038+
outputDir->getPath(),
1039+
{"c2"},
1040+
dwio::common::FileFormat::DWRF,
1041+
{"min(c0)", "max(c1)"})
1042+
.capturePlanNode(writeNode)
1043+
.localGather()
1044+
.tableWriteMerge()
1045+
.planNode();
1046+
1047+
ASSERT_EQ("-- TableWrite[1]\n", writeNode->toString());
1048+
ASSERT_EQ(
1049+
"-- TableWrite[1][test-hive, c0, c1, c2, stats[PARTIAL [c2]: min(ROW[\"c0\"]), max(ROW[\"c1\"])]] -> rows:BIGINT, fragments:VARBINARY, commitcontext:VARBINARY, c2:BIGINT, a0:SMALLINT, a1:INTEGER\n",
1050+
writeNode->toString(true, false));
1051+
1052+
ASSERT_EQ("-- TableWriteMerge[3]\n", plan->toString());
1053+
ASSERT_EQ(
1054+
"-- TableWriteMerge[3][stats[INTERMEDIATE [c2]: min(\"a0\"), max(\"a1\")]] -> rows:BIGINT, fragments:VARBINARY, commitcontext:VARBINARY, c2:BIGINT, a0:SMALLINT, a1:INTEGER\n",
1055+
plan->toString(true, false));
1056+
}
1057+
}
1058+
9911059
} // namespace
9921060
} // namespace facebook::velox::exec

velox/exec/tests/PrintPlanWithStatsTest.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) {
447447
compareOutputs(
448448
::testing::UnitTest::GetInstance()->current_test_info()->name(),
449449
printPlanWithStats(*writePlan, task->taskStats()),
450-
{{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},
450+
{{R"(-- TableWrite\[1\]\[test-hive, c0, c1, c2, c3, c4, c5\] -> rows:BIGINT, fragments:VARBINARY, commitcontext:VARBINARY)"},
451451
{" Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"},
452452
{R"( -- TableScan\[0\]\[table: hive_table\] -> c0:BIGINT, c1:INTEGER, c2:SMALLINT, c3:REAL, c4:DOUBLE, c5:VARCHAR)"},
453453
{R"( Input: 100 rows \(.+\), Output: 100 rows \(.+\), Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, Splits: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+))"}});
@@ -456,7 +456,7 @@ TEST_F(PrintPlanWithStatsTest, tableWriterWithTableScan) {
456456
::testing::UnitTest::GetInstance()->current_test_info()->name(),
457457
printPlanWithStats(*writePlan, task->taskStats(), true),
458458
{
459-
{R"(-- TableWrite\[1\]\[.+InsertTableHandle .+)"},
459+
{R"(-- TableWrite\[1\]\[test-hive, c0, c1, c2, c3, c4, c5\] -> rows:BIGINT, fragments:VARBINARY, commitcontext:VARBINARY)"},
460460
{" Output: .+, Physical written output: .+, Cpu time: .+, Blocked wall time: .+, Peak memory: .+, Memory allocations: .+, Threads: 1, CPU breakdown: B/I/O/F (.+/.+/.+/.+)"},
461461
{" dataSourceLazyCpuNanos\\s+sum: .+, count: .+, min: .+, max: .+"},
462462
{" dataSourceLazyInputBytes\\s+sum: .+, count: .+, min: .+, max: .+"},

0 commit comments

Comments (0)