uv: Remove closure-based home_for_io for raii

Using an raii wrapper means that there's no need for a '_self' variant and we
can greatly reduce the amount of 'self_'-named variables.
This commit is contained in:
Alex Crichton 2013-11-03 11:26:08 -08:00
parent 4bcde6bc06
commit 653406fcf7

@ -86,12 +86,39 @@ trait HomingIO {
self.home().sched_id
}
// XXX: dummy self parameter
fn restore_original_home(_: Option<Self>, io_home: uint) {
/// Fires a single homing missile, returning another missile targeted back
/// at the original home of this task. In other words, this function will
/// move the local task to its I/O scheduler and then return an RAII wrapper
/// which will return the task home.
fn fire_homing_missile(&mut self) -> HomingMissile {
HomingMissile { io_home: self.go_to_IO_home() }
}
/// Same as `fire_homing_missile`, but returns the local I/O scheduler as
/// well (the one that was homed to).
fn fire_homing_missile_sched(&mut self) -> (HomingMissile, ~Scheduler) {
// First, transplant ourselves to the home I/O scheduler
let missile = self.fire_homing_missile();
// Next (must happen next), grab the local I/O scheduler
let io_sched: ~Scheduler = Local::take();
(missile, io_sched)
}
}
/// After a homing operation has been completed, this will return the current
/// task back to its appropriate home (if applicable). The field is used to
/// assert that we are where we think we are.
struct HomingMissile {
priv io_home: uint,
}
impl Drop for HomingMissile {
fn drop(&mut self) {
// It would truly be a sad day if we had moved off the home I/O
// scheduler while we were doing I/O.
assert_eq!(Local::borrow(|sched: &mut Scheduler| sched.sched_id()),
io_home);
self.io_home);
// If we were a homed task, then we must send ourselves back to the
// original scheduler. Otherwise, we can just return and keep running
@ -106,30 +133,6 @@ trait HomingIO {
}
}
}
fn home_for_io<A>(&mut self, io: &fn(&mut Self) -> A) -> A {
let home = self.go_to_IO_home();
let a = io(self); // do IO
HomingIO::restore_original_home(None::<Self>, home);
a // return the result of the IO
}
fn home_for_io_consume<A>(mut self, io: &fn(Self) -> A) -> A {
let home = self.go_to_IO_home();
let a = io(self); // do IO
HomingIO::restore_original_home(None::<Self>, home);
a // return the result of the IO
}
fn home_for_io_with_sched<A>(&mut self, io_sched: &fn(&mut Self, ~Scheduler) -> A) -> A {
let home = self.go_to_IO_home();
let a = do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
io_sched(self, scheduler) // do IO and scheduling action
};
HomingIO::restore_original_home(None::<Self>, home);
a // return result of IO
}
}
// get a handle for the current scheduler
@ -915,13 +918,12 @@ impl UvTcpListener {
impl Drop for UvTcpListener {
fn drop(&mut self) {
do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task = Cell::new(task);
do self_.watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task.take());
}
let (_m, sched) = self.fire_homing_missile_sched();
do sched.deschedule_running_task_and_then |_, task| {
let task = Cell::new(task);
do self.watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task.take());
}
}
}
@ -929,38 +931,36 @@ impl Drop for UvTcpListener {
impl RtioSocket for UvTcpListener {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(Tcp, self_.watcher)
}
let _m = self.fire_homing_missile();
socket_name(Tcp, self.watcher)
}
}
impl RtioTcpListener for UvTcpListener {
fn listen(~self) -> Result<~RtioTcpAcceptor, IoError> {
do self.home_for_io_consume |self_| {
let acceptor = ~UvTcpAcceptor::new(self_);
let incoming = Cell::new(acceptor.incoming.clone());
let mut stream = acceptor.listener.watcher.as_stream();
let res = do stream.listen |mut server, status| {
do incoming.with_mut_ref |incoming| {
let inc = match status {
Some(_) => Err(standard_error(OtherIoError)),
None => {
let inc = TcpWatcher::new(&server.event_loop());
// first accept call in the callback guarenteed to succeed
server.accept(inc.as_stream());
let home = get_handle_to_current_scheduler!();
Ok(~UvTcpStream { watcher: inc, home: home }
as ~RtioTcpStream)
}
};
incoming.send(inc);
}
};
match res {
Ok(()) => Ok(acceptor as ~RtioTcpAcceptor),
Err(e) => Err(uv_error_to_io_error(e)),
fn listen(mut ~self) -> Result<~RtioTcpAcceptor, IoError> {
let _m = self.fire_homing_missile();
let acceptor = ~UvTcpAcceptor::new(*self);
let incoming = Cell::new(acceptor.incoming.clone());
let mut stream = acceptor.listener.watcher.as_stream();
let res = do stream.listen |mut server, status| {
do incoming.with_mut_ref |incoming| {
let inc = match status {
Some(_) => Err(standard_error(OtherIoError)),
None => {
let inc = TcpWatcher::new(&server.event_loop());
// first accept call in the callback guarenteed to succeed
server.accept(inc.as_stream());
let home = get_handle_to_current_scheduler!();
Ok(~UvTcpStream { watcher: inc, home: home }
as ~RtioTcpStream)
}
};
incoming.send(inc);
}
};
match res {
Ok(()) => Ok(acceptor as ~RtioTcpAcceptor),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
}
@ -982,9 +982,8 @@ impl UvTcpAcceptor {
impl RtioSocket for UvTcpAcceptor {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(Tcp, self_.listener.watcher)
}
let _m = self.fire_homing_missile();
socket_name(Tcp, self.listener.watcher)
}
}
@ -997,21 +996,18 @@ fn accept_simultaneously(stream: StreamWatcher, a: int) -> Result<(), IoError> {
impl RtioTcpAcceptor for UvTcpAcceptor {
fn accept(&mut self) -> Result<~RtioTcpStream, IoError> {
do self.home_for_io |self_| {
self_.incoming.recv()
}
let _m = self.fire_homing_missile();
self.incoming.recv()
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
accept_simultaneously(self_.listener.watcher.as_stream(), 1)
}
let _m = self.fire_homing_missile();
accept_simultaneously(self.listener.watcher.as_stream(), 1)
}
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
accept_simultaneously(self_.listener.watcher.as_stream(), 0)
}
let _m = self.fire_homing_missile();
accept_simultaneously(self.listener.watcher.as_stream(), 0)
}
}
@ -1102,14 +1098,12 @@ impl HomingIO for UvUnboundPipe {
impl Drop for UvUnboundPipe {
fn drop(&mut self) {
do self.home_for_io |self_| {
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.pipe.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
let (_m, sched) = self.fire_homing_missile_sched();
do sched.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self.pipe.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
@ -1127,14 +1121,12 @@ impl UvPipeStream {
impl RtioPipe for UvPipeStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
do self.inner.home_for_io_with_sched |self_, scheduler| {
read_stream(self_.pipe.as_stream(), scheduler, buf)
}
let (_m, scheduler) = self.inner.fire_homing_missile_sched();
read_stream(self.inner.pipe.as_stream(), scheduler, buf)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
do self.inner.home_for_io_with_sched |self_, scheduler| {
write_stream(self_.pipe.as_stream(), scheduler, buf)
}
let (_m, scheduler) = self.inner.fire_homing_missile_sched();
write_stream(self.inner.pipe.as_stream(), scheduler, buf)
}
}
@ -1149,13 +1141,12 @@ impl HomingIO for UvTcpStream {
impl Drop for UvTcpStream {
fn drop(&mut self) {
do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
let (_m, sched) = self.fire_homing_missile_sched();
do sched.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self.watcher.as_stream().close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
@ -1163,67 +1154,55 @@ impl Drop for UvTcpStream {
impl RtioSocket for UvTcpStream {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(Tcp, self_.watcher)
}
let _m = self.fire_homing_missile();
socket_name(Tcp, self.watcher)
}
}
impl RtioTcpStream for UvTcpStream {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
read_stream(self_.watcher.as_stream(), scheduler, buf)
}
let (_m, scheduler) = self.fire_homing_missile_sched();
read_stream(self.watcher.as_stream(), scheduler, buf)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
write_stream(self_.watcher.as_stream(), scheduler, buf)
}
let (_m, scheduler) = self.fire_homing_missile_sched();
write_stream(self.watcher.as_stream(), scheduler, buf)
}
fn peer_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(TcpPeer, self_.watcher)
}
let _m = self.fire_homing_missile();
socket_name(TcpPeer, self.watcher)
}
fn control_congestion(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::uv_tcp_nodelay(self_.watcher.native_handle(), 0 as c_int)
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_nodelay(self.watcher.native_handle(), 0 as c_int)
})
}
fn nodelay(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::uv_tcp_nodelay(self_.watcher.native_handle(), 1 as c_int)
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_nodelay(self.watcher.native_handle(), 1 as c_int)
})
}
fn keepalive(&mut self, delay_in_seconds: uint) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::uv_tcp_keepalive(self_.watcher.native_handle(), 1 as c_int,
delay_in_seconds as c_uint)
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_keepalive(self.watcher.native_handle(), 1 as c_int,
delay_in_seconds as c_uint)
})
}
fn letdie(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::uv_tcp_keepalive(self_.watcher.native_handle(),
0 as c_int, 0 as c_uint)
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_tcp_keepalive(self.watcher.native_handle(),
0 as c_int, 0 as c_uint)
})
}
}
@ -1238,13 +1217,12 @@ impl HomingIO for UvUdpSocket {
impl Drop for UvUdpSocket {
fn drop(&mut self) {
do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
let (_m, scheduler) = self.fire_homing_missile_sched();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self.watcher.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
@ -1252,156 +1230,138 @@ impl Drop for UvUdpSocket {
impl RtioSocket for UvUdpSocket {
fn socket_name(&mut self) -> Result<SocketAddr, IoError> {
do self.home_for_io |self_| {
socket_name(Udp, self_.watcher)
}
let _m = self.fire_homing_missile();
socket_name(Udp, self.watcher)
}
}
impl RtioUdpSocket for UvUdpSocket {
fn recvfrom(&mut self, buf: &mut [u8]) -> Result<(uint, SocketAddr), IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
let uv_buf = slice_to_uv_buf(buf);
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let alloc: AllocCallback = |_| uv_buf;
do self_.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
let _ = flags; // /XXX add handling for partials?
let (_m, scheduler) = self.fire_homing_missile_sched();
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(uint, SocketAddr), IoError>> = &result_cell;
watcher.recv_stop();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let alloc: AllocCallback = |_| unsafe { slice_to_uv_buf(*buf_ptr) };
do self.watcher.recv_start(alloc) |mut watcher, nread, _buf, addr, flags, status| {
let _ = flags; // /XXX add handling for partials?
let result = match status {
None => {
assert!(nread >= 0);
Ok((nread as uint, addr))
}
Some(err) => Err(uv_error_to_io_error(err)),
};
watcher.recv_stop();
unsafe { (*result_cell_ptr).put_back(result); }
let result = match status {
None => {
assert!(nread >= 0);
Ok((nread as uint, addr))
}
Some(err) => Err(uv_error_to_io_error(err)),
};
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
assert!(!result_cell.is_empty());
result_cell.take()
}
assert!(!result_cell.is_empty());
result_cell.take()
}
fn sendto(&mut self, buf: &[u8], dst: SocketAddr) -> Result<(), IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
do self_.watcher.send(buf, dst) |_watcher, status| {
let (_m, scheduler) = self.fire_homing_missile_sched();
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let buf_ptr: *&[u8] = &buf;
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
do self.watcher.send(buf, dst) |_watcher, status| {
let result = match status {
None => Ok(()),
Some(err) => Err(uv_error_to_io_error(err)),
};
let result = match status {
None => Ok(()),
Some(err) => Err(uv_error_to_io_error(err)),
};
unsafe { (*result_cell_ptr).put_back(result); }
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
assert!(!result_cell.is_empty());
result_cell.take()
}
assert!(!result_cell.is_empty());
result_cell.take()
}
fn join_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
do multi.to_str().with_c_str |m_addr| {
uvll::uv_udp_set_membership(self_.watcher.native_handle(),
m_addr, ptr::null(),
uvll::UV_JOIN_GROUP)
}
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
do multi.to_str().with_c_str |m_addr| {
uvll::uv_udp_set_membership(self.watcher.native_handle(),
m_addr, ptr::null(),
uvll::UV_JOIN_GROUP)
}
})
}
fn leave_multicast(&mut self, multi: IpAddr) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
do multi.to_str().with_c_str |m_addr| {
uvll::uv_udp_set_membership(self_.watcher.native_handle(),
m_addr, ptr::null(),
uvll::UV_LEAVE_GROUP)
}
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
do multi.to_str().with_c_str |m_addr| {
uvll::uv_udp_set_membership(self.watcher.native_handle(),
m_addr, ptr::null(),
uvll::UV_LEAVE_GROUP)
}
})
}
fn loop_multicast_locally(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::uv_udp_set_multicast_loop(self_.watcher.native_handle(),
1 as c_int)
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
1 as c_int)
})
}
fn dont_loop_multicast_locally(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::uv_udp_set_multicast_loop(self_.watcher.native_handle(),
0 as c_int)
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_loop(self.watcher.native_handle(),
0 as c_int)
})
}
fn multicast_time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::uv_udp_set_multicast_ttl(self_.watcher.native_handle(),
ttl as c_int)
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_multicast_ttl(self.watcher.native_handle(),
ttl as c_int)
})
}
fn time_to_live(&mut self, ttl: int) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::uv_udp_set_ttl(self_.watcher.native_handle(), ttl as c_int)
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_ttl(self.watcher.native_handle(), ttl as c_int)
})
}
fn hear_broadcasts(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::uv_udp_set_broadcast(self_.watcher.native_handle(),
1 as c_int)
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
1 as c_int)
})
}
fn ignore_broadcasts(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
let r = unsafe {
uvll::uv_udp_set_broadcast(self_.watcher.native_handle(),
0 as c_int)
};
status_to_io_result(r)
}
let _m = self.fire_homing_missile();
status_to_io_result(unsafe {
uvll::uv_udp_set_broadcast(self.watcher.native_handle(),
0 as c_int)
})
}
}
@ -1422,14 +1382,13 @@ impl UvTimer {
impl Drop for UvTimer {
fn drop(&mut self) {
do self.home_for_io_with_sched |self_, scheduler| {
uvdebug!("closing UvTimer");
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
let (_m, scheduler) = self.fire_homing_missile_sched();
uvdebug!("closing UvTimer");
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self.watcher.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
@ -1437,18 +1396,17 @@ impl Drop for UvTimer {
impl RtioTimer for UvTimer {
fn sleep(&mut self, msecs: u64) {
do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_sched, task| {
uvdebug!("sleep: entered scheduler context");
let task_cell = Cell::new(task);
do self_.watcher.start(msecs, 0) |_, status| {
assert!(status.is_none());
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
let (_m, scheduler) = self.fire_homing_missile_sched();
do scheduler.deschedule_running_task_and_then |_sched, task| {
uvdebug!("sleep: entered scheduler context");
let task_cell = Cell::new(task);
do self.watcher.start(msecs, 0) |_, status| {
assert!(status.is_none());
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
self_.watcher.stop();
}
self.watcher.stop();
}
fn oneshot(&mut self, msecs: u64) -> PortOne<()> {
@ -1456,13 +1414,11 @@ impl RtioTimer for UvTimer {
let (port, chan) = oneshot();
let chan = Cell::new(chan);
do self.home_for_io |self_| {
let chan = Cell::new(chan.take());
do self_.watcher.start(msecs, 0) |_, status| {
assert!(status.is_none());
assert!(!chan.is_empty());
chan.take().send_deferred(());
}
let _m = self.fire_homing_missile();
do self.watcher.start(msecs, 0) |_, status| {
assert!(status.is_none());
assert!(!chan.is_empty());
chan.take().send_deferred(());
}
return port;
@ -1473,13 +1429,11 @@ impl RtioTimer for UvTimer {
let (port, chan) = stream();
let chan = Cell::new(chan);
do self.home_for_io |self_| {
let chan = Cell::new(chan.take());
do self_.watcher.start(msecs, msecs) |_, status| {
assert!(status.is_none());
do chan.with_ref |chan| {
chan.send_deferred(());
}
let _m = self.fire_homing_missile();
do self.watcher.start(msecs, msecs) |_, status| {
assert!(status.is_none());
do chan.with_ref |chan| {
chan.send_deferred(());
}
}
@ -1512,20 +1466,19 @@ impl UvFileStream {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<int, IoError>> = &result_cell;
let buf_ptr: *&mut [u8] = &buf;
do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let task_cell = Cell::new(task);
let read_req = file::FsRequest::new();
do read_req.read(&self_.loop_, self_.fd, buf, offset) |req, uverr| {
let res = match uverr {
None => Ok(req.get_result() as int),
Some(err) => Err(uv_error_to_io_error(err))
};
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
let (_m, scheduler) = self.fire_homing_missile_sched();
do scheduler.deschedule_running_task_and_then |_, task| {
let buf = unsafe { slice_to_uv_buf(*buf_ptr) };
let task_cell = Cell::new(task);
let read_req = file::FsRequest::new();
do read_req.read(&self.loop_, self.fd, buf, offset) |req, uverr| {
let res = match uverr {
None => Ok(req.get_result() as int),
Some(err) => Err(uv_error_to_io_error(err))
};
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
result_cell.take()
@ -1555,19 +1508,18 @@ impl UvFileStream {
-> Result<(), IoError> {
let result_cell = Cell::new_empty();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
do self.home_for_io_with_sched |self_, sched| {
do sched.deschedule_running_task_and_then |_, task| {
let task = Cell::new(task);
let req = file::FsRequest::new();
do f(self_, req) |_, uverr| {
let res = match uverr {
None => Ok(()),
Some(err) => Err(uv_error_to_io_error(err))
};
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task.take());
}
let (_m, sched) = self.fire_homing_missile_sched();
do sched.deschedule_running_task_and_then |_, task| {
let task = Cell::new(task);
let req = file::FsRequest::new();
do f(self_, req) |_, uverr| {
let res = match uverr {
None => Ok(()),
Some(err) => Err(uv_error_to_io_error(err))
};
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task.take());
}
}
result_cell.take()
@ -1583,14 +1535,13 @@ impl Drop for UvFileStream {
do close_req.close(&self.loop_, self.fd) |_,_| {}
}
CloseSynchronously => {
do self.home_for_io_with_sched |self_, scheduler| {
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let close_req = file::FsRequest::new();
do close_req.close(&self_.loop_, self_.fd) |_,_| {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
let (_m, scheduler) = self.fire_homing_missile_sched();
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
let close_req = file::FsRequest::new();
do close_req.close(&self.loop_, self.fd) |_,_| {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}
@ -1623,7 +1574,7 @@ impl RtioFileStream for UvFileStream {
fn tell(&self) -> Result<u64, IoError> {
use std::libc::SEEK_CUR;
// this is temporary
let self_ = unsafe { cast::transmute::<&UvFileStream, &mut UvFileStream>(self) };
let self_ = unsafe { cast::transmute_mut(self) };
self_.seek_common(0, SEEK_CUR)
}
fn fsync(&mut self) -> Result<(), IoError> {
@ -1681,7 +1632,8 @@ impl Drop for UvProcess {
if self.home.is_none() {
close(self)
} else {
self.home_for_io(close)
let _m = self.fire_homing_missile();
close(self)
}
}
}
@ -1692,30 +1644,28 @@ impl RtioProcess for UvProcess {
}
fn kill(&mut self, signal: int) -> Result<(), IoError> {
do self.home_for_io |self_| {
match self_.process.kill(signal) {
Ok(()) => Ok(()),
Err(uverr) => Err(uv_error_to_io_error(uverr))
}
let _m = self.fire_homing_missile();
match self.process.kill(signal) {
Ok(()) => Ok(()),
Err(uverr) => Err(uv_error_to_io_error(uverr))
}
}
fn wait(&mut self) -> int {
// Make sure (on the home scheduler) that we have an exit status listed
do self.home_for_io |self_| {
match self_.exit_status {
Some(*) => {}
None => {
// If there's no exit code previously listed, then the
// process's exit callback has yet to be invoked. We just
// need to deschedule ourselves and wait to be reawoken.
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
assert!(self_.descheduled.is_none());
self_.descheduled = Some(task);
}
assert!(self_.exit_status.is_some());
let _m = self.fire_homing_missile();
match self.exit_status {
Some(*) => {}
None => {
// If there's no exit code previously listed, then the
// process's exit callback has yet to be invoked. We just
// need to deschedule ourselves and wait to be reawoken.
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
assert!(self.descheduled.is_none());
self.descheduled = Some(task);
}
assert!(self.exit_status.is_some());
}
}
@ -1738,28 +1688,27 @@ impl UvUnixListener {
}
impl RtioUnixListener for UvUnixListener {
fn listen(~self) -> Result<~RtioUnixAcceptor, IoError> {
do self.home_for_io_consume |self_| {
let acceptor = ~UvUnixAcceptor::new(self_);
let incoming = Cell::new(acceptor.incoming.clone());
let mut stream = acceptor.listener.inner.pipe.as_stream();
let res = do stream.listen |mut server, status| {
do incoming.with_mut_ref |incoming| {
let inc = match status {
Some(e) => Err(uv_error_to_io_error(e)),
None => {
let pipe = UvUnboundPipe::new(&server.event_loop());
server.accept(pipe.pipe.as_stream());
Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
}
};
incoming.send(inc);
}
};
match res {
Ok(()) => Ok(acceptor as ~RtioUnixAcceptor),
Err(e) => Err(uv_error_to_io_error(e)),
fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
let _m = self.fire_homing_missile();
let acceptor = ~UvUnixAcceptor::new(*self);
let incoming = Cell::new(acceptor.incoming.clone());
let mut stream = acceptor.listener.inner.pipe.as_stream();
let res = do stream.listen |mut server, status| {
do incoming.with_mut_ref |incoming| {
let inc = match status {
Some(e) => Err(uv_error_to_io_error(e)),
None => {
let pipe = UvUnboundPipe::new(&server.event_loop());
server.accept(pipe.pipe.as_stream());
Ok(~UvPipeStream::new(pipe) as ~RtioPipe)
}
};
incoming.send(inc);
}
};
match res {
Ok(()) => Ok(acceptor as ~RtioUnixAcceptor),
Err(e) => Err(uv_error_to_io_error(e)),
}
}
}
@ -1787,30 +1736,26 @@ impl Drop for UvTTY {
impl RtioTTY for UvTTY {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
read_stream(self_.tty.as_stream(), scheduler, buf)
}
let (_m, scheduler) = self.fire_homing_missile_sched();
read_stream(self.tty.as_stream(), scheduler, buf)
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
write_stream(self_.tty.as_stream(), scheduler, buf)
}
let (_m, scheduler) = self.fire_homing_missile_sched();
write_stream(self.tty.as_stream(), scheduler, buf)
}
fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
do self.home_for_io |self_| {
match self_.tty.set_mode(raw) {
Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
let _m = self.fire_homing_missile();
match self.tty.set_mode(raw) {
Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
}
fn get_winsize(&mut self) -> Result<(int, int), IoError> {
do self.home_for_io |self_| {
match self_.tty.get_winsize() {
Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
let _m = self.fire_homing_missile();
match self.tty.get_winsize() {
Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
}
@ -1836,21 +1781,18 @@ impl UvUnixAcceptor {
impl RtioUnixAcceptor for UvUnixAcceptor {
fn accept(&mut self) -> Result<~RtioPipe, IoError> {
do self.home_for_io |self_| {
self_.incoming.recv()
}
let _m = self.fire_homing_missile();
self.incoming.recv()
}
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
accept_simultaneously(self_.listener.inner.pipe.as_stream(), 1)
}
let _m = self.fire_homing_missile();
accept_simultaneously(self.listener.inner.pipe.as_stream(), 1)
}
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
do self.home_for_io |self_| {
accept_simultaneously(self_.listener.inner.pipe.as_stream(), 0)
}
let _m = self.fire_homing_missile();
accept_simultaneously(self.listener.inner.pipe.as_stream(), 0)
}
}
@ -1873,14 +1815,13 @@ impl RtioSignal for UvSignal {}
impl Drop for UvSignal {
fn drop(&mut self) {
do self.home_for_io_with_sched |self_, scheduler| {
uvdebug!("closing UvSignal");
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self_.watcher.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
let (_m, scheduler) = self.fire_homing_missile_sched();
uvdebug!("closing UvSignal");
do scheduler.deschedule_running_task_and_then |_, task| {
let task_cell = Cell::new(task);
do self.watcher.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task_cell.take());
}
}
}