Skip to content

Commit b88ce66

Browse files
tanjialiangmeta-codesync[bot]
authored andcommitted
feat: Add overriding file create config for different operators (#16318)
Summary: Pull Request resolved: #16318 for different operator types, we might need different file create configs. this change allows this behavior. Reviewed By: xiaoxmeng Differential Revision: D92762382 fbshipit-source-id: 2ee9a1d1038b71023029997d66b243bef90370fc
1 parent 8877219 commit b88ce66

File tree

17 files changed

+254
-17
lines changed

17 files changed

+254
-17
lines changed

velox/core/QueryConfig.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,19 @@ class QueryConfig {
420420
static constexpr const char* kSpillFileCreateConfig =
421421
"spill_file_create_config";
422422

423-
/// Default offset spill start partition bit. It is used with
423+
/// Config used to create aggregation spill files. This config is provided to
424+
/// underlying file system and the config is free form. The form should be
425+
/// defined by the underlying file system.
426+
static constexpr const char* kAggregationSpillFileCreateConfig =
427+
"aggregation_spill_file_create_config";
428+
429+
/// Config used to create hash join spill files. This config is provided to
430+
/// underlying file system and the config is free form. The form should be
431+
/// defined by the underlying file system.
432+
static constexpr const char* kHashJoinSpillFileCreateConfig =
433+
"hash_join_spill_file_create_config";
434+
435+
/// Default offset spill start partition bit.
424436
/// 'kSpillNumPartitionBits' together to
425437
/// calculate the spilling partition number for join spill or aggregation
426438
/// spill.
@@ -1193,6 +1205,14 @@ class QueryConfig {
11931205
return get<std::string>(kSpillFileCreateConfig, "");
11941206
}
11951207

1208+
std::string aggregationSpillFileCreateConfig() const {
1209+
return get<std::string>(kAggregationSpillFileCreateConfig, "");
1210+
}
1211+
1212+
std::string hashJoinSpillFileCreateConfig() const {
1213+
return get<std::string>(kHashJoinSpillFileCreateConfig, "");
1214+
}
1215+
11961216
int32_t minSpillableReservationPct() const {
11971217
constexpr int32_t kDefaultPct = 5;
11981218
return get<int32_t>(kMinSpillableReservationPct, kDefaultPct);

velox/core/tests/QueryConfigTest.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,4 +280,54 @@ TEST_F(QueryConfigTest, singleSourceExchangeOptimizationConfig) {
280280
}
281281
}
282282

283+
TEST_F(QueryConfigTest, operatorSpillFileCreateConfig) {
284+
// Test default values (empty strings)
285+
{
286+
auto queryCtx = QueryCtx::create(nullptr, QueryConfig{{}});
287+
const QueryConfig& config = queryCtx->queryConfig();
288+
EXPECT_EQ(config.aggregationSpillFileCreateConfig(), "");
289+
EXPECT_EQ(config.hashJoinSpillFileCreateConfig(), "");
290+
}
291+
292+
// Test with aggregation spill file create config set
293+
{
294+
std::unordered_map<std::string, std::string> configData(
295+
{{QueryConfig::kAggregationSpillFileCreateConfig,
296+
"aggregation_config_value"}});
297+
auto queryCtx =
298+
QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
299+
const QueryConfig& config = queryCtx->queryConfig();
300+
EXPECT_EQ(
301+
config.aggregationSpillFileCreateConfig(), "aggregation_config_value");
302+
EXPECT_EQ(config.hashJoinSpillFileCreateConfig(), "");
303+
}
304+
305+
// Test with hash join spill file create config set
306+
{
307+
std::unordered_map<std::string, std::string> configData(
308+
{{QueryConfig::kHashJoinSpillFileCreateConfig,
309+
"hashjoin_config_value"}});
310+
auto queryCtx =
311+
QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
312+
const QueryConfig& config = queryCtx->queryConfig();
313+
EXPECT_EQ(config.aggregationSpillFileCreateConfig(), "");
314+
EXPECT_EQ(config.hashJoinSpillFileCreateConfig(), "hashjoin_config_value");
315+
}
316+
317+
// Test with both configs set
318+
{
319+
std::unordered_map<std::string, std::string> configData(
320+
{{QueryConfig::kAggregationSpillFileCreateConfig,
321+
"aggregation_config_value"},
322+
{QueryConfig::kHashJoinSpillFileCreateConfig,
323+
"hashjoin_config_value"}});
324+
auto queryCtx =
325+
QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
326+
const QueryConfig& config = queryCtx->queryConfig();
327+
EXPECT_EQ(
328+
config.aggregationSpillFileCreateConfig(), "aggregation_config_value");
329+
EXPECT_EQ(config.hashJoinSpillFileCreateConfig(), "hashjoin_config_value");
330+
}
331+
}
332+
283333
} // namespace facebook::velox::core::test

velox/exec/Driver.cpp

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,19 @@ velox::memory::MemoryPool* DriverCtx::addOperatorPool(
107107
planNodeId, splitGroupId, pipelineId, driverId, operatorType);
108108
}
109109

110+
namespace {
111+
bool isHashJoinSpillOperator(const std::string& operatorType) {
112+
return operatorType == "HashBuild" || operatorType == "HashProbe";
113+
}
114+
115+
bool isAggregationSpillOperator(const std::string& operatorType) {
116+
return operatorType == "Aggregation" || operatorType == "PartialAggregation";
117+
}
118+
} // namespace
119+
110120
std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
111-
int32_t operatorId) const {
121+
int32_t operatorId,
122+
const std::string& operatorType) const {
112123
const auto& queryConfig = task->queryCtx()->queryConfig();
113124
if (!queryConfig.spillEnabled()) {
114125
return std::nullopt;
@@ -126,6 +137,21 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
126137
[this](uint64_t bytes) {
127138
task->queryCtx()->updateSpilledBytesAndCheckLimit(bytes);
128139
};
140+
141+
std::string fileCreateConfig = queryConfig.spillFileCreateConfig();
142+
if (isHashJoinSpillOperator(operatorType)) {
143+
const auto& hashJoinConfig = queryConfig.hashJoinSpillFileCreateConfig();
144+
if (!hashJoinConfig.empty()) {
145+
fileCreateConfig = hashJoinConfig;
146+
}
147+
} else if (isAggregationSpillOperator(operatorType)) {
148+
const auto& aggregationConfig =
149+
queryConfig.aggregationSpillFileCreateConfig();
150+
if (!aggregationConfig.empty()) {
151+
fileCreateConfig = aggregationConfig;
152+
}
153+
}
154+
129155
return common::SpillConfig(
130156
std::move(getSpillDirPathCb),
131157
std::move(updateAndCheckSpillLimitCb),
@@ -146,7 +172,7 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
146172
queryConfig.spillPrefixSortEnabled()
147173
? std::optional<common::PrefixSortConfig>(prefixSortConfig())
148174
: std::nullopt,
149-
queryConfig.spillFileCreateConfig(),
175+
fileCreateConfig,
150176
queryConfig.windowSpillMinReadBatchRows());
151177
}
152178

velox/exec/Driver.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,11 @@ struct DriverCtx {
257257
const core::PlanNodeId& planNodeId,
258258
const std::string& operatorType);
259259

260-
/// Builds the spill config for the operator with specified 'operatorId'.
261-
std::optional<common::SpillConfig> makeSpillConfig(int32_t operatorId) const;
260+
/// Builds the spill config for the operator with specified 'operatorId' and
261+
/// 'operatorType'.
262+
std::optional<common::SpillConfig> makeSpillConfig(
263+
int32_t operatorId,
264+
const std::string& operatorType) const;
262265

263266
common::PrefixSortConfig prefixSortConfig() const {
264267
return common::PrefixSortConfig{

velox/exec/HashAggregation.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@ HashAggregation::HashAggregation(
3535
? "PartialAggregation"
3636
: "Aggregation",
3737
aggregationNode->canSpill(driverCtx->queryConfig())
38-
? driverCtx->makeSpillConfig(operatorId)
38+
? driverCtx->makeSpillConfig(
39+
operatorId,
40+
aggregationNode->step() ==
41+
core::AggregationNode::Step::kPartial
42+
? "PartialAggregation"
43+
: "Aggregation")
3944
: std::nullopt),
4045
aggregationNode_(aggregationNode),
4146
isPartialOutput_(isPartialOutput(aggregationNode->step())),

velox/exec/HashBuild.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ HashBuild::HashBuild(
5858
joinNode->id(),
5959
"HashBuild",
6060
joinNode->canSpill(driverCtx->queryConfig())
61-
? driverCtx->makeSpillConfig(operatorId)
61+
? driverCtx->makeSpillConfig(operatorId, "HashBuild")
6262
: std::nullopt),
6363
joinNode_(std::move(joinNode)),
6464
joinType_{joinNode_->joinType()},

velox/exec/HashProbe.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ HashProbe::HashProbe(
120120
joinNode->id(),
121121
"HashProbe",
122122
joinNode->canSpill(driverCtx->queryConfig())
123-
? driverCtx->makeSpillConfig(operatorId)
123+
? driverCtx->makeSpillConfig(operatorId, "HashProbe")
124124
: std::nullopt),
125125
outputBatchSize_{outputBatchRows()},
126126
joinNode_(std::move(joinNode)),

velox/exec/Merge.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ LocalMerge::LocalMerge(
726726
localMergeNode->id(),
727727
"LocalMerge",
728728
localMergeNode->canSpill(driverCtx->queryConfig())
729-
? driverCtx->makeSpillConfig(operatorId)
729+
? driverCtx->makeSpillConfig(operatorId, "LocalMerge")
730730
: std::nullopt) {
731731
VELOX_CHECK_EQ(
732732
operatorCtx_->driverCtx()->driverId,

velox/exec/Operator.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,6 +522,12 @@ class Operator : public BaseRuntimeStatWriter {
522522
return input_ != nullptr;
523523
}
524524

525+
/// Returns the spill config for this operator. This method is only used for
526+
/// test.
527+
const common::SpillConfig* testingSpillConfig() const {
528+
return spillConfig();
529+
}
530+
525531
protected:
526532
static std::vector<std::unique_ptr<PlanNodeTranslator>>& translators();
527533
friend class NonReclaimableSection;

velox/exec/OrderBy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ OrderBy::OrderBy(
4141
orderByNode->id(),
4242
"OrderBy",
4343
orderByNode->canSpill(driverCtx->queryConfig())
44-
? driverCtx->makeSpillConfig(operatorId)
44+
? driverCtx->makeSpillConfig(operatorId, "OrderBy")
4545
: std::nullopt) {
4646
maxOutputRows_ = outputBatchRows(std::nullopt);
4747
VELOX_CHECK(pool()->trackUsage());

0 commit comments

Comments
 (0)