diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 7f93dae00b7..b97a4df2245 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -22,10 +22,12 @@ use ops::Drop; use kinds::Owned; use rt::sched::{Scheduler, Coroutine}; use rt::local::Local; -use unstable::atomics::{AtomicUint, SeqCst}; +use unstable::atomics::{AtomicUint, AtomicOption, SeqCst}; +use unstable::sync::UnsafeAtomicRcBox; use util::Void; use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable}; use cell::Cell; +use clone::Clone; /// A combined refcount / ~Task pointer. /// @@ -312,16 +314,19 @@ struct StreamPayload { next: PortOne> } +type StreamChanOne = ChanOne>; +type StreamPortOne = PortOne>; + /// A channel with unbounded size. pub struct Chan { // FIXME #5372. Using Cell because we don't take &mut self - next: Cell>> + next: Cell> } /// An port with unbounded size. pub struct Port { // FIXME #5372. Using Cell because we don't take &mut self - next: Cell>> + next: Cell> } pub fn stream() -> (Port, Chan) { @@ -374,6 +379,43 @@ impl Peekable for Port { } } +pub struct SharedChan { + // Just like Chan, but a shared AtomicOption instead of Cell + priv next: UnsafeAtomicRcBox>> +} + +impl SharedChan { + pub fn new(chan: Chan) -> SharedChan { + let next = chan.next.take(); + let next = AtomicOption::new(~next); + SharedChan { next: UnsafeAtomicRcBox::new(next) } + } +} + +impl GenericChan for SharedChan { + fn send(&self, val: T) { + self.try_send(val); + } +} + +impl GenericSmartChan for SharedChan { + fn try_send(&self, val: T) -> bool { + unsafe { + let (next_pone, next_cone) = oneshot(); + let cone = (*self.next.get()).swap(~next_cone, SeqCst); + cone.unwrap().try_send(StreamPayload { val: val, next: next_pone }) + } + } +} + +impl Clone for SharedChan { + fn clone(&self) -> SharedChan { + SharedChan { + next: self.next.clone() + } + } +} + #[cfg(test)] mod test { use super::*; @@ -641,5 +683,24 @@ mod test { for 10000.times { port.recv() } } } + + #[test] + fn shared_chan_stress() { + do run_in_mt_newsched_task { + let (port, chan) = stream(); + let chan = SharedChan::new(chan); + let total = stress_factor() + 100; + for total.times { + let chan_clone = chan.clone(); + do spawntask_random { + chan_clone.send(()); + } + } + + for total.times { + port.recv(); + } + } + } }