1
Fork 0

std: Unsafe-wrap std::sync

This commit is contained in:
Jubilee Young 2024-07-14 17:59:37 -07:00
parent 64fb2366da
commit e32460276c
8 changed files with 53 additions and 40 deletions

View file

@ -157,7 +157,6 @@
//! [`RwLock`]: crate::sync::RwLock //! [`RwLock`]: crate::sync::RwLock
#![stable(feature = "rust1", since = "1.0.0")] #![stable(feature = "rust1", since = "1.0.0")]
#![allow(unsafe_op_in_unsafe_fn)]
#[stable(feature = "rust1", since = "1.0.0")] #[stable(feature = "rust1", since = "1.0.0")]
pub use alloc_crate::sync::{Arc, Weak}; pub use alloc_crate::sync::{Arc, Weak};

View file

@ -200,11 +200,12 @@ impl<T> Channel<T> {
return Err(msg); return Err(msg);
} }
let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
// Write the message into the slot and update the stamp. // Write the message into the slot and update the stamp.
slot.msg.get().write(MaybeUninit::new(msg)); unsafe {
slot.stamp.store(token.array.stamp, Ordering::Release); let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
slot.msg.get().write(MaybeUninit::new(msg));
slot.stamp.store(token.array.stamp, Ordering::Release);
}
// Wake a sleeping receiver. // Wake a sleeping receiver.
self.receivers.notify(); self.receivers.notify();
@ -291,11 +292,14 @@ impl<T> Channel<T> {
return Err(()); return Err(());
} }
let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
// Read the message from the slot and update the stamp. // Read the message from the slot and update the stamp.
let msg = slot.msg.get().read().assume_init(); let msg = unsafe {
slot.stamp.store(token.array.stamp, Ordering::Release); let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
let msg = slot.msg.get().read().assume_init();
slot.stamp.store(token.array.stamp, Ordering::Release);
msg
};
// Wake a sleeping sender. // Wake a sleeping sender.
self.senders.notify(); self.senders.notify();
@ -471,7 +475,7 @@ impl<T> Channel<T> {
false false
}; };
self.discard_all_messages(tail); unsafe { self.discard_all_messages(tail) };
disconnected disconnected
} }

View file

@ -63,7 +63,7 @@ impl<C> Sender<C> {
disconnect(&self.counter().chan); disconnect(&self.counter().chan);
if self.counter().destroy.swap(true, Ordering::AcqRel) { if self.counter().destroy.swap(true, Ordering::AcqRel) {
drop(Box::from_raw(self.counter)); drop(unsafe { Box::from_raw(self.counter) });
} }
} }
} }
@ -116,7 +116,7 @@ impl<C> Receiver<C> {
disconnect(&self.counter().chan); disconnect(&self.counter().chan);
if self.counter().destroy.swap(true, Ordering::AcqRel) { if self.counter().destroy.swap(true, Ordering::AcqRel) {
drop(Box::from_raw(self.counter)); drop(unsafe { Box::from_raw(self.counter) });
} }
} }
} }

View file

@ -91,7 +91,7 @@ impl<T> Block<T> {
// It is not necessary to set the `DESTROY` bit in the last slot because that slot has // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
// begun destruction of the block. // begun destruction of the block.
for i in start..BLOCK_CAP - 1 { for i in start..BLOCK_CAP - 1 {
let slot = (*this).slots.get_unchecked(i); let slot = unsafe { (*this).slots.get_unchecked(i) };
// Mark the `DESTROY` bit if a thread is still using the slot. // Mark the `DESTROY` bit if a thread is still using the slot.
if slot.state.load(Ordering::Acquire) & READ == 0 if slot.state.load(Ordering::Acquire) & READ == 0
@ -103,7 +103,7 @@ impl<T> Block<T> {
} }
// No thread is using the block, now it is safe to destroy it. // No thread is using the block, now it is safe to destroy it.
drop(Box::from_raw(this)); drop(unsafe { Box::from_raw(this) });
} }
} }
@ -265,9 +265,11 @@ impl<T> Channel<T> {
// Write the message into the slot. // Write the message into the slot.
let block = token.list.block as *mut Block<T>; let block = token.list.block as *mut Block<T>;
let offset = token.list.offset; let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset); unsafe {
slot.msg.get().write(MaybeUninit::new(msg)); let slot = (*block).slots.get_unchecked(offset);
slot.state.fetch_or(WRITE, Ordering::Release); slot.msg.get().write(MaybeUninit::new(msg));
slot.state.fetch_or(WRITE, Ordering::Release);
}
// Wake a sleeping receiver. // Wake a sleeping receiver.
self.receivers.notify(); self.receivers.notify();
@ -369,19 +371,21 @@ impl<T> Channel<T> {
// Read the message. // Read the message.
let block = token.list.block as *mut Block<T>; let block = token.list.block as *mut Block<T>;
let offset = token.list.offset; let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset); unsafe {
slot.wait_write(); let slot = (*block).slots.get_unchecked(offset);
let msg = slot.msg.get().read().assume_init(); slot.wait_write();
let msg = slot.msg.get().read().assume_init();
// Destroy the block if we've reached the end, or if another thread wanted to destroy but // Destroy the block if we've reached the end, or if another thread wanted to destroy but
// couldn't because we were busy reading from the slot. // couldn't because we were busy reading from the slot.
if offset + 1 == BLOCK_CAP { if offset + 1 == BLOCK_CAP {
Block::destroy(block, 0); Block::destroy(block, 0);
} else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset + 1); Block::destroy(block, offset + 1);
}
Ok(msg)
} }
Ok(msg)
} }
/// Attempts to send a message into the channel. /// Attempts to send a message into the channel.

View file

@ -103,9 +103,11 @@ impl<T> Channel<T> {
return Err(msg); return Err(msg);
} }
let packet = &*(token.zero.0 as *const Packet<T>); unsafe {
packet.msg.get().write(Some(msg)); let packet = &*(token.zero.0 as *const Packet<T>);
packet.ready.store(true, Ordering::Release); packet.msg.get().write(Some(msg));
packet.ready.store(true, Ordering::Release);
}
Ok(()) Ok(())
} }
@ -116,22 +118,24 @@ impl<T> Channel<T> {
return Err(()); return Err(());
} }
let packet = &*(token.zero.0 as *const Packet<T>); let packet = unsafe { &*(token.zero.0 as *const Packet<T>) };
if packet.on_stack { if packet.on_stack {
// The message has been in the packet from the beginning, so there is no need to wait // The message has been in the packet from the beginning, so there is no need to wait
// for it. However, after reading the message, we need to set `ready` to `true` in // for it. However, after reading the message, we need to set `ready` to `true` in
// order to signal that the packet can be destroyed. // order to signal that the packet can be destroyed.
let msg = packet.msg.get().replace(None).unwrap(); let msg = unsafe { packet.msg.get().replace(None) }.unwrap();
packet.ready.store(true, Ordering::Release); packet.ready.store(true, Ordering::Release);
Ok(msg) Ok(msg)
} else { } else {
// Wait until the message becomes available, then read it and destroy the // Wait until the message becomes available, then read it and destroy the
// heap-allocated packet. // heap-allocated packet.
packet.wait_ready(); packet.wait_ready();
let msg = packet.msg.get().replace(None).unwrap(); unsafe {
drop(Box::from_raw(token.zero.0 as *mut Packet<T>)); let msg = packet.msg.get().replace(None).unwrap();
Ok(msg) drop(Box::from_raw(token.zero.0 as *mut Packet<T>));
Ok(msg)
}
} }
} }

View file

@ -502,7 +502,7 @@ impl<T> OnceLock<T> {
#[inline] #[inline]
unsafe fn get_unchecked(&self) -> &T { unsafe fn get_unchecked(&self) -> &T {
debug_assert!(self.is_initialized()); debug_assert!(self.is_initialized());
(&*self.value.get()).assume_init_ref() unsafe { (&*self.value.get()).assume_init_ref() }
} }
/// # Safety /// # Safety
@ -511,7 +511,7 @@ impl<T> OnceLock<T> {
#[inline] #[inline]
unsafe fn get_unchecked_mut(&mut self) -> &mut T { unsafe fn get_unchecked_mut(&mut self) -> &mut T {
debug_assert!(self.is_initialized()); debug_assert!(self.is_initialized());
(&mut *self.value.get()).assume_init_mut() unsafe { (&mut *self.value.get()).assume_init_mut() }
} }
} }

View file

@ -244,7 +244,9 @@ impl<T: ?Sized> ReentrantLock<T> {
} }
unsafe fn increment_lock_count(&self) -> Option<()> { unsafe fn increment_lock_count(&self) -> Option<()> {
*self.lock_count.get() = (*self.lock_count.get()).checked_add(1)?; unsafe {
*self.lock_count.get() = (*self.lock_count.get()).checked_add(1)?;
}
Some(()) Some(())
} }
} }

View file

@ -578,7 +578,7 @@ impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> {
// successfully called from the same thread before instantiating this object. // successfully called from the same thread before instantiating this object.
unsafe fn new(lock: &'rwlock RwLock<T>) -> LockResult<RwLockReadGuard<'rwlock, T>> { unsafe fn new(lock: &'rwlock RwLock<T>) -> LockResult<RwLockReadGuard<'rwlock, T>> {
poison::map_result(lock.poison.borrow(), |()| RwLockReadGuard { poison::map_result(lock.poison.borrow(), |()| RwLockReadGuard {
data: NonNull::new_unchecked(lock.data.get()), data: unsafe { NonNull::new_unchecked(lock.data.get()) },
inner_lock: &lock.inner, inner_lock: &lock.inner,
}) })
} }