Auto merge of #3815 - RalfJung:pipe, r=RalfJung
implement pipe and pipe2 Fixes https://github.com/rust-lang/miri/issues/3746
This commit is contained in:
commit
a4222b97ca
6 changed files with 253 additions and 66 deletions
|
@ -288,14 +288,25 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
|
||||||
this.write_scalar(result, dest)?;
|
this.write_scalar(result, dest)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sockets
|
// Unnamed sockets and pipes
|
||||||
"socketpair" => {
|
"socketpair" => {
|
||||||
let [domain, type_, protocol, sv] =
|
let [domain, type_, protocol, sv] =
|
||||||
this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
|
this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
|
||||||
|
|
||||||
let result = this.socketpair(domain, type_, protocol, sv)?;
|
let result = this.socketpair(domain, type_, protocol, sv)?;
|
||||||
this.write_scalar(result, dest)?;
|
this.write_scalar(result, dest)?;
|
||||||
}
|
}
|
||||||
|
"pipe" => {
|
||||||
|
let [pipefd] =
|
||||||
|
this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
|
||||||
|
let result = this.pipe2(pipefd, /*flags*/ None)?;
|
||||||
|
this.write_scalar(result, dest)?;
|
||||||
|
}
|
||||||
|
"pipe2" => {
|
||||||
|
let [pipefd, flags] =
|
||||||
|
this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?;
|
||||||
|
let result = this.pipe2(pipefd, Some(flags))?;
|
||||||
|
this.write_scalar(result, dest)?;
|
||||||
|
}
|
||||||
|
|
||||||
// Time
|
// Time
|
||||||
"gettimeofday" => {
|
"gettimeofday" => {
|
||||||
|
|
|
@ -62,9 +62,10 @@ pub struct EpollEventInterest {
|
||||||
|
|
||||||
/// EpollReadyEvents reflects the readiness of a file description.
|
/// EpollReadyEvents reflects the readiness of a file description.
|
||||||
pub struct EpollReadyEvents {
|
pub struct EpollReadyEvents {
|
||||||
/// The associated file is available for read(2) operations.
|
/// The associated file is available for read(2) operations, in the sense that a read will not block.
|
||||||
|
/// (I.e., returning EOF is considered "ready".)
|
||||||
pub epollin: bool,
|
pub epollin: bool,
|
||||||
/// The associated file is available for write(2) operations.
|
/// The associated file is available for write(2) operations, in the sense that a write will not block.
|
||||||
pub epollout: bool,
|
pub epollout: bool,
|
||||||
/// Stream socket peer closed connection, or shut down writing
|
/// Stream socket peer closed connection, or shut down writing
|
||||||
/// half of connection.
|
/// half of connection.
|
||||||
|
|
|
@ -4,9 +4,9 @@ mod env;
|
||||||
mod fd;
|
mod fd;
|
||||||
mod fs;
|
mod fs;
|
||||||
mod mem;
|
mod mem;
|
||||||
mod socket;
|
|
||||||
mod sync;
|
mod sync;
|
||||||
mod thread;
|
mod thread;
|
||||||
|
mod unnamed_socket;
|
||||||
|
|
||||||
mod android;
|
mod android;
|
||||||
mod freebsd;
|
mod freebsd;
|
||||||
|
@ -23,9 +23,9 @@ pub use env::EvalContextExt as _;
|
||||||
pub use fd::EvalContextExt as _;
|
pub use fd::EvalContextExt as _;
|
||||||
pub use fs::EvalContextExt as _;
|
pub use fs::EvalContextExt as _;
|
||||||
pub use mem::EvalContextExt as _;
|
pub use mem::EvalContextExt as _;
|
||||||
pub use socket::EvalContextExt as _;
|
|
||||||
pub use sync::EvalContextExt as _;
|
pub use sync::EvalContextExt as _;
|
||||||
pub use thread::EvalContextExt as _;
|
pub use thread::EvalContextExt as _;
|
||||||
|
pub use unnamed_socket::EvalContextExt as _;
|
||||||
|
|
||||||
// Make up some constants.
|
// Make up some constants.
|
||||||
const UID: u32 = 1000;
|
const UID: u32 = 1000;
|
||||||
|
|
|
@ -1,3 +1,7 @@
|
||||||
|
//! This implements "anonymous" sockets, that do not correspond to anything on the host system and
|
||||||
|
//! are entirely implemented inside Miri.
|
||||||
|
//! We also use the same infrastructure to implement unnamed pipes.
|
||||||
|
|
||||||
use std::cell::{OnceCell, RefCell};
|
use std::cell::{OnceCell, RefCell};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
@ -13,12 +17,13 @@ use crate::{concurrency::VClock, *};
|
||||||
/// be configured in the real system.
|
/// be configured in the real system.
|
||||||
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992;
|
const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992;
|
||||||
|
|
||||||
/// Pair of connected sockets.
|
/// One end of a pair of connected unnamed sockets.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct SocketPair {
|
struct AnonSocket {
|
||||||
/// The buffer we are reading from.
|
/// The buffer we are reading from, or `None` if this is the writing end of a pipe.
|
||||||
readbuf: RefCell<Buffer>,
|
/// (In that case, the peer FD will be the reading end of that pipe.)
|
||||||
/// The `SocketPair` file descriptor that is our "peer", and that holds the buffer we are
|
readbuf: Option<RefCell<Buffer>>,
|
||||||
|
/// The `AnonSocket` file descriptor that is our "peer", and that holds the buffer we are
|
||||||
/// writing to. This is a weak reference because the other side may be closed before us; all
|
/// writing to. This is a weak reference because the other side may be closed before us; all
|
||||||
/// future writes will then trigger EPIPE.
|
/// future writes will then trigger EPIPE.
|
||||||
peer_fd: OnceCell<WeakFileDescriptionRef>,
|
peer_fd: OnceCell<WeakFileDescriptionRef>,
|
||||||
|
@ -37,13 +42,13 @@ impl Buffer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SocketPair {
|
impl AnonSocket {
|
||||||
fn peer_fd(&self) -> &WeakFileDescriptionRef {
|
fn peer_fd(&self) -> &WeakFileDescriptionRef {
|
||||||
self.peer_fd.get().unwrap()
|
self.peer_fd.get().unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FileDescription for SocketPair {
|
impl FileDescription for AnonSocket {
|
||||||
fn name(&self) -> &'static str {
|
fn name(&self) -> &'static str {
|
||||||
"socketpair"
|
"socketpair"
|
||||||
}
|
}
|
||||||
|
@ -55,19 +60,27 @@ impl FileDescription for SocketPair {
|
||||||
let mut epoll_ready_events = EpollReadyEvents::new();
|
let mut epoll_ready_events = EpollReadyEvents::new();
|
||||||
|
|
||||||
// Check if it is readable.
|
// Check if it is readable.
|
||||||
let readbuf = self.readbuf.borrow();
|
if let Some(readbuf) = &self.readbuf {
|
||||||
if !readbuf.buf.is_empty() {
|
if !readbuf.borrow().buf.is_empty() {
|
||||||
|
epoll_ready_events.epollin = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Without a read buffer, reading never blocks, so we are always ready.
|
||||||
epoll_ready_events.epollin = true;
|
epoll_ready_events.epollin = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if is writable.
|
// Check if is writable.
|
||||||
if let Some(peer_fd) = self.peer_fd().upgrade() {
|
if let Some(peer_fd) = self.peer_fd().upgrade() {
|
||||||
let writebuf = &peer_fd.downcast::<SocketPair>().unwrap().readbuf.borrow();
|
if let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf {
|
||||||
let data_size = writebuf.buf.len();
|
let data_size = writebuf.borrow().buf.len();
|
||||||
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
|
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
|
||||||
if available_space != 0 {
|
if available_space != 0 {
|
||||||
epoll_ready_events.epollout = true;
|
epoll_ready_events.epollout = true;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Without a write buffer, writing never blocks.
|
||||||
|
epoll_ready_events.epollout = true;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Peer FD has been closed. This always sets both the RDHUP and HUP flags
|
// Peer FD has been closed. This always sets both the RDHUP and HUP flags
|
||||||
// as we do not support `shutdown` that could be used to partially close the stream.
|
// as we do not support `shutdown` that could be used to partially close the stream.
|
||||||
|
@ -108,7 +121,12 @@ impl FileDescription for SocketPair {
|
||||||
return Ok(Ok(0));
|
return Ok(Ok(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut readbuf = self.readbuf.borrow_mut();
|
let Some(readbuf) = &self.readbuf else {
|
||||||
|
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
|
||||||
|
// corresponding ErrorKind variant.
|
||||||
|
throw_unsup_format!("reading from the write end of a pipe");
|
||||||
|
};
|
||||||
|
let mut readbuf = readbuf.borrow_mut();
|
||||||
if readbuf.buf.is_empty() {
|
if readbuf.buf.is_empty() {
|
||||||
if self.peer_fd().upgrade().is_none() {
|
if self.peer_fd().upgrade().is_none() {
|
||||||
// Socketpair with no peer and empty buffer.
|
// Socketpair with no peer and empty buffer.
|
||||||
|
@ -176,7 +194,13 @@ impl FileDescription for SocketPair {
|
||||||
// closed.
|
// closed.
|
||||||
return Ok(Err(Error::from(ErrorKind::BrokenPipe)));
|
return Ok(Err(Error::from(ErrorKind::BrokenPipe)));
|
||||||
};
|
};
|
||||||
let mut writebuf = peer_fd.downcast::<SocketPair>().unwrap().readbuf.borrow_mut();
|
|
||||||
|
let Some(writebuf) = &peer_fd.downcast::<AnonSocket>().unwrap().readbuf else {
|
||||||
|
// FIXME: This should return EBADF, but there's no nice way to do that as there's no
|
||||||
|
// corresponding ErrorKind variant.
|
||||||
|
throw_unsup_format!("writing to the reading end of a pipe");
|
||||||
|
};
|
||||||
|
let mut writebuf = writebuf.borrow_mut();
|
||||||
let data_size = writebuf.buf.len();
|
let data_size = writebuf.buf.len();
|
||||||
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
|
let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size);
|
||||||
if available_space == 0 {
|
if available_space == 0 {
|
||||||
|
@ -227,12 +251,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
|
||||||
|
|
||||||
let mut is_sock_nonblock = false;
|
let mut is_sock_nonblock = false;
|
||||||
|
|
||||||
// Parse and remove the type flags that we support. If type != 0 after removing,
|
// Parse and remove the type flags that we support.
|
||||||
// unsupported flags are used.
|
|
||||||
if type_ & this.eval_libc_i32("SOCK_STREAM") == this.eval_libc_i32("SOCK_STREAM") {
|
|
||||||
type_ &= !(this.eval_libc_i32("SOCK_STREAM"));
|
|
||||||
}
|
|
||||||
|
|
||||||
// SOCK_NONBLOCK only exists on Linux.
|
// SOCK_NONBLOCK only exists on Linux.
|
||||||
if this.tcx.sess.target.os == "linux" {
|
if this.tcx.sess.target.os == "linux" {
|
||||||
if type_ & this.eval_libc_i32("SOCK_NONBLOCK") == this.eval_libc_i32("SOCK_NONBLOCK") {
|
if type_ & this.eval_libc_i32("SOCK_NONBLOCK") == this.eval_libc_i32("SOCK_NONBLOCK") {
|
||||||
|
@ -253,7 +272,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
|
||||||
and AF_LOCAL are allowed",
|
and AF_LOCAL are allowed",
|
||||||
domain
|
domain
|
||||||
);
|
);
|
||||||
} else if type_ != 0 {
|
} else if type_ != this.eval_libc_i32("SOCK_STREAM") {
|
||||||
throw_unsup_format!(
|
throw_unsup_format!(
|
||||||
"socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
|
"socketpair: type {:#x} is unsupported, only SOCK_STREAM, \
|
||||||
SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
|
SOCK_CLOEXEC and SOCK_NONBLOCK are allowed",
|
||||||
|
@ -268,20 +287,20 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
|
||||||
|
|
||||||
// Generate file descriptions.
|
// Generate file descriptions.
|
||||||
let fds = &mut this.machine.fds;
|
let fds = &mut this.machine.fds;
|
||||||
let fd0 = fds.new_ref(SocketPair {
|
let fd0 = fds.new_ref(AnonSocket {
|
||||||
readbuf: RefCell::new(Buffer::new()),
|
readbuf: Some(RefCell::new(Buffer::new())),
|
||||||
peer_fd: OnceCell::new(),
|
peer_fd: OnceCell::new(),
|
||||||
is_nonblock: is_sock_nonblock,
|
is_nonblock: is_sock_nonblock,
|
||||||
});
|
});
|
||||||
let fd1 = fds.new_ref(SocketPair {
|
let fd1 = fds.new_ref(AnonSocket {
|
||||||
readbuf: RefCell::new(Buffer::new()),
|
readbuf: Some(RefCell::new(Buffer::new())),
|
||||||
peer_fd: OnceCell::new(),
|
peer_fd: OnceCell::new(),
|
||||||
is_nonblock: is_sock_nonblock,
|
is_nonblock: is_sock_nonblock,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Make the file descriptions point to each other.
|
// Make the file descriptions point to each other.
|
||||||
fd0.downcast::<SocketPair>().unwrap().peer_fd.set(fd1.downgrade()).unwrap();
|
fd0.downcast::<AnonSocket>().unwrap().peer_fd.set(fd1.downgrade()).unwrap();
|
||||||
fd1.downcast::<SocketPair>().unwrap().peer_fd.set(fd0.downgrade()).unwrap();
|
fd1.downcast::<AnonSocket>().unwrap().peer_fd.set(fd0.downgrade()).unwrap();
|
||||||
|
|
||||||
// Insert the file description to the fd table, generating the file descriptors.
|
// Insert the file description to the fd table, generating the file descriptors.
|
||||||
let sv0 = fds.insert(fd0);
|
let sv0 = fds.insert(fd0);
|
||||||
|
@ -295,4 +314,51 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
|
||||||
|
|
||||||
Ok(Scalar::from_i32(0))
|
Ok(Scalar::from_i32(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn pipe2(
|
||||||
|
&mut self,
|
||||||
|
pipefd: &OpTy<'tcx>,
|
||||||
|
flags: Option<&OpTy<'tcx>>,
|
||||||
|
) -> InterpResult<'tcx, Scalar> {
|
||||||
|
let this = self.eval_context_mut();
|
||||||
|
|
||||||
|
let pipefd = this.deref_pointer(pipefd)?;
|
||||||
|
let flags = match flags {
|
||||||
|
Some(flags) => this.read_scalar(flags)?.to_i32()?,
|
||||||
|
None => 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
// As usual we ignore CLOEXEC.
|
||||||
|
let cloexec = this.eval_libc_i32("O_CLOEXEC");
|
||||||
|
if flags != 0 && flags != cloexec {
|
||||||
|
throw_unsup_format!("unsupported flags in `pipe2`");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate file descriptions.
|
||||||
|
// pipefd[0] refers to the read end of the pipe.
|
||||||
|
let fds = &mut this.machine.fds;
|
||||||
|
let fd0 = fds.new_ref(AnonSocket {
|
||||||
|
readbuf: Some(RefCell::new(Buffer::new())),
|
||||||
|
peer_fd: OnceCell::new(),
|
||||||
|
is_nonblock: false,
|
||||||
|
});
|
||||||
|
let fd1 =
|
||||||
|
fds.new_ref(AnonSocket { readbuf: None, peer_fd: OnceCell::new(), is_nonblock: false });
|
||||||
|
|
||||||
|
// Make the file descriptions point to each other.
|
||||||
|
fd0.downcast::<AnonSocket>().unwrap().peer_fd.set(fd1.downgrade()).unwrap();
|
||||||
|
fd1.downcast::<AnonSocket>().unwrap().peer_fd.set(fd0.downgrade()).unwrap();
|
||||||
|
|
||||||
|
// Insert the file description to the fd table, generating the file descriptors.
|
||||||
|
let pipefd0 = fds.insert(fd0);
|
||||||
|
let pipefd1 = fds.insert(fd1);
|
||||||
|
|
||||||
|
// Return file descriptors to the caller.
|
||||||
|
let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size);
|
||||||
|
let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size);
|
||||||
|
this.write_scalar(pipefd0, &pipefd)?;
|
||||||
|
this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?;
|
||||||
|
|
||||||
|
Ok(Scalar::from_i32(0))
|
||||||
|
}
|
||||||
}
|
}
|
99
src/tools/miri/tests/pass-dep/libc/libc-pipe.rs
Normal file
99
src/tools/miri/tests/pass-dep/libc/libc-pipe.rs
Normal file
|
@ -0,0 +1,99 @@
|
||||||
|
//@ignore-target-windows: No libc pipe on Windows
|
||||||
|
// test_race depends on a deterministic schedule.
|
||||||
|
//@compile-flags: -Zmiri-preemption-rate=0
|
||||||
|
use std::thread;
|
||||||
|
fn main() {
|
||||||
|
test_pipe();
|
||||||
|
test_pipe_threaded();
|
||||||
|
test_race();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_pipe() {
|
||||||
|
let mut fds = [-1, -1];
|
||||||
|
let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||||
|
assert_eq!(res, 0);
|
||||||
|
|
||||||
|
// Read size == data available in buffer.
|
||||||
|
let data = "12345".as_bytes().as_ptr();
|
||||||
|
let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) };
|
||||||
|
assert_eq!(res, 5);
|
||||||
|
let mut buf3: [u8; 5] = [0; 5];
|
||||||
|
let res = unsafe { libc::read(fds[0], buf3.as_mut_ptr().cast(), buf3.len() as libc::size_t) };
|
||||||
|
assert_eq!(res, 5);
|
||||||
|
assert_eq!(buf3, "12345".as_bytes());
|
||||||
|
|
||||||
|
// Read size > data available in buffer.
|
||||||
|
let data = "123".as_bytes().as_ptr();
|
||||||
|
let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 3) };
|
||||||
|
assert_eq!(res, 3);
|
||||||
|
let mut buf4: [u8; 5] = [0; 5];
|
||||||
|
let res = unsafe { libc::read(fds[0], buf4.as_mut_ptr().cast(), buf4.len() as libc::size_t) };
|
||||||
|
assert_eq!(res, 3);
|
||||||
|
assert_eq!(&buf4[0..3], "123".as_bytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_pipe_threaded() {
|
||||||
|
let mut fds = [-1, -1];
|
||||||
|
let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||||
|
assert_eq!(res, 0);
|
||||||
|
|
||||||
|
let thread1 = thread::spawn(move || {
|
||||||
|
let mut buf: [u8; 5] = [0; 5];
|
||||||
|
let res: i64 = unsafe {
|
||||||
|
libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
|
||||||
|
.try_into()
|
||||||
|
.unwrap()
|
||||||
|
};
|
||||||
|
assert_eq!(res, 5);
|
||||||
|
assert_eq!(buf, "abcde".as_bytes());
|
||||||
|
});
|
||||||
|
// FIXME: we should yield here once blocking is implemented.
|
||||||
|
//thread::yield_now();
|
||||||
|
let data = "abcde".as_bytes().as_ptr();
|
||||||
|
let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) };
|
||||||
|
assert_eq!(res, 5);
|
||||||
|
thread1.join().unwrap();
|
||||||
|
|
||||||
|
// Read and write from different direction
|
||||||
|
let thread2 = thread::spawn(move || {
|
||||||
|
// FIXME: we should yield here once blocking is implemented.
|
||||||
|
//thread::yield_now();
|
||||||
|
let data = "12345".as_bytes().as_ptr();
|
||||||
|
let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) };
|
||||||
|
assert_eq!(res, 5);
|
||||||
|
});
|
||||||
|
// FIXME: we should not yield here once blocking is implemented.
|
||||||
|
thread::yield_now();
|
||||||
|
let mut buf: [u8; 5] = [0; 5];
|
||||||
|
let res = unsafe { libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
|
||||||
|
assert_eq!(res, 5);
|
||||||
|
assert_eq!(buf, "12345".as_bytes());
|
||||||
|
thread2.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_race() {
|
||||||
|
static mut VAL: u8 = 0;
|
||||||
|
let mut fds = [-1, -1];
|
||||||
|
let res = unsafe { libc::pipe(fds.as_mut_ptr()) };
|
||||||
|
assert_eq!(res, 0);
|
||||||
|
let thread1 = thread::spawn(move || {
|
||||||
|
let mut buf: [u8; 1] = [0; 1];
|
||||||
|
// write() from the main thread will occur before the read() here
|
||||||
|
// because preemption is disabled and the main thread yields after write().
|
||||||
|
let res: i32 = unsafe {
|
||||||
|
libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t)
|
||||||
|
.try_into()
|
||||||
|
.unwrap()
|
||||||
|
};
|
||||||
|
assert_eq!(res, 1);
|
||||||
|
assert_eq!(buf, "a".as_bytes());
|
||||||
|
// The read above establishes a happens-before so it is now safe to access this global variable.
|
||||||
|
unsafe { assert_eq!(VAL, 1) };
|
||||||
|
});
|
||||||
|
unsafe { VAL = 1 };
|
||||||
|
let data = "a".as_bytes().as_ptr();
|
||||||
|
let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 1) };
|
||||||
|
assert_eq!(res, 1);
|
||||||
|
thread::yield_now();
|
||||||
|
thread1.join().unwrap();
|
||||||
|
}
|
|
@ -10,65 +10,68 @@ fn main() {
|
||||||
|
|
||||||
fn test_socketpair() {
|
fn test_socketpair() {
|
||||||
let mut fds = [-1, -1];
|
let mut fds = [-1, -1];
|
||||||
let mut res =
|
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
|
||||||
unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
|
|
||||||
assert_eq!(res, 0);
|
assert_eq!(res, 0);
|
||||||
|
|
||||||
// Read size == data available in buffer.
|
// Read size == data available in buffer.
|
||||||
let data = "abcde".as_bytes().as_ptr();
|
let data = "abcde".as_bytes().as_ptr();
|
||||||
res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() };
|
let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5) };
|
||||||
assert_eq!(res, 5);
|
assert_eq!(res, 5);
|
||||||
let mut buf: [u8; 5] = [0; 5];
|
let mut buf: [u8; 5] = [0; 5];
|
||||||
res = unsafe {
|
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
|
||||||
libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap()
|
|
||||||
};
|
|
||||||
assert_eq!(res, 5);
|
assert_eq!(res, 5);
|
||||||
assert_eq!(buf, "abcde".as_bytes());
|
assert_eq!(buf, "abcde".as_bytes());
|
||||||
|
|
||||||
// Read size > data available in buffer.
|
// Read size > data available in buffer.
|
||||||
let data = "abc".as_bytes().as_ptr();
|
let data = "abc".as_bytes().as_ptr();
|
||||||
res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3).try_into().unwrap() };
|
let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) };
|
||||||
assert_eq!(res, 3);
|
assert_eq!(res, 3);
|
||||||
let mut buf2: [u8; 5] = [0; 5];
|
let mut buf2: [u8; 5] = [0; 5];
|
||||||
res = unsafe {
|
let res = unsafe { libc::read(fds[1], buf2.as_mut_ptr().cast(), buf2.len() as libc::size_t) };
|
||||||
libc::read(fds[1], buf2.as_mut_ptr().cast(), buf2.len() as libc::size_t).try_into().unwrap()
|
|
||||||
};
|
|
||||||
assert_eq!(res, 3);
|
assert_eq!(res, 3);
|
||||||
assert_eq!(&buf2[0..3], "abc".as_bytes());
|
assert_eq!(&buf2[0..3], "abc".as_bytes());
|
||||||
|
|
||||||
// Test read and write from another direction.
|
// Test read and write from another direction.
|
||||||
// Read size == data available in buffer.
|
// Read size == data available in buffer.
|
||||||
let data = "12345".as_bytes().as_ptr();
|
let data = "12345".as_bytes().as_ptr();
|
||||||
res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() };
|
let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) };
|
||||||
assert_eq!(res, 5);
|
assert_eq!(res, 5);
|
||||||
let mut buf3: [u8; 5] = [0; 5];
|
let mut buf3: [u8; 5] = [0; 5];
|
||||||
res = unsafe {
|
let res = unsafe { libc::read(fds[0], buf3.as_mut_ptr().cast(), buf3.len() as libc::size_t) };
|
||||||
libc::read(fds[0], buf3.as_mut_ptr().cast(), buf3.len() as libc::size_t).try_into().unwrap()
|
|
||||||
};
|
|
||||||
assert_eq!(res, 5);
|
assert_eq!(res, 5);
|
||||||
assert_eq!(buf3, "12345".as_bytes());
|
assert_eq!(buf3, "12345".as_bytes());
|
||||||
|
|
||||||
// Read size > data available in buffer.
|
// Read size > data available in buffer.
|
||||||
let data = "123".as_bytes().as_ptr();
|
let data = "123".as_bytes().as_ptr();
|
||||||
res = unsafe { libc::write(fds[1], data as *const libc::c_void, 3).try_into().unwrap() };
|
let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 3) };
|
||||||
assert_eq!(res, 3);
|
assert_eq!(res, 3);
|
||||||
let mut buf4: [u8; 5] = [0; 5];
|
let mut buf4: [u8; 5] = [0; 5];
|
||||||
res = unsafe {
|
let res = unsafe { libc::read(fds[0], buf4.as_mut_ptr().cast(), buf4.len() as libc::size_t) };
|
||||||
libc::read(fds[0], buf4.as_mut_ptr().cast(), buf4.len() as libc::size_t).try_into().unwrap()
|
|
||||||
};
|
|
||||||
assert_eq!(res, 3);
|
assert_eq!(res, 3);
|
||||||
assert_eq!(&buf4[0..3], "123".as_bytes());
|
assert_eq!(&buf4[0..3], "123".as_bytes());
|
||||||
|
|
||||||
|
// Test when happens when we close one end, with some data in the buffer.
|
||||||
|
let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) };
|
||||||
|
assert_eq!(res, 3);
|
||||||
|
unsafe { libc::close(fds[0]) };
|
||||||
|
// Reading the other end should return that data, then EOF.
|
||||||
|
let mut buf: [u8; 5] = [0; 5];
|
||||||
|
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
|
||||||
|
assert_eq!(res, 3);
|
||||||
|
assert_eq!(&buf[0..3], "123".as_bytes());
|
||||||
|
let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
|
||||||
|
assert_eq!(res, 0); // 0-sized read: EOF.
|
||||||
|
// Writing the other end should emit EPIPE.
|
||||||
|
let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 1) };
|
||||||
|
assert_eq!(res, -1);
|
||||||
|
assert_eq!(std::io::Error::last_os_error().raw_os_error(), Some(libc::EPIPE));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_socketpair_threaded() {
|
fn test_socketpair_threaded() {
|
||||||
let mut fds = [-1, -1];
|
let mut fds = [-1, -1];
|
||||||
let mut res =
|
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
|
||||||
unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
|
|
||||||
assert_eq!(res, 0);
|
assert_eq!(res, 0);
|
||||||
|
|
||||||
let data = "abcde".as_bytes().as_ptr();
|
|
||||||
res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() };
|
|
||||||
assert_eq!(res, 5);
|
|
||||||
let thread1 = thread::spawn(move || {
|
let thread1 = thread::spawn(move || {
|
||||||
let mut buf: [u8; 5] = [0; 5];
|
let mut buf: [u8; 5] = [0; 5];
|
||||||
let res: i64 = unsafe {
|
let res: i64 = unsafe {
|
||||||
|
@ -79,28 +82,34 @@ fn test_socketpair_threaded() {
|
||||||
assert_eq!(res, 5);
|
assert_eq!(res, 5);
|
||||||
assert_eq!(buf, "abcde".as_bytes());
|
assert_eq!(buf, "abcde".as_bytes());
|
||||||
});
|
});
|
||||||
|
// FIXME: we should yield here once blocking is implemented.
|
||||||
|
//thread::yield_now();
|
||||||
|
let data = "abcde".as_bytes().as_ptr();
|
||||||
|
let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5) };
|
||||||
|
assert_eq!(res, 5);
|
||||||
thread1.join().unwrap();
|
thread1.join().unwrap();
|
||||||
|
|
||||||
// Read and write from different direction
|
// Read and write from different direction
|
||||||
let thread2 = thread::spawn(move || {
|
let thread2 = thread::spawn(move || {
|
||||||
|
// FIXME: we should yield here once blocking is implemented.
|
||||||
|
//thread::yield_now();
|
||||||
let data = "12345".as_bytes().as_ptr();
|
let data = "12345".as_bytes().as_ptr();
|
||||||
let res: i64 =
|
let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) };
|
||||||
unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() };
|
|
||||||
assert_eq!(res, 5);
|
assert_eq!(res, 5);
|
||||||
});
|
});
|
||||||
thread2.join().unwrap();
|
// FIXME: we should not yield here once blocking is implemented.
|
||||||
|
thread::yield_now();
|
||||||
let mut buf: [u8; 5] = [0; 5];
|
let mut buf: [u8; 5] = [0; 5];
|
||||||
res = unsafe {
|
let res = unsafe { libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) };
|
||||||
libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap()
|
|
||||||
};
|
|
||||||
assert_eq!(res, 5);
|
assert_eq!(res, 5);
|
||||||
assert_eq!(buf, "12345".as_bytes());
|
assert_eq!(buf, "12345".as_bytes());
|
||||||
|
thread2.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn test_race() {
|
fn test_race() {
|
||||||
static mut VAL: u8 = 0;
|
static mut VAL: u8 = 0;
|
||||||
let mut fds = [-1, -1];
|
let mut fds = [-1, -1];
|
||||||
let mut res =
|
let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
|
||||||
unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) };
|
|
||||||
assert_eq!(res, 0);
|
assert_eq!(res, 0);
|
||||||
let thread1 = thread::spawn(move || {
|
let thread1 = thread::spawn(move || {
|
||||||
let mut buf: [u8; 1] = [0; 1];
|
let mut buf: [u8; 1] = [0; 1];
|
||||||
|
@ -113,11 +122,12 @@ fn test_race() {
|
||||||
};
|
};
|
||||||
assert_eq!(res, 1);
|
assert_eq!(res, 1);
|
||||||
assert_eq!(buf, "a".as_bytes());
|
assert_eq!(buf, "a".as_bytes());
|
||||||
|
// The read above establishes a happens-before so it is now safe to access this global variable.
|
||||||
unsafe { assert_eq!(VAL, 1) };
|
unsafe { assert_eq!(VAL, 1) };
|
||||||
});
|
});
|
||||||
unsafe { VAL = 1 };
|
unsafe { VAL = 1 };
|
||||||
let data = "a".as_bytes().as_ptr();
|
let data = "a".as_bytes().as_ptr();
|
||||||
res = unsafe { libc::write(fds[0], data as *const libc::c_void, 1).try_into().unwrap() };
|
let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 1) };
|
||||||
assert_eq!(res, 1);
|
assert_eq!(res, 1);
|
||||||
thread::yield_now();
|
thread::yield_now();
|
||||||
thread1.join().unwrap();
|
thread1.join().unwrap();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue