From 56a5663ebf2b6ce913d9cd4117c5531b1ccd13e8 Mon Sep 17 00:00:00 2001 From: Arpit Bhayani Date: Mon, 13 Oct 2025 20:11:19 +0000 Subject: [PATCH] Fix AOF rewrite and restore for BF tests The AOF rewrite tests were failing because bloom filter data was not being properly restored after server restart. The root cause was twofold: 1. BF.LOAD command was replicating during AOF loading, causing duplicate entries 2. Tests were not properly configuring AOF persistence for server restart Changes: - Modified BF.LOAD handler to skip replication when loading from AOF by checking must_obey_client() context - Updated AOF tests to enable appendonly before adding data - Added wait for initial AOF rewrite to prevent concurrent rewrites - Set appendonly in server startup args for proper AOF loading on restart Also includes minor fixes: - Changed build.sh shebang to bash for better compatibility - Updated pytest requirement to 7.4.3 - Fixed indentation in utils.rs documentation Fixes #74 --- build.sh | 2 +- requirements.txt | 2 +- src/bloom/command_handler.rs | 27 ++++++++++++++++++--------- src/bloom/utils.rs | 6 +++--- tests/test_bloom_aofrewrite.py | 12 ++++++++++++ 5 files changed, 35 insertions(+), 14 deletions(-) diff --git a/build.sh b/build.sh index 41e4b4d..2cc74ba 100755 --- a/build.sh +++ b/build.sh @@ -1,4 +1,4 @@ -#!/usr/bin/env sh +#!/bin/bash # Script to run format checks valkey-bloom module, build it and generate .so files, run unit and integration tests. diff --git a/requirements.txt b/requirements.txt index e6ccde8..f5c265d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ valkey -pytest==6 +pytest==7.4.3 diff --git a/src/bloom/command_handler.rs b/src/bloom/command_handler.rs index 518fbed..070dde5 100644 --- a/src/bloom/command_handler.rs +++ b/src/bloom/command_handler.rs @@ -817,24 +817,33 @@ pub fn bloom_filter_load(ctx: &Context, input_args: &[ValkeyString]) -> ValkeyRe None => { // if filter not exists, create it. let hex = value.to_vec(); - let validate_size_limit = !must_obey_client(ctx); + let is_aof_loading = must_obey_client(ctx); + let validate_size_limit = !is_aof_loading; let bloom = match BloomObject::decode_object(&hex, validate_size_limit) { Ok(v) => v, Err(err) => { return Err(ValkeyError::Str(err.as_str())); } }; - let replicate_args = ReplicateArgs { - capacity: bloom.capacity(), - expansion: bloom.expansion(), - fp_rate: bloom.fp_rate(), - tightening_ratio: bloom.tightening_ratio(), - seed: bloom.seed(), - items: &input_args[idx..], + // Get replicate args before moving bloom + let replicate_args = if !is_aof_loading { + Some(ReplicateArgs { + capacity: bloom.capacity(), + expansion: bloom.expansion(), + fp_rate: bloom.fp_rate(), + tightening_ratio: bloom.tightening_ratio(), + seed: bloom.seed(), + items: &input_args[idx..], + }) + } else { + None }; match filter_key.set_value(&BLOOM_TYPE, bloom) { Ok(_) => { - replicate_and_notify_events(ctx, filter_name, false, true, replicate_args); + // Only replicate if not loading from AOF/replication + if let Some(args) = replicate_args { + replicate_and_notify_events(ctx, filter_name, false, true, args); + } VALKEY_OK } Err(_) => Err(ValkeyError::Str(utils::ERROR)), diff --git a/src/bloom/utils.rs b/src/bloom/utils.rs index c0972d8..c4afb4a 100644 --- a/src/bloom/utils.rs +++ b/src/bloom/utils.rs @@ -488,15 +488,15 @@ impl BloomObject { /// * `capacity` - The size of the initial filter in the bloom object. /// * `fp_rate` - the false positive rate for the bloom object /// * `validate_scale_to` - the capacity we check to see if it can scale to. If this method is called from BF.INFO this is set as -1 as we - /// want to check the maximum size we could scale up till + /// want to check the maximum size we could scale up till /// * `tightening_ratio` - The tightening ratio of the object /// * `expansion` - The expanison rate of the object /// /// # Returns /// * i64 - The maximum capacity that can be reached if called from BF.INFO. If called from BF.INSERT the size it reached when it became greater than `validate_scale_to` /// * ValkeyError - Can return two different errors: - /// VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would go over the bloom object memory limit - /// VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capacity would cause the false positive rate to reach 0 + /// VALIDATE_SCALE_TO_EXCEEDS_MAX_SIZE: When scaling to the wanted capacity would go over the bloom object memory limit + /// VALIDATE_SCALE_TO_FALSE_POSITIVE_INVALID: When scaling to the wanted capacity would cause the false positive rate to reach 0 pub fn calculate_max_scaled_capacity( capacity: i64, fp_rate: f64, diff --git a/tests/test_bloom_aofrewrite.py b/tests/test_bloom_aofrewrite.py index 49328ca..de03855 100644 --- a/tests/test_bloom_aofrewrite.py +++ b/tests/test_bloom_aofrewrite.py @@ -7,6 +7,10 @@ class TestBloomAofRewrite(ValkeyBloomTestCaseBase): def test_basic_aofrewrite_and_restore(self): client = self.server.get_new_client() + # Enable AOF before adding data + client.config_set('appendonly', 'yes') + # Wait for any initial AOF rewrite to complete + wait_for_equal(lambda: client.info('persistence')['aof_rewrite_in_progress'], 0, timeout=30) bf_add_result_1 = client.execute_command('BF.ADD testSave item') assert bf_add_result_1 == 1 bf_exists_result_1 = client.execute_command('BF.EXISTS testSave item') @@ -25,6 +29,8 @@ def test_basic_aofrewrite_and_restore(self): self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE) # Keep the server running for 1 second more to have a larger uptime. time.sleep(1) + # Add appendonly to server args so it loads AOF on restart + self.server.args['appendonly'] = 'yes' self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True) assert self.server.is_alive() restored_server_digest = client.execute_command('DEBUG', 'DIGEST') @@ -42,6 +48,10 @@ def test_basic_aofrewrite_and_restore(self): client.execute_command('DEL testSave') def test_aofrewrite_bloomfilter_metrics(self): + # Enable AOF before adding data + self.client.config_set('appendonly', 'yes') + # Wait for any initial AOF rewrite to complete + wait_for_equal(lambda: self.client.info('persistence')['aof_rewrite_in_progress'], 0, timeout=30) # Create scaled bloom filter and add 7500 items to trigger a scale out. self.client.execute_command('BF.RESERVE key1 0.001 7000') info_obj = self.client.execute_command('BF.INFO key1') @@ -56,6 +66,8 @@ def test_aofrewrite_bloomfilter_metrics(self): self.client.bgrewriteaof() self.server.wait_for_action_done(ValkeyAction.AOF_REWRITE) # restart server + # Add appendonly to server args so it loads AOF on restart + self.server.args['appendonly'] = 'yes' self.server.restart(remove_rdb=False, remove_nodes_conf=False, connect_client=True) assert self.server.is_alive() restored_server_digest = self.client.execute_command('DEBUG', 'DIGEST')