Skip to content

Commit b3e4cad

Browse files
macvincentfacebook-github-bot
authored andcommitted
Improve Performance by Bucketing the Streams Instead of Sorting (#328)
Summary: Pull Request resolved: #328 Differential Revision: D87503296
1 parent 58cd140 commit b3e4cad

File tree

4 files changed

+194
-10
lines changed

4 files changed

+194
-10
lines changed

dwio/nimble/velox/VeloxWriter.cpp

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "dwio/nimble/velox/SchemaTypes.h"
3535
#include "dwio/nimble/velox/StatsGenerated.h"
3636
#include "dwio/nimble/velox/StreamChunker.h"
37+
#include "dwio/nimble/velox/VeloxWriterUtils.h"
3738
#include "velox/common/time/CpuWallTimer.h"
3839
#include "velox/dwio/common/ExecutorBarrier.h"
3940
#include "velox/type/Type.h"
@@ -1144,16 +1145,7 @@ bool VeloxWriter::evalauateFlushPolicy() {
11441145
if (continueChunking) {
11451146
// Relieve memory pressure by chunking small streams.
11461147
// Sort streams for chunking based on raw memory usage.
1147-
// TODO(T240072104): Improve performance by bucketing the streams
1148-
// by size (by most significant bit) instead of sorting them.
1149-
streamIndices.resize(streams.size());
1150-
std::iota(streamIndices.begin(), streamIndices.end(), 0);
1151-
std::sort(
1152-
streamIndices.begin(),
1153-
streamIndices.end(),
1154-
[&](const uint32_t& a, const uint32_t& b) {
1155-
return streams[a]->memoryUsed() > streams[b]->memoryUsed();
1156-
});
1148+
streamIndices = getStreamIndicesByMemoryUsage(streams);
11571149
batchChunkStreams(streamIndices, /*ensureFullChunks=*/false);
11581150
}
11591151
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "dwio/nimble/velox/VeloxWriterUtils.h"
17+
#include "dwio/nimble/common/Bits.h"
18+
19+
namespace facebook::nimble {
20+
std::vector<uint32_t> getStreamIndicesByMemoryUsage(
21+
const std::vector<std::unique_ptr<StreamData>>& streams) {
22+
constexpr size_t kMaxBits = sizeof(uint64_t) * 8;
23+
std::vector<std::vector<uint32_t>> sortedStreams(kMaxBits);
24+
for (auto streamIndex = 0; streamIndex < streams.size(); ++streamIndex) {
25+
const auto bitsRequired =
26+
bits::bitsRequired(streams[streamIndex]->memoryUsed());
27+
NIMBLE_DCHECK_LT(bitsRequired, kMaxBits);
28+
sortedStreams[bitsRequired].push_back(streamIndex);
29+
}
30+
31+
std::vector<uint32_t> streamIndices;
32+
streamIndices.reserve(streams.size());
33+
auto bucketIndex = static_cast<int64_t>(sortedStreams.size()) - 1;
34+
while (bucketIndex >= 0) {
35+
for (const auto streamIndex : sortedStreams[bucketIndex]) {
36+
streamIndices.push_back(streamIndex);
37+
}
38+
bucketIndex--;
39+
}
40+
return streamIndices;
41+
}
42+
} // namespace facebook::nimble
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
#include <vector>
18+
#include "dwio/nimble/velox/StreamData.h"
19+
20+
namespace facebook::nimble {
21+
/// Sorts stream indices by their memory usage in descending order using a
22+
/// bucketing approach based on the most significant bit position.
23+
std::vector<uint32_t> getStreamIndicesByMemoryUsage(
24+
const std::vector<std::unique_ptr<StreamData>>& streams);
25+
} // namespace facebook::nimble
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include <gmock/gmock.h>
17+
#include <gtest/gtest.h>
18+
19+
#include "dwio/nimble/velox/SchemaBuilder.h"
20+
#include "dwio/nimble/velox/StreamData.h"
21+
#include "dwio/nimble/velox/VeloxWriterUtils.h"
22+
#include "velox/common/memory/Memory.h"
23+
#include "velox/common/memory/MemoryArbitrator.h"
24+
#include "velox/common/memory/SharedArbitrator.h"
25+
26+
using namespace ::testing;
27+
28+
namespace facebook::nimble {
29+
30+
class VeloxWriterUtilsTest : public ::testing::Test {
31+
protected:
32+
static void SetUpTestCase() {
33+
velox::memory::SharedArbitrator::registerFactory();
34+
velox::memory::MemoryManager::Options options;
35+
options.arbitratorKind = "SHARED";
36+
velox::memory::MemoryManager::testingSetInstance(options);
37+
}
38+
39+
void SetUp() override {
40+
rootPool_ = velox::memory::memoryManager()->addRootPool("default_root");
41+
leafPool_ = rootPool_->addLeafChild("default_leaf");
42+
schemaBuilder_ = std::make_unique<SchemaBuilder>();
43+
inputBufferGrowthPolicy_ =
44+
DefaultInputBufferGrowthPolicy::withDefaultRanges();
45+
}
46+
47+
std::shared_ptr<velox::memory::MemoryPool> rootPool_;
48+
std::shared_ptr<velox::memory::MemoryPool> leafPool_;
49+
std::unique_ptr<SchemaBuilder> schemaBuilder_;
50+
std::unique_ptr<InputBufferGrowthPolicy> inputBufferGrowthPolicy_;
51+
};
52+
53+
TEST_F(VeloxWriterUtilsTest, getStreamIndicesByMemoryUsageTest) {
54+
// Test 1: Empty streams vector
55+
{
56+
std::vector<std::unique_ptr<StreamData>> streams;
57+
auto result = getStreamIndicesByMemoryUsage(streams);
58+
EXPECT_TRUE(result.empty());
59+
}
60+
61+
// Test 2: Single stream
62+
{
63+
std::vector<std::unique_ptr<StreamData>> streams;
64+
auto scalarTypeBuilder =
65+
schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32);
66+
auto descriptor = &scalarTypeBuilder->scalarDescriptor();
67+
streams.push_back(
68+
std::make_unique<ContentStreamData<int32_t>>(
69+
*leafPool_, *descriptor, *inputBufferGrowthPolicy_));
70+
71+
auto result = getStreamIndicesByMemoryUsage(streams);
72+
EXPECT_THAT(result, ElementsAre(0));
73+
}
74+
75+
// Test 3: Multiple streams with different memory usage
76+
// Should be sorted in descending order by memory usage
77+
{
78+
std::vector<std::unique_ptr<StreamData>> streams;
79+
auto scalarTypeBuilder =
80+
schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32);
81+
auto descriptor = &scalarTypeBuilder->scalarDescriptor();
82+
83+
// Stream 0: Small memory usage (empty)
84+
streams.push_back(
85+
std::make_unique<ContentStreamData<int32_t>>(
86+
*leafPool_, *descriptor, *inputBufferGrowthPolicy_));
87+
88+
// Stream 1: Medium memory usage
89+
auto stream1 = std::make_unique<ContentStreamData<int32_t>>(
90+
*leafPool_, *descriptor, *inputBufferGrowthPolicy_);
91+
std::vector<int32_t> data1 = {1, 2, 3, 4, 5};
92+
auto& mutableData1 = stream1->mutableData();
93+
for (const auto& item : data1) {
94+
mutableData1.push_back(item);
95+
}
96+
streams.push_back(std::move(stream1));
97+
98+
// Stream 2: Large memory usage
99+
auto stream2 = std::make_unique<ContentStreamData<int32_t>>(
100+
*leafPool_, *descriptor, *inputBufferGrowthPolicy_);
101+
std::vector<int32_t> data2(1000, 42);
102+
auto& mutableData2 = stream2->mutableData();
103+
for (const auto& item : data2) {
104+
mutableData2.push_back(item);
105+
}
106+
streams.push_back(std::move(stream2));
107+
108+
// Stream 3: Large memory usage. Should map to same bucket as stream 2
109+
auto stream3 = std::make_unique<ContentStreamData<int32_t>>(
110+
*leafPool_, *descriptor, *inputBufferGrowthPolicy_);
111+
std::vector<int32_t> data3(900, 42);
112+
auto& mutableData3 = stream3->mutableData();
113+
for (const auto& item : data3) {
114+
mutableData3.push_back(item);
115+
}
116+
streams.push_back(std::move(stream3));
117+
118+
auto result = getStreamIndicesByMemoryUsage(streams);
119+
120+
// Stream 2 should come first (largest), then stream 1, then stream 0
121+
EXPECT_THAT(result, ElementsAre(2, 3, 1, 0));
122+
}
123+
}
124+
125+
} // namespace facebook::nimble

0 commit comments

Comments
 (0)