1
Fork 0

rollup merge of #20315: alexcrichton/std-sync

Conflicts:
	src/libstd/rt/exclusive.rs
	src/libstd/sync/barrier.rs
	src/libstd/sys/unix/pipe.rs
	src/test/bench/shootout-binarytrees.rs
	src/test/bench/shootout-fannkuch-redux.rs
This commit is contained in:
Alex Crichton 2015-01-02 09:19:00 -08:00
commit 009ec5d2b0
45 changed files with 167 additions and 792 deletions

View file

@ -206,6 +206,7 @@ getting the result later.
The basic example below illustrates this. The basic example below illustrates this.
```{rust,ignore} ```{rust,ignore}
# #![allow(deprecated)]
use std::sync::Future; use std::sync::Future;
# fn main() { # fn main() {
@ -233,6 +234,7 @@ Here is another example showing how futures allow you to background
computations. The workload will be distributed on the available cores. computations. The workload will be distributed on the available cores.
```{rust,ignore} ```{rust,ignore}
# #![allow(deprecated)]
# use std::num::Float; # use std::num::Float;
# use std::sync::Future; # use std::sync::Future;
fn partial_sum(start: uint) -> f64 { fn partial_sum(start: uint) -> f64 {

View file

@ -5339,6 +5339,7 @@ example, if you wish to compute some value in the background, `Future` is
a useful thing to use: a useful thing to use:
```{rust} ```{rust}
# #![allow(deprecated)]
use std::sync::Future; use std::sync::Future;
let mut delayed_value = Future::spawn(move || { let mut delayed_value = Future::spawn(move || {

View file

@ -1480,9 +1480,9 @@ data are being stored, or single-address and mutability properties are required.
``` ```
use std::sync::atomic; use std::sync::atomic;
// Note that INIT_ATOMIC_UINT is a *const*, but it may be used to initialize a // Note that ATOMIC_UINT_INIT is a *const*, but it may be used to initialize a
// static. This static can be modified, so it is not placed in read-only memory. // static. This static can be modified, so it is not placed in read-only memory.
static COUNTER: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; static COUNTER: atomic::AtomicUint = atomic::ATOMIC_UINT_INIT;
// This table is a candidate to be placed in read-only memory. // This table is a candidate to be placed in read-only memory.
static TABLE: &'static [uint] = &[1, 2, 3, /* ... */]; static TABLE: &'static [uint] = &[1, 2, 3, /* ... */];

View file

@ -2265,7 +2265,7 @@ mod tests {
} }
} }
const NUM_ELEMENTS: uint = 2; const NUM_ELEMENTS: uint = 2;
static DROP_COUNTER: AtomicUint = atomic::INIT_ATOMIC_UINT; static DROP_COUNTER: AtomicUint = atomic::ATOMIC_UINT_INIT;
let v = Vec::from_elem(NUM_ELEMENTS, Nothing); let v = Vec::from_elem(NUM_ELEMENTS, Nothing);

View file

@ -89,17 +89,27 @@ pub enum Ordering {
/// An `AtomicBool` initialized to `false`. /// An `AtomicBool` initialized to `false`.
#[unstable = "may be renamed, pending conventions for static initalizers"] #[unstable = "may be renamed, pending conventions for static initalizers"]
pub const INIT_ATOMIC_BOOL: AtomicBool = pub const ATOMIC_BOOL_INIT: AtomicBool =
AtomicBool { v: UnsafeCell { value: 0 } }; AtomicBool { v: UnsafeCell { value: 0 } };
/// An `AtomicInt` initialized to `0`. /// An `AtomicInt` initialized to `0`.
#[unstable = "may be renamed, pending conventions for static initalizers"] #[unstable = "may be renamed, pending conventions for static initalizers"]
pub const INIT_ATOMIC_INT: AtomicInt = pub const ATOMIC_INT_INIT: AtomicInt =
AtomicInt { v: UnsafeCell { value: 0 } }; AtomicInt { v: UnsafeCell { value: 0 } };
/// An `AtomicUint` initialized to `0`. /// An `AtomicUint` initialized to `0`.
#[unstable = "may be renamed, pending conventions for static initalizers"] #[unstable = "may be renamed, pending conventions for static initalizers"]
pub const INIT_ATOMIC_UINT: AtomicUint = pub const ATOMIC_UINT_INIT: AtomicUint =
AtomicUint { v: UnsafeCell { value: 0, } }; AtomicUint { v: UnsafeCell { value: 0, } };
/// Deprecated
#[deprecated = "renamed to ATOMIC_BOOL_INIT"]
pub const INIT_ATOMIC_BOOL: AtomicBool = ATOMIC_BOOL_INIT;
/// Deprecated
#[deprecated = "renamed to ATOMIC_INT_INIT"]
pub const INIT_ATOMIC_INT: AtomicInt = ATOMIC_INT_INIT;
/// Deprecated
#[deprecated = "renamed to ATOMIC_UINT_INIT"]
pub const INIT_ATOMIC_UINT: AtomicUint = ATOMIC_UINT_INIT;
// NB: Needs to be -1 (0b11111111...) to make fetch_nand work correctly // NB: Needs to be -1 (0b11111111...) to make fetch_nand work correctly
const UINT_TRUE: uint = -1; const UINT_TRUE: uint = -1;

View file

@ -155,6 +155,8 @@
// FIXME: Can't be shared between threads. Dynamic borrows // FIXME: Can't be shared between threads. Dynamic borrows
// FIXME: Relationship to Atomic types and RWLock // FIXME: Relationship to Atomic types and RWLock
#![stable]
use clone::Clone; use clone::Clone;
use cmp::PartialEq; use cmp::PartialEq;
use default::Default; use default::Default;

View file

@ -70,9 +70,9 @@ fn int_xor() {
assert_eq!(x.load(SeqCst), 0xf731 ^ 0x137f); assert_eq!(x.load(SeqCst), 0xf731 ^ 0x137f);
} }
static S_BOOL : AtomicBool = INIT_ATOMIC_BOOL; static S_BOOL : AtomicBool = ATOMIC_BOOL_INIT;
static S_INT : AtomicInt = INIT_ATOMIC_INT; static S_INT : AtomicInt = ATOMIC_INT_INIT;
static S_UINT : AtomicUint = INIT_ATOMIC_UINT; static S_UINT : AtomicUint = ATOMIC_UINT_INIT;
#[test] #[test]
fn static_init() { fn static_init() {

View file

@ -352,7 +352,7 @@ pub struct LogLocation {
#[doc(hidden)] #[doc(hidden)]
pub fn mod_enabled(level: u32, module: &str) -> bool { pub fn mod_enabled(level: u32, module: &str) -> bool {
static INIT: Once = ONCE_INIT; static INIT: Once = ONCE_INIT;
INIT.doit(init); INIT.call_once(init);
// It's possible for many threads are in this function, only one of them // It's possible for many threads are in this function, only one of them
// will perform the global initialization, but all of them will need to check // will perform the global initialization, but all of them will need to check

View file

@ -73,7 +73,7 @@ pub fn maybe_print_constraints_for<'a, 'tcx>(region_vars: &RegionVarBindings<'a,
let output_path = { let output_path = {
let output_template = match requested_output { let output_template = match requested_output {
Some(ref s) if s.as_slice() == "help" => { Some(ref s) if s.as_slice() == "help" => {
static PRINTED_YET : atomic::AtomicBool = atomic::INIT_ATOMIC_BOOL; static PRINTED_YET : atomic::AtomicBool = atomic::ATOMIC_BOOL_INIT;
if !PRINTED_YET.load(atomic::SeqCst) { if !PRINTED_YET.load(atomic::SeqCst) {
print_help_message(); print_help_message();
PRINTED_YET.store(true, atomic::SeqCst); PRINTED_YET.store(true, atomic::SeqCst);

View file

@ -1011,7 +1011,7 @@ unsafe fn configure_llvm(sess: &Session) {
} }
} }
INIT.doit(|| { INIT.call_once(|| {
llvm::LLVMInitializePasses(); llvm::LLVMInitializePasses();
// Only initialize the platforms supported by Rust here, because // Only initialize the platforms supported by Rust here, because

View file

@ -3097,7 +3097,7 @@ pub fn trans_crate<'tcx>(analysis: ty::CrateAnalysis<'tcx>)
use std::sync::{Once, ONCE_INIT}; use std::sync::{Once, ONCE_INIT};
static INIT: Once = ONCE_INIT; static INIT: Once = ONCE_INIT;
static mut POISONED: bool = false; static mut POISONED: bool = false;
INIT.doit(|| { INIT.call_once(|| {
if llvm::LLVMStartMultithreaded() != 1 { if llvm::LLVMStartMultithreaded() != 1 {
// use an extra bool to make sure that all future usage of LLVM // use an extra bool to make sure that all future usage of LLVM
// cannot proceed despite the Once not running more than once. // cannot proceed despite the Once not running more than once.

View file

@ -22,7 +22,6 @@ use result::Result::{Ok, Err};
use slice::{SliceExt}; use slice::{SliceExt};
use slice; use slice;
use vec::Vec; use vec::Vec;
use kinds::{Send,Sync};
/// Wraps a Reader and buffers input from it /// Wraps a Reader and buffers input from it
/// ///
@ -52,11 +51,6 @@ pub struct BufferedReader<R> {
cap: uint, cap: uint,
} }
unsafe impl<R: Send> Send for BufferedReader<R> {}
unsafe impl<R: Send+Sync> Sync for BufferedReader<R> {}
impl<R: Reader> BufferedReader<R> { impl<R: Reader> BufferedReader<R> {
/// Creates a new `BufferedReader` with the specified buffer capacity /// Creates a new `BufferedReader` with the specified buffer capacity
pub fn with_capacity(cap: uint, inner: R) -> BufferedReader<R> { pub fn with_capacity(cap: uint, inner: R) -> BufferedReader<R> {

View file

@ -218,7 +218,7 @@ pub fn stdin() -> StdinReader {
static ONCE: Once = ONCE_INIT; static ONCE: Once = ONCE_INIT;
unsafe { unsafe {
ONCE.doit(|| { ONCE.call_once(|| {
// The default buffer capacity is 64k, but apparently windows doesn't like // The default buffer capacity is 64k, but apparently windows doesn't like
// 64k reads on stdin. See #13304 for details, but the idea is that on // 64k reads on stdin. See #13304 for details, but the idea is that on
// windows we use a slightly smaller buffer that's been seen to be // windows we use a slightly smaller buffer that's been seen to be

View file

@ -90,7 +90,7 @@ impl TempDir {
return TempDir::new_in(&abs_tmpdir, suffix); return TempDir::new_in(&abs_tmpdir, suffix);
} }
static CNT: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; static CNT: atomic::AtomicUint = atomic::ATOMIC_UINT_INIT;
let mut attempts = 0u; let mut attempts = 0u;
loop { loop {

View file

@ -17,17 +17,17 @@ use prelude::v1::*;
use libc; use libc;
use os; use os;
use std::io::net::ip::*; use std::io::net::ip::*;
use sync::atomic::{AtomicUint, INIT_ATOMIC_UINT, Relaxed}; use sync::atomic::{AtomicUint, ATOMIC_UINT_INIT, Relaxed};
/// Get a port number, starting at 9600, for use in tests /// Get a port number, starting at 9600, for use in tests
pub fn next_test_port() -> u16 { pub fn next_test_port() -> u16 {
static NEXT_OFFSET: AtomicUint = INIT_ATOMIC_UINT; static NEXT_OFFSET: AtomicUint = ATOMIC_UINT_INIT;
base_port() + NEXT_OFFSET.fetch_add(1, Relaxed) as u16 base_port() + NEXT_OFFSET.fetch_add(1, Relaxed) as u16
} }
/// Get a temporary path which could be the location of a unix socket /// Get a temporary path which could be the location of a unix socket
pub fn next_test_unix() -> Path { pub fn next_test_unix() -> Path {
static COUNT: AtomicUint = INIT_ATOMIC_UINT; static COUNT: AtomicUint = ATOMIC_UINT_INIT;
// base port and pid are an attempt to be unique between multiple // base port and pid are an attempt to be unique between multiple
// test-runners of different configurations running on one // test-runners of different configurations running on one
// buildbot, the count is to be unique within this executable. // buildbot, the count is to be unique within this executable.

View file

@ -55,7 +55,7 @@ use slice::{AsSlice, SliceExt};
use slice::CloneSliceExt; use slice::CloneSliceExt;
use str::{Str, StrExt}; use str::{Str, StrExt};
use string::{String, ToString}; use string::{String, ToString};
use sync::atomic::{AtomicInt, INIT_ATOMIC_INT, SeqCst}; use sync::atomic::{AtomicInt, ATOMIC_INT_INIT, SeqCst};
use vec::Vec; use vec::Vec;
#[cfg(unix)] use c_str::ToCStr; #[cfg(unix)] use c_str::ToCStr;
@ -596,7 +596,7 @@ pub fn last_os_error() -> String {
error_string(errno() as uint) error_string(errno() as uint)
} }
static EXIT_STATUS: AtomicInt = INIT_ATOMIC_INT; static EXIT_STATUS: AtomicInt = ATOMIC_INT_INIT;
/// Sets the process exit code /// Sets the process exit code
/// ///

View file

@ -84,10 +84,10 @@ mod imp {
#[cfg(all(target_os = "linux", #[cfg(all(target_os = "linux",
any(target_arch = "x86_64", target_arch = "x86", target_arch = "arm")))] any(target_arch = "x86_64", target_arch = "x86", target_arch = "arm")))]
fn is_getrandom_available() -> bool { fn is_getrandom_available() -> bool {
use sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, Relaxed}; use sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, Relaxed};
static GETRANDOM_CHECKED: AtomicBool = INIT_ATOMIC_BOOL; static GETRANDOM_CHECKED: AtomicBool = ATOMIC_BOOL_INIT;
static GETRANDOM_AVAILABLE: AtomicBool = INIT_ATOMIC_BOOL; static GETRANDOM_AVAILABLE: AtomicBool = ATOMIC_BOOL_INIT;
if !GETRANDOM_CHECKED.load(Relaxed) { if !GETRANDOM_CHECKED.load(Relaxed) {
let mut buf: [u8; 0] = []; let mut buf: [u8; 0] = [];

View file

@ -22,7 +22,7 @@ pub use sys::backtrace::write;
// For now logging is turned off by default, and this function checks to see // For now logging is turned off by default, and this function checks to see
// whether the magical environment variable is present to see if it's turned on. // whether the magical environment variable is present to see if it's turned on.
pub fn log_enabled() -> bool { pub fn log_enabled() -> bool {
static ENABLED: atomic::AtomicInt = atomic::INIT_ATOMIC_INT; static ENABLED: atomic::AtomicInt = atomic::ATOMIC_INT_INIT;
match ENABLED.load(atomic::SeqCst) { match ENABLED.load(atomic::SeqCst) {
1 => return false, 1 => return false,
2 => return true, 2 => return true,

View file

@ -1,119 +0,0 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use core::prelude::*;
use cell::UnsafeCell;
use rt::mutex;
/// An OS mutex over some data.
///
/// This is not a safe primitive to use, it is unaware of the libgreen
/// scheduler, as well as being easily susceptible to misuse due to the usage of
/// the inner NativeMutex.
///
/// > **Note**: This type is not recommended for general use. The mutex provided
/// > as part of `libsync` should almost always be favored.
pub struct Exclusive<T> {
lock: mutex::NativeMutex,
data: UnsafeCell<T>,
}
unsafe impl<T:Send> Send for Exclusive<T> { }
unsafe impl<T:Send> Sync for Exclusive<T> { }
/// An RAII guard returned via `lock`
pub struct ExclusiveGuard<'a, T:'a> {
// FIXME #12808: strange name to try to avoid interfering with
// field accesses of the contained type via Deref
_data: &'a mut T,
_guard: mutex::LockGuard<'a>,
}
impl<T: Send> Exclusive<T> {
/// Creates a new `Exclusive` which will protect the data provided.
pub fn new(user_data: T) -> Exclusive<T> {
Exclusive {
lock: unsafe { mutex::NativeMutex::new() },
data: UnsafeCell::new(user_data),
}
}
/// Acquires this lock, returning a guard which the data is accessed through
/// and from which that lock will be unlocked.
///
/// This method is unsafe due to many of the same reasons that the
/// NativeMutex itself is unsafe.
pub unsafe fn lock<'a>(&'a self) -> ExclusiveGuard<'a, T> {
let guard = self.lock.lock();
let data = &mut *self.data.get();
ExclusiveGuard {
_data: data,
_guard: guard,
}
}
}
impl<'a, T: Send> ExclusiveGuard<'a, T> {
// The unsafety here should be ok because our loan guarantees that the lock
// itself is not moving
pub fn signal(&self) {
unsafe { self._guard.signal() }
}
pub fn wait(&self) {
unsafe { self._guard.wait() }
}
}
impl<'a, T: Send> Deref<T> for ExclusiveGuard<'a, T> {
fn deref(&self) -> &T { &*self._data }
}
impl<'a, T: Send> DerefMut<T> for ExclusiveGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T { &mut *self._data }
}
#[cfg(test)]
mod tests {
use prelude::v1::*;
use sync::Arc;
use super::Exclusive;
use task;
#[test]
fn exclusive_new_arc() {
unsafe {
let mut futures = Vec::new();
let num_tasks = 10;
let count = 10;
let total = Arc::new(Exclusive::new(box 0));
for _ in range(0u, num_tasks) {
let total = total.clone();
let (tx, rx) = channel();
futures.push(rx);
task::spawn(move || {
for _ in range(0u, count) {
**total.lock() += 1;
}
tx.send(());
});
};
for f in futures.iter_mut() { f.recv() }
assert_eq!(**total.lock(), num_tasks * count);
}
}
}

View file

@ -1,554 +0,0 @@
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Language-level runtime services that should reasonably expected
//! to be available 'everywhere'. Unwinding, local storage, and logging.
//! Even a 'freestanding' Rust would likely want to implement this.
pub use self::BlockedTask::*;
use self::TaskState::*;
use any::Any;
use boxed::Box;
use sync::Arc;
use sync::atomic::{AtomicUint, SeqCst};
use iter::{IteratorExt, Take};
use kinds::marker;
use mem;
use ops::FnMut;
use core::prelude::{Clone, Drop, Err, Iterator, None, Ok, Option, Send, Some};
use core::prelude::{drop};
use str::SendStr;
use thunk::Thunk;
use rt;
use rt::mutex::NativeMutex;
use rt::local::Local;
use rt::thread::{mod, Thread};
use sys_common::stack;
use rt::unwind;
use rt::unwind::Unwinder;
/// State associated with Rust threads
///
/// This structure is currently undergoing major changes, and is
/// likely to be move/be merged with a `Thread` structure.
pub struct Task {
pub unwinder: Unwinder,
pub death: Death,
pub name: Option<SendStr>,
state: TaskState,
lock: NativeMutex, // native synchronization
awoken: bool, // used to prevent spurious wakeups
// This field holds the known bounds of the stack in (lo, hi) form. Not all
// native threads necessarily know their precise bounds, hence this is
// optional.
stack_bounds: (uint, uint),
stack_guard: uint
}
// Once a thread has entered the `Armed` state it must be destroyed via `drop`,
// and no other method. This state is used to track this transition.
#[deriving(PartialEq)]
enum TaskState {
New,
Armed,
Destroyed,
}
pub struct TaskOpts {
/// Invoke this procedure with the result of the thread when it finishes.
pub on_exit: Option<Thunk<Result>>,
/// A name for the thread-to-be, for identification in panic messages
pub name: Option<SendStr>,
/// The size of the stack for the spawned thread
pub stack_size: Option<uint>,
}
/// Indicates the manner in which a thread exited.
///
/// A thread that completes without panicking is considered to exit successfully.
///
/// If you wish for this result's delivery to block until all
/// children threads complete, recommend using a result future.
pub type Result = ::core::result::Result<(), Box<Any + Send>>;
/// A handle to a blocked thread. Usually this means having the Box<Task>
/// pointer by ownership, but if the thread is killable, a killer can steal it
/// at any time.
pub enum BlockedTask {
Owned(Box<Task>),
Shared(Arc<AtomicUint>),
}
/// Per-thread state related to thread death, killing, panic, etc.
pub struct Death {
pub on_exit: Option<Thunk<Result>>,
}
pub struct BlockedTasks {
inner: Arc<AtomicUint>,
}
impl Task {
/// Creates a new uninitialized thread.
pub fn new(stack_bounds: Option<(uint, uint)>, stack_guard: Option<uint>) -> Task {
Task {
unwinder: Unwinder::new(),
death: Death::new(),
state: New,
name: None,
lock: unsafe { NativeMutex::new() },
awoken: false,
// these *should* get overwritten
stack_bounds: stack_bounds.unwrap_or((0, 0)),
stack_guard: stack_guard.unwrap_or(0)
}
}
pub fn spawn<F>(opts: TaskOpts, f: F)
where F : FnOnce(), F : Send
{
Task::spawn_thunk(opts, Thunk::new(f))
}
fn spawn_thunk(opts: TaskOpts, f: Thunk) {
let TaskOpts { name, stack_size, on_exit } = opts;
let mut task = box Task::new(None, None);
task.name = name;
task.death.on_exit = on_exit;
let stack = stack_size.unwrap_or(rt::min_stack());
// Spawning a new OS thread guarantees that __morestack will never get
// triggered, but we must manually set up the actual stack bounds once
// this function starts executing. This raises the lower limit by a bit
// because by the time that this function is executing we've already
// consumed at least a little bit of stack (we don't know the exact byte
// address at which our stack started).
Thread::spawn_stack(stack, move|| {
let something_around_the_top_of_the_stack = 1;
let addr = &something_around_the_top_of_the_stack as *const int;
let my_stack = addr as uint;
unsafe {
stack::record_os_managed_stack_bounds(my_stack - stack + 1024,
my_stack);
}
task.stack_guard = thread::current_guard_page();
task.stack_bounds = (my_stack - stack + 1024, my_stack);
let mut f = Some(f);
drop(task.run(|| { f.take().unwrap().invoke(()) }).destroy());
})
}
/// Consumes ownership of a thread, runs some code, and returns the thread back.
///
/// This function can be used as an emulated "try/catch" to interoperate
/// with the rust runtime at the outermost boundary. It is not possible to
/// use this function in a nested fashion (a try/catch inside of another
/// try/catch). Invoking this function is quite cheap.
///
/// If the closure `f` succeeds, then the returned thread can be used again
/// for another invocation of `run`. If the closure `f` panics then `self`
/// will be internally destroyed along with all of the other associated
/// resources of this thread. The `on_exit` callback is invoked with the
/// cause of panic (not returned here). This can be discovered by querying
/// `is_destroyed()`.
///
/// Note that it is possible to view partial execution of the closure `f`
/// because it is not guaranteed to run to completion, but this function is
/// guaranteed to return if it panicks. Care should be taken to ensure that
/// stack references made by `f` are handled appropriately.
///
/// It is invalid to call this function with a thread that has been previously
/// destroyed via a failed call to `run`.
pub fn run<F>(mut self: Box<Task>, f: F) -> Box<Task> where F: FnOnce() {
assert!(!self.is_destroyed(), "cannot re-use a destroyed thread");
// First, make sure that no one else is in TLS. This does not allow
// recursive invocations of run(). If there's no one else, then
// relinquish ownership of ourselves back into TLS.
if Local::exists(None::<Task>) {
panic!("cannot run a thread recursively inside another");
}
self.state = Armed;
Local::put(self);
// There are two primary reasons that general try/catch is unsafe. The
// first is that we do not support nested try/catch. The above check for
// an existing thread in TLS is sufficient for this invariant to be
// upheld. The second is that unwinding while unwinding is not defined.
// We take care of that by having an 'unwinding' flag in the thread
// itself. For these reasons, this unsafety should be ok.
let result = unsafe { unwind::try(f) };
// After running the closure given return the thread back out if it ran
// successfully, or clean up the thread if it panicked.
let task: Box<Task> = Local::take();
match result {
Ok(()) => task,
Err(cause) => { task.cleanup(Err(cause)) }
}
}
/// Destroy all associated resources of this thread.
///
/// This function will perform any necessary clean up to prepare the thread
/// for destruction. It is required that this is called before a `Task`
/// falls out of scope.
///
/// The returned thread cannot be used for running any more code, but it may
/// be used to extract the runtime as necessary.
pub fn destroy(self: Box<Task>) -> Box<Task> {
if self.is_destroyed() {
self
} else {
self.cleanup(Ok(()))
}
}
/// Cleans up a thread, processing the result of the thread as appropriate.
///
/// This function consumes ownership of the thread, deallocating it once it's
/// done being processed. It is assumed that TLD and the local heap have
/// already been destroyed and/or annihilated.
fn cleanup(mut self: Box<Task>, result: Result) -> Box<Task> {
// After taking care of the data above, we need to transmit the result
// of this thread.
let what_to_do = self.death.on_exit.take();
Local::put(self);
// FIXME: this is running in a seriously constrained context. If this
// allocates TLD then it will likely abort the runtime. Similarly,
// if this panics, this will also likely abort the runtime.
//
// This closure is currently limited to a channel send via the
// standard library's thread interface, but this needs
// reconsideration to whether it's a reasonable thing to let a
// thread to do or not.
match what_to_do {
Some(f) => { f.invoke(result) }
None => { drop(result) }
}
// Now that we're done, we remove the thread from TLS and flag it for
// destruction.
let mut task: Box<Task> = Local::take();
task.state = Destroyed;
return task;
}
/// Queries whether this can be destroyed or not.
pub fn is_destroyed(&self) -> bool { self.state == Destroyed }
/// Deschedules the current thread, invoking `f` `amt` times. It is not
/// recommended to use this function directly, but rather communication
/// primitives in `std::comm` should be used.
//
// This function gets a little interesting. There are a few safety and
// ownership violations going on here, but this is all done in the name of
// shared state. Additionally, all of the violations are protected with a
// mutex, so in theory there are no races.
//
// The first thing we need to do is to get a pointer to the thread's internal
// mutex. This address will not be changing (because the thread is allocated
// on the heap). We must have this handle separately because the thread will
// have its ownership transferred to the given closure. We're guaranteed,
// however, that this memory will remain valid because *this* is the current
// thread's execution thread.
//
// The next weird part is where ownership of the thread actually goes. We
// relinquish it to the `f` blocking function, but upon returning this
// function needs to replace the thread back in TLS. There is no communication
// from the wakeup thread back to this thread about the thread pointer, and
// there's really no need to. In order to get around this, we cast the thread
// to a `uint` which is then used at the end of this function to cast back
// to a `Box<Task>` object. Naturally, this looks like it violates
// ownership semantics in that there may be two `Box<Task>` objects.
//
// The fun part is that the wakeup half of this implementation knows to
// "forget" the thread on the other end. This means that the awakening half of
// things silently relinquishes ownership back to this thread, but not in a
// way that the compiler can understand. The thread's memory is always valid
// for both threads because these operations are all done inside of a mutex.
//
// You'll also find that if blocking fails (the `f` function hands the
// BlockedTask back to us), we will `mem::forget` the handles. The
// reasoning for this is the same logic as above in that the thread silently
// transfers ownership via the `uint`, not through normal compiler
// semantics.
//
// On a mildly unrelated note, it should also be pointed out that OS
// condition variables are susceptible to spurious wakeups, which we need to
// be ready for. In order to accommodate for this fact, we have an extra
// `awoken` field which indicates whether we were actually woken up via some
// invocation of `reawaken`. This flag is only ever accessed inside the
// lock, so there's no need to make it atomic.
pub fn deschedule<F>(mut self: Box<Task>, times: uint, mut f: F) where
F: FnMut(BlockedTask) -> ::core::result::Result<(), BlockedTask>,
{
unsafe {
let me = &mut *self as *mut Task;
let task = BlockedTask::block(self);
if times == 1 {
let guard = (*me).lock.lock();
(*me).awoken = false;
match f(task) {
Ok(()) => {
while !(*me).awoken {
guard.wait();
}
}
Err(task) => { mem::forget(task.wake()); }
}
} else {
let iter = task.make_selectable(times);
let guard = (*me).lock.lock();
(*me).awoken = false;
// Apply the given closure to all of the "selectable threads",
// bailing on the first one that produces an error. Note that
// care must be taken such that when an error is occurred, we
// may not own the thread, so we may still have to wait for the
// thread to become available. In other words, if thread.wake()
// returns `None`, then someone else has ownership and we must
// wait for their signal.
match iter.map(f).filter_map(|a| a.err()).next() {
None => {}
Some(task) => {
match task.wake() {
Some(task) => {
mem::forget(task);
(*me).awoken = true;
}
None => {}
}
}
}
while !(*me).awoken {
guard.wait();
}
}
// put the thread back in TLS, and everything is as it once was.
Local::put(mem::transmute(me));
}
}
/// Wakes up a previously blocked thread. This function can only be
/// called on threads that were previously blocked in `deschedule`.
//
// See the comments on `deschedule` for why the thread is forgotten here, and
// why it's valid to do so.
pub fn reawaken(mut self: Box<Task>) {
unsafe {
let me = &mut *self as *mut Task;
mem::forget(self);
let guard = (*me).lock.lock();
(*me).awoken = true;
guard.signal();
}
}
/// Yields control of this thread to another thread. This function will
/// eventually return, but possibly not immediately. This is used as an
/// opportunity to allow other threads a chance to run.
pub fn yield_now() {
Thread::yield_now();
}
/// Returns the stack bounds for this thread in (lo, hi) format. The stack
/// bounds may not be known for all threads, so the return value may be
/// `None`.
pub fn stack_bounds(&self) -> (uint, uint) {
self.stack_bounds
}
/// Returns the stack guard for this thread, if known.
pub fn stack_guard(&self) -> Option<uint> {
if self.stack_guard != 0 {
Some(self.stack_guard)
} else {
None
}
}
/// Consume this thread, flagging it as a candidate for destruction.
///
/// This function is required to be invoked to destroy a thread. A thread
/// destroyed through a normal drop will abort.
pub fn drop(mut self) {
self.state = Destroyed;
}
}
impl Drop for Task {
fn drop(&mut self) {
rtdebug!("called drop for a thread: {}", self as *mut Task as uint);
rtassert!(self.state != Armed);
}
}
impl TaskOpts {
pub fn new() -> TaskOpts {
TaskOpts { on_exit: None, name: None, stack_size: None }
}
}
impl Iterator<BlockedTask> for BlockedTasks {
fn next(&mut self) -> Option<BlockedTask> {
Some(Shared(self.inner.clone()))
}
}
impl BlockedTask {
/// Returns Some if the thread was successfully woken; None if already killed.
pub fn wake(self) -> Option<Box<Task>> {
match self {
Owned(task) => Some(task),
Shared(arc) => {
match arc.swap(0, SeqCst) {
0 => None,
n => Some(unsafe { mem::transmute(n) }),
}
}
}
}
/// Reawakens this thread if ownership is acquired. If finer-grained control
/// is desired, use `wake` instead.
pub fn reawaken(self) {
self.wake().map(|t| t.reawaken());
}
// This assertion has two flavours because the wake involves an atomic op.
// In the faster version, destructors will panic dramatically instead.
#[cfg(not(test))] pub fn trash(self) { }
#[cfg(test)] pub fn trash(self) { assert!(self.wake().is_none()); }
/// Create a blocked thread, unless the thread was already killed.
pub fn block(task: Box<Task>) -> BlockedTask {
Owned(task)
}
/// Converts one blocked thread handle to a list of many handles to the same.
pub fn make_selectable(self, num_handles: uint) -> Take<BlockedTasks> {
let arc = match self {
Owned(task) => {
let flag = unsafe { AtomicUint::new(mem::transmute(task)) };
Arc::new(flag)
}
Shared(arc) => arc.clone(),
};
BlockedTasks{ inner: arc }.take(num_handles)
}
/// Convert to an unsafe uint value. Useful for storing in a pipe's state
/// flag.
#[inline]
pub unsafe fn cast_to_uint(self) -> uint {
match self {
Owned(task) => {
let blocked_task_ptr: uint = mem::transmute(task);
rtassert!(blocked_task_ptr & 0x1 == 0);
blocked_task_ptr
}
Shared(arc) => {
let blocked_task_ptr: uint = mem::transmute(box arc);
rtassert!(blocked_task_ptr & 0x1 == 0);
blocked_task_ptr | 0x1
}
}
}
/// Convert from an unsafe uint value. Useful for retrieving a pipe's state
/// flag.
#[inline]
pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
if blocked_task_ptr & 0x1 == 0 {
Owned(mem::transmute(blocked_task_ptr))
} else {
let ptr: Box<Arc<AtomicUint>> =
mem::transmute(blocked_task_ptr & !1);
Shared(*ptr)
}
}
}
impl Death {
pub fn new() -> Death {
Death { on_exit: None }
}
}
#[cfg(test)]
mod test {
use super::*;
use prelude::v1::*;
use task;
use rt::unwind;
#[test]
fn unwind() {
let result = task::try(move|| ());
rtdebug!("trying first assert");
assert!(result.is_ok());
let result = task::try(move|| -> () panic!());
rtdebug!("trying second assert");
assert!(result.is_err());
}
#[test]
fn rng() {
use rand::{StdRng, Rng};
let mut r = StdRng::new().ok().unwrap();
let _ = r.next_u32();
}
#[test]
fn comm_stream() {
let (tx, rx) = channel();
tx.send(10i);
assert!(rx.recv() == 10);
}
#[test]
fn comm_shared_chan() {
let (tx, rx) = channel();
tx.send(10i);
assert!(rx.recv() == 10);
}
#[test]
#[should_fail]
fn test_begin_unwind() {
use rt::unwind::begin_unwind;
begin_unwind("cause", &(file!(), line!()))
}
#[test]
fn drop_new_task_ok() {
drop(Task::new(None, None));
}
// Thread blocking tests
#[test]
fn block_and_wake() {
let task = box Task::new(None, None);
let task = BlockedTask::block(task).wake().unwrap();
task.drop();
}
}

View file

@ -84,15 +84,15 @@ pub type Callback = fn(msg: &(Any + Send), file: &'static str, line: uint);
// For more information, see below. // For more information, see below.
const MAX_CALLBACKS: uint = 16; const MAX_CALLBACKS: uint = 16;
static CALLBACKS: [atomic::AtomicUint; MAX_CALLBACKS] = static CALLBACKS: [atomic::AtomicUint; MAX_CALLBACKS] =
[atomic::INIT_ATOMIC_UINT, atomic::INIT_ATOMIC_UINT, [atomic::ATOMIC_UINT_INIT, atomic::ATOMIC_UINT_INIT,
atomic::INIT_ATOMIC_UINT, atomic::INIT_ATOMIC_UINT, atomic::ATOMIC_UINT_INIT, atomic::ATOMIC_UINT_INIT,
atomic::INIT_ATOMIC_UINT, atomic::INIT_ATOMIC_UINT, atomic::ATOMIC_UINT_INIT, atomic::ATOMIC_UINT_INIT,
atomic::INIT_ATOMIC_UINT, atomic::INIT_ATOMIC_UINT, atomic::ATOMIC_UINT_INIT, atomic::ATOMIC_UINT_INIT,
atomic::INIT_ATOMIC_UINT, atomic::INIT_ATOMIC_UINT, atomic::ATOMIC_UINT_INIT, atomic::ATOMIC_UINT_INIT,
atomic::INIT_ATOMIC_UINT, atomic::INIT_ATOMIC_UINT, atomic::ATOMIC_UINT_INIT, atomic::ATOMIC_UINT_INIT,
atomic::INIT_ATOMIC_UINT, atomic::INIT_ATOMIC_UINT, atomic::ATOMIC_UINT_INIT, atomic::ATOMIC_UINT_INIT,
atomic::INIT_ATOMIC_UINT, atomic::INIT_ATOMIC_UINT]; atomic::ATOMIC_UINT_INIT, atomic::ATOMIC_UINT_INIT];
static CALLBACK_CNT: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; static CALLBACK_CNT: atomic::AtomicUint = atomic::ATOMIC_UINT_INIT;
thread_local! { static PANICKING: Cell<bool> = Cell::new(false) } thread_local! { static PANICKING: Cell<bool> = Cell::new(false) }
@ -533,7 +533,7 @@ fn begin_unwind_inner(msg: Box<Any + Send>, file_line: &(&'static str, uint)) ->
// Make sure the default failure handler is registered before we look at the // Make sure the default failure handler is registered before we look at the
// callbacks. // callbacks.
static INIT: Once = ONCE_INIT; static INIT: Once = ONCE_INIT;
INIT.doit(|| unsafe { register(failure::on_fail); }); INIT.call_once(|| unsafe { register(failure::on_fail); });
// First, invoke call the user-defined callbacks triggered on thread panic. // First, invoke call the user-defined callbacks triggered on thread panic.
// //

View file

@ -46,7 +46,7 @@ pub fn limit_thread_creation_due_to_osx_and_valgrind() -> bool {
} }
pub fn min_stack() -> uint { pub fn min_stack() -> uint {
static MIN: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; static MIN: atomic::AtomicUint = atomic::ATOMIC_UINT_INIT;
match MIN.load(atomic::SeqCst) { match MIN.load(atomic::SeqCst) {
0 => {} 0 => {}
n => return n - 1, n => return n - 1,

View file

@ -86,15 +86,15 @@
//! Keep a global count of live tasks: //! Keep a global count of live tasks:
//! //!
//! ``` //! ```
//! use std::sync::atomic::{AtomicUint, SeqCst, INIT_ATOMIC_UINT}; //! use std::sync::atomic::{AtomicUint, SeqCst, ATOMIC_UINT_INIT};
//! //!
//! static GLOBAL_TASK_COUNT: AtomicUint = INIT_ATOMIC_UINT; //! static GLOBAL_TASK_COUNT: AtomicUint = ATOMIC_UINT_INIT;
//! //!
//! let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, SeqCst); //! let old_task_count = GLOBAL_TASK_COUNT.fetch_add(1, SeqCst);
//! println!("live tasks: {}", old_task_count + 1); //! println!("live tasks: {}", old_task_count + 1);
//! ``` //! ```
#![allow(deprecated)] #![stable]
use alloc::boxed::Box; use alloc::boxed::Box;
use core::mem; use core::mem;
@ -102,6 +102,7 @@ use core::prelude::{Send, Drop, None, Option, Some};
pub use core::atomic::{AtomicBool, AtomicInt, AtomicUint, AtomicPtr}; pub use core::atomic::{AtomicBool, AtomicInt, AtomicUint, AtomicPtr};
pub use core::atomic::{INIT_ATOMIC_BOOL, INIT_ATOMIC_INT, INIT_ATOMIC_UINT}; pub use core::atomic::{INIT_ATOMIC_BOOL, INIT_ATOMIC_INT, INIT_ATOMIC_UINT};
pub use core::atomic::{ATOMIC_BOOL_INIT, ATOMIC_INT_INIT, ATOMIC_UINT_INIT};
pub use core::atomic::fence; pub use core::atomic::fence;
pub use core::atomic::Ordering::{mod, Relaxed, Release, Acquire, AcqRel, SeqCst}; pub use core::atomic::Ordering::{mod, Relaxed, Release, Acquire, AcqRel, SeqCst};
@ -116,6 +117,7 @@ pub struct AtomicOption<T> {
p: AtomicUint, p: AtomicUint,
} }
#[allow(deprecated)]
impl<T: Send> AtomicOption<T> { impl<T: Send> AtomicOption<T> {
/// Create a new `AtomicOption` /// Create a new `AtomicOption`
pub fn new(p: Box<T>) -> AtomicOption<T> { pub fn new(p: Box<T>) -> AtomicOption<T> {

View file

@ -8,7 +8,6 @@
// option. This file may not be copied, modified, or distributed // option. This file may not be copied, modified, or distributed
// except according to those terms. // except according to those terms.
use kinds::{Send, Sync};
use sync::{Mutex, Condvar}; use sync::{Mutex, Condvar};
/// A barrier enables multiple tasks to synchronize the beginning /// A barrier enables multiple tasks to synchronize the beginning
@ -30,29 +29,32 @@ use sync::{Mutex, Condvar};
/// }).detach(); /// }).detach();
/// } /// }
/// ``` /// ```
#[stable]
pub struct Barrier { pub struct Barrier {
lock: Mutex<BarrierState>, lock: Mutex<BarrierState>,
cvar: Condvar, cvar: Condvar,
num_threads: uint, num_threads: uint,
} }
unsafe impl Send for Barrier {}
unsafe impl Sync for Barrier {}
// The inner state of a double barrier // The inner state of a double barrier
struct BarrierState { struct BarrierState {
count: uint, count: uint,
generation_id: uint, generation_id: uint,
} }
unsafe impl Send for BarrierState {} /// A result returned from wait.
unsafe impl Sync for BarrierState {} ///
/// Currently this opaque structure only has one method, `.is_leader()`. Only
/// one thread will receive a result that will return `true` from this function.
#[allow(missing_copy_implementations)]
pub struct BarrierWaitResult(bool);
impl Barrier { impl Barrier {
/// Create a new barrier that can block a given number of threads. /// Create a new barrier that can block a given number of threads.
/// ///
/// A barrier will block `n`-1 threads which call `wait` and then wake up /// A barrier will block `n`-1 threads which call `wait` and then wake up
/// all threads at once when the `n`th thread calls `wait`. /// all threads at once when the `n`th thread calls `wait`.
#[stable]
pub fn new(n: uint) -> Barrier { pub fn new(n: uint) -> Barrier {
Barrier { Barrier {
lock: Mutex::new(BarrierState { lock: Mutex::new(BarrierState {
@ -68,7 +70,13 @@ impl Barrier {
/// ///
/// Barriers are re-usable after all threads have rendezvoused once, and can /// Barriers are re-usable after all threads have rendezvoused once, and can
/// be used continuously. /// be used continuously.
pub fn wait(&self) { ///
/// A single (arbitrary) thread will receive a `BarrierWaitResult` that
/// returns `true` from `is_leader` when returning from this function, and
/// all other threads will receive a result that will return `false` from
/// `is_leader`
#[stable]
pub fn wait(&self) -> BarrierWaitResult {
let mut lock = self.lock.lock().unwrap(); let mut lock = self.lock.lock().unwrap();
let local_gen = lock.generation_id; let local_gen = lock.generation_id;
lock.count += 1; lock.count += 1;
@ -79,14 +87,25 @@ impl Barrier {
lock.count < self.num_threads { lock.count < self.num_threads {
lock = self.cvar.wait(lock).unwrap(); lock = self.cvar.wait(lock).unwrap();
} }
BarrierWaitResult(false)
} else { } else {
lock.count = 0; lock.count = 0;
lock.generation_id += 1; lock.generation_id += 1;
self.cvar.notify_all(); self.cvar.notify_all();
BarrierWaitResult(true)
} }
} }
} }
impl BarrierWaitResult {
/// Return whether this thread from `wait` is the "leader thread".
///
/// Only one thread will have `true` returned from their result, all other
/// threads will have `false` returned.
#[stable]
pub fn is_leader(&self) -> bool { self.0 }
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use prelude::v1::*; use prelude::v1::*;
@ -97,15 +116,16 @@ mod tests {
#[test] #[test]
fn test_barrier() { fn test_barrier() {
let barrier = Arc::new(Barrier::new(10)); const N: uint = 10;
let barrier = Arc::new(Barrier::new(N));
let (tx, rx) = channel(); let (tx, rx) = channel();
for _ in range(0u, 9) { for _ in range(0u, N - 1) {
let c = barrier.clone(); let c = barrier.clone();
let tx = tx.clone(); let tx = tx.clone();
Thread::spawn(move|| { Thread::spawn(move|| {
c.wait(); tx.send(c.wait().is_leader()).unwrap();
tx.send(true).unwrap();
}).detach(); }).detach();
} }
@ -116,10 +136,15 @@ mod tests {
_ => false, _ => false,
}); });
barrier.wait(); let mut leader_found = barrier.wait().is_leader();
// Now, the barrier is cleared and we should get data. // Now, the barrier is cleared and we should get data.
for _ in range(0u, 9) { for _ in range(0u, N - 1) {
rx.recv().unwrap(); if rx.recv().unwrap() {
assert!(!leader_found);
leader_found = true;
}
} }
assert!(leader_found);
} }
} }

View file

@ -88,7 +88,7 @@ unsafe impl Sync for StaticCondvar {}
#[unstable = "may be merged with Condvar in the future"] #[unstable = "may be merged with Condvar in the future"]
pub const CONDVAR_INIT: StaticCondvar = StaticCondvar { pub const CONDVAR_INIT: StaticCondvar = StaticCondvar {
inner: sys::CONDVAR_INIT, inner: sys::CONDVAR_INIT,
mutex: atomic::INIT_ATOMIC_UINT, mutex: atomic::ATOMIC_UINT_INIT,
}; };
impl Condvar { impl Condvar {

View file

@ -8,8 +8,8 @@
// option. This file may not be copied, modified, or distributed // option. This file may not be copied, modified, or distributed
// except according to those terms. // except according to those terms.
//! A type representing values that may be computed concurrently and operations for working with //! A type representing values that may be computed concurrently and operations
//! them. //! for working with them.
//! //!
//! # Example //! # Example
//! //!
@ -23,6 +23,9 @@
//! ``` //! ```
#![allow(missing_docs)] #![allow(missing_docs)]
#![unstable = "futures as-is have yet to be deeply reevaluated with recent \
core changes to Rust's synchronization story, and will likely \
become stable in the future but are unstable until that time"]
use core::prelude::*; use core::prelude::*;
use core::mem::replace; use core::mem::replace;

View file

@ -26,7 +26,7 @@ pub use self::rwlock::{RWLockReadGuard, RWLockWriteGuard};
pub use self::condvar::{Condvar, StaticCondvar, CONDVAR_INIT}; pub use self::condvar::{Condvar, StaticCondvar, CONDVAR_INIT};
pub use self::once::{Once, ONCE_INIT}; pub use self::once::{Once, ONCE_INIT};
pub use self::semaphore::{Semaphore, SemaphoreGuard}; pub use self::semaphore::{Semaphore, SemaphoreGuard};
pub use self::barrier::Barrier; pub use self::barrier::{Barrier, BarrierWaitResult};
pub use self::poison::{PoisonError, TryLockError, TryLockResult, LockResult}; pub use self::poison::{PoisonError, TryLockError, TryLockResult, LockResult};
pub use self::future::Future; pub use self::future::Future;

View file

@ -11,7 +11,7 @@
//! Generic support for building blocking abstractions. //! Generic support for building blocking abstractions.
use thread::Thread; use thread::Thread;
use sync::atomic::{AtomicBool, INIT_ATOMIC_BOOL, Ordering}; use sync::atomic::{AtomicBool, ATOMIC_BOOL_INIT, Ordering};
use sync::Arc; use sync::Arc;
use kinds::{Sync, Send}; use kinds::{Sync, Send};
use kinds::marker::{NoSend, NoSync}; use kinds::marker::{NoSend, NoSync};
@ -40,7 +40,7 @@ pub struct WaitToken {
pub fn tokens() -> (WaitToken, SignalToken) { pub fn tokens() -> (WaitToken, SignalToken) {
let inner = Arc::new(Inner { let inner = Arc::new(Inner {
thread: Thread::current(), thread: Thread::current(),
woken: INIT_ATOMIC_BOOL, woken: ATOMIC_BOOL_INIT,
}); });
let wait_token = WaitToken { let wait_token = WaitToken {
inner: inner.clone(), inner: inner.clone(),

View file

@ -32,10 +32,11 @@ use sync::{StaticMutex, MUTEX_INIT};
/// ///
/// static START: Once = ONCE_INIT; /// static START: Once = ONCE_INIT;
/// ///
/// START.doit(|| { /// START.call_once(|| {
/// // run initialization here /// // run initialization here
/// }); /// });
/// ``` /// ```
#[stable]
pub struct Once { pub struct Once {
mutex: StaticMutex, mutex: StaticMutex,
cnt: atomic::AtomicInt, cnt: atomic::AtomicInt,
@ -45,23 +46,25 @@ pub struct Once {
unsafe impl Sync for Once {} unsafe impl Sync for Once {}
/// Initialization value for static `Once` values. /// Initialization value for static `Once` values.
#[stable]
pub const ONCE_INIT: Once = Once { pub const ONCE_INIT: Once = Once {
mutex: MUTEX_INIT, mutex: MUTEX_INIT,
cnt: atomic::INIT_ATOMIC_INT, cnt: atomic::ATOMIC_INT_INIT,
lock_cnt: atomic::INIT_ATOMIC_INT, lock_cnt: atomic::ATOMIC_INT_INIT,
}; };
impl Once { impl Once {
/// Perform an initialization routine once and only once. The given closure /// Perform an initialization routine once and only once. The given closure
/// will be executed if this is the first time `doit` has been called, and /// will be executed if this is the first time `call_once` has been called,
/// otherwise the routine will *not* be invoked. /// and otherwise the routine will *not* be invoked.
/// ///
/// This method will block the calling task if another initialization /// This method will block the calling task if another initialization
/// routine is currently running. /// routine is currently running.
/// ///
/// When this function returns, it is guaranteed that some initialization /// When this function returns, it is guaranteed that some initialization
/// has run and completed (it may not be the closure specified). /// has run and completed (it may not be the closure specified).
pub fn doit<F>(&'static self, f: F) where F: FnOnce() { #[stable]
pub fn call_once<F>(&'static self, f: F) where F: FnOnce() {
// Optimize common path: load is much cheaper than fetch_add. // Optimize common path: load is much cheaper than fetch_add.
if self.cnt.load(atomic::SeqCst) < 0 { if self.cnt.load(atomic::SeqCst) < 0 {
return return
@ -91,13 +94,13 @@ impl Once {
// //
// It is crucial that the negative value is swapped in *after* the // It is crucial that the negative value is swapped in *after* the
// initialization routine has completed because otherwise new threads // initialization routine has completed because otherwise new threads
// calling `doit` will return immediately before the initialization has // calling `call_once` will return immediately before the initialization
// completed. // has completed.
let prev = self.cnt.fetch_add(1, atomic::SeqCst); let prev = self.cnt.fetch_add(1, atomic::SeqCst);
if prev < 0 { if prev < 0 {
// Make sure we never overflow, we'll never have int::MIN // Make sure we never overflow, we'll never have int::MIN
// simultaneous calls to `doit` to make this value go back to 0 // simultaneous calls to `call_once` to make this value go back to 0
self.cnt.store(int::MIN, atomic::SeqCst); self.cnt.store(int::MIN, atomic::SeqCst);
return return
} }
@ -118,6 +121,10 @@ impl Once {
unsafe { self.mutex.destroy() } unsafe { self.mutex.destroy() }
} }
} }
/// Deprecated
#[deprecated = "renamed to `call_once`"]
pub fn doit<F>(&'static self, f: F) where F: FnOnce() { self.call_once(f) }
} }
#[cfg(test)] #[cfg(test)]
@ -132,9 +139,9 @@ mod test {
fn smoke_once() { fn smoke_once() {
static O: Once = ONCE_INIT; static O: Once = ONCE_INIT;
let mut a = 0i; let mut a = 0i;
O.doit(|| a += 1); O.call_once(|| a += 1);
assert_eq!(a, 1); assert_eq!(a, 1);
O.doit(|| a += 1); O.call_once(|| a += 1);
assert_eq!(a, 1); assert_eq!(a, 1);
} }
@ -149,7 +156,7 @@ mod test {
Thread::spawn(move|| { Thread::spawn(move|| {
for _ in range(0u, 4) { Thread::yield_now() } for _ in range(0u, 4) { Thread::yield_now() }
unsafe { unsafe {
O.doit(|| { O.call_once(|| {
assert!(!run); assert!(!run);
run = true; run = true;
}); });
@ -160,7 +167,7 @@ mod test {
} }
unsafe { unsafe {
O.doit(|| { O.call_once(|| {
assert!(!run); assert!(!run);
run = true; run = true;
}); });

View file

@ -8,6 +8,9 @@
// option. This file may not be copied, modified, or distributed // option. This file may not be copied, modified, or distributed
// except according to those terms. // except according to those terms.
#![unstable = "the interaction between semaphores and the acquisition/release \
of resources is currently unclear"]
use ops::Drop; use ops::Drop;
use sync::{Mutex, Condvar}; use sync::{Mutex, Condvar};

View file

@ -10,6 +10,11 @@
//! Abstraction of a thread pool for basic parallelism. //! Abstraction of a thread pool for basic parallelism.
#![unstable = "the semantics of a failing task and whether a thread is \
re-attached to a thread pool are somewhat unclear, and the \
utility of this type in `std::sync` is questionable with \
respect to the jobs of other primitives"]
use core::prelude::*; use core::prelude::*;
use sync::{Arc, Mutex}; use sync::{Arc, Mutex};

View file

@ -137,7 +137,7 @@ pub const INIT: StaticKey = StaticKey {
/// ///
/// This value allows specific configuration of the destructor for a TLS key. /// This value allows specific configuration of the destructor for a TLS key.
pub const INIT_INNER: StaticKeyInner = StaticKeyInner { pub const INIT_INNER: StaticKeyInner = StaticKeyInner {
key: atomic::INIT_ATOMIC_UINT, key: atomic::ATOMIC_UINT_INIT,
}; };
static INIT_KEYS: Once = ONCE_INIT; static INIT_KEYS: Once = ONCE_INIT;

View file

@ -20,7 +20,7 @@ use libc::{mod, c_int, c_char, c_void};
use os; use os;
use path::{BytesContainer}; use path::{BytesContainer};
use ptr; use ptr;
use sync::atomic::{AtomicInt, INIT_ATOMIC_INT, SeqCst}; use sync::atomic::{AtomicInt, SeqCst};
use sys::fs::FileDesc; use sys::fs::FileDesc;
use os::TMPBUF_SZ; use os::TMPBUF_SZ;

View file

@ -10,6 +10,8 @@
use prelude::v1::*; use prelude::v1::*;
use prelude::*;
use libc; use libc;
use c_str::CString; use c_str::CString;
use mem; use mem;
@ -117,9 +119,6 @@ pub struct UnixStream {
write_deadline: u64, write_deadline: u64,
} }
unsafe impl Send for UnixStream {}
unsafe impl Sync for UnixStream {}
impl UnixStream { impl UnixStream {
pub fn connect(addr: &CString, pub fn connect(addr: &CString,
timeout: Option<u64>) -> IoResult<UnixStream> { timeout: Option<u64>) -> IoResult<UnixStream> {
@ -218,6 +217,7 @@ pub struct UnixListener {
path: CString, path: CString,
} }
// we currently own the CString, so these impls should be safe
unsafe impl Send for UnixListener {} unsafe impl Send for UnixListener {}
unsafe impl Sync for UnixListener {} unsafe impl Sync for UnixListener {}
@ -265,9 +265,6 @@ struct AcceptorInner {
closed: atomic::AtomicBool, closed: atomic::AtomicBool,
} }
unsafe impl Send for AcceptorInner {}
unsafe impl Sync for AcceptorInner {}
impl UnixAcceptor { impl UnixAcceptor {
pub fn fd(&self) -> fd_t { self.inner.listener.fd() } pub fn fd(&self) -> fd_t { self.inner.listener.fd() }

View file

@ -211,7 +211,7 @@ impl Timer {
// instead of () // instead of ()
HELPER.boot(|| {}, helper); HELPER.boot(|| {}, helper);
static ID: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; static ID: atomic::AtomicUint = atomic::ATOMIC_UINT_INIT;
let id = ID.fetch_add(1, atomic::Relaxed); let id = ID.fetch_add(1, atomic::Relaxed);
Ok(Timer { Ok(Timer {
id: id, id: id,

View file

@ -173,7 +173,7 @@ pub fn init_net() {
unsafe { unsafe {
static START: Once = ONCE_INIT; static START: Once = ONCE_INIT;
START.doit(|| { START.call_once(|| {
let mut data: c::WSADATA = mem::zeroed(); let mut data: c::WSADATA = mem::zeroed();
let ret = c::WSAStartup(0x202, // version 2.2 let ret = c::WSAStartup(0x202, // version 2.2
&mut data); &mut data);

View file

@ -20,7 +20,7 @@ const SPIN_COUNT: DWORD = 4000;
pub struct Mutex { inner: atomic::AtomicUint } pub struct Mutex { inner: atomic::AtomicUint }
pub const MUTEX_INIT: Mutex = Mutex { inner: atomic::INIT_ATOMIC_UINT }; pub const MUTEX_INIT: Mutex = Mutex { inner: atomic::ATOMIC_UINT_INIT };
unsafe impl Sync for Mutex {} unsafe impl Sync for Mutex {}

View file

@ -45,7 +45,7 @@ fn precise_time_ns() -> u64 {
denom: 0 }; denom: 0 };
static ONCE: sync::Once = sync::ONCE_INIT; static ONCE: sync::Once = sync::ONCE_INIT;
unsafe { unsafe {
ONCE.doit(|| { ONCE.call_once(|| {
imp::mach_timebase_info(&mut TIMEBASE); imp::mach_timebase_info(&mut TIMEBASE);
}); });
let time = imp::mach_absolute_time(); let time = imp::mach_absolute_time();

View file

@ -197,7 +197,7 @@ pub fn precise_time_ns() -> u64 {
denom: 0 }; denom: 0 };
static ONCE: std::sync::Once = std::sync::ONCE_INIT; static ONCE: std::sync::Once = std::sync::ONCE_INIT;
unsafe { unsafe {
ONCE.doit(|| { ONCE.call_once(|| {
imp::mach_timebase_info(&mut TIMEBASE); imp::mach_timebase_info(&mut TIMEBASE);
}); });
let time = imp::mach_absolute_time(); let time = imp::mach_absolute_time();

View file

@ -11,12 +11,12 @@
use std::sync::atomic; use std::sync::atomic;
pub const C1: uint = 1; pub const C1: uint = 1;
pub const C2: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; pub const C2: atomic::AtomicUint = atomic::ATOMIC_UINT_INIT;
pub const C3: fn() = foo; pub const C3: fn() = foo;
pub const C4: uint = C1 * C1 + C1 / C1; pub const C4: uint = C1 * C1 + C1 / C1;
pub const C5: &'static uint = &C4; pub const C5: &'static uint = &C4;
pub static S1: uint = 3; pub static S1: uint = 3;
pub static S2: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; pub static S2: atomic::AtomicUint = atomic::ATOMIC_UINT_INIT;
fn foo() {} fn foo() {}

View file

@ -41,9 +41,7 @@
extern crate arena; extern crate arena;
use std::iter::range_step; use std::iter::range_step;
use std::str::from_str; use std::thread::Thread;
use std::sync::Future;
use arena::TypedArena; use arena::TypedArena;
enum Tree<'a> { enum Tree<'a> {
@ -97,7 +95,7 @@ fn main() {
let mut messages = range_step(min_depth, max_depth + 1, 2).map(|depth| { let mut messages = range_step(min_depth, max_depth + 1, 2).map(|depth| {
use std::num::Int; use std::num::Int;
let iterations = 2i.pow((max_depth - depth + min_depth) as uint); let iterations = 2i.pow((max_depth - depth + min_depth) as uint);
Future::spawn(move|| { Thread::spawn(move|| {
let mut chk = 0; let mut chk = 0;
for i in range(1, iterations + 1) { for i in range(1, iterations + 1) {
let arena = TypedArena::new(); let arena = TypedArena::new();
@ -108,10 +106,10 @@ fn main() {
format!("{}\t trees of depth {}\t check: {}", format!("{}\t trees of depth {}\t check: {}",
iterations * 2, depth, chk) iterations * 2, depth, chk)
}) })
}).collect::<Vec<Future<String>>>(); }).collect::<Vec<_>>();
for message in messages.iter_mut() { for message in messages.into_iter() {
println!("{}", *message.get_ref()); println!("{}", message.join().ok().unwrap());
} }
println!("long lived tree of depth {}\t check: {}", println!("long lived tree of depth {}\t check: {}",

View file

@ -40,9 +40,8 @@
#![feature(slicing_syntax)] #![feature(slicing_syntax)]
use std::str::from_str;
use std::sync::Future;
use std::{cmp, iter, mem}; use std::{cmp, iter, mem};
use std::thread::Thread;
fn rotate(x: &mut [i32]) { fn rotate(x: &mut [i32]) {
let mut prev = x[0]; let mut prev = x[0];
@ -169,15 +168,15 @@ fn fannkuch(n: i32) -> (i32, i32) {
for (i, j) in range(0, N).zip(iter::count(0, k)) { for (i, j) in range(0, N).zip(iter::count(0, k)) {
let max = cmp::min(j+k, perm.max()); let max = cmp::min(j+k, perm.max());
futures.push(Future::spawn(move|| { futures.push(Thread::spawn(move|| {
work(perm, j as uint, max as uint) work(perm, j as uint, max as uint)
})) }))
} }
let mut checksum = 0; let mut checksum = 0;
let mut maxflips = 0; let mut maxflips = 0;
for fut in futures.iter_mut() { for fut in futures.into_iter() {
let (cs, mf) = fut.get(); let (cs, mf) = fut.join().ok().unwrap();
checksum += cs; checksum += cs;
maxflips = cmp::max(maxflips, mf); maxflips = cmp::max(maxflips, mf);
} }

View file

@ -16,11 +16,11 @@ use std::sync::atomic::*;
use std::ptr; use std::ptr;
fn main() { fn main() {
let x = INIT_ATOMIC_BOOL; let x = ATOMIC_BOOL_INIT;
let x = *&x; //~ ERROR: cannot move out of dereference let x = *&x; //~ ERROR: cannot move out of dereference
let x = INIT_ATOMIC_INT; let x = ATOMIC_INT_INIT;
let x = *&x; //~ ERROR: cannot move out of dereference let x = *&x; //~ ERROR: cannot move out of dereference
let x = INIT_ATOMIC_UINT; let x = ATOMIC_UINT_INIT;
let x = *&x; //~ ERROR: cannot move out of dereference let x = *&x; //~ ERROR: cannot move out of dereference
let x: AtomicPtr<uint> = AtomicPtr::new(ptr::null_mut()); let x: AtomicPtr<uint> = AtomicPtr::new(ptr::null_mut());
let x = *&x; //~ ERROR: cannot move out of dereference let x = *&x; //~ ERROR: cannot move out of dereference

View file

@ -15,7 +15,7 @@ extern crate "issue-17718" as other;
use std::sync::atomic; use std::sync::atomic;
const C1: uint = 1; const C1: uint = 1;
const C2: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; const C2: atomic::AtomicUint = atomic::ATOMIC_UINT_INIT;
const C3: fn() = foo; const C3: fn() = foo;
const C4: uint = C1 * C1 + C1 / C1; const C4: uint = C1 * C1 + C1 / C1;
const C5: &'static uint = &C4; const C5: &'static uint = &C4;
@ -25,7 +25,7 @@ const C6: uint = {
}; };
static S1: uint = 3; static S1: uint = 3;
static S2: atomic::AtomicUint = atomic::INIT_ATOMIC_UINT; static S2: atomic::AtomicUint = atomic::ATOMIC_UINT_INIT;
mod test { mod test {
static A: uint = 4; static A: uint = 4;

View file

@ -9,7 +9,7 @@
// except according to those terms. // except according to those terms.
use std::task; use std::task;
use std::sync::atomic::{AtomicUint, INIT_ATOMIC_UINT, Relaxed}; use std::sync::atomic::{AtomicUint, ATOMIC_UINT_INIT, Relaxed};
use std::rand::{thread_rng, Rng, Rand}; use std::rand::{thread_rng, Rng, Rand};
const REPEATS: uint = 5; const REPEATS: uint = 5;
@ -17,18 +17,18 @@ const MAX_LEN: uint = 32;
static drop_counts: [AtomicUint; MAX_LEN] = static drop_counts: [AtomicUint; MAX_LEN] =
// FIXME #5244: AtomicUint is not Copy. // FIXME #5244: AtomicUint is not Copy.
[ [
INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT,
INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT,
INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT,
INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT,
INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT,
INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT,
INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT,
INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, INIT_ATOMIC_UINT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT, ATOMIC_UINT_INIT,
]; ];
static creation_count: AtomicUint = INIT_ATOMIC_UINT; static creation_count: AtomicUint = ATOMIC_UINT_INIT;
#[deriving(Clone, PartialEq, PartialOrd, Eq, Ord)] #[deriving(Clone, PartialEq, PartialOrd, Eq, Ord)]
struct DropCounter { x: uint, creation_id: uint } struct DropCounter { x: uint, creation_id: uint }