Add core::stream::Stream
This patch adds the `core::stream` submodule and implements `core::stream::Stream` in accordance with RFC2996. Add feedback from @camelid
This commit is contained in:
parent
bbc01bb624
commit
0c8db16a67
8 changed files with 347 additions and 0 deletions
|
@ -149,6 +149,7 @@ use core::ops::{
|
|||
};
|
||||
use core::pin::Pin;
|
||||
use core::ptr::{self, Unique};
|
||||
use core::stream::Stream;
|
||||
use core::task::{Context, Poll};
|
||||
|
||||
use crate::alloc::{handle_alloc_error, AllocError, Allocator, Global, Layout, WriteCloneIntoRaw};
|
||||
|
@ -1618,3 +1619,16 @@ where
|
|||
F::poll(Pin::new(&mut *self), cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[unstable(feature = "async_stream", issue = "79024")]
|
||||
impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
Pin::new(&mut **self).poll_next(cx)
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
(**self).size_hint()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -82,6 +82,7 @@
|
|||
#![feature(array_windows)]
|
||||
#![feature(allow_internal_unstable)]
|
||||
#![feature(arbitrary_self_types)]
|
||||
#![feature(async_stream)]
|
||||
#![feature(box_patterns)]
|
||||
#![feature(box_syntax)]
|
||||
#![feature(cfg_sanitize)]
|
||||
|
|
|
@ -254,6 +254,8 @@ pub mod panicking;
|
|||
pub mod pin;
|
||||
pub mod raw;
|
||||
pub mod result;
|
||||
#[unstable(feature = "async_stream", issue = "79024")]
|
||||
pub mod stream;
|
||||
pub mod sync;
|
||||
|
||||
pub mod fmt;
|
||||
|
|
154
library/core/src/stream/mod.rs
Normal file
154
library/core/src/stream/mod.rs
Normal file
|
@ -0,0 +1,154 @@
|
|||
//! Composable asynchronous iteration.
|
||||
//!
|
||||
//! If futures are asynchronous values, then streams are asynchronous
|
||||
//! iterators. If you've found yourself with an asynchronous collection of some kind,
|
||||
//! and needed to perform an operation on the elements of said collection,
|
||||
//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
|
||||
//! asynchronous Rust code, so it's worth becoming familiar with them.
|
||||
//!
|
||||
//! Before explaining more, let's talk about how this module is structured:
|
||||
//!
|
||||
//! # Organization
|
||||
//!
|
||||
//! This module is largely organized by type:
|
||||
//!
|
||||
//! * [Traits] are the core portion: these traits define what kind of streams
|
||||
//! exist and what you can do with them. The methods of these traits are worth
|
||||
//! putting some extra study time into.
|
||||
//! * Functions provide some helpful ways to create some basic streams.
|
||||
//! * [Structs] are often the return types of the various methods on this
|
||||
//! module's traits. You'll usually want to look at the method that creates
|
||||
//! the `struct`, rather than the `struct` itself. For more detail about why,
|
||||
//! see '[Implementing Stream](#implementing-stream)'.
|
||||
//!
|
||||
//! [Traits]: #traits
|
||||
//! [Structs]: #structs
|
||||
//!
|
||||
//! That's it! Let's dig into streams.
|
||||
//!
|
||||
//! # Stream
|
||||
//!
|
||||
//! The heart and soul of this module is the [`Stream`] trait. The core of
|
||||
//! [`Stream`] looks like this:
|
||||
//!
|
||||
//! ```
|
||||
//! # use core::task::{Context, Poll};
|
||||
//! # use core::pin::Pin;
|
||||
//! trait Stream {
|
||||
//! type Item;
|
||||
//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! Unlike `Iterator`, `Stream` makes a distinction between the [`poll_next`]
|
||||
//! method which is used when implementing a `Stream`, and the [`next`] method
|
||||
//! which is used when consuming a stream. Consumers of `Stream` only need to
|
||||
//! consider [`next`], which when called, returns a future which yields
|
||||
//! yields [`Option`][`<Item>`].
|
||||
//!
|
||||
//! The future returned by [`next`] will yield `Some(Item)` as long as there are
|
||||
//! elements, and once they've all been exhausted, will yield `None` to indicate
|
||||
//! that iteration is finished. If we're waiting on something asynchronous to
|
||||
//! resolve, the future will wait until the stream is ready to yield again.
|
||||
//!
|
||||
//! Individual streams may choose to resume iteration, and so calling [`next`]
|
||||
//! again may or may not eventually yield `Some(Item)` again at some point.
|
||||
//!
|
||||
//! [`Stream`]'s full definition includes a number of other methods as well,
|
||||
//! but they are default methods, built on top of [`poll_next`], and so you get
|
||||
//! them for free.
|
||||
//!
|
||||
//! [`Poll`]: super::task::Poll
|
||||
//! [`poll_next`]: Stream::poll_next
|
||||
//! [`next`]: Stream::next
|
||||
//! [`<Item>`]: Stream::Item
|
||||
//!
|
||||
//! # Implementing Stream
|
||||
//!
|
||||
//! Creating a stream of your own involves two steps: creating a `struct` to
|
||||
//! hold the stream's state, and then implementing [`Stream`] for that
|
||||
//! `struct`.
|
||||
//!
|
||||
//! Let's make a stream named `Counter` which counts from `1` to `5`:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! #![feature(async_stream)]
|
||||
//! # use core::stream::Stream;
|
||||
//! # use core::task::{Context, Poll};
|
||||
//! # use core::pin::Pin;
|
||||
//!
|
||||
//! // First, the struct:
|
||||
//!
|
||||
//! /// A stream which counts from one to five
|
||||
//! struct Counter {
|
||||
//! count: usize,
|
||||
//! }
|
||||
//!
|
||||
//! // we want our count to start at one, so let's add a new() method to help.
|
||||
//! // This isn't strictly necessary, but is convenient. Note that we start
|
||||
//! // `count` at zero, we'll see why in `poll_next()`'s implementation below.
|
||||
//! impl Counter {
|
||||
//! fn new() -> Counter {
|
||||
//! Counter { count: 0 }
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! // Then, we implement `Stream` for our `Counter`:
|
||||
//!
|
||||
//! impl Stream for Counter {
|
||||
//! // we will be counting with usize
|
||||
//! type Item = usize;
|
||||
//!
|
||||
//! // poll_next() is the only required method
|
||||
//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
//! // Increment our count. This is why we started at zero.
|
||||
//! self.count += 1;
|
||||
//!
|
||||
//! // Check to see if we've finished counting or not.
|
||||
//! if self.count < 6 {
|
||||
//! Poll::Ready(Some(self.count))
|
||||
//! } else {
|
||||
//! Poll::Ready(None)
|
||||
//! }
|
||||
//! }
|
||||
//! }
|
||||
//!
|
||||
//! // And now we can use it!
|
||||
//! # async fn run() {
|
||||
//! #
|
||||
//! let mut counter = Counter::new();
|
||||
//!
|
||||
//! let x = counter.next().await.unwrap();
|
||||
//! println!("{}", x);
|
||||
//!
|
||||
//! let x = counter.next().await.unwrap();
|
||||
//! println!("{}", x);
|
||||
//!
|
||||
//! let x = counter.next().await.unwrap();
|
||||
//! println!("{}", x);
|
||||
//!
|
||||
//! let x = counter.next().await.unwrap();
|
||||
//! println!("{}", x);
|
||||
//!
|
||||
//! let x = counter.next().await.unwrap();
|
||||
//! println!("{}", x);
|
||||
//! #
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! This will print `1` through `5`, each on their own line.
|
||||
//!
|
||||
//! # Laziness
|
||||
//!
|
||||
//! Streams are *lazy*. This means that just creating a stream doesn't _do_ a
|
||||
//! whole lot. Nothing really happens until you call [`next`]. This is sometimes a
|
||||
//! source of confusion when creating a stream solely for its side effects. The
|
||||
//! compiler will warn us about this kind of behavior:
|
||||
//!
|
||||
//! ```text
|
||||
//! warning: unused result that must be used: streams do nothing unless polled
|
||||
//! ```
|
||||
|
||||
mod stream;
|
||||
|
||||
pub use stream::{Next, Stream};
|
129
library/core/src/stream/stream/mod.rs
Normal file
129
library/core/src/stream/stream/mod.rs
Normal file
|
@ -0,0 +1,129 @@
|
|||
mod next;
|
||||
|
||||
pub use next::Next;
|
||||
|
||||
use crate::ops::DerefMut;
|
||||
use crate::pin::Pin;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
/// An interface for dealing with asynchronous iterators.
|
||||
///
|
||||
/// This is the main stream trait. For more about the concept of streams
|
||||
/// generally, please see the [module-level documentation]. In particular, you
|
||||
/// may want to know how to [implement `Stream`][impl].
|
||||
///
|
||||
/// [module-level documentation]: index.html
|
||||
/// [impl]: index.html#implementing-stream
|
||||
#[unstable(feature = "async_stream", issue = "79024")]
|
||||
#[must_use = "streams do nothing unless polled"]
|
||||
pub trait Stream {
|
||||
/// The type of items yielded by the stream.
|
||||
type Item;
|
||||
|
||||
/// Attempt to pull out the next value of this stream, registering the
|
||||
/// current task for wakeup if the value is not yet available, and returning
|
||||
/// `None` if the stream is exhausted.
|
||||
///
|
||||
/// # Return value
|
||||
///
|
||||
/// There are several possible return values, each indicating a distinct
|
||||
/// stream state:
|
||||
///
|
||||
/// - `Poll::Pending` means that this stream's next value is not ready
|
||||
/// yet. Implementations will ensure that the current task will be notified
|
||||
/// when the next value may be ready.
|
||||
///
|
||||
/// - `Poll::Ready(Some(val))` means that the stream has successfully
|
||||
/// produced a value, `val`, and may produce further values on subsequent
|
||||
/// `poll_next` calls.
|
||||
///
|
||||
/// - `Poll::Ready(None)` means that the stream has terminated, and
|
||||
/// `poll_next` should not be invoked again.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
|
||||
/// `poll_next` method again may panic, block forever, or cause other kinds of
|
||||
/// problems; the `Stream` trait places no requirements on the effects of
|
||||
/// such a call. However, as the `poll_next` method is not marked `unsafe`,
|
||||
/// Rust's usual rules apply: calls must never cause undefined behavior
|
||||
/// (memory corruption, incorrect use of `unsafe` functions, or the like),
|
||||
/// regardless of the stream's state.
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
|
||||
|
||||
/// Returns the bounds on the remaining length of the stream.
|
||||
///
|
||||
/// Specifically, `size_hint()` returns a tuple where the first element
|
||||
/// is the lower bound, and the second element is the upper bound.
|
||||
///
|
||||
/// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
|
||||
/// A [`None`] here means that either there is no known upper bound, or the
|
||||
/// upper bound is larger than [`usize`].
|
||||
///
|
||||
/// # Implementation notes
|
||||
///
|
||||
/// It is not enforced that a stream implementation yields the declared
|
||||
/// number of elements. A buggy stream may yield less than the lower bound
|
||||
/// or more than the upper bound of elements.
|
||||
///
|
||||
/// `size_hint()` is primarily intended to be used for optimizations such as
|
||||
/// reserving space for the elements of the stream, but must not be
|
||||
/// trusted to e.g., omit bounds checks in unsafe code. An incorrect
|
||||
/// implementation of `size_hint()` should not lead to memory safety
|
||||
/// violations.
|
||||
///
|
||||
/// That said, the implementation should provide a correct estimation,
|
||||
/// because otherwise it would be a violation of the trait's protocol.
|
||||
///
|
||||
/// The default implementation returns `(0, `[`None`]`)` which is correct for any
|
||||
/// stream.
|
||||
#[inline]
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
(0, None)
|
||||
}
|
||||
|
||||
/// Advances the stream and returns a future which yields the next value.
|
||||
///
|
||||
/// The returned future yields [`None`] when iteration is finished.
|
||||
/// Individual stream implementations may choose to resume iteration, and so
|
||||
/// calling `next()` again may or may not eventually start yielding
|
||||
/// [`Some(Item)`] again at some point.
|
||||
///
|
||||
/// [`Some(Item)`]: Some
|
||||
fn next(&mut self) -> Next<'_, Self>
|
||||
where
|
||||
Self: Unpin,
|
||||
{
|
||||
Next::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
#[unstable(feature = "async_stream", issue = "79024")]
|
||||
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
S::poll_next(Pin::new(&mut **self), cx)
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
(**self).size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
#[unstable(feature = "async_stream", issue = "79024")]
|
||||
impl<P> Stream for Pin<P>
|
||||
where
|
||||
P: DerefMut + Unpin,
|
||||
P::Target: Stream,
|
||||
{
|
||||
type Item = <P::Target as Stream>::Item;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.get_mut().as_mut().poll_next(cx)
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
(**self).size_hint()
|
||||
}
|
||||
}
|
30
library/core/src/stream/stream/next.rs
Normal file
30
library/core/src/stream/stream/next.rs
Normal file
|
@ -0,0 +1,30 @@
|
|||
use crate::future::Future;
|
||||
use crate::pin::Pin;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
/// A future which advances the stream and returns the next value.
|
||||
///
|
||||
/// This `struct` is created by [`Stream::next`]. See its documentation for more.
|
||||
#[unstable(feature = "async_stream", issue = "79024")]
|
||||
#[derive(Debug)]
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub struct Next<'a, S: ?Sized> {
|
||||
stream: &'a mut S,
|
||||
}
|
||||
|
||||
impl<'a, S: ?Sized> Next<'a, S> {
|
||||
/// Create a new instance of `Next`.
|
||||
pub(crate) fn new(stream: &'a mut S) -> Self {
|
||||
Self { stream }
|
||||
}
|
||||
}
|
||||
|
||||
#[unstable(feature = "async_stream", issue = "79024")]
|
||||
impl<S: Stream + Unpin + ?Sized> Future for Next<'_, S> {
|
||||
type Output = Option<S::Item>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
Pin::new(&mut *self.stream).poll_next(cx)
|
||||
}
|
||||
}
|
|
@ -224,6 +224,7 @@
|
|||
#![feature(allocator_internals)]
|
||||
#![feature(allow_internal_unsafe)]
|
||||
#![feature(allow_internal_unstable)]
|
||||
#![feature(async_stream)]
|
||||
#![feature(arbitrary_self_types)]
|
||||
#![feature(array_error_internals)]
|
||||
#![feature(asm)]
|
||||
|
@ -448,6 +449,8 @@ pub use core::ptr;
|
|||
pub use core::raw;
|
||||
#[stable(feature = "rust1", since = "1.0.0")]
|
||||
pub use core::result;
|
||||
#[unstable(feature = "async_stream", issue = "79024")]
|
||||
pub use core::stream;
|
||||
#[stable(feature = "i128", since = "1.26.0")]
|
||||
#[allow(deprecated, deprecated_in_future)]
|
||||
pub use core::u128;
|
||||
|
|
|
@ -12,6 +12,7 @@ use crate::panicking;
|
|||
use crate::pin::Pin;
|
||||
use crate::ptr::{NonNull, Unique};
|
||||
use crate::rc::Rc;
|
||||
use crate::stream::Stream;
|
||||
use crate::sync::atomic;
|
||||
use crate::sync::{Arc, Mutex, RwLock};
|
||||
use crate::task::{Context, Poll};
|
||||
|
@ -340,6 +341,19 @@ impl<F: Future> Future for AssertUnwindSafe<F> {
|
|||
}
|
||||
}
|
||||
|
||||
#[unstable(feature = "async_stream", issue = "79024")]
|
||||
impl<S: Stream> Stream for AssertUnwindSafe<S> {
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
|
||||
unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
self.0.size_hint()
|
||||
}
|
||||
}
|
||||
|
||||
/// Invokes a closure, capturing the cause of an unwinding panic if one occurs.
|
||||
///
|
||||
/// This function will return `Ok` with the closure's result if the closure
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue