Skip to content

Commit 94cbe88

Browse files
author
Vytautas Astrauskas
committed
Many small changes to thread management.
1 parent 75e6549 commit 94cbe88

File tree

2 files changed

+87
-33
lines changed

2 files changed

+87
-33
lines changed

src/shims/sync.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
419419
throw_ub_format!("called pthread_mutex_unlock on a mutex owned by another thread");
420420
} else if locked_count == 1 {
421421
let blockset = mutex_get_or_create_blockset(this, mutex_op)?;
422-
if let Some(new_owner) = this.unblock_random_thread(blockset)? {
422+
if let Some(new_owner) = this.unblock_some_thread(blockset)? {
423423
// We have at least one thread waiting on this mutex. Transfer
424424
// ownership to it.
425425
mutex_set_owner(this, mutex_op, new_owner.to_u32_scalar())?;
@@ -543,19 +543,19 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
543543
assert_eq!(writers, 0);
544544
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?;
545545
if new_readers == 0 {
546-
if let Some(_writer) = this.unblock_random_thread(writer_blockset)? {
546+
if let Some(_writer) = this.unblock_some_thread(writer_blockset)? {
547547
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?;
548548
}
549549
}
550550
Ok(0)
551551
} else if writers != 0 {
552552
let reader_blockset = rwlock_get_or_create_reader_blockset(this, rwlock_op)?;
553553
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(0))?;
554-
if let Some(_writer) = this.unblock_random_thread(writer_blockset)? {
554+
if let Some(_writer) = this.unblock_some_thread(writer_blockset)? {
555555
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?;
556556
} else {
557557
let mut readers = 0;
558-
while let Some(_reader) = this.unblock_random_thread(reader_blockset)? {
558+
while let Some(_reader) = this.unblock_some_thread(reader_blockset)? {
559559
readers += 1;
560560
}
561561
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(readers))?

src/thread.rs

Lines changed: 83 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ pub enum SchedulingAction {
3131
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
3232
pub struct ThreadId(usize);
3333

34+
/// The main thread. When it terminates, the whole application terminates.
35+
const MAIN_THREAD: ThreadId = ThreadId(0);
36+
3437
impl Idx for ThreadId {
3538
fn new(idx: usize) -> Self {
3639
ThreadId(idx)
@@ -42,13 +45,13 @@ impl Idx for ThreadId {
4245

4346
impl From<u64> for ThreadId {
4447
fn from(id: u64) -> Self {
45-
Self(id as usize)
48+
Self(usize::try_from(id).unwrap())
4649
}
4750
}
4851

4952
impl From<u32> for ThreadId {
5053
fn from(id: u32) -> Self {
51-
Self(id as usize)
54+
Self(usize::try_from(id).unwrap())
5255
}
5356
}
5457

@@ -82,10 +85,10 @@ pub enum ThreadState {
8285
/// The thread tried to join the specified thread and is blocked until that
8386
/// thread terminates.
8487
BlockedOnJoin(ThreadId),
85-
/// The thread is blocked and belongs to the given blockset..
88+
/// The thread is blocked and belongs to the given blockset.
8689
Blocked(BlockSetId),
8790
/// The thread has terminated its execution (we do not delete terminated
88-
/// threads.)
91+
/// threads).
8992
Terminated,
9093
}
9194

@@ -150,6 +153,7 @@ pub struct ThreadManager<'mir, 'tcx> {
150153
impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
151154
fn default() -> Self {
152155
let mut threads = IndexVec::new();
156+
// Create the main thread and add it to the list of threads.
153157
threads.push(Default::default());
154158
Self {
155159
active_thread: ThreadId::new(0),
@@ -170,14 +174,13 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
170174

171175
/// Set the allocation id as the allocation id of the given thread local
172176
/// static for the active thread.
177+
///
178+
/// Panics if a thread local is initialized twice for the same thread.
173179
fn set_thread_local_alloc_id(&self, def_id: DefId, new_alloc_id: AllocId) {
174-
assert!(
175-
self.thread_local_alloc_ids
176-
.borrow_mut()
177-
.insert((def_id, self.active_thread), new_alloc_id)
178-
.is_none(),
179-
"a thread local initialized twice for the same thread"
180-
);
180+
self.thread_local_alloc_ids
181+
.borrow_mut()
182+
.insert((def_id, self.active_thread), new_alloc_id)
183+
.unwrap_none();
181184
}
182185

183186
/// Borrow the stack of the active thread.
@@ -227,15 +230,20 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
227230
}
228231

229232
/// Mark that the active thread tries to join the thread with `joined_thread_id`.
230-
fn join_thread(&mut self, joined_thread_id: ThreadId) {
231-
assert!(!self.threads[joined_thread_id].detached, "Bug: trying to join a detached thread.");
232-
assert_ne!(joined_thread_id, self.active_thread, "Bug: trying to join itself");
233-
assert!(
234-
self.threads
235-
.iter()
236-
.all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)),
237-
"Bug: multiple threads try to join the same thread."
238-
);
233+
fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
234+
if self.threads[joined_thread_id].detached {
235+
throw_ub_format!("trying to join a detached thread");
236+
}
237+
if joined_thread_id == self.active_thread {
238+
throw_ub_format!("trying to join itself");
239+
}
240+
if self
241+
.threads
242+
.iter()
243+
.any(|thread| thread.state == ThreadState::BlockedOnJoin(joined_thread_id))
244+
{
245+
throw_ub_format!("multiple threads try to join the same thread");
246+
}
239247
if self.threads[joined_thread_id].state != ThreadState::Terminated {
240248
// The joined thread is still running, we need to wait for it.
241249
self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id);
@@ -245,13 +253,23 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
245253
joined_thread_id
246254
);
247255
}
256+
Ok(())
248257
}
249258

250259
/// Set the name of the active thread.
251260
fn set_thread_name(&mut self, new_thread_name: Vec<u8>) {
252261
self.active_thread_mut().thread_name = Some(new_thread_name);
253262
}
254263

264+
/// Get the name of the active thread.
265+
fn get_thread_name(&mut self) -> InterpResult<'tcx, Vec<u8>> {
266+
if let Some(ref thread_name) = self.active_thread_mut().thread_name {
267+
Ok(thread_name.clone())
268+
} else {
269+
throw_ub_format!("thread {:?} has no name set", self.active_thread)
270+
}
271+
}
272+
255273
/// Allocate a new blockset id.
256274
fn create_blockset(&mut self) -> BlockSetId {
257275
self.blockset_counter = self.blockset_counter.checked_add(1).unwrap();
@@ -267,7 +285,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
267285

268286
/// Unblock any one thread from the given blockset if it contains at least
269287
/// one. Return the id of the unblocked thread.
270-
fn unblock_random_thread(&mut self, set: BlockSetId) -> Option<ThreadId> {
288+
fn unblock_some_thread(&mut self, set: BlockSetId) -> Option<ThreadId> {
271289
for (id, thread) in self.threads.iter_enumerated_mut() {
272290
if thread.state == ThreadState::Blocked(set) {
273291
trace!("unblocking {:?} in blockset {:?}", id, set);
@@ -284,6 +302,11 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
284302
}
285303

286304
/// Decide which action to take next and on which thread.
305+
///
306+
/// The currently implemented scheduling policy is the one that is commonly
307+
/// used in stateless model checkers such as Loom: run the active thread as
308+
/// long as we can and switch only when we have to (the active thread was
309+
/// blocked, terminated, or was explicitly asked to be preempted).
287310
fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
288311
if self.threads[self.active_thread].check_terminated() {
289312
// Check if we need to unblock any threads.
@@ -295,14 +318,24 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
295318
}
296319
return Ok(SchedulingAction::ExecuteDtors);
297320
}
321+
if self.threads[MAIN_THREAD].state == ThreadState::Terminated {
322+
// The main thread terminated; stop the program.
323+
if self.threads.iter().any(|thread| thread.state != ThreadState::Terminated) {
324+
// FIXME: This check should be either configurable or just emit a warning.
325+
throw_unsup_format!("the main thread terminated without waiting for other threads");
326+
}
327+
return Ok(SchedulingAction::Stop);
328+
}
298329
if self.threads[self.active_thread].state == ThreadState::Enabled
299330
&& !self.yield_active_thread
300331
{
332+
// The currently active thread is still enabled, just continue with it.
301333
return Ok(SchedulingAction::ExecuteStep);
302334
}
335+
// We need to pick a new thread for execution.
303336
for (id, thread) in self.threads.iter_enumerated() {
304337
if thread.state == ThreadState::Enabled {
305-
if !(self.yield_active_thread && id == self.active_thread) {
338+
if !self.yield_active_thread || id != self.active_thread {
306339
self.active_thread = id;
307340
break;
308341
}
@@ -312,14 +345,16 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
312345
if self.threads[self.active_thread].state == ThreadState::Enabled {
313346
return Ok(SchedulingAction::ExecuteStep);
314347
}
348+
// We have not found a thread to execute.
315349
if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) {
316-
Ok(SchedulingAction::Stop)
350+
unreachable!();
317351
} else {
318352
throw_machine_stop!(TerminationInfo::Deadlock);
319353
}
320354
}
321355
}
322356

357+
// Public interface to thread management.
323358
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {}
324359
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> {
325360
/// A workaround for thread-local statics until
@@ -331,8 +366,8 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
331366
val: &mut mir::interpret::ConstValue<'tcx>,
332367
) -> InterpResult<'tcx> {
333368
let this = self.eval_context_ref();
334-
match val {
335-
mir::interpret::ConstValue::Scalar(Scalar::Ptr(ptr)) => {
369+
match *val {
370+
mir::interpret::ConstValue::Scalar(Scalar::Ptr(ref mut ptr)) => {
336371
let alloc_id = ptr.alloc_id;
337372
let alloc = this.tcx.alloc_map.lock().get(alloc_id);
338373
let tcx = this.tcx;
@@ -407,75 +442,94 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
407442
}
408443
}
409444

445+
#[inline]
410446
fn create_thread(&mut self) -> InterpResult<'tcx, ThreadId> {
411447
let this = self.eval_context_mut();
412448
Ok(this.machine.threads.create_thread())
413449
}
414450

451+
#[inline]
415452
fn detach_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx> {
416453
let this = self.eval_context_mut();
417454
this.machine.threads.detach_thread(thread_id);
418455
Ok(())
419456
}
420457

458+
#[inline]
421459
fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
422460
let this = self.eval_context_mut();
423-
this.machine.threads.join_thread(joined_thread_id);
424-
Ok(())
461+
this.machine.threads.join_thread(joined_thread_id)
425462
}
426463

464+
#[inline]
427465
fn set_active_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx, ThreadId> {
428466
let this = self.eval_context_mut();
429467
Ok(this.machine.threads.set_active_thread_id(thread_id))
430468
}
431469

470+
#[inline]
432471
fn get_active_thread(&self) -> InterpResult<'tcx, ThreadId> {
433472
let this = self.eval_context_ref();
434473
Ok(this.machine.threads.get_active_thread_id())
435474
}
436475

476+
#[inline]
437477
fn has_terminated(&self, thread_id: ThreadId) -> InterpResult<'tcx, bool> {
438478
let this = self.eval_context_ref();
439479
Ok(this.machine.threads.has_terminated(thread_id))
440480
}
441481

482+
#[inline]
442483
fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Tag, FrameData<'tcx>>] {
443484
let this = self.eval_context_ref();
444485
this.machine.threads.active_thread_stack()
445486
}
446487

488+
#[inline]
447489
fn active_thread_stack_mut(&mut self) -> &mut Vec<Frame<'mir, 'tcx, Tag, FrameData<'tcx>>> {
448490
let this = self.eval_context_mut();
449491
this.machine.threads.active_thread_stack_mut()
450492
}
451493

494+
#[inline]
452495
fn set_active_thread_name(&mut self, new_thread_name: Vec<u8>) -> InterpResult<'tcx, ()> {
453496
let this = self.eval_context_mut();
454497
Ok(this.machine.threads.set_thread_name(new_thread_name))
455498
}
456499

500+
#[inline]
501+
fn get_active_thread_name(&mut self) -> InterpResult<'tcx, Vec<u8>> {
502+
let this = self.eval_context_mut();
503+
this.machine.threads.get_thread_name()
504+
}
505+
506+
#[inline]
457507
fn create_blockset(&mut self) -> InterpResult<'tcx, BlockSetId> {
458508
let this = self.eval_context_mut();
459509
Ok(this.machine.threads.create_blockset())
460510
}
461511

512+
#[inline]
462513
fn block_active_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx> {
463514
let this = self.eval_context_mut();
464515
Ok(this.machine.threads.block_active_thread(set))
465516
}
466517

467-
fn unblock_random_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option<ThreadId>> {
518+
#[inline]
519+
fn unblock_some_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option<ThreadId>> {
468520
let this = self.eval_context_mut();
469-
Ok(this.machine.threads.unblock_random_thread(set))
521+
Ok(this.machine.threads.unblock_some_thread(set))
470522
}
471523

524+
#[inline]
472525
fn yield_active_thread(&mut self) -> InterpResult<'tcx> {
473526
let this = self.eval_context_mut();
474527
this.machine.threads.yield_active_thread();
475528
Ok(())
476529
}
477530

478531
/// Decide which action to take next and on which thread.
532+
#[inline]
479533
fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
480534
let this = self.eval_context_mut();
481535
this.machine.threads.schedule()

0 commit comments

Comments
 (0)