//@ run-pass //@ compile-flags:--test //@ needs-threads use std::sync::mpsc::channel; use std::sync::mpsc::TryRecvError; use std::sync::mpsc::RecvError; use std::sync::mpsc::RecvTimeoutError; use std::sync::Arc; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::thread; use std::time::Duration; /// Simple thread synchronization utility struct Barrier { // Not using mutex/condvar for precision shared: Arc, count: usize, } impl Barrier { fn new(count: usize) -> Vec { let shared = Arc::new(AtomicUsize::new(0)); (0..count).map(|_| Barrier { shared: shared.clone(), count: count }).collect() } fn new2() -> (Barrier, Barrier) { let mut v = Barrier::new(2); (v.pop().unwrap(), v.pop().unwrap()) } /// Returns when `count` threads enter `wait` fn wait(self) { self.shared.fetch_add(1, Ordering::SeqCst); while self.shared.load(Ordering::SeqCst) != self.count { #[cfg(target_env = "sgx")] thread::yield_now(); } } } fn shared_close_sender_does_not_lose_messages_iter() { let (tb, rb) = Barrier::new2(); let (tx, rx) = channel(); let _ = tx.clone(); // convert to shared thread::spawn(move || { tb.wait(); thread::sleep(Duration::from_micros(1)); tx.send(17).expect("send"); drop(tx); }); let i = rx.into_iter(); rb.wait(); // Make sure it doesn't return disconnected before returning an element assert_eq!(vec![17], i.collect::>()); } #[test] fn shared_close_sender_does_not_lose_messages() { with_minimum_timer_resolution(|| { for _ in 0..10000 { shared_close_sender_does_not_lose_messages_iter(); } }); } // https://github.com/rust-lang/rust/issues/39364 fn concurrent_recv_timeout_and_upgrade_iter() { // 1 us let sleep = Duration::new(0, 1_000); let (a, b) = Barrier::new2(); let (tx, rx) = channel(); let th = thread::spawn(move || { a.wait(); loop { match rx.recv_timeout(sleep) { Ok(_) => { break; }, Err(_) => {}, } } }); b.wait(); thread::sleep(sleep); tx.clone().send(()).expect("send"); th.join().unwrap(); } #[test] fn concurrent_recv_timeout_and_upgrade() { with_minimum_timer_resolution(|| { for _ in 0..10000 { concurrent_recv_timeout_and_upgrade_iter(); } }); } fn concurrent_writes_iter() { const THREADS: usize = 4; const PER_THR: usize = 100; let mut bs = Barrier::new(THREADS + 1); let (tx, rx) = channel(); let mut threads = Vec::new(); for j in 0..THREADS { let tx = tx.clone(); let b = bs.pop().unwrap(); threads.push(thread::spawn(move || { b.wait(); for i in 0..PER_THR { tx.send(j * 1000 + i).expect("send"); } })); } let b = bs.pop().unwrap(); b.wait(); let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect(); v.sort(); for j in 0..THREADS { for i in 0..PER_THR { assert_eq!(j * 1000 + i, v[j * PER_THR + i]); } } for t in threads { t.join().unwrap(); } let one_us = Duration::new(0, 1000); assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err()); assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err()); drop(tx); assert_eq!(RecvError, rx.recv().unwrap_err()); assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err()); assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err()); } #[test] fn concurrent_writes() { with_minimum_timer_resolution(|| { for _ in 0..100 { concurrent_writes_iter(); } }); } #[cfg(windows)] pub mod timeapi { #![allow(non_snake_case)] use std::ffi::c_uint; pub const TIMERR_NOERROR: c_uint = 0; #[link(name = "winmm")] extern "system" { pub fn timeBeginPeriod(uPeriod: c_uint) -> c_uint; pub fn timeEndPeriod(uPeriod: c_uint) -> c_uint; } } /// Window's minimum sleep time can be as much as 16ms. // This function evaluates the closure with this resolution // set as low as possible. /// /// This takes the above test's duration from 10000*16/1000/60=2.67 minutes to ~16 seconds. fn with_minimum_timer_resolution(f: impl Fn()) { #[cfg(windows)] unsafe { let ret = timeapi::timeBeginPeriod(1); assert_eq!(ret, timeapi::TIMERR_NOERROR); f(); let ret = timeapi::timeEndPeriod(1); assert_eq!(ret, timeapi::TIMERR_NOERROR); } #[cfg(not(windows))] { f(); } }