Skip to content

Commit 78b111e

Browse files
malinjawi (Mohammad Linjawi)
authored and committed
Merge branch 'main' into spark-arithmetic-ansi-mode
2 parents 4a7513d + 44f99c3 commit 78b111e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2622
-666
lines changed

CMake/resolve_dependency_modules/cudf.cmake

Lines changed: 12 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -17,48 +17,48 @@ include_guard(GLOBAL)
1717
# 3.30.4 is the minimum version required by cudf
1818
cmake_minimum_required(VERSION 3.30.4)
1919

20-
# rapids_cmake commit 5ec2245 from 2026-01-26
20+
# rapids_cmake commit 7ece71c from 2026-02-04
2121
set(VELOX_rapids_cmake_VERSION 26.04)
22-
set(VELOX_rapids_cmake_COMMIT 5ec22457e58953e0a68f0745ce7a11a896ba62b1)
22+
set(VELOX_rapids_cmake_COMMIT 7ece71c2f94fb0ed402d567b457ce54ecb859695)
2323
set(
2424
VELOX_rapids_cmake_BUILD_SHA256_CHECKSUM
25-
bf7d4ed5885f5fe012c42fb0977e1fe1416896479ffd34baa0cf762d3e83dc80
25+
02abaa8580c30a0b01eb142d5cd58b5acc85005bf58f5360f4a62efbd6e4635a
2626
)
2727
set(
2828
VELOX_rapids_cmake_SOURCE_URL
2929
"https://github.com/rapidsai/rapids-cmake/archive/${VELOX_rapids_cmake_COMMIT}.tar.gz"
3030
)
3131
velox_resolve_dependency_url(rapids_cmake)
3232

33-
# rmm commit e728b29 from 2026-01-26
33+
# rmm commit 3d6669c from 2026-02-09
3434
set(VELOX_rmm_VERSION 26.04)
35-
set(VELOX_rmm_COMMIT e728b2923f748d71aad30294b6926f43cb4c826e)
35+
set(VELOX_rmm_COMMIT 3d6669cd21e15080a0af2dc18f991060be2a4c3c)
3636
set(
3737
VELOX_rmm_BUILD_SHA256_CHECKSUM
38-
ec18d881b327514de154af67a33a1288eec7bcd86909f23c9bf2d90511b0cf2f
38+
1d2575d7a0fb492feaabc6917e7db33eb5c446a8e8eea301c54bd7e3d25fe66c
3939
)
4040
set(VELOX_rmm_SOURCE_URL "https://github.com/rapidsai/rmm/archive/${VELOX_rmm_COMMIT}.tar.gz")
4141
velox_resolve_dependency_url(rmm)
4242

43-
# kvikio commit 0f03349 from 2026-01-26
43+
# kvikio commit 593245b from 2026-02-05
4444
set(VELOX_kvikio_VERSION 26.04)
45-
set(VELOX_kvikio_COMMIT 0f03349bcaf029a2f582d9915a88d09e355ac691)
45+
set(VELOX_kvikio_COMMIT 593245b7799b6ea91eed77dd03ce4c9a4e158465)
4646
set(
4747
VELOX_kvikio_BUILD_SHA256_CHECKSUM
48-
728868c671e2686b5e9b7b4122d1661475f803c4fb98c0852d7be65c365d7b2d
48+
bcd03423b727fb0a23551a8ad3c6fcb58eaf0eb54ded7cdce914ea07a60ea1d7
4949
)
5050
set(
5151
VELOX_kvikio_SOURCE_URL
5252
"https://github.com/rapidsai/kvikio/archive/${VELOX_kvikio_COMMIT}.tar.gz"
5353
)
5454
velox_resolve_dependency_url(kvikio)
5555

56-
# cudf commit 68a0714 from 2026-01-27
56+
# cudf commit fc213fc from 2026-02-10
5757
set(VELOX_cudf_VERSION 26.04 CACHE STRING "cudf version")
58-
set(VELOX_cudf_COMMIT 68a0714a3701431041cb47bf1163706f597f9f48)
58+
set(VELOX_cudf_COMMIT fc213fc1ad889e2edf291b5555764ce677cb5dfa)
5959
set(
6060
VELOX_cudf_BUILD_SHA256_CHECKSUM
61-
0c723d7fd04eab60336dd4bcce41e225821d13b54cdabe485ec54517f3aa8b15
61+
f2c8eec90cb8571188e5bdd32222da208c353bbe1066c56bb3052a988feda1a2
6262
)
6363
set(VELOX_cudf_SOURCE_URL "https://github.com/rapidsai/cudf/archive/${VELOX_cudf_COMMIT}.tar.gz")
6464
velox_resolve_dependency_url(cudf)

velox/common/base/tests/SkewedPartitionBalancerTest.cpp

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -76,6 +76,10 @@ class SkewedPartitionRebalancerTestHelper {
7676

7777
class SkewedPartitionRebalancerTest : public testing::Test {
7878
protected:
79+
static void SetUpTestSuite() {
80+
TestValue::enable();
81+
}
82+
7983
std::unique_ptr<SkewedPartitionRebalancer> createBalancer(
8084
uint32_t numPartitions = 128,
8185
uint32_t numTasks = 8,
@@ -356,7 +360,7 @@ DEBUG_ONLY_TEST_F(SkewedPartitionRebalancerTest, serializedRebalanceExecution) {
356360
// there. This ensures that when we call rebalance() from the main thread,
357361
// rebalancing_ is already true and our rebalance() call will return early.
358362
mainThreadRebalancerWait.await(
359-
[&] { return mainThreadRebalancerWaitFlag.load(); });
363+
[&] { return !mainThreadRebalancerWaitFlag.load(); });
360364

361365
balancer->rebalance();
362366

velox/common/caching/AsyncDataCache.cpp

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -935,11 +935,15 @@ void AsyncDataCache::possibleSsdSave(uint64_t bytes) {
935935
936936
ssdSaveable_ += bytes;
937937
if (memory::AllocationTraits::numPages(ssdSaveable_) >
938-
std::max<int32_t>(
939-
static_cast<int32_t>(
940-
memory::AllocationTraits::numPages(opts_.minSsdSavableBytes)),
941-
static_cast<int32_t>(
942-
static_cast<double>(cachedPages_) * opts_.ssdSavableRatio))) {
938+
std::max<int32_t>(
939+
static_cast<int32_t>(
940+
memory::AllocationTraits::numPages(opts_.minSsdSavableBytes)),
941+
static_cast<int32_t>(
942+
static_cast<double>(cachedPages_) * opts_.ssdSavableRatio)) ||
943+
(opts_.ssdFlushThresholdBytes > 0 &&
944+
memory::AllocationTraits::numPages(ssdSaveable_) >
945+
static_cast<int32_t>(memory::AllocationTraits::numPages(
946+
opts_.ssdFlushThresholdBytes)))) {
943947
// Do not start a new save if another one is in progress.
944948
if (!ssdCache_->startWrite()) {
945949
return;

velox/common/caching/AsyncDataCache.h

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -714,11 +714,13 @@ class AsyncDataCache : public memory::Cache {
714714
double _maxWriteRatio = 0.7,
715715
double _ssdSavableRatio = 0.125,
716716
int32_t _minSsdSavableBytes = 1 << 24,
717-
int32_t _numShards = kDefaultNumShards)
717+
int32_t _numShards = kDefaultNumShards,
718+
uint64_t _ssdFlushThresholdBytes = 0)
718719
: maxWriteRatio(_maxWriteRatio),
719720
ssdSavableRatio(_ssdSavableRatio),
720721
minSsdSavableBytes(_minSsdSavableBytes),
721-
numShards(_numShards) {}
722+
numShards(_numShards),
723+
ssdFlushThresholdBytes(_ssdFlushThresholdBytes) {}
722724

723725
/// The max ratio of the number of in-memory cache entries being written to
724726
/// SSD cache over the total number of cache entries. This is to control SSD
@@ -742,6 +744,11 @@ class AsyncDataCache : public memory::Cache {
742744
/// shards to decrease contention on the mutex for the key to entry mapping
743745
/// and other housekeeping. Must be a power of 2.
744746
int32_t numShards;
747+
748+
/// The maximum threshold in bytes for triggering SSD flush. When the
749+
/// accumulated SSD-savable bytes exceed this value, a flush to SSD is
750+
/// triggered. Set to 0 to disable this threshold (default).
751+
uint64_t ssdFlushThresholdBytes;
745752
};
746753

747754
AsyncDataCache(

velox/common/caching/tests/AsyncDataCacheTest.cpp

Lines changed: 59 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1504,6 +1504,65 @@ TEST_P(AsyncDataCacheTest, ssdWriteOptions) {
15041504
}
15051505
}
15061506

1507+
TEST_P(AsyncDataCacheTest, ssdFlushThresholdBytes) {
1508+
constexpr uint64_t kRamBytes = 16UL << 20; // 16 MB
1509+
constexpr uint64_t kSsdBytes = 64UL << 20; // 64 MB
1510+
1511+
struct {
1512+
double maxWriteRatio;
1513+
double ssdSavableRatio;
1514+
int32_t minSsdSavableBytes;
1515+
uint64_t ssdFlushThresholdBytes;
1516+
bool expectedSaveToSsd;
1517+
1518+
std::string debugString() const {
1519+
return fmt::format(
1520+
"maxWriteRatio {}, ssdSavableRatio {}, minSsdSavableBytes {}, ssdFlushThresholdBytes {}, expectedSaveToSsd {}",
1521+
maxWriteRatio,
1522+
ssdSavableRatio,
1523+
minSsdSavableBytes,
1524+
ssdFlushThresholdBytes,
1525+
expectedSaveToSsd);
1526+
}
1527+
} testSettings[] = {
1528+
// Ratio-based threshold not met, ssdFlushThresholdBytes disabled (0).
1529+
// No flush expected.
1530+
{0.8, 0.95, 32 << 20, 0, false},
1531+
// Ratio-based threshold not met, but ssdFlushThresholdBytes is small
1532+
// (1MB).
1533+
// Flush expected due to absolute threshold.
1534+
{0.8, 0.95, 32 << 20, 1UL << 20, true},
1535+
// Ratio-based threshold met. ssdFlushThresholdBytes disabled.
1536+
// Flush expected due to ratio.
1537+
{0.8, 0.3, 4 << 20, 0, true},
1538+
// Both thresholds could trigger. Flush expected.
1539+
{0.8, 0.3, 4 << 20, 1UL << 20, true}};
1540+
1541+
for (const auto& testData : testSettings) {
1542+
SCOPED_TRACE(testData.debugString());
1543+
initializeCache(
1544+
kRamBytes,
1545+
kSsdBytes,
1546+
0,
1547+
true,
1548+
AsyncDataCache::Options(
1549+
testData.maxWriteRatio,
1550+
testData.ssdSavableRatio,
1551+
testData.minSsdSavableBytes,
1552+
AsyncDataCache::kDefaultNumShards,
1553+
testData.ssdFlushThresholdBytes));
1554+
// Load data half of the in-memory capacity.
1555+
loadLoop(0, kRamBytes / 2);
1556+
waitForPendingLoads();
1557+
auto stats = cache_->refreshStats();
1558+
if (testData.expectedSaveToSsd) {
1559+
EXPECT_GT(stats.ssdStats->entriesWritten, 0);
1560+
} else {
1561+
EXPECT_EQ(stats.ssdStats->entriesWritten, 0);
1562+
}
1563+
}
1564+
}
1565+
15071566
TEST_P(AsyncDataCacheTest, appendSsdSaveable) {
15081567
constexpr uint64_t kRamBytes = 64UL << 20; // 64 MB
15091568
constexpr uint64_t kSsdBytes = 128UL << 20; // 128 MB

velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -52,7 +52,8 @@ std::unique_ptr<velox::dwio::common::FileSink> abfsWriteFileSinkGenerator(
5252
fileSystem->openFileForWrite(fileURI),
5353
fileURI,
5454
options.metricLogger,
55-
options.stats);
55+
options.stats,
56+
options.fileSystemStats);
5657
}
5758
return nullptr;
5859
}

velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -103,7 +103,8 @@ std::unique_ptr<velox::dwio::common::FileSink> gcsWriteFileSinkGenerator(
103103
fileSystem->openFileForWrite(fileURI, {{}, options.pool, std::nullopt}),
104104
fileURI,
105105
options.metricLogger,
106-
options.stats);
106+
options.stats,
107+
options.fileSystemStats);
107108
}
108109
return nullptr;
109110
}

velox/connectors/hive/storage_adapters/hdfs/RegisterHdfsFileSystem.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -89,7 +89,8 @@ hdfsWriteFileSinkGenerator() {
8989
fileSystem->openFileForWrite(pathSuffix),
9090
fileURI,
9191
options.metricLogger,
92-
options.stats);
92+
options.stats,
93+
options.fileSystemStats);
9394
}
9495
return static_cast<std::unique_ptr<dwio::common::WriteFileSink>>(
9596
nullptr);

velox/connectors/hive/storage_adapters/s3fs/RegisterS3FileSystem.cpp

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -102,7 +102,8 @@ std::unique_ptr<velox::dwio::common::FileSink> s3WriteFileSinkGenerator(
102102
fileSystem->openFileForWrite(fileURI, {{}, options.pool, std::nullopt}),
103103
fileURI,
104104
options.metricLogger,
105-
options.stats);
105+
options.stats,
106+
options.fileSystemStats);
106107
}
107108
return nullptr;
108109
}

velox/core/Expressions.cpp

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,9 @@
1414
* limitations under the License.
1515
*/
1616
#include "velox/core/Expressions.h"
17+
18+
#include <folly/hash/Hash.h>
19+
1720
#include "velox/common/Casts.h"
1821
#include "velox/common/encode/Base64.h"
1922
#include "velox/vector/ComplexVector.h"
@@ -424,7 +427,8 @@ uint64_t hashImpl(const TypePtr& type, const Variant& value) {
424427
} // namespace
425428

426429
size_t ConstantTypedExpr::localHash() const {
427-
static const size_t kBaseHash = std::hash<const char*>()("ConstantTypedExpr");
430+
static const size_t kBaseHash =
431+
folly::hasher<std::string_view>()("ConstantTypedExpr");
428432

429433
uint64_t h;
430434

0 commit comments

Comments
 (0)