diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index d4794da9b0f..07ba44101c8 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -23,7 +23,7 @@ use rt::io::net::ip::{SocketAddr, IpAddr}; use rt::io::{standard_error, OtherIoError}; use rt::local::Local; use rt::rtio::*; -use rt::sched::Scheduler; +use rt::sched::{Scheduler, SchedHandle}; use rt::tube::Tube; use rt::uv::*; use rt::uv::idle::IdleWatcher; @@ -239,6 +239,27 @@ impl UvIoFactory { pub fn uv_loop<'a>(&'a mut self) -> &'a mut Loop { match self { &UvIoFactory(ref mut ptr) => ptr } } + + pub fn homed_udp_bind(&mut self, addr: SocketAddr) -> Result<~HomedUvUdpSocket, IoError> { + let mut watcher = UdpWatcher::new(self.uv_loop()); + match watcher.bind(addr) { + Ok(_) => { + let home = do Local::borrow:: |sched| {sched.make_handle()}; + Ok(~HomedUvUdpSocket { watcher: watcher, home: home }) + } + Err(uverr) => { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do watcher.close { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + Err(uv_error_to_io_error(uverr)) + } + } + } } impl IoFactory for UvIoFactory { @@ -582,6 +603,135 @@ impl RtioTcpStream for UvTcpStream { } } +pub struct HomedUvUdpSocket { + watcher: UdpWatcher, + home: SchedHandle, +} + +impl HomedUvUdpSocket { + fn go_home(&mut self) { + use rt::sched::PinnedTask; + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + do task.wake().map_move |task| { self.home.send(PinnedTask(task)); }; + } + } +} + +impl Drop for HomedUvUdpSocket { + fn drop(&self) { + rtdebug!("closing homed udp socket"); + // first go home + // XXX need mutable finalizer + let this = unsafe { transmute::<&HomedUvUdpSocket, &mut HomedUvUdpSocket>(self) }; + this.go_home(); + // now we're home so block the task and start IO + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do this.watcher.close { + // now IO is finished so resume the blocked task + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + } + } + } +} + +impl RtioSocket for HomedUvUdpSocket { + fn socket_name(&mut self) -> Result { + self.go_home(); + socket_name(Udp, self.watcher) + } +} + +#[test] +fn test_simple_homed_udp_io_bind_only() { + do run_in_newsched_task { + unsafe { + let io = Local::unsafe_borrow::(); + let addr = next_test_ip4(); + let maybe_socket = (*io).homed_udp_bind(addr); + assert!(maybe_socket.is_ok()); + } + } +} + +#[test] +fn test_simple_homed_udp_io_bind_then_move_then_home_and_close() { + use rt::sleeper_list::SleeperList; + use rt::work_queue::WorkQueue; + use rt::thread::Thread; + use rt::task::Task; + use rt::sched::{Shutdown, TaskFromFriend}; + do run_in_bare_thread { + let sleepers = SleeperList::new(); + let work_queue1 = WorkQueue::new(); + let work_queue2 = WorkQueue::new(); + let queues = ~[work_queue1.clone(), work_queue2.clone()]; + + let mut sched1 = ~Scheduler::new(~UvEventLoop::new(), work_queue1, queues.clone(), + sleepers.clone()); + let mut sched2 = ~Scheduler::new(~UvEventLoop::new(), work_queue2, queues.clone(), + sleepers.clone()); + + let handle1 = Cell::new(sched1.make_handle()); + let handle2 = Cell::new(sched2.make_handle()); + let tasksFriendHandle = Cell::new(sched2.make_handle()); + + let on_exit: ~fn(bool) = |exit_status| { + handle1.take().send(Shutdown); + handle2.take().send(Shutdown); + rtassert!(exit_status); + }; + + let test_function: ~fn() = || { + let io = unsafe { Local::unsafe_borrow::() }; + let addr = next_test_ip4(); + let maybe_socket = unsafe { (*io).homed_udp_bind(addr) }; + // this socket is bound to this event loop + assert!(maybe_socket.is_ok()); + + // block self on sched1 + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + // unblock task + do task.wake().map_move |task| { + // send self to sched2 + tasksFriendHandle.take().send(TaskFromFriend(task)); + }; + // sched1 should now sleep since it has nothing else to do + } + // sched2 will wake up and get the task + // as we do nothing else, the function ends and the socket goes out of scope + // sched2 will start to run the destructor + // the destructor will first block the task, set it's home as sched1, then enqueue it + // sched2 will dequeue the task, see that it has a home, and send it to sched1 + // sched1 will wake up, execute the close function on the correct loop, and then we're done + }; + + let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None, test_function); + main_task.death.on_exit = Some(on_exit); + let main_task = Cell::new(main_task); + + let null_task = Cell::new(~do Task::new_root(&mut sched2.stack_pool, None) || {}); + + let sched1 = Cell::new(sched1); + let sched2 = Cell::new(sched2); + + // XXX could there be a race on the threads that causes a crash? + let thread1 = do Thread::start { + sched1.take().bootstrap(main_task.take()); + }; + let thread2 = do Thread::start { + sched2.take().bootstrap(null_task.take()); + }; + + thread1.join(); + thread2.join(); + } +} + pub struct UvUdpSocket(UdpWatcher); impl Drop for UvUdpSocket {