1
Fork 0

Move as much I/O as possible off of native::io

When uv's TTY I/O is used for the stdio streams, the file descriptors are put
into a non-blocking mode. This means that other concurrent writes to the same
stream can fail with EAGAIN or EWOULDBLOCK. By all I/O to event-loop I/O, we
avoid this error.

There is one location which cannot move, which is the runtime's dumb_println
function. This was implemented to handle the EAGAIN and EWOULDBLOCK errors and
simply retry again and again.
This commit is contained in:
Alex Crichton 2013-10-17 17:04:51 -07:00
parent 4ce71eaca3
commit 4eb5336054
13 changed files with 167 additions and 101 deletions

View file

@ -112,7 +112,7 @@ pub fn log(_level: u32, args: &fmt::Arguments) {
} }
None => { None => {
// There is no logger anywhere, just write to stderr // There is no logger anywhere, just write to stderr
let mut logger = StdErrLogger; let mut logger = StdErrLogger::new();
logger.log(args); logger.log(args);
} }
} }

View file

@ -370,6 +370,7 @@ pub enum IoErrorKind {
PathAlreadyExists, PathAlreadyExists,
PathDoesntExist, PathDoesntExist,
MismatchedFileTypeForOperation, MismatchedFileTypeForOperation,
ResourceUnavailable,
IoUnavailable, IoUnavailable,
} }
@ -392,6 +393,7 @@ impl ToStr for IoErrorKind {
PathDoesntExist => ~"PathDoesntExist", PathDoesntExist => ~"PathDoesntExist",
MismatchedFileTypeForOperation => ~"MismatchedFileTypeForOperation", MismatchedFileTypeForOperation => ~"MismatchedFileTypeForOperation",
IoUnavailable => ~"IoUnavailable", IoUnavailable => ~"IoUnavailable",
ResourceUnavailable => ~"ResourceUnavailable",
} }
} }
} }

View file

@ -21,6 +21,12 @@ fn raise_error() {
// XXX: this should probably be a bit more descriptive... // XXX: this should probably be a bit more descriptive...
let (kind, desc) = match os::errno() as i32 { let (kind, desc) = match os::errno() as i32 {
libc::EOF => (EndOfFile, "end of file"), libc::EOF => (EndOfFile, "end of file"),
// These two constants can have the same value on some systems, but
// different values on others, so we can't use a match clause
x if x == libc::EAGAIN || x == libc::EWOULDBLOCK =>
(ResourceUnavailable, "resource temporarily unavailable"),
_ => (OtherIoError, "unknown error"), _ => (OtherIoError, "unknown error"),
}; };

View file

@ -8,6 +8,24 @@
// option. This file may not be copied, modified, or distributed // option. This file may not be copied, modified, or distributed
// except according to those terms. // except according to those terms.
/*!
This modules provides bindings to the local event loop's TTY interface, using it
to have synchronous, but non-blocking versions of stdio. These handles can be
inspected for information about terminal dimensions or related information
about the stream or terminal that it is attached to.
# Example
```rust
use std::rt::io;
let mut out = io::stdout();
out.write(bytes!("Hello, world!"));
```
*/
use fmt; use fmt;
use libc; use libc;
use option::{Option, Some, None}; use option::{Option, Some, None};
@ -15,13 +33,14 @@ use result::{Ok, Err};
use rt::rtio::{IoFactory, RtioTTY, with_local_io}; use rt::rtio::{IoFactory, RtioTTY, with_local_io};
use super::{Reader, Writer, io_error}; use super::{Reader, Writer, io_error};
/// Creates a new non-blocking handle to the stdin of the current process. #[fixed_stack_segment] #[inline(never)]
/// fn tty<T>(fd: libc::c_int, f: &fn(~RtioTTY) -> T) -> T {
/// See `stdout()` for notes about this function.
pub fn stdin() -> StdReader {
do with_local_io |io| { do with_local_io |io| {
match io.tty_open(libc::STDIN_FILENO, true, false) { // Always pass in readable as true, otherwise libuv turns our writes
Ok(tty) => Some(StdReader { inner: tty }), // into blocking writes. We also need to dup the file descriptor because
// the tty will be closed when it's dropped.
match io.tty_open(unsafe { libc::dup(fd) }, true) {
Ok(tty) => Some(f(tty)),
Err(e) => { Err(e) => {
io_error::cond.raise(e); io_error::cond.raise(e);
None None
@ -30,6 +49,13 @@ pub fn stdin() -> StdReader {
}.unwrap() }.unwrap()
} }
/// Creates a new non-blocking handle to the stdin of the current process.
///
/// See `stdout()` for notes about this function.
pub fn stdin() -> StdReader {
do tty(libc::STDIN_FILENO) |tty| { StdReader { inner: tty } }
}
/// Creates a new non-blocking handle to the stdout of the current process. /// Creates a new non-blocking handle to the stdout of the current process.
/// ///
/// Note that this is a fairly expensive operation in that at least one memory /// Note that this is a fairly expensive operation in that at least one memory
@ -37,30 +63,14 @@ pub fn stdin() -> StdReader {
/// task context because the stream returned will be a non-blocking object using /// task context because the stream returned will be a non-blocking object using
/// the local scheduler to perform the I/O. /// the local scheduler to perform the I/O.
pub fn stdout() -> StdWriter { pub fn stdout() -> StdWriter {
do with_local_io |io| { do tty(libc::STDOUT_FILENO) |tty| { StdWriter { inner: tty } }
match io.tty_open(libc::STDOUT_FILENO, false, false) {
Ok(tty) => Some(StdWriter { inner: tty }),
Err(e) => {
io_error::cond.raise(e);
None
}
}
}.unwrap()
} }
/// Creates a new non-blocking handle to the stderr of the current process. /// Creates a new non-blocking handle to the stderr of the current process.
/// ///
/// See `stdout()` for notes about this function. /// See `stdout()` for notes about this function.
pub fn stderr() -> StdWriter { pub fn stderr() -> StdWriter {
do with_local_io |io| { do tty(libc::STDERR_FILENO) |tty| { StdWriter { inner: tty } }
match io.tty_open(libc::STDERR_FILENO, false, false) {
Ok(tty) => Some(StdWriter { inner: tty }),
Err(e) => {
io_error::cond.raise(e);
None
}
}
}.unwrap()
} }
/// Prints a string to the stdout of the current process. No newline is emitted /// Prints a string to the stdout of the current process. No newline is emitted
@ -115,6 +125,11 @@ impl StdReader {
Err(e) => io_error::cond.raise(e), Err(e) => io_error::cond.raise(e),
} }
} }
/// Returns whether this tream is attached to a TTY instance or not.
///
/// This is similar to libc's isatty() function
pub fn isatty(&self) -> bool { self.inner.isatty() }
} }
impl Reader for StdReader { impl Reader for StdReader {
@ -170,6 +185,11 @@ impl StdWriter {
Err(e) => io_error::cond.raise(e), Err(e) => io_error::cond.raise(e),
} }
} }
/// Returns whether this tream is attached to a TTY instance or not.
///
/// This is similar to libc's isatty() function
pub fn isatty(&self) -> bool { self.inner.isatty() }
} }
impl Writer for StdWriter { impl Writer for StdWriter {

View file

@ -12,6 +12,7 @@ use fmt;
use from_str::from_str; use from_str::from_str;
use libc::exit; use libc::exit;
use option::{Some, None, Option}; use option::{Some, None, Option};
use rt::io;
use rt::crate_map::{ModEntry, CrateMap, iter_crate_map, get_crate_map}; use rt::crate_map::{ModEntry, CrateMap, iter_crate_map, get_crate_map};
use str::StrSlice; use str::StrSlice;
use u32; use u32;
@ -166,14 +167,23 @@ pub trait Logger {
fn log(&mut self, args: &fmt::Arguments); fn log(&mut self, args: &fmt::Arguments);
} }
pub struct StdErrLogger; /// This logger emits output to the stderr of the process, and contains a lazily
/// initialized event-loop driven handle to the stream.
pub struct StdErrLogger {
priv handle: Option<io::stdio::StdWriter>,
}
impl StdErrLogger {
pub fn new() -> StdErrLogger { StdErrLogger { handle: None } }
}
impl Logger for StdErrLogger { impl Logger for StdErrLogger {
fn log(&mut self, args: &fmt::Arguments) { fn log(&mut self, args: &fmt::Arguments) {
// FIXME(#6846): this should not call the blocking version of println, // First time logging? Get a handle to the stderr of this process.
// or at least the default loggers for tasks shouldn't do if self.handle.is_none() {
// that self.handle = Some(io::stderr());
::rt::util::dumb_println(args); }
fmt::writeln(self.handle.get_mut_ref() as &mut io::Writer, args);
} }
} }

View file

@ -97,7 +97,7 @@ pub trait IoFactory {
fn unix_bind(&mut self, path: &CString) -> fn unix_bind(&mut self, path: &CString) ->
Result<~RtioUnixListener, IoError>; Result<~RtioUnixListener, IoError>;
fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError>; fn unix_connect(&mut self, path: &CString) -> Result<~RtioPipe, IoError>;
fn tty_open(&mut self, fd: c_int, readable: bool, close_on_drop: bool) fn tty_open(&mut self, fd: c_int, readable: bool)
-> Result<~RtioTTY, IoError>; -> Result<~RtioTTY, IoError>;
} }
@ -182,6 +182,7 @@ pub trait RtioTTY {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>; fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
fn set_raw(&mut self, raw: bool) -> Result<(), IoError>; fn set_raw(&mut self, raw: bool) -> Result<(), IoError>;
fn get_winsize(&mut self) -> Result<(int, int), IoError>; fn get_winsize(&mut self) -> Result<(int, int), IoError>;
fn isatty(&self) -> bool;
} }
pub trait PausibleIdleCallback { pub trait PausibleIdleCallback {

View file

@ -132,7 +132,7 @@ impl Task {
heap: LocalHeap::new(), heap: LocalHeap::new(),
gc: GarbageCollector, gc: GarbageCollector,
storage: LocalStorage(None), storage: LocalStorage(None),
logger: StdErrLogger, logger: StdErrLogger::new(),
unwinder: Unwinder { unwinding: false }, unwinder: Unwinder { unwinding: false },
taskgroup: None, taskgroup: None,
death: Death::new(), death: Death::new(),
@ -166,7 +166,7 @@ impl Task {
heap: LocalHeap::new(), heap: LocalHeap::new(),
gc: GarbageCollector, gc: GarbageCollector,
storage: LocalStorage(None), storage: LocalStorage(None),
logger: StdErrLogger, logger: StdErrLogger::new(),
unwinder: Unwinder { unwinding: false }, unwinder: Unwinder { unwinding: false },
taskgroup: None, taskgroup: None,
death: Death::new(), death: Death::new(),
@ -188,7 +188,7 @@ impl Task {
heap: LocalHeap::new(), heap: LocalHeap::new(),
gc: GarbageCollector, gc: GarbageCollector,
storage: LocalStorage(None), storage: LocalStorage(None),
logger: StdErrLogger, logger: StdErrLogger::new(),
unwinder: Unwinder { unwinding: false }, unwinder: Unwinder { unwinding: false },
taskgroup: None, taskgroup: None,
// FIXME(#7544) make watching optional // FIXME(#7544) make watching optional
@ -549,6 +549,7 @@ pub fn begin_unwind(msg: *c_char, file: *c_char, line: size_t) -> ! {
use rt::logging::Logger; use rt::logging::Logger;
use str::Str; use str::Str;
use c_str::CString; use c_str::CString;
use unstable::intrinsics;
unsafe { unsafe {
let msg = CString::new(msg, false); let msg = CString::new(msg, false);
@ -557,35 +558,32 @@ pub fn begin_unwind(msg: *c_char, file: *c_char, line: size_t) -> ! {
Some(s) => s, None => rtabort!("message wasn't utf8?") Some(s) => s, None => rtabort!("message wasn't utf8?")
}; };
if in_green_task_context() { if !in_green_task_context() {
// Be careful not to allocate in this block, if we're failing we may
// have been failing due to a lack of memory in the first place...
do Local::borrow |task: &mut Task| {
let n = task.name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>");
match file.as_str() {
Some(file) => {
format_args!(|args| { task.logger.log(args) },
"task '{}' failed at '{}', {}:{}",
n, msg, file, line);
}
None => {
format_args!(|args| { task.logger.log(args) },
"task '{}' failed at '{}'", n, msg);
}
}
}
} else {
match file.as_str() { match file.as_str() {
Some(file) => { Some(file) => {
rterrln!("failed in non-task context at '{}', {}:{}", rterrln!("failed in non-task context at '{}', {}:{}",
msg, file, line as int); msg, file, line as int);
} }
None => rterrln!("failed in non-task context at '{}'", msg), None => rterrln!("failed in non-task context at '{}'", msg)
} }
intrinsics::abort();
} }
// Be careful not to allocate in this block, if we're failing we may
// have been failing due to a lack of memory in the first place...
let task: *mut Task = Local::unsafe_borrow(); let task: *mut Task = Local::unsafe_borrow();
let n = (*task).name.as_ref().map(|n| n.as_slice()).unwrap_or("<unnamed>");
match file.as_str() {
Some(file) => {
format_args!(|args| { (*task).logger.log(args) },
"task '{}' failed at '{}', {}:{}",
n, msg, file, line);
}
None => {
format_args!(|args| { (*task).logger.log(args) },
"task '{}' failed at '{}'", n, msg);
}
}
if (*task).unwinder.unwinding { if (*task).unwinder.unwinding {
rtabort!("unwinding again"); rtabort!("unwinding again");
} }

View file

@ -71,9 +71,18 @@ pub fn default_sched_threads() -> uint {
pub fn dumb_println(args: &fmt::Arguments) { pub fn dumb_println(args: &fmt::Arguments) {
use rt::io::native::stdio::stderr; use rt::io::native::stdio::stderr;
use rt::io::Writer; use rt::io::{Writer, io_error, ResourceUnavailable};
let mut out = stderr(); let mut out = stderr();
fmt::writeln(&mut out as &mut Writer, args);
let mut again = true;
do io_error::cond.trap(|e| {
again = e.kind == ResourceUnavailable;
}).inside {
while again {
again = false;
fmt::writeln(&mut out as &mut Writer, args);
}
}
} }
pub fn abort(msg: &str) -> ! { pub fn abort(msg: &str) -> ! {

View file

@ -170,6 +170,7 @@ pub trait WatcherInterop {
fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData; fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
fn drop_watcher_data(&mut self); fn drop_watcher_data(&mut self);
fn close(self, cb: NullCallback); fn close(self, cb: NullCallback);
fn close_async(self);
} }
impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W { impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
@ -235,6 +236,16 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
unsafe { uvll::free_handle(handle as *c_void) } unsafe { uvll::free_handle(handle as *c_void) }
} }
} }
fn close_async(self) {
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_handle_t) {
let mut h: Handle = NativeHandle::from_native_handle(handle);
h.drop_watcher_data();
unsafe { uvll::free_handle(handle as *c_void) }
}
}
} }
// XXX: Need to define the error constants like EOF so they can be // XXX: Need to define the error constants like EOF so they can be

View file

@ -868,13 +868,13 @@ impl IoFactory for UvIoFactory {
return ret; return ret;
} }
fn tty_open(&mut self, fd: c_int, readable: bool, close_on_drop: bool) fn tty_open(&mut self, fd: c_int, readable: bool)
-> Result<~RtioTTY, IoError> { -> Result<~RtioTTY, IoError> {
match tty::TTY::new(self.uv_loop(), fd, readable) { match tty::TTY::new(self.uv_loop(), fd, readable) {
Ok(tty) => Ok(~UvTTY { Ok(tty) => Ok(~UvTTY {
home: get_handle_to_current_scheduler!(), home: get_handle_to_current_scheduler!(),
tty: tty, tty: tty,
close_on_drop: close_on_drop, fd: fd,
} as ~RtioTTY), } as ~RtioTTY),
Err(e) => Err(uv_error_to_io_error(e)) Err(e) => Err(uv_error_to_io_error(e))
} }
@ -1748,7 +1748,7 @@ impl RtioUnixListener for UvUnixListener {
pub struct UvTTY { pub struct UvTTY {
tty: tty::TTY, tty: tty::TTY,
home: SchedHandle, home: SchedHandle,
close_on_drop: bool, fd: c_int,
} }
impl HomingIO for UvTTY { impl HomingIO for UvTTY {
@ -1757,20 +1757,48 @@ impl HomingIO for UvTTY {
impl Drop for UvTTY { impl Drop for UvTTY {
fn drop(&mut self) { fn drop(&mut self) {
if self.close_on_drop { // TTY handles are used for the logger in a task, so this destructor is
let scheduler: ~Scheduler = Local::take(); // run when a task is destroyed. When a task is being destroyed, a local
do scheduler.deschedule_running_task_and_then |_, task| { // scheduler isn't available, so we can't do the normal "take the
let task = Cell::new(task); // scheduler and resume once close is done". Instead close operations on
do self.tty.close { // a TTY are asynchronous.
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task.take()); self.tty.close_async();
} }
} }
} else {
self.tty.drop_watcher_data(); impl RtioTTY for UvTTY {
unsafe { uvll::free_handle(self.tty.native_handle()) } fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
read_stream(self_.tty.as_stream(), scheduler, buf)
} }
} }
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
write_stream(self_.tty.as_stream(), scheduler, buf)
}
}
fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
do self.home_for_io |self_| {
match self_.tty.set_mode(raw) {
Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
}
}
fn get_winsize(&mut self) -> Result<(int, int), IoError> {
do self.home_for_io |self_| {
match self_.tty.get_winsize() {
Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
}
}
fn isatty(&self) -> bool {
unsafe { uvll::guess_handle(self.fd) == uvll::UV_TTY }
}
} }
pub struct UvUnixAcceptor { pub struct UvUnixAcceptor {
@ -1808,36 +1836,6 @@ impl RtioUnixAcceptor for UvUnixAcceptor {
} }
} }
impl RtioTTY for UvTTY {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
read_stream(self_.tty.as_stream(), scheduler, buf)
}
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
write_stream(self_.tty.as_stream(), scheduler, buf)
}
}
fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
do self.home_for_io |self_| {
match self_.tty.set_mode(raw) {
Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
}
}
fn get_winsize(&mut self) -> Result<(int, int), IoError> {
do self.home_for_io |self_| {
match self_.tty.get_winsize() {
Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
}
}
}
// this function is full of lies // this function is full of lies
unsafe fn local_io() -> &'static mut IoFactory { unsafe fn local_io() -> &'static mut IoFactory {
do Local::borrow |sched: &mut Scheduler| { do Local::borrow |sched: &mut Scheduler| {

View file

@ -986,6 +986,10 @@ pub unsafe fn tty_get_winsize(tty: *uv_tty_t, width: *c_int,
#[fixed_stack_segment]; #[inline(never)]; #[fixed_stack_segment]; #[inline(never)];
rust_uv_tty_get_winsize(tty, width, height) rust_uv_tty_get_winsize(tty, width, height)
} }
pub unsafe fn guess_handle(fd: c_int) -> uv_handle_type {
#[fixed_stack_segment]; #[inline(never)];
rust_uv_guess_handle(fd)
}
pub struct uv_err_data { pub struct uv_err_data {
priv err_name: ~str, priv err_name: ~str,
@ -1140,6 +1144,7 @@ extern {
fn rust_uv_tty_set_mode(tty: *uv_tty_t, mode: c_int) -> c_int; fn rust_uv_tty_set_mode(tty: *uv_tty_t, mode: c_int) -> c_int;
fn rust_uv_tty_get_winsize(tty: *uv_tty_t, width: *c_int, fn rust_uv_tty_get_winsize(tty: *uv_tty_t, width: *c_int,
height: *c_int) -> c_int; height: *c_int) -> c_int;
fn rust_uv_guess_handle(fd: c_int) -> uv_handle_type;
// These should all really be constants... // These should all really be constants...
#[rust_stack] pub fn rust_SOCK_STREAM() -> c_int; #[rust_stack] pub fn rust_SOCK_STREAM() -> c_int;

View file

@ -681,3 +681,8 @@ extern "C" int
rust_uv_tty_get_winsize(uv_tty_t *tty, int *width, int *height) { rust_uv_tty_get_winsize(uv_tty_t *tty, int *width, int *height) {
return uv_tty_get_winsize(tty, width, height); return uv_tty_get_winsize(tty, width, height);
} }
extern "C" uv_handle_type
rust_uv_guess_handle(int fd) {
return uv_guess_handle(fd);
}

View file

@ -217,3 +217,4 @@ rust_uv_pipe_connect
rust_uv_tty_init rust_uv_tty_init
rust_uv_tty_set_mode rust_uv_tty_set_mode
rust_uv_tty_get_winsize rust_uv_tty_get_winsize
rust_uv_guess_handle