2 changes: 1 addition & 1 deletion build.sh
@@ -1,4 +1,4 @@
-#!/usr/bin/env sh
+#!/bin/bash
 
 # Script to run format checks on the valkey-bloom module, build it to generate .so files, and run unit and integration tests.
 
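The shebang change pins the script to bash rather than whatever /bin/sh resolves to. A plausible motivation (an assumption; the PR description is not shown here) is that the script relies on bash-only constructs that break under a strict POSIX sh, for example:

    #!/bin/bash
    # Illustrative bashisms that fail under plain sh; whether build.sh
    # uses these exact constructs is an assumption.
    tests=("unit" "integration")                # arrays are bash-only
    if [[ "${1:-}" == "--skip-tests" ]]; then   # [[ ]] is bash-only
        tests=()
    fi
    echo "running: ${tests[*]}"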
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,2 +1,2 @@
 valkey
-pytest==6
+pytest==7.4.3
27 changes: 18 additions & 9 deletions src/bloom/command_handler.rs
@@ -817,24 +817,33 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyResult {
         None => {
             // If the filter does not exist, create it.
             let hex = value.to_vec();
-            let validate_size_limit = !must_obey_client(ctx);
+            let is_aof_loading = must_obey_client(ctx);
Contributor: Do we need this intermediary step of having the `is_aof_loading` variable here? Could we not use `validate_size_limit` on line 829?

+            let validate_size_limit = !is_aof_loading;
             let bloom = match BloomObject::decode_object(&hex, validate_size_limit) {
                 Ok(v) => v,
                 Err(err) => {
                     return Err(ValkeyError::Str(err.as_str()));
                 }
             };
-            let replicate_args = ReplicateArgs {
-                capacity: bloom.capacity(),
-                expansion: bloom.expansion(),
-                fp_rate: bloom.fp_rate(),
-                tightening_ratio: bloom.tightening_ratio(),
-                seed: bloom.seed(),
-                items: &input_args[idx..],
+            // Get replicate args before moving bloom
+            let replicate_args = if !is_aof_loading {
+                Some(ReplicateArgs {
+                    capacity: bloom.capacity(),
+                    expansion: bloom.expansion(),
+                    fp_rate: bloom.fp_rate(),
+                    tightening_ratio: bloom.tightening_ratio(),
+                    seed: bloom.seed(),
+                    items: &input_args[idx..],
+                })
+            } else {
+                None
             };
             match filter_key.set_value(&BLOOM_TYPE, bloom) {
                 Ok(_) => {
-                    replicate_and_notify_events(ctx, filter_name, false, true, replicate_args);
+                    // Only replicate if not loading from AOF/replication
+                    if let Some(args) = replicate_args {
+                        replicate_and_notify_events(ctx, filter_name, false, true, args);
+                    }
                     VALKEY_OK
                 }
                 Err(_) => Err(ValkeyError::Str(utils::ERROR)),
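The review comment above asks whether the intermediary `is_aof_loading` binding is needed. A minimal sketch of that suggested simplification (an interpretation of the comment, not code from this PR): keep the original `validate_size_limit` flag and let it gate replication directly, since the size limit is validated exactly when the command is not being replayed from AOF/replication.

    // Hypothetical simplification per the review comment; not part of this PR.
    let validate_size_limit = !must_obey_client(ctx);
    let bloom = match BloomObject::decode_object(&hex, validate_size_limit) {
        Ok(v) => v,
        Err(err) => return Err(ValkeyError::Str(err.as_str())),
    };
    // Replicate only when not loading from AOF/replication, which is the same
    // condition under which the size limit is validated.
    let replicate_args = validate_size_limit.then(|| ReplicateArgs {
        capacity: bloom.capacity(),
        expansion: bloom.expansion(),
        fp_rate: bloom.fp_rate(),
        tightening_ratio: bloom.tightening_ratio(),
        seed: bloom.seed(),
        items: &input_args[idx..],
    });

Whether the maintainers prefer `bool::then` over an explicit if/else is a style call; the behavior is the same.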
6 changes: 3 additions & 3 deletions src/bloom/utils.rs
@@ -488,15 +488,15 @@ impl BloomObject {
     /// * `capacity` - The size of the initial filter in the bloom object.
     /// * `fp_rate` - the false positive rate for the bloom object
     /// * `validate_scale_to` - the capacity we check to see if it can scale to. If this method is called from BF.INFO this is set as -1 as we
-    /// want to check the maximum size we could scale up till
+    ///   want to check the maximum size we could scale up till
     /// * `tightening_ratio` - The tightening ratio of the object
     /// * `expansion` - The expansion rate of the object
     ///
     /// # Returns
     /// * i64 - The maximum capacity that can be reached if called from BF.INFO. If called from BF.INSERT the size it reached when it became greater than `validate_scale_to`
     /// * ValkeyError - Can return two different errors:
-    /// VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would go over the bloom object memory limit
-    /// VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capacity would cause the false positive rate to reach 0
+    ///   VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would go over the bloom object memory limit
+    ///   VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capacity would cause the false positive rate to reach 0
     pub fn calculate_max_scaled_capacity(
         capacity: i64,
         fp_rate: f64,
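For orientation, a hypothetical call shaped by the doc comment above (the parameter order after `fp_rate`, the exact types, and the Result-style return are assumptions based on the docs, since the full signature is not shown):

    // BF.INFO-style query: -1 asks for the maximum reachable capacity.
    let max_capacity = BloomObject::calculate_max_scaled_capacity(
        1000,  // capacity of the initial filter
        0.001, // fp_rate
        -1,    // validate_scale_to: -1 = report the maximum we can scale to
        0.5,   // tightening_ratio
        2,     // expansion
    )?;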
12 changes: 12 additions & 0 deletions tests/test_bloom_aofrewrite.py
@@ -7,6 +7,10 @@ class TestBloomAofRewrite(ValkeyBloomTestCaseBase):

     def test_basic_aofrewrite_and_restore(self):
         client = self.server.get_new_client()
+        # Enable AOF before adding data
+        client.config_set('appendonly', 'yes')
+        # Wait for any initial AOF rewrite to complete
+        wait_for_equal(lambda: client.info('persistence')['aof_rewrite_in_progress'], 0, timeout=30)
         bf_add_result_1 = client.execute_command('BF.ADD testSave item')
         assert bf_add_result_1 == 1
         bf_exists_result_1 = client.execute_command('BF.EXISTS testSave item')
@@ -25,6 +29,8 @@ def test_basic_aofrewrite_and_restore(self):
         self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
         # Keep the server running for 1 second more to have a larger uptime.
         time.sleep(1)
+        # Add appendonly to server args so it loads AOF on restart
+        self.server.args['appendonly'] = 'yes'
         self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
         assert self.server.is_alive()
         restored_server_digest = client.execute_command('DEBUG', 'DIGEST')
@@ -42,6 +48,10 @@ def test_basic_aofrewrite_and_restore(self):
         client.execute_command('DEL testSave')
 
     def test_aofrewrite_bloomfilter_metrics(self):
+        # Enable AOF before adding data
+        self.client.config_set('appendonly', 'yes')
+        # Wait for any initial AOF rewrite to complete
+        wait_for_equal(lambda: self.client.info('persistence')['aof_rewrite_in_progress'], 0, timeout=30)
         # Create scaled bloom filter and add 7500 items to trigger a scale out.
         self.client.execute_command('BF.RESERVE key1 0.001 7000')
         info_obj = self.client.execute_command('BF.INFO key1')
@@ -56,6 +66,8 @@ def test_aofrewrite_bloomfilter_metrics(self):
         self.client.bgrewriteaof()
         self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE)
         # restart server
+        # Add appendonly to server args so it loads AOF on restart
+        self.server.args['appendonly'] = 'yes'
         self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True)
         assert self.server.is_alive()
         restored_server_digest = self.client.execute_command('DEBUG', 'DIGEST')
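Both tests now follow the same three-step pattern: enable AOF at runtime, wait out the initial rewrite that enabling it triggers, and persist the flag so the restarted server replays the AOF. A condensed sketch of that pattern as a helper (the helper itself is hypothetical; `wait_for_equal`, the client API, and `server.args` are the test-framework pieces used in the diff above):

    def enable_aof_for_restart(client, server, timeout=30):
        # Turning appendonly on at runtime kicks off an initial AOF rewrite.
        client.config_set('appendonly', 'yes')
        # Wait until that rewrite finishes so later BGREWRITEAOF calls and
        # DEBUG DIGEST comparisons are deterministic.
        wait_for_equal(
            lambda: client.info('persistence')['aof_rewrite_in_progress'],
            0, timeout=timeout)
        # Persist the flag in the launch args so a restart loads the AOF
        # instead of relying on the RDB snapshot alone.
        server.args['appendonly'] = 'yes'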