2014-01-25 20:37:51 +13:00
|
|
|
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
|
2013-12-05 17:56:17 -08:00
|
|
|
// file at the top-level directory of this distribution and at
|
|
|
|
// http://rust-lang.org/COPYRIGHT.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
|
|
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
|
|
|
// option. This file may not be copied, modified, or distributed
|
|
|
|
// except according to those terms.
|
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
//! Communication primitives for concurrent tasks
|
2013-12-05 17:56:17 -08:00
|
|
|
//!
|
|
|
|
//! Rust makes it very difficult to share data among tasks to prevent race
|
|
|
|
//! conditions and to improve parallelism, but there is often a need for
|
|
|
|
//! communication between concurrent tasks. The primitives defined in this
|
|
|
|
//! module are the building blocks for synchronization in rust.
|
|
|
|
//!
|
2014-03-09 14:58:32 -07:00
|
|
|
//! This module provides message-based communication over channels, concretely
|
|
|
|
//! defined as two types:
|
2013-12-05 17:56:17 -08:00
|
|
|
//!
|
2014-03-09 14:58:32 -07:00
|
|
|
//! * `Sender`
|
|
|
|
//! * `Receiver`
|
2013-12-05 17:56:17 -08:00
|
|
|
//!
|
2014-03-09 14:58:32 -07:00
|
|
|
//! A `Sender` is used to send data to a `Receiver`. A `Sender` is clone-able
|
|
|
|
//! such that many tasks can send simultaneously to one receiver. These
|
|
|
|
//! channels are *task blocking*, not *thread blocking*. This means that if one
|
|
|
|
//! task is blocked on a channel, other tasks can continue to make progress.
|
2013-12-05 17:56:17 -08:00
|
|
|
//!
|
|
|
|
//! Rust channels can be used as if they have an infinite internal buffer. What
|
2014-03-09 14:58:32 -07:00
|
|
|
//! this means is that the `send` operation will never block. `Receiver`s, on
|
|
|
|
//! the other hand, will block the task if there is no data to be received.
|
2013-12-05 17:56:17 -08:00
|
|
|
//!
|
|
|
|
//! ## Failure Propagation
|
|
|
|
//!
|
|
|
|
//! In addition to being a core primitive for communicating in rust, channels
|
2014-03-09 14:58:32 -07:00
|
|
|
//! are the points at which failure is propagated among tasks. Whenever one
|
|
|
|
//! half of a channel is closed, the other half will have its next operation
|
|
|
|
//! `fail!`. The purpose of this is to allow propagation of failure among tasks
|
|
|
|
//! that are linked to one another via channels.
|
2013-12-05 17:56:17 -08:00
|
|
|
//!
|
2014-03-09 14:58:32 -07:00
|
|
|
//! There are methods on both `Sender` and `Receiver` to perform their
|
|
|
|
//! respective operations without failing, however.
|
2013-12-05 17:56:17 -08:00
|
|
|
//!
|
|
|
|
//! ## Outside the Runtime
|
|
|
|
//!
|
|
|
|
//! All senders and receivers work seamlessly inside and outside of the rust
|
|
|
|
//! runtime. This means that code may use channels to communicate information
|
|
|
|
//! inside and outside of the runtime. For example, if rust were embedded as an
|
|
|
|
//! FFI module in another application, the rust runtime would probably be
|
|
|
|
//! running in its own external thread pool. Channels created can communicate
|
|
|
|
//! from the native application threads to the rust threads through the use of
|
|
|
|
//! native mutexes and condition variables.
|
|
|
|
//!
|
|
|
|
//! What this means is that if a native thread is using a channel, execution
|
|
|
|
//! will be blocked accordingly by blocking the OS thread.
|
|
|
|
//!
|
|
|
|
//! # Example
|
|
|
|
//!
|
2013-12-22 13:31:23 -08:00
|
|
|
//! ```rust,should_fail
|
2013-12-05 17:56:17 -08:00
|
|
|
//! // Create a simple streaming channel
|
2014-03-09 14:58:32 -07:00
|
|
|
//! let (tx, rx) = channel();
|
2014-01-26 22:42:26 -05:00
|
|
|
//! spawn(proc() {
|
2014-03-09 14:58:32 -07:00
|
|
|
//! tx.send(10);
|
2014-02-14 23:44:22 -08:00
|
|
|
//! });
|
2014-03-09 14:58:32 -07:00
|
|
|
//! assert_eq!(rx.recv(), 10);
|
2013-12-05 17:56:17 -08:00
|
|
|
//!
|
|
|
|
//! // Create a shared channel which can be sent along from many tasks
|
2014-03-09 14:58:32 -07:00
|
|
|
//! let (tx, rx) = channel();
|
2013-12-05 17:56:17 -08:00
|
|
|
//! for i in range(0, 10) {
|
2014-03-09 14:58:32 -07:00
|
|
|
//! let tx = tx.clone();
|
2014-01-26 22:42:26 -05:00
|
|
|
//! spawn(proc() {
|
2014-03-09 14:58:32 -07:00
|
|
|
//! tx.send(i);
|
2014-01-26 22:42:26 -05:00
|
|
|
//! })
|
2013-12-05 17:56:17 -08:00
|
|
|
//! }
|
|
|
|
//!
|
|
|
|
//! for _ in range(0, 10) {
|
2014-03-09 14:58:32 -07:00
|
|
|
//! let j = rx.recv();
|
2013-12-05 17:56:17 -08:00
|
|
|
//! assert!(0 <= j && j < 10);
|
|
|
|
//! }
|
|
|
|
//!
|
|
|
|
//! // The call to recv() will fail!() because the channel has already hung
|
|
|
|
//! // up (or been deallocated)
|
2014-03-09 14:58:32 -07:00
|
|
|
//! let (tx, rx) = channel::<int>();
|
|
|
|
//! drop(tx);
|
|
|
|
//! rx.recv();
|
2013-12-05 17:56:17 -08:00
|
|
|
//! ```
|
|
|
|
|
|
|
|
// A description of how Rust's channel implementation works
|
|
|
|
//
|
|
|
|
// Channels are supposed to be the basic building block for all other
|
|
|
|
// concurrent primitives that are used in Rust. As a result, the channel type
|
|
|
|
// needs to be highly optimized, flexible, and broad enough for use everywhere.
|
|
|
|
//
|
|
|
|
// The choice of implementation of all channels is to be built on lock-free data
|
|
|
|
// structures. The channels themselves are then consequently also lock-free data
|
|
|
|
// structures. As always with lock-free code, this is a very "here be dragons"
|
|
|
|
// territory, especially because I'm unaware of any academic papers which have
|
|
|
|
// gone into great length about channels of these flavors.
|
|
|
|
//
|
|
|
|
// ## Flavors of channels
|
|
|
|
//
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
// From the perspective of a consumer of this library, there is only one flavor
|
|
|
|
// of channel. This channel can be used as a stream and cloned to allow multiple
|
|
|
|
// senders. Under the hood, however, there are actually three flavors of
|
|
|
|
// channels in play.
|
|
|
|
//
|
|
|
|
// * Oneshots - these channels are highly optimized for the one-send use case.
|
|
|
|
// They contain as few atomics as possible and involve one and
|
|
|
|
// exactly one allocation.
|
|
|
|
// * Streams - these channels are optimized for the non-shared use case. They
|
|
|
|
// use a different concurrent queue which is more tailored for this
|
|
|
|
// use case. The initial allocation of this flavor of channel is not
|
|
|
|
// optimized.
|
|
|
|
// * Shared - this is the most general form of channel that this module offers,
|
|
|
|
// a channel with multiple senders. This type is as optimized as it
|
|
|
|
// can be, but the previous two types mentioned are much faster for
|
|
|
|
// their use-cases.
|
2013-12-05 17:56:17 -08:00
|
|
|
//
|
|
|
|
// ## Concurrent queues
|
|
|
|
//
|
2014-03-09 14:58:32 -07:00
|
|
|
// The basic idea of Rust's Sender/Receiver types is that send() never blocks, but
|
2013-12-05 17:56:17 -08:00
|
|
|
// recv() obviously blocks. This means that under the hood there must be some
|
|
|
|
// shared and concurrent queue holding all of the actual data.
|
|
|
|
//
|
|
|
|
// For streams and shared channels, two flavors of queues are used. We have
|
|
|
|
// chosen to use queues from a well-known author which are abbreviated as SPSC
|
|
|
|
// and MPSC (single producer, single consumer and multiple producer, single
|
|
|
|
// consumer). SPSC queues are used for streams while MPSC queues are used for
|
|
|
|
// shared channels.
|
|
|
|
//
|
|
|
|
// ### SPSC optimizations
|
|
|
|
//
|
|
|
|
// The SPSC queue found online is essentially a linked list of nodes where one
|
|
|
|
// half of the nodes are the "queue of data" and the other half of nodes are a
|
|
|
|
// cache of unused nodes. The unused nodes are used such that an allocation is
|
|
|
|
// not required on every push() and a free doesn't need to happen on every
|
|
|
|
// pop().
|
|
|
|
//
|
|
|
|
// As found online, however, the cache of nodes is of an infinite size. This
|
|
|
|
// means that if a channel at one point in its life had 50k items in the queue,
|
|
|
|
// then the queue will always have the capacity for 50k items. I believed that
|
|
|
|
// this was an unnecessary limitation of the implementation, so I have altered
|
|
|
|
// the queue to optionally have a bound on the cache size.
|
|
|
|
//
|
|
|
|
// By default, streams will have an unbounded SPSC queue with a small-ish cache
|
|
|
|
// size. The hope is that the cache is still large enough to have very fast
|
|
|
|
// send() operations while not too large such that millions of channels can
|
|
|
|
// coexist at once.
|
|
|
|
//
|
|
|
|
// ### MPSC optimizations
|
|
|
|
//
|
|
|
|
// Right now the MPSC queue has not been optimized. Like the SPSC queue, it uses
|
|
|
|
// a linked list under the hood to earn its unboundedness, but I have not put
|
|
|
|
// forth much effort into having a cache of nodes similar to the SPSC queue.
|
|
|
|
//
|
|
|
|
// For now, I believe that this is "ok" because shared channels are not the most
|
|
|
|
// common type, but soon we may wish to revisit this queue choice and determine
|
|
|
|
// another candidate for backend storage of shared channels.
|
|
|
|
//
|
|
|
|
// ## Overview of the Implementation
|
|
|
|
//
|
|
|
|
// Now that there's a little background on the concurrent queues used, it's
|
|
|
|
// worth going into much more detail about the channels themselves. The basic
|
|
|
|
// pseudocode for send/recv is:
|
|
|
|
//
|
|
|
|
//
|
|
|
|
//      send(t)                             recv()
//        queue.push(t)                       return if queue.pop()
//        if increment() == -1                deschedule {
//          wakeup()                            if decrement() > 0
//                                                cancel_deschedule()
//                                            }
//                                            queue.pop()
|
|
|
|
//
|
|
|
|
// As mentioned before, there are no locks in this implementation, only atomic
|
|
|
|
// instructions are used.
|
|
|
|
//
|
|
|
|
// ### The internal atomic counter
|
|
|
|
//
|
2014-03-09 14:58:32 -07:00
|
|
|
// Every channel has a shared counter with each half to keep track of the size
|
|
|
|
// of the queue. This counter is used to abort descheduling by the receiver and
|
|
|
|
// to know when to wake up on the sending side.
|
2013-12-05 17:56:17 -08:00
|
|
|
//
|
|
|
|
// As seen in the pseudocode, senders will increment this count and receivers
|
|
|
|
// will decrement the count. The theory behind this is that if a sender sees a
|
|
|
|
// -1 count, it will wake up the receiver, and if the receiver sees a 1+ count,
|
|
|
|
// then it doesn't need to block.
|
|
|
|
//
|
|
|
|
// The recv() method has a beginning call to pop(), and if successful, it needs
|
|
|
|
// to decrement the count. It is a crucial implementation detail that this
|
|
|
|
// decrement does *not* happen to the shared counter. If this were the case,
|
|
|
|
// then it would be possible for the counter to be very negative when there were
|
|
|
|
// no receivers waiting, in which case the senders would have to determine when
|
|
|
|
// it was actually appropriate to wake up a receiver.
|
|
|
|
//
|
|
|
|
// Instead, the "steal count" is kept track of separately (not atomically
|
2014-03-09 14:58:32 -07:00
|
|
|
// because it's only used by receivers), and then the decrement() call when
|
2013-12-05 17:56:17 -08:00
|
|
|
// descheduling will lump in all of the recent steals into one large decrement.
|
|
|
|
//
|
|
|
|
// The implication of this is that if a sender sees a -1 count, then there's
|
|
|
|
// guaranteed to be a waiter waiting!
|
|
|
|
//
|
|
|
|
// ## Native Implementation
|
|
|
|
//
|
|
|
|
// A major goal of these channels is to work seamlessly on and off the runtime.
|
|
|
|
// All of the previous race conditions have been worded in terms of
|
|
|
|
// scheduler-isms (which are obviously not available without the runtime).
|
|
|
|
//
|
|
|
|
// For now, native usage of channels (off the runtime) will fall back onto
|
|
|
|
// mutexes/cond vars for descheduling/atomic decisions. The no-contention path
|
|
|
|
// is still entirely lock-free, the "deschedule" blocks above are surrounded by
|
|
|
|
// a mutex and the "wakeup" blocks involve grabbing a mutex and signaling on a
|
|
|
|
// condition variable.
|
|
|
|
//
|
|
|
|
// ## Select
|
|
|
|
//
|
|
|
|
// Being able to support selection over channels has greatly influenced this
|
|
|
|
// design, and not only does selection need to work inside the runtime, but also
|
|
|
|
// outside the runtime.
|
|
|
|
//
|
|
|
|
// The implementation is fairly straightforward. The goal of select() is not to
|
|
|
|
// return some data, but only to return which channel can receive data without
|
|
|
|
// blocking. The implementation is essentially the entire blocking procedure
|
|
|
|
// followed by an increment as soon as it's woken up. The cancellation procedure
|
|
|
|
// involves an increment and swapping out of to_wake to acquire ownership of the
|
|
|
|
// task to unblock.
|
|
|
|
//
|
|
|
|
// Sadly this current implementation requires multiple allocations, so I have
|
|
|
|
// seen the throughput of select() be much worse than it should be. I do not
|
|
|
|
// believe that there is anything fundamental which needs to change about these
|
|
|
|
// channels, however, in order to support a more efficient select().
|
|
|
|
//
|
|
|
|
// # Conclusion
|
|
|
|
//
|
|
|
|
// And now that you've seen all the races that I found and attempted to fix,
|
|
|
|
// here's the code for you to find some more!
|
|
|
|
|
|
|
|
use cast;
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
use cell::Cell;
|
2013-12-05 17:56:17 -08:00
|
|
|
use clone::Clone;
|
|
|
|
use iter::Iterator;
|
|
|
|
use kinds::Send;
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
use kinds::marker;
|
2014-02-11 16:33:34 -08:00
|
|
|
use mem;
|
2013-12-05 17:56:17 -08:00
|
|
|
use ops::Drop;
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
use option::{Some, None, Option};
|
|
|
|
use result::{Ok, Err, Result};
|
2013-12-12 17:53:05 -08:00
|
|
|
use rt::local::Local;
|
|
|
|
use rt::task::{Task, BlockedTask};
|
2014-01-06 15:23:37 -08:00
|
|
|
use sync::arc::UnsafeArc;
|
2013-12-05 17:56:17 -08:00
|
|
|
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
pub use comm::select::{Select, Handle};
|
2013-12-05 17:56:17 -08:00
|
|
|
|
|
|
|
// Test-generating macro. Given `fn NAME() BLOCK` (optionally followed by
// attributes), it expands to a module called NAME containing the test body
// and two `#[test]` functions that run that same body in two ways:
//
// * `uv`     - calls the body directly in the task running the test.
// * `native` - runs the body inside a task spawned with
//              `native::task::spawn` and waits for it to finish.
//
// Any trailing attributes are applied to both generated test functions.
macro_rules! test (
|
|
|
|
{ fn $name:ident() $b:block $($a:attr)*} => (
|
|
|
|
mod $name {
|
|
|
|
// The imports below are supplied to every generated module; presumably
// not every test body uses all of them, hence the lint is silenced.
#[allow(unused_imports)];
|
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
use native;
|
2013-12-21 22:15:04 -08:00
|
|
|
use comm::*;
|
2013-12-05 17:56:17 -08:00
|
|
|
use prelude::*;
|
2013-12-12 21:38:57 -08:00
|
|
|
use super::*;
|
|
|
|
use super::super::*;
|
|
|
|
use task;
|
2013-12-05 17:56:17 -08:00
|
|
|
|
|
|
|
|
|
|
// The test body itself, shared by both generated tests.
fn f() $b
|
|
|
|
|
|
|
|
|
// Variant 1: run the body directly.
$($a)* #[test] fn uv() { f() }
|
2013-12-12 21:38:57 -08:00
|
|
|
// Variant 2: run the body in a natively spawned task.
$($a)* #[test] fn native() {
|
|
|
|
use native;
|
2014-03-09 14:58:32 -07:00
|
|
|
let (tx, rx) = channel();
|
|
|
|
// The spawned task sends the body's result once the body has run.
native::task::spawn(proc() { tx.send(f()) });
|
|
|
|
// Block until the spawned task has finished the body and sent its result.
// Per the module docs, `recv` fails if the sending half hangs up first.
rx.recv();
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
mod select;
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
mod oneshot;
|
|
|
|
mod stream;
|
|
|
|
mod shared;
|
2013-12-05 17:56:17 -08:00
|
|
|
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
// Use a power of 2 to allow LLVM to optimize to something that's not a
|
|
|
|
// division; this is hit pretty regularly.
|
|
|
|
// NOTE(review): the use site is outside this view. The name suggests this is
// the number of channel operations between reschedules - confirm at the caller.
static RESCHED_FREQ: int = 256;
|
2013-12-05 17:56:17 -08:00
|
|
|
|
|
|
|
/// The receiving-half of Rust's channel type. This half can only be owned by
|
|
|
|
/// one task
|
2014-03-09 14:58:32 -07:00
|
|
|
pub struct Receiver<T> {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
// Handle to the underlying packet; the `Flavor` variant records which of the
// three channel implementations (oneshot, stream, shared) backs this receiver.
priv inner: Flavor<T>,
|
|
|
|
// Counter held in a `Cell` so that it can be updated through `&self`.
// NOTE(review): presumably counts receive operations for use with
// RESCHED_FREQ; the code that updates it is not visible here - confirm.
priv receives: Cell<uint>,
|
2014-01-22 14:03:02 -05:00
|
|
|
// can't share in an arc
|
|
|
|
priv marker: marker::NoFreeze,
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
/// An iterator over messages on a receiver, this iterator will block
|
2013-12-05 17:56:17 -08:00
|
|
|
/// whenever `next` is called, waiting for a new message, and `None` will be
|
|
|
|
/// returned when the corresponding channel has hung up.
|
2014-01-14 22:32:24 -05:00
|
|
|
pub struct Messages<'a, T> {
|
2014-03-09 14:58:32 -07:00
|
|
|
// The receiver this iterator pulls messages from. It is borrowed for `'a`,
// so the iterator cannot outlive the receiver it was created from.
priv rx: &'a Receiver<T>
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// The sending-half of Rust's channel type. This half can only be owned by one
|
|
|
|
/// task
|
2014-03-09 14:58:32 -07:00
|
|
|
pub struct Sender<T> {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
// Handle to the underlying packet; the `Flavor` variant records which of the
// three channel implementations (oneshot, stream, shared) backs this sender.
priv inner: Flavor<T>,
|
|
|
|
// Counter held in a `Cell` so that it can be updated through `&self`; it
// starts at zero (see `my_new`).
// NOTE(review): presumably counts send operations for use with RESCHED_FREQ;
// the code that updates it is not visible here - confirm.
priv sends: Cell<uint>,
|
2014-01-22 14:03:02 -05:00
|
|
|
// can't share in an arc
|
|
|
|
priv marker: marker::NoFreeze,
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
|
2013-12-21 22:15:04 -08:00
|
|
|
/// This enumeration is the list of the possible outcomes of a call to
|
|
|
|
/// `try_recv`: either data was returned, or the reason why it could not be.
|
2014-02-28 01:23:06 -08:00
|
|
|
#[deriving(Eq, Clone, Show)]
|
2013-12-21 22:15:04 -08:00
|
|
|
pub enum TryRecvResult<T> {
|
|
|
|
/// This channel is currently empty, but the sender(s) have not yet
|
|
|
|
/// disconnected, so data may yet become available.
|
|
|
|
Empty,
|
|
|
|
/// This channel's sending half has become disconnected, and there will
|
|
|
|
/// never be any more data received on this channel.
|
|
|
|
Disconnected,
|
|
|
|
/// The channel had some data and we successfully popped it.
|
|
|
|
Data(T),
|
|
|
|
}
|
|
|
|
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
/// The concrete implementation backing one half of a channel. Each variant
/// holds an `UnsafeArc` handle to that implementation's packet type. The
/// flavors are described in the "Flavors of channels" commentary at the top of
/// this file.
enum Flavor<T> {
|
|
|
|
/// Channel optimized for the one-send use case.
Oneshot(UnsafeArc<oneshot::Packet<T>>),
|
|
|
|
/// Channel optimized for the non-shared use case.
Stream(UnsafeArc<stream::Packet<T>>),
|
|
|
|
/// The most general form: a channel with multiple senders.
Shared(UnsafeArc<shared::Packet<T>>),
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
/// Creates a new channel, returning the sender/receiver halves. All data sent
|
|
|
|
/// on the sender will become available on the receiver. See the documentation
|
|
|
|
/// of `Receiver` and `Sender` to see what's possible with them.
|
|
|
|
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
|
|
|
|
// Every new channel starts out in the oneshot flavor. `new2` hands back a
// pair of handles for the freshly created packet (presumably both refer to
// the same packet - confirm against `UnsafeArc::new2`).
let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
|
|
|
|
// The second handle goes to the sending half, the first to the receiving half.
(Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
|
|
|
|
}
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
impl<T: Send> Sender<T> {
|
|
|
|
fn my_new(inner: Flavor<T>) -> Sender<T> {
|
|
|
|
Sender { inner: inner, sends: Cell::new(0), marker: marker::NoFreeze }
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Sends a value along this channel to be received by the corresponding
|
2014-03-09 14:58:32 -07:00
|
|
|
/// receiver.
|
2013-12-05 17:56:17 -08:00
|
|
|
///
|
|
|
|
/// Rust channels are infinitely buffered so this method will never block.
|
|
|
|
///
|
|
|
|
/// # Failure
|
|
|
|
///
|
|
|
|
/// This function will fail if the other end of the channel has hung up.
|
2014-03-09 14:58:32 -07:00
|
|
|
/// This means that if the corresponding receiver has fallen out of scope,
|
|
|
|
/// this function will trigger a fail message saying that a message is
|
|
|
|
/// being sent on a closed channel.
|
2013-12-05 17:56:17 -08:00
|
|
|
///
|
|
|
|
/// Note that if this function does *not* fail, it does not mean that the
|
|
|
|
/// data will be successfully received. All sends are placed into a queue,
|
|
|
|
/// so it is possible for a send to succeed (the other end is alive), but
|
|
|
|
/// then the other end could immediately disconnect.
|
|
|
|
///
|
|
|
|
/// The purpose of this functionality is to propagate failure among tasks.
|
|
|
|
/// If failure is not desired, then consider using the `try_send` method instead.
|
|
|
|
pub fn send(&self, t: T) {
|
|
|
|
if !self.try_send(t) {
|
|
|
|
fail!("sending on a closed channel");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Attempts to send a value on this channel, returning whether it was
|
|
|
|
/// successfully sent.
|
|
|
|
///
|
2014-03-09 14:58:32 -07:00
|
|
|
/// A successful send occurs when it is determined that the other end of
|
|
|
|
/// the channel has not hung up already. An unsuccessful send would be one
|
|
|
|
/// where the corresponding receiver has already been deallocated. Note
|
|
|
|
/// that a return value of `false` means that the data will never be
|
|
|
|
/// received, but a return value of `true` does *not* mean that the data
|
|
|
|
/// will be received. It is possible for the corresponding receiver to
|
|
|
|
/// hang up immediately after this function returns `true`.
|
2013-12-05 17:56:17 -08:00
|
|
|
///
|
|
|
|
/// Like `send`, this method will never block. If the failure of send cannot
|
|
|
|
/// be tolerated, then this method should be used instead.
|
2014-01-16 19:58:42 -08:00
|
|
|
pub fn try_send(&self, t: T) -> bool {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
// In order to prevent starvation of other tasks in situations where
|
|
|
|
// a task sends repeatedly without ever receiving, we occasionally
|
2014-02-19 08:26:11 -08:00
|
|
|
// yield instead of doing a send immediately.
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
//
|
2014-02-19 08:26:11 -08:00
|
|
|
// Don't unconditionally attempt to yield because the TLS overhead can
|
|
|
|
// be a bit much, and also use `try_take` instead of `take` because
|
|
|
|
// there's no reason that this send shouldn't be usable off the
|
|
|
|
// runtime.
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
let cnt = self.sends.get() + 1;
|
|
|
|
self.sends.set(cnt);
|
|
|
|
if cnt % (RESCHED_FREQ as uint) == 0 {
|
2014-02-19 08:26:11 -08:00
|
|
|
let task: Option<~Task> = Local::try_take();
|
|
|
|
task.map(|t| t.maybe_yield());
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
let (new_inner, ret) = match self.inner {
|
|
|
|
Oneshot(ref p) => {
|
|
|
|
let p = p.get();
|
|
|
|
unsafe {
|
|
|
|
if !(*p).sent() {
|
|
|
|
return (*p).send(t);
|
|
|
|
} else {
|
|
|
|
let (a, b) = UnsafeArc::new2(stream::Packet::new());
|
2014-03-09 14:58:32 -07:00
|
|
|
match (*p).upgrade(Receiver::my_new(Stream(b))) {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
oneshot::UpSuccess => {
|
|
|
|
(*a.get()).send(t);
|
|
|
|
(a, true)
|
|
|
|
}
|
|
|
|
oneshot::UpDisconnected => (a, false),
|
|
|
|
oneshot::UpWoke(task) => {
|
|
|
|
(*a.get()).send(t);
|
|
|
|
task.wake().map(|t| t.reawaken());
|
|
|
|
(a, true)
|
|
|
|
}
|
|
|
|
}
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
Stream(ref p) => return unsafe { (*p.get()).send(t) },
|
|
|
|
Shared(ref p) => return unsafe { (*p.get()).send(t) },
|
|
|
|
};
|
2013-12-05 17:56:17 -08:00
|
|
|
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
unsafe {
|
2014-03-09 14:58:32 -07:00
|
|
|
let mut tmp = Sender::my_new(Stream(new_inner));
|
2014-02-11 16:33:34 -08:00
|
|
|
mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
return ret;
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
impl<T: Send> Clone for Sender<T> {
|
|
|
|
fn clone(&self) -> Sender<T> {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
let (packet, sleeper) = match self.inner {
|
|
|
|
Oneshot(ref p) => {
|
|
|
|
let (a, b) = UnsafeArc::new2(shared::Packet::new());
|
2014-03-09 14:58:32 -07:00
|
|
|
match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
|
|
|
|
oneshot::UpWoke(task) => (b, Some(task))
|
|
|
|
}
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
Stream(ref p) => {
|
|
|
|
let (a, b) = UnsafeArc::new2(shared::Packet::new());
|
2014-03-09 14:58:32 -07:00
|
|
|
match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
stream::UpSuccess | stream::UpDisconnected => (b, None),
|
|
|
|
stream::UpWoke(task) => (b, Some(task)),
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
}
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
Shared(ref p) => {
|
|
|
|
unsafe { (*p.get()).clone_chan(); }
|
2014-03-09 14:58:32 -07:00
|
|
|
return Sender::my_new(Shared(p.clone()));
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
unsafe {
|
|
|
|
(*packet.get()).inherit_blocker(sleeper);
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
let mut tmp = Sender::my_new(Shared(packet.clone()));
|
2014-02-11 16:33:34 -08:00
|
|
|
mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
2014-03-09 14:58:32 -07:00
|
|
|
Sender::my_new(Shared(packet))
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[unsafe_destructor]
|
2014-03-09 14:58:32 -07:00
|
|
|
impl<T: Send> Drop for Sender<T> {
|
2013-12-05 17:56:17 -08:00
|
|
|
fn drop(&mut self) {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
match self.inner {
|
|
|
|
Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
|
|
|
|
Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
|
|
|
|
Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
|
|
|
|
}
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
impl<T: Send> Receiver<T> {
|
|
|
|
fn my_new(inner: Flavor<T>) -> Receiver<T> {
|
|
|
|
Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoFreeze }
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
/// Blocks waiting for a value on this receiver
|
2013-12-05 17:56:17 -08:00
|
|
|
///
|
|
|
|
/// This function will block if necessary to wait for a corresponding send
|
2014-03-09 14:58:32 -07:00
|
|
|
/// on the channel from its paired `Sender` structure. This receiver will
|
|
|
|
/// be woken up when data is ready, and the data will be returned.
|
2013-12-05 17:56:17 -08:00
|
|
|
///
|
|
|
|
/// # Failure
|
|
|
|
///
|
|
|
|
/// Similar to channels, this method will trigger a task failure if the
|
|
|
|
/// other end of the channel has hung up (been deallocated). The purpose of
|
|
|
|
/// this is to propagate failure among tasks.
|
|
|
|
///
|
|
|
|
/// If failure is not desired, then there are two options:
|
|
|
|
///
|
|
|
|
/// * If blocking is still desired, the `recv_opt` method will return `None`
|
|
|
|
/// when the other end hangs up
|
|
|
|
///
|
|
|
|
/// * If blocking is not desired, then the `try_recv` method will attempt to
|
2014-03-09 14:58:32 -07:00
|
|
|
/// peek at a value on this receiver.
|
2013-12-05 17:56:17 -08:00
|
|
|
pub fn recv(&self) -> T {
|
|
|
|
match self.recv_opt() {
|
|
|
|
Some(t) => t,
|
|
|
|
None => fail!("receiving on a closed channel"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
/// Attempts to return a pending value on this receiver without blocking
|
2013-12-05 17:56:17 -08:00
|
|
|
///
|
|
|
|
/// This method will never block the caller in order to wait for data to
|
|
|
|
/// become available. Instead, this will always return immediately with a
|
|
|
|
/// possible option of pending data on the channel.
|
|
|
|
///
|
|
|
|
/// This is useful for a flavor of "optimistic check" before deciding to
|
2014-03-09 14:58:32 -07:00
|
|
|
/// block on a receiver.
|
2013-12-05 17:56:17 -08:00
|
|
|
///
|
|
|
|
/// This function cannot fail.
|
2013-12-21 22:15:04 -08:00
|
|
|
pub fn try_recv(&self) -> TryRecvResult<T> {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
// If a thread is spinning in try_recv, we should take the opportunity
|
|
|
|
// to reschedule things occasionally. See notes above in scheduling on
|
2014-02-19 08:26:11 -08:00
|
|
|
// sends for why this doesn't always hit TLS, and also for why this uses
|
|
|
|
// `try_take` instead of `take`.
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
let cnt = self.receives.get() + 1;
|
|
|
|
self.receives.set(cnt);
|
|
|
|
if cnt % (RESCHED_FREQ as uint) == 0 {
|
2014-02-19 08:26:11 -08:00
|
|
|
let task: Option<~Task> = Local::try_take();
|
|
|
|
task.map(|t| t.maybe_yield());
|
2013-12-13 18:27:13 -08:00
|
|
|
}
|
|
|
|
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
loop {
|
|
|
|
let mut new_port = match self.inner {
|
|
|
|
Oneshot(ref p) => {
|
|
|
|
match unsafe { (*p.get()).try_recv() } {
|
|
|
|
Ok(t) => return Data(t),
|
|
|
|
Err(oneshot::Empty) => return Empty,
|
|
|
|
Err(oneshot::Disconnected) => return Disconnected,
|
2014-03-09 14:58:32 -07:00
|
|
|
Err(oneshot::Upgraded(rx)) => rx,
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
}
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
Stream(ref p) => {
|
|
|
|
match unsafe { (*p.get()).try_recv() } {
|
|
|
|
Ok(t) => return Data(t),
|
|
|
|
Err(stream::Empty) => return Empty,
|
|
|
|
Err(stream::Disconnected) => return Disconnected,
|
2014-03-09 14:58:32 -07:00
|
|
|
Err(stream::Upgraded(rx)) => rx,
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
Shared(ref p) => {
|
|
|
|
match unsafe { (*p.get()).try_recv() } {
|
|
|
|
Ok(t) => return Data(t),
|
|
|
|
Err(shared::Empty) => return Empty,
|
|
|
|
Err(shared::Disconnected) => return Disconnected,
|
2013-12-21 22:15:04 -08:00
|
|
|
}
|
|
|
|
}
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
};
|
|
|
|
unsafe {
|
2014-02-11 16:33:34 -08:00
|
|
|
mem::swap(&mut cast::transmute_mut(self).inner,
|
|
|
|
&mut new_port.inner);
|
2013-12-21 22:15:04 -08:00
|
|
|
}
|
|
|
|
}
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
/// Attempt to wait for a value on this receiver, but does not fail if the
|
2013-12-05 17:56:17 -08:00
|
|
|
/// corresponding channel has hung up.
|
|
|
|
///
|
|
|
|
/// This implementation of iterators for ports will always block if there is
|
2014-03-09 14:58:32 -07:00
|
|
|
/// not data available on the receiver, but it will not fail in the case
|
|
|
|
/// that the channel has been deallocated.
|
2013-12-05 17:56:17 -08:00
|
|
|
///
|
|
|
|
/// In other words, this function has the same semantics as the `recv`
|
|
|
|
/// method except for the failure aspect.
|
|
|
|
///
|
|
|
|
/// If the channel has hung up, then `None` is returned. Otherwise `Some` of
|
2014-03-09 14:58:32 -07:00
|
|
|
/// the value found on the receiver is returned.
|
2013-12-05 17:56:17 -08:00
|
|
|
pub fn recv_opt(&self) -> Option<T> {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
loop {
|
|
|
|
let mut new_port = match self.inner {
|
|
|
|
Oneshot(ref p) => {
|
|
|
|
match unsafe { (*p.get()).recv() } {
|
|
|
|
Ok(t) => return Some(t),
|
|
|
|
Err(oneshot::Empty) => return unreachable!(),
|
|
|
|
Err(oneshot::Disconnected) => return None,
|
2014-03-09 14:58:32 -07:00
|
|
|
Err(oneshot::Upgraded(rx)) => rx,
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
2013-12-12 17:53:05 -08:00
|
|
|
}
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
Stream(ref p) => {
|
|
|
|
match unsafe { (*p.get()).recv() } {
|
|
|
|
Ok(t) => return Some(t),
|
|
|
|
Err(stream::Empty) => return unreachable!(),
|
|
|
|
Err(stream::Disconnected) => return None,
|
2014-03-09 14:58:32 -07:00
|
|
|
Err(stream::Upgraded(rx)) => rx,
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
Shared(ref p) => {
|
|
|
|
match unsafe { (*p.get()).recv() } {
|
|
|
|
Ok(t) => return Some(t),
|
|
|
|
Err(shared::Empty) => return unreachable!(),
|
|
|
|
Err(shared::Disconnected) => return None,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
};
|
|
|
|
unsafe {
|
2014-02-11 16:33:34 -08:00
|
|
|
mem::swap(&mut cast::transmute_mut(self).inner,
|
|
|
|
&mut new_port.inner);
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns an iterator which will block waiting for messages, but never
|
|
|
|
/// `fail!`. It will return `None` when the channel has hung up.
|
2014-01-14 22:32:24 -05:00
|
|
|
pub fn iter<'a>(&'a self) -> Messages<'a, T> {
|
2014-03-09 14:58:32 -07:00
|
|
|
Messages { rx: self }
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
impl<T: Send> select::Packet for Receiver<T> {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
fn can_recv(&self) -> bool {
|
|
|
|
loop {
|
|
|
|
let mut new_port = match self.inner {
|
|
|
|
Oneshot(ref p) => {
|
|
|
|
match unsafe { (*p.get()).can_recv() } {
|
|
|
|
Ok(ret) => return ret,
|
|
|
|
Err(upgrade) => upgrade,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Stream(ref p) => {
|
|
|
|
match unsafe { (*p.get()).can_recv() } {
|
|
|
|
Ok(ret) => return ret,
|
|
|
|
Err(upgrade) => upgrade,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Shared(ref p) => {
|
|
|
|
return unsafe { (*p.get()).can_recv() };
|
|
|
|
}
|
|
|
|
};
|
|
|
|
unsafe {
|
2014-02-11 16:33:34 -08:00
|
|
|
mem::swap(&mut cast::transmute_mut(self).inner,
|
|
|
|
&mut new_port.inner);
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{
|
|
|
|
loop {
|
|
|
|
let (t, mut new_port) = match self.inner {
|
|
|
|
Oneshot(ref p) => {
|
|
|
|
match unsafe { (*p.get()).start_selection(task) } {
|
|
|
|
oneshot::SelSuccess => return Ok(()),
|
|
|
|
oneshot::SelCanceled(task) => return Err(task),
|
2014-03-09 14:58:32 -07:00
|
|
|
oneshot::SelUpgraded(t, rx) => (t, rx),
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
Stream(ref p) => {
|
|
|
|
match unsafe { (*p.get()).start_selection(task) } {
|
|
|
|
stream::SelSuccess => return Ok(()),
|
|
|
|
stream::SelCanceled(task) => return Err(task),
|
2014-03-09 14:58:32 -07:00
|
|
|
stream::SelUpgraded(t, rx) => (t, rx),
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
Shared(ref p) => {
|
|
|
|
return unsafe { (*p.get()).start_selection(task) };
|
|
|
|
}
|
|
|
|
};
|
|
|
|
task = t;
|
|
|
|
unsafe {
|
2014-02-11 16:33:34 -08:00
|
|
|
mem::swap(&mut cast::transmute_mut(self).inner,
|
|
|
|
&mut new_port.inner);
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn abort_selection(&self) -> bool {
|
|
|
|
let mut was_upgrade = false;
|
|
|
|
loop {
|
|
|
|
let result = match self.inner {
|
|
|
|
Oneshot(ref p) => unsafe { (*p.get()).abort_selection() },
|
|
|
|
Stream(ref p) => unsafe {
|
|
|
|
(*p.get()).abort_selection(was_upgrade)
|
|
|
|
},
|
|
|
|
Shared(ref p) => return unsafe {
|
|
|
|
(*p.get()).abort_selection(was_upgrade)
|
|
|
|
},
|
|
|
|
};
|
|
|
|
let mut new_port = match result { Ok(b) => return b, Err(p) => p };
|
|
|
|
was_upgrade = true;
|
|
|
|
unsafe {
|
2014-02-11 16:33:34 -08:00
|
|
|
mem::swap(&mut cast::transmute_mut(self).inner,
|
|
|
|
&mut new_port.inner);
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-01-14 22:32:24 -05:00
|
|
|
impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
    /// Yields the next message by delegating to the receiver's `recv_opt`,
    /// so iteration ends once that method returns `None`.
    fn next(&mut self) -> Option<T> {
        self.rx.recv_opt()
    }
}
|
|
|
|
|
|
|
|
#[unsafe_destructor]
|
2014-03-09 14:58:32 -07:00
|
|
|
impl<T: Send> Drop for Receiver<T> {
|
2013-12-05 17:56:17 -08:00
|
|
|
fn drop(&mut self) {
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
match self.inner {
|
|
|
|
Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
|
|
|
|
Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
|
|
|
|
Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
|
|
|
|
}
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod test {
|
|
|
|
use prelude::*;
|
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
use native;
|
|
|
|
use os;
|
2013-12-05 17:56:17 -08:00
|
|
|
use super::*;
|
2013-12-12 21:38:57 -08:00
|
|
|
|
|
|
|
pub fn stress_factor() -> uint {
|
|
|
|
match os::getenv("RUST_TEST_STRESS") {
|
|
|
|
Some(val) => from_str::<uint>(val).unwrap(),
|
|
|
|
None => 1,
|
|
|
|
}
|
|
|
|
}
|
2013-12-05 17:56:17 -08:00
|
|
|
|
|
|
|
// Basic round trip: one value is sent and then received on the same task.
test!(fn smoke() {
    let (sender, receiver) = channel();
    sender.send(1);
    let got = receiver.recv();
    assert_eq!(got, 1);
})
|
|
|
|
|
|
|
|
// Sends an owned box that is never received; both ends then go out of scope
// with the message still queued.
test!(fn drop_full() {
    let (sender, _receiver) = channel();
    sender.send(~1);
})
|
|
|
|
|
|
|
|
// Same as `drop_full`, but the sender is cloned (and the clones dropped)
// twice before the unreceived box is sent.
test!(fn drop_full_shared() {
    let (sender, _receiver) = channel();
    let first_clone = sender.clone();
    drop(first_clone);
    let second_clone = sender.clone();
    drop(second_clone);
    sender.send(~1);
})
|
|
|
|
|
|
|
|
// Round trip through the original sender, then again through a clone of it.
test!(fn smoke_shared() {
    let (sender, receiver) = channel();
    sender.send(1);
    assert_eq!(receiver.recv(), 1);

    // The original sender stays alive while the clone is used.
    let cloned = sender.clone();
    cloned.send(1);
    assert_eq!(receiver.recv(), 1);
})
|
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// A spawned task sends a value; the spawning task receives it.
test!(fn smoke_threads() {
    let (sender, receiver) = channel();
    spawn(proc() {
        sender.send(1);
    });
    let got = receiver.recv();
    assert_eq!(got, 1);
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// `send` after the receiver has been dropped is expected to fail the task
// (hence `#[should_fail]`).
test!(fn smoke_port_gone() {
    let (sender, receiver) = channel();
    drop(receiver);
    sender.send(1);
} #[should_fail])
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// `send` on a channel with more than one live sender is expected to fail the
// task once the receiver has been dropped (hence `#[should_fail]`).
test!(fn smoke_shared_port_gone() {
    let (tx, rx) = channel();
    // Without this clone the test would be a byte-for-byte duplicate of
    // `smoke_port_gone` and would never touch a cloned (shared) sender.
    // Both handles are kept alive so the send below happens while a second
    // sender still exists.
    let _tx2 = tx.clone();
    drop(rx);
    tx.send(1);
} #[should_fail])
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// The receiver is dropped first, then the sender is cloned and the original
// dropped; sending through the surviving clone is expected to fail.
test!(fn smoke_shared_port_gone2() {
    let (original, receiver) = channel();
    drop(receiver);
    let survivor = original.clone();
    drop(original);
    survivor.send(1);
} #[should_fail])
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// A child task receives a single message and exits, taking the receiver with
// it, while this task keeps sending forever. The test is expected to fail.
test!(fn port_gone_concurrent() {
    let (sender, receiver) = channel();
    spawn(proc() {
        receiver.recv();
    });
    loop {
        sender.send(1)
    }
} #[should_fail])
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Like `port_gone_concurrent`, but this task alternates between two senders
// of the same channel. The test is expected to fail.
test!(fn port_gone_concurrent_shared() {
    let (first, receiver) = channel();
    let second = first.clone();
    spawn(proc() {
        receiver.recv();
    });
    loop {
        first.send(1);
        second.send(1);
    }
} #[should_fail])
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// `recv` after the only sender has been dropped is expected to fail the task.
test!(fn smoke_chan_gone() {
    let (sender, receiver) = channel::<int>();
    drop(sender);
    receiver.recv();
} #[should_fail])
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// `recv` after both the original sender and its clone have been dropped is
// expected to fail the task.
test!(fn smoke_chan_gone_shared() {
    let (original, receiver) = channel::<()>();
    let cloned = original.clone();
    drop(original);
    drop(cloned);
    receiver.recv();
} #[should_fail])
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// A child task sends two messages and exits, taking the sender with it,
// while this task receives forever. The test is expected to fail.
test!(fn chan_gone_concurrent() {
    let (sender, receiver) = channel();
    spawn(proc() {
        sender.send(1);
        sender.send(1);
    });
    loop {
        receiver.recv();
    }
} #[should_fail])
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// One producer task pushes 10000 messages; this task receives and checks
// every one of them.
test!(fn stress() {
    let (sender, receiver) = channel();
    spawn(proc() {
        for _ in range(0, 10000) {
            sender.send(1);
        }
    });
    for _ in range(0, 10000) {
        let got = receiver.recv();
        assert_eq!(got, 1);
    }
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// NTHREADS producer tasks each send AMT messages through clones of a single
// sender. One consumer task receives AMT * NTHREADS messages, checks that no
// extra data is pending, and then signals completion on a second channel.
test!(fn stress_shared() {
    static AMT: uint = 10000;
    static NTHREADS: uint = 8;
    let (data_tx, data_rx) = channel::<int>();
    let (done_tx, done_rx) = channel::<()>();

    // Consumer.
    spawn(proc() {
        for _ in range(0, AMT * NTHREADS) {
            assert_eq!(data_rx.recv(), 1);
        }
        // Every expected message has been consumed; further data is a bug.
        match data_rx.try_recv() {
            Data(..) => fail!(),
            _ => {}
        }
        done_tx.send(());
    });

    // Producers.
    for _ in range(0, NTHREADS) {
        let producer_tx = data_tx.clone();
        spawn(proc() {
            for _ in range(0, AMT) {
                producer_tx.send(1);
            }
        });
    }
    drop(data_tx);
    done_rx.recv();
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn send_from_outside_runtime() {
|
2014-03-09 14:58:32 -07:00
|
|
|
let (tx1, rx1) = channel::<()>();
|
|
|
|
let (tx2, rx2) = channel::<int>();
|
|
|
|
let (tx3, rx3) = channel::<()>();
|
|
|
|
let tx4 = tx3.clone();
|
2014-01-26 22:42:26 -05:00
|
|
|
spawn(proc() {
|
2014-03-09 14:58:32 -07:00
|
|
|
tx1.send(());
|
2013-12-05 17:56:17 -08:00
|
|
|
for _ in range(0, 40) {
|
2014-03-09 14:58:32 -07:00
|
|
|
assert_eq!(rx2.recv(), 1);
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
2014-03-09 14:58:32 -07:00
|
|
|
tx3.send(());
|
2014-01-26 22:42:26 -05:00
|
|
|
});
|
2014-03-09 14:58:32 -07:00
|
|
|
rx1.recv();
|
2014-01-26 22:42:26 -05:00
|
|
|
native::task::spawn(proc() {
|
2013-12-05 17:56:17 -08:00
|
|
|
for _ in range(0, 40) {
|
2014-03-09 14:58:32 -07:00
|
|
|
tx2.send(1);
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
2014-03-09 14:58:32 -07:00
|
|
|
tx4.send(());
|
2014-01-26 22:42:26 -05:00
|
|
|
});
|
2014-03-09 14:58:32 -07:00
|
|
|
rx3.recv();
|
|
|
|
rx3.recv();
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn recv_from_outside_runtime() {
|
2014-03-09 14:58:32 -07:00
|
|
|
let (tx, rx) = channel::<int>();
|
|
|
|
let (dtx, drx) = channel();
|
2014-01-26 22:42:26 -05:00
|
|
|
native::task::spawn(proc() {
|
2013-12-05 17:56:17 -08:00
|
|
|
for _ in range(0, 40) {
|
2014-03-09 14:58:32 -07:00
|
|
|
assert_eq!(rx.recv(), 1);
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
2014-03-09 14:58:32 -07:00
|
|
|
dtx.send(());
|
2014-01-26 22:42:26 -05:00
|
|
|
});
|
2013-12-05 17:56:17 -08:00
|
|
|
for _ in range(0, 40) {
|
2014-03-09 14:58:32 -07:00
|
|
|
tx.send(1);
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
2014-03-09 14:58:32 -07:00
|
|
|
drx.recv();
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn no_runtime() {
|
2014-03-09 14:58:32 -07:00
|
|
|
let (tx1, rx1) = channel::<int>();
|
|
|
|
let (tx2, rx2) = channel::<int>();
|
|
|
|
let (tx3, rx3) = channel::<()>();
|
|
|
|
let tx4 = tx3.clone();
|
2014-01-26 22:42:26 -05:00
|
|
|
native::task::spawn(proc() {
|
2014-03-09 14:58:32 -07:00
|
|
|
assert_eq!(rx1.recv(), 1);
|
|
|
|
tx2.send(2);
|
|
|
|
tx4.send(());
|
2014-01-26 22:42:26 -05:00
|
|
|
});
|
|
|
|
native::task::spawn(proc() {
|
2014-03-09 14:58:32 -07:00
|
|
|
tx1.send(1);
|
|
|
|
assert_eq!(rx2.recv(), 2);
|
|
|
|
tx3.send(());
|
2014-01-26 22:42:26 -05:00
|
|
|
});
|
2014-03-09 14:58:32 -07:00
|
|
|
rx3.recv();
|
|
|
|
rx3.recv();
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Nothing is ever sent: the receiver is dropped explicitly, and the sender
// goes away afterwards at the end of the scope.
test!(fn oneshot_single_thread_close_port_first() {
    let (_sender, receiver) = channel::<int>();
    drop(receiver);
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Nothing is ever sent: the sender is dropped explicitly, and the receiver
// goes away afterwards at the end of the scope.
test!(fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = channel::<int>();
    drop(sender);
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Sends an owned box after the receiver is gone, so the sending side is the
// one left holding the payload. The send is expected to fail the task.
test!(fn oneshot_single_thread_send_port_close() {
    let (sender, receiver) = channel::<~int>();
    drop(receiver);
    sender.send(~0);
} #[should_fail])
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Runs a `recv` on a channel whose sender has already been dropped inside
// `task::try`, and checks that the child task ended in failure.
test!(fn oneshot_single_thread_recv_chan_close() {
    let outcome = task::try(proc() {
        let (sender, receiver) = channel::<int>();
        drop(sender);
        receiver.recv();
    });
    // The child must have failed rather than returned normally.
    assert!(outcome.is_err());
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// An owned box sent on the channel comes back out equal to what went in.
test!(fn oneshot_single_thread_send_then_recv() {
    let (sender, receiver) = channel::<~int>();
    sender.send(~10);
    let got = receiver.recv();
    assert!(got == ~10);
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// With the receiver still alive, `try_send` reports success and the value
// can be received.
test!(fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = channel::<int>();
    let delivered = sender.try_send(10);
    assert!(delivered);
    assert!(receiver.recv() == 10);
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// With the receiver already dropped, `try_send` reports failure instead of
// failing the task.
test!(fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = channel::<int>();
    drop(receiver);
    let delivered = sender.try_send(10);
    assert!(!delivered);
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// `recv_opt` hands back `Some` of a value that was sent beforehand.
test!(fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = channel::<int>();
    sender.send(10);
    let got = receiver.recv_opt();
    assert!(got == Some(10));
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// `recv_opt` yields `None` once the sender has been dropped without sending.
test!(fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = channel::<int>();
    drop(sender);
    let got = receiver.recv_opt();
    assert!(got == None);
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// `try_recv` reports `Empty` before anything has been sent and `Data` with
// the value once a message is available.
test!(fn oneshot_single_thread_peek_data() {
    let (tx, rx) = channel::<int>();
    // This assertion was missing its terminating `;`, unlike every other
    // statement in this module.
    assert_eq!(rx.try_recv(), Empty);
    tx.send(10);
    assert_eq!(rx.try_recv(), Data(10));
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// After the sender is dropped, `try_recv` reports `Disconnected`, and keeps
// doing so when asked a second time.
test!(fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = channel::<int>();
    drop(sender);
    let first = receiver.try_recv();
    assert_eq!(first, Disconnected);
    let second = receiver.try_recv();
    assert_eq!(second, Disconnected);
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// With a live sender that has sent nothing, `try_recv` reports `Empty`.
test!(fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = channel::<int>();
    let state = receiver.try_recv();
    assert_eq!(state, Empty);
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// The receiving side runs in a spawned task; the value is sent from this
// task after the spawn.
test!(fn oneshot_multi_task_recv_then_send() {
    let (sender, receiver) = channel::<~int>();
    spawn(proc() {
        let got = receiver.recv();
        assert!(got == ~10);
    });

    sender.send(~10);
})
|
|
|
|
|
|
|
|
// One task drops the sender without sending; a second task, run under
// `task::try`, attempts to receive and must end in failure.
test!(fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = channel::<~int>();
    spawn(proc() {
        drop(sender);
    });
    let outcome = task::try(proc() {
        assert!(receiver.recv() == ~10);
    });
    assert!(outcome.is_err());
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Repeated `stress_factor()` times: the receiver is dropped in a spawned
// task while the sender is dropped here, with no message ever sent.
test!(fn oneshot_multi_thread_close_stress() {
    for _ in range(0, stress_factor()) {
        let (sender, receiver) = channel::<int>();
        spawn(proc() {
            drop(receiver);
        });
        drop(sender);
    }
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Repeated `stress_factor()` times: the receiver is dropped in one task
// while another task tries to send. The send runs under `task::try` and its
// outcome is deliberately ignored, so it may succeed or fail.
test!(fn oneshot_multi_thread_send_close_stress() {
    for _ in range(0, stress_factor()) {
        let (sender, receiver) = channel::<int>();
        spawn(proc() {
            drop(receiver);
        });
        let _ = task::try(proc() {
            sender.send(1);
        });
    }
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Repeated `stress_factor()` times: one task receives under `task::try` and
// asserts that the receive failed, while the sender is dropped from a task
// nested two spawns deep.
test!(fn oneshot_multi_thread_recv_close_stress() {
    for _ in range(0, stress_factor()) {
        let (sender, receiver) = channel::<int>();
        spawn(proc() {
            let outcome = task::try(proc() {
                receiver.recv();
            });
            assert!(outcome.is_err());
        });
        spawn(proc() {
            spawn(proc() {
                drop(sender);
            });
        });
    }
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Repeated `stress_factor()` times: one spawned task sends an owned box and
// another spawned task receives and checks it.
test!(fn oneshot_multi_thread_send_recv_stress() {
    for _ in range(0, stress_factor()) {
        let (sender, receiver) = channel();
        spawn(proc() {
            sender.send(~10);
        });
        spawn(proc() {
            let got = receiver.recv();
            assert!(got == ~10);
        });
    }
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Repeated `stress_factor()` times: ten messages are sent and ten received,
// each from a freshly spawned task. Each task handles one message and then
// hands its end of the channel to the next task in the chain.
test!(fn stream_send_recv_stress() {
    for _ in range(0, stress_factor()) {
        let (sender, receiver) = channel();

        send(sender, 0);
        recv(receiver, 0);

        // Sends `~step` from a new task, then recurses with `step + 1`
        // until ten messages have been sent.
        fn send(chan: Sender<~int>, step: int) {
            if step == 10 { return }

            spawn(proc() {
                chan.send(~step);
                send(chan, step + 1);
            });
        }

        // Receives and checks `~step` from a new task, then recurses with
        // `step + 1` until ten messages have been received.
        fn recv(port: Receiver<~int>, step: int) {
            if step == 10 { return }

            spawn(proc() {
                assert!(port.recv() == ~step);
                recv(port, step + 1);
            });
        }
    }
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Regression test: queueing 10000 messages and then draining them all must
// not exhaust the stack in scheduler context.
test!(fn recv_a_lot() {
    let (sender, receiver) = channel();
    for _ in range(0, 10000) {
        sender.send(());
    }
    for _ in range(0, 10000) {
        receiver.recv();
    }
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// Spawns `stress_factor() + 100` tasks that each send one message through
// their own clone of the sender, then receives that many messages here.
test!(fn shared_chan_stress() {
    let (sender, receiver) = channel();
    let task_count = stress_factor() + 100;
    for _ in range(0, task_count) {
        let task_sender = sender.clone();
        spawn(proc() {
            task_sender.send(());
        });
    }

    for _ in range(0, task_count) {
        receiver.recv();
    }
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// A child task sums everything it gets from `iter()` on the receiver and
// reports the sum on a second channel once the iteration ends. This task
// sends 3, 1 and 2, drops the sender and expects a total of 6.
test!(fn test_nested_recv_iter() {
    let (value_tx, value_rx) = channel::<int>();
    let (sum_tx, sum_rx) = channel::<int>();

    spawn(proc() {
        let mut sum = 0;
        for value in value_rx.iter() {
            sum += value;
        }
        sum_tx.send(sum);
    });

    value_tx.send(3);
    value_tx.send(1);
    value_tx.send(2);
    drop(value_tx);
    assert_eq!(sum_rx.recv(), 6);
})
|
2013-12-05 17:56:17 -08:00
|
|
|
|
2013-12-12 21:38:57 -08:00
|
|
|
// A child task accumulates values from `iter()` on the receiver, but breaks
// out of the loop as soon as the running total has reached 3, and then
// reports the total. With 2s being sent, the total is 4 when the loop stops.
test!(fn test_recv_iter_break() {
    let (tx, rx) = channel::<int>();
    let (count_tx, count_rx) = channel();

    spawn(proc() {
        let mut count = 0;
        for x in rx.iter() {
            if count >= 3 {
                break;
            } else {
                count += x;
            }
        }
        count_tx.send(count);
    });

    tx.send(2);
    tx.send(2);
    tx.send(2);
    // The child may already have broken out of its loop and dropped the
    // receiver, so this send is allowed to be undeliverable. Discard the
    // result explicitly rather than silently.
    let _ = tx.try_send(2);
    drop(tx);
    assert_eq!(count_rx.recv(), 4);
})
|
2013-12-21 22:15:04 -08:00
|
|
|
|
|
|
|
// Walks `try_recv` through `Empty`, `Data`, `Empty` again and finally
// `Disconnected`. The child task is stepped through each transition with a
// "go" channel and confirms each step on an "ack" channel.
test!(fn try_recv_states() {
    let (data_tx, data_rx) = channel::<int>();
    let (go_tx, go_rx) = channel::<()>();
    let (ack_tx, ack_rx) = channel::<()>();
    spawn(proc() {
        // Step 1: wait for the go-ahead, then make one value available.
        go_rx.recv();
        data_tx.send(1);
        ack_tx.send(());
        // Step 2: wait for the go-ahead, then hang up the data sender.
        go_rx.recv();
        drop(data_tx);
        ack_tx.send(());
    });

    assert_eq!(data_rx.try_recv(), Empty);
    go_tx.send(());
    ack_rx.recv();
    assert_eq!(data_rx.try_recv(), Data(1));
    assert_eq!(data_rx.try_recv(), Empty);
    go_tx.send(());
    ack_rx.recv();
    assert_eq!(data_rx.try_recv(), Disconnected);
})
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages is dropped when a Port disappears rather than when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
|
2014-03-09 14:58:32 -07:00
|
|
|
// This bug used to end up in a livelock inside of the Receiver destructor
|
|
|
|
// because the internal state of the Shared packet was corrupted
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages is dropped when a Port disappears rather than when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
|
2014-03-09 14:58:32 -07:00
|
|
|
let (tx, rx) = channel();
|
|
|
|
let (tx2, rx2) = channel();
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
spawn(proc() {
|
2014-03-09 14:58:32 -07:00
|
|
|
rx.recv(); // wait on a oneshot
|
|
|
|
drop(rx); // destroy a shared
|
|
|
|
tx2.send(());
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
});
|
|
|
|
// make sure the other task has gone to sleep
|
|
|
|
for _ in range(0, 5000) { task::deschedule(); }
|
|
|
|
|
|
|
|
// upgrade to a shared chan and send a message
|
2014-03-09 14:58:32 -07:00
|
|
|
let t = tx.clone();
|
|
|
|
drop(tx);
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
t.send(());
|
|
|
|
|
|
|
|
// wait for the child task to exit before we exit
|
2014-03-09 14:58:32 -07:00
|
|
|
rx2.recv();
|
Rewrite channels yet again for upgradeability
This, the Nth rewrite of channels, is not a rewrite of the core logic behind
channels, but rather their API usage. In the past, we had the distinction
between oneshot, stream, and shared channels, but the most recent rewrite
dropped oneshots in favor of streams and shared channels.
This distinction of stream vs shared has shown that it's not quite what we'd
like either, and this moves the `std::comm` module in the direction of "one
channel to rule them all". There now remains only one Chan and one Port.
This new channel is actually a hybrid oneshot/stream/shared channel under the
hood in order to optimize for the use cases in question. Additionally, this also
reduces the cognitive burden of having to choose between a Chan or a SharedChan
in an API.
My simple benchmarks show no reduction in efficiency over the existing channels
today, and a 3x improvement in the oneshot case. I sadly don't have a
pre-last-rewrite compiler to test out the old old oneshots, but I would imagine
that the performance is comparable, but slightly slower (due to atomic reference
counting).
This commit also brings the bonus bugfix to channels that the pending queue of
messages are all dropped when a Port disappears rather then when both the Port
and the Chan disappear.
2014-01-08 18:31:48 -08:00
|
|
|
})
|
2014-02-19 08:26:11 -08:00
|
|
|
|
|
|
|
// 1000 messages are sent from a thread created with `Thread::start` and
// received here; the thread is joined once all of them have arrived.
test!(fn sends_off_the_runtime() {
    use rt::thread::Thread;

    let (sender, receiver) = channel();
    let producer = Thread::start(proc() {
        for _ in range(0, 1000) {
            sender.send(());
        }
    });
    for _ in range(0, 1000) {
        receiver.recv();
    }
    producer.join();
})
|
|
|
|
|
|
|
|
// A thread created with `Thread::start` polls `try_recv` until it has seen
// ten messages, yielding whenever the channel is empty, and then signals
// completion. This task sends the ten messages, joins the thread and waits
// for the completion signal.
test!(fn try_recvs_off_the_runtime() {
    use rt::thread::Thread;

    let (sender, receiver) = channel();
    let (done_tx, done_rx) = channel();
    let consumer = Thread::start(proc() {
        let mut received = 0;
        while received < 10 {
            match receiver.try_recv() {
                Data(()) => { received += 1; }
                Empty => { Thread::yield_now(); }
                // Sender vanished early: leave without signalling.
                Disconnected => return,
            }
        }
        done_tx.send(());
    });
    for _ in range(0, 10) {
        sender.send(());
    }
    consumer.join();
    done_rx.recv();
})
|
2013-12-05 17:56:17 -08:00
|
|
|
}
|