1
Fork 0

rustuv: Get all tests passing again after refactor

All tests except for the homing tests are now working again with the
librustuv/libgreen refactoring. The homing-related tests are currently commented
out and now placed in the rustuv::homing module.

I plan on refactoring scheduler pool spawning in order to enable more homing
tests in a future commit.
This commit is contained in:
Alex Crichton 2013-12-13 11:30:59 -08:00
parent f5d9b2ca6d
commit afd4e2ad8d
12 changed files with 286 additions and 313 deletions

View file

@ -186,10 +186,12 @@ pub fn accum_addrinfo(addr: &Addrinfo) -> ~[ai::Info] {
mod test { mod test {
use std::io::net::ip::{SocketAddr, Ipv4Addr}; use std::io::net::ip::{SocketAddr, Ipv4Addr};
use super::super::local_loop; use super::super::local_loop;
use super::GetAddrInfoRequest;
#[test] #[test]
fn getaddrinfo_test() { fn getaddrinfo_test() {
match GetAddrInfoRequest::run(local_loop(), Some("localhost"), None, None) { let loop_ = &mut local_loop().loop_;
match GetAddrInfoRequest::run(loop_, Some("localhost"), None, None) {
Ok(infos) => { Ok(infos) => {
let mut found_local = false; let mut found_local = false;
let local_addr = &SocketAddr { let local_addr = &SocketAddr {
@ -207,9 +209,10 @@ mod test {
#[test] #[test]
fn issue_10663() { fn issue_10663() {
let loop_ = &mut local_loop().loop_;
// Something should happen here, but this certainly shouldn't cause // Something should happen here, but this certainly shouldn't cause
// everything to die. The actual outcome we don't care too much about. // everything to die. The actual outcome we don't care too much about.
GetAddrInfoRequest::run(local_loop(), Some("irc.n0v4.com"), None, GetAddrInfoRequest::run(loop_, Some("irc.n0v4.com"), None,
None); None);
} }
} }

View file

@ -127,15 +127,15 @@ impl Drop for AsyncWatcher {
mod test_remote { mod test_remote {
use std::rt::rtio::Callback; use std::rt::rtio::Callback;
use std::rt::thread::Thread; use std::rt::thread::Thread;
use std::rt::tube::Tube;
use super::AsyncWatcher;
use super::super::local_loop; use super::super::local_loop;
// Make sure that we can fire watchers in remote threads and that they // Make sure that we can fire watchers in remote threads and that they
// actually trigger what they say they will. // actually trigger what they say they will.
#[test] #[test]
fn smoke_test() { fn smoke_test() {
struct MyCallback(Option<Tube<int>>); struct MyCallback(Option<Chan<int>>);
impl Callback for MyCallback { impl Callback for MyCallback {
fn call(&mut self) { fn call(&mut self) {
// this can get called more than once, but we only want to send // this can get called more than once, but we only want to send
@ -146,16 +146,17 @@ mod test_remote {
} }
} }
let mut tube = Tube::new(); let (port, chan) = Chan::new();
let cb = ~MyCallback(Some(tube.clone())); let cb = ~MyCallback(Some(chan));
let watcher = AsyncWatcher::new(local_loop(), cb as ~Callback); let watcher = AsyncWatcher::new(&mut local_loop().loop_,
cb as ~Callback);
let thread = do Thread::start { let thread = do Thread::start {
let mut watcher = watcher; let mut watcher = watcher;
watcher.fire(); watcher.fire();
}; };
assert_eq!(tube.recv(), 1); assert_eq!(port.recv(), 1);
thread.join(); thread.join();
} }
} }

View file

@ -448,7 +448,11 @@ mod test {
use std::io; use std::io;
use std::str; use std::str;
use std::vec; use std::vec;
use l = super::super::local_loop; use super::FsRequest;
use super::super::Loop;
use super::super::local_loop;
fn l() -> &mut Loop { &mut local_loop().loop_ }
#[test] #[test]
fn file_test_full_simple_sync() { fn file_test_full_simple_sync() {
@ -459,7 +463,7 @@ mod test {
{ {
// open/create // open/create
let result = FsRequest::open(l(), &path_str.to_c_str(), let result = FsRequest::open(local_loop(), &path_str.to_c_str(),
create_flags as int, mode as int); create_flags as int, mode as int);
assert!(result.is_ok()); assert!(result.is_ok());
let result = result.unwrap(); let result = result.unwrap();
@ -472,7 +476,7 @@ mod test {
{ {
// re-open // re-open
let result = FsRequest::open(l(), &path_str.to_c_str(), let result = FsRequest::open(local_loop(), &path_str.to_c_str(),
read_flags as int, 0); read_flags as int, 0);
assert!(result.is_ok()); assert!(result.is_ok());
let result = result.unwrap(); let result = result.unwrap();
@ -499,7 +503,7 @@ mod test {
let create_flags = (O_RDWR | O_CREAT) as int; let create_flags = (O_RDWR | O_CREAT) as int;
let mode = (S_IWUSR | S_IRUSR) as int; let mode = (S_IWUSR | S_IRUSR) as int;
let result = FsRequest::open(l(), path, create_flags, mode); let result = FsRequest::open(local_loop(), path, create_flags, mode);
assert!(result.is_ok()); assert!(result.is_ok());
let file = result.unwrap(); let file = result.unwrap();

View file

@ -142,3 +142,124 @@ impl Drop for HomingMissile {
self.check("task moved away from the home scheduler"); self.check("task moved away from the home scheduler");
} }
} }
#[cfg(test)]
mod test {
// On one thread, create a udp socket. Then send that socket to another
// thread and destroy the socket on the remote thread. This should make sure
// that homing kicks in for the socket to go back home to the original
// thread, close itself, and then come back to the last thread.
//#[test]
//fn test_homing_closes_correctly() {
// let (port, chan) = Chan::new();
// do task::spawn_sched(task::SingleThreaded) {
// let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
// chan.send(listener);
// }
// do task::spawn_sched(task::SingleThreaded) {
// port.recv();
// }
//}
// This is a bit of a crufty old test, but it has its uses.
//#[test]
//fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
// use std::cast;
// use std::rt::local::Local;
// use std::rt::rtio::{EventLoop, IoFactory};
// use std::rt::sched::Scheduler;
// use std::rt::sched::{Shutdown, TaskFromFriend};
// use std::rt::sleeper_list::SleeperList;
// use std::rt::task::Task;
// use std::rt::task::UnwindResult;
// use std::rt::thread::Thread;
// use std::rt::deque::BufferPool;
// use std::unstable::run_in_bare_thread;
// use uvio::UvEventLoop;
// do run_in_bare_thread {
// let sleepers = SleeperList::new();
// let mut pool = BufferPool::new();
// let (worker1, stealer1) = pool.deque();
// let (worker2, stealer2) = pool.deque();
// let queues = ~[stealer1, stealer2];
// let loop1 = ~UvEventLoop::new() as ~EventLoop;
// let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(),
// sleepers.clone());
// let loop2 = ~UvEventLoop::new() as ~EventLoop;
// let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(),
// sleepers.clone());
// let handle1 = sched1.make_handle();
// let handle2 = sched2.make_handle();
// let tasksFriendHandle = sched2.make_handle();
// let on_exit: proc(UnwindResult) = proc(exit_status) {
// let mut handle1 = handle1;
// let mut handle2 = handle2;
// handle1.send(Shutdown);
// handle2.send(Shutdown);
// assert!(exit_status.is_success());
// };
// unsafe fn local_io() -> &'static mut IoFactory {
// let mut sched = Local::borrow(None::<Scheduler>);
// let io = sched.get().event_loop.io();
// cast::transmute(io.unwrap())
// }
// let test_function: proc() = proc() {
// let io = unsafe { local_io() };
// let addr = next_test_ip4();
// let maybe_socket = io.udp_bind(addr);
// // this socket is bound to this event loop
// assert!(maybe_socket.is_ok());
// // block self on sched1
// let scheduler: ~Scheduler = Local::take();
// let mut tasksFriendHandle = Some(tasksFriendHandle);
// scheduler.deschedule_running_task_and_then(|_, task| {
// // unblock task
// task.wake().map(|task| {
// // send self to sched2
// tasksFriendHandle.take_unwrap()
// .send(TaskFromFriend(task));
// });
// // sched1 should now sleep since it has nothing else to do
// })
// // sched2 will wake up and get the task as we do nothing else,
// // the function ends and the socket goes out of scope sched2
// // will start to run the destructor the destructor will first
// // block the task, set it's home as sched1, then enqueue it
// // sched2 will dequeue the task, see that it has a home, and
// // send it to sched1 sched1 will wake up, exec the close
// // function on the correct loop, and then we're done
// };
// let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
// test_function);
// main_task.death.on_exit = Some(on_exit);
// let null_task = ~do Task::new_root(&mut sched2.stack_pool, None) {
// // nothing
// };
// let main_task = main_task;
// let sched1 = sched1;
// let thread1 = do Thread::start {
// sched1.bootstrap(main_task);
// };
// let sched2 = sched2;
// let thread2 = do Thread::start {
// sched2.bootstrap(null_task);
// };
// thread1.join();
// thread2.join();
// }
//}
}

View file

@ -97,71 +97,102 @@ impl Drop for IdleWatcher {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::rt::tube::Tube; use std::cast;
use std::rt::rtio::{Callback, PausableIdleCallback}; use std::cell::RefCell;
use std::rc::Rc;
use std::rt::rtio::{Callback, PausibleIdleCallback};
use std::rt::task::{BlockedTask, Task};
use std::rt::local::Local;
use super::IdleWatcher;
use super::super::local_loop; use super::super::local_loop;
struct MyCallback(Tube<int>, int); type Chan = Rc<RefCell<(Option<BlockedTask>, uint)>>;
struct MyCallback(Rc<RefCell<(Option<BlockedTask>, uint)>>, uint);
impl Callback for MyCallback { impl Callback for MyCallback {
fn call(&mut self) { fn call(&mut self) {
match *self { let task = match *self {
MyCallback(ref mut tube, val) => tube.send(val) MyCallback(ref rc, n) => {
} let mut slot = rc.borrow().borrow_mut();
match *slot.get() {
(ref mut task, ref mut val) => {
*val = n;
task.take_unwrap()
}
}
}
};
task.wake().map(|t| t.reawaken(true));
} }
} }
fn mk(v: uint) -> (~IdleWatcher, Chan) {
let rc = Rc::from_send(RefCell::new((None, 0)));
let cb = ~MyCallback(rc.clone(), v);
let cb = cb as ~Callback:;
let cb = unsafe { cast::transmute(cb) };
(IdleWatcher::new(&mut local_loop().loop_, cb), rc)
}
fn sleep(chan: &Chan) -> uint {
let task: ~Task = Local::take();
task.deschedule(1, |task| {
let mut slot = chan.borrow().borrow_mut();
match *slot.get() {
(ref mut slot, _) => {
assert!(slot.is_none());
*slot = Some(task);
}
}
Ok(())
});
let slot = chan.borrow().borrow();
match *slot.get() { (_, n) => n }
}
#[test] #[test]
fn not_used() { fn not_used() {
let cb = ~MyCallback(Tube::new(), 1); let (_idle, _chan) = mk(1);
let _idle = IdleWatcher::new(local_loop(), cb as ~Callback);
} }
#[test] #[test]
fn smoke_test() { fn smoke_test() {
let mut tube = Tube::new(); let (mut idle, chan) = mk(1);
let cb = ~MyCallback(tube.clone(), 1);
let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback);
idle.resume(); idle.resume();
tube.recv(); assert_eq!(sleep(&chan), 1);
} }
#[test] #[should_fail] #[test] #[should_fail]
fn smoke_fail() { fn smoke_fail() {
let tube = Tube::new(); let (mut idle, _chan) = mk(1);
let cb = ~MyCallback(tube.clone(), 1);
let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback);
idle.resume(); idle.resume();
fail!(); fail!();
} }
#[test] #[test]
fn fun_combinations_of_methods() { fn fun_combinations_of_methods() {
let mut tube = Tube::new(); let (mut idle, chan) = mk(1);
let cb = ~MyCallback(tube.clone(), 1);
let mut idle = IdleWatcher::new(local_loop(), cb as ~Callback);
idle.resume(); idle.resume();
tube.recv(); assert_eq!(sleep(&chan), 1);
idle.pause(); idle.pause();
idle.resume(); idle.resume();
idle.resume(); idle.resume();
tube.recv(); assert_eq!(sleep(&chan), 1);
idle.pause(); idle.pause();
idle.pause(); idle.pause();
idle.resume(); idle.resume();
tube.recv(); assert_eq!(sleep(&chan), 1);
} }
#[test] #[test]
fn pause_pauses() { fn pause_pauses() {
let mut tube = Tube::new(); let (mut idle1, chan1) = mk(1);
let cb = ~MyCallback(tube.clone(), 1); let (mut idle2, chan2) = mk(2);
let mut idle1 = IdleWatcher::new(local_loop(), cb as ~Callback);
let cb = ~MyCallback(tube.clone(), 2);
let mut idle2 = IdleWatcher::new(local_loop(), cb as ~Callback);
idle2.resume(); idle2.resume();
assert_eq!(tube.recv(), 2); assert_eq!(sleep(&chan2), 2);
idle2.pause(); idle2.pause();
idle1.resume(); idle1.resume();
assert_eq!(tube.recv(), 1); assert_eq!(sleep(&chan1), 1);
} }
} }

View file

@ -43,6 +43,8 @@ via `close` and `delete` methods.
#[feature(macro_rules)]; #[feature(macro_rules)];
#[cfg(test)] extern mod green;
use std::cast; use std::cast;
use std::io; use std::io;
use std::io::IoError; use std::io::IoError;
@ -392,15 +394,17 @@ pub fn slice_to_uv_buf(v: &[u8]) -> Buf {
uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t } uvll::uv_buf_t { base: data, len: v.len() as uvll::uv_buf_len_t }
} }
// This function is full of lies!
#[cfg(test)] #[cfg(test)]
fn local_loop() -> &'static mut Loop { fn local_loop() -> &'static mut uvio::UvIoFactory {
unsafe { unsafe {
cast::transmute({ cast::transmute({
let mut sched = Local::borrow(None::<Scheduler>); let mut task = Local::borrow(None::<Task>);
let mut io = task.get().local_io().unwrap();
let (_vtable, uvio): (uint, &'static mut uvio::UvIoFactory) = let (_vtable, uvio): (uint, &'static mut uvio::UvIoFactory) =
cast::transmute(sched.get().event_loop.io().unwrap()); cast::transmute(io.get());
uvio uvio
}.uv_loop()) })
} }
} }

View file

@ -86,21 +86,19 @@ pub fn sockaddr_to_socket_addr(addr: *sockaddr) -> SocketAddr {
} }
} }
#[cfg(test)]
#[test] #[test]
fn test_ip4_conversion() { fn test_ip4_conversion() {
use std::rt; use std::io::net::ip::{SocketAddr, Ipv4Addr};
let ip4 = rt::test::next_test_ip4(); let ip4 = SocketAddr { ip: Ipv4Addr(127, 0, 0, 1), port: 4824 };
socket_addr_as_sockaddr(ip4, |addr| { socket_addr_as_sockaddr(ip4, |addr| {
assert_eq!(ip4, sockaddr_to_socket_addr(addr)); assert_eq!(ip4, sockaddr_to_socket_addr(addr));
}) })
} }
#[cfg(test)]
#[test] #[test]
fn test_ip6_conversion() { fn test_ip6_conversion() {
use std::rt; use std::io::net::ip::{SocketAddr, Ipv6Addr};
let ip6 = rt::test::next_test_ip6(); let ip6 = SocketAddr { ip: Ipv6Addr(0, 0, 0, 0, 0, 0, 0, 1), port: 4824 };
socket_addr_as_sockaddr(ip6, |addr| { socket_addr_as_sockaddr(ip6, |addr| {
assert_eq!(ip6, sockaddr_to_socket_addr(addr)); assert_eq!(ip6, sockaddr_to_socket_addr(addr));
}) })
@ -634,16 +632,13 @@ impl Drop for UdpWatcher {
} }
} }
////////////////////////////////////////////////////////////////////////////////
/// UV request support
////////////////////////////////////////////////////////////////////////////////
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor, use std::rt::rtio::{RtioTcpStream, RtioTcpListener, RtioTcpAcceptor,
RtioUdpSocket}; RtioUdpSocket};
use std::task; use std::io::test::{next_test_ip4, next_test_ip6};
use super::{UdpWatcher, TcpWatcher, TcpListener};
use super::super::local_loop; use super::super::local_loop;
#[test] #[test]
@ -834,20 +829,18 @@ mod test {
} }
} }
do spawn { port.recv();
port.recv(); let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap();
let mut stream = TcpWatcher::connect(local_loop(), addr).unwrap(); let mut buf = [0, .. 2048];
let mut buf = [0, .. 2048]; let mut total_bytes_read = 0;
let mut total_bytes_read = 0; while total_bytes_read < MAX {
while total_bytes_read < MAX { let nread = stream.read(buf).unwrap();
let nread = stream.read(buf).unwrap(); total_bytes_read += nread;
total_bytes_read += nread; for i in range(0u, nread) {
for i in range(0u, nread) { assert_eq!(buf[i], 1);
assert_eq!(buf[i], 1);
}
} }
uvdebug!("read {} bytes total", total_bytes_read);
} }
uvdebug!("read {} bytes total", total_bytes_read);
} }
#[test] #[test]
@ -913,65 +906,35 @@ mod test {
assert!(total_bytes_sent >= MAX); assert!(total_bytes_sent >= MAX);
} }
do spawn { let l = local_loop();
let l = local_loop(); let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap();
let mut client_out = UdpWatcher::bind(l, client_out_addr).unwrap(); let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap();
let mut client_in = UdpWatcher::bind(l, client_in_addr).unwrap(); let (port, chan) = (p2, c1);
let (port, chan) = (p2, c1); port.recv();
port.recv(); chan.send(());
chan.send(()); let mut total_bytes_recv = 0;
let mut total_bytes_recv = 0; let mut buf = [0, .. 2048];
let mut buf = [0, .. 2048]; while total_bytes_recv < MAX {
while total_bytes_recv < MAX { // ask for more
// ask for more assert!(client_out.sendto([1], server_in_addr).is_ok());
assert!(client_out.sendto([1], server_in_addr).is_ok()); // wait for data
// wait for data let res = client_in.recvfrom(buf);
let res = client_in.recvfrom(buf); assert!(res.is_ok());
assert!(res.is_ok()); let (nread, src) = res.unwrap();
let (nread, src) = res.unwrap(); assert_eq!(src, server_out_addr);
assert_eq!(src, server_out_addr); total_bytes_recv += nread;
total_bytes_recv += nread; for i in range(0u, nread) {
for i in range(0u, nread) { assert_eq!(buf[i], 1);
assert_eq!(buf[i], 1);
}
} }
// tell the server we're done
assert!(client_out.sendto([0], server_in_addr).is_ok());
} }
// tell the server we're done
assert!(client_out.sendto([0], server_in_addr).is_ok());
} }
#[test] #[test]
fn test_read_and_block() { fn test_read_and_block() {
let addr = next_test_ip4(); let addr = next_test_ip4();
let (port, chan) = Chan::new(); let (port, chan) = Chan::<Port<()>>::new();
do spawn {
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let (port2, chan2) = Chan::new();
chan.send(port2);
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
let mut current = 0;
let mut reads = 0;
while current < expected {
let nread = stream.read(buf).unwrap();
for i in range(0u, nread) {
let val = buf[i] as uint;
assert_eq!(val, current % 8);
current += 1;
}
reads += 1;
chan2.send(());
}
// Make sure we had multiple reads
assert!(reads > 1);
}
do spawn { do spawn {
let port2 = port.recv(); let port2 = port.recv();
@ -983,13 +946,39 @@ mod test {
stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
port2.recv(); port2.recv();
} }
let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap();
let (port2, chan2) = Chan::new();
chan.send(port2);
let mut stream = acceptor.accept().unwrap();
let mut buf = [0, .. 2048];
let expected = 32;
let mut current = 0;
let mut reads = 0;
while current < expected {
let nread = stream.read(buf).unwrap();
for i in range(0u, nread) {
let val = buf[i] as uint;
assert_eq!(val, current % 8);
current += 1;
}
reads += 1;
chan2.try_send(());
}
// Make sure we had multiple reads
assert!(reads > 1);
} }
#[test] #[test]
fn test_simple_tcp_server_and_client_on_diff_threads() { fn test_simple_tcp_server_and_client_on_diff_threads() {
let addr = next_test_ip4(); let addr = next_test_ip4();
do task::spawn_sched(task::SingleThreaded) { do spawn {
let listener = TcpListener::bind(local_loop(), addr).unwrap(); let listener = TcpListener::bind(local_loop(), addr).unwrap();
let mut acceptor = listener.listen().unwrap(); let mut acceptor = listener.listen().unwrap();
let mut stream = acceptor.accept().unwrap(); let mut stream = acceptor.accept().unwrap();
@ -1001,131 +990,11 @@ mod test {
} }
} }
do task::spawn_sched(task::SingleThreaded) { let mut stream = TcpWatcher::connect(local_loop(), addr);
let mut stream = TcpWatcher::connect(local_loop(), addr); while stream.is_err() {
while stream.is_err() { stream = TcpWatcher::connect(local_loop(), addr);
stream = TcpWatcher::connect(local_loop(), addr);
}
stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
}
}
// On one thread, create a udp socket. Then send that socket to another
// thread and destroy the socket on the remote thread. This should make sure
// that homing kicks in for the socket to go back home to the original
// thread, close itself, and then come back to the last thread.
#[test]
fn test_homing_closes_correctly() {
let (port, chan) = Chan::new();
do task::spawn_sched(task::SingleThreaded) {
let listener = UdpWatcher::bind(local_loop(), next_test_ip4()).unwrap();
chan.send(listener);
}
do task::spawn_sched(task::SingleThreaded) {
port.recv();
}
}
// This is a bit of a crufty old test, but it has its uses.
#[test]
fn test_simple_homed_udp_io_bind_then_move_task_then_home_and_close() {
use std::cast;
use std::rt::local::Local;
use std::rt::rtio::{EventLoop, IoFactory};
use std::rt::sched::Scheduler;
use std::rt::sched::{Shutdown, TaskFromFriend};
use std::rt::sleeper_list::SleeperList;
use std::rt::task::Task;
use std::rt::thread::Thread;
use std::rt::deque::BufferPool;
use std::task::TaskResult;
use std::unstable::run_in_bare_thread;
use uvio::UvEventLoop;
do run_in_bare_thread {
let sleepers = SleeperList::new();
let mut pool = BufferPool::new();
let (worker1, stealer1) = pool.deque();
let (worker2, stealer2) = pool.deque();
let queues = ~[stealer1, stealer2];
let loop1 = ~UvEventLoop::new() as ~EventLoop;
let mut sched1 = ~Scheduler::new(loop1, worker1, queues.clone(),
sleepers.clone());
let loop2 = ~UvEventLoop::new() as ~EventLoop;
let mut sched2 = ~Scheduler::new(loop2, worker2, queues.clone(),
sleepers.clone());
let handle1 = sched1.make_handle();
let handle2 = sched2.make_handle();
let tasksFriendHandle = sched2.make_handle();
let on_exit: proc(TaskResult) = proc(exit_status) {
let mut handle1 = handle1;
let mut handle2 = handle2;
handle1.send(Shutdown);
handle2.send(Shutdown);
assert!(exit_status.is_ok());
};
unsafe fn local_io() -> &'static mut IoFactory {
let mut sched = Local::borrow(None::<Scheduler>);
let io = sched.get().event_loop.io();
cast::transmute(io.unwrap())
}
let test_function: proc() = proc() {
let io = unsafe { local_io() };
let addr = next_test_ip4();
let maybe_socket = io.udp_bind(addr);
// this socket is bound to this event loop
assert!(maybe_socket.is_ok());
// block self on sched1
let scheduler: ~Scheduler = Local::take();
let mut tasksFriendHandle = Some(tasksFriendHandle);
scheduler.deschedule_running_task_and_then(|_, task| {
// unblock task
task.wake().map(|task| {
// send self to sched2
tasksFriendHandle.take_unwrap()
.send(TaskFromFriend(task));
});
// sched1 should now sleep since it has nothing else to do
})
// sched2 will wake up and get the task as we do nothing else,
// the function ends and the socket goes out of scope sched2
// will start to run the destructor the destructor will first
// block the task, set it's home as sched1, then enqueue it
// sched2 will dequeue the task, see that it has a home, and
// send it to sched1 sched1 will wake up, exec the close
// function on the correct loop, and then we're done
};
let mut main_task = ~Task::new_root(&mut sched1.stack_pool, None,
test_function);
main_task.death.on_exit = Some(on_exit);
let null_task = ~do Task::new_root(&mut sched2.stack_pool, None) {
// nothing
};
let main_task = main_task;
let sched1 = sched1;
let thread1 = do Thread::start {
sched1.bootstrap(main_task);
};
let sched2 = sched2;
let thread2 = do Thread::start {
sched2.bootstrap(null_task);
};
thread1.join();
thread2.join();
} }
stream.unwrap().write([0, 1, 2, 3, 4, 5, 6, 7]);
} }
#[should_fail] #[test] #[should_fail] #[test]
@ -1167,7 +1036,7 @@ mod test {
// force the handle to be created on a different scheduler, failure in // force the handle to be created on a different scheduler, failure in
// the original task will force a homing operation back to this // the original task will force a homing operation back to this
// scheduler. // scheduler.
do task::spawn_sched(task::SingleThreaded) { do spawn {
let w = UdpWatcher::bind(local_loop(), addr).unwrap(); let w = UdpWatcher::bind(local_loop(), addr).unwrap();
chan.send(w); chan.send(w);
} }
@ -1175,67 +1044,4 @@ mod test {
let _w = port.recv(); let _w = port.recv();
fail!(); fail!();
} }
#[should_fail]
#[test]
#[ignore(reason = "linked failure")]
fn linked_failure1() {
let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
let w = TcpListener::bind(local_loop(), addr).unwrap();
let mut w = w.listen().unwrap();
chan.send(());
w.accept();
}
port.recv();
fail!();
}
#[should_fail]
#[test]
#[ignore(reason = "linked failure")]
fn linked_failure2() {
let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
let w = TcpListener::bind(local_loop(), addr).unwrap();
let mut w = w.listen().unwrap();
chan.send(());
let mut buf = [0];
w.accept().unwrap().read(buf);
}
port.recv();
let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
fail!();
}
#[should_fail]
#[test]
#[ignore(reason = "linked failure")]
fn linked_failure3() {
let (port, chan) = Chan::new();
let addr = next_test_ip4();
do spawn {
let chan = chan;
let w = TcpListener::bind(local_loop(), addr).unwrap();
let mut w = w.listen().unwrap();
chan.send(());
let mut conn = w.accept().unwrap();
chan.send(());
let buf = [0, ..65536];
conn.write(buf);
}
port.recv();
let _w = TcpWatcher::connect(local_loop(), addr).unwrap();
port.recv();
fail!();
}
} }

View file

@ -235,8 +235,9 @@ impl HomingIO for PipeAcceptor {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe}; use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
use std::rt::test::next_test_unix; use std::io::test::next_test_unix;
use super::{PipeWatcher, PipeListener};
use super::super::local_loop; use super::super::local_loop;
#[test] #[test]

View file

@ -52,7 +52,7 @@ impl SignalWatcher {
extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) { extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) {
let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) }; let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
assert_eq!(signum as int, s.signal as int); assert_eq!(signum as int, s.signal as int);
s.channel.send_deferred(s.signal); s.channel.try_send_deferred(s.signal);
} }
impl HomingIO for SignalWatcher { impl HomingIO for SignalWatcher {
@ -76,6 +76,7 @@ impl Drop for SignalWatcher {
mod test { mod test {
use super::super::local_loop; use super::super::local_loop;
use std::io::signal; use std::io::signal;
use super::SignalWatcher;
#[test] #[test]
fn closing_channel_during_drop_doesnt_kill_everything() { fn closing_channel_during_drop_doesnt_kill_everything() {

View file

@ -179,6 +179,7 @@ impl Drop for TimerWatcher {
mod test { mod test {
use std::rt::rtio::RtioTimer; use std::rt::rtio::RtioTimer;
use super::super::local_loop; use super::super::local_loop;
use super::TimerWatcher;
#[test] #[test]
fn oneshot() { fn oneshot() {

View file

@ -102,6 +102,7 @@ pub extern "C" fn new_loop() -> ~rtio::EventLoop {
#[test] #[test]
fn test_callback_run_once() { fn test_callback_run_once() {
use std::rt::rtio::EventLoop;
do run_in_bare_thread { do run_in_bare_thread {
let mut event_loop = UvEventLoop::new(); let mut event_loop = UvEventLoop::new();
let mut count = 0; let mut count = 0;

View file

@ -313,9 +313,8 @@ pub use self::net::udp::UdpStream;
pub use self::pipe::PipeStream; pub use self::pipe::PipeStream;
pub use self::process::Process; pub use self::process::Process;
/// Testing helpers /// Various utility functions useful for writing I/O tests
#[cfg(test)] pub mod test;
mod test;
/// Synchronous, non-blocking filesystem operations. /// Synchronous, non-blocking filesystem operations.
pub mod fs; pub mod fs;