A question about and_upsert_with in future features  #542

@dynamic-pigeon

According to the documentation, the and_upsert_with method should guarantee:

This method guarantees that concurrent calls on the same key are executed serially. That is, and_upsert_with calls on the same key never run concurrently. The calls are serialized by the order of their invocation. It uses a key-level lock to achieve this.
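To make the quoted guarantee concrete, here is a minimal std-only sketch of what key-level locking means for a read-modify-write. `KeyLockedStore`, `upsert_with`, and `run_increments` are hypothetical names made up for this illustration. This is not moka's implementation, and `std::sync::Mutex` only gives mutual exclusion, not the invocation-order guarantee the documentation describes.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical toy store: one mutex per key guards that key's read-modify-write.
/// Illustration only; this is NOT how moka is implemented.
struct KeyLockedStore {
    values: Mutex<HashMap<String, i32>>,
    key_locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

impl KeyLockedStore {
    fn new() -> Self {
        Self {
            values: Mutex::new(HashMap::new()),
            key_locks: Mutex::new(HashMap::new()),
        }
    }

    /// Calls `f` with the current value (if any) and stores the result,
    /// holding the key's lock for the whole read-modify-write.
    fn upsert_with(&self, key: &str, f: impl FnOnce(Option<i32>) -> i32) -> i32 {
        // Fetch or create the lock that belongs to this key.
        let key_lock = {
            let mut locks = self.key_locks.lock().unwrap();
            locks.entry(key.to_string()).or_default().clone()
        };
        // Only one caller per key gets past this line at a time.
        let _guard = key_lock.lock().unwrap();

        let current = self.values.lock().unwrap().get(key).copied();
        let next = f(current);
        self.values.lock().unwrap().insert(key.to_string(), next);
        next
    }

    fn get(&self, key: &str) -> Option<i32> {
        self.values.lock().unwrap().get(key).copied()
    }
}

/// Spawns `n` threads that each increment the same key once; returns the final value.
fn run_increments(n: usize) -> i32 {
    let store = Arc::new(KeyLockedStore::new());
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let store = Arc::clone(&store);
            thread::spawn(move || {
                store.upsert_with("hello", |v| v.unwrap_or(0) + 1);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    store.get("hello").unwrap_or(0)
}

fn main() {
    // Prints "Final value: 10": no increment can be lost under the key lock.
    println!("Final value: {}", run_increments(10));
}
```

Because the key lock is held across the read, the closure, and the write, every increment sees the result of the previous one. That is the behavior I understand the documentation to promise for and_upsert_with.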

However, when I test concurrent operations on the same key, the calls do not appear to be serialized as documented.

Expected behavior:

The final value should be 10 since we have 10 concurrent tasks incrementing the counter from 0.

Actual behavior:

The final value varies between runs:

  • Sometimes it's 9
  • Sometimes it's 8
  • Sometimes it's 10
  • Other values between 1 and 10

This inconsistency suggests that and_upsert_with is not properly serializing concurrent calls on the same key.
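As an illustration of why overlapping calls would produce values below 10, the following sketch replays a classic lost update deterministically. The `Step` enum and `replay` function are hypothetical helpers invented for this example. They model only the interleaving and say nothing about where moka's actual problem lies.

```rust
/// One step in a hypothetical schedule. Each task reads the shared counter
/// once and later writes back the value it read plus one.
#[derive(Clone, Copy)]
enum Step {
    Read(usize),  // the task with this id reads the shared value
    Write(usize), // the task with this id writes back (its read value + 1)
}

/// Replays `schedule` against a counter starting at 0 and returns the final value.
fn replay(task_count: usize, schedule: &[Step]) -> i32 {
    let mut shared = 0;
    // The value each task saw at its most recent read.
    let mut seen = vec![0; task_count];
    for step in schedule {
        match *step {
            Step::Read(id) => seen[id] = shared,
            Step::Write(id) => shared = seen[id] + 1,
        }
    }
    shared
}

fn main() {
    // Serialized: each task finishes its read-modify-write before the next starts.
    let serial = [Step::Read(0), Step::Write(0), Step::Read(1), Step::Write(1)];
    // Overlapped: both tasks read 0 before either writes, so one increment is lost.
    let overlapped = [Step::Read(0), Step::Read(1), Step::Write(0), Step::Write(1)];

    println!("serialized: {}", replay(2, &serial)); // prints "serialized: 2"
    println!("overlapped: {}", replay(2, &overlapped)); // prints "overlapped: 1"
}
```

With 10 tasks, any pair of overlapping read-modify-write steps drops one increment, which would be consistent with final values such as 8 or 9.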

Reproduction code:

// [dependencies]
// tokio = { version = "1", features = ["full"] }
// moka = { version = "0.12.11", features = ["future"] }
// futures-util = "0.3.17"
use moka::future::Cache;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let cache: Arc<Cache<String, i32>> = Arc::new(Cache::new(100));
    let barrier = Arc::new(tokio::sync::Barrier::new(10));

    let vec = (0..10)
        .map(|_i| {
            let cache = cache.clone();
            let barrier = barrier.clone();
            tokio::spawn(async move {
                // Hold every task here until all 10 are ready, so the upserts start together.
                barrier.wait().await;
                let _res = cache
                    .entry("hello".to_string())
                    .and_upsert_with(|maybe_entry| {
                        let cnt = if let Some(entry) = maybe_entry {
                            entry.into_value()
                        } else {
                            0
                        };
                        std::future::ready(cnt + 1)
                    })
                    .await;
            })
        })
        .collect::<Vec<_>>();

    futures_util::future::join_all(vec).await;

    println!(
        "Final value: {}",
        cache.get(&"hello".to_string()).await.unwrap()
    );

    Ok(())
}

Environment:

  • moka version: 0.12.11
  • tokio version: 1.47.1
  • Rust version: 1.90 (2024 edition)
