@@ -1,3 +1,4 @@
+use std::mem;
 use std::thread::available_parallelism;
 
 use async_executor::Executor;
@@ -139,6 +140,122 @@ fn running_benches(c: &mut Criterion) {
                 *multithread,
             );
         });
+
+        group.bench_function("executor::channels", |b| {
+            run(
+                || {
+                    b.iter(move || {
+                        future::block_on(async {
+                            // Create channels.
+                            let mut tasks = Vec::new();
+                            let (first_send, first_recv) = async_channel::bounded(1);
+                            let mut current_recv = first_recv;
+
+                            for _ in 0..TASKS {
+                                let (next_send, next_recv) = async_channel::bounded(1);
+                                let current_recv = mem::replace(&mut current_recv, next_recv);
+
+                                tasks.push(EX.spawn(async move {
+                                    // Send a notification on to the next task.
+                                    for _ in 0..STEPS {
+                                        current_recv.recv().await.unwrap();
+                                        next_send.send(()).await.unwrap();
+                                    }
+                                }));
+                            }
+
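+                            // Drive the chain: feed a message into the first channel and wait
+                            // for one to arrive at the far end of the chain.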
+                            for _ in 0..STEPS {
+                                first_send.send(()).await.unwrap();
+                                current_recv.recv().await.unwrap();
+                            }
+
+                            for task in tasks {
+                                task.await;
+                            }
+                        });
+                    });
+                },
+                *multithread,
+            )
+        });
+
+        group.bench_function("executor::web_server", |b| {
+            run(
+                || {
+                    b.iter(move || {
+                        future::block_on(async {
+                            let (db_send, db_recv) =
+                                async_channel::bounded::<async_channel::Sender<_>>(TASKS / 5);
+                            let mut db_rng = fastrand::Rng::with_seed(0x12345678);
+                            let mut web_rng = db_rng.fork();
+
+                            // This task simulates a database.
+                            let db_task = EX.spawn(async move {
+                                loop {
+                                    // Wait for a new task.
+                                    let incoming = match db_recv.recv().await {
+                                        Ok(incoming) => incoming,
+                                        Err(_) => break,
+                                    };
+
+                                    // Process the task. Maybe it takes a while.
+                                    for _ in 0..db_rng.usize(..10) {
+                                        future::yield_now().await;
+                                    }
+
+                                    // Send the data back.
+                                    incoming.send(db_rng.usize(..)).await.ok();
+                                }
+                            });
+
+                            // This task simulates a web server waiting for new tasks.
+                            let server_task = EX.spawn(async move {
+                                for i in 0..TASKS {
+                                    // Get a new connection.
+                                    if web_rng.usize(..=16) == 16 {
+                                        future::yield_now().await;
+                                    }
+
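+                                    // Hand this connection off to its own task.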
+                                    let mut web_rng = web_rng.fork();
+                                    let db_send = db_send.clone();
+                                    let task = EX.spawn(async move {
+                                        // Check if the data is cached...
+                                        if web_rng.bool() {
+                                            // ...it's in cache!
+                                            future::yield_now().await;
+                                            return;
+                                        }
+
+                                        // Otherwise we have to make a DB call or two.
+                                        for _ in 0..web_rng.usize(STEPS / 2..STEPS) {
+                                            let (resp_send, resp_recv) = async_channel::bounded(1);
+                                            db_send.send(resp_send).await.unwrap();
+                                            criterion::black_box(resp_recv.recv().await.unwrap());
+                                        }
+
+                                        // Send the data back...
+                                        for _ in 0..web_rng.usize(3..16) {
+                                            future::yield_now().await;
+                                        }
+                                    });
+
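+                                    // Run the connection task in the background rather than awaiting it here.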
+                                    task.detach();
+
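+                                    // Yield every so often so other tasks can make progress.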
+                                    if i & 16 == 0 {
+                                        future::yield_now().await;
+                                    }
+                                }
+                            });
+
+                            // Spawn and wait for it to stop.
+                            server_task.await;
+                            db_task.await;
+                        });
+                    })
+                },
+                *multithread,
+            )
+        });
     }
 }
 
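
The new benches lean on a few items defined earlier in the file that this hunk does not show: the static `EX` executor, the `TASKS` and `STEPS` constants, and the `run` helper that drives the executor while a benchmark closure runs. The sketch below shows what such definitions could look like; the constant values, the body of `run`, and the use of `futures-lite`, `async-channel`, and `easy-parallel` are illustrative assumptions, not the file's actual contents.

// Sketch only: the names come from the diff above, but the values and the
// body of `run` are assumptions for illustration, not the real file.
use std::thread::available_parallelism;

use async_executor::Executor;
use futures_lite::future;

// Assumed workload size shared by the benches.
const TASKS: usize = 300;
const STEPS: usize = 300;

// The executor that every bench spawns onto.
static EX: Executor<'static> = Executor::new();

// Runs `f` while `EX` is driven on one thread (or one thread per core when
// `multithread` is true), then shuts the runner threads down.
fn run(f: impl FnOnce(), multithread: bool) {
    let threads = if multithread {
        available_parallelism().unwrap().get()
    } else {
        1
    };

    // Dropping the sender makes `recv()` resolve with an error, ending `EX.run`.
    let (shutdown_tx, shutdown_rx) = async_channel::unbounded::<()>();

    easy_parallel::Parallel::new()
        .each(0..threads, |_| {
            let _ = future::block_on(EX.run(shutdown_rx.recv()));
        })
        .finish(move || {
            let _guard = shutdown_tx;
            f()
        });
}

fn main() {
    // Tiny usage example: spawn one task and await it while the executor runs.
    run(
        || {
            future::block_on(async {
                assert_eq!(EX.spawn(async { 1 + 1 }).await, 2);
            });
        },
        true,
    );
}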