diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index dd716ce5..b81c4a89 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -34,6 +34,7 @@ #include "dwio/nimble/velox/SchemaTypes.h" #include "dwio/nimble/velox/StatsGenerated.h" #include "dwio/nimble/velox/StreamChunker.h" +#include "dwio/nimble/velox/VeloxWriterUtils.h" #include "velox/common/time/CpuWallTimer.h" #include "velox/dwio/common/ExecutorBarrier.h" #include "velox/type/Type.h" @@ -1144,16 +1145,7 @@ bool VeloxWriter::evalauateFlushPolicy() { if (continueChunking) { // Relieve memory pressure by chunking small streams. // Sort streams for chunking based on raw memory usage. - // TODO(T240072104): Improve performance by bucketing the streams - // by size (by most significant bit) instead of sorting them. - streamIndices.resize(streams.size()); - std::iota(streamIndices.begin(), streamIndices.end(), 0); - std::sort( - streamIndices.begin(), - streamIndices.end(), - [&](const uint32_t& a, const uint32_t& b) { - return streams[a]->memoryUsed() > streams[b]->memoryUsed(); - }); + streamIndices = getStreamIndicesByMemoryUsage(streams); batchChunkStreams(streamIndices, /*ensureFullChunks=*/false); } } diff --git a/dwio/nimble/velox/VeloxWriterUtils.cpp b/dwio/nimble/velox/VeloxWriterUtils.cpp new file mode 100644 index 00000000..76c534a1 --- /dev/null +++ b/dwio/nimble/velox/VeloxWriterUtils.cpp @@ -0,0 +1,42 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "dwio/nimble/velox/VeloxWriterUtils.h" +#include "dwio/nimble/common/Bits.h" + +namespace facebook::nimble { +std::vector getStreamIndicesByMemoryUsage( + const std::vector>& streams) { + constexpr size_t kMaxBits = sizeof(uint64_t) * 8; + std::vector> sortedStreams(kMaxBits); + for (auto streamIndex = 0; streamIndex < streams.size(); ++streamIndex) { + const auto bitsRequired = + bits::bitsRequired(streams[streamIndex]->memoryUsed()); + NIMBLE_DCHECK_LT(bitsRequired, kMaxBits); + sortedStreams[bitsRequired].push_back(streamIndex); + } + + std::vector streamIndices; + streamIndices.reserve(streams.size()); + auto bucketIndex = static_cast(sortedStreams.size()) - 1; + while (bucketIndex >= 0) { + for (const auto streamIndex : sortedStreams[bucketIndex]) { + streamIndices.push_back(streamIndex); + } + bucketIndex--; + } + return streamIndices; +} +} // namespace facebook::nimble diff --git a/dwio/nimble/velox/VeloxWriterUtils.h b/dwio/nimble/velox/VeloxWriterUtils.h new file mode 100644 index 00000000..a3f216c9 --- /dev/null +++ b/dwio/nimble/velox/VeloxWriterUtils.h @@ -0,0 +1,25 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include +#include "dwio/nimble/velox/StreamData.h" + +namespace facebook::nimble { +/// Sorts stream indices by their memory usage in descending order using a +/// bucketing approach based on the most significant bit position. +std::vector getStreamIndicesByMemoryUsage( + const std::vector>& streams); +} // namespace facebook::nimble diff --git a/dwio/nimble/velox/tests/VeloxWriterUtilsTest.cpp b/dwio/nimble/velox/tests/VeloxWriterUtilsTest.cpp new file mode 100644 index 00000000..46259903 --- /dev/null +++ b/dwio/nimble/velox/tests/VeloxWriterUtilsTest.cpp @@ -0,0 +1,125 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include + +#include "dwio/nimble/velox/SchemaBuilder.h" +#include "dwio/nimble/velox/StreamData.h" +#include "dwio/nimble/velox/VeloxWriterUtils.h" +#include "velox/common/memory/Memory.h" +#include "velox/common/memory/MemoryArbitrator.h" +#include "velox/common/memory/SharedArbitrator.h" + +using namespace ::testing; + +namespace facebook::nimble { + +class VeloxWriterUtilsTest : public ::testing::Test { + protected: + static void SetUpTestCase() { + velox::memory::SharedArbitrator::registerFactory(); + velox::memory::MemoryManager::Options options; + options.arbitratorKind = "SHARED"; + velox::memory::MemoryManager::testingSetInstance(options); + } + + void SetUp() override { + rootPool_ = velox::memory::memoryManager()->addRootPool("default_root"); + leafPool_ = rootPool_->addLeafChild("default_leaf"); + schemaBuilder_ = std::make_unique(); + inputBufferGrowthPolicy_ = + DefaultInputBufferGrowthPolicy::withDefaultRanges(); + } + + std::shared_ptr rootPool_; + std::shared_ptr leafPool_; + std::unique_ptr schemaBuilder_; + std::unique_ptr inputBufferGrowthPolicy_; +}; + +TEST_F(VeloxWriterUtilsTest, getStreamIndicesByMemoryUsageTest) { + // Test 1: Empty streams vector + { + std::vector> streams; + auto result = getStreamIndicesByMemoryUsage(streams); + EXPECT_TRUE(result.empty()); + } + + // Test 2: Single stream + { + std::vector> streams; + auto scalarTypeBuilder = + schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32); + auto descriptor = &scalarTypeBuilder->scalarDescriptor(); + streams.push_back( + std::make_unique>( + *leafPool_, *descriptor, *inputBufferGrowthPolicy_)); + + auto result = getStreamIndicesByMemoryUsage(streams); + EXPECT_THAT(result, ElementsAre(0)); + } + + // Test 3: Multiple streams with different memory usage + // Should be sorted in descending order by memory usage + { + std::vector> streams; + auto scalarTypeBuilder = + schemaBuilder_->createScalarTypeBuilder(ScalarKind::Int32); + auto descriptor = &scalarTypeBuilder->scalarDescriptor(); + + // Stream 0: Small memory usage (empty) + streams.push_back( + std::make_unique>( + *leafPool_, *descriptor, *inputBufferGrowthPolicy_)); + + // Stream 1: Medium memory usage + auto stream1 = std::make_unique>( + *leafPool_, *descriptor, *inputBufferGrowthPolicy_); + std::vector data1 = {1, 2, 3, 4, 5}; + auto& mutableData1 = stream1->mutableData(); + for (const auto& item : data1) { + mutableData1.push_back(item); + } + streams.push_back(std::move(stream1)); + + // Stream 2: Large memory usage + auto stream2 = std::make_unique>( + *leafPool_, *descriptor, *inputBufferGrowthPolicy_); + std::vector data2(1000, 42); + auto& mutableData2 = stream2->mutableData(); + for (const auto& item : data2) { + mutableData2.push_back(item); + } + streams.push_back(std::move(stream2)); + + // Stream 3: Large memory usage. Should map to same bucket as stream 2 + auto stream3 = std::make_unique>( + *leafPool_, *descriptor, *inputBufferGrowthPolicy_); + std::vector data3(900, 42); + auto& mutableData3 = stream3->mutableData(); + for (const auto& item : data3) { + mutableData3.push_back(item); + } + streams.push_back(std::move(stream3)); + + auto result = getStreamIndicesByMemoryUsage(streams); + + // Stream 2 should come first (largest), then stream 1, then stream 0 + EXPECT_THAT(result, ElementsAre(2, 3, 1, 0)); + } +} + +} // namespace facebook::nimble