// Copyright 2013 The Rust Project Developers. See the COPYRIGHT // file at the top-level directory of this distribution and at // http://rust-lang.org/COPYRIGHT. // // Licensed under the Apache License, Version 2.0 or the MIT license // , at your // option. This file may not be copied, modified, or distributed // except according to those terms. //! Ports and channels. use option::*; use cast; use util; use ops::Drop; use rt::kill::BlockedTask; use kinds::Send; use rt::sched::Scheduler; use rt::local::Local; use unstable::atomics::{AtomicUint, AtomicOption, Acquire, SeqCst}; use unstable::sync::UnsafeAtomicRcBox; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; use clone::Clone; /// A combined refcount / BlockedTask-as-uint pointer. /// /// Can be equal to the following values: /// /// * 2 - both endpoints are alive /// * 1 - either the sender or the receiver is dead, determined by context /// * - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint) type State = uint; static STATE_BOTH: State = 2; static STATE_ONE: State = 1; /// The heap-allocated structure shared between two endpoints. struct Packet { state: AtomicUint, payload: Option, } /// A one-shot channel. pub struct ChanOne { void_packet: *mut Void, suppress_finalize: bool } /// A one-shot port. pub struct PortOne { void_packet: *mut Void, suppress_finalize: bool } pub fn oneshot() -> (PortOne, ChanOne) { let packet: ~Packet = ~Packet { state: AtomicUint::new(STATE_BOTH), payload: None }; unsafe { let packet: *mut Void = cast::transmute(packet); let port = PortOne { void_packet: packet, suppress_finalize: false }; let chan = ChanOne { void_packet: packet, suppress_finalize: false }; return (port, chan); } } impl ChanOne { fn packet(&self) -> *mut Packet { unsafe { let p: *mut ~Packet = cast::transmute(&self.void_packet); let p: *mut Packet = &mut **p; return p; } } pub fn send(self, val: T) { self.try_send(val); } pub fn try_send(self, val: T) -> bool { let mut this = self; let mut recvr_active = true; let packet = this.packet(); unsafe { // Install the payload assert!((*packet).payload.is_none()); (*packet).payload = Some(val); // Atomically swap out the old state to figure out what // the port's up to, issuing a release barrier to prevent // reordering of the payload write. This also issues an // acquire barrier that keeps the subsequent access of the // ~Task pointer from being reordered. let oldstate = (*packet).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Port is not waiting yet. Nothing to do do Local::borrow:: |sched| { rtdebug!("non-rendezvous send"); sched.metrics.non_rendezvous_sends += 1; } } STATE_ONE => { do Local::borrow:: |sched| { rtdebug!("rendezvous send"); sched.metrics.rendezvous_sends += 1; } // Port has closed. Need to clean up. let _packet: ~Packet = cast::transmute(this.void_packet); recvr_active = false; } task_as_state => { // Port is blocked. Wake it up. let recvr = BlockedTask::cast_from_uint(task_as_state); do recvr.wake().map_consume |woken_task| { let mut sched = Local::take::(); rtdebug!("rendezvous send"); sched.metrics.rendezvous_sends += 1; sched.schedule_task(woken_task); }; } } } // Suppress the synchronizing actions in the finalizer. We're done with the packet. this.suppress_finalize = true; return recvr_active; } } impl PortOne { fn packet(&self) -> *mut Packet { unsafe { let p: *mut ~Packet = cast::transmute(&self.void_packet); let p: *mut Packet = &mut **p; return p; } } pub fn recv(self) -> T { match self.try_recv() { Some(val) => val, None => { fail!("receiving on closed channel"); } } } pub fn try_recv(self) -> Option { let mut this = self; let packet = this.packet(); // Optimistic check. If data was sent already, we don't even need to block. // No release barrier needed here; we're not handing off our task pointer yet. if unsafe { (*packet).state.load(Acquire) } != STATE_ONE { // No data available yet. // Switch to the scheduler to put the ~Task into the Packet state. let sched = Local::take::(); do sched.deschedule_running_task_and_then |sched, task| { unsafe { // Atomically swap the task pointer into the Packet state, issuing // an acquire barrier to prevent reordering of the subsequent read // of the payload. Also issues a release barrier to prevent // reordering of any previous writes to the task structure. let task_as_state = task.cast_to_uint(); let oldstate = (*packet).state.swap(task_as_state, SeqCst); match oldstate { STATE_BOTH => { // Data has not been sent. Now we're blocked. rtdebug!("non-rendezvous recv"); sched.metrics.non_rendezvous_recvs += 1; } STATE_ONE => { rtdebug!("rendezvous recv"); sched.metrics.rendezvous_recvs += 1; // Channel is closed. Switch back and check the data. // NB: We have to drop back into the scheduler event loop here // instead of switching immediately back or we could end up // triggering infinite recursion on the scheduler's stack. let recvr = BlockedTask::cast_from_uint(task_as_state); sched.enqueue_blocked_task(recvr); } _ => util::unreachable() } } } } // Task resumes. // No further memory barrier is needed here to access the // payload. Some scenarios: // // 1) We encountered STATE_ONE above - the atomic_xchg was the acq barrier. We're fine. // 2) We encountered STATE_BOTH above and blocked. The sending task then ran us // and ran on its thread. The sending task issued a read barrier when taking the // pointer to the receiving task. // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task) // is pinned to some other scheduler, so the sending task had to give us to // a different scheduler for resuming. That send synchronized memory. unsafe { let payload = (*packet).payload.take(); // The sender has closed up shop. Drop the packet. let _packet: ~Packet = cast::transmute(this.void_packet); // Suppress the synchronizing actions in the finalizer. We're done with the packet. this.suppress_finalize = true; return payload; } } } impl Peekable for PortOne { fn peek(&self) -> bool { unsafe { let packet: *mut Packet = self.packet(); let oldstate = (*packet).state.load(SeqCst); match oldstate { STATE_BOTH => false, STATE_ONE => (*packet).payload.is_some(), _ => util::unreachable() } } } } #[unsafe_destructor] impl Drop for ChanOne { fn drop(&self) { if self.suppress_finalize { return } unsafe { let this = cast::transmute_mut(self); let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Port still active. It will destroy the Packet. }, STATE_ONE => { let _packet: ~Packet = cast::transmute(this.void_packet); }, task_as_state => { // The port is blocked waiting for a message we will never send. Wake it. assert!((*this.packet()).payload.is_none()); let recvr = BlockedTask::cast_from_uint(task_as_state); do recvr.wake().map_consume |woken_task| { let sched = Local::take::(); sched.schedule_task(woken_task); }; } } } } } #[unsafe_destructor] impl Drop for PortOne { fn drop(&self) { if self.suppress_finalize { return } unsafe { let this = cast::transmute_mut(self); let oldstate = (*this.packet()).state.swap(STATE_ONE, SeqCst); match oldstate { STATE_BOTH => { // Chan still active. It will destroy the packet. }, STATE_ONE => { let _packet: ~Packet = cast::transmute(this.void_packet); } task_as_state => { // This case occurs during unwinding, when the blocked // receiver was killed awake. The task can't still be // blocked (we are it), but we need to free the handle. let recvr = BlockedTask::cast_from_uint(task_as_state); // FIXME(#7554)(bblum): Make this cfg(test) dependent. // in a later commit. assert!(recvr.wake().is_none()); } } } } } struct StreamPayload { val: T, next: PortOne> } type StreamChanOne = ChanOne>; type StreamPortOne = PortOne>; /// A channel with unbounded size. pub struct Chan { // FIXME #5372. Using Cell because we don't take &mut self next: Cell> } /// An port with unbounded size. pub struct Port { // FIXME #5372. Using Cell because we don't take &mut self next: Cell> } pub fn stream() -> (Port, Chan) { let (pone, cone) = oneshot(); let port = Port { next: Cell::new(pone) }; let chan = Chan { next: Cell::new(cone) }; return (port, chan); } impl GenericChan for Chan { fn send(&self, val: T) { self.try_send(val); } } impl GenericSmartChan for Chan { fn try_send(&self, val: T) -> bool { let (next_pone, next_cone) = oneshot(); let cone = self.next.take(); self.next.put_back(next_cone); cone.try_send(StreamPayload { val: val, next: next_pone }) } } impl GenericPort for Port { fn recv(&self) -> T { match self.try_recv() { Some(val) => val, None => { fail!("receiving on closed channel"); } } } fn try_recv(&self) -> Option { let pone = self.next.take(); match pone.try_recv() { Some(StreamPayload { val, next }) => { self.next.put_back(next); Some(val) } None => None } } } impl Peekable for Port { fn peek(&self) -> bool { self.next.with_mut_ref(|p| p.peek()) } } pub struct SharedChan { // Just like Chan, but a shared AtomicOption instead of Cell priv next: UnsafeAtomicRcBox>> } impl SharedChan { pub fn new(chan: Chan) -> SharedChan { let next = chan.next.take(); let next = AtomicOption::new(~next); SharedChan { next: UnsafeAtomicRcBox::new(next) } } } impl GenericChan for SharedChan { fn send(&self, val: T) { self.try_send(val); } } impl GenericSmartChan for SharedChan { fn try_send(&self, val: T) -> bool { unsafe { let (next_pone, next_cone) = oneshot(); let cone = (*self.next.get()).swap(~next_cone, SeqCst); cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) } } } impl Clone for SharedChan { fn clone(&self) -> SharedChan { SharedChan { next: self.next.clone() } } } pub struct SharedPort { // The next port on which we will receive the next port on which we will receive T priv next_link: UnsafeAtomicRcBox>>> } impl SharedPort { pub fn new(port: Port) -> SharedPort { // Put the data port into a new link pipe let next_data_port = port.next.take(); let (next_link_port, next_link_chan) = oneshot(); next_link_chan.send(next_data_port); let next_link = AtomicOption::new(~next_link_port); SharedPort { next_link: UnsafeAtomicRcBox::new(next_link) } } } impl GenericPort for SharedPort { fn recv(&self) -> T { match self.try_recv() { Some(val) => val, None => { fail!("receiving on a closed channel"); } } } fn try_recv(&self) -> Option { unsafe { let (next_link_port, next_link_chan) = oneshot(); let link_port = (*self.next_link.get()).swap(~next_link_port, SeqCst); let link_port = link_port.unwrap(); let data_port = link_port.recv(); let (next_data_port, res) = match data_port.try_recv() { Some(StreamPayload { val, next }) => { (next, Some(val)) } None => { let (next_data_port, _) = oneshot(); (next_data_port, None) } }; next_link_chan.send(next_data_port); return res; } } } impl Clone for SharedPort { fn clone(&self) -> SharedPort { SharedPort { next_link: self.next_link.clone() } } } // XXX: Need better name type MegaPipe = (SharedPort, SharedChan); pub fn megapipe() -> MegaPipe { let (port, chan) = stream(); (SharedPort::new(port), SharedChan::new(chan)) } impl GenericChan for MegaPipe { fn send(&self, val: T) { match *self { (_, ref c) => c.send(val) } } } impl GenericSmartChan for MegaPipe { fn try_send(&self, val: T) -> bool { match *self { (_, ref c) => c.try_send(val) } } } impl GenericPort for MegaPipe { fn recv(&self) -> T { match *self { (ref p, _) => p.recv() } } fn try_recv(&self) -> Option { match *self { (ref p, _) => p.try_recv() } } } #[cfg(test)] mod test { use super::*; use option::*; use rt::test::*; use cell::Cell; use iter::Times; #[test] fn oneshot_single_thread_close_port_first() { // Simple test of closing without sending do run_in_newsched_task { let (port, _chan) = oneshot::(); { let _p = port; } } } #[test] fn oneshot_single_thread_close_chan_first() { // Simple test of closing without sending do run_in_newsched_task { let (_port, chan) = oneshot::(); { let _c = chan; } } } #[test] fn oneshot_single_thread_send_port_close() { // Testing that the sender cleans up the payload if receiver is closed do run_in_newsched_task { let (port, chan) = oneshot::<~int>(); { let _p = port; } chan.send(~0); } } #[test] fn oneshot_single_thread_recv_chan_close() { // Receiving on a closed chan will fail do run_in_newsched_task { let res = do spawntask_try { let (port, chan) = oneshot::<~int>(); { let _c = chan; } port.recv(); }; // What is our res? rtdebug!("res is: %?", res.is_err()); assert!(res.is_err()); } } #[test] fn oneshot_single_thread_send_then_recv() { do run_in_newsched_task { let (port, chan) = oneshot::<~int>(); chan.send(~10); assert!(port.recv() == ~10); } } #[test] fn oneshot_single_thread_try_send_open() { do run_in_newsched_task { let (port, chan) = oneshot::(); assert!(chan.try_send(10)); assert!(port.recv() == 10); } } #[test] fn oneshot_single_thread_try_send_closed() { do run_in_newsched_task { let (port, chan) = oneshot::(); { let _p = port; } assert!(!chan.try_send(10)); } } #[test] fn oneshot_single_thread_try_recv_open() { do run_in_newsched_task { let (port, chan) = oneshot::(); chan.send(10); assert!(port.try_recv() == Some(10)); } } #[test] fn oneshot_single_thread_try_recv_closed() { do run_in_newsched_task { let (port, chan) = oneshot::(); { let _c = chan; } assert!(port.try_recv() == None); } } #[test] fn oneshot_single_thread_peek_data() { do run_in_newsched_task { let (port, chan) = oneshot::(); assert!(!port.peek()); chan.send(10); assert!(port.peek()); } } #[test] fn oneshot_single_thread_peek_close() { do run_in_newsched_task { let (port, chan) = oneshot::(); { let _c = chan; } assert!(!port.peek()); assert!(!port.peek()); } } #[test] fn oneshot_single_thread_peek_open() { do run_in_newsched_task { let (port, _) = oneshot::(); assert!(!port.peek()); } } #[test] fn oneshot_multi_task_recv_then_send() { do run_in_newsched_task { let (port, chan) = oneshot::<~int>(); let port_cell = Cell::new(port); do spawntask_immediately { assert!(port_cell.take().recv() == ~10); } chan.send(~10); } } #[test] fn oneshot_multi_task_recv_then_close() { do run_in_newsched_task { let (port, chan) = oneshot::<~int>(); let port_cell = Cell::new(port); let chan_cell = Cell::new(chan); do spawntask_later { let _cell = chan_cell.take(); } let res = do spawntask_try { assert!(port_cell.take().recv() == ~10); }; assert!(res.is_err()); } } #[test] fn oneshot_multi_thread_close_stress() { for stress_factor().times { do run_in_newsched_task { let (port, chan) = oneshot::(); let port_cell = Cell::new(port); let _thread = do spawntask_thread { let _p = port_cell.take(); }; let _chan = chan; } } } #[test] fn oneshot_multi_thread_send_close_stress() { for stress_factor().times { do run_in_newsched_task { let (port, chan) = oneshot::(); let chan_cell = Cell::new(chan); let port_cell = Cell::new(port); let _thread1 = do spawntask_thread { let _p = port_cell.take(); }; let _thread2 = do spawntask_thread { let c = chan_cell.take(); c.send(1); }; } } } #[test] fn oneshot_multi_thread_recv_close_stress() { for stress_factor().times { do run_in_newsched_task { let (port, chan) = oneshot::(); let chan_cell = Cell::new(chan); let port_cell = Cell::new(port); let _thread1 = do spawntask_thread { let port_cell = Cell::new(port_cell.take()); let res = do spawntask_try { port_cell.take().recv(); }; assert!(res.is_err()); }; let _thread2 = do spawntask_thread { let chan_cell = Cell::new(chan_cell.take()); do spawntask { chan_cell.take(); } }; } } } #[test] fn oneshot_multi_thread_send_recv_stress() { for stress_factor().times { do run_in_newsched_task { let (port, chan) = oneshot::<~int>(); let chan_cell = Cell::new(chan); let port_cell = Cell::new(port); let _thread1 = do spawntask_thread { chan_cell.take().send(~10); }; let _thread2 = do spawntask_thread { assert!(port_cell.take().recv() == ~10); }; } } } #[test] fn stream_send_recv_stress() { for stress_factor().times { do run_in_mt_newsched_task { let (port, chan) = stream::<~int>(); send(chan, 0); recv(port, 0); fn send(chan: Chan<~int>, i: int) { if i == 10 { return } let chan_cell = Cell::new(chan); do spawntask_random { let chan = chan_cell.take(); chan.send(~i); send(chan, i + 1); } } fn recv(port: Port<~int>, i: int) { if i == 10 { return } let port_cell = Cell::new(port); do spawntask_random { let port = port_cell.take(); assert!(port.recv() == ~i); recv(port, i + 1); }; } } } } #[test] fn recv_a_lot() { // Regression test that we don't run out of stack in scheduler context do run_in_newsched_task { let (port, chan) = stream(); for 10000.times { chan.send(()) } for 10000.times { port.recv() } } } #[test] fn shared_chan_stress() { do run_in_mt_newsched_task { let (port, chan) = stream(); let chan = SharedChan::new(chan); let total = stress_factor() + 100; for total.times { let chan_clone = chan.clone(); do spawntask_random { chan_clone.send(()); } } for total.times { port.recv(); } } } #[test] fn shared_port_stress() { do run_in_mt_newsched_task { // XXX: Removing these type annotations causes an ICE let (end_port, end_chan) = stream::<()>(); let (port, chan) = stream::<()>(); let end_chan = SharedChan::new(end_chan); let port = SharedPort::new(port); let total = stress_factor() + 100; for total.times { let end_chan_clone = end_chan.clone(); let port_clone = port.clone(); do spawntask_random { port_clone.recv(); end_chan_clone.send(()); } } for total.times { chan.send(()); } for total.times { end_port.recv(); } } } #[test] fn shared_port_close_simple() { do run_in_mt_newsched_task { let (port, chan) = stream::<()>(); let port = SharedPort::new(port); { let _chan = chan; } assert!(port.try_recv().is_none()); } } #[test] fn shared_port_close() { do run_in_mt_newsched_task { let (end_port, end_chan) = stream::(); let (port, chan) = stream::<()>(); let end_chan = SharedChan::new(end_chan); let port = SharedPort::new(port); let chan = SharedChan::new(chan); let send_total = 10; let recv_total = 20; do spawntask_random { for send_total.times { let chan_clone = chan.clone(); do spawntask_random { chan_clone.send(()); } } } let end_chan_clone = end_chan.clone(); do spawntask_random { for recv_total.times { let port_clone = port.clone(); let end_chan_clone = end_chan_clone.clone(); do spawntask_random { let recvd = port_clone.try_recv().is_some(); end_chan_clone.send(recvd); } } } let mut recvd = 0; for recv_total.times { recvd += if end_port.recv() { 1 } else { 0 }; } assert!(recvd == send_total); } } #[test] fn megapipe_stress() { use rand; use rand::RngUtil; do run_in_mt_newsched_task { let (end_port, end_chan) = stream::<()>(); let end_chan = SharedChan::new(end_chan); let pipe = megapipe(); let total = stress_factor() + 10; let mut rng = rand::rng(); for total.times { let msgs = rng.gen_uint_range(0, 10); let pipe_clone = pipe.clone(); let end_chan_clone = end_chan.clone(); do spawntask_random { for msgs.times { pipe_clone.send(()); } for msgs.times { pipe_clone.recv(); } } end_chan_clone.send(()); } for total.times { end_port.recv(); } } } }