Skip to content

Commit e6e7d0a

Browse files
jiangtian
authored and committed
fix
1 parent 52a4015 commit e6e7d0a

File tree

3 files changed

+15
-12
lines changed

3 files changed

+15
-12
lines changed

velox/exec/Aggregate.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,7 @@ class Aggregate {
302302

303303
for (auto* group : groups) {
304304
group[initializedByte_] &= ~initializedMask_;
305+
clearNull(group);
305306
}
306307
}
307308

velox/exec/AggregateWindow.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ class AggregateWindowFunction : public exec::WindowFunction {
150150
// This is the start of a new incremental aggregation. So the
151151
// aggregate_ function object should be initialized.
152152
auto singleGroup = std::vector<vector_size_t>{0};
153-
aggregate_->clear();
154153
aggregate_->destroy(folly::Range<char**>(&rawSingleGroupRow_, 1));
155154
aggregate_->initializeNewGroups(&rawSingleGroupRow_, singleGroup);
156155
aggregateInitialized_ = true;
@@ -337,7 +336,6 @@ class AggregateWindowFunction : public exec::WindowFunction {
337336
// TODO : Try to re-use previous computations by advancing and retracting
338337
// the aggregation based on the frame changes with each row. This would
339338
// require adding new APIs to the Aggregate framework.
340-
aggregate_->clear();
341339
aggregate_->destroy(folly::Range<char**>(&rawSingleGroupRow_, 1));
342340
aggregate_->initializeNewGroups(&rawSingleGroupRow_, kSingleGroup);
343341
aggregateInitialized_ = true;

velox/functions/sparksql/window/tests/SparkWindowTest.cpp

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ class SparkAggregateWindowLimitMemoryTest
115115
static void SetUpTestCase() {
116116
OperatorTestBase::SetUpTestCase();
117117
OperatorTestBase::setupMemory(
118-
256 << 20, // allocatorCapacity
119-
256 << 20, // arbitratorCapacity
118+
192 << 20, // allocatorCapacity
119+
192 << 20, // arbitratorCapacity
120120
0, // arbitratorReservedCapacity
121121
0, // memoryPoolInitCapacity
122122
0, // memoryPoolReservedCapacity
@@ -137,18 +137,22 @@ class SparkAggregateWindowLimitMemoryTest
137137
// the limit and cause failure.
138138
// The capacity is calculated as:
139139
// 1. size of input data: 1 rows * 32KB (the length of the string) = 32KB
140-
// 2. size of RowContainer: ((1,024 rows * 32KB (data) + 1,024 rows * 32KB
141-
// (Accumulators))) * 3 = 192MB
142-
// (Accumulators won't be destroyed now)
140+
// 2. size of RowContainer: ((1,024 rows * 32KB (data))) * 3 +
141+
// 32KB(Accumulator) = 96MB
142+
// (Accumulators will be destroyed when processing a new partition.)
143143
// 3. size of results: 10 rows * 32KB * 2 (column 'd' and the result column) =
144144
// 640KB
145145
// 4. other overheads
146-
// Total: ~ 192MB
146+
// Total: ~ 96MB
147147
// If we don't clear the string buffers in time, the size of the string buffers
148-
// would be at least 1,024 rows * 32KB * 3 = 96MB. So without the fix, we need
149-
// capacity to be set to more than 192MB + 96MB = 288MB to pass the test.
150-
// So we set the capacity to 256MB here.
151-
TEST_F(SparkAggregateWindowLimitMemoryTest, clearStringBuffersInTime) {
148+
// would be at least 1,024 rows * 32KB * 3 = 96MB. If we don't destroy the
149+
// previously created accumulator, the memory size of accumulators accumulated
150+
// will be (32KB * 1024 rows * 3) = 96MB. So without the fix, we need capacity
151+
// to be set to more than 96MB + 96MB + 96MB = 288MB to pass the test. We set
152+
// the capacity to 192MB here to verify that our fixes work.
153+
TEST_F(
154+
SparkAggregateWindowLimitMemoryTest,
155+
clearStringBuffersAndAccumulatorsInTime) {
152156
constexpr vector_size_t size = 1'024 * 3;
153157
constexpr vector_size_t resultSize = 10;
154158
// For this test, it is important to create a single-row partition.

0 commit comments

Comments
 (0)