Timers fail to fire under heavy I/O load #250

@notgull

I was creating a simple load test to ensure that no events are missed under #249. However, it appears that stock async-io without any changes fails this test as well.

The test creates roughly 10 thousand timers (with deadlines from 1 ms up to about 10 s), then spawns 20 or so tasks that each drive a pair of I/O sources (I've been able to reproduce this with as few as 1), then polls the timers and the I/O tasks concurrently on a single task.

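// Imports this snippet relies on. Assumption: `spawn` is `async_task::spawn`,
// and the snippet runs inside an async context (e.g. `async_io::block_on`).
use std::net::{TcpListener, TcpStream};
use std::time::Duration;

use async_io::{Async, Timer};
use async_task::spawn;
use futures_lite::prelude::*; // `or`, `read_exact`, `write_all`

// We should be able to handle a lot of timers and sources.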
let mut timers = Vec::new();
for i in 1..10_000 {
    timers.push(Timer::after(Duration::from_millis(i)));
}

let (spawner, executor) = async_channel::unbounded();
let mut tasks = Vec::new();
// TODO(notgull): Issues occur when this number is increased. It shouldn't.
// Find out why!
for _ in 0..20 {
    let (runnable, task) = spawn(
        async move {
            let mut rng = fastrand::Rng::new();

            // Create a TCP pipe and send bytes to and from.
            let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 0))?;
            let stream1 =
                Async::<TcpStream>::connect(listener.get_ref().local_addr()?).await?;
            let stream2 = listener.accept().await?.0;

            let mut bytes = [0u8; 64];
            let mut read_buffer = [0u8; 64];

            loop {
                rng.fill(&mut bytes);

                Timer::after(Duration::from_micros(rng.u64(..1_000))).await;
                (&stream1).write_all(&bytes).await?;
                Timer::after(Duration::from_micros(rng.u64(..1_000))).await;
                (&stream2).read_exact(&mut read_buffer).await?;

                assert_eq!(bytes, read_buffer);
                futures_lite::future::yield_now().await;
            }

            #[allow(unreachable_code)]
            std::io::Result::Ok(())
        },
        {
            let spawner = spawner.clone();
            move |task| {
                spawner.try_send(task).ok();
            }
        },
    );
    runnable.schedule();
    tasks.push(task);
}

// Future to process timers.
let process_timers = async move {
    for timer in timers {
        timer.await;
    }
};

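// Future to process sources: run each scheduled runnable as it arrives,
// yielding in between so the timer future also gets polled.
let process_sources = async move {
    while let Ok(runnable) = executor.recv().await {
        runnable.run();
        futures_lite::future::yield_now().await;
    }
};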

process_timers.or(process_sources).await;

My guess is that there is a race condition between the timer operation being pushed into the timer op queue and the Poller being notified.
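
To make that guess concrete, here is a toy, std-only model of one way a race between "push the op" and "notify the poller" could lose a wake-up. It is only a sketch of the general failure class, not async-io's code: `ToyReactor`, `Sleep`, and every method name below are hypothetical, and the interleavings are written out by hand rather than produced by real threads. Whether the actual bug looks like this is unconfirmed.

```rust
use std::collections::VecDeque;

/// Toy model of a reactor's timer bookkeeping. All names are hypothetical
/// and do not mirror async-io's internals.
#[derive(Default)]
struct ToyReactor {
    /// Deadlines (in ms) pushed by timers but not yet seen by the reactor.
    op_queue: VecDeque<u64>,
    /// Earliest deadline the reactor has actually registered.
    next_deadline: Option<u64>,
    /// Set when someone asks the reactor to wake up.
    notified: bool,
}

/// What the reactor does once it blocks.
#[derive(Debug, PartialEq)]
enum Sleep {
    WakesImmediately,
    Until(u64),
    Forever,
}

impl ToyReactor {
    /// Timer side, step 1: queue the new deadline.
    fn push_op(&mut self, deadline_ms: u64) {
        self.op_queue.push_back(deadline_ms);
    }

    /// Timer side, step 2: ask the reactor to wake up.
    fn notify(&mut self) {
        self.notified = true;
    }

    /// Reactor side: fold queued deadlines into `next_deadline`.
    fn drain_ops(&mut self) {
        while let Some(d) = self.op_queue.pop_front() {
            self.next_deadline = Some(self.next_deadline.map_or(d, |cur| cur.min(d)));
        }
    }

    /// Reactor side: forget any pending wake-up request.
    fn clear_notification(&mut self) {
        self.notified = false;
    }

    /// Reactor side: block, based only on what it has seen so far.
    fn wait(&self) -> Sleep {
        if self.notified {
            Sleep::WakesImmediately
        } else {
            match self.next_deadline {
                Some(d) => Sleep::Until(d),
                None => Sleep::Forever,
            }
        }
    }
}

/// The reactor clears the notification *after* draining, so a timer
/// inserted in between is never noticed.
fn lost_wakeup() -> Sleep {
    let mut r = ToyReactor::default();
    r.drain_ops(); // reactor: queue is empty
    r.push_op(5); // timer: queue the deadline
    r.notify(); // timer: request a wake-up
    r.clear_notification(); // reactor: wipes a request it never acted on
    r.wait()
}

/// Same steps, but the reactor clears *before* draining.
fn safe_ordering() -> Sleep {
    let mut r = ToyReactor::default();
    r.clear_notification();
    r.drain_ops();
    r.push_op(5);
    r.notify();
    r.wait()
}
```

In this model, `lost_wakeup()` returns `Sleep::Forever` even though a 5 ms deadline is sitting in the queue, while `safe_ordering()` returns `Sleep::WakesImmediately`. A timer that "fails to fire" under load would be consistent with the first interleaving, but that remains a hypothesis.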
