|
14 | 14 | * limitations under the License. |
15 | 15 | */ |
16 | 16 | #include "velox/common/base/tests/GTestUtils.h" |
| 17 | +#include "velox/exec/tests/utils/AssertQueryBuilder.h" |
| 18 | +#include "velox/exec/tests/utils/PlanBuilder.h" |
| 19 | +#include "velox/exec/Window.h" |
17 | 20 | #include "velox/functions/lib/window/tests/WindowTestBase.h" |
| 21 | +#include "velox/functions/sparksql/aggregates/Register.h" |
18 | 22 | #include "velox/functions/sparksql/window/WindowFunctionsRegistration.h" |
19 | 23 |
|
20 | 24 | using namespace facebook::velox::exec::test; |
@@ -106,5 +110,59 @@ VELOX_INSTANTIATE_TEST_SUITE_P( |
106 | 110 | SparkWindowTest, |
107 | 111 | testing::ValuesIn(getSparkWindowTestParams())); |
108 | 112 |
|
| 113 | +class SparkAggregateWindowTest : public WindowTestBase { |
| 114 | + public: |
| 115 | + void SetUp() override { |
| 116 | + WindowTestBase::SetUp(); |
| 117 | + WindowTestBase::options_.parseIntegerAsBigint = false; |
| 118 | + velox::functions::aggregate::sparksql::registerAggregateFunctions(""); |
| 119 | + } |
| 120 | +}; |
| 121 | + |
| 122 | +DEBUG_ONLY_TEST_F(SparkAggregateWindowTest, destroyPreviousAccumulator) { |
| 123 | + const auto size = 100; |
| 124 | + auto input = makeRowVector( |
| 125 | + {"d", "p0", "s"}, |
| 126 | + { |
| 127 | + // Payload Data. |
| 128 | + makeFlatVector<std::string>(size, [](auto row){ return std::string(1024, 'a'); }), |
| 129 | + // Partition key. |
| 130 | + makeFlatVector<int64_t>(size, [](auto row) { return row % 11; }), |
| 131 | + // Sorting key. |
| 132 | + makeFlatVector<int32_t>(size, [](auto row) { return row; }), |
| 133 | + }); |
| 134 | + |
| 135 | + createDuckDbTable({input}); |
| 136 | + |
| 137 | + auto plan = PlanBuilder() |
| 138 | + .values(split(input, 10)) |
| 139 | + .window({"last(d) over (partition by p0 order by s)"}) |
| 140 | + .planNode(); |
| 141 | + |
| 142 | + const HashStringAllocator* stringAllocator = nullptr; |
| 143 | + uint64_t usedBytes = 0; |
| 144 | + SCOPED_TESTVALUE_SET( |
| 145 | + "facebook::velox::exec::Window::callApplyForPartitionRows", |
| 146 | + std::function<void(exec::Window*)>([&](exec::Window* windowOp) { |
| 147 | + if (stringAllocator == nullptr) { |
| 148 | + stringAllocator = windowOp->testingGetHashStringAllocator(); |
| 149 | + // Record how many bytes have been used. |
| 150 | + usedBytes = stringAllocator->currentBytes(); |
| 151 | + } else { |
| 152 | + // Because we will destroy previous created accumulator and every string in input |
| 153 | + // is of the same length, so here we check if the `usedBytes` is not changed. |
| 154 | + ASSERT_EQ(usedBytes, stringAllocator->currentBytes()); |
| 155 | + } |
| 156 | + })); |
| 157 | + |
| 158 | + auto task = |
| 159 | + AssertQueryBuilder(plan, duckDbQueryRunner_) |
| 160 | + .config(core::QueryConfig::kPreferredOutputBatchBytes, "1024") |
| 161 | + .serialExecution(true) |
| 162 | + .assertResults( |
| 163 | + "SELECT *, last(d) over (partition by p0 order by s) " |
| 164 | + "FROM tmp "); |
| 165 | +} |
| 166 | + |
109 | 167 | } // namespace |
110 | 168 | } // namespace facebook::velox::window::test |
0 commit comments