Skip to content

Commit ecc4620

Browse files
authored
Merge pull request #13 from asder8215/notify_pin_code
Sharded Ring Buffer Pin Policy Updated + Lock-free Queue w/ Tokio Notify Multi Ring Buffer Data Structure Made! I just need to clean up code with the warning areas and it'll pass the cargo fmt check
2 parents aea3ad4 + 12eaeb0 commit ecc4620

File tree

14 files changed

+3040
-177
lines changed

14 files changed

+3040
-177
lines changed

Cargo.lock

Lines changed: 330 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ homepage = "https://github.com/asder8215/sharded_ringbuf"
1616
documentation = "https://docs.rs/sharded_ringbuf/latest/sharded_ringbuf/"
1717

1818
[dependencies]
19+
async-executor = "1.13.3"
1920
crossbeam-utils = "^0.8.21"
2021
fastrand = "^2.3.0"
2122
futures-util = "0.3.31"
23+
smol = "2.0.2"
2224
tokio = {version = "^1.45.1", features = ["rt", "sync"] }
2325

2426
[dev-dependencies]
@@ -74,3 +76,4 @@ panic = "abort"
7476

7577
[profile.release]
7678
panic = "abort"
79+
debug = true

benches/kanal_async.rs

Lines changed: 177 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,90 @@
1+
use std::time::{Duration, Instant};
2+
13
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
24
use kanal::bounded_async;
35
use tokio::task;
46

5-
fn test_add(x: usize) -> usize {
7+
#[derive(Default, Debug, Clone, Copy)]
8+
struct Message {
9+
// item_one: u128,
10+
item_one: usize,
11+
// item_two: u128,
12+
// item_three: u128,
13+
// item_four: u128,
14+
// item_five: u128,
15+
// item_six: u128,
16+
// item_seven: u128,
17+
// item_eight: u128,
18+
// item_nine: u128,
19+
// item_ten: u128,
20+
// item_eleven: u128,
21+
// item_twelve: u128,
22+
}
23+
24+
#[derive(Debug, Clone)]
25+
struct BigData {
26+
// buf: Box<[u8; 1 * 1024]>, // 1 KiB
27+
buf: Box<[u8; 8]>, // 1 KiB
28+
}
29+
30+
static FUNC_TO_TEST: i32 = 2;
31+
fn test_func(x: u128) -> u128 {
32+
match FUNC_TO_TEST {
33+
0 => mult_add_ops(x),
34+
1 => fib(x),
35+
2 => prime_sieve(x),
36+
3 => mul_stress(x as usize),
37+
_ => {
38+
todo!()
39+
}
40+
}
41+
}
42+
43+
fn mult_add_ops(x: u128) -> u128 {
644
let mut y = x;
745
y = y.wrapping_mul(31);
846
y = y.rotate_left(7);
947
y = y.wrapping_add(1);
1048
y
1149
}
1250

51+
fn fib(x: u128) -> u128 {
52+
let mut a = 0u128;
53+
let mut b = 1u128;
54+
for _ in 0..x {
55+
let tmp = a + b;
56+
a = b;
57+
b = tmp;
58+
}
59+
a
60+
}
61+
62+
fn prime_sieve(x: u128) -> u128 {
63+
let mut is_prime = vec![true; x as usize];
64+
let mut count = 0;
65+
66+
for i in 2..x {
67+
if is_prime[i as usize] {
68+
count += 1;
69+
let mut j = i * 2;
70+
while j < x {
71+
is_prime[j as usize] = false;
72+
j += i;
73+
}
74+
}
75+
}
76+
77+
count
78+
}
79+
80+
fn mul_stress(iter: usize) -> u128 {
81+
let mut acc = 1u128;
82+
for i in 1..=iter as u128 {
83+
acc = acc.wrapping_mul(i ^ 0xdeadbeefdeadbeef);
84+
}
85+
acc
86+
}
87+
1388
async fn kanal_async(c: usize, task_count: usize) {
1489
let (s, r) = bounded_async(c);
1590
let mut handles = Vec::new();
@@ -18,19 +93,57 @@ async fn kanal_async(c: usize, task_count: usize) {
1893
let rx = r.clone();
1994
handles.push(task::spawn(async move {
2095
for _ in 0..task_count {
21-
for _ in 0..1000000 {
22-
let x = rx.recv().await.unwrap();
23-
test_add(x);
96+
for _ in 0..1 {
97+
let x: Message = rx.recv().await.unwrap();
98+
test_func(x.item_one as u128);
2499
}
25100
}
26101
}));
27102
}
28103

29104
for _ in 0..task_count {
30105
let tx = s.clone();
106+
let msg = Message::default();
31107
handles.push(task::spawn(async move {
32-
for i in 0..1000000 {
33-
tx.send(i).await.unwrap();
108+
for i in 0..1 {
109+
tx.send(msg).await.unwrap();
110+
}
111+
}));
112+
}
113+
114+
for handle in handles {
115+
handle.await.unwrap();
116+
}
117+
}
118+
119+
async fn kanal_async_with_msg_vec(
120+
msg_vecs: Vec<Vec<BigData>>,
121+
c: usize,
122+
task_count: usize,
123+
msg_count: usize,
124+
) {
125+
let (s, r) = bounded_async(c);
126+
let mut handles = Vec::new();
127+
128+
for _ in 0..1 {
129+
let rx = r.clone();
130+
handles.push(task::spawn(async move {
131+
for _ in 0..task_count {
132+
for _ in 0..msg_count {
133+
let x = rx.recv().await.unwrap();
134+
// test_func(x.item_one as u128);
135+
}
136+
}
137+
}));
138+
}
139+
140+
// for _ in 0..task_count {
141+
for msg_vec in msg_vecs {
142+
let tx = s.clone();
143+
// let msg = Message::default();
144+
handles.push(task::spawn(async move {
145+
for msg in msg_vec {
146+
tx.send(msg).await.unwrap();
34147
}
35148
}));
36149
}
@@ -41,9 +154,49 @@ async fn kanal_async(c: usize, task_count: usize) {
41154
}
42155

43156
fn benchmark_kanal_async(c: &mut Criterion) {
44-
const MAX_THREADS: [usize; 2] = [4, 8];
45-
const CAPACITY: usize = 1024;
46-
const TASKS: [usize; 5] = [1, 2, 4, 8, 16];
157+
const MAX_THREADS: [usize; 1] = [8];
158+
const CAPACITY: usize = 128;
159+
const TASKS: [usize; 1] = [100000];
160+
const MSG_COUNT: usize = 1;
161+
let msg = BigData {
162+
buf: Box::new([0; 8]),
163+
};
164+
let mut msg_vecs = Vec::with_capacity(TASKS[0]);
165+
for _ in 0..TASKS[0] {
166+
msg_vecs.push(Vec::with_capacity(MSG_COUNT));
167+
let msg_vecs_len = msg_vecs.len();
168+
for _ in 0..MSG_COUNT {
169+
msg_vecs[msg_vecs_len - 1].push(msg.clone());
170+
}
171+
}
172+
173+
// for thread_num in MAX_THREADS {
174+
// let runtime = tokio::runtime::Builder::new_multi_thread()
175+
// .enable_all()
176+
// .worker_threads(thread_num)
177+
// .build()
178+
// .unwrap();
179+
180+
// for task_count in TASKS {
181+
// let func_name = format!(
182+
// "Kanal Async: {} threads, {} enq tasks enqueuing 1 million items, 1 looping deq task",
183+
// thread_num, task_count
184+
// );
185+
186+
// c.bench_with_input(
187+
// BenchmarkId::new(func_name, CAPACITY),
188+
// &(CAPACITY),
189+
// |b, &cap| {
190+
// // Insert a call to `to_async` to convert the bencher to async mode.
191+
// // The timing loops are the same as with the normal bencher.
192+
// b.to_async(&runtime).iter(async || {
193+
// kanal_async(cap, task_count).await;
194+
// });
195+
// },
196+
// );
197+
// }
198+
// }
199+
47200
for thread_num in MAX_THREADS {
48201
let runtime = tokio::runtime::Builder::new_multi_thread()
49202
.enable_all()
@@ -53,8 +206,7 @@ fn benchmark_kanal_async(c: &mut Criterion) {
53206

54207
for task_count in TASKS {
55208
let func_name = format!(
56-
"Kanal Async: {} threads, {} enq tasks enqueuing 1 million items, 1 looping deq task",
57-
thread_num, task_count
209+
"Kanal Async: {thread_num} threads, {task_count} enq tasks enqueuing 1 million items, 1 looping deq task"
58210
);
59211

60212
c.bench_with_input(
@@ -63,8 +215,20 @@ fn benchmark_kanal_async(c: &mut Criterion) {
63215
|b, &cap| {
64216
// Insert a call to `to_async` to convert the bencher to async mode.
65217
// The timing loops are the same as with the normal bencher.
66-
b.to_async(&runtime).iter(async || {
67-
kanal_async(cap, task_count).await;
218+
b.to_async(&runtime).iter_custom(|iters| {
219+
let msg_vecs = msg_vecs.clone();
220+
async move {
221+
let mut total = Duration::ZERO;
222+
for _ in 0..iters {
223+
let msg_vecs = msg_vecs.clone();
224+
let start = Instant::now();
225+
kanal_async_with_msg_vec(msg_vecs, cap, task_count, MSG_COUNT)
226+
.await;
227+
let end = Instant::now();
228+
total += end - start;
229+
}
230+
total
231+
}
68232
});
69233
},
70234
);

benches/srb_cft.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,7 @@ fn benchmark_cft(c: &mut Criterion) {
107107
for shard_num in SHARDS {
108108
for task_count in TASKS {
109109
let func_name = format!(
110-
"CFT: {} threads, {} shards, {} enq tasks enqueuing 1 million items, {} looping deq task",
111-
thread_num, shard_num, task_count, shard_num
110+
"CFT: {thread_num} threads, {shard_num} shards, {task_count} enq tasks enqueuing 1 million items, {shard_num} looping deq task"
112111
);
113112

114113
c.bench_with_input(
@@ -136,8 +135,7 @@ fn benchmark_cft(c: &mut Criterion) {
136135
for shard_num in SHARDS {
137136
for task_count in TASKS {
138137
let func_name = format!(
139-
"Pin: {} threads, {} shards, {} enq tasks enqueuing 1 million items, {} looping deq full task",
140-
thread_num, shard_num, task_count, shard_num
138+
"Pin: {thread_num} threads, {shard_num} shards, {task_count} enq tasks enqueuing 1 million items, {shard_num} looping deq full task"
141139
);
142140

143141
c.bench_with_input(

0 commit comments

Comments
 (0)