Skip to content

Commit 873be33

Browse files
authored
Merge pull request #14 from asder8215/notify_pin_code
Notify pin code
2 parents ecc4620 + 41fca91 commit 873be33

File tree

5 files changed

+231
-228
lines changed

5 files changed

+231
-228
lines changed

benches/kanal_async.rs

Lines changed: 62 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -94,19 +94,20 @@ async fn kanal_async(c: usize, task_count: usize) {
9494
handles.push(task::spawn(async move {
9595
for _ in 0..task_count {
9696
for _ in 0..1 {
97-
let x: Message = rx.recv().await.unwrap();
98-
test_func(x.item_one as u128);
97+
let x = rx.recv().await.unwrap();
98+
// test_func(x as u128);
9999
}
100100
}
101101
}));
102102
}
103103

104104
for _ in 0..task_count {
105105
let tx = s.clone();
106-
let msg = Message::default();
106+
// let msg = Message::default();
107107
handles.push(task::spawn(async move {
108108
for i in 0..1 {
109-
tx.send(msg).await.unwrap();
109+
// tx.send(msg).await.unwrap();
110+
tx.send(i).await.unwrap();
110111
}
111112
}));
112113
}
@@ -125,10 +126,11 @@ async fn kanal_async_with_msg_vec(
125126
let (s, r) = bounded_async(c);
126127
let mut handles = Vec::new();
127128

128-
for _ in 0..1 {
129+
// for _ in 0..1 {
130+
for _ in 0..8 {
129131
let rx = r.clone();
130132
handles.push(task::spawn(async move {
131-
for _ in 0..task_count {
133+
for _ in 0..task_count/8 {
132134
for _ in 0..msg_count {
133135
let x = rx.recv().await.unwrap();
134136
// test_func(x.item_one as u128);
@@ -156,20 +158,48 @@ async fn kanal_async_with_msg_vec(
156158
fn benchmark_kanal_async(c: &mut Criterion) {
157159
const MAX_THREADS: [usize; 1] = [8];
158160
const CAPACITY: usize = 128;
159-
const TASKS: [usize; 1] = [100000];
160-
const MSG_COUNT: usize = 1;
161-
let msg = BigData {
162-
buf: Box::new([0; 8]),
163-
};
164-
let mut msg_vecs = Vec::with_capacity(TASKS[0]);
165-
for _ in 0..TASKS[0] {
166-
msg_vecs.push(Vec::with_capacity(MSG_COUNT));
167-
let msg_vecs_len = msg_vecs.len();
168-
for _ in 0..MSG_COUNT {
169-
msg_vecs[msg_vecs_len - 1].push(msg.clone());
161+
const TASKS: [usize; 1] = [100];
162+
163+
for thread_num in MAX_THREADS {
164+
let runtime = tokio::runtime::Builder::new_multi_thread()
165+
.enable_all()
166+
.worker_threads(thread_num)
167+
.build()
168+
.unwrap();
169+
170+
for task_count in TASKS {
171+
let func_name = format!(
172+
"Kanal Async: {} threads, {} enq tasks enqueuing 1 million items, 1 looping deq task",
173+
thread_num, task_count
174+
);
175+
176+
c.bench_with_input(
177+
BenchmarkId::new(func_name, CAPACITY),
178+
&(CAPACITY),
179+
|b, &cap| {
180+
// Insert a call to `to_async` to convert the bencher to async mode.
181+
// The timing loops are the same as with the normal bencher.
182+
b.to_async(&runtime).iter(async || {
183+
kanal_async(cap, task_count).await;
184+
});
185+
},
186+
);
170187
}
171188
}
172189

190+
// const MSG_COUNT: usize = 1;
191+
// let msg = BigData {
192+
// buf: Box::new([0; 8]),
193+
// };
194+
// let mut msg_vecs = Vec::with_capacity(TASKS[0]);
195+
// for _ in 0..TASKS[0] {
196+
// msg_vecs.push(Vec::with_capacity(MSG_COUNT));
197+
// let msg_vecs_len = msg_vecs.len();
198+
// for _ in 0..MSG_COUNT {
199+
// msg_vecs[msg_vecs_len - 1].push(msg.clone());
200+
// }
201+
// }
202+
173203
// for thread_num in MAX_THREADS {
174204
// let runtime = tokio::runtime::Builder::new_multi_thread()
175205
// .enable_all()
@@ -179,8 +209,7 @@ fn benchmark_kanal_async(c: &mut Criterion) {
179209

180210
// for task_count in TASKS {
181211
// let func_name = format!(
182-
// "Kanal Async: {} threads, {} enq tasks enqueuing 1 million items, 1 looping deq task",
183-
// thread_num, task_count
212+
// "Kanal Async: {thread_num} threads, {task_count} enq tasks enqueuing 1 million items, 1 looping deq task"
184213
// );
185214

186215
// c.bench_with_input(
@@ -189,51 +218,25 @@ fn benchmark_kanal_async(c: &mut Criterion) {
189218
// |b, &cap| {
190219
// // Insert a call to `to_async` to convert the bencher to async mode.
191220
// // The timing loops are the same as with the normal bencher.
192-
// b.to_async(&runtime).iter(async || {
193-
// kanal_async(cap, task_count).await;
221+
// b.to_async(&runtime).iter_custom(|iters| {
222+
// let msg_vecs = msg_vecs.clone();
223+
// async move {
224+
// let mut total = Duration::ZERO;
225+
// for _ in 0..iters {
226+
// let msg_vecs = msg_vecs.clone();
227+
// let start = Instant::now();
228+
// kanal_async_with_msg_vec(msg_vecs, cap, task_count, MSG_COUNT)
229+
// .await;
230+
// let end = Instant::now();
231+
// total += end - start;
232+
// }
233+
// total
234+
// }
194235
// });
195236
// },
196237
// );
197238
// }
198239
// }
199-
200-
for thread_num in MAX_THREADS {
201-
let runtime = tokio::runtime::Builder::new_multi_thread()
202-
.enable_all()
203-
.worker_threads(thread_num)
204-
.build()
205-
.unwrap();
206-
207-
for task_count in TASKS {
208-
let func_name = format!(
209-
"Kanal Async: {thread_num} threads, {task_count} enq tasks enqueuing 1 million items, 1 looping deq task"
210-
);
211-
212-
c.bench_with_input(
213-
BenchmarkId::new(func_name, CAPACITY),
214-
&(CAPACITY),
215-
|b, &cap| {
216-
// Insert a call to `to_async` to convert the bencher to async mode.
217-
// The timing loops are the same as with the normal bencher.
218-
b.to_async(&runtime).iter_custom(|iters| {
219-
let msg_vecs = msg_vecs.clone();
220-
async move {
221-
let mut total = Duration::ZERO;
222-
for _ in 0..iters {
223-
let msg_vecs = msg_vecs.clone();
224-
let start = Instant::now();
225-
kanal_async_with_msg_vec(msg_vecs, cap, task_count, MSG_COUNT)
226-
.await;
227-
let end = Instant::now();
228-
total += end - start;
229-
}
230-
total
231-
}
232-
});
233-
},
234-
);
235-
}
236-
}
237240
}
238241

239242
criterion_group!(benches, benchmark_kanal_async);

benches/srb_pin.rs

Lines changed: 49 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -173,19 +173,19 @@ async fn mlfsrb_pin(capacity: usize, shards: usize, task_count: usize) {
173173
let mut deq_tasks = Vec::with_capacity(shards);
174174
let mut enq_tasks = Vec::with_capacity(task_count);
175175

176-
let notifier_task = spawn({
177-
let rb_clone = rb.clone();
178-
async move {
179-
// let rb = rb.clone();
180-
loop {
181-
for i in 0..shards {
182-
// sleep(Duration::from_nanos(50));
183-
rb_clone.notify_pin_shard(i % rb_clone.get_num_of_shards());
184-
}
185-
yield_now().await;
186-
}
187-
}
188-
});
176+
// let notifier_task = spawn({
177+
// let rb_clone = rb.clone();
178+
// async move {
179+
// // let rb = rb.clone();
180+
// loop {
181+
// for i in 0..shards {
182+
// // sleep(Duration::from_nanos(50));
183+
// rb_clone.notify_pin_shard(i % rb_clone.get_num_of_shards());
184+
// }
185+
// yield_now().await;
186+
// }
187+
// }
188+
// });
189189
// spawn enq tasks with pin policy
190190
// for i in 0..task_count {
191191
// let handle = spawn_enqueuer_with_iterator(
@@ -195,15 +195,44 @@ async fn mlfsrb_pin(capacity: usize, shards: usize, task_count: usize) {
195195
// );
196196
// enq_tasks.push(handle);
197197
// }
198+
// for i in 0..task_count {
199+
// let handle = mlf_spawn_enqueuer_with_iterator(rb.clone(), i, 0..1);
200+
// enq_tasks.push(handle);
201+
// }
202+
203+
// for i in 0..shards {
204+
// let handle = mlf_spawn_dequeuer_unbounded(rb.clone(), i, |x| {
205+
// // test_func(x as u128);
206+
// // println!("{:?}", x);
207+
// });
208+
// deq_tasks.push(handle);
209+
// }
210+
198211
for i in 0..task_count {
199-
let handle = mlf_spawn_enqueuer_with_iterator(rb.clone(), i, 0..100);
212+
let handle = tokio::spawn({
213+
let rb_clone = rb.clone();
214+
async move {
215+
rb_clone.enqueue(i).await;
216+
}
217+
});
200218
enq_tasks.push(handle);
201219
}
202220

203221
for i in 0..shards {
204-
let handle = mlf_spawn_dequeuer_unbounded(rb.clone(), i, |x| {
205-
test_func(x as u128);
206-
// println!("{:?}", x);
222+
let handle = tokio::spawn({
223+
let rb_clone = rb.clone();
224+
async move {
225+
loop {
226+
// let item = rb_clone.dequeue().await;
227+
let item = rb_clone.dequeue_in_shard(i).await;
228+
match item {
229+
None => break,
230+
Some(val) => {
231+
232+
}
233+
}
234+
}
235+
}
207236
});
208237
deq_tasks.push(handle);
209238
}
@@ -227,7 +256,7 @@ async fn mlfsrb_pin(capacity: usize, shards: usize, task_count: usize) {
227256
deq.await.unwrap();
228257
}
229258

230-
notifier_task.abort();
259+
// notifier_task.abort();
231260
}
232261

233262
async fn lfsrb_pin_deq_full(capacity: usize, shards: usize, task_count: usize) {
@@ -483,39 +512,7 @@ async fn mlfsrb_pin_with_msg_vec(
483512
let mut deq_tasks = Vec::with_capacity(shards);
484513
let mut enq_tasks = Vec::with_capacity(task_count);
485514

486-
// let notifier_task = spawn({
487-
// let rb_clone = rb.clone();
488-
// async move {
489-
// // let rb = rb.clone();
490-
// loop {
491-
// for i in 0..shards {
492-
// // println!("doing stuff at shard {}", i);
493-
// // sleep(Duration::from_nanos(50));
494-
// rb_clone.notify_pin_shard(i % rb_clone.get_num_of_shards());
495-
// }
496-
// yield_now().await;
497-
// }
498-
// }
499-
// });
500-
// let notifier_thread = thread::spawn(move || {
501-
// // let message = "Hello from a standard thread!";
502-
// // println!("{}", message);
503-
// // // Can't use await here, use a regular send
504-
// // tx.blocking_send(message).expect("Failed to send message");
505-
// // let rb_clone = rb.clone();
506-
// loop {
507-
// if rb_clone.assigner_terminate.load(std::sync::atomic::Ordering::Relaxed) {
508-
// break;
509-
// }
510-
// for i in 0..shards {
511-
// rb_clone.notify_pin_shard(i % rb_clone.get_num_of_shards());
512-
// }
513-
// sleep(Duration::from_nanos(500));
514-
// }
515-
// });
516-
517515
// spawn enq tasks with pin policy
518-
// for i in 0..task_count {
519516
let mut counter = 0;
520517

521518
for msg_vec in msg_vecs {
@@ -548,14 +545,6 @@ async fn mlfsrb_pin_with_msg_vec(
548545
deq_tasks.push(handle);
549546
}
550547

551-
// for i in 0..16 {
552-
// let handle =
553-
// spawn_dequeuer(rb.clone(), ShardPolicy::Pin { initial_index: i }, |x| {
554-
// test_add(*x);
555-
// });
556-
// deq_tasks.push(handle);
557-
// }
558-
559548
// Wait for enqueuers
560549
for enq in enq_tasks {
561550
// println!("here");
@@ -575,20 +564,17 @@ async fn mlfsrb_pin_with_msg_vec(
575564
for deq in deq_tasks {
576565
deq.await.unwrap();
577566
}
578-
// notifier_task.abort();
579-
// terminate_assigner(rb.clone());
580-
// notifier_thread.join();
581567
}
582568

583569
fn benchmark_pin(c: &mut Criterion) {
584570
// const MAX_THREADS: [usize; 2] = [4, 8];
585571
const MAX_THREADS: [usize; 1] = [8];
586-
const CAPACITY: usize = 1;
572+
const CAPACITY: usize = 128;
587573
// const CAPACITY: usize = 200000;
588574
// const SHARDS: [usize; 5] = [1, 2, 4, 8, 16];
589575
// const TASKS: [usize; 5] = [1, 2, 4, 8, 16];
590576
const SHARDS: [usize; 1] = [1];
591-
const TASKS: [usize; 1] = [1000];
577+
const TASKS: [usize; 1] = [100];
592578

593579
// const MSG_COUNT: usize = 250000;
594580
// // let msg = BigData { buf: Box::new([0; 1 * 1024]) };

0 commit comments

Comments (0)