Many small changes to thread management.

This commit is contained in:
Vytautas Astrauskas 2020-04-19 14:21:18 -07:00
parent 75e6549c11
commit 94cbe88e80
2 changed files with 87 additions and 33 deletions

View File

@ -419,7 +419,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
throw_ub_format!("called pthread_mutex_unlock on a mutex owned by another thread");
} else if locked_count == 1 {
let blockset = mutex_get_or_create_blockset(this, mutex_op)?;
if let Some(new_owner) = this.unblock_random_thread(blockset)? {
if let Some(new_owner) = this.unblock_some_thread(blockset)? {
// We have at least one thread waiting on this mutex. Transfer
// ownership to it.
mutex_set_owner(this, mutex_op, new_owner.to_u32_scalar())?;
@ -543,7 +543,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
assert_eq!(writers, 0);
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(new_readers))?;
if new_readers == 0 {
if let Some(_writer) = this.unblock_random_thread(writer_blockset)? {
if let Some(_writer) = this.unblock_some_thread(writer_blockset)? {
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?;
}
}
@ -551,11 +551,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
} else if writers != 0 {
let reader_blockset = rwlock_get_or_create_reader_blockset(this, rwlock_op)?;
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(0))?;
if let Some(_writer) = this.unblock_random_thread(writer_blockset)? {
if let Some(_writer) = this.unblock_some_thread(writer_blockset)? {
rwlock_set_writers(this, rwlock_op, Scalar::from_u32(1))?;
} else {
let mut readers = 0;
while let Some(_reader) = this.unblock_random_thread(reader_blockset)? {
while let Some(_reader) = this.unblock_some_thread(reader_blockset)? {
readers += 1;
}
rwlock_set_readers(this, rwlock_op, Scalar::from_u32(readers))?

View File

@ -31,6 +31,9 @@ pub enum SchedulingAction {
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ThreadId(usize);
/// The main thread. When it terminates, the whole application terminates.
const MAIN_THREAD: ThreadId = ThreadId(0);
impl Idx for ThreadId {
fn new(idx: usize) -> Self {
ThreadId(idx)
@ -42,13 +45,13 @@ impl Idx for ThreadId {
impl From<u64> for ThreadId {
fn from(id: u64) -> Self {
Self(id as usize)
Self(usize::try_from(id).unwrap())
}
}
impl From<u32> for ThreadId {
fn from(id: u32) -> Self {
Self(id as usize)
Self(usize::try_from(id).unwrap())
}
}
@ -82,10 +85,10 @@ pub enum ThreadState {
/// The thread tried to join the specified thread and is blocked until that
/// thread terminates.
BlockedOnJoin(ThreadId),
/// The thread is blocked and belongs to the given blockset..
/// The thread is blocked and belongs to the given blockset.
Blocked(BlockSetId),
/// The thread has terminated its execution (we do not delete terminated
/// threads.)
/// threads).
Terminated,
}
@ -150,6 +153,7 @@ pub struct ThreadManager<'mir, 'tcx> {
impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> {
fn default() -> Self {
let mut threads = IndexVec::new();
// Create the main thread and add it to the list of threads.
threads.push(Default::default());
Self {
active_thread: ThreadId::new(0),
@ -170,14 +174,13 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
/// Set the allocation id as the allocation id of the given thread local
/// static for the active thread.
///
/// Panics if a thread local is initialized twice for the same thread.
fn set_thread_local_alloc_id(&self, def_id: DefId, new_alloc_id: AllocId) {
assert!(
self.thread_local_alloc_ids
.borrow_mut()
.insert((def_id, self.active_thread), new_alloc_id)
.is_none(),
"a thread local initialized twice for the same thread"
);
self.thread_local_alloc_ids
.borrow_mut()
.insert((def_id, self.active_thread), new_alloc_id)
.unwrap_none();
}
/// Borrow the stack of the active thread.
@ -227,15 +230,20 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
}
/// Mark that the active thread tries to join the thread with `joined_thread_id`.
fn join_thread(&mut self, joined_thread_id: ThreadId) {
assert!(!self.threads[joined_thread_id].detached, "Bug: trying to join a detached thread.");
assert_ne!(joined_thread_id, self.active_thread, "Bug: trying to join itself");
assert!(
self.threads
.iter()
.all(|thread| thread.state != ThreadState::BlockedOnJoin(joined_thread_id)),
"Bug: multiple threads try to join the same thread."
);
fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
if self.threads[joined_thread_id].detached {
throw_ub_format!("trying to join a detached thread");
}
if joined_thread_id == self.active_thread {
throw_ub_format!("trying to join itself");
}
if self
.threads
.iter()
.any(|thread| thread.state == ThreadState::BlockedOnJoin(joined_thread_id))
{
throw_ub_format!("multiple threads try to join the same thread");
}
if self.threads[joined_thread_id].state != ThreadState::Terminated {
// The joined thread is still running, we need to wait for it.
self.active_thread_mut().state = ThreadState::BlockedOnJoin(joined_thread_id);
@ -245,6 +253,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
joined_thread_id
);
}
Ok(())
}
/// Set the name of the active thread.
@ -252,6 +261,15 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
self.active_thread_mut().thread_name = Some(new_thread_name);
}
/// Get the name of the active thread.
fn get_thread_name(&mut self) -> InterpResult<'tcx, Vec<u8>> {
if let Some(ref thread_name) = self.active_thread_mut().thread_name {
Ok(thread_name.clone())
} else {
throw_ub_format!("thread {:?} has no name set", self.active_thread)
}
}
/// Allocate a new blockset id.
fn create_blockset(&mut self) -> BlockSetId {
self.blockset_counter = self.blockset_counter.checked_add(1).unwrap();
@ -267,7 +285,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
/// Unblock any one thread from the given blockset if it contains at least
/// one. Return the id of the unblocked thread.
fn unblock_random_thread(&mut self, set: BlockSetId) -> Option<ThreadId> {
fn unblock_some_thread(&mut self, set: BlockSetId) -> Option<ThreadId> {
for (id, thread) in self.threads.iter_enumerated_mut() {
if thread.state == ThreadState::Blocked(set) {
trace!("unblocking {:?} in blockset {:?}", id, set);
@ -284,6 +302,11 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
}
/// Decide which action to take next and on which thread.
///
/// The currently implemented scheduling policy is the one that is commonly
/// used in stateless model checkers such as Loom: run the active thread as
/// long as we can and switch only when we have to (the active thread was
/// blocked, terminated, or was explicitly asked to be preempted).
fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
if self.threads[self.active_thread].check_terminated() {
// Check if we need to unblock any threads.
@ -295,14 +318,24 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
}
return Ok(SchedulingAction::ExecuteDtors);
}
if self.threads[MAIN_THREAD].state == ThreadState::Terminated {
// The main thread terminated; stop the program.
if self.threads.iter().any(|thread| thread.state != ThreadState::Terminated) {
// FIXME: This check should be either configurable or just emit a warning.
throw_unsup_format!("the main thread terminated without waiting for other threads");
}
return Ok(SchedulingAction::Stop);
}
if self.threads[self.active_thread].state == ThreadState::Enabled
&& !self.yield_active_thread
{
// The currently active thread is still enabled, just continue with it.
return Ok(SchedulingAction::ExecuteStep);
}
// We need to pick a new thread for execution.
for (id, thread) in self.threads.iter_enumerated() {
if thread.state == ThreadState::Enabled {
if !(self.yield_active_thread && id == self.active_thread) {
if !self.yield_active_thread || id != self.active_thread {
self.active_thread = id;
break;
}
@ -312,14 +345,16 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
if self.threads[self.active_thread].state == ThreadState::Enabled {
return Ok(SchedulingAction::ExecuteStep);
}
// We have not found a thread to execute.
if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) {
Ok(SchedulingAction::Stop)
unreachable!();
} else {
throw_machine_stop!(TerminationInfo::Deadlock);
}
}
}
// Public interface to thread management.
impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriEvalContext<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx> {
/// A workaround for thread-local statics until
@ -331,8 +366,8 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
val: &mut mir::interpret::ConstValue<'tcx>,
) -> InterpResult<'tcx> {
let this = self.eval_context_ref();
match val {
mir::interpret::ConstValue::Scalar(Scalar::Ptr(ptr)) => {
match *val {
mir::interpret::ConstValue::Scalar(Scalar::Ptr(ref mut ptr)) => {
let alloc_id = ptr.alloc_id;
let alloc = this.tcx.alloc_map.lock().get(alloc_id);
let tcx = this.tcx;
@ -407,68 +442,86 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
}
}
#[inline]
fn create_thread(&mut self) -> InterpResult<'tcx, ThreadId> {
let this = self.eval_context_mut();
Ok(this.machine.threads.create_thread())
}
#[inline]
fn detach_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
this.machine.threads.detach_thread(thread_id);
Ok(())
}
#[inline]
fn join_thread(&mut self, joined_thread_id: ThreadId) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
this.machine.threads.join_thread(joined_thread_id);
Ok(())
this.machine.threads.join_thread(joined_thread_id)
}
#[inline]
fn set_active_thread(&mut self, thread_id: ThreadId) -> InterpResult<'tcx, ThreadId> {
let this = self.eval_context_mut();
Ok(this.machine.threads.set_active_thread_id(thread_id))
}
#[inline]
fn get_active_thread(&self) -> InterpResult<'tcx, ThreadId> {
let this = self.eval_context_ref();
Ok(this.machine.threads.get_active_thread_id())
}
#[inline]
fn has_terminated(&self, thread_id: ThreadId) -> InterpResult<'tcx, bool> {
let this = self.eval_context_ref();
Ok(this.machine.threads.has_terminated(thread_id))
}
#[inline]
fn active_thread_stack(&self) -> &[Frame<'mir, 'tcx, Tag, FrameData<'tcx>>] {
let this = self.eval_context_ref();
this.machine.threads.active_thread_stack()
}
#[inline]
fn active_thread_stack_mut(&mut self) -> &mut Vec<Frame<'mir, 'tcx, Tag, FrameData<'tcx>>> {
let this = self.eval_context_mut();
this.machine.threads.active_thread_stack_mut()
}
#[inline]
fn set_active_thread_name(&mut self, new_thread_name: Vec<u8>) -> InterpResult<'tcx, ()> {
let this = self.eval_context_mut();
Ok(this.machine.threads.set_thread_name(new_thread_name))
}
#[inline]
fn get_active_thread_name(&mut self) -> InterpResult<'tcx, Vec<u8>> {
let this = self.eval_context_mut();
this.machine.threads.get_thread_name()
}
#[inline]
fn create_blockset(&mut self) -> InterpResult<'tcx, BlockSetId> {
let this = self.eval_context_mut();
Ok(this.machine.threads.create_blockset())
}
#[inline]
fn block_active_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
Ok(this.machine.threads.block_active_thread(set))
}
fn unblock_random_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option<ThreadId>> {
#[inline]
fn unblock_some_thread(&mut self, set: BlockSetId) -> InterpResult<'tcx, Option<ThreadId>> {
let this = self.eval_context_mut();
Ok(this.machine.threads.unblock_random_thread(set))
Ok(this.machine.threads.unblock_some_thread(set))
}
#[inline]
fn yield_active_thread(&mut self) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
this.machine.threads.yield_active_thread();
@ -476,6 +529,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
}
/// Decide which action to take next and on which thread.
#[inline]
fn schedule(&mut self) -> InterpResult<'tcx, SchedulingAction> {
let this = self.eval_context_mut();
this.machine.threads.schedule()