Skip to content

Commit 9faea74

Browse files
committed
Merge 'tokio-1.38.x' into 'tokio.1.42.x'
2 parents bb9d570 + aa303bc commit 9faea74

File tree

6 files changed

+80
-33
lines changed

6 files changed

+80
-33
lines changed

.cirrus.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
only_if: $CIRRUS_TAG == '' && ($CIRRUS_PR != '' || $CIRRUS_BRANCH == 'master' || $CIRRUS_BRANCH =~ 'tokio-.*')
22
auto_cancellation: $CIRRUS_BRANCH != 'master' && $CIRRUS_BRANCH !=~ 'tokio-.*'
33
freebsd_instance:
4-
image_family: freebsd-14-1
4+
image_family: freebsd-14-2
55
env:
66
RUST_STABLE: 1.81
77
RUST_NIGHTLY: nightly-2024-05-05

.github/workflows/ci.yml

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -460,10 +460,18 @@ jobs:
460460
runs-on: ubuntu-latest
461461
steps:
462462
- uses: actions/checkout@v4
463-
- name: Check semver
463+
- name: Check `tokio` semver
464464
uses: obi1kenobi/cargo-semver-checks-action@v2
465465
with:
466466
rust-toolchain: ${{ env.rust_stable }}
467+
package: tokio
468+
release-type: minor
469+
- name: Check semver for rest of the workspace
470+
if: ${{ !startsWith(github.event.pull_request.base.ref, 'tokio-1.') }}
471+
uses: obi1kenobi/cargo-semver-checks-action@v2
472+
with:
473+
rust-toolchain: ${{ env.rust_stable }}
474+
exclude: tokio
467475
release-type: minor
468476

469477
cross-check:
@@ -694,7 +702,14 @@ jobs:
694702
toolchain: ${{ env.rust_min }}
695703
- uses: Swatinem/rust-cache@v2
696704
- name: "check --workspace --all-features"
697-
run: cargo check --workspace --all-features
705+
run: |
706+
if [[ "${{ github.event.pull_request.base.ref }}" =~ ^tokio-1\..* ]]; then
707+
# Only check `tokio` crate as the PR is backporting to an earlier tokio release.
708+
cargo check -p tokio --all-features
709+
else
710+
# Check all crates in the workspace
711+
cargo check --workspace --all-features
712+
fi
698713
env:
699714
RUSTFLAGS: "" # remove -Dwarnings
700715

@@ -993,7 +1008,7 @@ jobs:
9931008
- name: Install cargo-hack, wasmtime, and cargo-wasi
9941009
uses: taiki-e/install-action@v2
9951010
with:
996-
tool: cargo-hack,wasmtime,cargo-wasi
1011+
tool: cargo-hack,wasmtime
9971012

9981013
- uses: Swatinem/rust-cache@v2
9991014
- name: WASI test tokio full
@@ -1019,9 +1034,12 @@ jobs:
10191034

10201035
- name: test tests-integration --features wasi-rt
10211036
# TODO: this should become: `cargo hack wasi test --each-feature`
1022-
run: cargo wasi test --test rt_yield --features wasi-rt
1037+
run: cargo test --target ${{ matrix.target }} --test rt_yield --features wasi-rt
10231038
if: matrix.target == 'wasm32-wasip1'
10241039
working-directory: tests-integration
1040+
env:
1041+
CARGO_TARGET_WASM32_WASIP1_RUNNER: "wasmtime run --"
1042+
RUSTFLAGS: -Dwarnings -C target-feature=+atomics,+bulk-memory -C link-args=--max-memory=67108864
10251043

10261044
- name: test tests-integration --features wasi-threads-rt
10271045
run: cargo test --target ${{ matrix.target }} --features wasi-threads-rt

Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,16 @@ members = [
1717

1818
[workspace.metadata.spellcheck]
1919
config = "spellcheck.toml"
20+
21+
[workspace.lints.rust]
22+
unexpected_cfgs = { level = "warn", check-cfg = [
23+
'cfg(fuzzing)',
24+
'cfg(loom)',
25+
'cfg(mio_unsupported_force_poll_poll)',
26+
'cfg(tokio_allow_from_blocking_fd)',
27+
'cfg(tokio_internal_mt_counters)',
28+
'cfg(tokio_no_parking_lot)',
29+
'cfg(tokio_no_tuning_tests)',
30+
'cfg(tokio_taskdump)',
31+
'cfg(tokio_unstable)',
32+
] }

examples/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,6 @@ path = "named-pipe-multi-client.rs"
9595
[[example]]
9696
name = "dump"
9797
path = "dump.rs"
98+
99+
[lints]
100+
workspace = true

tokio/CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,20 @@ Yanked. Please use 1.39.1 instead.
249249
[#6709]: https://github.com/tokio-rs/tokio/pull/6709
250250
[#6710]: https://github.com/tokio-rs/tokio/pull/6710
251251

252+
# 1.38.2 (April 2nd, 2025)
253+
254+
This release fixes a soundness issue in the broadcast channel. The channel
255+
accepts values that are `Send` but `!Sync`. Previously, the channel called
256+
`clone()` on these values without synchronizing. This release fixes the channel
257+
by synchronizing calls to `.clone()` (Thanks Austin Bonander for finding and
258+
reporting the issue).
259+
260+
### Fixed
261+
262+
- sync: synchronize `clone()` call in broadcast channel ([#7232])
263+
264+
[#7232]: https://github.com/tokio-rs/tokio/pull/7232
265+
252266
# 1.38.1 (July 16th, 2024)
253267

254268
This release fixes the bug identified as ([#6682]), which caused timers not

tokio/src/sync/broadcast.rs

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@
118118
119119
use crate::loom::cell::UnsafeCell;
120120
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
121-
use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
121+
use crate::loom::sync::{Arc, Mutex, MutexGuard};
122122
use crate::runtime::coop::cooperative;
123123
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
124124
use crate::util::WakeList;
@@ -304,7 +304,7 @@ use self::error::{RecvError, SendError, TryRecvError};
304304
/// Data shared between senders and receivers.
305305
struct Shared<T> {
306306
/// slots in the channel.
307-
buffer: Box<[RwLock<Slot<T>>]>,
307+
buffer: Box<[Mutex<Slot<T>>]>,
308308

309309
/// Mask a position -> index.
310310
mask: usize,
@@ -348,7 +348,7 @@ struct Slot<T> {
348348
///
349349
/// The value is set by `send` when the write lock is held. When a reader
350350
/// drops, `rem` is decremented. When it hits zero, the value is dropped.
351-
val: UnsafeCell<Option<T>>,
351+
val: Option<T>,
352352
}
353353

354354
/// An entry in the wait queue.
@@ -386,7 +386,7 @@ generate_addr_of_methods! {
386386
}
387387

388388
struct RecvGuard<'a, T> {
389-
slot: RwLockReadGuard<'a, Slot<T>>,
389+
slot: MutexGuard<'a, Slot<T>>,
390390
}
391391

392392
/// Receive a value future.
@@ -395,11 +395,15 @@ struct Recv<'a, T> {
395395
receiver: &'a mut Receiver<T>,
396396

397397
/// Entry in the waiter `LinkedList`.
398-
waiter: UnsafeCell<Waiter>,
398+
waiter: WaiterCell,
399399
}
400400

401-
unsafe impl<'a, T: Send> Send for Recv<'a, T> {}
402-
unsafe impl<'a, T: Send> Sync for Recv<'a, T> {}
401+
// The wrapper around `UnsafeCell` isolates the unsafe impl `Send` and `Sync`
402+
// from `Recv`.
403+
struct WaiterCell(UnsafeCell<Waiter>);
404+
405+
unsafe impl Send for WaiterCell {}
406+
unsafe impl Sync for WaiterCell {}
403407

404408
/// Max number of receivers. Reserve space to lock.
405409
const MAX_RECEIVERS: usize = usize::MAX >> 2;
@@ -467,12 +471,6 @@ pub fn channel<T: Clone>(capacity: usize) -> (Sender<T>, Receiver<T>) {
467471
(tx, rx)
468472
}
469473

470-
unsafe impl<T: Send> Send for Sender<T> {}
471-
unsafe impl<T: Send> Sync for Sender<T> {}
472-
473-
unsafe impl<T: Send> Send for Receiver<T> {}
474-
unsafe impl<T: Send> Sync for Receiver<T> {}
475-
476474
impl<T> Sender<T> {
477475
/// Creates the sending-half of the [`broadcast`] channel.
478476
///
@@ -511,10 +509,10 @@ impl<T> Sender<T> {
511509
let mut buffer = Vec::with_capacity(capacity);
512510

513511
for i in 0..capacity {
514-
buffer.push(RwLock::new(Slot {
512+
buffer.push(Mutex::new(Slot {
515513
rem: AtomicUsize::new(0),
516514
pos: (i as u64).wrapping_sub(capacity as u64),
517-
val: UnsafeCell::new(None),
515+
val: None,
518516
}));
519517
}
520518

@@ -600,7 +598,7 @@ impl<T> Sender<T> {
600598
tail.pos = tail.pos.wrapping_add(1);
601599

602600
// Get the slot
603-
let mut slot = self.shared.buffer[idx].write();
601+
let mut slot = self.shared.buffer[idx].lock();
604602

605603
// Track the position
606604
slot.pos = pos;
@@ -609,7 +607,7 @@ impl<T> Sender<T> {
609607
slot.rem.with_mut(|v| *v = rem);
610608

611609
// Write the value
612-
slot.val = UnsafeCell::new(Some(value));
610+
slot.val = Some(value);
613611

614612
// Release the slot lock before notifying the receivers.
615613
drop(slot);
@@ -696,7 +694,7 @@ impl<T> Sender<T> {
696694
while low < high {
697695
let mid = low + (high - low) / 2;
698696
let idx = base_idx.wrapping_add(mid) & self.shared.mask;
699-
if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 {
697+
if self.shared.buffer[idx].lock().rem.load(SeqCst) == 0 {
700698
low = mid + 1;
701699
} else {
702700
high = mid;
@@ -738,7 +736,7 @@ impl<T> Sender<T> {
738736
let tail = self.shared.tail.lock();
739737

740738
let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
741-
self.shared.buffer[idx].read().rem.load(SeqCst) == 0
739+
self.shared.buffer[idx].lock().rem.load(SeqCst) == 0
742740
}
743741

744742
/// Returns the number of active receivers.
@@ -1058,7 +1056,7 @@ impl<T> Receiver<T> {
10581056
let idx = (self.next & self.shared.mask as u64) as usize;
10591057

10601058
// The slot holding the next value to read
1061-
let mut slot = self.shared.buffer[idx].read();
1059+
let mut slot = self.shared.buffer[idx].lock();
10621060

10631061
if slot.pos != self.next {
10641062
// Release the `slot` lock before attempting to acquire the `tail`
@@ -1075,7 +1073,7 @@ impl<T> Receiver<T> {
10751073
let mut tail = self.shared.tail.lock();
10761074

10771075
// Acquire slot lock again
1078-
slot = self.shared.buffer[idx].read();
1076+
slot = self.shared.buffer[idx].lock();
10791077

10801078
// Make sure the position did not change. This could happen in the
10811079
// unlikely event that the buffer is wrapped between dropping the
@@ -1367,12 +1365,12 @@ impl<'a, T> Recv<'a, T> {
13671365
fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
13681366
Recv {
13691367
receiver,
1370-
waiter: UnsafeCell::new(Waiter {
1368+
waiter: WaiterCell(UnsafeCell::new(Waiter {
13711369
queued: AtomicBool::new(false),
13721370
waker: None,
13731371
pointers: linked_list::Pointers::new(),
13741372
_p: PhantomPinned,
1375-
}),
1373+
})),
13761374
}
13771375
}
13781376

@@ -1384,7 +1382,7 @@ impl<'a, T> Recv<'a, T> {
13841382
is_unpin::<&mut Receiver<T>>();
13851383

13861384
let me = self.get_unchecked_mut();
1387-
(me.receiver, &me.waiter)
1385+
(me.receiver, &me.waiter.0)
13881386
}
13891387
}
13901388
}
@@ -1418,6 +1416,7 @@ impl<'a, T> Drop for Recv<'a, T> {
14181416
// `Shared::notify_rx` before we drop the object.
14191417
let queued = self
14201418
.waiter
1419+
.0
14211420
.with(|ptr| unsafe { (*ptr).queued.load(Acquire) });
14221421

14231422
// If the waiter is queued, we need to unlink it from the waiters list.
@@ -1432,6 +1431,7 @@ impl<'a, T> Drop for Recv<'a, T> {
14321431
// `Relaxed` order suffices because we hold the tail lock.
14331432
let queued = self
14341433
.waiter
1434+
.0
14351435
.with_mut(|ptr| unsafe { (*ptr).queued.load(Relaxed) });
14361436

14371437
if queued {
@@ -1440,7 +1440,7 @@ impl<'a, T> Drop for Recv<'a, T> {
14401440
// safety: tail lock is held and the wait node is verified to be in
14411441
// the list.
14421442
unsafe {
1443-
self.waiter.with_mut(|ptr| {
1443+
self.waiter.0.with_mut(|ptr| {
14441444
tail.waiters.remove((&mut *ptr).into());
14451445
});
14461446
}
@@ -1486,16 +1486,15 @@ impl<'a, T> RecvGuard<'a, T> {
14861486
where
14871487
T: Clone,
14881488
{
1489-
self.slot.val.with(|ptr| unsafe { (*ptr).clone() })
1489+
self.slot.val.clone()
14901490
}
14911491
}
14921492

14931493
impl<'a, T> Drop for RecvGuard<'a, T> {
14941494
fn drop(&mut self) {
14951495
// Decrement the remaining counter
14961496
if 1 == self.slot.rem.fetch_sub(1, SeqCst) {
1497-
// Safety: Last receiver, drop the value
1498-
self.slot.val.with_mut(|ptr| unsafe { *ptr = None });
1497+
self.slot.val = None;
14991498
}
15001499
}
15011500
}

0 commit comments

Comments
 (0)