Skip to content

Commit db178da

Browse files
authored
fix: Implement chunking for large SBF filters (#5944)
* fix: Implement chunking for large SBF filters This commit adds chunking functionality for load/save operations of bloom filters. Additional information is added in the serialization of each filter. Specifically, when saving each filter, the total size of the filter is written, followed by chunks of the filter (max size of 64 MB per chunk). Signed-off-by: Eric <[email protected]> * Implement unit test for SBF chunking logic * Add flags for setting save format for SBF filters Added a new flag `rdb_sbf_chunked` which determines the save format of SBFs. Also, separate functions for saving SBFs were added. Signed-off-by: Eric <[email protected]> * fix: Fix issues in loading and saving logic of new SBF chunking format There was an issue in the load logic of SBFs that resulted in the load behavior being determined by a flag in the generic SBF loading function. This should have been the chunked parameter, and it has since been fixed to use the function parameter. Ensure that the correct format is being written on the save side as well. There was a bug that caused the RDB_TYPE_SBF type to always be saved even when data with the new SBF chunking format was used. This has been updated accordingly to save as RDB_TYPE_SBF2 when using the new chunking format. Signed-off-by: Eric <[email protected]> * Remove redundant flag for chunking on load side Signed-off-by: Eric <[email protected]> --------- Signed-off-by: Eric <[email protected]>
1 parent 26275e4 commit db178da

File tree

6 files changed

+83
-7
lines changed

6 files changed

+83
-7
lines changed

src/server/rdb_extensions.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@ constexpr uint8_t RDB_TYPE_JSON = 30;
1313
constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
1414
constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
1515
constexpr uint8_t RDB_TYPE_SBF = 33;
16+
constexpr uint8_t RDB_TYPE_SBF2 = 34;
1617

1718
constexpr bool rdbIsObjectTypeDF(uint8_t type) {
1819
return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON) ||
1920
(type == RDB_TYPE_HASH_WITH_EXPIRY) || (type == RDB_TYPE_SET_WITH_EXPIRY) ||
20-
(type == RDB_TYPE_SBF);
21+
(type == RDB_TYPE_SBF) || (type == RDB_TYPE_SBF2);
2122
}
2223

2324
// Opcodes: Range 200-240 is used by DF extensions.

src/server/rdb_load.cc

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ string ModuleTypeName(uint64_t module_id) {
188188
bool RdbTypeAllowedEmpty(int type) {
189189
return type == RDB_TYPE_STRING || type == RDB_TYPE_JSON || type == RDB_TYPE_SBF ||
190190
type == RDB_TYPE_STREAM_LISTPACKS || type == RDB_TYPE_SET_WITH_EXPIRY ||
191-
type == RDB_TYPE_HASH_WITH_EXPIRY;
191+
type == RDB_TYPE_HASH_WITH_EXPIRY || type == RDB_TYPE_SBF2;
192192
}
193193

194194
DbSlice& GetCurrentDbSlice() {
@@ -1316,6 +1316,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
13161316
case RDB_TYPE_SBF:
13171317
iores = ReadSBF();
13181318
break;
1319+
case RDB_TYPE_SBF2:
1320+
iores = ReadSBF2();
1321+
break;
13191322
default:
13201323
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
13211324

@@ -1851,7 +1854,7 @@ auto RdbLoaderBase::ReadRedisJson() -> io::Result<OpaqueObj> {
18511854
return OpaqueObj{std::move(dest), RDB_TYPE_JSON};
18521855
}
18531856

1854-
auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
1857+
auto RdbLoaderBase::ReadSBFImpl(bool chunking) -> io::Result<OpaqueObj> {
18551858
RdbSBF res;
18561859
uint64_t options;
18571860
SET_OR_UNEXPECT(LoadLen(nullptr), options);
@@ -1874,7 +1877,27 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
18741877
unsigned hash_cnt;
18751878
string filter_data;
18761879
SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt);
1877-
SET_OR_UNEXPECT(FetchGenericString(), filter_data);
1880+
1881+
if (chunking) {
1882+
unsigned total_size = 0;
1883+
SET_OR_UNEXPECT(LoadLen(nullptr), total_size);
1884+
1885+
filter_data.resize(total_size);
1886+
size_t offset = 0;
1887+
while (offset < total_size) {
1888+
unsigned chunk_size = 0;
1889+
SET_OR_UNEXPECT(LoadLen(nullptr), chunk_size);
1890+
error_code ec = FetchBuf(chunk_size, filter_data.data() + offset);
1891+
if (ec) {
1892+
return make_unexpected(ec);
1893+
}
1894+
1895+
offset += chunk_size;
1896+
}
1897+
} else {
1898+
SET_OR_UNEXPECT(FetchGenericString(), filter_data);
1899+
}
1900+
18781901
size_t bit_len = filter_data.size() * 8;
18791902
if (!is_power2(bit_len)) { // must be power of two
18801903
return Unexpected(errc::rdb_file_corrupted);
@@ -1884,6 +1907,14 @@ auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
18841907
return OpaqueObj{std::move(res), RDB_TYPE_SBF};
18851908
}
18861909

1910+
auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
1911+
return ReadSBFImpl(false);
1912+
}
1913+
1914+
auto RdbLoaderBase::ReadSBF2() -> io::Result<OpaqueObj> {
1915+
return ReadSBFImpl(true);
1916+
}
1917+
18871918
template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
18881919
auto ec = EnsureRead(sizeof(T));
18891920
if (ec)

src/server/rdb_load.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ class RdbLoaderBase {
169169
::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
170170
::io::Result<OpaqueObj> ReadStreams(int rdbtype);
171171
::io::Result<OpaqueObj> ReadRedisJson();
172+
::io::Result<OpaqueObj> ReadSBFImpl(bool chunking);
172173
::io::Result<OpaqueObj> ReadSBF();
174+
::io::Result<OpaqueObj> ReadSBF2();
173175

174176
std::error_code SkipModuleData();
175177
std::error_code HandleCompressedBlob(int op_type);

src/server/rdb_save.cc

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
4949
ABSL_RETIRED_FLAG(bool, stream_rdb_encode_v2, true,
5050
"Retired. Uses format, compatible with redis 7.2 and Dragonfly v1.26+");
5151

52+
// Flip this value to 'true' in March 2026.
53+
ABSL_FLAG(bool, rdb_sbf_chunked, false, "Enable new save format for saving SBFs in chunks.");
54+
5255
namespace dfly {
5356

5457
using namespace std;
@@ -198,7 +201,7 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
198201
case OBJ_JSON:
199202
return RDB_TYPE_JSON;
200203
case OBJ_SBF:
201-
return RDB_TYPE_SBF;
204+
return absl::GetFlag(FLAGS_rdb_sbf_chunked) ? RDB_TYPE_SBF2 : RDB_TYPE_SBF;
202205
}
203206
LOG(FATAL) << "Unknown encoding " << compact_enc << " for type " << type;
204207
return 0; /* avoid warning */
@@ -623,11 +626,20 @@ std::error_code RdbSerializer::SaveSBFObject(const PrimeValue& pv) {
623626
RETURN_ON_ERR(SaveLen(sbf->hashfunc_cnt(i)));
624627

625628
string_view blob = sbf->data(i);
626-
RETURN_ON_ERR(SaveString(blob));
629+
if (absl::GetFlag(FLAGS_rdb_sbf_chunked)) {
630+
RETURN_ON_ERR(SaveLen(blob.size()));
631+
632+
for (size_t offset = 0; offset < blob.size(); offset += kFilterChunkSize) {
633+
size_t chunk_len = std::min(kFilterChunkSize, blob.size() - offset);
634+
RETURN_ON_ERR(SaveString(blob.substr(offset, chunk_len)));
635+
}
636+
} else {
637+
RETURN_ON_ERR(SaveString(blob));
638+
}
639+
627640
FlushState flush_state = FlushState::kFlushMidEntry;
628641
if ((i + 1) == sbf->num_filters())
629642
flush_state = FlushState::kFlushEndEntry;
630-
631643
FlushIfNeeded(flush_state);
632644
}
633645

src/server/rdb_save.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ class SerializerBase {
210210
io::IoBuf mem_buf_;
211211
std::unique_ptr<detail::CompressorImpl> compressor_impl_;
212212

213+
static constexpr size_t kFilterChunkSize = 1ULL << 26;
213214
static constexpr size_t kMinStrSizeToCompress = 256;
214215
static constexpr size_t kMaxStrSizeToCompress = 1 * 1024 * 1024;
215216
static constexpr double kMinCompressionReductionPrecentage = 0.95;

src/server/rdb_test.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ ABSL_DECLARE_FLAG(int32, list_max_listpack_size);
3535
ABSL_DECLARE_FLAG(dfly::CompressionMode, compression_mode);
3636
ABSL_DECLARE_FLAG(bool, rdb_ignore_expiry);
3737
ABSL_DECLARE_FLAG(uint32_t, num_shards);
38+
ABSL_DECLARE_FLAG(bool, rdb_sbf_chunked);
3839

3940
namespace dfly {
4041

@@ -670,6 +671,34 @@ TEST_F(RdbTest, SBF) {
670671
EXPECT_THAT(Run({"BF.EXISTS", "k", "1"}), IntArg(1));
671672
}
672673

674+
TEST_F(RdbTest, SBFLargeFilterChunking) {
675+
absl::SetFlag(&FLAGS_rdb_sbf_chunked, true);
676+
max_memory_limit = 200000000;
677+
678+
// Using this set of parameters for the BF.RESERVE command resulted in a
679+
// filter size large enough to require chunking (> 64 MB).
680+
const double error_rate = 0.001;
681+
const size_t capacity = 50'000'000;
682+
const size_t num_items = 100;
683+
684+
size_t collisions = 0;
685+
686+
Run({"BF.RESERVE", "large_key", std::to_string(error_rate), std::to_string(capacity)});
687+
for (size_t i = 0; i < num_items; i++) {
688+
auto res = Run({"BF.ADD", "large_key", absl::StrCat("item", i)});
689+
if (*res.GetInt() == 0)
690+
collisions++;
691+
}
692+
EXPECT_LT(static_cast<double>(collisions) / num_items, error_rate);
693+
694+
Run({"debug", "reload"});
695+
EXPECT_EQ(Run({"type", "large_key"}), "MBbloom--");
696+
697+
for (size_t i = 0; i < num_items; i++) {
698+
EXPECT_THAT(Run({"BF.EXISTS", "large_key", absl::StrCat("item", i)}), IntArg(1));
699+
}
700+
}
701+
673702
TEST_F(RdbTest, RestoreSearchIndexNameStartingWithColon) {
674703
// Create an index with a name that starts with ':' and add a sample document
675704
EXPECT_EQ(Run({"FT.CREATE", ":Order:index", "ON", "HASH", "PREFIX", "1", ":Order:", "SCHEMA",

0 commit comments

Comments
 (0)