Rollup merge of #95467 - ChrisDenton:async-read-pipe, r=joshtriplett
Windows: Synchronize asynchronous pipe reads and writes On Windows, the pipes used for spawned processes are opened for asynchronous access but `read` and `write` are done using the standard methods that assume synchronous access. This means that the buffer (and variables on the stack) may be read/written to after the function returns. This PR ensures reads/writes complete before returning. Note that this only applies to pipes we create and does not affect the standard file read/write methods. Fixes #95411
This commit is contained in:
commit
4cbc003577
2 changed files with 140 additions and 2 deletions
|
@ -326,6 +326,12 @@ impl Default for IO_STATUS_BLOCK {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub type LPOVERLAPPED_COMPLETION_ROUTINE = unsafe extern "system" fn(
|
||||||
|
dwErrorCode: DWORD,
|
||||||
|
dwNumberOfBytesTransfered: DWORD,
|
||||||
|
lpOverlapped: *mut OVERLAPPED,
|
||||||
|
);
|
||||||
|
|
||||||
#[repr(C)]
|
#[repr(C)]
|
||||||
#[cfg(not(target_pointer_width = "64"))]
|
#[cfg(not(target_pointer_width = "64"))]
|
||||||
pub struct WSADATA {
|
pub struct WSADATA {
|
||||||
|
@ -891,6 +897,7 @@ extern "system" {
|
||||||
pub fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
|
pub fn WaitForSingleObject(hHandle: HANDLE, dwMilliseconds: DWORD) -> DWORD;
|
||||||
pub fn SwitchToThread() -> BOOL;
|
pub fn SwitchToThread() -> BOOL;
|
||||||
pub fn Sleep(dwMilliseconds: DWORD);
|
pub fn Sleep(dwMilliseconds: DWORD);
|
||||||
|
pub fn SleepEx(dwMilliseconds: DWORD, bAlertable: BOOL) -> DWORD;
|
||||||
pub fn GetProcessId(handle: HANDLE) -> DWORD;
|
pub fn GetProcessId(handle: HANDLE) -> DWORD;
|
||||||
pub fn CopyFileExW(
|
pub fn CopyFileExW(
|
||||||
lpExistingFileName: LPCWSTR,
|
lpExistingFileName: LPCWSTR,
|
||||||
|
@ -957,6 +964,13 @@ extern "system" {
|
||||||
lpNumberOfBytesRead: LPDWORD,
|
lpNumberOfBytesRead: LPDWORD,
|
||||||
lpOverlapped: LPOVERLAPPED,
|
lpOverlapped: LPOVERLAPPED,
|
||||||
) -> BOOL;
|
) -> BOOL;
|
||||||
|
pub fn ReadFileEx(
|
||||||
|
hFile: BorrowedHandle<'_>,
|
||||||
|
lpBuffer: LPVOID,
|
||||||
|
nNumberOfBytesToRead: DWORD,
|
||||||
|
lpOverlapped: LPOVERLAPPED,
|
||||||
|
lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE,
|
||||||
|
) -> BOOL;
|
||||||
pub fn WriteFile(
|
pub fn WriteFile(
|
||||||
hFile: BorrowedHandle<'_>,
|
hFile: BorrowedHandle<'_>,
|
||||||
lpBuffer: LPVOID,
|
lpBuffer: LPVOID,
|
||||||
|
@ -964,6 +978,13 @@ extern "system" {
|
||||||
lpNumberOfBytesWritten: LPDWORD,
|
lpNumberOfBytesWritten: LPDWORD,
|
||||||
lpOverlapped: LPOVERLAPPED,
|
lpOverlapped: LPOVERLAPPED,
|
||||||
) -> BOOL;
|
) -> BOOL;
|
||||||
|
pub fn WriteFileEx(
|
||||||
|
hFile: BorrowedHandle<'_>,
|
||||||
|
lpBuffer: LPVOID,
|
||||||
|
nNumberOfBytesToWrite: DWORD,
|
||||||
|
lpOverlapped: LPOVERLAPPED,
|
||||||
|
lpCompletionRoutine: LPOVERLAPPED_COMPLETION_ROUTINE,
|
||||||
|
) -> BOOL;
|
||||||
pub fn CloseHandle(hObject: HANDLE) -> BOOL;
|
pub fn CloseHandle(hObject: HANDLE) -> BOOL;
|
||||||
pub fn MoveFileExW(lpExistingFileName: LPCWSTR, lpNewFileName: LPCWSTR, dwFlags: DWORD)
|
pub fn MoveFileExW(lpExistingFileName: LPCWSTR, lpNewFileName: LPCWSTR, dwFlags: DWORD)
|
||||||
-> BOOL;
|
-> BOOL;
|
||||||
|
|
|
@ -173,6 +173,15 @@ fn random_number() -> usize {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Abstracts over `ReadFileEx` and `WriteFileEx`
|
||||||
|
type AlertableIoFn = unsafe extern "system" fn(
|
||||||
|
BorrowedHandle<'_>,
|
||||||
|
c::LPVOID,
|
||||||
|
c::DWORD,
|
||||||
|
c::LPOVERLAPPED,
|
||||||
|
c::LPOVERLAPPED_COMPLETION_ROUTINE,
|
||||||
|
) -> c::BOOL;
|
||||||
|
|
||||||
impl AnonPipe {
|
impl AnonPipe {
|
||||||
pub fn handle(&self) -> &Handle {
|
pub fn handle(&self) -> &Handle {
|
||||||
&self.inner
|
&self.inner
|
||||||
|
@ -182,7 +191,19 @@ impl AnonPipe {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
|
pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
|
||||||
self.inner.read(buf)
|
let result = unsafe {
|
||||||
|
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
|
||||||
|
self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
|
||||||
|
};
|
||||||
|
|
||||||
|
match result {
|
||||||
|
// The special treatment of BrokenPipe is to deal with Windows
|
||||||
|
// pipe semantics, which yields this error when *reading* from
|
||||||
|
// a pipe after the other end has closed; we interpret that as
|
||||||
|
// EOF on the pipe.
|
||||||
|
Err(ref e) if e.kind() == io::ErrorKind::BrokenPipe => Ok(0),
|
||||||
|
_ => result,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
|
pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
|
||||||
|
@ -195,7 +216,10 @@ impl AnonPipe {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
|
pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
|
||||||
self.inner.write(buf)
|
unsafe {
|
||||||
|
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
|
||||||
|
self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
|
pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
|
||||||
|
@ -206,6 +230,99 @@ impl AnonPipe {
|
||||||
pub fn is_write_vectored(&self) -> bool {
|
pub fn is_write_vectored(&self) -> bool {
|
||||||
self.inner.is_write_vectored()
|
self.inner.is_write_vectored()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Synchronizes asynchronous reads or writes using our anonymous pipe.
|
||||||
|
///
|
||||||
|
/// This is a wrapper around [`ReadFileEx`] or [`WriteFileEx`] that uses
|
||||||
|
/// [Asynchronous Procedure Call] (APC) to synchronize reads or writes.
|
||||||
|
///
|
||||||
|
/// Note: This should not be used for handles we don't create.
|
||||||
|
///
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// `buf` must be a pointer to a buffer that's valid for reads or writes
|
||||||
|
/// up to `len` bytes. The `AlertableIoFn` must be either `ReadFileEx` or `WriteFileEx`
|
||||||
|
///
|
||||||
|
/// [`ReadFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfileex
|
||||||
|
/// [`WriteFileEx`]: https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefileex
|
||||||
|
/// [Asynchronous Procedure Call]: https://docs.microsoft.com/en-us/windows/win32/sync/asynchronous-procedure-calls
|
||||||
|
unsafe fn alertable_io_internal(
|
||||||
|
&self,
|
||||||
|
io: AlertableIoFn,
|
||||||
|
buf: c::LPVOID,
|
||||||
|
len: c::DWORD,
|
||||||
|
) -> io::Result<usize> {
|
||||||
|
// Use "alertable I/O" to synchronize the pipe I/O.
|
||||||
|
// This has four steps.
|
||||||
|
//
|
||||||
|
// STEP 1: Start the asynchronous I/O operation.
|
||||||
|
// This simply calls either `ReadFileEx` or `WriteFileEx`,
|
||||||
|
// giving it a pointer to the buffer and callback function.
|
||||||
|
//
|
||||||
|
// STEP 2: Enter an alertable state.
|
||||||
|
// The callback set in step 1 will not be called until the thread
|
||||||
|
// enters an "alertable" state. This can be done using `SleepEx`.
|
||||||
|
//
|
||||||
|
// STEP 3: The callback
|
||||||
|
// Once the I/O is complete and the thread is in an alertable state,
|
||||||
|
// the callback will be run on the same thread as the call to
|
||||||
|
// `ReadFileEx` or `WriteFileEx` done in step 1.
|
||||||
|
// In the callback we simply set the result of the async operation.
|
||||||
|
//
|
||||||
|
// STEP 4: Return the result.
|
||||||
|
// At this point we'll have a result from the callback function
|
||||||
|
// and can simply return it. Note that we must not return earlier,
|
||||||
|
// while the I/O is still in progress.
|
||||||
|
|
||||||
|
// The result that will be set from the asynchronous callback.
|
||||||
|
let mut async_result: Option<AsyncResult> = None;
|
||||||
|
struct AsyncResult {
|
||||||
|
error: u32,
|
||||||
|
transfered: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
// STEP 3: The callback.
|
||||||
|
unsafe extern "system" fn callback(
|
||||||
|
dwErrorCode: u32,
|
||||||
|
dwNumberOfBytesTransfered: u32,
|
||||||
|
lpOverlapped: *mut c::OVERLAPPED,
|
||||||
|
) {
|
||||||
|
// Set `async_result` using a pointer smuggled through `hEvent`.
|
||||||
|
let result = AsyncResult { error: dwErrorCode, transfered: dwNumberOfBytesTransfered };
|
||||||
|
*(*lpOverlapped).hEvent.cast::<Option<AsyncResult>>() = Some(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
// STEP 1: Start the I/O operation.
|
||||||
|
let mut overlapped: c::OVERLAPPED = crate::mem::zeroed();
|
||||||
|
// `hEvent` is unused by `ReadFileEx` and `WriteFileEx`.
|
||||||
|
// Therefore the documentation suggests using it to smuggle a pointer to the callback.
|
||||||
|
overlapped.hEvent = &mut async_result as *mut _ as *mut _;
|
||||||
|
|
||||||
|
// Asynchronous read of the pipe.
|
||||||
|
// If successful, `callback` will be called once it completes.
|
||||||
|
let result = io(self.inner.as_handle(), buf, len, &mut overlapped, callback);
|
||||||
|
if result == c::FALSE {
|
||||||
|
// We can return here because the call failed.
|
||||||
|
// After this we must not return until the I/O completes.
|
||||||
|
return Err(io::Error::last_os_error());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait indefinitely for the result.
|
||||||
|
let result = loop {
|
||||||
|
// STEP 2: Enter an alertable state.
|
||||||
|
// The second parameter of `SleepEx` is used to make this sleep alertable.
|
||||||
|
c::SleepEx(c::INFINITE, c::TRUE);
|
||||||
|
if let Some(result) = async_result {
|
||||||
|
break result;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// STEP 4: Return the result.
|
||||||
|
// `async_result` is always `Some` at this point
|
||||||
|
match result.error {
|
||||||
|
c::ERROR_SUCCESS => Ok(result.transfered as usize),
|
||||||
|
error => Err(io::Error::from_raw_os_error(error as _)),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read2(p1: AnonPipe, v1: &mut Vec<u8>, p2: AnonPipe, v2: &mut Vec<u8>) -> io::Result<()> {
|
pub fn read2(p1: AnonPipe, v1: &mut Vec<u8>, p2: AnonPipe, v2: &mut Vec<u8>) -> io::Result<()> {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue