Make QueryWaiter use safe code
This commit is contained in:
parent
3571684d2c
commit
b2555bd545
@ -94,15 +94,15 @@ pub(super) fn await<'lcx>(
|
||||
#[cfg(parallel_queries)]
|
||||
{
|
||||
tls::with_related_context(tcx, move |icx| {
|
||||
let mut waiter = QueryWaiter {
|
||||
query: &icx.query,
|
||||
let mut waiter = Lrc::new(QueryWaiter {
|
||||
query: icx.query.clone(),
|
||||
span,
|
||||
cycle: None,
|
||||
cycle: Lock::new(None),
|
||||
condvar: Condvar::new(),
|
||||
};
|
||||
self.latch.await(&mut waiter);
|
||||
});
|
||||
self.latch.await(&waiter);
|
||||
|
||||
match waiter.cycle {
|
||||
match Lrc::get_mut(&mut waiter).unwrap().cycle.get_mut().take() {
|
||||
None => Ok(()),
|
||||
Some(cycle) => Err(cycle)
|
||||
}
|
||||
@ -154,10 +154,10 @@ pub fn signal_complete(&self) {
|
||||
|
||||
#[cfg(parallel_queries)]
|
||||
struct QueryWaiter<'tcx> {
|
||||
query: *const Option<Lrc<QueryJob<'tcx>>>,
|
||||
query: Option<Lrc<QueryJob<'tcx>>>,
|
||||
condvar: Condvar,
|
||||
span: Span,
|
||||
cycle: Option<CycleError<'tcx>>,
|
||||
cycle: Lock<Option<CycleError<'tcx>>>,
|
||||
}
|
||||
|
||||
#[cfg(parallel_queries)]
|
||||
@ -171,13 +171,9 @@ fn notify(&self, registry: &rayon_core::Registry) {
|
||||
#[cfg(parallel_queries)]
|
||||
struct QueryLatchInfo<'tcx> {
|
||||
complete: bool,
|
||||
waiters: Vec<*mut QueryWaiter<'tcx>>,
|
||||
waiters: Vec<Lrc<QueryWaiter<'tcx>>>,
|
||||
}
|
||||
|
||||
// Required because of raw pointers
|
||||
#[cfg(parallel_queries)]
|
||||
unsafe impl<'tcx> Send for QueryLatchInfo<'tcx> {}
|
||||
|
||||
#[cfg(parallel_queries)]
|
||||
struct QueryLatch<'tcx> {
|
||||
info: Mutex<QueryLatchInfo<'tcx>>,
|
||||
@ -195,14 +191,14 @@ fn new() -> Self {
|
||||
}
|
||||
|
||||
/// Awaits the caller on this latch by blocking the current thread.
|
||||
fn await(&self, waiter: &mut QueryWaiter<'tcx>) {
|
||||
fn await(&self, waiter: &Lrc<QueryWaiter<'tcx>>) {
|
||||
let mut info = self.info.lock();
|
||||
if !info.complete {
|
||||
// We push the waiter on to the `waiters` list. It can be accessed inside
|
||||
// the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
|
||||
// Both of these will remove it from the `waiters` list before resuming
|
||||
// this thread.
|
||||
info.waiters.push(waiter);
|
||||
info.waiters.push(waiter.clone());
|
||||
|
||||
// If this detects a deadlock and the deadlock handler want to resume this thread
|
||||
// we have to be in the `wait` call. This is ensured by the deadlock handler
|
||||
@ -219,9 +215,7 @@ fn set(&self) {
|
||||
info.complete = true;
|
||||
let registry = rayon_core::Registry::current();
|
||||
for waiter in info.waiters.drain(..) {
|
||||
unsafe {
|
||||
(*waiter).notify(®istry);
|
||||
}
|
||||
waiter.notify(®istry);
|
||||
}
|
||||
}
|
||||
|
||||
@ -230,7 +224,7 @@ fn set(&self) {
|
||||
fn extract_waiter(
|
||||
&self,
|
||||
waiter: usize,
|
||||
) -> *mut QueryWaiter<'tcx> {
|
||||
) -> Lrc<QueryWaiter<'tcx>> {
|
||||
let mut info = self.info.lock();
|
||||
debug_assert!(!info.complete);
|
||||
// Remove the waiter from the list of waiters
|
||||
@ -270,13 +264,11 @@ fn visit_waiters<'tcx, F>(query_ref: Ref<'tcx>, mut visit: F) -> Option<Option<W
|
||||
}
|
||||
|
||||
// Visit the explict waiters which use condvars and are resumable
|
||||
for (i, &waiter) in query.latch.info.lock().waiters.iter().enumerate() {
|
||||
unsafe {
|
||||
if let Some(ref waiter_query) = *(*waiter).query {
|
||||
if visit((*waiter).span, &**waiter_query as Ref).is_some() {
|
||||
// Return a value which indicates that this waiter can be resumed
|
||||
return Some(Some((query_ref, i)));
|
||||
}
|
||||
for (i, waiter) in query.latch.info.lock().waiters.iter().enumerate() {
|
||||
if let Some(ref waiter_query) = waiter.query {
|
||||
if visit(waiter.span, &**waiter_query).is_some() {
|
||||
// Return a value which indicates that this waiter can be resumed
|
||||
return Some(Some((query_ref, i)));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -359,7 +351,7 @@ fn connected_to_root<'tcx>(query: Ref<'tcx>, visited: &mut HashSet<Ref<'tcx>>) -
|
||||
#[cfg(parallel_queries)]
|
||||
fn remove_cycle<'tcx>(
|
||||
jobs: &mut Vec<Ref<'tcx>>,
|
||||
wakelist: &mut Vec<*mut QueryWaiter<'tcx>>,
|
||||
wakelist: &mut Vec<Lrc<QueryWaiter<'tcx>>>,
|
||||
tcx: TyCtxt<'_, 'tcx, '_>
|
||||
) -> bool {
|
||||
let mut visited = HashSet::new();
|
||||
@ -439,9 +431,9 @@ fn remove_cycle<'tcx>(
|
||||
// Extract the waiter we want to resume
|
||||
let waiter = waitee_query.latch.extract_waiter(waiter_idx);
|
||||
|
||||
// Set the cycle error it will be picked it up when resumed
|
||||
// Set the cycle error so it will be picked up when resumed
|
||||
unsafe {
|
||||
(*waiter).cycle = Some(error);
|
||||
*waiter.cycle.lock() = Some(error);
|
||||
}
|
||||
|
||||
// Put the waiter on the list of things to resume
|
||||
@ -525,9 +517,7 @@ fn deadlock(tcx: TyCtxt<'_, '_, '_>, registry: &rayon_core::Registry) {
|
||||
|
||||
// FIXME: Ensure this won't cause a deadlock before we return
|
||||
for waiter in wakelist.into_iter() {
|
||||
unsafe {
|
||||
(*waiter).notify(registry);
|
||||
}
|
||||
waiter.notify(registry);
|
||||
}
|
||||
|
||||
on_panic.disable();
|
||||
|
Loading…
Reference in New Issue
Block a user