Skip to content

Commit 7ae76c5

Browse files
authored
Initial implementation (#1)
1 parent 1507513 commit 7ae76c5

File tree

16 files changed

+1407
-0
lines changed

16 files changed

+1407
-0
lines changed

.github/workflows/ci.yml

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
name: CI
2+
3+
on:
4+
push:
5+
branches: ['main', 'v*.x']
6+
pull_request:
7+
branches: ['main', 'v*.x']
8+
9+
env:
10+
RUSTFLAGS: -Dwarnings
11+
RUST_BACKTRACE: 1
12+
13+
jobs:
14+
ci-msrv:
15+
runs-on: ubuntu-latest
16+
strategy:
17+
matrix:
18+
rust:
19+
- stable
20+
- beta
21+
- nightly
22+
- 1.56.1 # MSRV
23+
24+
steps:
25+
- uses: actions/checkout@v2
26+
27+
- uses: actions-rs/toolchain@v1
28+
with:
29+
profile: minimal
30+
toolchain: ${{ matrix.rust }}
31+
override: true
32+
components: rustfmt, clippy
33+
34+
- uses: actions-rs/cargo@v1
35+
with:
36+
command: build
37+
38+
- uses: actions-rs/cargo@v1
39+
with:
40+
command: test
41+
42+
- uses: actions-rs/cargo@v1
43+
env:
44+
RUSTFLAGS: --cfg loom -Dwarnings
45+
LOOM_MAX_PREEMPTIONS: 2
46+
SCOPE: ${{ matrix.scope }}
47+
with:
48+
command: test
49+
args: --lib --release -- --nocapture $SCOPE
50+
51+
- uses: actions-rs/cargo@v1
52+
with:
53+
command: fmt
54+
args: --all -- --check
55+
56+
- uses: actions-rs/cargo@v1
57+
with:
58+
command: clippy
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
name: PR Security Audit
2+
3+
on:
4+
push:
5+
paths:
6+
- '**/Cargo.toml'
7+
pull_request:
8+
paths:
9+
- '**/Cargo.toml'
10+
11+
jobs:
12+
security-audit:
13+
runs-on: ubuntu-latest
14+
if: "!contains(github.event.head_commit.message, 'ci skip')"
15+
steps:
16+
- uses: actions/checkout@v2
17+
18+
- name: Audit Check
19+
uses: actions-rs/audit-check@v1
20+
with:
21+
token: ${{ secrets.GITHUB_TOKEN }}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
name: Security Audit
2+
3+
on:
4+
push:
5+
branches:
6+
- master
7+
paths:
8+
- '**/Cargo.toml'
9+
schedule:
10+
- cron: '0 7 * * 1' # run at 7 AM UTC on Monday
11+
12+
jobs:
13+
security-audit:
14+
runs-on: ubuntu-latest
15+
if: "!contains(github.event.head_commit.message, 'ci skip')"
16+
steps:
17+
- uses: actions/checkout@v2
18+
19+
- name: Audit Check
20+
uses: actions-rs/audit-check@v1
21+
with:
22+
token: ${{ secrets.GITHUB_TOKEN }}

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/target
2+
Cargo.lock

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Changelog

Cargo.toml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "lf-queue"
3+
description = "A lock-free multi-producer multi-consumer unbounded queue."
4+
version = "0.1.0"
5+
license = "MIT"
6+
edition = "2021"
7+
authors = ["Pierre Brouca <[email protected]>"]
8+
categories = ["concurrency", "data-structures"]
9+
keywords = ["spsc", "mpsc", "spmc", "mpmc",]
10+
11+
[target.'cfg(loom)'.dependencies]
12+
loom = "0.5"

README.md

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,185 @@
11
# lf-queue
2+
3+
[![Crates.io](https://img.shields.io/crates/v/lf-queue)](https://crates.io/crates/lf-queue)
4+
[![Documentation](https://docs.rs/lf-queue/badge.svg)](https://docs.rs/lf-queue)
5+
[![Build Status](https://github.com/broucz/lf-queue/workflows/CI/badge.svg)](https://github.com/broucz/lf-queue/actions/workflows/ci.yml?query=branch%3Amain)
6+
[![MIT licensed](https://img.shields.io/crates/l/lf-queue)](LICENSE)
7+
28
A lock-free multi-producer multi-consumer unbounded queue.
9+
10+
## Examples
11+
12+
```toml
13+
[dependencies]
14+
lf-queue = "0.1"
15+
```
16+
17+
Single Producer - Single Consumer:
18+
19+
```rust
20+
use lf_queue::Queue;
21+
22+
fn main() {
23+
const COUNT: usize = 1_000;
24+
let queue: Queue<usize> = Queue::new();
25+
26+
for i in 0..COUNT {
27+
queue.push(i);
28+
}
29+
30+
for i in 0..COUNT {
31+
assert_eq!(i, queue.pop().unwrap());
32+
}
33+
34+
assert!(queue.pop().is_none());
35+
}
36+
```
37+
38+
Multi Producer - Single Consumer:
39+
40+
```rust
41+
use lf_queue::Queue;
42+
use std::thread;
43+
44+
fn main() {
45+
const COUNT: usize = 1_000;
46+
const CONCURRENCY: usize = 4;
47+
48+
let queue: Queue<usize> = Queue::new();
49+
50+
let ths: Vec<_> = (0..CONCURRENCY)
51+
.map(|_| {
52+
let q = queue.clone();
53+
thread::spawn(move || {
54+
for i in 0..COUNT {
55+
q.push(i);
56+
}
57+
})
58+
})
59+
.collect();
60+
61+
for th in ths {
62+
th.join().unwrap();
63+
}
64+
65+
for _ in 0..COUNT * CONCURRENCY {
66+
assert!(queue.pop().is_some());
67+
}
68+
69+
assert!(queue.pop().is_none());
70+
}
71+
```
72+
73+
Single Producer - Multi Consumer:
74+
75+
```rust
76+
use lf_queue::Queue;
77+
use std::thread;
78+
79+
fn main() {
80+
const COUNT: usize = 1_000;
81+
const CONCURRENCY: usize = 4;
82+
83+
let queue: Queue<usize> = Queue::new();
84+
85+
for i in 0..COUNT * CONCURRENCY {
86+
queue.push(i);
87+
}
88+
89+
let ths: Vec<_> = (0..CONCURRENCY)
90+
.map(|_| {
91+
let q = queue.clone();
92+
thread::spawn(move || {
93+
for _ in 0..COUNT {
94+
loop {
95+
if q.pop().is_some() {
96+
break;
97+
}
98+
}
99+
}
100+
})
101+
})
102+
.collect();
103+
104+
for th in ths {
105+
th.join().unwrap();
106+
}
107+
108+
assert!(queue.pop().is_none());
109+
}
110+
111+
```
112+
113+
Multi Producer - Multi Consumer:
114+
115+
```rust
116+
use lf_queue::Queue;
117+
use std::sync::atomic::{AtomicUsize, Ordering};
118+
use std::sync::Arc;
119+
use std::thread;
120+
121+
fn main() {
122+
const COUNT: usize = 1_000;
123+
const CONCURRENCY: usize = 4;
124+
125+
let queue: Queue<usize> = Queue::new();
126+
let items = Arc::new((0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>());
127+
128+
let ths: Vec<_> = (0..CONCURRENCY)
129+
.map(|_| {
130+
let q = queue.clone();
131+
let its = items.clone();
132+
thread::spawn(move || {
133+
for _ in 0..COUNT {
134+
let n = loop {
135+
if let Some(x) = q.pop() {
136+
break x;
137+
} else {
138+
thread::yield_now();
139+
}
140+
};
141+
its[n].fetch_add(1, Ordering::SeqCst);
142+
}
143+
})
144+
})
145+
.map(|_| {
146+
let q = queue.clone();
147+
thread::spawn(move || {
148+
for i in 0..COUNT {
149+
q.push(i);
150+
}
151+
})
152+
})
153+
.collect();
154+
155+
for th in ths {
156+
th.join().unwrap();
157+
}
158+
159+
thread::sleep(std::time::Duration::from_millis(10));
160+
161+
for c in &*items {
162+
assert_eq!(c.load(Ordering::SeqCst), CONCURRENCY);
163+
}
164+
165+
assert!(queue.pop().is_none());
166+
}
167+
```
168+
169+
## Acknowledgement
170+
171+
This implementation of a lock-free queue in Rust took inspiration from the [`concurrent-queue`](https://github.com/smol-rs/concurrent-queue) crate and aims to be used for educational purposes. The code documentation help you to discover the algorithm used to implement a concurrent lock-free queue in Rust, but might not yet be beginner-friendly. More details and learning materials will be added over time.
172+
173+
## License
174+
175+
This project is licensed under the [MIT license](LICENSE).
176+
177+
## Contribution
178+
179+
Unless you explicitly state otherwise, any contribution intentionally submitted
180+
for inclusion in the work by you, shall be licensed as above, without any additional
181+
terms or conditions.
182+
183+
Note that, as of now, my focus is on improving the documentation of this crate, not adding any additional feature. Please open an issue and start a discussion before working on any significant PR.
184+
185+
Contributions are welcome.

benches/queue.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#![feature(test)]
2+
extern crate test;
3+
4+
use lf_queue::Queue;
5+
6+
// cargo +nightly bench
7+
#[cfg(test)]
8+
mod tests {
9+
use super::*;
10+
use test::Bencher;
11+
12+
// cargo +nightly bench --package lf-queue --bench queue -- tests::mpmc --exact
13+
//
14+
// Latest results:
15+
// - MacBook Air (M1, 2020): 260,718 ns/iter (+/- 16,344)
16+
#[bench]
17+
fn mpmc(b: &mut Bencher) {
18+
const COUNT: usize = 1_000;
19+
const CONCURRENCY: usize = 4;
20+
let queue: Queue<usize> = Queue::new();
21+
22+
b.iter(|| {
23+
let ths: Vec<_> = (0..CONCURRENCY)
24+
.map(|_| {
25+
let q = queue.clone();
26+
std::thread::spawn(move || {
27+
for _ in 0..COUNT {
28+
loop {
29+
if q.pop().is_some() {
30+
break;
31+
}
32+
}
33+
}
34+
})
35+
})
36+
.map(|_| {
37+
let q = queue.clone();
38+
std::thread::spawn(move || {
39+
for i in 0..COUNT {
40+
q.push(i);
41+
}
42+
})
43+
})
44+
.collect();
45+
46+
for th in ths {
47+
th.join().unwrap();
48+
}
49+
});
50+
}
51+
}

0 commit comments

Comments
 (0)