1
Fork 0

std: Move comm primitives away from UnsafeArc

They currently still use `&mut self`, this migration was aimed towards moving
from UnsafeArc<T> to Arc<Unsafe<T>>
This commit is contained in:
Alex Crichton 2014-05-19 17:50:57 -07:00
parent d49aef7c02
commit 73729e94c8
4 changed files with 39 additions and 30 deletions

View file

@ -271,6 +271,8 @@
// And now that you've seen all the races that I found and attempted to fix, // And now that you've seen all the races that I found and attempted to fix,
// here's the code for you to find some more! // here's the code for you to find some more!
use alloc::arc::Arc;
use cell::Cell; use cell::Cell;
use clone::Clone; use clone::Clone;
use iter::Iterator; use iter::Iterator;
@ -283,7 +285,6 @@ use owned::Box;
use result::{Ok, Err, Result}; use result::{Ok, Err, Result};
use rt::local::Local; use rt::local::Local;
use rt::task::{Task, BlockedTask}; use rt::task::{Task, BlockedTask};
use sync::arc::UnsafeArc;
use ty::Unsafe; use ty::Unsafe;
pub use comm::select::{Select, Handle}; pub use comm::select::{Select, Handle};
@ -352,7 +353,7 @@ pub struct Sender<T> {
/// The sending-half of Rust's synchronous channel type. This half can only be /// The sending-half of Rust's synchronous channel type. This half can only be
/// owned by one task, but it can be cloned to send to other tasks. /// owned by one task, but it can be cloned to send to other tasks.
pub struct SyncSender<T> { pub struct SyncSender<T> {
inner: UnsafeArc<sync::Packet<T>>, inner: Arc<Unsafe<sync::Packet<T>>>,
// can't share in an arc // can't share in an arc
marker: marker::NoShare, marker: marker::NoShare,
} }
@ -386,10 +387,10 @@ pub enum TrySendError<T> {
} }
enum Flavor<T> { enum Flavor<T> {
Oneshot(UnsafeArc<oneshot::Packet<T>>), Oneshot(Arc<Unsafe<oneshot::Packet<T>>>),
Stream(UnsafeArc<stream::Packet<T>>), Stream(Arc<Unsafe<stream::Packet<T>>>),
Shared(UnsafeArc<shared::Packet<T>>), Shared(Arc<Unsafe<shared::Packet<T>>>),
Sync(UnsafeArc<sync::Packet<T>>), Sync(Arc<Unsafe<sync::Packet<T>>>),
} }
#[doc(hidden)] #[doc(hidden)]
@ -435,8 +436,8 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
/// println!("{}", rx.recv()); /// println!("{}", rx.recv());
/// ``` /// ```
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) { pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); let a = Arc::new(Unsafe::new(oneshot::Packet::new()));
(Sender::new(Oneshot(b)), Receiver::new(Oneshot(a))) (Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a)))
} }
/// Creates a new synchronous, bounded channel. /// Creates a new synchronous, bounded channel.
@ -471,8 +472,8 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
/// assert_eq!(rx.recv(), 2); /// assert_eq!(rx.recv(), 2);
/// ``` /// ```
pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) { pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(sync::Packet::new(bound)); let a = Arc::new(Unsafe::new(sync::Packet::new(bound)));
(SyncSender::new(a), Receiver::new(Sync(b))) (SyncSender::new(a.clone()), Receiver::new(Sync(a)))
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
@ -557,13 +558,13 @@ impl<T: Send> Sender<T> {
let (new_inner, ret) = match *unsafe { self.inner() } { let (new_inner, ret) = match *unsafe { self.inner() } {
Oneshot(ref p) => { Oneshot(ref p) => {
let p = p.get();
unsafe { unsafe {
let p = p.get();
if !(*p).sent() { if !(*p).sent() {
return (*p).send(t); return (*p).send(t);
} else { } else {
let (a, b) = UnsafeArc::new2(stream::Packet::new()); let a = Arc::new(Unsafe::new(stream::Packet::new()));
match (*p).upgrade(Receiver::new(Stream(b))) { match (*p).upgrade(Receiver::new(Stream(a.clone()))) {
oneshot::UpSuccess => { oneshot::UpSuccess => {
let ret = (*a.get()).send(t); let ret = (*a.get()).send(t);
(a, ret) (a, ret)
@ -598,17 +599,21 @@ impl<T: Send> Clone for Sender<T> {
fn clone(&self) -> Sender<T> { fn clone(&self) -> Sender<T> {
let (packet, sleeper) = match *unsafe { self.inner() } { let (packet, sleeper) = match *unsafe { self.inner() } {
Oneshot(ref p) => { Oneshot(ref p) => {
let (a, b) = UnsafeArc::new2(shared::Packet::new()); let a = Arc::new(Unsafe::new(shared::Packet::new()));
match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { match unsafe {
oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), (*p.get()).upgrade(Receiver::new(Shared(a.clone())))
oneshot::UpWoke(task) => (b, Some(task)) } {
oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
oneshot::UpWoke(task) => (a, Some(task))
} }
} }
Stream(ref p) => { Stream(ref p) => {
let (a, b) = UnsafeArc::new2(shared::Packet::new()); let a = Arc::new(Unsafe::new(shared::Packet::new()));
match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { match unsafe {
stream::UpSuccess | stream::UpDisconnected => (b, None), (*p.get()).upgrade(Receiver::new(Shared(a.clone())))
stream::UpWoke(task) => (b, Some(task)), } {
stream::UpSuccess | stream::UpDisconnected => (a, None),
stream::UpWoke(task) => (a, Some(task)),
} }
} }
Shared(ref p) => { Shared(ref p) => {
@ -645,7 +650,7 @@ impl<T: Send> Drop for Sender<T> {
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
impl<T: Send> SyncSender<T> { impl<T: Send> SyncSender<T> {
fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> { fn new(inner: Arc<Unsafe<sync::Packet<T>>>) -> SyncSender<T> {
SyncSender { inner: inner, marker: marker::NoShare } SyncSender { inner: inner, marker: marker::NoShare }
} }

View file

@ -15,7 +15,7 @@
/// this type is to have one and exactly one allocation when the chan/port pair /// this type is to have one and exactly one allocation when the chan/port pair
/// is created. /// is created.
/// ///
/// Another possible optimization would be to not use an UnsafeArc box because /// Another possible optimization would be to not use an Arc box because
/// in theory we know when the shared packet can be deallocated (no real need /// in theory we know when the shared packet can be deallocated (no real need
/// for the atomic reference counting), but I was having trouble how to destroy /// for the atomic reference counting), but I was having trouble how to destroy
/// the data early in a drop of a Port. /// the data early in a drop of a Port.

View file

@ -158,9 +158,10 @@ impl<T: Send> Drop for Queue<T> {
mod tests { mod tests {
use prelude::*; use prelude::*;
use alloc::arc::Arc;
use native; use native;
use super::{Queue, Data, Empty, Inconsistent}; use super::{Queue, Data, Empty, Inconsistent};
use sync::arc::UnsafeArc;
#[test] #[test]
fn test_full() { fn test_full() {
@ -179,14 +180,14 @@ mod tests {
Inconsistent | Data(..) => fail!() Inconsistent | Data(..) => fail!()
} }
let (tx, rx) = channel(); let (tx, rx) = channel();
let q = UnsafeArc::new(q); let q = Arc::new(q);
for _ in range(0, nthreads) { for _ in range(0, nthreads) {
let tx = tx.clone(); let tx = tx.clone();
let q = q.clone(); let q = q.clone();
native::task::spawn(proc() { native::task::spawn(proc() {
for i in range(0, nmsgs) { for i in range(0, nmsgs) {
unsafe { (*q.get()).push(i); } q.push(i);
} }
tx.send(()); tx.send(());
}); });
@ -194,7 +195,7 @@ mod tests {
let mut i = 0u; let mut i = 0u;
while i < nthreads * nmsgs { while i < nthreads * nmsgs {
match unsafe { (*q.get()).pop() } { match q.pop() {
Empty | Inconsistent => {}, Empty | Inconsistent => {},
Data(_) => { i += 1 } Data(_) => { i += 1 }
} }

View file

@ -52,7 +52,7 @@ struct Node<T> {
} }
/// The single-producer single-consumer queue. This structure is not cloneable, /// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an UnsafeArc if it is guaranteed that there /// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in /// is only one popper and one pusher touching the queue at any one point in
/// time. /// time.
pub struct Queue<T> { pub struct Queue<T> {
@ -227,9 +227,11 @@ impl<T: Send> Drop for Queue<T> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use prelude::*; use prelude::*;
use alloc::arc::Arc;
use native; use native;
use super::Queue; use super::Queue;
use sync::arc::UnsafeArc;
#[test] #[test]
fn smoke() { fn smoke() {
@ -274,7 +276,8 @@ mod test {
stress_bound(1); stress_bound(1);
fn stress_bound(bound: uint) { fn stress_bound(bound: uint) {
let (a, b) = UnsafeArc::new2(Queue::new(bound)); let a = Arc::new(Queue::new(bound));
let b = a.clone();
let (tx, rx) = channel(); let (tx, rx) = channel();
native::task::spawn(proc() { native::task::spawn(proc() {
for _ in range(0, 100000) { for _ in range(0, 100000) {