1
Fork 0

Move rt::io::stdio from FileStream to a TTY

We get a little more functionality from libuv for these kinds of streams (things
like terminal dimentions), and it also appears to more gracefully handle the
stream being a window. Beforehand, if you used stdio and hit CTRL+d on a
process, libuv would continually return 0-length successful reads instead of
interpreting that the stream was closed.

I was hoping to be able to write tests for this, but currently the testing
infrastructure doesn't allow tests with a stdin and a stdout, but this has been
manually tested! (not that it means much)
This commit is contained in:
Alex Crichton 2013-10-16 11:47:12 -07:00
parent 32b07c6a40
commit 35756fbcf6
13 changed files with 307 additions and 162 deletions

View file

@ -13,7 +13,7 @@ use libc;
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::local::Local;
use rt::rtio::{RtioFileStream, IoFactoryObject, IoFactory};
use rt::rtio::{IoFactoryObject, IoFactory, RtioTTYObject, RtioTTY};
use super::{Reader, Writer, io_error};
/// Creates a new non-blocking handle to the stdin of the current process.
@ -22,8 +22,8 @@ use super::{Reader, Writer, io_error};
pub fn stdin() -> StdReader {
let stream = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
(*io).fs_from_raw_fd(libc::STDIN_FILENO, false)
};
(*io).tty_open(libc::STDIN_FILENO, true, false)
}.unwrap();
StdReader { inner: stream }
}
@ -36,8 +36,8 @@ pub fn stdin() -> StdReader {
pub fn stdout() -> StdWriter {
let stream = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
(*io).fs_from_raw_fd(libc::STDOUT_FILENO, false)
};
(*io).tty_open(libc::STDOUT_FILENO, false, false)
}.unwrap();
StdWriter { inner: stream }
}
@ -47,8 +47,8 @@ pub fn stdout() -> StdWriter {
pub fn stderr() -> StdWriter {
let stream = unsafe {
let io: *mut IoFactoryObject = Local::unsafe_borrow();
(*io).fs_from_raw_fd(libc::STDERR_FILENO, false)
};
(*io).tty_open(libc::STDERR_FILENO, false, false)
}.unwrap();
StdWriter { inner: stream }
}
@ -87,7 +87,30 @@ pub fn println_args(fmt: &fmt::Arguments) {
/// Representation of a reader of a standard input stream
pub struct StdReader {
priv inner: ~RtioFileStream
priv inner: ~RtioTTYObject
}
impl StdReader {
/// Controls whether this output stream is a "raw stream" or simply a normal
/// stream.
///
/// # Failure
///
/// This function will raise on the `io_error` condition if an error
/// happens.
pub fn set_raw(&mut self, raw: bool) {
match self.inner.set_raw(raw) {
Ok(()) => {},
Err(e) => io_error::cond.raise(e),
}
}
/// Resets the mode of this stream back to its original state.
///
/// # Failure
///
/// This function cannot fail.
pub fn reset_mode(&mut self) { self.inner.reset_mode(); }
}
impl Reader for StdReader {
@ -106,7 +129,50 @@ impl Reader for StdReader {
/// Representation of a writer to a standard output stream
pub struct StdWriter {
priv inner: ~RtioFileStream
priv inner: ~RtioTTYObject
}
impl StdWriter {
/// Gets the size of this output window, if possible. This is typically used
/// when the writer is attached to something like a terminal, this is used
/// to fetch the dimensions of the terminal.
///
/// If successful, returns Some((width, height)).
///
/// # Failure
///
/// This function will raise on the `io_error` condition if an error
/// happens.
pub fn winsize(&mut self) -> Option<(int, int)> {
match self.inner.get_winsize() {
Ok(p) => Some(p),
Err(e) => {
io_error::cond.raise(e);
None
}
}
}
/// Controls whether this output stream is a "raw stream" or simply a normal
/// stream.
///
/// # Failure
///
/// This function will raise on the `io_error` condition if an error
/// happens.
pub fn set_raw(&mut self, raw: bool) {
match self.inner.set_raw(raw) {
Ok(()) => {},
Err(e) => io_error::cond.raise(e),
}
}
/// Resets the mode of this stream back to its original state.
///
/// # Failure
///
/// This function cannot fail.
pub fn reset_mode(&mut self) { self.inner.reset_mode(); }
}
impl Writer for StdWriter {
@ -117,10 +183,18 @@ impl Writer for StdWriter {
}
}
fn flush(&mut self) {
match self.inner.flush() {
Ok(()) => {}
Err(e) => io_error::cond.raise(e)
}
fn flush(&mut self) { /* nothing to do */ }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn smoke() {
// Just make sure we can acquire handles
stdin();
stdout();
stderr();
}
}

View file

@ -38,6 +38,7 @@ pub type RtioPipeObject = uvio::UvPipeStream;
pub type RtioProcessObject = uvio::UvProcess;
pub type RtioUnixListenerObject = uvio::UvUnixListener;
pub type RtioUnixAcceptorObject = uvio::UvUnixAcceptor;
pub type RtioTTYObject = uvio::UvTTY;
pub trait EventLoop {
fn run(&mut self);
@ -94,6 +95,8 @@ pub trait IoFactory {
Result<~RtioUnixListenerObject, IoError>;
fn unix_connect<P: PathLike>(&mut self, path: &P) ->
Result<~RtioPipeObject, IoError>;
fn tty_open(&mut self, fd: c_int, readable: bool, close_on_drop: bool)
-> Result<~RtioTTYObject, IoError>;
}
pub trait RtioTcpListener : RtioSocket {
@ -171,3 +174,11 @@ pub trait RtioUnixAcceptor {
fn accept_simultaneously(&mut self) -> Result<(), IoError>;
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
}
pub trait RtioTTY {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
fn set_raw(&mut self, raw: bool) -> Result<(), IoError>;
fn reset_mode(&mut self);
fn get_winsize(&mut self) -> Result<(int, int), IoError>;
}

View file

@ -8,11 +8,11 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::{c_int, c_void};
use libc::c_int;
use option::Some;
use rt::uv::uvll;
use rt::uv::uvll::UV_ASYNC;
use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback, NullCallback};
use rt::uv::{Watcher, Loop, NativeHandle, AsyncCallback};
use rt::uv::WatcherInterop;
use rt::uv::status_to_maybe_uv_error;
@ -47,27 +47,6 @@ impl AsyncWatcher {
uvll::async_send(handle);
}
}
pub fn close(self, cb: NullCallback) {
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
unsafe {
uvll::close(self.native_handle(), close_cb);
}
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut watcher: AsyncWatcher = NativeHandle::from_native_handle(handle);
{
let data = watcher.get_watcher_data();
data.close_cb.take_unwrap()();
}
watcher.drop_watcher_data();
unsafe { uvll::free_handle(handle as *c_void); }
}
}
}
impl NativeHandle<*uvll::uv_async_t> for AsyncWatcher {

View file

@ -11,7 +11,7 @@
use libc::c_int;
use option::Some;
use rt::uv::uvll;
use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback, NullCallback};
use rt::uv::{Watcher, Loop, NativeHandle, IdleCallback};
use rt::uv::status_to_maybe_uv_error;
pub struct IdleWatcher(*uvll::uv_idle_t);
@ -71,29 +71,6 @@ impl IdleWatcher {
assert!(0 == uvll::idle_stop(self.native_handle()));
}
}
pub fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb) };
extern fn close_cb(handle: *uvll::uv_idle_t) {
unsafe {
let mut idle_watcher: IdleWatcher = NativeHandle::from_native_handle(handle);
{
let data = idle_watcher.get_watcher_data();
data.close_cb.take_unwrap()();
}
idle_watcher.drop_watcher_data();
uvll::idle_delete(handle);
}
}
}
}
impl NativeHandle<*uvll::uv_idle_t> for IdleWatcher {

View file

@ -75,6 +75,7 @@ pub mod async;
pub mod addrinfo;
pub mod process;
pub mod pipe;
pub mod tty;
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
/// with dtors may not be destructured, but tuple structs can,
@ -83,6 +84,14 @@ pub struct Loop {
priv handle: *uvll::uv_loop_t
}
pub struct Handle(*uvll::uv_handle_t);
impl Watcher for Handle {}
impl NativeHandle<*uvll::uv_handle_t> for Handle {
fn from_native_handle(h: *uvll::uv_handle_t) -> Handle { Handle(h) }
fn native_handle(&self) -> *uvll::uv_handle_t { **self }
}
/// The trait implemented by uv 'watchers' (handles). Watchers are
/// non-owning wrappers around the uv handles and are not completely
/// safe - there may be multiple instances for a single underlying
@ -160,6 +169,7 @@ pub trait WatcherInterop {
fn install_watcher_data(&mut self);
fn get_watcher_data<'r>(&'r mut self) -> &'r mut WatcherData;
fn drop_watcher_data(&mut self);
fn close(self, cb: NullCallback);
}
impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
@ -207,6 +217,24 @@ impl<H, W: Watcher + NativeHandle<*H>> WatcherInterop for W {
uvll::set_data_for_uv_handle(self.native_handle(), null::<()>());
}
}
fn close(self, cb: NullCallback) {
let mut this = self;
{
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(this.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_handle_t) {
let mut h: Handle = NativeHandle::from_native_handle(handle);
h.get_watcher_data().close_cb.take_unwrap()();
h.drop_watcher_data();
unsafe { uvll::free_handle(handle as *c_void) }
}
}
}
// XXX: Need to define the error constants like EOF so they can be

View file

@ -13,7 +13,7 @@ use libc::{size_t, ssize_t, c_int, c_void, c_uint};
use rt::uv::uvll;
use rt::uv::uvll::*;
use rt::uv::{AllocCallback, ConnectionCallback, ReadCallback, UdpReceiveCallback, UdpSendCallback};
use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle, NullCallback,
use rt::uv::{Loop, Watcher, Request, UvError, Buf, NativeHandle,
status_to_maybe_uv_error, vec_to_uv_buf};
use rt::io::net::ip::{SocketAddr, Ipv4Addr, Ipv6Addr};
use vec;
@ -184,24 +184,6 @@ impl StreamWatcher {
}
}
pub fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_stream_t) {
let mut stream_watcher: StreamWatcher = NativeHandle::from_native_handle(handle);
let cb = stream_watcher.get_watcher_data().close_cb.take_unwrap();
stream_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
cb();
}
}
pub fn listen(&mut self, cb: ConnectionCallback) -> Result<(), UvError> {
{
@ -413,25 +395,6 @@ impl UdpWatcher {
cb(udp_watcher, status);
}
}
pub fn close(self, cb: NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_udp_t) {
let mut udp_watcher: UdpWatcher = NativeHandle::from_native_handle(handle);
let cb = udp_watcher.get_watcher_data().close_cb.take_unwrap();
udp_watcher.drop_watcher_data();
unsafe { free_handle(handle as *c_void) }
cb();
}
}
}
impl NativeHandle<*uvll::uv_udp_t> for UdpWatcher {

View file

@ -86,23 +86,6 @@ impl Pipe {
}
}
pub fn close(self, cb: uv::NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern "C" fn close_cb(handle: *uvll::uv_pipe_t) {
let mut process: Pipe = uv::NativeHandle::from_native_handle(handle);
process.get_watcher_data().close_cb.take_unwrap()();
process.drop_watcher_data();
unsafe { uvll::free_handle(handle as *libc::c_void) }
}
}
}
impl uv::NativeHandle<*uvll::uv_pipe_t> for Pipe {

View file

@ -123,25 +123,6 @@ impl Process {
pub fn pid(&self) -> libc::pid_t {
unsafe { uvll::process_pid(**self) as libc::pid_t }
}
/// Closes this handle, invoking the specified callback once closed
pub fn close(self, cb: uv::NullCallback) {
{
let mut this = self;
let data = this.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe { uvll::close(self.native_handle(), close_cb); }
extern fn close_cb(handle: *uvll::uv_process_t) {
let mut process: Process = uv::NativeHandle::from_native_handle(handle);
process.get_watcher_data().close_cb.take_unwrap()();
process.drop_watcher_data();
unsafe { uvll::free_handle(handle as *libc::c_void) }
}
}
}
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,

View file

@ -8,10 +8,10 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use libc::{c_void, c_int};
use libc::c_int;
use option::Some;
use rt::uv::uvll;
use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback, NullCallback};
use rt::uv::{Watcher, Loop, NativeHandle, TimerCallback};
use rt::uv::status_to_maybe_uv_error;
pub struct TimerWatcher(*uvll::uv_timer_t);
@ -53,31 +53,6 @@ impl TimerWatcher {
uvll::timer_stop(self.native_handle());
}
}
pub fn close(self, cb: NullCallback) {
let mut watcher = self;
{
let data = watcher.get_watcher_data();
assert!(data.close_cb.is_none());
data.close_cb = Some(cb);
}
unsafe {
uvll::close(watcher.native_handle(), close_cb);
}
extern fn close_cb(handle: *uvll::uv_timer_t) {
let mut watcher: TimerWatcher = NativeHandle::from_native_handle(handle);
{
let data = watcher.get_watcher_data();
data.close_cb.take_unwrap()();
}
watcher.drop_watcher_data();
unsafe {
uvll::free_handle(handle as *c_void);
}
}
}
}
impl NativeHandle<*uvll::uv_timer_t> for TimerWatcher {

89
src/libstd/rt/uv/tty.rs Normal file
View file

@ -0,0 +1,89 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use prelude::*;
use libc;
use rt::uv;
use rt::uv::net;
use rt::uv::uvll;
/// A process wraps the handle of the underlying uv_process_t.
pub struct TTY(*uvll::uv_tty_t);
impl uv::Watcher for TTY {}
impl TTY {
#[fixed_stack_segment] #[inline(never)]
pub fn new(loop_: &uv::Loop, fd: libc::c_int, readable: bool) ->
Result<TTY, uv::UvError>
{
let handle = unsafe { uvll::malloc_handle(uvll::UV_TTY) };
assert!(handle.is_not_null());
let ret = unsafe {
uvll::uv_tty_init(loop_.native_handle(), handle, fd as libc::c_int,
readable as libc::c_int)
};
match ret {
0 => {
let mut ret: TTY = uv::NativeHandle::from_native_handle(handle);
ret.install_watcher_data();
Ok(ret)
}
n => {
unsafe { uvll::free_handle(handle); }
Err(uv::UvError(n))
}
}
}
pub fn as_stream(&self) -> net::StreamWatcher {
net::StreamWatcher(**self as *uvll::uv_stream_t)
}
#[fixed_stack_segment] #[inline(never)]
pub fn set_mode(&self, raw: bool) -> Result<(), uv::UvError> {
let raw = raw as libc::c_int;
match unsafe { uvll::uv_tty_set_mode(self.native_handle(), raw) } {
0 => Ok(()),
n => Err(uv::UvError(n))
}
}
#[fixed_stack_segment] #[inline(never)]
pub fn reset_mode(&self) {
unsafe { uvll::uv_tty_reset_mode(self.native_handle()) }
}
#[fixed_stack_segment] #[inline(never)] #[allow(unused_mut)]
pub fn get_winsize(&self) -> Result<(int, int), uv::UvError> {
let mut width: libc::c_int = 0;
let mut height: libc::c_int = 0;
let widthptr: *libc::c_int = &width;
let heightptr: *libc::c_int = &width;
match unsafe { uvll::uv_tty_get_winsize(self.native_handle(),
widthptr, heightptr) } {
0 => Ok((width as int, height as int)),
n => Err(uv::UvError(n))
}
}
}
impl uv::NativeHandle<*uvll::uv_tty_t> for TTY {
fn from_native_handle(handle: *uvll::uv_tty_t) -> TTY {
TTY(handle)
}
fn native_handle(&self) -> *uvll::uv_tty_t {
match self { &TTY(ptr) => ptr }
}
}

View file

@ -869,6 +869,18 @@ impl IoFactory for UvIoFactory {
}
return ret;
}
fn tty_open(&mut self, fd: c_int, readable: bool, close_on_drop: bool)
-> Result<~RtioTTYObject, IoError> {
match tty::TTY::new(self.uv_loop(), fd, readable) {
Ok(tty) => Ok(~UvTTY {
home: get_handle_to_current_scheduler!(),
tty: tty,
close_on_drop: close_on_drop,
}),
Err(e) => Err(uv_error_to_io_error(e))
}
}
}
pub struct UvTcpListener {
@ -1734,6 +1746,34 @@ impl RtioUnixListener for UvUnixListener {
}
}
pub struct UvTTY {
tty: tty::TTY,
home: SchedHandle,
close_on_drop: bool,
}
impl HomingIO for UvTTY {
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
}
impl Drop for UvTTY {
fn drop(&mut self) {
if self.close_on_drop {
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
let task = Cell::new(task);
do self.tty.close {
let scheduler: ~Scheduler = Local::take();
scheduler.resume_blocked_task_immediately(task.take());
}
}
} else {
self.tty.drop_watcher_data();
unsafe { uvll::free_handle(self.tty.native_handle()) }
}
}
}
pub struct UvUnixAcceptor {
listener: UvUnixListener,
incoming: Tube<Result<~RtioPipeObject, IoError>>,
@ -1769,6 +1809,40 @@ impl RtioUnixAcceptor for UvUnixAcceptor {
}
}
impl RtioTTY for UvTTY {
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
read_stream(self_.tty.as_stream(), scheduler, buf)
}
}
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
do self.home_for_io_with_sched |self_, scheduler| {
write_stream(self_.tty.as_stream(), scheduler, buf)
}
}
fn set_raw(&mut self, raw: bool) -> Result<(), IoError> {
do self.home_for_io |self_| {
match self_.tty.set_mode(raw) {
Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
}
}
fn reset_mode(&mut self) {
do self.home_for_io |self_| { self_.tty.reset_mode() }
}
fn get_winsize(&mut self) -> Result<(int, int), IoError> {
do self.home_for_io |self_| {
match self_.tty.get_winsize() {
Ok(p) => Ok(p), Err(e) => Err(uv_error_to_io_error(e))
}
}
}
}
#[test]
fn test_simple_io_no_connect() {
do run_in_mt_newsched_task {

View file

@ -131,6 +131,7 @@ pub type uv_udp_send_t = c_void;
pub type uv_getaddrinfo_t = c_void;
pub type uv_process_t = c_void;
pub type uv_pipe_t = c_void;
pub type uv_tty_t = c_void;
pub struct uv_timespec_t {
tv_sec: libc::c_long,
@ -1107,6 +1108,12 @@ extern {
pub fn uv_pipe_bind(pipe: *uv_pipe_t, name: *c_char) -> c_int;
pub fn uv_pipe_connect(req: *uv_connect_t, handle: *uv_pipe_t,
name: *c_char, cb: uv_connect_cb);
pub fn uv_tty_init(loop_ptr: *uv_loop_t, tty: *uv_tty_t, fd: c_int,
readable: c_int) -> c_int;
pub fn uv_tty_set_mode(tty: *uv_tty_t, mode: c_int) -> c_int;
pub fn uv_tty_reset_mode(tty: *uv_tty_t);
pub fn uv_tty_get_winsize(tty: *uv_tty_t, width: *c_int,
height: *c_int) -> c_int;
// These should all really be constants...
#[rust_stack] pub fn rust_SOCK_STREAM() -> c_int;

View file

@ -214,3 +214,7 @@ rust_AI_V4MAPPED
uv_pipe_open
uv_pipe_bind
uv_pipe_connect
uv_tty_init
uv_tty_set_mode
uv_tty_reset_mode
uv_tty_get_winsize