Started to implemented UdpStream

This commit is contained in:
Eric Reed 2013-06-17 12:35:27 -07:00
parent e42f28c05c
commit 33ae193a3c

View File

@ -433,6 +433,86 @@ fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
}
}
pub struct UvUdpStream {
watcher: UdpWatcher,
address: IpAddr
}
impl UvUdpStream {
fn watcher(&self) -> UdpWatcher { self.watcher }
fn address(&self) -> IpAddr { self.address }
}
impl Drop for UvUdpStream {
fn finalize(&self) {
rtdebug!("closing udp stream");
let watcher = self.watcher();
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell(task);
do watcher.close {
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
}
}
impl RtioUdpStream for UvUdpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let connection_address = self.address();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |sched, task| {
rtdebug!("read: entered scheduler context");
assert!(!sched.in_task_context());
let mut watcher = watcher;
let task_cell = Cell(task);
// XXX: see note in RtioTcpStream implementation for UvTcpStream
let alloc: AllocCallback = |_| unsafe {
slice_to_uv_buf(*buf_ptr)
};
do watcher.recv_start(alloc) |watcher, nread, _buf, addr, flags, status| {
let _ = flags; // TODO actually use flags
// XXX: see note in RtioTcpStream implementation for UvTcpStream
let mut watcher = watcher;
watcher.recv_stop();
let incoming_address = net::uv_ip4_to_ip4(&addr);
let result = if status.is_none() {
assert!(nread >= 0);
if incoming_address != connection_address {
Ok(0u)
} else {
Ok(nread as uint)
}
} else {
Err(uv_error_to_io_error(status.unwrap()))
};
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
assert!(!result_cell.is_empty());
return result_cell.take();
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let _ = buf;
fail!()
}
}
#[test]
fn test_simple_io_no_connect() {
do run_in_newsched_task {