1
Fork 0

Runtime removal: refactor process

This patch continues the runtime removal by moving and refactoring the
process implementation into the new `sys` module.

Because this eliminates APIs in `libnative` and `librustrt`, it is a:

[breaking-change]

This functionality is likely to be available publicly, in some form,
from `std` in the future.
This commit is contained in:
Aaron Turon 2014-10-09 16:27:28 -07:00
parent 3d195482a4
commit 0f98e75b69
10 changed files with 1250 additions and 176 deletions

View file

@ -20,14 +20,17 @@ use os;
use io::{IoResult, IoError};
use io;
use libc;
use mem;
use rt::rtio::{RtioProcess, ProcessConfig, IoFactory, LocalIo};
use rt::rtio;
use c_str::CString;
use collections::HashMap;
use hash::Hash;
#[cfg(windows)]
use std::hash::sip::SipState;
use io::pipe::{PipeStream, PipePair};
use path::BytesContainer;
use sys;
use sys::fs::FileDesc;
use sys::process::Process as ProcessImp;
/// Signal a process to exit, without forcibly killing it. Corresponds to
/// SIGTERM on unix platforms.
@ -62,24 +65,29 @@ use std::hash::sip::SipState;
/// assert!(child.wait().unwrap().success());
/// ```
pub struct Process {
handle: Box<RtioProcess + Send>,
handle: ProcessImp,
forget: bool,
/// None until wait() is called.
exit_code: Option<ProcessExit>,
/// Manually delivered signal
exit_signal: Option<int>,
/// Deadline after which wait() will return
deadline: u64,
/// Handle to the child's stdin, if the `stdin` field of this process's
/// `ProcessConfig` was `CreatePipe`. By default, this handle is `Some`.
pub stdin: Option<io::PipeStream>,
pub stdin: Option<PipeStream>,
/// Handle to the child's stdout, if the `stdout` field of this process's
/// `ProcessConfig` was `CreatePipe`. By default, this handle is `Some`.
pub stdout: Option<io::PipeStream>,
pub stdout: Option<PipeStream>,
/// Handle to the child's stderr, if the `stderr` field of this process's
/// `ProcessConfig` was `CreatePipe`. By default, this handle is `Some`.
pub stderr: Option<io::PipeStream>,
/// Extra I/O handles as configured by the original `ProcessConfig` when
/// this process was created. This is by default empty.
pub extra_io: Vec<Option<io::PipeStream>>,
pub stderr: Option<PipeStream>,
}
/// A representation of environment variable name
@ -130,6 +138,13 @@ impl PartialEq for EnvKey {
}
}
impl BytesContainer for EnvKey {
fn container_as_bytes<'a>(&'a self) -> &'a [u8] {
let &EnvKey(ref k) = self;
k.container_as_bytes()
}
}
/// A HashMap representation of environment variables.
pub type EnvMap = HashMap<EnvKey, CString>;
@ -160,7 +175,6 @@ pub struct Command {
stdin: StdioContainer,
stdout: StdioContainer,
stderr: StdioContainer,
extra_io: Vec<StdioContainer>,
uid: Option<uint>,
gid: Option<uint>,
detach: bool,
@ -194,7 +208,6 @@ impl Command {
stdin: CreatePipe(true, false),
stdout: CreatePipe(false, true),
stderr: CreatePipe(false, true),
extra_io: Vec::new(),
uid: None,
gid: None,
detach: false,
@ -281,14 +294,6 @@ impl Command {
self.stderr = cfg;
self
}
/// Attaches a stream/file descriptor/pipe to the child process. Inherited
/// file descriptors are numbered consecutively, starting at 3; the first
/// three file descriptors (stdin/stdout/stderr) are configured with the
/// `stdin`, `stdout`, and `stderr` methods.
pub fn extra_io<'a>(&'a mut self, cfg: StdioContainer) -> &'a mut Command {
self.extra_io.push(cfg);
self
}
/// Sets the child process's user id. This translates to a `setuid` call in
/// the child process. Setting this value on windows will cause the spawn to
@ -315,50 +320,23 @@ impl Command {
/// Executes the command as a child process, which is returned.
pub fn spawn(&self) -> IoResult<Process> {
fn to_rtio(p: StdioContainer) -> rtio::StdioContainer {
match p {
Ignored => rtio::Ignored,
InheritFd(fd) => rtio::InheritFd(fd),
CreatePipe(a, b) => rtio::CreatePipe(a, b),
}
}
let extra_io: Vec<rtio::StdioContainer> =
self.extra_io.iter().map(|x| to_rtio(*x)).collect();
LocalIo::maybe_raise(|io| {
let env = match self.env {
None => None,
Some(ref env_map) =>
Some(env_map.iter()
.map(|(&EnvKey(ref key), val)| (key, val))
.collect::<Vec<_>>())
};
let cfg = ProcessConfig {
program: &self.program,
args: self.args.as_slice(),
env: env.as_ref().map(|e| e.as_slice()),
cwd: self.cwd.as_ref(),
stdin: to_rtio(self.stdin),
stdout: to_rtio(self.stdout),
stderr: to_rtio(self.stderr),
extra_io: extra_io.as_slice(),
uid: self.uid,
gid: self.gid,
detach: self.detach,
};
io.spawn(cfg).map(|(p, io)| {
let mut io = io.into_iter().map(|p| {
p.map(|p| io::PipeStream::new(p))
});
Process {
handle: p,
forget: false,
stdin: io.next().unwrap(),
stdout: io.next().unwrap(),
stderr: io.next().unwrap(),
extra_io: io.collect(),
}
let (their_stdin, our_stdin) = try!(setup_io(self.stdin));
let (their_stdout, our_stdout) = try!(setup_io(self.stdout));
let (their_stderr, our_stderr) = try!(setup_io(self.stderr));
match ProcessImp::spawn(self, their_stdin, their_stdout, their_stderr) {
Err(e) => Err(e),
Ok(handle) => Ok(Process {
handle: handle,
forget: false,
exit_code: None,
exit_signal: None,
deadline: 0,
stdin: our_stdin,
stdout: our_stdout,
stderr: our_stderr,
})
}).map_err(IoError::from_rtio_error)
}
}
/// Executes the command as a child process, waiting for it to finish and
@ -415,6 +393,58 @@ impl fmt::Show for Command {
}
}
fn setup_io(io: StdioContainer) -> IoResult<(Option<PipeStream>, Option<PipeStream>)> {
let ours;
let theirs;
match io {
Ignored => {
theirs = None;
ours = None;
}
InheritFd(fd) => {
theirs = Some(PipeStream::from_filedesc(FileDesc::new(fd, false)));
ours = None;
}
CreatePipe(readable, _writable) => {
let PipePair { reader, writer } = try!(PipeStream::pair());
if readable {
theirs = Some(reader);
ours = Some(writer);
} else {
theirs = Some(writer);
ours = Some(reader);
}
}
}
Ok((theirs, ours))
}
// Allow the sys module to get access to the Command state
impl sys::process::ProcessConfig<EnvKey, CString> for Command {
fn program(&self) -> &CString {
&self.program
}
fn args(&self) -> &[CString] {
self.args.as_slice()
}
fn env(&self) -> Option<&EnvMap> {
self.env.as_ref()
}
fn cwd(&self) -> Option<&CString> {
self.cwd.as_ref()
}
fn uid(&self) -> Option<uint> {
self.uid.clone()
}
fn gid(&self) -> Option<uint> {
self.gid.clone()
}
fn detach(&self) -> bool {
self.detach
}
}
/// The output of a finished process.
#[deriving(PartialEq, Eq, Clone)]
pub struct ProcessOutput {
@ -494,9 +524,7 @@ impl Process {
/// be successfully delivered if the child has exited, but not yet been
/// reaped.
pub fn kill(id: libc::pid_t, signal: int) -> IoResult<()> {
LocalIo::maybe_raise(|io| {
io.kill(id, signal)
}).map_err(IoError::from_rtio_error)
unsafe { ProcessImp::killpid(id, signal) }
}
/// Returns the process id of this child process
@ -518,7 +546,42 @@ impl Process {
///
/// If the signal delivery fails, the corresponding error is returned.
pub fn signal(&mut self, signal: int) -> IoResult<()> {
self.handle.kill(signal).map_err(IoError::from_rtio_error)
#[cfg(unix)] fn collect_status(p: &mut Process) {
// On Linux (and possibly other unices), a process that has exited will
// continue to accept signals because it is "defunct". The delivery of
// signals will only fail once the child has been reaped. For this
// reason, if the process hasn't exited yet, then we attempt to collect
// their status with WNOHANG.
if p.exit_code.is_none() {
match p.handle.try_wait() {
Some(code) => { p.exit_code = Some(code); }
None => {}
}
}
}
#[cfg(windows)] fn collect_status(_p: &mut Process) {}
collect_status(self);
// if the process has finished, and therefore had waitpid called,
// and we kill it, then on unix we might ending up killing a
// newer process that happens to have the same (re-used) id
if self.exit_code.is_some() {
return Err(IoError {
kind: io::InvalidInput,
desc: "invalid argument: can't kill an exited process",
detail: None,
})
}
// A successfully delivered signal that isn't 0 (just a poll for being
// alive) is recorded for windows (see wait())
match unsafe { self.handle.kill(signal) } {
Ok(()) if signal == 0 => Ok(()),
Ok(()) => { self.exit_signal = Some(signal); Ok(()) }
Err(e) => Err(e),
}
}
/// Sends a signal to this child requesting that it exits. This is
@ -545,10 +608,21 @@ impl Process {
/// `set_timeout` and the timeout expires before the child exits.
pub fn wait(&mut self) -> IoResult<ProcessExit> {
drop(self.stdin.take());
match self.handle.wait() {
Ok(rtio::ExitSignal(s)) => Ok(ExitSignal(s)),
Ok(rtio::ExitStatus(s)) => Ok(ExitStatus(s)),
Err(e) => Err(IoError::from_rtio_error(e)),
match self.exit_code {
Some(code) => Ok(code),
None => {
let code = try!(self.handle.wait(self.deadline));
// On windows, waitpid will never return a signal. If a signal
// was successfully delivered to the process, however, we can
// consider it as having died via a signal.
let code = match self.exit_signal {
None => code,
Some(signal) if cfg!(windows) => ExitSignal(signal),
Some(..) => code,
};
self.exit_code = Some(code);
Ok(code)
}
}
}
@ -594,7 +668,7 @@ impl Process {
/// ```
#[experimental = "the type of the timeout is likely to change"]
pub fn set_timeout(&mut self, timeout_ms: Option<u64>) {
self.handle.set_timeout(timeout_ms)
self.deadline = timeout_ms.map(|i| i + sys::timer::now()).unwrap_or(0);
}
/// Simultaneously wait for the child to exit and collect all remaining
@ -653,7 +727,6 @@ impl Drop for Process {
drop(self.stdin.take());
drop(self.stdout.take());
drop(self.stderr.take());
drop(mem::replace(&mut self.extra_io, Vec::new()));
self.set_timeout(None);
let _ = self.wait().unwrap();
@ -1109,8 +1182,7 @@ mod tests {
#[test]
fn dont_close_fd_on_command_spawn() {
use std::rt::rtio::{Truncate, Write};
use self::native::io::file;
use sys::fs;
let path = if cfg!(windows) {
Path::new("NUL")
@ -1118,7 +1190,7 @@ mod tests {
Path::new("/dev/null")
};
let mut fdes = match file::open(&path.to_c_str(), Truncate, Write) {
let mut fdes = match fs::open(&path, Truncate, Write) {
Ok(f) => f,
Err(_) => panic!("failed to open file descriptor"),
};
@ -1126,7 +1198,7 @@ mod tests {
let mut cmd = pwd_cmd();
let _ = cmd.stdout(InheritFd(fdes.fd()));
assert!(cmd.status().unwrap().success());
assert!(fdes.inner_write("extra write\n".as_bytes()).is_ok());
assert!(fdes.write("extra write\n".as_bytes()).is_ok());
}
#[test]