diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index bdd4e836997..b1533237b15 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -18,7 +18,7 @@ use rt::kill::BlockedTask; use kinds::Send; use rt::sched::Scheduler; use rt::local::Local; -use unstable::atomics::{AtomicUint, AtomicOption, SeqCst}; +use unstable::atomics::{AtomicUint, AtomicOption, Acquire, SeqCst}; use unstable::sync::UnsafeAtomicRcBox; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; @@ -164,36 +164,39 @@ impl PortOne { let mut this = self; let packet = this.packet(); - // XXX: Optimize this to not require the two context switches when data is available + // Optimistic check. If data was sent already, we don't even need to block. + // No release barrier needed here; we're not handing off our task pointer yet. + if unsafe { (*packet).state.load(Acquire) } != STATE_ONE { + // No data available yet. + // Switch to the scheduler to put the ~Task into the Packet state. + let sched = Local::take::(); + do sched.deschedule_running_task_and_then |sched, task| { + unsafe { + // Atomically swap the task pointer into the Packet state, issuing + // an acquire barrier to prevent reordering of the subsequent read + // of the payload. Also issues a release barrier to prevent + // reordering of any previous writes to the task structure. + let task_as_state = task.cast_to_uint(); + let oldstate = (*packet).state.swap(task_as_state, SeqCst); + match oldstate { + STATE_BOTH => { + // Data has not been sent. Now we're blocked. + rtdebug!("non-rendezvous recv"); + sched.metrics.non_rendezvous_recvs += 1; + } + STATE_ONE => { + rtdebug!("rendezvous recv"); + sched.metrics.rendezvous_recvs += 1; - // Switch to the scheduler to put the ~Task into the Packet state. - let sched = Local::take::(); - do sched.deschedule_running_task_and_then |sched, task| { - unsafe { - // Atomically swap the task pointer into the Packet state, issuing - // an acquire barrier to prevent reordering of the subsequent read - // of the payload. Also issues a release barrier to prevent reordering - // of any previous writes to the task structure. - let task_as_state = task.cast_to_uint(); - let oldstate = (*packet).state.swap(task_as_state, SeqCst); - match oldstate { - STATE_BOTH => { - // Data has not been sent. Now we're blocked. - rtdebug!("non-rendezvous recv"); - sched.metrics.non_rendezvous_recvs += 1; + // Channel is closed. Switch back and check the data. + // NB: We have to drop back into the scheduler event loop here + // instead of switching immediately back or we could end up + // triggering infinite recursion on the scheduler's stack. + let recvr = BlockedTask::cast_from_uint(task_as_state); + sched.enqueue_blocked_task(recvr); + } + _ => util::unreachable() } - STATE_ONE => { - rtdebug!("rendezvous recv"); - sched.metrics.rendezvous_recvs += 1; - - // Channel is closed. Switch back and check the data. - // NB: We have to drop back into the scheduler event loop here - // instead of switching immediately back or we could end up - // triggering infinite recursion on the scheduler's stack. - let recvr = BlockedTask::cast_from_uint(task_as_state); - sched.enqueue_blocked_task(recvr); - } - _ => util::unreachable() } } } @@ -212,7 +215,7 @@ impl PortOne { // a different scheduler for resuming. That send synchronized memory. unsafe { - let payload = util::replace(&mut (*packet).payload, None); + let payload = (*packet).payload.take(); // The sender has closed up shop. Drop the packet. let _packet: ~Packet = cast::transmute(this.void_packet);