diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 0f98ab11513..1274dbc3220 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -433,6 +433,86 @@ fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { } } +pub struct UvUdpStream { + watcher: UdpWatcher, + address: IpAddr +} + +impl UvUdpStream { + fn watcher(&self) -> UdpWatcher { self.watcher } + fn address(&self) -> IpAddr { self.address } +} + +impl Drop for UvUdpStream { + fn finalize(&self) { + rtdebug!("closing udp stream"); + let watcher = self.watcher(); + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell(task); + do watcher.close { + let scheduler = Local::take::(); + scheduler.resume_task_immediately(task_cell.take()); + } + } + } +} + +impl RtioUdpStream for UvUdpStream { + fn read(&mut self, buf: &mut [u8]) -> Result { + let result_cell = empty_cell(); + let result_cell_ptr: *Cell> = &result_cell; + + let scheduler = Local::take::(); + assert!(scheduler.in_task_context()); + let watcher = self.watcher(); + let connection_address = self.address(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |sched, task| { + rtdebug!("read: entered scheduler context"); + assert!(!sched.in_task_context()); + let mut watcher = watcher; + let task_cell = Cell(task); + // XXX: see note in RtioTcpStream implementation for UvTcpStream + let alloc: AllocCallback = |_| unsafe { + slice_to_uv_buf(*buf_ptr) + }; + do watcher.recv_start(alloc) |watcher, nread, _buf, addr, flags, status| { + let _ = flags; // TODO actually use flags + + // XXX: see note in RtioTcpStream implementation for UvTcpStream + let mut watcher = watcher; + watcher.recv_stop(); + + let incoming_address = net::uv_ip4_to_ip4(&addr); + let result = if status.is_none() { + assert!(nread >= 0); + if incoming_address != connection_address { + Ok(0u) + } else { + Ok(nread as uint) + } + } else { + Err(uv_error_to_io_error(status.unwrap())) + }; + + unsafe { (*result_cell_ptr).put_back(result); } + + let scheduler = Local::take::(); + scheduler.resume_task_immediately(task_cell.take()); + } + } + + assert!(!result_cell.is_empty()); + return result_cell.take(); + } + + fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { + let _ = buf; + fail!() + } +} + #[test] fn test_simple_io_no_connect() { do run_in_newsched_task {