Migrate uv process bindings away from ~fn()
This commit is contained in:
parent
24b4223418
commit
ceab326e82
@ -139,8 +139,8 @@ pub trait UvHandle<T> {
|
||||
|
||||
fn install(~self) -> ~Self {
|
||||
unsafe {
|
||||
let myptr = cast::transmute::<&~Self, *u8>(&self);
|
||||
uvll::set_data_for_uv_handle(self.uv_handle(), myptr);
|
||||
let myptr = cast::transmute::<&~Self, &*u8>(&self);
|
||||
uvll::set_data_for_uv_handle(self.uv_handle(), *myptr);
|
||||
}
|
||||
self
|
||||
}
|
||||
@ -188,9 +188,6 @@ pub type NullCallback = ~fn();
|
||||
pub type IdleCallback = ~fn(IdleWatcher, Option<UvError>);
|
||||
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
|
||||
pub type FsCallback = ~fn(&mut FsRequest, Option<UvError>);
|
||||
// first int is exit_status, second is term_signal
|
||||
pub type ExitCallback = ~fn(Process, int, int, Option<UvError>);
|
||||
pub type TimerCallback = ~fn(TimerWatcher, Option<UvError>);
|
||||
pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
|
||||
pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
|
||||
pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
|
||||
@ -206,11 +203,9 @@ struct WatcherData {
|
||||
close_cb: Option<NullCallback>,
|
||||
alloc_cb: Option<AllocCallback>,
|
||||
idle_cb: Option<IdleCallback>,
|
||||
timer_cb: Option<TimerCallback>,
|
||||
async_cb: Option<AsyncCallback>,
|
||||
udp_recv_cb: Option<UdpReceiveCallback>,
|
||||
udp_send_cb: Option<UdpSendCallback>,
|
||||
exit_cb: Option<ExitCallback>,
|
||||
signal_cb: Option<SignalCallback>,
|
||||
}
|
||||
|
||||
@ -242,11 +237,9 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
|
||||
close_cb: None,
|
||||
alloc_cb: None,
|
||||
idle_cb: None,
|
||||
timer_cb: None,
|
||||
async_cb: None,
|
||||
udp_recv_cb: None,
|
||||
udp_send_cb: None,
|
||||
exit_cb: None,
|
||||
signal_cb: None,
|
||||
};
|
||||
let data = transmute::<~WatcherData, *c_void>(data);
|
||||
|
@ -9,58 +9,42 @@
|
||||
// except according to those terms.
|
||||
|
||||
use std::cell::Cell;
|
||||
use std::libc::c_int;
|
||||
use std::libc;
|
||||
use std::ptr;
|
||||
use std::vec;
|
||||
use std::rt::BlockedTask;
|
||||
use std::rt::io::IoError;
|
||||
use std::rt::io::process::*;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::rtio::RtioProcess;
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
use std::vec;
|
||||
|
||||
use super::{Watcher, Loop, NativeHandle, UvError};
|
||||
use super::{status_to_maybe_uv_error, ExitCallback};
|
||||
use uvio::{UvPipeStream, UvUnboundPipe};
|
||||
use super::{Loop, NativeHandle, UvHandle, UvError, uv_error_to_io_error};
|
||||
use uvio::{HomingIO, UvPipeStream, UvUnboundPipe};
|
||||
use uvll;
|
||||
|
||||
/// A process wraps the handle of the underlying uv_process_t.
|
||||
pub struct Process(*uvll::uv_process_t);
|
||||
pub struct Process {
|
||||
handle: *uvll::uv_process_t,
|
||||
home: SchedHandle,
|
||||
|
||||
impl Watcher for Process {}
|
||||
/// Task to wake up (may be null) for when the process exits
|
||||
to_wake: Option<BlockedTask>,
|
||||
|
||||
/// Collected from the exit_cb
|
||||
exit_status: Option<int>,
|
||||
term_signal: Option<int>,
|
||||
}
|
||||
|
||||
impl Process {
|
||||
/// Creates a new process, ready to spawn inside an event loop
|
||||
pub fn new() -> Process {
|
||||
let handle = unsafe { uvll::malloc_handle(uvll::UV_PROCESS) };
|
||||
assert!(handle.is_not_null());
|
||||
let mut ret: Process = NativeHandle::from_native_handle(handle);
|
||||
ret.install_watcher_data();
|
||||
return ret;
|
||||
}
|
||||
|
||||
/// Spawn a new process inside the specified event loop.
|
||||
///
|
||||
/// The `config` variable will be passed down to libuv, and the `exit_cb`
|
||||
/// will be run only once, when the process exits.
|
||||
///
|
||||
/// Returns either the corresponding process object or an error which
|
||||
/// occurred.
|
||||
pub fn spawn(&mut self, loop_: &Loop, config: ProcessConfig,
|
||||
exit_cb: ExitCallback)
|
||||
-> Result<~[Option<~UvPipeStream>], UvError>
|
||||
pub fn spawn(loop_: &Loop, config: ProcessConfig)
|
||||
-> Result<(~Process, ~[Option<~UvPipeStream>]), UvError>
|
||||
{
|
||||
let cwd = config.cwd.map(|s| s.to_c_str());
|
||||
|
||||
extern fn on_exit(p: *uvll::uv_process_t,
|
||||
exit_status: libc::c_int,
|
||||
term_signal: libc::c_int) {
|
||||
let mut p: Process = NativeHandle::from_native_handle(p);
|
||||
let err = match exit_status {
|
||||
0 => None,
|
||||
_ => status_to_maybe_uv_error(-1)
|
||||
};
|
||||
p.get_watcher_data().exit_cb.take_unwrap()(p,
|
||||
exit_status as int,
|
||||
term_signal as int,
|
||||
err);
|
||||
}
|
||||
|
||||
let io = config.io;
|
||||
let mut stdio = vec::with_capacity::<uvll::uv_stdio_container_t>(io.len());
|
||||
let mut ret_io = vec::with_capacity(io.len());
|
||||
@ -73,7 +57,6 @@ impl Process {
|
||||
}
|
||||
}
|
||||
|
||||
let exit_cb = Cell::new(exit_cb);
|
||||
let ret_io = Cell::new(ret_io);
|
||||
do with_argv(config.program, config.args) |argv| {
|
||||
do with_env(config.env) |envp| {
|
||||
@ -93,34 +76,47 @@ impl Process {
|
||||
gid: 0,
|
||||
};
|
||||
|
||||
let handle = UvHandle::alloc(None::<Process>, uvll::UV_PROCESS);
|
||||
match unsafe {
|
||||
uvll::uv_spawn(loop_.native_handle(), **self, options)
|
||||
uvll::uv_spawn(loop_.native_handle(), handle, options)
|
||||
} {
|
||||
0 => {
|
||||
(*self).get_watcher_data().exit_cb = Some(exit_cb.take());
|
||||
Ok(ret_io.take())
|
||||
let process = ~Process {
|
||||
handle: handle,
|
||||
home: get_handle_to_current_scheduler!(),
|
||||
to_wake: None,
|
||||
exit_status: None,
|
||||
term_signal: None,
|
||||
};
|
||||
Ok((process.install(), ret_io.take()))
|
||||
}
|
||||
err => {
|
||||
unsafe { uvll::free_handle(handle) }
|
||||
Err(UvError(err))
|
||||
}
|
||||
err => Err(UvError(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Sends a signal to this process.
|
||||
///
|
||||
/// This is a wrapper around `uv_process_kill`
|
||||
pub fn kill(&self, signum: int) -> Result<(), UvError> {
|
||||
match unsafe {
|
||||
uvll::uv_process_kill(self.native_handle(), signum as libc::c_int)
|
||||
} {
|
||||
0 => Ok(()),
|
||||
err => Err(UvError(err))
|
||||
extern fn on_exit(handle: *uvll::uv_process_t,
|
||||
exit_status: libc::c_int,
|
||||
term_signal: libc::c_int) {
|
||||
let handle = handle as *uvll::uv_handle_t;
|
||||
let p: &mut Process = unsafe { UvHandle::from_uv_handle(&handle) };
|
||||
|
||||
assert!(p.exit_status.is_none());
|
||||
assert!(p.term_signal.is_none());
|
||||
p.exit_status = Some(exit_status as int);
|
||||
p.term_signal = Some(term_signal as int);
|
||||
|
||||
match p.to_wake.take() {
|
||||
Some(task) => {
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
scheduler.resume_blocked_task_immediately(task);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the process id of a spawned process
|
||||
pub fn pid(&self) -> libc::pid_t {
|
||||
unsafe { uvll::process_pid(**self) as libc::pid_t }
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
|
||||
@ -192,11 +188,59 @@ fn with_env<T>(env: Option<&[(~str, ~str)]>, f: &fn(**libc::c_char) -> T) -> T {
|
||||
c_envp.as_imm_buf(|buf, _| f(buf))
|
||||
}
|
||||
|
||||
impl NativeHandle<*uvll::uv_process_t> for Process {
|
||||
fn from_native_handle(handle: *uvll::uv_process_t) -> Process {
|
||||
Process(handle)
|
||||
impl HomingIO for Process {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
|
||||
}
|
||||
|
||||
impl UvHandle<uvll::uv_process_t> for Process {
|
||||
fn uv_handle(&self) -> *uvll::uv_process_t { self.handle }
|
||||
}
|
||||
|
||||
impl RtioProcess for Process {
|
||||
fn id(&self) -> libc::pid_t {
|
||||
unsafe { uvll::process_pid(self.handle) as libc::pid_t }
|
||||
}
|
||||
fn native_handle(&self) -> *uvll::uv_process_t {
|
||||
match self { &Process(ptr) => ptr }
|
||||
|
||||
fn kill(&mut self, signal: int) -> Result<(), IoError> {
|
||||
do self.home_for_io |self_| {
|
||||
match unsafe {
|
||||
uvll::process_kill(self_.handle, signal as libc::c_int)
|
||||
} {
|
||||
0 => Ok(()),
|
||||
err => Err(uv_error_to_io_error(UvError(err)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn wait(&mut self) -> int {
|
||||
// Make sure (on the home scheduler) that we have an exit status listed
|
||||
do self.home_for_io |self_| {
|
||||
match self_.exit_status {
|
||||
Some(*) => {}
|
||||
None => {
|
||||
// If there's no exit code previously listed, then the
|
||||
// process's exit callback has yet to be invoked. We just
|
||||
// need to deschedule ourselves and wait to be reawoken.
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||
assert!(self_.to_wake.is_none());
|
||||
self_.to_wake = Some(task);
|
||||
}
|
||||
assert!(self_.exit_status.is_some());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME(#10109): this is wrong
|
||||
self.exit_status.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Process {
|
||||
fn drop(&mut self) {
|
||||
do self.home_for_io |self_| {
|
||||
assert!(self_.to_wake.is_none());
|
||||
self_.close_async_();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -103,9 +103,9 @@ impl RtioTimer for TimerWatcher {
|
||||
|
||||
extern fn timer_cb(handle: *uvll::uv_timer_t, _status: c_int) {
|
||||
let handle = handle as *uvll::uv_handle_t;
|
||||
let foo: &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
|
||||
let timer : &mut TimerWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
|
||||
|
||||
match foo.action.take_unwrap() {
|
||||
match timer.action.take_unwrap() {
|
||||
WakeTask(task) => {
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.resume_blocked_task_immediately(task);
|
||||
@ -113,7 +113,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, _status: c_int) {
|
||||
SendOnce(chan) => chan.send(()),
|
||||
SendMany(chan) => {
|
||||
chan.send(());
|
||||
foo.action = Some(SendMany(chan));
|
||||
timer.action = Some(SendMany(chan));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,7 +23,6 @@ use std::rt::io::net::ip::{SocketAddr, IpAddr};
|
||||
use std::rt::io::{standard_error, OtherIoError, SeekStyle, SeekSet, SeekCur,
|
||||
SeekEnd};
|
||||
use std::rt::io::process::ProcessConfig;
|
||||
use std::rt::BlockedTask;
|
||||
use std::rt::local::Local;
|
||||
use std::rt::rtio::*;
|
||||
use std::rt::sched::{Scheduler, SchedHandle};
|
||||
@ -772,54 +771,12 @@ impl IoFactory for UvIoFactory {
|
||||
fn spawn(&mut self, config: ProcessConfig)
|
||||
-> Result<(~RtioProcess, ~[Option<~RtioPipe>]), IoError>
|
||||
{
|
||||
// Sadly, we must create the UvProcess before we actually call uv_spawn
|
||||
// so that the exit_cb can close over it and notify it when the process
|
||||
// has exited.
|
||||
let mut ret = ~UvProcess {
|
||||
process: Process::new(),
|
||||
home: None,
|
||||
exit_status: None,
|
||||
term_signal: None,
|
||||
exit_error: None,
|
||||
descheduled: None,
|
||||
};
|
||||
let ret_ptr = unsafe {
|
||||
*cast::transmute::<&~UvProcess, &*mut UvProcess>(&ret)
|
||||
};
|
||||
|
||||
// The purpose of this exit callback is to record the data about the
|
||||
// exit and then wake up the task which may be waiting for the process
|
||||
// to exit. This is all performed in the current io-loop, and the
|
||||
// implementation of UvProcess ensures that reading these fields always
|
||||
// occurs on the current io-loop.
|
||||
let exit_cb: ExitCallback = |_, exit_status, term_signal, error| {
|
||||
unsafe {
|
||||
assert!((*ret_ptr).exit_status.is_none());
|
||||
(*ret_ptr).exit_status = Some(exit_status);
|
||||
(*ret_ptr).term_signal = Some(term_signal);
|
||||
(*ret_ptr).exit_error = error;
|
||||
match (*ret_ptr).descheduled.take() {
|
||||
Some(task) => {
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
scheduler.resume_blocked_task_immediately(task);
|
||||
}
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
match ret.process.spawn(self.uv_loop(), config, exit_cb) {
|
||||
Ok(io) => {
|
||||
// Only now do we actually get a handle to this scheduler.
|
||||
ret.home = Some(get_handle_to_current_scheduler!());
|
||||
Ok((ret as ~RtioProcess,
|
||||
io.move_iter().map(|p| p.map(|p| p as ~RtioPipe)).collect()))
|
||||
}
|
||||
Err(uverr) => {
|
||||
// We still need to close the process handle we created, but
|
||||
// that's taken care for us in the destructor of UvProcess
|
||||
Err(uv_error_to_io_error(uverr))
|
||||
match Process::spawn(self.uv_loop(), config) {
|
||||
Ok((p, io)) => {
|
||||
Ok((p as ~RtioProcess,
|
||||
io.move_iter().map(|i| i.map(|p| p as ~RtioPipe)).collect()))
|
||||
}
|
||||
Err(e) => Err(uv_error_to_io_error(e)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -1511,85 +1468,6 @@ impl RtioFileStream for UvFileStream {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UvProcess {
|
||||
priv process: process::Process,
|
||||
|
||||
// Sadly, this structure must be created before we return it, so in that
|
||||
// brief interim the `home` is None.
|
||||
priv home: Option<SchedHandle>,
|
||||
|
||||
// All None until the process exits (exit_error may stay None)
|
||||
priv exit_status: Option<int>,
|
||||
priv term_signal: Option<int>,
|
||||
priv exit_error: Option<UvError>,
|
||||
|
||||
// Used to store which task to wake up from the exit_cb
|
||||
priv descheduled: Option<BlockedTask>,
|
||||
}
|
||||
|
||||
impl HomingIO for UvProcess {
|
||||
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.home.get_mut_ref() }
|
||||
}
|
||||
|
||||
impl Drop for UvProcess {
|
||||
fn drop(&mut self) {
|
||||
let close = |self_: &mut UvProcess| {
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||
let task = Cell::new(task);
|
||||
do self_.process.close {
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
scheduler.resume_blocked_task_immediately(task.take());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// If home is none, then this process never actually successfully
|
||||
// spawned, so there's no need to switch event loops
|
||||
if self.home.is_none() {
|
||||
close(self)
|
||||
} else {
|
||||
let _m = self.fire_homing_missile();
|
||||
close(self)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RtioProcess for UvProcess {
|
||||
fn id(&self) -> pid_t {
|
||||
self.process.pid()
|
||||
}
|
||||
|
||||
fn kill(&mut self, signal: int) -> Result<(), IoError> {
|
||||
let _m = self.fire_homing_missile();
|
||||
match self.process.kill(signal) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(uverr) => Err(uv_error_to_io_error(uverr))
|
||||
}
|
||||
}
|
||||
|
||||
fn wait(&mut self) -> int {
|
||||
// Make sure (on the home scheduler) that we have an exit status listed
|
||||
let _m = self.fire_homing_missile();
|
||||
match self.exit_status {
|
||||
Some(*) => {}
|
||||
None => {
|
||||
// If there's no exit code previously listed, then the
|
||||
// process's exit callback has yet to be invoked. We just
|
||||
// need to deschedule ourselves and wait to be reawoken.
|
||||
let scheduler: ~Scheduler = Local::take();
|
||||
do scheduler.deschedule_running_task_and_then |_, task| {
|
||||
assert!(self.descheduled.is_none());
|
||||
self.descheduled = Some(task);
|
||||
}
|
||||
assert!(self.exit_status.is_some());
|
||||
}
|
||||
}
|
||||
|
||||
self.exit_status.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UvUnixListener {
|
||||
priv inner: UvUnboundPipe
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user