Remove usage of ~fn() from uv async/idle
This commit is contained in:
parent
9286d5113d
commit
28219fc679
@ -8,51 +8,155 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use std::cast;
|
||||
use std::libc::c_int;
|
||||
use std::rt::rtio::{Callback, RemoteCallback};
|
||||
use std::unstable::sync::Exclusive;
|
||||
|
||||
use uvll;
|
||||
use super::{Watcher, Loop, NativeHandle, AsyncCallback, WatcherInterop};
|
||||
use super::status_to_maybe_uv_error;
|
||||
use super::{Loop, UvHandle};
|
||||
|
||||
pub struct AsyncWatcher(*uvll::uv_async_t);
|
||||
impl Watcher for AsyncWatcher { }
|
||||
// The entire point of async is to call into a loop from other threads so it
|
||||
// does not need to home.
|
||||
pub struct AsyncWatcher {
|
||||
handle: *uvll::uv_async_t,
|
||||
|
||||
// A flag to tell the callback to exit, set from the dtor. This is
|
||||
// almost never contested - only in rare races with the dtor.
|
||||
exit_flag: Exclusive<bool>
|
||||
}
|
||||
|
||||
struct Payload {
|
||||
callback: ~Callback,
|
||||
exit_flag: Exclusive<bool>,
|
||||
}
|
||||
|
||||
impl AsyncWatcher {
|
||||
pub fn new(loop_: &mut Loop, cb: AsyncCallback) -> AsyncWatcher {
|
||||
pub fn new(loop_: &mut Loop, cb: ~Callback) -> AsyncWatcher {
|
||||
let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
|
||||
assert_eq!(unsafe {
|
||||
uvll::async_init(loop_.native_handle(), handle, async_cb)
|
||||
}, 0);
|
||||
let flag = Exclusive::new(false);
|
||||
let payload = ~Payload { callback: cb, exit_flag: flag.clone() };
|
||||
unsafe {
|
||||
let handle = uvll::malloc_handle(uvll::UV_ASYNC);
|
||||
assert!(handle.is_not_null());
|
||||
let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
|
||||
watcher.install_watcher_data();
|
||||
let data = watcher.get_watcher_data();
|
||||
data.async_cb = Some(cb);
|
||||
assert_eq!(0, uvll::uv_async_init(loop_.native_handle(), handle, async_cb));
|
||||
return watcher;
|
||||
}
|
||||
|
||||
extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
|
||||
let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
|
||||
let status = status_to_maybe_uv_error(status);
|
||||
let data = watcher.get_watcher_data();
|
||||
let cb = data.async_cb.get_ref();
|
||||
(*cb)(watcher, status);
|
||||
let payload: *u8 = cast::transmute(payload);
|
||||
uvll::set_data_for_uv_handle(handle, payload);
|
||||
}
|
||||
return AsyncWatcher { handle: handle, exit_flag: flag, };
|
||||
}
|
||||
}
|
||||
|
||||
pub fn send(&mut self) {
|
||||
impl UvHandle<uvll::uv_async_t> for AsyncWatcher {
|
||||
fn uv_handle(&self) -> *uvll::uv_async_t { self.handle }
|
||||
unsafe fn from_uv_handle<'a>(h: &'a *T) -> &'a mut AsyncWatcher {
|
||||
fail!("async watchers can't be built from their handles");
|
||||
}
|
||||
}
|
||||
|
||||
extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
|
||||
assert!(status == 0);
|
||||
let payload: &mut Payload = unsafe {
|
||||
cast::transmute(uvll::get_data_for_uv_handle(handle))
|
||||
};
|
||||
|
||||
// The synchronization logic here is subtle. To review,
|
||||
// the uv async handle type promises that, after it is
|
||||
// triggered the remote callback is definitely called at
|
||||
// least once. UvRemoteCallback needs to maintain those
|
||||
// semantics while also shutting down cleanly from the
|
||||
// dtor. In our case that means that, when the
|
||||
// UvRemoteCallback dtor calls `async.send()`, here `f` is
|
||||
// always called later.
|
||||
|
||||
// In the dtor both the exit flag is set and the async
|
||||
// callback fired under a lock. Here, before calling `f`,
|
||||
// we take the lock and check the flag. Because we are
|
||||
// checking the flag before calling `f`, and the flag is
|
||||
// set under the same lock as the send, then if the flag
|
||||
// is set then we're guaranteed to call `f` after the
|
||||
// final send.
|
||||
|
||||
// If the check was done after `f()` then there would be a
|
||||
// period between that call and the check where the dtor
|
||||
// could be called in the other thread, missing the final
|
||||
// callback while still destroying the handle.
|
||||
|
||||
let should_exit = unsafe {
|
||||
payload.exit_flag.with_imm(|&should_exit| should_exit)
|
||||
};
|
||||
|
||||
payload.callback.call();
|
||||
|
||||
if should_exit {
|
||||
unsafe { uvll::close(handle, close_cb) }
|
||||
}
|
||||
}
|
||||
|
||||
extern fn close_cb(handle: *uvll::uv_handle_t) {
|
||||
// drop the payload
|
||||
let _payload: ~Payload = unsafe {
|
||||
cast::transmute(uvll::get_data_for_uv_handle(handle))
|
||||
};
|
||||
// and then free the handle
|
||||
unsafe { uvll::free_handle(handle) }
|
||||
}
|
||||
|
||||
impl RemoteCallback for AsyncWatcher {
|
||||
fn fire(&mut self) {
|
||||
unsafe { uvll::async_send(self.handle) }
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for AsyncWatcher {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
let handle = self.native_handle();
|
||||
uvll::uv_async_send(handle);
|
||||
do self.exit_flag.with |should_exit| {
|
||||
// NB: These two things need to happen atomically. Otherwise
|
||||
// the event handler could wake up due to a *previous*
|
||||
// signal and see the exit flag, destroying the handle
|
||||
// before the final send.
|
||||
*should_exit = true;
|
||||
uvll::async_send(self.handle)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher {
|
||||
fn from_native_handle(handle: *uvll::uv_async_t) -> AsyncWatcher {
|
||||
AsyncWatcher(handle)
|
||||
}
|
||||
fn native_handle(&self) -> *uvll::uv_async_t {
|
||||
match self { &AsyncWatcher(ptr) => ptr }
|
||||
#[cfg(test)]
|
||||
mod test_remote {
|
||||
use std::cell::Cell;
|
||||
use std::rt::test::*;
|
||||
use std::rt::thread::Thread;
|
||||
use std::rt::tube::Tube;
|
||||
use std::rt::rtio::EventLoop;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::sched::Scheduler;
|
||||
|
||||
#[test]
|
||||
fn test_uv_remote() {
|
||||
do run_in_mt_newsched_task {
|
||||
let mut tube = Tube::new();
|
||||
let tube_clone = tube.clone();
|
||||
let remote_cell = Cell::new_empty();
|
||||
do Local::borrow |sched: &mut Scheduler| {
|
||||
let tube_clone = tube_clone.clone();
|
||||
let tube_clone_cell = Cell::new(tube_clone);
|
||||
let remote = do sched.event_loop.remote_callback {
|
||||
// This could be called multiple times
|
||||
if !tube_clone_cell.is_empty() {
|
||||
tube_clone_cell.take().send(1);
|
||||
}
|
||||
};
|
||||
remote_cell.put_back(remote);
|
||||
}
|
||||
let thread = do Thread::start {
|
||||
remote_cell.take().fire();
|
||||
};
|
||||
|
||||
assert!(tube.recv() == 1);
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8,70 +8,98 @@
|
||||
// option. This file may not be copied, modified, or distributed
|
||||
// except according to those terms.
|
||||
|
||||
use std::libc::c_int;
|
||||
use std::cast;
|
||||
use std::libc::{c_int, c_void};
|
||||
|
||||
use uvll;
|
||||
use super::{Watcher, Loop, NativeHandle, IdleCallback, status_to_maybe_uv_error};
|
||||
use super::{Loop, UvHandle};
|
||||
use std::rt::rtio::{Callback, PausibleIdleCallback};
|
||||
|
||||
pub struct IdleWatcher(*uvll::uv_idle_t);
|
||||
impl Watcher for IdleWatcher { }
|
||||
pub struct IdleWatcher {
|
||||
handle: *uvll::uv_idle_t,
|
||||
idle_flag: bool,
|
||||
closed: bool,
|
||||
callback: Option<~Callback>,
|
||||
}
|
||||
|
||||
impl IdleWatcher {
|
||||
pub fn new(loop_: &mut Loop) -> IdleWatcher {
|
||||
unsafe {
|
||||
let handle = uvll::malloc_handle(uvll::UV_IDLE);
|
||||
assert!(handle.is_not_null());
|
||||
assert_eq!(uvll::uv_idle_init(loop_.native_handle(), handle), 0);
|
||||
let mut watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
|
||||
watcher.install_watcher_data();
|
||||
return watcher
|
||||
}
|
||||
pub fn new(loop_: &mut Loop) -> ~IdleWatcher {
|
||||
let handle = UvHandle::alloc(None::<IdleWatcher>, uvll::UV_IDLE);
|
||||
assert_eq!(unsafe {
|
||||
uvll::idle_init(loop_.native_handle(), handle)
|
||||
}, 0);
|
||||
let me = ~IdleWatcher {
|
||||
handle: handle,
|
||||
idle_flag: false,
|
||||
closed: false,
|
||||
callback: None,
|
||||
};
|
||||
return me.install();
|
||||
}
|
||||
|
||||
pub fn start(&mut self, cb: IdleCallback) {
|
||||
{
|
||||
let data = self.get_watcher_data();
|
||||
data.idle_cb = Some(cb);
|
||||
pub fn onetime(loop_: &mut Loop, f: proc()) {
|
||||
let handle = UvHandle::alloc(None::<IdleWatcher>, uvll::UV_IDLE);
|
||||
unsafe {
|
||||
assert_eq!(uvll::idle_init(loop_.native_handle(), handle), 0);
|
||||
let data: *c_void = cast::transmute(~f);
|
||||
uvll::set_data_for_uv_handle(handle, data);
|
||||
assert_eq!(uvll::idle_start(handle, onetime_cb), 0)
|
||||
}
|
||||
|
||||
unsafe {
|
||||
assert_eq!(uvll::uv_idle_start(self.native_handle(), idle_cb), 0)
|
||||
extern fn onetime_cb(handle: *uvll::uv_idle_t, status: c_int) {
|
||||
assert_eq!(status, 0);
|
||||
unsafe {
|
||||
let data = uvll::get_data_for_uv_handle(handle);
|
||||
let f: ~proc() = cast::transmute(data);
|
||||
(*f)();
|
||||
uvll::idle_stop(handle);
|
||||
uvll::close(handle, close_cb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn restart(&mut self) {
|
||||
unsafe {
|
||||
assert!(self.get_watcher_data().idle_cb.is_some());
|
||||
assert_eq!(uvll::uv_idle_start(self.native_handle(), idle_cb), 0)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stop(&mut self) {
|
||||
// NB: Not resetting the Rust idle_cb to None here because `stop` is
|
||||
// likely called from *within* the idle callback, causing a use after
|
||||
// free
|
||||
|
||||
unsafe {
|
||||
assert_eq!(uvll::uv_idle_stop(self.native_handle()), 0);
|
||||
extern fn close_cb(handle: *uvll::uv_handle_t) {
|
||||
unsafe { uvll::free_handle(handle) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {
|
||||
fn from_native_handle(handle: *uvll::uv_idle_t) -> IdleWatcher {
|
||||
IdleWatcher(handle)
|
||||
impl PausibleIdleCallback for IdleWatcher {
|
||||
fn start(&mut self, cb: ~Callback) {
|
||||
assert!(self.callback.is_none());
|
||||
self.callback = Some(cb);
|
||||
assert_eq!(unsafe { uvll::idle_start(self.handle, idle_cb) }, 0)
|
||||
self.idle_flag = true;
|
||||
}
|
||||
fn native_handle(&self) -> *uvll::uv_idle_t {
|
||||
match self { &IdleWatcher(ptr) => ptr }
|
||||
fn pause(&mut self) {
|
||||
if self.idle_flag == true {
|
||||
assert_eq!(unsafe {uvll::idle_stop(self.handle) }, 0);
|
||||
self.idle_flag = false;
|
||||
}
|
||||
}
|
||||
fn resume(&mut self) {
|
||||
if self.idle_flag == false {
|
||||
assert_eq!(unsafe { uvll::idle_start(self.handle, idle_cb) }, 0)
|
||||
self.idle_flag = true;
|
||||
}
|
||||
}
|
||||
fn close(&mut self) {
|
||||
self.pause();
|
||||
if !self.closed {
|
||||
self.closed = true;
|
||||
self.close_async_();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UvHandle<uvll::uv_idle_t> for IdleWatcher {
|
||||
fn uv_handle(&self) -> *uvll::uv_idle_t { self.handle }
|
||||
}
|
||||
|
||||
extern fn idle_cb(handle: *uvll::uv_idle_t, status: c_int) {
|
||||
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
|
||||
let data = idle_watcher.get_watcher_data();
|
||||
let cb: &IdleCallback = data.idle_cb.get_ref();
|
||||
let status = status_to_maybe_uv_error(status);
|
||||
(*cb)(idle_watcher, status);
|
||||
assert_eq!(status, 0);
|
||||
let idle: &mut IdleWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
|
||||
assert!(idle.callback.is_some());
|
||||
idle.callback.get_mut_ref().call();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -55,7 +55,6 @@
|
||||
use std::ptr::null;
|
||||
use std::unstable::finally::Finally;
|
||||
use std::rt::io::net::ip::SocketAddr;
|
||||
use std::rt::io::signal::Signum;
|
||||
|
||||
use std::rt::io::IoError;
|
||||
|
||||
@ -152,7 +151,39 @@ fn close_async_(&mut self) {
|
||||
unsafe { uvll::free_handle(handle) }
|
||||
}
|
||||
|
||||
unsafe { uvll::close(self.uv_handle(), close_cb) }
|
||||
unsafe {
|
||||
uvll::set_data_for_uv_handle(self.uv_handle(), null::<()>());
|
||||
uvll::close(self.uv_handle(), close_cb)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait UvRequest<T> {
|
||||
fn uv_request(&self) -> *T;
|
||||
|
||||
// FIXME(#8888) dummy self
|
||||
fn alloc(_: Option<Self>, ty: uvll::uv_req_type) -> *T {
|
||||
unsafe {
|
||||
let handle = uvll::malloc_req(ty);
|
||||
assert!(!handle.is_null());
|
||||
handle as *T
|
||||
}
|
||||
}
|
||||
|
||||
unsafe fn from_uv_request<'a>(h: &'a *T) -> &'a mut Self {
|
||||
cast::transmute(uvll::get_data_for_req(*h))
|
||||
}
|
||||
|
||||
fn install(~self) -> ~Self {
|
||||
unsafe {
|
||||
let myptr = cast::transmute::<&~Self, &*u8>(&self);
|
||||
uvll::set_data_for_req(self.uv_request(), *myptr);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
fn delete(&mut self) {
|
||||
unsafe { uvll::free_req(self.uv_request() as *c_void) }
|
||||
}
|
||||
}
|
||||
|
||||
@ -185,7 +216,6 @@ fn native_handle(&self) -> *uvll::uv_loop_t {
|
||||
pub type AllocCallback = ~fn(uint) -> Buf;
|
||||
pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
|
||||
pub type NullCallback = ~fn();
|
||||
pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
|
||||
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
|
||||
pub type FsCallback = ~fn(&mut FsRequest, Option<UvError>);
|
||||
pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
|
||||
@ -201,7 +231,6 @@ struct WatcherData {
|
||||
connect_cb: Option<ConnectionCallback>,
|
||||
close_cb: Option<NullCallback>,
|
||||
alloc_cb: Option<AllocCallback>,
|
||||
idle_cb: Option<IdleCallback>,
|
||||
async_cb: Option<AsyncCallback>,
|
||||
udp_recv_cb: Option<UdpReceiveCallback>,
|
||||
udp_send_cb: Option<UdpSendCallback>,
|
||||
@ -234,11 +263,9 @@ fn install_watcher_data(&mut self) {
|
||||
connect_cb: None,
|
||||
close_cb: None,
|
||||
alloc_cb: None,
|
||||
idle_cb: None,
|
||||
async_cb: None,
|
||||
udp_recv_cb: None,
|
||||
udp_send_cb: None,
|
||||
signal_cb: None,
|
||||
};
|
||||
let data = transmute::<~WatcherData, *c_void>(data);
|
||||
uvll::set_data_for_uv_handle(self.native_handle(), data);
|
||||
|
@ -28,9 +28,12 @@
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
use std::rt::tube::Tube;
|
||||
use std::rt::task::Task;
|
||||
use std::unstable::sync::Exclusive;
|
||||
use std::libc::{lseek, off_t};
|
||||
use std::rt::io::{FileMode, FileAccess, FileStat};
|
||||
use std::path::{GenericPath, Path};
|
||||
use std::libc::{lseek, off_t, O_CREAT, O_APPEND, O_TRUNC, O_RDWR, O_RDONLY,
|
||||
O_WRONLY, S_IRUSR, S_IWUSR, S_IRWXU};
|
||||
use std::rt::io::{FileMode, FileAccess, OpenOrCreate, Open, Create,
|
||||
CreateOrTruncate, Append, Truncate, Read, Write, ReadWrite,
|
||||
FileStat};
|
||||
use std::rt::io::signal::Signum;
|
||||
use std::task;
|
||||
use ai = std::rt::io::net::addrinfo;
|
||||
@ -199,27 +202,16 @@ fn run(&mut self) {
|
||||
self.uvio.uv_loop().run();
|
||||
}
|
||||
|
||||
fn callback(&mut self, f: ~fn()) {
|
||||
let mut idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
|
||||
do idle_watcher.start |mut idle_watcher, status| {
|
||||
assert!(status.is_none());
|
||||
idle_watcher.stop();
|
||||
idle_watcher.close(||());
|
||||
f();
|
||||
}
|
||||
fn callback(&mut self, f: proc()) {
|
||||
IdleWatcher::onetime(self.uvio.uv_loop(), f);
|
||||
}
|
||||
|
||||
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
|
||||
let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
|
||||
~UvPausibleIdleCallback {
|
||||
watcher: idle_watcher,
|
||||
idle_flag: false,
|
||||
closed: false
|
||||
} as ~PausibleIdleCallback
|
||||
IdleWatcher::new(self.uvio.uv_loop()) as ~PausibleIdleCallback
|
||||
}
|
||||
|
||||
fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback {
|
||||
~UvRemoteCallback::new(self.uvio.uv_loop(), f) as ~RemoteCallback
|
||||
fn remote_callback(&mut self, f: ~Callback) -> ~RemoteCallback {
|
||||
~AsyncWatcher::new(self.uvio.uv_loop(), f) as ~RemoteCallback
|
||||
}
|
||||
|
||||
fn io<'a>(&'a mut self, f: &fn(&'a mut IoFactory)) {
|
||||
@ -233,44 +225,6 @@ pub extern "C" fn new_loop() -> ~EventLoop {
|
||||
~UvEventLoop::new() as ~EventLoop
|
||||
}
|
||||
|
||||
pub struct UvPausibleIdleCallback {
|
||||
priv watcher: IdleWatcher,
|
||||
priv idle_flag: bool,
|
||||
priv closed: bool
|
||||
}
|
||||
|
||||
impl PausibleIdleCallback for UvPausibleIdleCallback {
|
||||
#[inline]
|
||||
fn start(&mut self, f: ~fn()) {
|
||||
do self.watcher.start |_idle_watcher, _status| {
|
||||
f();
|
||||
};
|
||||
self.idle_flag = true;
|
||||
}
|
||||
#[inline]
|
||||
fn pause(&mut self) {
|
||||
if self.idle_flag == true {
|
||||
self.watcher.stop();
|
||||
self.idle_flag = false;
|
||||
}
|
||||
}
|
||||
#[inline]
|
||||
fn resume(&mut self) {
|
||||
if self.idle_flag == false {
|
||||
self.watcher.restart();
|
||||
self.idle_flag = true;
|
||||
}
|
||||
}
|
||||
#[inline]
|
||||
fn close(&mut self) {
|
||||
self.pause();
|
||||
if !self.closed {
|
||||
self.closed = true;
|
||||
self.watcher.close(||{});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_callback_run_once() {
|
||||
do run_in_bare_thread {
|
||||
@ -285,119 +239,6 @@ fn test_callback_run_once() {
|
||||
}
|
||||
}
|
||||
|
||||
// The entire point of async is to call into a loop from other threads so it does not need to home.
|
||||
pub struct UvRemoteCallback {
|
||||
// The uv async handle for triggering the callback
|
||||
priv async: AsyncWatcher,
|
||||
// A flag to tell the callback to exit, set from the dtor. This is
|
||||
// almost never contested - only in rare races with the dtor.
|
||||
priv exit_flag: Exclusive<bool>
|
||||
}
|
||||
|
||||
impl UvRemoteCallback {
|
||||
pub fn new(loop_: &mut Loop, f: ~fn()) -> UvRemoteCallback {
|
||||
let exit_flag = Exclusive::new(false);
|
||||
let exit_flag_clone = exit_flag.clone();
|
||||
let async = do AsyncWatcher::new(loop_) |watcher, status| {
|
||||
assert!(status.is_none());
|
||||
|
||||
// The synchronization logic here is subtle. To review,
|
||||
// the uv async handle type promises that, after it is
|
||||
// triggered the remote callback is definitely called at
|
||||
// least once. UvRemoteCallback needs to maintain those
|
||||
// semantics while also shutting down cleanly from the
|
||||
// dtor. In our case that means that, when the
|
||||
// UvRemoteCallback dtor calls `async.send()`, here `f` is
|
||||
// always called later.
|
||||
|
||||
// In the dtor both the exit flag is set and the async
|
||||
// callback fired under a lock. Here, before calling `f`,
|
||||
// we take the lock and check the flag. Because we are
|
||||
// checking the flag before calling `f`, and the flag is
|
||||
// set under the same lock as the send, then if the flag
|
||||
// is set then we're guaranteed to call `f` after the
|
||||
// final send.
|
||||
|
||||
// If the check was done after `f()` then there would be a
|
||||
// period between that call and the check where the dtor
|
||||
// could be called in the other thread, missing the final
|
||||
// callback while still destroying the handle.
|
||||
|
||||
let should_exit = unsafe {
|
||||
exit_flag_clone.with_imm(|&should_exit| should_exit)
|
||||
};
|
||||
|
||||
f();
|
||||
|
||||
if should_exit {
|
||||
watcher.close(||());
|
||||
}
|
||||
|
||||
};
|
||||
UvRemoteCallback {
|
||||
async: async,
|
||||
exit_flag: exit_flag
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteCallback for UvRemoteCallback {
|
||||
fn fire(&mut self) { self.async.send() }
|
||||
}
|
||||
|
||||
impl Drop for UvRemoteCallback {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
let this: &mut UvRemoteCallback = cast::transmute_mut(self);
|
||||
do this.exit_flag.with |should_exit| {
|
||||
// NB: These two things need to happen atomically. Otherwise
|
||||
// the event handler could wake up due to a *previous*
|
||||
// signal and see the exit flag, destroying the handle
|
||||
// before the final send.
|
||||
*should_exit = true;
|
||||
this.async.send();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test_remote {
|
||||
use std::cell::Cell;
|
||||
use std::rt::test::*;
|
||||
use std::rt::thread::Thread;
|
||||
use std::rt::tube::Tube;
|
||||
use std::rt::rtio::EventLoop;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::sched::Scheduler;
|
||||
|
||||
#[test]
|
||||
fn test_uv_remote() {
|
||||
do run_in_mt_newsched_task {
|
||||
let mut tube = Tube::new();
|
||||
let tube_clone = tube.clone();
|
||||
let remote_cell = Cell::new_empty();
|
||||
do Local::borrow |sched: &mut Scheduler| {
|
||||
let tube_clone = tube_clone.clone();
|
||||
let tube_clone_cell = Cell::new(tube_clone);
|
||||
let remote = do sched.event_loop.remote_callback {
|
||||
// This could be called multiple times
|
||||
if !tube_clone_cell.is_empty() {
|
||||
tube_clone_cell.take().send(1);
|
||||
}
|
||||
};
|
||||
remote_cell.put_back(remote);
|
||||
}
|
||||
let thread = do Thread::start {
|
||||
remote_cell.take().fire();
|
||||
};
|
||||
|
||||
assert!(tube.recv() == 1);
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UvIoFactory(Loop);
|
||||
|
||||
impl UvIoFactory {
|
||||
|
@ -15,7 +15,8 @@
|
||||
use prelude::*;
|
||||
|
||||
use cast;
|
||||
use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausibleIdleCallback};
|
||||
use rt::rtio::{EventLoop, IoFactory, RemoteCallback, PausibleIdleCallback,
|
||||
Callback};
|
||||
use unstable::sync::Exclusive;
|
||||
use util;
|
||||
|
||||
@ -25,9 +26,9 @@ pub fn event_loop() -> ~EventLoop {
|
||||
}
|
||||
|
||||
struct BasicLoop {
|
||||
work: ~[~fn()], // pending work
|
||||
idle: Option<*BasicPausible>, // only one is allowed
|
||||
remotes: ~[(uint, ~fn())],
|
||||
work: ~[proc()], // pending work
|
||||
idle: Option<*mut BasicPausible>, // only one is allowed
|
||||
remotes: ~[(uint, ~Callback)],
|
||||
next_remote: uint,
|
||||
messages: Exclusive<~[Message]>
|
||||
}
|
||||
@ -86,8 +87,8 @@ fn remote_work(&mut self) {
|
||||
fn message(&mut self, message: Message) {
|
||||
match message {
|
||||
RunRemote(i) => {
|
||||
match self.remotes.iter().find(|& &(id, _)| id == i) {
|
||||
Some(&(_, ref f)) => (*f)(),
|
||||
match self.remotes.mut_iter().find(|& &(id, _)| id == i) {
|
||||
Some(&(_, ref mut f)) => f.call(),
|
||||
None => unreachable!()
|
||||
}
|
||||
}
|
||||
@ -106,7 +107,7 @@ fn idle(&mut self) {
|
||||
match self.idle {
|
||||
Some(idle) => {
|
||||
if (*idle).active {
|
||||
(*(*idle).work.get_ref())();
|
||||
(*idle).work.get_mut_ref().call();
|
||||
}
|
||||
}
|
||||
None => {}
|
||||
@ -144,7 +145,7 @@ fn run(&mut self) {
|
||||
}
|
||||
}
|
||||
|
||||
fn callback(&mut self, f: ~fn()) {
|
||||
fn callback(&mut self, f: proc()) {
|
||||
self.work.push(f);
|
||||
}
|
||||
|
||||
@ -153,13 +154,13 @@ fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
|
||||
let callback = ~BasicPausible::new(self);
|
||||
rtassert!(self.idle.is_none());
|
||||
unsafe {
|
||||
let cb_ptr: &*BasicPausible = cast::transmute(&callback);
|
||||
let cb_ptr: &*mut BasicPausible = cast::transmute(&callback);
|
||||
self.idle = Some(*cb_ptr);
|
||||
}
|
||||
return callback as ~PausibleIdleCallback;
|
||||
}
|
||||
|
||||
fn remote_callback(&mut self, f: ~fn()) -> ~RemoteCallback {
|
||||
fn remote_callback(&mut self, f: ~Callback) -> ~RemoteCallback {
|
||||
let id = self.next_remote;
|
||||
self.next_remote += 1;
|
||||
self.remotes.push((id, f));
|
||||
@ -203,7 +204,7 @@ fn drop(&mut self) {
|
||||
|
||||
struct BasicPausible {
|
||||
eloop: *mut BasicLoop,
|
||||
work: Option<~fn()>,
|
||||
work: Option<~Callback>,
|
||||
active: bool,
|
||||
}
|
||||
|
||||
@ -218,7 +219,7 @@ fn new(eloop: &mut BasicLoop) -> BasicPausible {
|
||||
}
|
||||
|
||||
impl PausibleIdleCallback for BasicPausible {
|
||||
fn start(&mut self, f: ~fn()) {
|
||||
fn start(&mut self, f: ~Callback) {
|
||||
rtassert!(!self.active && self.work.is_none());
|
||||
self.active = true;
|
||||
self.work = Some(f);
|
||||
|
@ -24,11 +24,15 @@
|
||||
use super::io::{SeekStyle};
|
||||
use super::io::{FileMode, FileAccess, FileStat, FilePermission};
|
||||
|
||||
pub trait Callback {
|
||||
fn call(&mut self);
|
||||
}
|
||||
|
||||
pub trait EventLoop {
|
||||
fn run(&mut self);
|
||||
fn callback(&mut self, ~fn());
|
||||
fn callback(&mut self, proc());
|
||||
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback;
|
||||
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallback;
|
||||
fn remote_callback(&mut self, ~Callback) -> ~RemoteCallback;
|
||||
|
||||
/// The asynchronous I/O services. Not all event loops may provide one
|
||||
// FIXME(#9382) this is an awful interface
|
||||
@ -222,7 +226,7 @@ pub trait RtioTTY {
|
||||
}
|
||||
|
||||
pub trait PausibleIdleCallback {
|
||||
fn start(&mut self, f: ~fn());
|
||||
fn start(&mut self, f: ~Callback);
|
||||
fn pause(&mut self);
|
||||
fn resume(&mut self);
|
||||
fn close(&mut self);
|
||||
|
@ -23,7 +23,7 @@
|
||||
use rt::kill::BlockedTask;
|
||||
use rt::local_ptr;
|
||||
use rt::local::Local;
|
||||
use rt::rtio::{RemoteCallback, PausibleIdleCallback};
|
||||
use rt::rtio::{RemoteCallback, PausibleIdleCallback, Callback};
|
||||
use borrow::{to_uint};
|
||||
use cell::Cell;
|
||||
use rand::{XorShiftRng, Rng, Rand};
|
||||
@ -184,7 +184,7 @@ pub fn bootstrap(mut ~self, task: ~Task) {
|
||||
// Before starting our first task, make sure the idle callback
|
||||
// is active. As we do not start in the sleep state this is
|
||||
// important.
|
||||
self.idle_callback.get_mut_ref().start(Scheduler::run_sched_once);
|
||||
self.idle_callback.get_mut_ref().start(~SchedRunner as ~Callback);
|
||||
|
||||
// Now, as far as all the scheduler state is concerned, we are
|
||||
// inside the "scheduler" context. So we can act like the
|
||||
@ -767,7 +767,7 @@ pub fn run_cleanup_job(&mut self) {
|
||||
}
|
||||
|
||||
pub fn make_handle(&mut self) -> SchedHandle {
|
||||
let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
|
||||
let remote = self.event_loop.remote_callback(~SchedRunner as ~Callback);
|
||||
|
||||
return SchedHandle {
|
||||
remote: remote,
|
||||
@ -802,6 +802,14 @@ pub fn send(&mut self, msg: SchedMessage) {
|
||||
}
|
||||
}
|
||||
|
||||
struct SchedRunner;
|
||||
|
||||
impl Callback for SchedRunner {
|
||||
fn call(&mut self) {
|
||||
Scheduler::run_sched_once();
|
||||
}
|
||||
}
|
||||
|
||||
struct CleanupJob {
|
||||
task: ~Task,
|
||||
f: UnsafeTaskReceiver
|
||||
|
Loading…
Reference in New Issue
Block a user