rust/src/libstd/uv_iotask.rs

259 lines
7.9 KiB
Rust
Raw Normal View History

/*!
* A task-based interface to the uv loop
*
* The I/O task runs in its own single-threaded scheduler. By using the
* `interact` function you can execute code in a uv callback.
*/
2012-04-02 13:03:45 -05:00
2012-09-19 20:17:04 -05:00
#[forbid(deprecated_mode)];
#[forbid(deprecated_pattern)];
2012-08-29 19:41:38 -05:00
export IoTask;
export spawn_iotask;
export interact;
export exit;
2012-09-04 13:23:53 -05:00
use libc::c_void;
use ptr::addr_of;
use comm = core::comm;
use comm::{Port, Chan, listen};
use task::TaskBuilder;
use ll = uv_ll;
2012-04-02 13:03:45 -05:00
/// Used to abstract-away direct interaction with a libuv loop.
2012-08-29 19:41:38 -05:00
enum IoTask {
IoTask_({
async_handle: *ll::uv_async_t,
2012-08-29 19:41:38 -05:00
op_chan: Chan<IoTaskMsg>
})
}
2012-09-19 20:17:04 -05:00
fn spawn_iotask(+task: task::TaskBuilder) -> IoTask {
2012-06-30 18:19:07 -05:00
do listen |iotask_ch| {
2012-05-25 03:06:45 -05:00
2012-08-15 16:10:46 -05:00
do task.sched_mode(task::SingleThreaded).spawn {
2012-08-22 19:24:52 -05:00
debug!("entering libuv task");
2012-05-25 03:06:45 -05:00
run_loop(iotask_ch);
2012-08-22 19:24:52 -05:00
debug!("libuv task exiting");
2012-05-25 03:06:45 -05:00
};
2012-05-25 03:06:45 -05:00
iotask_ch.recv()
}
}
/**
* Provide a callback to be processed by `iotask`
*
* The primary way to do operations again a running `iotask` that
* doesn't involve creating a uv handle via `safe_handle`
*
* # Warning
*
* This function is the only safe way to interact with _any_ `iotask`.
* Using functions in the `uv::ll` module outside of the `cb` passed into
* this function is _very dangerous_.
*
* # Arguments
*
* * iotask - a uv I/O task that you want to do operations against
* * cb - a function callback to be processed on the running loop's
* thread. The only parameter passed in is an opaque pointer representing the
* running `uv_loop_t*`. In the context of this callback, it is safe to use
* this pointer to do various uv_* API calls contained within the `uv::ll`
* module. It is not safe to send the `loop_ptr` param to this callback out
* via ports/chans.
*/
2012-08-29 19:41:38 -05:00
unsafe fn interact(iotask: IoTask,
2012-09-19 20:17:04 -05:00
+cb: fn~(*c_void)) {
send_msg(iotask, Interaction(move cb));
}
/**
* Shut down the I/O task
*
* Is used to signal to the loop that it should close the internally-held
* async handle and do a sanity check to make sure that all other handles are
* closed, causing a failure otherwise.
*/
2012-08-29 19:41:38 -05:00
fn exit(iotask: IoTask) unsafe {
send_msg(iotask, TeardownLoop);
}
// INTERNAL API
2012-08-29 19:41:38 -05:00
enum IoTaskMsg {
Interaction (fn~(*libc::c_void)),
TeardownLoop
}
/// Run the loop and begin handling messages
2012-08-29 19:41:38 -05:00
fn run_loop(iotask_ch: Chan<IoTask>) unsafe {
2012-05-25 03:06:45 -05:00
let loop_ptr = ll::loop_new();
2012-05-25 03:06:45 -05:00
// set up the special async handle we'll use to allow multi-task
// communication with this loop
let async = ll::async_t();
2012-05-24 21:53:08 -05:00
let async_handle = addr_of(async);
2012-05-25 03:06:45 -05:00
// associate the async handle with the loop
ll::async_init(loop_ptr, async_handle, wake_up_cb);
// initialize our loop data and store it in the loop
2012-08-29 19:41:38 -05:00
let data: IoTaskLoopData = {
async_handle: async_handle,
2012-08-27 16:22:25 -05:00
msg_po: Port()
2012-05-24 21:53:08 -05:00
};
ll::set_data_for_uv_handle(async_handle, addr_of(data));
// Send out a handle through which folks can talk to us
// while we dwell in the I/O loop
2012-08-29 19:41:38 -05:00
let iotask = IoTask_({
async_handle: async_handle,
2012-05-25 03:06:45 -05:00
op_chan: data.msg_po.chan()
});
iotask_ch.send(iotask);
log(debug, ~"about to run uv loop");
// enter the loop... this blocks until the loop is done..
ll::run(loop_ptr);
log(debug, ~"uv loop ended");
ll::loop_delete(loop_ptr);
}
// data that lives for the lifetime of the high-evel oo
2012-08-29 19:41:38 -05:00
type IoTaskLoopData = {
2012-05-24 21:53:08 -05:00
async_handle: *ll::uv_async_t,
2012-08-29 19:41:38 -05:00
msg_po: Port<IoTaskMsg>
2012-05-24 21:53:08 -05:00
};
2012-04-02 13:03:45 -05:00
2012-08-29 19:41:38 -05:00
fn send_msg(iotask: IoTask,
2012-09-19 20:17:04 -05:00
+msg: IoTaskMsg) unsafe {
iotask.op_chan.send(move msg);
ll::async_send(iotask.async_handle);
2012-04-02 13:03:45 -05:00
}
/// Dispatch all pending messages
2012-07-03 18:32:02 -05:00
extern fn wake_up_cb(async_handle: *ll::uv_async_t,
status: int) unsafe {
2012-05-25 03:06:45 -05:00
2012-08-22 19:24:52 -05:00
log(debug, fmt!("wake_up_cb extern.. handle: %? status: %?",
async_handle, status));
2012-05-25 03:06:45 -05:00
let loop_ptr = ll::get_loop_for_uv_handle(async_handle);
2012-08-29 19:41:38 -05:00
let data = ll::get_data_for_uv_handle(async_handle) as *IoTaskLoopData;
2012-05-25 03:06:45 -05:00
let msg_po = (*data).msg_po;
while msg_po.peek() {
2012-08-06 14:34:08 -05:00
match msg_po.recv() {
2012-08-29 19:41:38 -05:00
Interaction(cb) => cb(loop_ptr),
TeardownLoop => begin_teardown(data)
}
}
}
2012-08-29 19:41:38 -05:00
fn begin_teardown(data: *IoTaskLoopData) unsafe {
log(debug, ~"iotask begin_teardown() called, close async_handle");
let async_handle = (*data).async_handle;
ll::close(async_handle as *c_void, tear_down_close_cb);
}
2012-07-03 18:32:02 -05:00
extern fn tear_down_close_cb(handle: *ll::uv_async_t) unsafe {
let loop_ptr = ll::get_loop_for_uv_handle(handle);
let loop_refs = ll::loop_refcount(loop_ptr);
2012-08-22 19:24:52 -05:00
log(debug, fmt!("tear_down_close_cb called, closing handle at %? refs %?",
handle, loop_refs));
assert loop_refs == 1i32;
}
#[cfg(test)]
mod test {
#[legacy_exports];
2012-07-03 18:32:02 -05:00
extern fn async_close_cb(handle: *ll::uv_async_t) unsafe {
2012-08-22 19:24:52 -05:00
log(debug, fmt!("async_close_cb handle %?", handle));
let exit_ch = (*(ll::get_data_for_uv_handle(handle)
2012-08-29 19:41:38 -05:00
as *AhData)).exit_ch;
2012-08-14 16:17:27 -05:00
core::comm::send(exit_ch, ());
}
2012-07-03 18:32:02 -05:00
extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int)
unsafe {
2012-08-22 19:24:52 -05:00
log(debug, fmt!("async_handle_cb handle %? status %?",handle,status));
ll::close(handle, async_close_cb);
}
2012-08-29 19:41:38 -05:00
type AhData = {
iotask: IoTask,
2012-08-15 16:10:46 -05:00
exit_ch: comm::Chan<()>
};
2012-08-29 19:41:38 -05:00
fn impl_uv_iotask_async(iotask: IoTask) unsafe {
let async_handle = ll::async_t();
let ah_ptr = ptr::addr_of(async_handle);
2012-08-27 16:22:25 -05:00
let exit_po = core::comm::Port::<()>();
let exit_ch = core::comm::Chan(exit_po);
let ah_data = {
iotask: iotask,
exit_ch: exit_ch
};
let ah_data_ptr = ptr::addr_of(ah_data);
do interact(iotask) |loop_ptr| unsafe {
ll::async_init(loop_ptr, ah_ptr, async_handle_cb);
ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void);
ll::async_send(ah_ptr);
};
2012-08-14 16:17:27 -05:00
core::comm::recv(exit_po);
}
// this fn documents the bear minimum neccesary to roll your own
// high_level_loop
2012-08-29 19:41:38 -05:00
unsafe fn spawn_test_loop(exit_ch: comm::Chan<()>) -> IoTask {
let iotask_port = comm::Port::<IoTask>();
2012-08-27 16:22:25 -05:00
let iotask_ch = comm::Chan(iotask_port);
2012-08-15 16:10:46 -05:00
do task::spawn_sched(task::ManualThreads(1u)) {
run_loop(iotask_ch);
exit_ch.send(());
};
2012-08-14 16:17:27 -05:00
return core::comm::recv(iotask_port);
}
2012-07-03 18:32:02 -05:00
extern fn lifetime_handle_close(handle: *libc::c_void) unsafe {
2012-08-22 19:24:52 -05:00
log(debug, fmt!("lifetime_handle_close ptr %?", handle));
}
2012-07-03 18:32:02 -05:00
extern fn lifetime_async_callback(handle: *libc::c_void,
status: libc::c_int) {
2012-08-22 19:24:52 -05:00
log(debug, fmt!("lifetime_handle_close ptr %? status %?",
handle, status));
}
#[test]
fn test_uv_iotask_async() unsafe {
2012-08-27 16:22:25 -05:00
let exit_po = core::comm::Port::<()>();
let exit_ch = core::comm::Chan(exit_po);
let iotask = spawn_test_loop(exit_ch);
// using this handle to manage the lifetime of the high_level_loop,
// as it will exit the first time one of the impl_uv_hl_async() is
// cleaned up with no one ref'd handles on the loop (Which can happen
// under race-condition type situations.. this ensures that the loop
// lives until, at least, all of the impl_uv_hl_async() runs have been
// called, at least.
2012-08-27 16:22:25 -05:00
let work_exit_po = core::comm::Port::<()>();
let work_exit_ch = core::comm::Chan(work_exit_po);
for iter::repeat(7u) {
2012-08-15 16:10:46 -05:00
do task::spawn_sched(task::ManualThreads(1u)) {
impl_uv_iotask_async(iotask);
2012-08-14 16:17:27 -05:00
core::comm::send(work_exit_ch, ());
};
};
for iter::repeat(7u) {
2012-08-14 16:17:27 -05:00
core::comm::recv(work_exit_po);
};
log(debug, ~"sending teardown_loop msg..");
exit(iotask);
2012-08-14 16:17:27 -05:00
core::comm::recv(exit_po);
log(debug, ~"after recv on exit_po.. exiting..");
}
}