diff --git a/build.sh b/build.sh index 41e4b4d..2cc74ba 100755 --- a/build.sh +++ b/build.sh @@ -1,4 +1,4 @@ -#!/usr/bin/env sh +#!/bin/bash # Script to run format checks valkey-bloom module, build it and generate .so files, run unit and integration tests. diff --git a/requirements.txt b/requirements.txt index e6ccde8..f5c265d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ valkey -pytest==6 +pytest==7.4.3 diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index 518fbed..070dde5 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -817,24 +817,33 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe None => { // if filter not exists, create it. let hex = value.to_vec(); - let validate_size_limit = !must_obey_client(ctx); + let is_aof_loading = must_obey_client(ctx); + let validate_size_limit = !is_aof_loading; let bloom = match BloomObject::decode_object(&hex, validate_size_limit) { Ok(v) => v, Err(err) => { return Err(ValkeyError::Str(err.as_str())); } }; - let replicate_args = ReplicateArgs { - capacity: bloom.capacity(), - expansion: bloom.expansion(), - fp_rate: bloom.fp_rate(), - tightening_ratio: bloom.tightening_ratio(), - seed: bloom.seed(), - items: &input_args[idx..], + // Get replicate args before moving bloom + let replicate_args = if !is_aof_loading { + Some(ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[idx..], + }) + } else { + None }; match filter_key.set_value(&BLOOM_TYPE, bloom) { Ok(_) => { - replicate_and_notify_events(ctx, filter_name, false, true, replicate_args); + // Only replicate if not loading from AOF/replication + if let Some(args) = replicate_args { + replicate_and_notify_events(ctx, filter_name, false, true, args); + } VALKEY_OK } Err(_) => Err(ValkeyError::Str(utils::ERROR)), diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index c0972d8..c4afb4a 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -488,15 +488,15 @@ impl BloomObject { /// * `capacity` - The size of the initial filter in the bloom object. /// * `fp_rate` - the false positive rate for the bloom object /// * `validate_scale_to` - the capacity we check to see if it can scale to. If this method is called from BF.INFO this is set as -1 as we - /// want to check the maximum size we could scale up till + /// want to check the maximum size we could scale up till /// * `tightening_ratio` - The tightening ratio of the object /// * `expansion` - The expanison rate of the object /// /// # Returns /// * i64 - The maximum capacity that can be reached if called from BF.INFO. If called from BF.INSERT the size it reached when it became greater than `validate_scale_to` /// * ValkeyError - Can return two different errors: - /// VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would go over the bloom object memory limit - /// VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capacity would cause the false positive rate to reach 0 + /// VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would go over the bloom object memory limit + /// VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capacity would cause the false positive rate to reach 0 pub fn calculate_max_scaled_capacity( capacity: i64, fp_rate: f64, diff --git a/tests/test_bloom_aofrewrite.py b/tests/test_bloom_aofrewrite.py index 49328ca..de03855 100644 --- a/tests/test_bloom_aofrewrite.py +++ b/tests/test_bloom_aofrewrite.py @@ -7,6 +7,10 @@ class TestBloomAofRewrite(ValkeyBloomTestCaseBase): def test_basic_aofrewrite_and_restore(self): client = self.server.get_new_client() + # Enable AOF before adding data + client.config_set('appendonly', 'yes') + # Wait for any initial AOF rewrite to complete + wait_for_equal(lambda: client.info('persistence')['aof_rewrite_in_progress'], 0, timeout=30) bf_add_result_1 = client.execute_command('BF.ADD testSave item') assert bf_add_result_1 == 1 bf_exists_result_1 = client.execute_command('BF.EXISTS testSave item') @@ -25,6 +29,8 @@ def test_basic_aofrewrite_and_restore(self): self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE) # Keep the server running for 1 second more to have a larger uptime. time.sleep(1) + # Add appendonly to server args so it loads AOF on restart + self.server.args['appendonly'] = 'yes' self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True) assert self.server.is_alive() restored_server_digest = client.execute_command('DEBUG', 'DIGEST') @@ -42,6 +48,10 @@ def test_basic_aofrewrite_and_restore(self): client.execute_command('DEL testSave') def test_aofrewrite_bloomfilter_metrics(self): + # Enable AOF before adding data + self.client.config_set('appendonly', 'yes') + # Wait for any initial AOF rewrite to complete + wait_for_equal(lambda: self.client.info('persistence')['aof_rewrite_in_progress'], 0, timeout=30) # Create scaled bloom filter and add 7500 items to trigger a scale out. self.client.execute_command('BF.RESERVE key1 0.001 7000') info_obj = self.client.execute_command('BF.INFO key1') @@ -56,6 +66,8 @@ def test_aofrewrite_bloomfilter_metrics(self): self.client.bgrewriteaof() self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE) # restart server + # Add appendonly to server args so it loads AOF on restart + self.server.args['appendonly'] = 'yes' self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True) assert self.server.is_alive() restored_server_digest = self.client.execute_command('DEBUG', 'DIGEST')