diff options
author | Jeff Vander Stoep <jeffv@google.com> | 2023-02-22 00:20:03 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2023-02-22 00:20:03 +0000 |
commit | 2332197d2ed36dd4fe7d02e9537fbb24c1199fef (patch) | |
tree | b898c5c893d624eb817c709c79d68042aaf32ca5 | |
parent | 8744ac584cce1d169a3770fee8297866c0ecc442 (diff) | |
parent | 24fd79a671a4cfec4f9a0bf8047af8b58010a78b (diff) | |
download | spin-2332197d2ed36dd4fe7d02e9537fbb24c1199fef.tar.gz |
Upgrade spin to 0.9.5 am: 4ed5d5944d am: 101f35a0cc am: f66058a72e am: 24fd79a671
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/spin/+/2443199
Change-Id: I5520805e694d99cf929060b000d6e34a9803804c
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | .github/workflows/rust.yml | 46 | ||||
-rw-r--r-- | Android.bp | 5 | ||||
-rw-r--r-- | CHANGELOG.md | 17 | ||||
-rw-r--r-- | Cargo.toml | 4 | ||||
-rw-r--r-- | Cargo.toml.orig | 6 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | README.md | 5 | ||||
-rw-r--r-- | src/barrier.rs | 16 | ||||
-rw-r--r-- | src/lazy.rs | 12 | ||||
-rw-r--r-- | src/lib.rs | 41 | ||||
-rw-r--r-- | src/mutex.rs | 20 | ||||
-rw-r--r-- | src/mutex/fair.rs | 732 | ||||
-rw-r--r-- | src/mutex/spin.rs | 63 | ||||
-rw-r--r-- | src/mutex/ticket.rs | 15 | ||||
-rw-r--r-- | src/once.rs | 125 | ||||
-rw-r--r-- | src/relax.rs | 5 | ||||
-rw-r--r-- | src/rwlock.rs | 96 |
18 files changed, 1064 insertions, 156 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index a0a5511..6f677e3 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "3ee23eeaf394bf11bfb308fb7e1b1184d3653723" + "sha1": "5087c8ddb5d080b5bd6c898f95e239bcb3512c22" }, "path_in_vcs": "" }
\ No newline at end of file diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3c13d1b..04280c5 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -10,13 +10,47 @@ env: CARGO_TERM_COLOR: always jobs: - build: - + test: runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + rust: [stable, beta, nightly] steps: - - uses: actions/checkout@v2 - - name: Build - run: cargo build --verbose - - name: Run tests + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} + - name: Run Tests run: cargo test --verbose + - run: cargo build --all --all-features --all-targets + - name: Catch missing feature flags + if: startsWith(matrix.rust, 'nightly') + run: cargo check -Z features=dev_dep + - name: Install cargo-hack + uses: taiki-e/install-action@cargo-hack + - run: rustup target add thumbv7m-none-eabi + - name: Ensure we don't depend on libstd + run: cargo hack build --target thumbv7m-none-eabi --no-dev-deps --no-default-features + + msrv: + runs-on: ubuntu-latest + strategy: + matrix: + version: [1.38.0] + steps: + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup update ${{ matrix.version }} && rustup default ${{ matrix.version }} + - run: cargo build --all --all-features --all-targets + + miri: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Install Rust + run: rustup toolchain install nightly --component miri && rustup default nightly + - run: cargo miri test + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout @@ -36,7 +36,7 @@ rust_library { host_supported: true, crate_name: "spin", cargo_env_compat: true, - cargo_pkg_version: "0.9.4", + cargo_pkg_version: "0.9.5", srcs: ["src/lib.rs"], edition: "2015", features: [ @@ -57,7 +57,7 @@ rust_test { host_supported: true, crate_name: "spin", cargo_env_compat: true, - cargo_pkg_version: "0.9.4", + cargo_pkg_version: "0.9.5", srcs: ["src/lib.rs"], test_suites: ["general-tests"], auto_gen_config: true, @@ -95,4 +95,3 @@ rust_library_rlib { // error[E0433]: failed to resolve: could not find `Mutex` in `spin` // error[E0433]: failed to resolve: could not find `RwLock` in `spin` // error: could not compile `spin` due to 2 previous errors -// error: build failed diff --git a/CHANGELOG.md b/CHANGELOG.md index 200a7cc..5093ea9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +# [0.9.5] - 2023-02-07 + +### Added + +- `FairMutex`, a new mutex implementation that reduces writer starvation. +- A MSRV policy: Rust 1.38 is currently required + +### Changed + +- The crate's CI now has full MIRI integration, further improving the confidence you can have in the implementation. + +### Fixed + +- Ensured that the crate's abstractions comply with stacked borrows rules. +- Unsoundness in the `RwLock` that could be triggered via a reader overflow +- Relaxed various `Send`/`Sync` bound requirements to make the crate more flexible + # [0.9.4] - 2022-07-14 ### Fixed @@ -10,8 +10,9 @@ # See Cargo.toml.orig for the original contents. [package] +rust-version = "1.38" name = "spin" -version = "0.9.4" +version = "0.9.5" authors = [ "Mathijs van de Nes <git@mathijs.vd-nes.nl>", "John Ericson <git@JohnEricson.me>", @@ -55,6 +56,7 @@ default = [ "lazy", "barrier", ] +fair_mutex = ["mutex"] lazy = ["once"] lock_api = ["lock_api_crate"] mutex = [] diff --git a/Cargo.toml.orig b/Cargo.toml.orig index 7dccb72..cb9df1d 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,6 +1,6 @@ [package] name = "spin" -version = "0.9.4" +version = "0.9.5" authors = [ "Mathijs van de Nes <git@mathijs.vd-nes.nl>", "John Ericson <git@JohnEricson.me>", @@ -10,6 +10,7 @@ license = "MIT" repository = "https://github.com/mvdnes/spin-rs.git" keywords = ["spinlock", "mutex", "rwlock"] description = "Spin-based synchronization primitives" +rust-version = "1.38" [dependencies] lock_api_crate = { package = "lock_api", version = "0.4", optional = true } @@ -27,6 +28,9 @@ spin_mutex = ["mutex"] # Enables `TicketMutex`. ticket_mutex = ["mutex"] +# Enables `FairMutex`. +fair_mutex = ["mutex"] + # Enables the non-default ticket mutex implementation for `Mutex`. use_ticket_mutex = ["mutex", "ticket_mutex"] @@ -11,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/spin/spin-0.9.4.crate" + value: "https://static.crates.io/crates/spin/spin-0.9.5.crate" } - version: "0.9.4" + version: "0.9.5" license_type: NOTICE last_upgrade_date { - year: 2022 - month: 12 - day: 20 + year: 2023 + month: 2 + day: 17 } } @@ -127,6 +127,11 @@ time for your crate's users. You can do this like so: spin = { version = "x.y", default-features = false, features = [...] } ``` +## Minimum Safe Rust Version (MSRV) + +This crate is guaranteed to compile on a Minimum Safe Rust Version (MSRV) of 1.38.0 and above. +This version will not be changed without a minor version bump. + ## License `spin` is distributed under the MIT License, (See `LICENSE`). diff --git a/src/barrier.rs b/src/barrier.rs index 7a13890..c3a1c92 100644 --- a/src/barrier.rs +++ b/src/barrier.rs @@ -115,8 +115,7 @@ impl<R: RelaxStrategy> Barrier<R> { // not the leader let local_gen = lock.generation_id; - while local_gen == lock.generation_id && - lock.count < self.num_threads { + while local_gen == lock.generation_id && lock.count < self.num_threads { drop(lock); R::relax(); lock = self.lock.lock(); @@ -176,7 +175,9 @@ impl BarrierWaitResult { /// let barrier_wait_result = barrier.wait(); /// println!("{:?}", barrier_wait_result.is_leader()); /// ``` - pub fn is_leader(&self) -> bool { self.0 } + pub fn is_leader(&self) -> bool { + self.0 + } } #[cfg(test)] @@ -192,12 +193,13 @@ mod tests { fn use_barrier(n: usize, barrier: Arc<Barrier>) { let (tx, rx) = channel(); + let mut ts = Vec::new(); for _ in 0..n - 1 { let c = barrier.clone(); let tx = tx.clone(); - thread::spawn(move|| { + ts.push(thread::spawn(move || { tx.send(c.wait().is_leader()).unwrap(); - }); + })); } // At this point, all spawned threads should be blocked, @@ -217,6 +219,10 @@ mod tests { } } assert!(leader_found); + + for t in ts { + t.join().unwrap(); + } } #[test] diff --git a/src/lazy.rs b/src/lazy.rs index 1473db1..6e5efe4 100644 --- a/src/lazy.rs +++ b/src/lazy.rs @@ -3,8 +3,8 @@ //! Implementation adapted from the `SyncLazy` type of the standard library. See: //! <https://doc.rust-lang.org/std/lazy/struct.SyncLazy.html> -use core::{cell::Cell, fmt, ops::Deref}; use crate::{once::Once, RelaxStrategy, Spin}; +use core::{cell::Cell, fmt, ops::Deref}; /// A value which is initialized on the first access. /// @@ -45,7 +45,10 @@ pub struct Lazy<T, F = fn() -> T, R = Spin> { impl<T: fmt::Debug, F, R> fmt::Debug for Lazy<T, F, R> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Lazy").field("cell", &self.cell).field("init", &"..").finish() + f.debug_struct("Lazy") + .field("cell", &self.cell) + .field("init", &"..") + .finish() } } @@ -61,7 +64,10 @@ impl<T, F, R> Lazy<T, F, R> { /// Creates a new lazy value with the given initializing /// function. pub const fn new(f: F) -> Self { - Self { cell: Once::new(), init: Cell::new(Some(f)) } + Self { + cell: Once::new(), + init: Cell::new(Some(f)), + } } /// Retrieves a mutable pointer to the inner data. /// @@ -55,6 +55,9 @@ //! //! - `ticket_mutex` uses a ticket lock for the implementation of `Mutex` //! +//! - `fair_mutex` enables a fairer implementation of `Mutex` that uses eventual fairness to avoid +//! starvation +//! //! - `std` enables support for thread yielding instead of spinning #[cfg(any(test, feature = "std"))] @@ -63,10 +66,10 @@ extern crate core; #[cfg(feature = "portable_atomic")] extern crate portable_atomic; -#[cfg(feature = "portable_atomic")] -use portable_atomic as atomic; #[cfg(not(feature = "portable_atomic"))] use core::sync::atomic; +#[cfg(feature = "portable_atomic")] +use portable_atomic as atomic; #[cfg(feature = "barrier")] #[cfg_attr(docsrs, doc(cfg(feature = "barrier")))] @@ -80,21 +83,21 @@ pub mod mutex; #[cfg(feature = "once")] #[cfg_attr(docsrs, doc(cfg(feature = "once")))] pub mod once; +pub mod relax; #[cfg(feature = "rwlock")] #[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] pub mod rwlock; -pub mod relax; #[cfg(feature = "mutex")] #[cfg_attr(docsrs, doc(cfg(feature = "mutex")))] pub use mutex::MutexGuard; -#[cfg(feature = "rwlock")] -#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] -pub use rwlock::RwLockReadGuard; -pub use relax::{Spin, RelaxStrategy}; #[cfg(feature = "std")] #[cfg_attr(docsrs, doc(cfg(feature = "std")))] pub use relax::Yield; +pub use relax::{RelaxStrategy, Spin}; +#[cfg(feature = "rwlock")] +#[cfg_attr(docsrs, doc(cfg(feature = "rwlock")))] +pub use rwlock::RwLockReadGuard; // Avoid confusing inference errors by aliasing away the relax strategy parameter. Users that need to use a different // relax strategy can do so by accessing the types through their fully-qualified path. This is a little bit horrible @@ -192,3 +195,27 @@ pub mod lock_api { pub type RwLockUpgradableReadGuard<'a, T> = lock_api_crate::RwLockUpgradableReadGuard<'a, crate::RwLock<()>, T>; } + +/// In the event of an invalid operation, it's best to abort the current process. +#[cfg(feature = "fair_mutex")] +fn abort() -> ! { + #[cfg(not(feature = "std"))] + { + // Panicking while panicking is defined by Rust to result in an abort. + struct Panic; + + impl Drop for Panic { + fn drop(&mut self) { + panic!("aborting due to invalid operation"); + } + } + + let _panic = Panic; + panic!("aborting due to invalid operation"); + } + + #[cfg(feature = "std")] + { + std::process::abort(); + } +} diff --git a/src/mutex.rs b/src/mutex.rs index 2335051..e333d8a 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -27,11 +27,18 @@ pub mod ticket; #[cfg_attr(docsrs, doc(cfg(feature = "ticket_mutex")))] pub use self::ticket::{TicketMutex, TicketMutexGuard}; +#[cfg(feature = "fair_mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "fair_mutex")))] +pub mod fair; +#[cfg(feature = "fair_mutex")] +#[cfg_attr(docsrs, doc(cfg(feature = "fair_mutex")))] +pub use self::fair::{FairMutex, FairMutexGuard, Starvation}; + +use crate::{RelaxStrategy, Spin}; use core::{ fmt, ops::{Deref, DerefMut}, }; -use crate::{RelaxStrategy, Spin}; #[cfg(all(not(feature = "spin_mutex"), not(feature = "use_ticket_mutex")))] compile_error!("The `mutex` feature flag was used (perhaps through another feature?) without either `spin_mutex` or `use_ticket_mutex`. One of these is required."); @@ -78,9 +85,11 @@ type InnerMutexGuard<'a, T> = self::ticket::TicketMutexGuard<'a, T>; /// // We use a barrier to ensure the readout happens after all writing /// let barrier = Arc::new(Barrier::new(thread_count + 1)); /// +/// # let mut ts = Vec::new(); /// for _ in (0..thread_count) { /// let my_barrier = barrier.clone(); /// let my_lock = spin_mutex.clone(); +/// # let t = /// std::thread::spawn(move || { /// let mut guard = my_lock.lock(); /// *guard += 1; @@ -89,12 +98,17 @@ type InnerMutexGuard<'a, T> = self::ticket::TicketMutexGuard<'a, T>; /// drop(guard); /// my_barrier.wait(); /// }); +/// # ts.push(t); /// } /// /// barrier.wait(); /// /// let answer = { *spin_mutex.lock() }; /// assert_eq!(answer, thread_count); +/// +/// # for t in ts { +/// # t.join().unwrap(); +/// # } /// ``` pub struct Mutex<T: ?Sized, R = Spin> { inner: InnerMutex<T, R>, @@ -132,7 +146,9 @@ impl<T, R> Mutex<T, R> { /// ``` #[inline(always)] pub const fn new(value: T) -> Self { - Self { inner: InnerMutex::new(value) } + Self { + inner: InnerMutex::new(value), + } } /// Consumes this [`Mutex`] and unwraps the underlying data. diff --git a/src/mutex/fair.rs b/src/mutex/fair.rs new file mode 100644 index 0000000..dde3994 --- /dev/null +++ b/src/mutex/fair.rs @@ -0,0 +1,732 @@ +//! A spinning mutex with a fairer unlock algorithm. +//! +//! This mutex is similar to the `SpinMutex` in that it uses spinning to avoid +//! context switches. However, it uses a fairer unlock algorithm that avoids +//! starvation of threads that are waiting for the lock. + +use crate::{ + atomic::{AtomicUsize, Ordering}, + RelaxStrategy, Spin, +}; +use core::{ + cell::UnsafeCell, + fmt, + marker::PhantomData, + mem::ManuallyDrop, + ops::{Deref, DerefMut}, +}; + +// The lowest bit of `lock` is used to indicate whether the mutex is locked or not. The rest of the bits are used to +// store the number of starving threads. +const LOCKED: usize = 1; +const STARVED: usize = 2; + +/// Number chosen by fair roll of the dice, adjust as needed. +const STARVATION_SPINS: usize = 1024; + +/// A [spin lock](https://en.m.wikipedia.org/wiki/Spinlock) providing mutually exclusive access to data, but with a fairer +/// algorithm. +/// +/// # Example +/// +/// ``` +/// use spin; +/// +/// let lock = spin::mutex::FairMutex::<_>::new(0); +/// +/// // Modify the data +/// *lock.lock() = 2; +/// +/// // Read the data +/// let answer = *lock.lock(); +/// assert_eq!(answer, 2); +/// ``` +/// +/// # Thread safety example +/// +/// ``` +/// use spin; +/// use std::sync::{Arc, Barrier}; +/// +/// let thread_count = 1000; +/// let spin_mutex = Arc::new(spin::mutex::FairMutex::<_>::new(0)); +/// +/// // We use a barrier to ensure the readout happens after all writing +/// let barrier = Arc::new(Barrier::new(thread_count + 1)); +/// +/// for _ in (0..thread_count) { +/// let my_barrier = barrier.clone(); +/// let my_lock = spin_mutex.clone(); +/// std::thread::spawn(move || { +/// let mut guard = my_lock.lock(); +/// *guard += 1; +/// +/// // Release the lock to prevent a deadlock +/// drop(guard); +/// my_barrier.wait(); +/// }); +/// } +/// +/// barrier.wait(); +/// +/// let answer = { *spin_mutex.lock() }; +/// assert_eq!(answer, thread_count); +/// ``` +pub struct FairMutex<T: ?Sized, R = Spin> { + phantom: PhantomData<R>, + pub(crate) lock: AtomicUsize, + data: UnsafeCell<T>, +} + +/// A guard that provides mutable data access. +/// +/// When the guard falls out of scope it will release the lock. +pub struct FairMutexGuard<'a, T: ?Sized + 'a> { + lock: &'a AtomicUsize, + data: *mut T, +} + +/// A handle that indicates that we have been trying to acquire the lock for a while. +/// +/// This handle is used to prevent starvation. +pub struct Starvation<'a, T: ?Sized + 'a, R> { + lock: &'a FairMutex<T, R>, +} + +/// Indicates whether a lock was rejected due to the lock being held by another thread or due to starvation. +#[derive(Debug)] +pub enum LockRejectReason { + /// The lock was rejected due to the lock being held by another thread. + Locked, + + /// The lock was rejected due to starvation. + Starved, +} + +// Same unsafe impls as `std::sync::Mutex` +unsafe impl<T: ?Sized + Send, R> Sync for FairMutex<T, R> {} +unsafe impl<T: ?Sized + Send, R> Send for FairMutex<T, R> {} + +impl<T, R> FairMutex<T, R> { + /// Creates a new [`FairMutex`] wrapping the supplied data. + /// + /// # Example + /// + /// ``` + /// use spin::mutex::FairMutex; + /// + /// static MUTEX: FairMutex<()> = FairMutex::<_>::new(()); + /// + /// fn demo() { + /// let lock = MUTEX.lock(); + /// // do something with lock + /// drop(lock); + /// } + /// ``` + #[inline(always)] + pub const fn new(data: T) -> Self { + FairMutex { + lock: AtomicUsize::new(0), + data: UnsafeCell::new(data), + phantom: PhantomData, + } + } + + /// Consumes this [`FairMutex`] and unwraps the underlying data. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// assert_eq!(42, lock.into_inner()); + /// ``` + #[inline(always)] + pub fn into_inner(self) -> T { + // We know statically that there are no outstanding references to + // `self` so there's no need to lock. + let FairMutex { data, .. } = self; + data.into_inner() + } + + /// Returns a mutable pointer to the underlying data. + /// + /// This is mostly meant to be used for applications which require manual unlocking, but where + /// storing both the lock and the pointer to the inner data gets inefficient. + /// + /// # Example + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// unsafe { + /// core::mem::forget(lock.lock()); + /// + /// assert_eq!(lock.as_mut_ptr().read(), 42); + /// lock.as_mut_ptr().write(58); + /// + /// lock.force_unlock(); + /// } + /// + /// assert_eq!(*lock.lock(), 58); + /// + /// ``` + #[inline(always)] + pub fn as_mut_ptr(&self) -> *mut T { + self.data.get() + } +} + +impl<T: ?Sized, R: RelaxStrategy> FairMutex<T, R> { + /// Locks the [`FairMutex`] and returns a guard that permits access to the inner data. + /// + /// The returned value may be dereferenced for data access + /// and the lock will be dropped when the guard falls out of scope. + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(0); + /// { + /// let mut data = lock.lock(); + /// // The lock is now locked and the data can be accessed + /// *data += 1; + /// // The lock is implicitly dropped at the end of the scope + /// } + /// ``` + #[inline(always)] + pub fn lock(&self) -> FairMutexGuard<T> { + // Can fail to lock even if the spinlock is not locked. May be more efficient than `try_lock` + // when called in a loop. + let mut spins = 0; + while self + .lock + .compare_exchange_weak(0, 1, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { + // Wait until the lock looks unlocked before retrying + while self.is_locked() { + R::relax(); + + // If we've been spinning for a while, switch to a fairer strategy that will prevent + // newer users from stealing our lock from us. + if spins > STARVATION_SPINS { + return self.starve().lock(); + } + spins += 1; + } + } + + FairMutexGuard { + lock: &self.lock, + data: unsafe { &mut *self.data.get() }, + } + } +} + +impl<T: ?Sized, R> FairMutex<T, R> { + /// Returns `true` if the lock is currently held. + /// + /// # Safety + /// + /// This function provides no synchronization guarantees and so its result should be considered 'out of date' + /// the instant it is called. Do not use it for synchronization purposes. However, it may be useful as a heuristic. + #[inline(always)] + pub fn is_locked(&self) -> bool { + self.lock.load(Ordering::Relaxed) & LOCKED != 0 + } + + /// Force unlock this [`FairMutex`]. + /// + /// # Safety + /// + /// This is *extremely* unsafe if the lock is not held by the current + /// thread. However, this can be useful in some instances for exposing the + /// lock to FFI that doesn't know how to deal with RAII. + #[inline(always)] + pub unsafe fn force_unlock(&self) { + self.lock.fetch_and(!LOCKED, Ordering::Release); + } + + /// Try to lock this [`FairMutex`], returning a lock guard if successful. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// let maybe_guard = lock.try_lock(); + /// assert!(maybe_guard.is_some()); + /// + /// // `maybe_guard` is still held, so the second call fails + /// let maybe_guard2 = lock.try_lock(); + /// assert!(maybe_guard2.is_none()); + /// ``` + #[inline(always)] + pub fn try_lock(&self) -> Option<FairMutexGuard<T>> { + self.try_lock_starver().ok() + } + + /// Tries to lock this [`FairMutex`] and returns a result that indicates whether the lock was + /// rejected due to a starver or not. + #[inline(always)] + pub fn try_lock_starver(&self) -> Result<FairMutexGuard<T>, LockRejectReason> { + match self + .lock + .compare_exchange(0, LOCKED, Ordering::Acquire, Ordering::Relaxed) + .unwrap_or_else(|x| x) + { + 0 => Ok(FairMutexGuard { + lock: &self.lock, + data: unsafe { &mut *self.data.get() }, + }), + LOCKED => Err(LockRejectReason::Locked), + _ => Err(LockRejectReason::Starved), + } + } + + /// Indicates that the current user has been waiting for the lock for a while + /// and that the lock should yield to this thread over a newly arriving thread. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// // Lock the mutex to simulate it being used by another user. + /// let guard1 = lock.lock(); + /// + /// // Try to lock the mutex. + /// let guard2 = lock.try_lock(); + /// assert!(guard2.is_none()); + /// + /// // Wait for a while. + /// wait_for_a_while(); + /// + /// // We are now starved, indicate as such. + /// let starve = lock.starve(); + /// + /// // Once the lock is released, another user trying to lock it will + /// // fail. + /// drop(guard1); + /// let guard3 = lock.try_lock(); + /// assert!(guard3.is_none()); + /// + /// // However, we will be able to lock it. + /// let guard4 = starve.try_lock(); + /// assert!(guard4.is_ok()); + /// + /// # fn wait_for_a_while() {} + /// ``` + pub fn starve(&self) -> Starvation<'_, T, R> { + // Add a new starver to the state. + if self.lock.fetch_add(STARVED, Ordering::Relaxed) > (core::isize::MAX - 1) as usize { + // In the event of a potential lock overflow, abort. + crate::abort(); + } + + Starvation { lock: self } + } + + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the [`FairMutex`] mutably, and a mutable reference is guaranteed to be exclusive in + /// Rust, no actual locking needs to take place -- the mutable borrow statically guarantees no locks exist. As + /// such, this is a 'zero-cost' operation. + /// + /// # Example + /// + /// ``` + /// let mut lock = spin::mutex::FairMutex::<_>::new(0); + /// *lock.get_mut() = 10; + /// assert_eq!(*lock.lock(), 10); + /// ``` + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + // We know statically that there are no other references to `self`, so + // there's no need to lock the inner mutex. + unsafe { &mut *self.data.get() } + } +} + +impl<T: ?Sized + fmt::Debug, R> fmt::Debug for FairMutex<T, R> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + struct LockWrapper<'a, T: ?Sized + fmt::Debug>(Option<FairMutexGuard<'a, T>>); + + impl<T: ?Sized + fmt::Debug> fmt::Debug for LockWrapper<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match &self.0 { + Some(guard) => fmt::Debug::fmt(guard, f), + None => f.write_str("<locked>"), + } + } + } + + f.debug_struct("FairMutex") + .field("data", &LockWrapper(self.try_lock())) + .finish() + } +} + +impl<T: ?Sized + Default, R> Default for FairMutex<T, R> { + fn default() -> Self { + Self::new(Default::default()) + } +} + +impl<T, R> From<T> for FairMutex<T, R> { + fn from(data: T) -> Self { + Self::new(data) + } +} + +impl<'a, T: ?Sized> FairMutexGuard<'a, T> { + /// Leak the lock guard, yielding a mutable reference to the underlying data. + /// + /// Note that this function will permanently lock the original [`FairMutex`]. + /// + /// ``` + /// let mylock = spin::mutex::FairMutex::<_>::new(0); + /// + /// let data: &mut i32 = spin::mutex::FairMutexGuard::leak(mylock.lock()); + /// + /// *data = 1; + /// assert_eq!(*data, 1); + /// ``` + #[inline(always)] + pub fn leak(this: Self) -> &'a mut T { + // Use ManuallyDrop to avoid stacked-borrow invalidation + let mut this = ManuallyDrop::new(this); + // We know statically that only we are referencing data + unsafe { &mut *this.data } + } +} + +impl<'a, T: ?Sized + fmt::Debug> fmt::Debug for FairMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized + fmt::Display> fmt::Display for FairMutexGuard<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl<'a, T: ?Sized> Deref for FairMutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &T { + // We know statically that only we are referencing data + unsafe { &*self.data } + } +} + +impl<'a, T: ?Sized> DerefMut for FairMutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + // We know statically that only we are referencing data + unsafe { &mut *self.data } + } +} + +impl<'a, T: ?Sized> Drop for FairMutexGuard<'a, T> { + /// The dropping of the MutexGuard will release the lock it was created from. + fn drop(&mut self) { + self.lock.fetch_and(!LOCKED, Ordering::Release); + } +} + +impl<'a, T: ?Sized, R> Starvation<'a, T, R> { + /// Attempts the lock the mutex if we are the only starving user. + /// + /// This allows another user to lock the mutex if they are starving as well. + pub fn try_lock_fair(self) -> Result<FairMutexGuard<'a, T>, Self> { + // Try to lock the mutex. + if self + .lock + .lock + .compare_exchange( + STARVED, + STARVED | LOCKED, + Ordering::Acquire, + Ordering::Relaxed, + ) + .is_ok() + { + // We are the only starving user, lock the mutex. + Ok(FairMutexGuard { + lock: &self.lock.lock, + data: self.lock.data.get(), + }) + } else { + // Another user is starving, fail. + Err(self) + } + } + + /// Attempts to lock the mutex. + /// + /// If the lock is currently held by another thread, this will return `None`. + /// + /// # Example + /// + /// ``` + /// let lock = spin::mutex::FairMutex::<_>::new(42); + /// + /// // Lock the mutex to simulate it being used by another user. + /// let guard1 = lock.lock(); + /// + /// // Try to lock the mutex. + /// let guard2 = lock.try_lock(); + /// assert!(guard2.is_none()); + /// + /// // Wait for a while. + /// wait_for_a_while(); + /// + /// // We are now starved, indicate as such. + /// let starve = lock.starve(); + /// + /// // Once the lock is released, another user trying to lock it will + /// // fail. + /// drop(guard1); + /// let guard3 = lock.try_lock(); + /// assert!(guard3.is_none()); + /// + /// // However, we will be able to lock it. + /// let guard4 = starve.try_lock(); + /// assert!(guard4.is_ok()); + /// + /// # fn wait_for_a_while() {} + /// ``` + pub fn try_lock(self) -> Result<FairMutexGuard<'a, T>, Self> { + // Try to lock the mutex. + if self.lock.lock.fetch_or(LOCKED, Ordering::Acquire) & LOCKED == 0 { + // We have successfully locked the mutex. + // By dropping `self` here, we decrement the starvation count. + Ok(FairMutexGuard { + lock: &self.lock.lock, + data: self.lock.data.get(), + }) + } else { + Err(self) + } + } +} + +impl<'a, T: ?Sized, R: RelaxStrategy> Starvation<'a, T, R> { + /// Locks the mutex. + pub fn lock(mut self) -> FairMutexGuard<'a, T> { + // Try to lock the mutex. + loop { + match self.try_lock() { + Ok(lock) => return lock, + Err(starve) => self = starve, + } + + // Relax until the lock is released. + while self.lock.is_locked() { + R::relax(); + } + } + } +} + +impl<'a, T: ?Sized, R> Drop for Starvation<'a, T, R> { + fn drop(&mut self) { + // As there is no longer a user being starved, we decrement the starver count. + self.lock.lock.fetch_sub(STARVED, Ordering::Release); + } +} + +impl fmt::Display for LockRejectReason { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + LockRejectReason::Locked => write!(f, "locked"), + LockRejectReason::Starved => write!(f, "starved"), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for LockRejectReason {} + +#[cfg(feature = "lock_api")] +unsafe impl<R: RelaxStrategy> lock_api_crate::RawMutex for FairMutex<(), R> { + type GuardMarker = lock_api_crate::GuardSend; + + const INIT: Self = Self::new(()); + + fn lock(&self) { + // Prevent guard destructor running + core::mem::forget(Self::lock(self)); + } + + fn try_lock(&self) -> bool { + // Prevent guard destructor running + Self::try_lock(self).map(core::mem::forget).is_some() + } + + unsafe fn unlock(&self) { + self.force_unlock(); + } + + fn is_locked(&self) -> bool { + Self::is_locked(self) + } +} + +#[cfg(test)] +mod tests { + use std::prelude::v1::*; + + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::mpsc::channel; + use std::sync::Arc; + use std::thread; + + type FairMutex<T> = super::FairMutex<T>; + + #[derive(Eq, PartialEq, Debug)] + struct NonCopy(i32); + + #[test] + fn smoke() { + let m = FairMutex::<_>::new(()); + drop(m.lock()); + drop(m.lock()); + } + + #[test] + fn lots_and_lots() { + static M: FairMutex<()> = FairMutex::<_>::new(()); + static mut CNT: u32 = 0; + const J: u32 = 1000; + const K: u32 = 3; + + fn inc() { + for _ in 0..J { + unsafe { + let _g = M.lock(); + CNT += 1; + } + } + } + + let (tx, rx) = channel(); + for _ in 0..K { + let tx2 = tx.clone(); + thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + }); + let tx2 = tx.clone(); + thread::spawn(move || { + inc(); + tx2.send(()).unwrap(); + }); + } + + drop(tx); + for _ in 0..2 * K { + rx.recv().unwrap(); + } + assert_eq!(unsafe { CNT }, J * K * 2); + } + + #[test] + fn try_lock() { + let mutex = FairMutex::<_>::new(42); + + // First lock succeeds + let a = mutex.try_lock(); + assert_eq!(a.as_ref().map(|r| **r), Some(42)); + + // Additional lock fails + let b = mutex.try_lock(); + assert!(b.is_none()); + + // After dropping lock, it succeeds again + ::core::mem::drop(a); + let c = mutex.try_lock(); + assert_eq!(c.as_ref().map(|r| **r), Some(42)); + } + + #[test] + fn test_into_inner() { + let m = FairMutex::<_>::new(NonCopy(10)); + assert_eq!(m.into_inner(), NonCopy(10)); + } + + #[test] + fn test_into_inner_drop() { + struct Foo(Arc<AtomicUsize>); + impl Drop for Foo { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let num_drops = Arc::new(AtomicUsize::new(0)); + let m = FairMutex::<_>::new(Foo(num_drops.clone())); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + { + let _inner = m.into_inner(); + assert_eq!(num_drops.load(Ordering::SeqCst), 0); + } + assert_eq!(num_drops.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_mutex_arc_nested() { + // Tests nested mutexes and access + // to underlying data. + let arc = Arc::new(FairMutex::<_>::new(1)); + let arc2 = Arc::new(FairMutex::<_>::new(arc)); + let (tx, rx) = channel(); + let _t = thread::spawn(move || { + let lock = arc2.lock(); + let lock2 = lock.lock(); + assert_eq!(*lock2, 1); + tx.send(()).unwrap(); + }); + rx.recv().unwrap(); + } + + #[test] + fn test_mutex_arc_access_in_unwind() { + let arc = Arc::new(FairMutex::<_>::new(1)); + let arc2 = arc.clone(); + let _ = thread::spawn(move || -> () { + struct Unwinder { + i: Arc<FairMutex<i32>>, + } + impl Drop for Unwinder { + fn drop(&mut self) { + *self.i.lock() += 1; + } + } + let _u = Unwinder { i: arc2 }; + panic!(); + }) + .join(); + let lock = arc.lock(); + assert_eq!(*lock, 2); + } + + #[test] + fn test_mutex_unsized() { + let mutex: &FairMutex<[i32]> = &FairMutex::<_>::new([1, 2, 3]); + { + let b = &mut *mutex.lock(); + b[0] = 4; + b[2] = 5; + } + let comp: &[i32] = &[4, 2, 5]; + assert_eq!(&*mutex.lock(), comp); + } + + #[test] + fn test_mutex_force_lock() { + let lock = FairMutex::<_>::new(()); + ::std::mem::forget(lock.lock()); + unsafe { + lock.force_unlock(); + } + assert!(lock.try_lock().is_some()); + } +} diff --git a/src/mutex/spin.rs b/src/mutex/spin.rs index fedff67..1ee572d 100644 --- a/src/mutex/spin.rs +++ b/src/mutex/spin.rs @@ -3,15 +3,16 @@ //! Waiting threads hammer an atomic variable until it becomes available. Best-case latency is low, but worst-case //! latency is theoretically infinite. +use crate::{ + atomic::{AtomicBool, Ordering}, + RelaxStrategy, Spin, +}; use core::{ cell::UnsafeCell, fmt, - ops::{Deref, DerefMut}, marker::PhantomData, -}; -use crate::{ - atomic::{AtomicBool, Ordering}, - RelaxStrategy, Spin + mem::ManuallyDrop, + ops::{Deref, DerefMut}, }; /// A [spin lock](https://en.m.wikipedia.org/wiki/Spinlock) providing mutually exclusive access to data. @@ -43,9 +44,11 @@ use crate::{ /// // We use a barrier to ensure the readout happens after all writing /// let barrier = Arc::new(Barrier::new(thread_count + 1)); /// +/// # let mut ts = Vec::new(); /// for _ in (0..thread_count) { /// let my_barrier = barrier.clone(); /// let my_lock = spin_mutex.clone(); +/// # let t = /// std::thread::spawn(move || { /// let mut guard = my_lock.lock(); /// *guard += 1; @@ -54,12 +57,17 @@ use crate::{ /// drop(guard); /// my_barrier.wait(); /// }); +/// # ts.push(t); /// } /// /// barrier.wait(); /// /// let answer = { *spin_mutex.lock() }; /// assert_eq!(answer, thread_count); +/// +/// # for t in ts { +/// # t.join().unwrap(); +/// # } /// ``` pub struct SpinMutex<T: ?Sized, R = Spin> { phantom: PhantomData<R>, @@ -72,7 +80,7 @@ pub struct SpinMutex<T: ?Sized, R = Spin> { /// When the guard falls out of scope it will release the lock. pub struct SpinMutexGuard<'a, T: ?Sized + 'a> { lock: &'a AtomicBool, - data: &'a mut T, + data: *mut T, } // Same unsafe impls as `std::sync::Mutex` @@ -166,7 +174,11 @@ impl<T: ?Sized, R: RelaxStrategy> SpinMutex<T, R> { pub fn lock(&self) -> SpinMutexGuard<T> { // Can fail to lock even if the spinlock is not locked. May be more efficient than `try_lock` // when called in a loop. - while self.lock.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { + while self + .lock + .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_err() + { // Wait until the lock looks unlocked before retrying while self.is_locked() { R::relax(); @@ -222,7 +234,11 @@ impl<T: ?Sized, R> SpinMutex<T, R> { pub fn try_lock(&self) -> Option<SpinMutexGuard<T>> { // The reason for using a strong compare_exchange is explained here: // https://github.com/Amanieu/parking_lot/pull/207#issuecomment-575869107 - if self.lock.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_ok() { + if self + .lock + .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { Some(SpinMutexGuard { lock: &self.lock, data: unsafe { &mut *self.data.get() }, @@ -291,9 +307,10 @@ impl<'a, T: ?Sized> SpinMutexGuard<'a, T> { /// ``` #[inline(always)] pub fn leak(this: Self) -> &'a mut T { - let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing - core::mem::forget(this); - unsafe { &mut *data } + // Use ManuallyDrop to avoid stacked-borrow invalidation + let mut this = ManuallyDrop::new(this); + // We know statically that only we are referencing data + unsafe { &mut *this.data } } } @@ -312,13 +329,15 @@ impl<'a, T: ?Sized + fmt::Display> fmt::Display for SpinMutexGuard<'a, T> { impl<'a, T: ?Sized> Deref for SpinMutexGuard<'a, T> { type Target = T; fn deref(&self) -> &T { - self.data + // We know statically that only we are referencing data + unsafe { &*self.data } } } impl<'a, T: ?Sized> DerefMut for SpinMutexGuard<'a, T> { fn deref_mut(&mut self) -> &mut T { - self.data + // We know statically that only we are referencing data + unsafe { &mut *self.data } } } @@ -392,17 +411,18 @@ mod tests { } let (tx, rx) = channel(); + let mut ts = Vec::new(); for _ in 0..K { let tx2 = tx.clone(); - thread::spawn(move || { + ts.push(thread::spawn(move || { inc(); tx2.send(()).unwrap(); - }); + })); let tx2 = tx.clone(); - thread::spawn(move || { + ts.push(thread::spawn(move || { inc(); tx2.send(()).unwrap(); - }); + })); } drop(tx); @@ -410,6 +430,10 @@ mod tests { rx.recv().unwrap(); } assert_eq!(unsafe { CNT }, J * K * 2); + + for t in ts { + t.join().unwrap(); + } } #[test] @@ -420,7 +444,7 @@ mod tests { let a = mutex.try_lock(); assert_eq!(a.as_ref().map(|r| **r), Some(42)); - // Additional lock failes + // Additional lock fails let b = mutex.try_lock(); assert!(b.is_none()); @@ -461,13 +485,14 @@ mod tests { let arc = Arc::new(SpinMutex::<_>::new(1)); let arc2 = Arc::new(SpinMutex::<_>::new(arc)); let (tx, rx) = channel(); - let _t = thread::spawn(move || { + let t = thread::spawn(move || { let lock = arc2.lock(); let lock2 = lock.lock(); assert_eq!(*lock2, 1); tx.send(()).unwrap(); }); rx.recv().unwrap(); + t.join().unwrap(); } #[test] diff --git a/src/mutex/ticket.rs b/src/mutex/ticket.rs index a2567ce..01b905e 100644 --- a/src/mutex/ticket.rs +++ b/src/mutex/ticket.rs @@ -5,21 +5,20 @@ //! latency is infinitely better. Waiting threads simply need to wait for all threads that come before them in the //! queue to finish. +use crate::{ + atomic::{AtomicUsize, Ordering}, + RelaxStrategy, Spin, +}; use core::{ cell::UnsafeCell, fmt, - ops::{Deref, DerefMut}, marker::PhantomData, + ops::{Deref, DerefMut}, }; -use crate::{ - atomic::{AtomicUsize, Ordering}, - RelaxStrategy, Spin -}; - /// A spin-based [ticket lock](https://en.wikipedia.org/wiki/Ticket_lock) providing mutually exclusive access to data. /// -/// A ticket lock is analagous to a queue management system for lock requests. When a thread tries to take a lock, it +/// A ticket lock is analogous to a queue management system for lock requests. When a thread tries to take a lock, it /// is assigned a 'ticket'. It then spins until its ticket becomes next in line. When the lock guard is released, the /// next ticket will be processed. /// @@ -443,7 +442,7 @@ mod tests { let a = mutex.try_lock(); assert_eq!(a.as_ref().map(|r| **r), Some(42)); - // Additional lock failes + // Additional lock fails let b = mutex.try_lock(); assert!(b.is_none()); diff --git a/src/once.rs b/src/once.rs index a88b4bd..0b4a30c 100644 --- a/src/once.rs +++ b/src/once.rs @@ -1,16 +1,11 @@ - //! Synchronization primitives for one-time evaluation. -use core::{ - cell::UnsafeCell, - mem::MaybeUninit, - marker::PhantomData, - fmt, -}; +//! Synchronization primitives for one-time evaluation. + use crate::{ atomic::{AtomicU8, Ordering}, - RelaxStrategy, Spin + RelaxStrategy, Spin, }; - +use core::{cell::UnsafeCell, fmt, marker::PhantomData, mem::MaybeUninit}; /// A primitive that provides lazy one-time initialization. /// @@ -38,16 +33,18 @@ pub struct Once<T = (), R = Spin> { } impl<T, R> Default for Once<T, R> { - fn default() -> Self { Self::new() } + fn default() -> Self { + Self::new() + } } impl<T: fmt::Debug, R> fmt::Debug for Once<T, R> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.get() { Some(s) => write!(f, "Once {{ data: ") - .and_then(|()| s.fmt(f)) - .and_then(|()| write!(f, "}}")), - None => write!(f, "Once {{ <uninitialized> }}") + .and_then(|()| s.fmt(f)) + .and_then(|()| write!(f, "}}")), + None => write!(f, "Once {{ <uninitialized> }}"), } } } @@ -106,12 +103,20 @@ mod status { self.0.store(status as u8, ordering); } #[inline(always)] - pub fn compare_exchange(&self, old: Status, new: Status, success: Ordering, failure: Ordering) -> Result<Status, Status> { - match self.0.compare_exchange(old as u8, new as u8, success, failure) { + pub fn compare_exchange( + &self, + old: Status, + new: Status, + success: Ordering, + failure: Ordering, + ) -> Result<Status, Status> { + match self + .0 + .compare_exchange(old as u8, new as u8, success, failure) + { // SAFETY: A compare exchange will always return a value that was later stored into // the atomic u8, but due to the invariant that it must be a valid Status, we know // that both Ok(_) and Err(_) will be safely transmutable. - Ok(ok) => Ok(unsafe { Status::new_unchecked(ok) }), Err(err) => Err(unsafe { Status::new_unchecked(err) }), } @@ -124,7 +129,7 @@ mod status { } } } -use self::status::{Status, AtomicStatus}; +use self::status::{AtomicStatus, Status}; use core::hint::unreachable_unchecked as unreachable; @@ -228,10 +233,12 @@ impl<T, R: RelaxStrategy> Once<T, R> { Ordering::Acquire, ) { Ok(_must_be_state_incomplete) => { - // The compare-exchange suceeded, so we shall initialize it. + // The compare-exchange succeeded, so we shall initialize it. // We use a guard (Finish) to catch panics caused by builder - let finish = Finish { status: &self.status }; + let finish = Finish { + status: &self.status, + }; let val = match f() { Ok(val) => val, Err(err) => { @@ -283,24 +290,22 @@ impl<T, R: RelaxStrategy> Once<T, R> { // initialized it ourselves, in which case no additional synchronization is needed. Status::Complete => unsafe { self.force_get() }, Status::Panicked => panic!("Once panicked"), - Status::Running => self - .poll() - .unwrap_or_else(|| { - if cfg!(debug_assertions) { - unreachable!("Encountered INCOMPLETE when polling Once") - } else { - // SAFETY: This poll is guaranteed never to fail because the API of poll - // promises spinning if initialization is in progress. We've already - // checked that initialisation is in progress, and initialisation is - // monotonic: once done, it cannot be undone. We also fetched the status - // with Acquire semantics, thereby guaranteeing that the later-executed - // poll will also agree with us that initialization is in progress. Ergo, - // this poll cannot fail. - unsafe { - unreachable(); - } + Status::Running => self.poll().unwrap_or_else(|| { + if cfg!(debug_assertions) { + unreachable!("Encountered INCOMPLETE when polling Once") + } else { + // SAFETY: This poll is guaranteed never to fail because the API of poll + // promises spinning if initialization is in progress. We've already + // checked that initialisation is in progress, and initialisation is + // monotonic: once done, it cannot be undone. We also fetched the status + // with Acquire semantics, thereby guaranteeing that the later-executed + // poll will also agree with us that initialization is in progress. Ergo, + // this poll cannot fail. + unsafe { + unreachable(); } - }), + } + }), // SAFETY: The only invariant possible in addition to the aforementioned ones at the // moment, is INCOMPLETE. However, the only way for this match statement to be @@ -364,7 +369,7 @@ impl<T, R> Once<T, R> { }; /// Creates a new [`Once`]. - pub const fn new() -> Self{ + pub const fn new() -> Self { Self::INIT } @@ -540,10 +545,13 @@ mod tests { static mut RUN: bool = false; let (tx, rx) = channel(); + let mut ts = Vec::new(); for _ in 0..10 { let tx = tx.clone(); - thread::spawn(move|| { - for _ in 0..4 { thread::yield_now() } + ts.push(thread::spawn(move || { + for _ in 0..4 { + thread::yield_now() + } unsafe { O.call_once(|| { assert!(!RUN); @@ -552,7 +560,7 @@ mod tests { assert!(RUN); } tx.send(()).unwrap(); - }); + })); } unsafe { @@ -566,6 +574,10 @@ mod tests { for _ in 0..10 { rx.recv().unwrap(); } + + for t in ts { + t.join().unwrap(); + } } #[test] @@ -582,12 +594,16 @@ mod tests { static INIT: Once<usize> = Once::new(); assert!(INIT.get().is_none()); - thread::spawn(move|| { - INIT.call_once(|| loop { }); + let t = thread::spawn(move || { + INIT.call_once(|| { + thread::sleep(std::time::Duration::from_secs(3)); + 42 + }); }); assert!(INIT.get().is_none()); - } + t.join().unwrap(); + } #[test] fn poll() { @@ -598,26 +614,29 @@ mod tests { assert_eq!(INIT.poll().map(|r| *r), Some(3)); } - #[test] fn wait() { static INIT: Once<usize> = Once::new(); - std::thread::spawn(|| { + let t = std::thread::spawn(|| { assert_eq!(*INIT.wait(), 3); assert!(INIT.is_completed()); }); - for _ in 0..4 { thread::yield_now() } + for _ in 0..4 { + thread::yield_now() + } assert!(INIT.poll().is_none()); INIT.call_once(|| 3); + + t.join().unwrap(); } #[test] #[ignore = "Android uses panic_abort"] fn panic() { - use ::std::panic; + use std::panic; static INIT: Once = Once::new(); @@ -670,9 +689,7 @@ mod tests { once.call_once(|| DropTest {}); } - assert!(unsafe { - CALLED - }); + assert!(unsafe { CALLED }); // Now test that we skip drops for the uninitialized case. unsafe { CALLED = false; @@ -681,16 +698,14 @@ mod tests { let once = Once::<DropTest>::new(); drop(once); - assert!(unsafe { - !CALLED - }); + assert!(unsafe { !CALLED }); } #[test] fn call_once_test() { for _ in 0..20 { - use std::sync::Arc; use std::sync::atomic::AtomicUsize; + use std::sync::Arc; use std::time::Duration; let share = Arc::new(AtomicUsize::new(0)); let once = Arc::new(Once::<_, Spin>::new()); @@ -709,7 +724,7 @@ mod tests { hs.push(h); } for h in hs { - let _ = h.join(); + h.join().unwrap(); } assert_eq!(1, share.load(Ordering::SeqCst)); } diff --git a/src/relax.rs b/src/relax.rs index 6d9a690..8842f80 100644 --- a/src/relax.rs +++ b/src/relax.rs @@ -23,7 +23,10 @@ pub struct Spin; impl RelaxStrategy for Spin { #[inline(always)] fn relax() { - core::hint::spin_loop(); + // Use the deprecated spin_loop_hint() to ensure that we don't get + // a higher MSRV than we need to. + #[allow(deprecated)] + core::sync::atomic::spin_loop_hint(); } } diff --git a/src/rwlock.rs b/src/rwlock.rs index 8e5d6c9..ab5fbf3 100644 --- a/src/rwlock.rs +++ b/src/rwlock.rs @@ -1,18 +1,18 @@ //! A lock that provides data access to either one writer or many readers. +use crate::{ + atomic::{AtomicUsize, Ordering}, + RelaxStrategy, Spin, +}; use core::{ cell::UnsafeCell, - ops::{Deref, DerefMut}, - marker::PhantomData, fmt, + marker::PhantomData, mem, -}; -use crate::{ - atomic::{AtomicUsize, Ordering}, - RelaxStrategy, Spin + mem::ManuallyDrop, + ops::{Deref, DerefMut}, }; - /// A lock that provides data access to either one writer or many readers. /// /// This lock behaves in a similar manner to its namesake `std::sync::RwLock` but uses @@ -82,7 +82,7 @@ const WRITER: usize = 1; /// potentially releasing the lock. pub struct RwLockReadGuard<'a, T: 'a + ?Sized> { lock: &'a AtomicUsize, - data: &'a T, + data: *const T, } /// A guard that provides mutable data access. @@ -91,7 +91,7 @@ pub struct RwLockReadGuard<'a, T: 'a + ?Sized> { pub struct RwLockWriteGuard<'a, T: 'a + ?Sized, R = Spin> { phantom: PhantomData<R>, inner: &'a RwLock<T, R>, - data: &'a mut T, + data: *mut T, } /// A guard that provides immutable data access but can be upgraded to [`RwLockWriteGuard`]. @@ -104,7 +104,7 @@ pub struct RwLockWriteGuard<'a, T: 'a + ?Sized, R = Spin> { pub struct RwLockUpgradableGuard<'a, T: 'a + ?Sized, R = Spin> { phantom: PhantomData<R>, inner: &'a RwLock<T, R>, - data: &'a T, + data: *const T, } // Same unsafe impls as `std::sync::RwLock` @@ -251,7 +251,7 @@ impl<T: ?Sized, R> RwLock<T, R> { // Acquire a read lock, returning the new lock value. fn acquire_reader(&self) -> usize { // An arbitrary cap that allows us to catch overflows long before they happen - const MAX_READERS: usize = usize::MAX / READER / 2; + const MAX_READERS: usize = core::usize::MAX / READER / 2; let value = self.lock.fetch_add(READER, Ordering::Acquire); @@ -416,18 +416,18 @@ impl<T: ?Sized, R> RwLock<T, R> { } } - /// Returns a mutable reference to the underlying data. - /// - /// Since this call borrows the `RwLock` mutably, no actual locking needs to - /// take place -- the mutable borrow statically guarantees no locks exist. - /// - /// # Examples - /// - /// ``` - /// let mut lock = spin::RwLock::new(0); - /// *lock.get_mut() = 10; - /// assert_eq!(*lock.read(), 10); - /// ``` + /// Returns a mutable reference to the underlying data. + /// + /// Since this call borrows the `RwLock` mutably, no actual locking needs to + /// take place -- the mutable borrow statically guarantees no locks exist. + /// + /// # Examples + /// + /// ``` + /// let mut lock = spin::RwLock::new(0); + /// *lock.get_mut() = 10; + /// assert_eq!(*lock.read(), 10); + /// ``` pub fn get_mut(&mut self) -> &mut T { // We know statically that there are no other references to `self`, so // there's no need to lock the inner lock. @@ -472,8 +472,9 @@ impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> { /// ``` #[inline] pub fn leak(this: Self) -> &'rwlock T { - let Self { data, .. } = this; - data + let this = ManuallyDrop::new(this); + // Safety: We know statically that only we are referencing data + unsafe { &*this.data } } } @@ -598,8 +599,9 @@ impl<'rwlock, T: ?Sized, R> RwLockUpgradableGuard<'rwlock, T, R> { /// ``` #[inline] pub fn leak(this: Self) -> &'rwlock T { - let Self { data, .. } = this; - data + let this = ManuallyDrop::new(this); + // Safety: We know statically that only we are referencing data + unsafe { &*this.data } } } @@ -657,7 +659,10 @@ impl<'rwlock, T: ?Sized, R> RwLockWriteGuard<'rwlock, T, R> { /// ``` #[inline] pub fn downgrade_to_upgradeable(self) -> RwLockUpgradableGuard<'rwlock, T, R> { - debug_assert_eq!(self.inner.lock.load(Ordering::Acquire) & (WRITER | UPGRADED), WRITER); + debug_assert_eq!( + self.inner.lock.load(Ordering::Acquire) & (WRITER | UPGRADED), + WRITER + ); // Reserve the read guard for ourselves self.inner.lock.store(UPGRADED, Ordering::Release); @@ -688,9 +693,9 @@ impl<'rwlock, T: ?Sized, R> RwLockWriteGuard<'rwlock, T, R> { /// ``` #[inline] pub fn leak(this: Self) -> &'rwlock mut T { - let data = this.data as *mut _; // Keep it in pointer form temporarily to avoid double-aliasing - core::mem::forget(this); - unsafe { &mut *data } + let mut this = ManuallyDrop::new(this); + // Safety: We know statically that only we are referencing data + unsafe { &mut *this.data } } } @@ -710,7 +715,8 @@ impl<'rwlock, T: ?Sized> Deref for RwLockReadGuard<'rwlock, T> { type Target = T; fn deref(&self) -> &T { - self.data + // Safety: We know statically that only we are referencing data + unsafe { &*self.data } } } @@ -718,7 +724,8 @@ impl<'rwlock, T: ?Sized, R> Deref for RwLockUpgradableGuard<'rwlock, T, R> { type Target = T; fn deref(&self) -> &T { - self.data + // Safety: We know statically that only we are referencing data + unsafe { &*self.data } } } @@ -726,13 +733,15 @@ impl<'rwlock, T: ?Sized, R> Deref for RwLockWriteGuard<'rwlock, T, R> { type Target = T; fn deref(&self) -> &T { - self.data + // Safety: We know statically that only we are referencing data + unsafe { &*self.data } } } impl<'rwlock, T: ?Sized, R> DerefMut for RwLockWriteGuard<'rwlock, T, R> { fn deref_mut(&mut self) -> &mut T { - self.data + // Safety: We know statically that only we are referencing data + unsafe { &mut *self.data } } } @@ -759,7 +768,9 @@ impl<'rwlock, T: ?Sized, R> Drop for RwLockWriteGuard<'rwlock, T, R> { // Writer is responsible for clearing both WRITER and UPGRADED bits. // The UPGRADED bit may be set if an upgradeable lock attempts an upgrade while this lock is held. - self.inner.lock.fetch_and(!(WRITER | UPGRADED), Ordering::Release); + self.inner + .lock + .fetch_and(!(WRITER | UPGRADED), Ordering::Release); } } @@ -843,7 +854,9 @@ unsafe impl<R: RelaxStrategy> lock_api_crate::RawRwLockUpgrade for RwLock<(), R> #[inline(always)] fn try_lock_upgradable(&self) -> bool { // Prevent guard destructor running - self.try_upgradeable_read().map(|g| core::mem::forget(g)).is_some() + self.try_upgradeable_read() + .map(|g| core::mem::forget(g)) + .is_some() } #[inline(always)] @@ -872,7 +885,10 @@ unsafe impl<R: RelaxStrategy> lock_api_crate::RawRwLockUpgrade for RwLock<(), R> data: &(), phantom: PhantomData, }; - tmp_guard.try_upgrade().map(|g| core::mem::forget(g)).is_ok() + tmp_guard + .try_upgrade() + .map(|g| core::mem::forget(g)) + .is_ok() } } @@ -965,7 +981,7 @@ mod tests { let arc2 = arc.clone(); let (tx, rx) = channel(); - thread::spawn(move || { + let t = thread::spawn(move || { let mut lock = arc2.write(); for _ in 0..10 { let tmp = *lock; @@ -995,6 +1011,8 @@ mod tests { rx.recv().unwrap(); let lock = arc.read(); assert_eq!(*lock, 10); + + assert!(t.join().is_ok()); } #[test] |