Skip to content

Commit dd4db90

Browse files
authored
Merge pull request #7 from cowlicks/half-open
2 parents 19b66cd + f87591d commit dd4db90

File tree

17 files changed

+293
-90
lines changed

17 files changed

+293
-90
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ SANDBOX
33
vendor
44
perf.data*
55
flamegraph*.svg
6+
Cargo.lock

CHANGELOG.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Changelog
2+
3+
All notable changes to this project will be documented in this file.
4+
5+
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
6+
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
7+
8+
<!-- next-header -->
9+
10+
## [Unreleased] - ReleaseDate
11+
12+
### Added
13+
14+
Added ability to create a "half-open" - preventing messages with the half-open stream id from being routed as socket messages.
15+
The JavaScript implementation does something similar.
16+
17+
### Changed
18+
19+
`UdxSocket::bind` is no longer async. It didn't need to be, but it does need have a tokio runtime running.
20+
21+
Fixes a bug where we were getting the socket address of sender wrong
22+
23+
### Removed
24+
25+
26+
<!-- next-url -->
27+
[Unreleased]: https://github.com/datrs/async-udx/compare/v0.1.0...HEAD

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "async-udx"
33
version = "0.1.0"
4-
edition = "2021"
4+
edition = "2024"
55
license = "MIT OR Apache-2.0"
66
description = "Rust port of libudx, a protocol for reliable, multiplex, and congestion controlled streams over udp"
77
authors = ["Franz Heinzmann (Frando) <[email protected]>"]
@@ -20,7 +20,6 @@ categories = [
2020
bitflags = "1.3.2"
2121
tokio = { version = "1.21.2", features = ["net", "macros", "rt-multi-thread", "time", "io-util", "sync"] }
2222
futures = "0.3.25"
23-
derivative = "2.2.0"
2423
tracing = "0.1.37"
2524
bytes = "1.2.1"
2625
log = "0.4.17"

benches/throughput.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use tokio::{
66
net::TcpStream,
77
};
88

9-
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
9+
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
1010
criterion_group!(server_benches, bench_throughput);
1111
criterion_main!(server_benches);
1212
fn rt() -> tokio::runtime::Runtime {
@@ -41,8 +41,8 @@ fn bench_throughput(c: &mut Criterion) {
4141
let num_streams = n_streams;
4242
let limit = *len / num_streams;
4343
b.to_async(&rt).iter_custom(|iters| async move {
44-
let socka = UdxSocket::bind("127.0.0.1:0").await.unwrap();
45-
let sockb = UdxSocket::bind("127.0.0.1:0").await.unwrap();
44+
let socka = UdxSocket::bind("127.0.0.1:0").unwrap();
45+
let sockb = UdxSocket::bind("127.0.0.1:0").unwrap();
4646
let addra = socka.local_addr().unwrap();
4747
let addrb = sockb.local_addr().unwrap();
4848
let mut readers = vec![];
@@ -191,8 +191,8 @@ async fn setup_pipe_tcp() -> io::Result<(TcpStream, TcpStream)> {
191191
}
192192

193193
async fn setup_pipe_udx() -> io::Result<(UdxStream, UdxStream)> {
194-
let socka = UdxSocket::bind("127.0.0.1:0").await?;
195-
let sockb = UdxSocket::bind("127.0.0.1:0").await?;
194+
let socka = UdxSocket::bind("127.0.0.1:0")?;
195+
let sockb = UdxSocket::bind("127.0.0.1:0")?;
196196
let addra = socka.local_addr()?;
197197
let addrb = sockb.local_addr()?;
198198
let streama = socka.connect(addrb, 1, 2)?;

examples/bench.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ async fn main() {
1515
}
1616

1717
async fn run(total: usize, num_streams: usize) -> io::Result<()> {
18-
let socka = UdxSocket::bind("127.0.0.1:0").await?;
19-
let sockb = UdxSocket::bind("127.0.0.1:0").await?;
18+
let socka = UdxSocket::bind("127.0.0.1:0")?;
19+
let sockb = UdxSocket::bind("127.0.0.1:0")?;
2020
let addra = socka.local_addr()?;
2121
let addrb = sockb.local_addr()?;
2222

@@ -77,7 +77,7 @@ fn usize_from_env(name: &str, default: usize) -> usize {
7777
std::env::var(name)
7878
.map(|x| {
7979
x.parse::<usize>()
80-
.expect(&format!("{} must be a number", name))
80+
.unwrap_or_else(|_| panic!("{} must be a number", name))
8181
})
8282
.unwrap_or(default)
8383
}

examples/multi_stream.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use async_udx::{UdxSocket, UDX_DATA_MTU};
1+
use async_udx::{UDX_DATA_MTU, UdxSocket};
22
use std::time::Instant;
33
use tokio::io::{AsyncReadExt, AsyncWriteExt};
44

@@ -21,8 +21,8 @@ async fn main() {
2121
);
2222

2323
let host = "127.0.0.1";
24-
let socka = UdxSocket::bind(format!("{host}:0")).await.unwrap();
25-
let sockb = UdxSocket::bind(format!("{host}:0")).await.unwrap();
24+
let socka = UdxSocket::bind(format!("{host}:0")).unwrap();
25+
let sockb = UdxSocket::bind(format!("{host}:0")).unwrap();
2626
let addra = socka.local_addr().unwrap();
2727
let addrb = sockb.local_addr().unwrap();
2828
eprintln!("addra {}", addra);
@@ -33,9 +33,9 @@ async fn main() {
3333
let i = i as u32;
3434
let streama = socka.connect(addrb, 1000 + i, i).unwrap();
3535
let streamb = sockb.connect(addra, i, 1000 + i).unwrap();
36-
let read_buf = vec![0u8; MSGSIZE as usize];
37-
let write_buf = vec![i as u8; MSGSIZE as usize];
38-
let (reader, writer) = if i % 2 == 0 {
36+
let read_buf = vec![0u8; MSGSIZE];
37+
let write_buf = vec![i as u8; MSGSIZE];
38+
let (reader, writer) = if i.is_multiple_of(2) {
3939
(streama, streamb)
4040
} else {
4141
(streamb, streama)

examples/rw.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{future::Future, io};
44
use tokio::io::{AsyncReadExt, AsyncWriteExt};
55
use tokio::task::JoinHandle;
66

7-
use async_udx::{UdxSocket, UdxStream, UDX_DATA_MTU};
7+
use async_udx::{UDX_DATA_MTU, UdxSocket, UdxStream};
88

99
pub fn spawn<T>(name: impl ToString, future: T) -> JoinHandle<()>
1010
where
@@ -34,7 +34,7 @@ async fn main() -> io::Result<()> {
3434
.next()
3535
.expect("invalid connect addr");
3636
eprintln!("{} -> {}", listen_addr, connect_addr);
37-
let sock = UdxSocket::bind(listen_addr).await?;
37+
let sock = UdxSocket::bind(listen_addr)?;
3838
let stream = sock.connect(connect_addr, 1, 1)?;
3939
let max_len = UDX_DATA_MTU * 64;
4040
let read = spawn("read", read_loop(stream.clone(), max_len));

examples/simple.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ async fn main() -> io::Result<()> {
77
tracing_subscriber::fmt::init();
88

99
// Bind two sockets
10-
let socka = UdxSocket::bind("127.0.0.1:20004").await?;
10+
let socka = UdxSocket::bind("127.0.0.1:20004")?;
1111
let addra = socka.local_addr()?;
1212
eprintln!("Socket A bound to {addra}");
13-
let sockb = UdxSocket::bind("127.0.0.1:20005").await?;
13+
let sockb = UdxSocket::bind("127.0.0.1:20005")?;
1414
let addrb = sockb.local_addr()?;
1515
eprintln!("Socket B bound to {addrb}");
1616

release.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pre-release-replacements = [
2+
{file="CHANGELOG.md", search="Unreleased", replace="{{version}}"},
3+
{file="CHANGELOG.md", search="\\.\\.\\.HEAD", replace="...{{tag_name}}", exactly=1},
4+
{file="CHANGELOG.md", search="ReleaseDate", replace="{{date}}"},
5+
{file="CHANGELOG.md", search="<!-- next-header -->", replace="<!-- next-header -->\n\n## [Unreleased] - ReleaseDate\n\n### Added\n\n### Changed\n\n### Removed\n\n", exactly=1},
6+
{file="CHANGELOG.md", search="<!-- next-url -->", replace="<!-- next-url -->\n[Unreleased]: https://github.com/datrs/async-udx/compare/{{tag_name}}...HEAD", exactly=1},
7+
]

src/mutex.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ mod tracking {
4747
/// Acquires the lock for a certain purpose
4848
///
4949
/// The purpose will be recorded in the list of last lock owners
50-
pub fn lock(&self, purpose: &'static str) -> MutexGuard<T> {
50+
pub fn lock(&self, purpose: &'static str) -> MutexGuard<'_, T> {
5151
let now = Instant::now();
5252
let guard = self.inner.lock();
5353

@@ -146,7 +146,7 @@ mod non_tracking {
146146
/// Acquires the lock for a certain purpose
147147
///
148148
/// The purpose will be recorded in the list of last lock owners
149-
pub fn lock(&self, _purpose: &'static str) -> MutexGuard<T> {
149+
pub fn lock(&self, _purpose: &'static str) -> MutexGuard<'_, T> {
150150
MutexGuard {
151151
guard: self.inner.lock(),
152152
}

0 commit comments

Comments
 (0)