From e1020e5c50ebac3e39b4ec0a42be20959ad40b88 Mon Sep 17 00:00:00 2001 From: Martin Geisler Date: Tue, 9 Apr 2024 20:15:33 +0200 Subject: Import 'oneshot-uniffi' crate Request Document: go/android-rust-importing-crates For CL Reviewers: go/android3p#cl-review For Build Team: go/ab-third-party-imports Bug: http://b/330717829 Test: m liboneshot_uniffi Change-Id: I934958fc68736c85a39b309165a4660a88bb0026 --- .cargo_vcs_info.json | 6 + .github/workflows/build-and-test.yml | 40 + .github/workflows/style-sanity.yml | 55 + .gitignore | 2 + CHANGELOG.md | 69 ++ Cargo.toml | 64 + LICENSE | 1 + LICENSE-APACHE | 201 ++++ LICENSE-MIT | 19 + METADATA | 21 + MODULE_LICENSE_APACHE2 | 0 OWNERS | 2 + README.md | 94 ++ benches/benches.rs | 122 ++ cargo_embargo.json | 8 + check_mem_leaks.sh | 13 + examples/recv_before_send.rs | 18 + examples/recv_before_send_then_drop_sender.rs | 18 + examples/recv_ref_before_send.rs | 18 + examples/recv_ref_before_send_then_drop_sender.rs | 18 + examples/recv_timeout_before_send.rs | 18 + .../recv_timeout_before_send_then_drop_sender.rs | 18 + examples/recv_with_dropped_sender.rs | 11 + examples/send_before_recv.rs | 11 + examples/send_then_drop_receiver.rs | 7 + examples/send_with_dropped_receiver.rs | 8 + src/errors.rs | 147 +++ src/lib.rs | 1242 ++++++++++++++++++++ src/loombox.rs | 151 +++ tests/assert_mem.rs | 37 + tests/async.rs | 128 ++ tests/future.rs | 65 + tests/helpers/mod.rs | 63 + tests/helpers/waker.rs | 64 + tests/loom.rs | 223 ++++ tests/raw.rs | 46 + tests/sync.rs | 343 ++++++ 37 files changed, 3371 insertions(+) create mode 100644 .cargo_vcs_info.json create mode 100644 .github/workflows/build-and-test.yml create mode 100644 .github/workflows/style-sanity.yml create mode 100644 .gitignore create mode 100644 CHANGELOG.md create mode 100644 Cargo.toml create mode 120000 LICENSE create mode 100644 LICENSE-APACHE create mode 100644 LICENSE-MIT create mode 100644 METADATA create mode 100644 MODULE_LICENSE_APACHE2 create mode 100644 OWNERS create mode 100644 README.md create mode 100644 benches/benches.rs create mode 100644 cargo_embargo.json create mode 100755 check_mem_leaks.sh create mode 100644 examples/recv_before_send.rs create mode 100644 examples/recv_before_send_then_drop_sender.rs create mode 100644 examples/recv_ref_before_send.rs create mode 100644 examples/recv_ref_before_send_then_drop_sender.rs create mode 100644 examples/recv_timeout_before_send.rs create mode 100644 examples/recv_timeout_before_send_then_drop_sender.rs create mode 100644 examples/recv_with_dropped_sender.rs create mode 100644 examples/send_before_recv.rs create mode 100644 examples/send_then_drop_receiver.rs create mode 100644 examples/send_with_dropped_receiver.rs create mode 100644 src/errors.rs create mode 100644 src/lib.rs create mode 100644 src/loombox.rs create mode 100644 tests/assert_mem.rs create mode 100644 tests/async.rs create mode 100644 tests/future.rs create mode 100644 tests/helpers/mod.rs create mode 100644 tests/helpers/waker.rs create mode 100644 tests/loom.rs create mode 100644 tests/raw.rs create mode 100644 tests/sync.rs diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..018637a --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": "eb8f8ebacb9c38861d88923830a53715ef733fa4" + }, + "path_in_vcs": "" +} \ No newline at end of file diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml new file mode 100644 index 0000000..6aeb92a --- /dev/null +++ b/.github/workflows/build-and-test.yml @@ -0,0 +1,40 @@ +name: Cargo build and test +on: [pull_request, workflow_dispatch] +env: + CARGO_TERM_COLOR: always + RUSTFLAGS: "--deny warnings " +jobs: + build-and-test: + strategy: + matrix: + os: [ubuntu-latest, macos-latest, windows-latest] + rust: [stable, beta] + include: + - os: ubuntu-latest + rust: nightly + - os: ubuntu-latest + rust: 1.65.0 + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Install Rust + uses: ATiltedTree/setup-rust@v1.0.4 + with: + rust-version: ${{ matrix.rust }} + + - name: Install cargo-hack + uses: taiki-e/install-action@cargo-hack + + - name: Build + run: cargo build + + - name: Test + run: cargo hack --feature-powerset test + + - name: Test with artificial delay + run: RUSTFLAGS+="--cfg oneshot_test_delay" cargo hack --feature-powerset test + + - name: Test with loom + run: RUSTFLAGS+="--cfg loom" LOOM_MAX_BRANCHES=100000 cargo hack --feature-powerset test --test sync --test loom diff --git a/.github/workflows/style-sanity.yml b/.github/workflows/style-sanity.yml new file mode 100644 index 0000000..8458efd --- /dev/null +++ b/.github/workflows/style-sanity.yml @@ -0,0 +1,55 @@ +name: Rust linting, formatting and audit +on: + pull_request: + paths: + - .github/workflows/*.yml + - '**/*.rs' + - Cargo.toml + - Cargo.lock + workflow_dispatch: +jobs: + clippy-linting: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - uses: actions-rs/toolchain@v1.0.6 + with: + toolchain: stable + components: clippy + override: true + + - name: Clippy check + run: | + export RUSTFLAGS="--deny warnings" + time cargo clippy --verbose + + check-formatting: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - uses: actions-rs/toolchain@v1.0.6 + with: + toolchain: stable + components: rustfmt + override: true + + - name: Check formatting + run: | + rustfmt --version + cargo fmt -- --check + + audit: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + + - name: Install cargo-audit + uses: actions-rs/install@v0.1.2 + with: + crate: cargo-audit + version: latest + + - name: Audit + run: cargo audit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..c4c9283 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,69 @@ +# Changelog +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) +and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). + +### Categories each change fall into + +* **Added**: for new features. +* **Changed**: for changes in existing functionality. +* **Deprecated**: for soon-to-be removed features. +* **Removed**: for now removed features. +* **Fixed**: for any bug fixes. +* **Security**: in case of vulnerabilities. + + +## [Unreleased] + + +## [0.1.6] - 2023-09-14 +### Added +* Add `into_raw` and `from_raw` methods on both `Sender` and `Receiver`. Allows passing `oneshot` + channels over FFI without an extra layer of heap allocation. + + +## [0.1.5] - 2022-09-01 +### Fixed +- Handle the UNPARKING state correctly in all recv methods. `try_recv` will now not panic + if used on a `Receiver` that is being unparked from an async wait. The other `recv` methods + will still panic (as they should), but with a better error message. + + +## [0.1.4] - 2022-08-30 +### Changed +- Upgrade to Rust edition 2021. Also increases the MSRV to Rust 1.60. +- Add null-pointer optimization to `Sender`, `Receiver` and `SendError`. + This reduces the call stack size of Sender::send and it makes + `Option` and `Option` pointer sized (#18). +- Relax the memory ordering of all atomic operations from `SeqCst` to the most appropriate + lower ordering (#17 + #20). + +### Fixed +- Fix undefined behavior due to multiple mutable references to the same channel instance (#18). +- Fix race condition that could happen during unparking of a receiving `Receiver` (#17 + #20). + + +## [0.1.3] - 2021-11-23 +### Fixed +- Keep the *last* `Waker` in `Future::poll`, not the *first* one. Stops breaking the contract + on how futures should work. + + +## [0.1.2] - 2020-08-11 +### Fixed +- Fix unreachable code panic that happened if the `Receiver` of an empty but open channel was + polled and then dropped. + + +## [0.1.1] - 2020-05-10 +Initial implementation. Supports basically all the (for now) intended functionality. +Sender is as lock-free as I think it can get and the receiver can both do thread blocking +and be awaited asynchronously. The receiver also has a wait-free `try_recv` method. + +The crate has two features. They are activated by default, but the user can opt out of async +support as well as usage of libstd (making the crate `no_std` but still requiring liballoc) + + +## [0.1.0] - 2019-05-30 +Name reserved on crate.io by someone other than the author of this crate. diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..cba7266 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,64 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2021" +rust-version = "1.60.0" +name = "oneshot-uniffi" +version = "0.1.6" +authors = ["Linus Färnstrand "] +description = """ +Patched version of oneshot specifically for the UniFFI project. + +This removes the `loom` target and dependency which helps with UniFFI's downstream consumers. +""" +readme = "README.md" +keywords = [ + "oneshot", + "spsc", + "async", + "sync", + "channel", +] +categories = [ + "asynchronous", + "concurrency", +] +license = "MIT OR Apache-2.0" +repository = "https://github.com/faern/oneshot" + +[[bench]] +name = "benches" +harness = false + +[dev-dependencies.async-std] +version = "1" +features = ["attributes"] + +[dev-dependencies.criterion] +version = "0.3" + +[dev-dependencies.tokio] +version = "1" +features = [ + "rt", + "rt-multi-thread", + "macros", + "time", +] + +[features] +async = [] +default = [ + "std", + "async", +] +std = [] diff --git a/LICENSE b/LICENSE new file mode 120000 index 0000000..6b579aa --- /dev/null +++ b/LICENSE @@ -0,0 +1 @@ +LICENSE-APACHE \ No newline at end of file diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..9cf1062 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,19 @@ +MIT License + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/METADATA b/METADATA new file mode 100644 index 0000000..0fc6c63 --- /dev/null +++ b/METADATA @@ -0,0 +1,21 @@ +name: "oneshot-uniffi" +description: "()" +third_party { + identifier { + type: "crates.io" + value: "oneshot-uniffi" + } + identifier { + type: "Archive" + value: "https://static.crates.io/crates/oneshot-uniffi/oneshot-uniffi-0.1.6.crate" + primary_source: true + } + version: "0.1.6" + # Dual-licensed, using the least restrictive per go/thirdpartylicenses#same. + license_type: NOTICE + last_upgrade_date { + year: 2024 + month: 3 + day: 21 + } +} diff --git a/MODULE_LICENSE_APACHE2 b/MODULE_LICENSE_APACHE2 new file mode 100644 index 0000000..e69de29 diff --git a/OWNERS b/OWNERS new file mode 100644 index 0000000..48bea6e --- /dev/null +++ b/OWNERS @@ -0,0 +1,2 @@ +# Bug component: 688011 +include platform/prebuilts/rust:main:/OWNERS diff --git a/README.md b/README.md new file mode 100644 index 0000000..535f011 --- /dev/null +++ b/README.md @@ -0,0 +1,94 @@ +# oneshot + +Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance +can only transport a single message. This has a few nice outcomes. One thing is that +the implementation can be very efficient, utilizing the knowledge that there will +only be one message. But more importantly, it allows the API to be expressed in such +a way that certain edge cases that you don't want to care about when only sending a +single message on a channel does not exist. For example: The sender can't be copied +or cloned, and the send method takes ownership and consumes the sender. +So you are guaranteed, at the type level, that there can only be one message sent. + +The sender's send method is non-blocking, and potentially lock- and wait-free. +See documentation on [Sender::send] for situations where it might not be fully wait-free. +The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time +limited thread blocking receive operations. The receiver also implements `Future` and +supports asynchronously awaiting the message. + + +## Examples + +This example sets up a background worker that processes requests coming in on a standard +mpsc channel and replies on a oneshot channel provided with each request. The worker can +be interacted with both from sync and async contexts since the oneshot receiver +can receive both blocking and async. + +```rust +use std::sync::mpsc; +use std::thread; +use std::time::Duration; + +type Request = String; + +// Starts a background thread performing some computation on requests sent to it. +// Delivers the response back over a oneshot channel. +fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender)> { + let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender)>(); + thread::spawn(move || { + for (request_data, response_sender) in request_receiver.iter() { + let compute_operation = || request_data.len(); + let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel + } + }); + request_sender +} + +let processor = spawn_processing_thread(); + +// If compiled with `std` the library can receive messages with timeout on regular threads +#[cfg(feature = "std")] { + let (response_sender, response_receiver) = oneshot::channel(); + let request = Request::from("data from sync thread"); + + processor.send((request, response_sender)).expect("Processor down"); + match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel + Ok(result) => println!("Processor returned {}", result), + Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"), + Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"), + } +} + +// If compiled with the `async` feature, the `Receiver` can be awaited in an async context +#[cfg(feature = "async")] { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + let (response_sender, response_receiver) = oneshot::channel(); + let request = Request::from("data from sync thread"); + + processor.send((request, response_sender)).expect("Processor down"); + match response_receiver.await { // <- Receive on the oneshot channel asynchronously + Ok(result) => println!("Processor returned {}", result), + Err(_e) => panic!("Processor exited"), + } + }); +} +``` + +## Sync vs async + +The main motivation for writing this library was that there were no (known to me) channel +implementations allowing you to seamlessly send messages between a normal thread and an async +task, or the other way around. If message passing is the way you are communicating, of course +that should work smoothly between the sync and async parts of the program! + +This library achieves that by having a fast and cheap send operation that can +be used in both sync threads and async tasks. The receiver has both thread blocking +receive methods for synchronous usage, and implements `Future` for asynchronous usage. + +The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on +in an asynchronous task. This implementation is completely executor/runtime agnostic. It should +be possible to use this library with any executor. + + +License: MIT OR Apache-2.0 diff --git a/benches/benches.rs b/benches/benches.rs new file mode 100644 index 0000000..438d46a --- /dev/null +++ b/benches/benches.rs @@ -0,0 +1,122 @@ +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use std::mem; +use std::time::{Duration, Instant}; + +criterion_group!(benches, bench); +criterion_main!(benches); + +macro_rules! bench_send_and_recv { + ($c:expr, $($type:ty => $value:expr);+) => { + // Sanity check that all $values are of $type. + $(let _: $type = $value;)* + { + let mut group = $c.benchmark_group("create_channel"); + $(group.bench_function(stringify!($type), |b| { + b.iter(oneshot::channel::<$type>) + });)* + group.finish(); + } + { + let mut group = $c.benchmark_group("create_and_send"); + $(group.bench_function(stringify!($type), |b| { + b.iter(|| { + let (sender, _receiver) = oneshot::channel(); + sender.send(black_box($value)).unwrap() + }); + });)* + group.finish(); + } + { + let mut group = $c.benchmark_group("create_and_send_on_closed"); + $(group.bench_function(stringify!($type), |b| { + b.iter(|| { + let (sender, _) = oneshot::channel(); + sender.send(black_box($value)).unwrap_err() + }); + });)* + group.finish(); + } + { + let mut group = $c.benchmark_group("create_send_and_recv"); + $(group.bench_function(stringify!($type), |b| { + b.iter(|| { + let (sender, receiver) = oneshot::channel(); + sender.send(black_box($value)).unwrap(); + receiver.recv().unwrap() + }); + });)* + group.finish(); + } + { + let mut group = $c.benchmark_group("create_send_and_recv_ref"); + $(group.bench_function(stringify!($type), |b| { + b.iter(|| { + let (sender, receiver) = oneshot::channel(); + sender.send(black_box($value)).unwrap(); + receiver.recv_ref().unwrap() + }); + });)* + group.finish(); + } + }; +} + +fn bench(c: &mut Criterion) { + bench_send_and_recv!(c, + () => (); + u8 => 7u8; + usize => 9876usize; + u128 => 1234567u128; + [u8; 64] => [0b10101010u8; 64]; + [u8; 4096] => [0b10101010u8; 4096] + ); + + bench_try_recv(c); + bench_recv_deadline_now(c); + bench_recv_timeout_zero(c); +} + +fn bench_try_recv(c: &mut Criterion) { + let (sender, receiver) = oneshot::channel::(); + c.bench_function("try_recv_empty", |b| { + b.iter(|| receiver.try_recv().unwrap_err()) + }); + mem::drop(sender); + c.bench_function("try_recv_empty_closed", |b| { + b.iter(|| receiver.try_recv().unwrap_err()) + }); +} + +fn bench_recv_deadline_now(c: &mut Criterion) { + let now = Instant::now(); + { + let (_sender, receiver) = oneshot::channel::(); + c.bench_function("recv_deadline_now", |b| { + b.iter(|| receiver.recv_deadline(now).unwrap_err()) + }); + } + { + let (sender, receiver) = oneshot::channel::(); + mem::drop(sender); + c.bench_function("recv_deadline_now_closed", |b| { + b.iter(|| receiver.recv_deadline(now).unwrap_err()) + }); + } +} + +fn bench_recv_timeout_zero(c: &mut Criterion) { + let zero = Duration::from_nanos(0); + { + let (_sender, receiver) = oneshot::channel::(); + c.bench_function("recv_timeout_zero", |b| { + b.iter(|| receiver.recv_timeout(zero).unwrap_err()) + }); + } + { + let (sender, receiver) = oneshot::channel::(); + mem::drop(sender); + c.bench_function("recv_timeout_zero_closed", |b| { + b.iter(|| receiver.recv_timeout(zero).unwrap_err()) + }); + } +} diff --git a/cargo_embargo.json b/cargo_embargo.json new file mode 100644 index 0000000..762838b --- /dev/null +++ b/cargo_embargo.json @@ -0,0 +1,8 @@ +{ + "run_cargo": false, + "module_visibility": { + "liboneshot_uniffi": [ + "//external/rust/crates/uniffi_core" + ] + } +} diff --git a/check_mem_leaks.sh b/check_mem_leaks.sh new file mode 100755 index 0000000..5a10835 --- /dev/null +++ b/check_mem_leaks.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +set -eu + +SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd "$SCRIPT_DIR" + +for example_path in examples/*.rs; do + example_filename=$(basename -- $example_path) + example=${example_filename%.*} + echo $example + cargo valgrind run --example "$example" +done diff --git a/examples/recv_before_send.rs b/examples/recv_before_send.rs new file mode 100644 index 0000000..2eda3dd --- /dev/null +++ b/examples/recv_before_send.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + sender.send(9u128).unwrap(); + }); + assert_eq!(receiver.recv(), Ok(9)); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/examples/recv_before_send_then_drop_sender.rs b/examples/recv_before_send_then_drop_sender.rs new file mode 100644 index 0000000..aea7d66 --- /dev/null +++ b/examples/recv_before_send_then_drop_sender.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel::(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + std::mem::drop(sender); + }); + assert!(receiver.recv().is_err()); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/examples/recv_ref_before_send.rs b/examples/recv_ref_before_send.rs new file mode 100644 index 0000000..6ed74dd --- /dev/null +++ b/examples/recv_ref_before_send.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + sender.send(9u128).unwrap(); + }); + assert_eq!(receiver.recv_ref(), Ok(9)); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/examples/recv_ref_before_send_then_drop_sender.rs b/examples/recv_ref_before_send_then_drop_sender.rs new file mode 100644 index 0000000..75ff3d6 --- /dev/null +++ b/examples/recv_ref_before_send_then_drop_sender.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel::(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + std::mem::drop(sender); + }); + assert!(receiver.recv_ref().is_err()); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/examples/recv_timeout_before_send.rs b/examples/recv_timeout_before_send.rs new file mode 100644 index 0000000..85a2ac8 --- /dev/null +++ b/examples/recv_timeout_before_send.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + sender.send(9u128).unwrap(); + }); + assert_eq!(receiver.recv_timeout(Duration::from_millis(100)), Ok(9)); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/examples/recv_timeout_before_send_then_drop_sender.rs b/examples/recv_timeout_before_send_then_drop_sender.rs new file mode 100644 index 0000000..32c31fc --- /dev/null +++ b/examples/recv_timeout_before_send_then_drop_sender.rs @@ -0,0 +1,18 @@ +#[cfg(feature = "std")] +fn main() { + use std::thread; + use std::time::Duration; + + let (sender, receiver) = oneshot::channel::(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + std::mem::drop(sender); + }); + assert!(receiver.recv_timeout(Duration::from_millis(100)).is_err()); + t.join().unwrap(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/examples/recv_with_dropped_sender.rs b/examples/recv_with_dropped_sender.rs new file mode 100644 index 0000000..f7a7171 --- /dev/null +++ b/examples/recv_with_dropped_sender.rs @@ -0,0 +1,11 @@ +#[cfg(feature = "std")] +fn main() { + let (sender, receiver) = oneshot::channel::(); + std::mem::drop(sender); + receiver.recv().unwrap_err(); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/examples/send_before_recv.rs b/examples/send_before_recv.rs new file mode 100644 index 0000000..c31ba65 --- /dev/null +++ b/examples/send_before_recv.rs @@ -0,0 +1,11 @@ +#[cfg(feature = "std")] +fn main() { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + assert_eq!(receiver.recv(), Ok(19i128)); +} + +#[cfg(not(feature = "std"))] +fn main() { + panic!("This example is only for when the \"sync\" feature is used"); +} diff --git a/examples/send_then_drop_receiver.rs b/examples/send_then_drop_receiver.rs new file mode 100644 index 0000000..941c508 --- /dev/null +++ b/examples/send_then_drop_receiver.rs @@ -0,0 +1,7 @@ +use std::mem; + +fn main() { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + mem::drop(receiver); +} diff --git a/examples/send_with_dropped_receiver.rs b/examples/send_with_dropped_receiver.rs new file mode 100644 index 0000000..19bfa38 --- /dev/null +++ b/examples/send_with_dropped_receiver.rs @@ -0,0 +1,8 @@ +use std::mem; + +fn main() { + let (sender, receiver) = oneshot::channel(); + mem::drop(receiver); + let send_error = sender.send(5u128).unwrap_err(); + assert_eq!(send_error.into_inner(), 5); +} diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 0000000..1fd0de1 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,147 @@ +use super::{dealloc, Channel}; +use core::fmt; +use core::mem; +use core::ptr::NonNull; + +/// An error returned when trying to send on a closed channel. Returned from +/// [`Sender::send`](crate::Sender::send) if the corresponding [`Receiver`](crate::Receiver) +/// has already been dropped. +/// +/// The message that could not be sent can be retreived again with [`SendError::into_inner`]. +pub struct SendError { + channel_ptr: NonNull>, +} + +unsafe impl Send for SendError {} +unsafe impl Sync for SendError {} + +impl SendError { + /// # Safety + /// + /// By calling this function, the caller semantically transfers ownership of the + /// channel's resources to the created `SendError`. Thus the caller must ensure that the + /// pointer is not used in a way which would violate this ownership transfer. Moreover, + /// the caller must assert that the channel contains a valid, initialized message. + pub(crate) const unsafe fn new(channel_ptr: NonNull>) -> Self { + Self { channel_ptr } + } + + /// Consumes the error and returns the message that failed to be sent. + #[inline] + pub fn into_inner(self) -> T { + let channel_ptr = self.channel_ptr; + + // Don't run destructor if we consumed ourselves. Freeing happens here. + mem::forget(self); + + // SAFETY: we have ownership of the channel + let channel: &Channel = unsafe { channel_ptr.as_ref() }; + + // SAFETY: we know that the message is initialized according to the safety requirements of + // `new` + let message = unsafe { channel.take_message() }; + + // SAFETY: we own the channel + unsafe { dealloc(channel_ptr) }; + + message + } + + /// Get a reference to the message that failed to be sent. + #[inline] + pub fn as_inner(&self) -> &T { + unsafe { self.channel_ptr.as_ref().message().assume_init_ref() } + } +} + +impl Drop for SendError { + fn drop(&mut self) { + // SAFETY: we have ownership of the channel and require that the message is initialized + // upon construction + unsafe { + self.channel_ptr.as_ref().drop_message(); + dealloc(self.channel_ptr); + } + } +} + +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "sending on a closed channel".fmt(f) + } +} + +impl fmt::Debug for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "SendError<{}>(_)", stringify!(T)) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for SendError {} + +/// An error returned from the blocking [`Receiver::recv`](crate::Receiver::recv) method. +/// +/// The receive operation can only fail if the corresponding [`Sender`](crate::Sender) was dropped +/// before sending any message, or if a message has already been sent and received on the channel. +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub struct RecvError; + +impl fmt::Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "receiving on a closed channel".fmt(f) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for RecvError {} + +/// An error returned when failing to receive a message in the non-blocking +/// [`Receiver::try_recv`](crate::Receiver::try_recv). +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub enum TryRecvError { + /// The channel is still open, but there was no message present in it. + Empty, + + /// The channel is closed. Either the sender was dropped before sending any message, or the + /// message has already been extracted from the receiver. + Disconnected, +} + +impl fmt::Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let msg = match self { + TryRecvError::Empty => "receiving on an empty channel", + TryRecvError::Disconnected => "receiving on a closed channel", + }; + msg.fmt(f) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for TryRecvError {} + +/// An error returned when failing to receive a message in +/// [`Receiver::recv_timeout`](crate::Receiver::recv_timeout). +#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] +pub enum RecvTimeoutError { + /// No message arrived on the channel before the timeout was reached. The channel is still open. + Timeout, + + /// The channel is closed. Either the sender was dropped before sending any message, or the + /// message has already been extracted from the receiver. + Disconnected, +} + +impl fmt::Display for RecvTimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let msg = match self { + RecvTimeoutError::Timeout => "timed out waiting on channel", + RecvTimeoutError::Disconnected => "channel is empty and sending half is closed", + }; + msg.fmt(f) + } +} + +#[cfg(feature = "std")] +impl std::error::Error for RecvTimeoutError {} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..8da012b --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,1242 @@ +//! Oneshot spsc (single producer, single consumer) channel. Meaning each channel instance +//! can only transport a single message. This has a few nice outcomes. One thing is that +//! the implementation can be very efficient, utilizing the knowledge that there will +//! only be one message. But more importantly, it allows the API to be expressed in such +//! a way that certain edge cases that you don't want to care about when only sending a +//! single message on a channel does not exist. For example: The sender can't be copied +//! or cloned, and the send method takes ownership and consumes the sender. +//! So you are guaranteed, at the type level, that there can only be one message sent. +//! +//! The sender's send method is non-blocking, and potentially lock- and wait-free. +//! See documentation on [Sender::send] for situations where it might not be fully wait-free. +//! The receiver supports both lock- and wait-free `try_recv` as well as indefinite and time +//! limited thread blocking receive operations. The receiver also implements `Future` and +//! supports asynchronously awaiting the message. +//! +//! +//! # Examples +//! +//! This example sets up a background worker that processes requests coming in on a standard +//! mpsc channel and replies on a oneshot channel provided with each request. The worker can +//! be interacted with both from sync and async contexts since the oneshot receiver +//! can receive both blocking and async. +//! +//! ```rust +//! use std::sync::mpsc; +//! use std::thread; +//! use std::time::Duration; +//! +//! type Request = String; +//! +//! // Starts a background thread performing some computation on requests sent to it. +//! // Delivers the response back over a oneshot channel. +//! fn spawn_processing_thread() -> mpsc::Sender<(Request, oneshot::Sender)> { +//! let (request_sender, request_receiver) = mpsc::channel::<(Request, oneshot::Sender)>(); +//! thread::spawn(move || { +//! for (request_data, response_sender) in request_receiver.iter() { +//! let compute_operation = || request_data.len(); +//! let _ = response_sender.send(compute_operation()); // <- Send on the oneshot channel +//! } +//! }); +//! request_sender +//! } +//! +//! let processor = spawn_processing_thread(); +//! +//! // If compiled with `std` the library can receive messages with timeout on regular threads +//! #[cfg(feature = "std")] { +//! let (response_sender, response_receiver) = oneshot::channel(); +//! let request = Request::from("data from sync thread"); +//! +//! processor.send((request, response_sender)).expect("Processor down"); +//! match response_receiver.recv_timeout(Duration::from_secs(1)) { // <- Receive on the oneshot channel +//! Ok(result) => println!("Processor returned {}", result), +//! Err(oneshot::RecvTimeoutError::Timeout) => eprintln!("Processor was too slow"), +//! Err(oneshot::RecvTimeoutError::Disconnected) => panic!("Processor exited"), +//! } +//! } +//! +//! // If compiled with the `async` feature, the `Receiver` can be awaited in an async context +//! #[cfg(feature = "async")] { +//! tokio::runtime::Runtime::new() +//! .unwrap() +//! .block_on(async move { +//! let (response_sender, response_receiver) = oneshot::channel(); +//! let request = Request::from("data from sync thread"); +//! +//! processor.send((request, response_sender)).expect("Processor down"); +//! match response_receiver.await { // <- Receive on the oneshot channel asynchronously +//! Ok(result) => println!("Processor returned {}", result), +//! Err(_e) => panic!("Processor exited"), +//! } +//! }); +//! } +//! ``` +//! +//! # Sync vs async +//! +//! The main motivation for writing this library was that there were no (known to me) channel +//! implementations allowing you to seamlessly send messages between a normal thread and an async +//! task, or the other way around. If message passing is the way you are communicating, of course +//! that should work smoothly between the sync and async parts of the program! +//! +//! This library achieves that by having a fast and cheap send operation that can +//! be used in both sync threads and async tasks. The receiver has both thread blocking +//! receive methods for synchronous usage, and implements `Future` for asynchronous usage. +//! +//! The receiving endpoint of this channel implements Rust's `Future` trait and can be waited on +//! in an asynchronous task. This implementation is completely executor/runtime agnostic. It should +//! be possible to use this library with any executor. +//! + +// # Implementation description +// +// When a channel is created via the channel function, it creates a single heap allocation +// containing: +// * A one byte atomic integer that represents the current channel state, +// * Uninitialized memory to fit the message, +// * Uninitialized memory to fit the waker that can wake the receiving task or thread up. +// +// The size of the waker depends on which features are activated, it ranges from 0 to 24 bytes[1]. +// So with all features enabled (the default) each channel allocates 25 bytes plus the size of the +// message, plus any padding needed to get correct memory alignment. +// +// The Sender and Receiver only holds a raw pointer to the heap channel object. The last endpoint +// to be consumed or dropped is responsible for freeing the heap memory. The first endpoint to +// be consumed or dropped signal via the state that it is gone. And the second one see this and +// frees the memory. +// +// ## Footnotes +// +// [1]: Mind that the waker only takes zero bytes when all features are disabled, making it +// impossible to *wait* for the message. `try_recv` the only available method in this scenario. + +#![deny(rust_2018_idioms)] +#![cfg_attr(not(feature = "std"), no_std)] + +#[cfg(not(loom))] +extern crate alloc; + +use core::{ + marker::PhantomData, + mem::{self, MaybeUninit}, + ptr::{self, NonNull}, +}; + +#[cfg(not(loom))] +use core::{ + cell::UnsafeCell, + sync::atomic::{fence, AtomicU8, Ordering::*}, +}; +#[cfg(loom)] +use loom::{ + cell::UnsafeCell, + sync::atomic::{fence, AtomicU8, Ordering::*}, +}; + +#[cfg(all(feature = "async", not(loom)))] +use core::hint; +#[cfg(all(feature = "async", loom))] +use loom::hint; + +#[cfg(feature = "async")] +use core::{ + pin::Pin, + task::{self, Poll}, +}; +#[cfg(feature = "std")] +use std::time::{Duration, Instant}; + +#[cfg(feature = "std")] +mod thread { + #[cfg(not(loom))] + pub use std::thread::{current, park, park_timeout, yield_now, Thread}; + + #[cfg(loom)] + pub use loom::thread::{current, park, yield_now, Thread}; + + // loom does not support parking with a timeout. So we just + // yield. This means that the "park" will "spuriously" wake up + // way too early. But the code should properly handle this. + // One thing to note is that very short timeouts are needed + // when using loom, since otherwise the looping will cause + // an overflow in loom. + #[cfg(loom)] + pub fn park_timeout(_timeout: std::time::Duration) { + loom::thread::yield_now() + } +} + +#[cfg(loom)] +mod loombox; +#[cfg(not(loom))] +use alloc::boxed::Box; +#[cfg(loom)] +use loombox::Box; + +mod errors; +pub use errors::{RecvError, RecvTimeoutError, SendError, TryRecvError}; + +/// Creates a new oneshot channel and returns the two endpoints, [`Sender`] and [`Receiver`]. +pub fn channel() -> (Sender, Receiver) { + // Allocate the channel on the heap and get the pointer. + // The last endpoint of the channel to be alive is responsible for freeing the channel + // and dropping any object that might have been written to it. + + let channel_ptr = Box::into_raw(Box::new(Channel::new())); + + // SAFETY: `channel_ptr` came from a Box and thus is not null + let channel_ptr = unsafe { NonNull::new_unchecked(channel_ptr) }; + + ( + Sender { + channel_ptr, + _invariant: PhantomData, + }, + Receiver { channel_ptr }, + ) +} + +#[derive(Debug)] +pub struct Sender { + channel_ptr: NonNull>, + // In reality we want contravariance, however we can't obtain that. + // + // Consider the following scenario: + // ``` + // let (mut tx, rx) = channel::<&'short u8>(); + // let (tx2, rx2) = channel::<&'long u8>(); + // + // tx = tx2; + // + // // Pretend short_ref is some &'short u8 + // tx.send(short_ref).unwrap(); + // let long_ref = rx2.recv().unwrap(); + // ``` + // + // If this type were covariant then we could safely extend lifetimes, which is not okay. + // Hence, we enforce invariance. + _invariant: PhantomData T>, +} + +#[derive(Debug)] +pub struct Receiver { + // Covariance is the right choice here. Consider the example presented in Sender, and you'll + // see that if we replaced `rx` instead then we would get the expected behavior + channel_ptr: NonNull>, +} + +unsafe impl Send for Sender {} +unsafe impl Send for Receiver {} +impl Unpin for Receiver {} + +impl Sender { + /// Sends `message` over the channel to the corresponding [`Receiver`]. + /// + /// Returns an error if the receiver has already been dropped. The message can + /// be extracted from the error. + /// + /// This method is lock-free and wait-free when sending on a channel that the + /// receiver is currently not receiving on. If the receiver is receiving during the send + /// operation this method includes waking up the thread/task. Unparking a thread involves + /// a mutex in Rust's standard library at the time of writing this. + /// How lock-free waking up an async task is + /// depends on your executor. If this method returns a `SendError`, please mind that dropping + /// the error involves running any drop implementation on the message type, and freeing the + /// channel's heap allocation, which might or might not be lock-free. + pub fn send(self, message: T) -> Result<(), SendError> { + let channel_ptr = self.channel_ptr; + + // Don't run our Drop implementation if send was called, any cleanup now happens here + mem::forget(self); + + // SAFETY: The channel exists on the heap for the entire duration of this method and we + // only ever acquire shared references to it. Note that if the receiver disconnects it + // does not free the channel. + let channel = unsafe { channel_ptr.as_ref() }; + + // Write the message into the channel on the heap. + // SAFETY: The receiver only ever accesses this memory location if we are in the MESSAGE + // state, and since we're responsible for setting that state, we can guarantee that we have + // exclusive access to this memory location to perform this write. + unsafe { channel.write_message(message) }; + + // Set the state to signal there is a message on the channel. + // ORDERING: we use release ordering to ensure the write of the message is visible to the + // receiving thread. The EMPTY and DISCONNECTED branches do not observe any shared state, + // and thus we do not need acquire orderng. The RECEIVING branch manages synchronization + // independent of this operation. + // + // EMPTY + 1 = MESSAGE + // RECEIVING + 1 = UNPARKING + // DISCONNECTED + 1 = invalid, however this state is never observed + match channel.state.fetch_add(1, Release) { + // The receiver is alive and has not started waiting. Send done. + EMPTY => Ok(()), + // The receiver is waiting. Wake it up so it can return the message. + RECEIVING => { + // ORDERING: Synchronizes with the write of the waker to memory, and prevents the + // taking of the waker from being ordered before this operation. + fence(Acquire); + + // Take the waker, but critically do not unpark it. If we unparked now, then the + // receiving thread could still observe the UNPARKING state and re-park, meaning + // that after we change to the MESSAGE state, it would remain parked indefinitely + // or until a spurious wakeup. + // SAFETY: at this point we are in the UNPARKING state, and the receiving thread + // does not access the waker while in this state, nor does it free the channel + // allocation in this state. + let waker = unsafe { channel.take_waker() }; + + // ORDERING: this ordering serves two-fold: it synchronizes with the acquire load + // in the receiving thread, ensuring that both our read of the waker and write of + // the message happen-before the taking of the message and freeing of the channel. + // Furthermore, we need acquire ordering to ensure the unparking of the receiver + // happens after the channel state is updated. + channel.state.swap(MESSAGE, AcqRel); + + // Note: it is possible that between the store above and this statement that + // the receiving thread is spuriously unparked, takes the message, and frees + // the channel allocation. However, we took ownership of the channel out of + // that allocation, and freeing the channel does not drop the waker since the + // waker is wrapped in MaybeUninit. Therefore this data is valid regardless of + // whether or not the receive has completed by this point. + waker.unpark(); + + Ok(()) + } + // The receiver was already dropped. The error is responsible for freeing the channel. + // SAFETY: since the receiver disconnected it will no longer access `channel_ptr`, so + // we can transfer exclusive ownership of the channel's resources to the error. + // Moreover, since we just placed the message in the channel, the channel contains a + // valid message. + DISCONNECTED => Err(unsafe { SendError::new(channel_ptr) }), + _ => unreachable!(), + } + } + + /// Consumes the Sender, returning a raw pointer to the channel on the heap. + /// + /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing + /// to do with the returned pointer is to later reconstruct the Sender with [Sender::from_raw]. + /// Memory will leak if the Sender is never reconstructed. + pub fn into_raw(self) -> *mut () { + let raw = self.channel_ptr.as_ptr() as *mut (); + mem::forget(self); + raw + } + + /// Consumes a raw pointer from [Sender::into_raw], recreating the Sender. + /// + /// # Safety + /// + /// This pointer must have come from [`Sender::into_raw`] with the same message type, `T`. + /// At most one Sender must exist for a channel at any point in time. + /// Constructing multiple Senders from the same raw pointer leads to undefined behavior. + pub unsafe fn from_raw(raw: *mut ()) -> Self { + Self { + channel_ptr: NonNull::new_unchecked(raw as *mut Channel), + _invariant: PhantomData, + } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + // SAFETY: The receiver only ever frees the channel if we are in the MESSAGE or + // DISCONNECTED states. If we are in the MESSAGE state, then we called + // mem::forget(self), so we should not be in this function call. If we are in the + // DISCONNECTED state, then the receiver either received a MESSAGE so this statement is + // unreachable, or was dropped and observed that our side was still alive, and thus didn't + // free the channel. + let channel = unsafe { self.channel_ptr.as_ref() }; + + // Set the channel state to disconnected and read what state the receiver was in + // ORDERING: we don't need release ordering here since there are no modifications we + // need to make visible to other thread, and the Err(RECEIVING) branch handles + // synchronization independent of this cmpxchg + // + // EMPTY ^ 001 = DISCONNECTED + // RECEIVING ^ 001 = UNPARKING + // DISCONNECTED ^ 001 = EMPTY (invalid), but this state is never observed + match channel.state.fetch_xor(0b001, Relaxed) { + // The receiver has not started waiting, nor is it dropped. + EMPTY => (), + // The receiver is waiting. Wake it up so it can detect that the channel disconnected. + RECEIVING => { + // See comments in Sender::send + + fence(Acquire); + + let waker = unsafe { channel.take_waker() }; + + // We still need release ordering here to make sure our read of the waker happens + // before this, and acquire ordering to ensure the unparking of the receiver + // happens after this. + channel.state.swap(DISCONNECTED, AcqRel); + + // The Acquire ordering above ensures that the write of the DISCONNECTED state + // happens-before unparking the receiver. + waker.unpark(); + } + // The receiver was already dropped. We are responsible for freeing the channel. + DISCONNECTED => { + // SAFETY: when the receiver switches the state to DISCONNECTED they have received + // the message or will no longer be trying to receive the message, and have + // observed that the sender is still alive, meaning that we're responsible for + // freeing the channel allocation. + unsafe { dealloc(self.channel_ptr) }; + } + _ => unreachable!(), + } + } +} + +impl Receiver { + /// Checks if there is a message in the channel without blocking. Returns: + /// * `Ok(message)` if there was a message in the channel. + /// * `Err(Empty)` if the [`Sender`] is alive, but has not yet sent a message. + /// * `Err(Disconnected)` if the [`Sender`] was dropped before sending anything or if the + /// message has already been extracted by a previous receive call. + /// + /// If a message is returned, the channel is disconnected and any subsequent receive operation + /// using this receiver will return an error. + /// + /// This method is completely lock-free and wait-free. The only thing it does is an atomic + /// integer load of the channel state. And if there is a message in the channel it additionally + /// performs one atomic integer store and copies the message from the heap to the stack for + /// returning it. + pub fn try_recv(&self) -> Result { + // SAFETY: The channel will not be freed while this method is still running. + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: we use acquire ordering to synchronize with the store of the message. + match channel.state.load(Acquire) { + MESSAGE => { + // It's okay to break up the load and store since once we're in the message state + // the sender no longer modifies the state + // ORDERING: at this point the sender has done its job and is no longer active, so + // we don't need to make any side effects visible to it + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we are in the MESSAGE state so the message is present + Ok(unsafe { channel.take_message() }) + } + EMPTY => Err(TryRecvError::Empty), + DISCONNECTED => Err(TryRecvError::Disconnected), + #[cfg(feature = "async")] + RECEIVING | UNPARKING => Err(TryRecvError::Empty), + _ => unreachable!(), + } + } + + /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is + /// disconnected. + /// + /// This method will always block the current thread if there is no data available and it is + /// still possible for the message to be sent. Once the message is sent to the corresponding + /// [`Sender`], then this receiver will wake up and return that message. + /// + /// If the corresponding [`Sender`] has disconnected (been dropped), or it disconnects while + /// this call is blocking, this call will wake up and return `Err` to indicate that the message + /// can never be received on this channel. + /// + /// If a sent message has already been extracted from this channel this method will return an + /// error. + /// + /// # Panics + /// + /// Panics if called after this receiver has been polled asynchronously. + #[cfg(feature = "std")] + pub fn recv(self) -> Result { + // Note that we don't need to worry about changing the state to disconnected or setting the + // state to an invalid value at any point in this function because we take ownership of + // self, and this function does not exit until the message has been received or both side + // of the channel are inactive and cleaned up. + + let channel_ptr = self.channel_ptr; + + // Don't run our Drop implementation if we are receiving consuming ourselves. + mem::forget(self); + + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so channel_ptr is valid + let channel = unsafe { channel_ptr.as_ref() }; + + // ORDERING: we use acquire ordering to synchronize with the write of the message in the + // case that it's available + match channel.state.load(Acquire) { + // The sender is alive but has not sent anything yet. We prepare to park. + EMPTY => { + // Conditionally add a delay here to help the tests trigger the edge cases where + // the sender manages to be dropped or send something before we are able to store + // our waker object in the channel. + #[cfg(oneshot_test_delay)] + std::thread::sleep(std::time::Duration::from_millis(10)); + + // Write our waker instance to the channel. + // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not + // try to access the waker until it sees the state set to RECEIVING below + unsafe { channel.write_waker(ReceiverWaker::current_thread()) }; + + // Switch the state to RECEIVING. We need to do this in one atomic step in case the + // sender disconnected or sent the message while we wrote the waker to memory. We + // don't need to do a compare exchange here however because if the original state + // was not EMPTY, then the sender has either finished sending the message or is + // being dropped, so the RECEIVING state will never be observed after we return. + // ORDERING: we use release ordering so the sender can synchronize with our writing + // of the waker to memory. The individual branches handle any additional + // synchronizaton + match channel.state.swap(RECEIVING, Release) { + // We stored our waker, now we park until the sender has changed the state + EMPTY => loop { + thread::park(); + + // ORDERING: synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender sent the message while we were parked. + MESSAGE => { + // SAFETY: we are in the message state so the message is valid + let message = unsafe { channel.take_message() }; + + // SAFETY: the Sender delegates the responsibility of deallocating + // the channel to us upon sending the message + unsafe { dealloc(channel_ptr) }; + + break Ok(message); + } + // The sender was dropped while we were parked. + DISCONNECTED => { + // SAFETY: the Sender doesn't deallocate the channel allocation in + // its drop implementation if we're receiving + unsafe { dealloc(channel_ptr) }; + + break Err(RecvError); + } + // State did not change, spurious wakeup, park again. + RECEIVING | UNPARKING => (), + _ => unreachable!(), + } + }, + // The sender sent the message while we prepared to park. + MESSAGE => { + // ORDERING: Synchronize with the write of the message. This branch is + // unlikely to be taken, so it's likely more efficient to use a fence here + // instead of AcqRel ordering on the RMW operation + fence(Acquire); + + // SAFETY: we started in the empty state and the sender switched us to the + // message state. This means that it did not take the waker, so we're + // responsible for dropping it. + unsafe { channel.drop_waker() }; + + // SAFETY: we are in the message state so the message is valid + let message = unsafe { channel.take_message() }; + + // SAFETY: the Sender delegates the responsibility of deallocating the + // channel to us upon sending the message + unsafe { dealloc(channel_ptr) }; + + Ok(message) + } + // The sender was dropped before sending anything while we prepared to park. + DISCONNECTED => { + // SAFETY: we started in the empty state and the sender switched us to the + // disconnected state. It does not take the waker when it does this so we + // need to drop it. + unsafe { channel.drop_waker() }; + + // SAFETY: the sender does not deallocate the channel if it switches from + // empty to disconnected so we need to free the allocation + unsafe { dealloc(channel_ptr) }; + + Err(RecvError) + } + _ => unreachable!(), + } + } + // The sender already sent the message. + MESSAGE => { + // SAFETY: we are in the message state so the message is valid + let message = unsafe { channel.take_message() }; + + // SAFETY: we are already in the message state so the sender has been forgotten + // and it's our job to clean up resources + unsafe { dealloc(channel_ptr) }; + + Ok(message) + } + // The sender was dropped before sending anything, or we already received the message. + DISCONNECTED => { + // SAFETY: the sender does not deallocate the channel if it switches from empty to + // disconnected so we need to free the allocation + unsafe { dealloc(channel_ptr) }; + + Err(RecvError) + } + // The receiver must have been `Future::poll`ed prior to this call. + #[cfg(feature = "async")] + RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR), + _ => unreachable!(), + } + } + + /// Attempts to wait for a message from the [`Sender`], returning an error if the channel is + /// disconnected. This is a non consuming version of [`Receiver::recv`], but with a bit + /// worse performance. Prefer `[`Receiver::recv`]` if your code allows consuming the receiver. + /// + /// If a message is returned, the channel is disconnected and any subsequent receive operation + /// using this receiver will return an error. + /// + /// # Panics + /// + /// Panics if called after this receiver has been polled asynchronously. + #[cfg(feature = "std")] + pub fn recv_ref(&self) -> Result { + self.start_recv_ref(RecvError, |channel| { + loop { + thread::park(); + + // ORDERING: we use acquire ordering to synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender sent the message while we were parked. + // We take the message and mark the channel disconnected. + MESSAGE => { + // ORDERING: the sender is inactive at this point so we don't need to make + // any reads or writes visible to the sending thread + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we were just in the message state so the message is valid + break Ok(unsafe { channel.take_message() }); + } + // The sender was dropped while we were parked. + DISCONNECTED => break Err(RecvError), + // State did not change, spurious wakeup, park again. + RECEIVING | UNPARKING => (), + _ => unreachable!(), + } + } + }) + } + + /// Like [`Receiver::recv`], but will not block longer than `timeout`. Returns: + /// * `Ok(message)` if there was a message in the channel before the timeout was reached. + /// * `Err(Timeout)` if no message arrived on the channel before the timeout was reached. + /// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message + /// has already been extracted by a previous receive call. + /// + /// If a message is returned, the channel is disconnected and any subsequent receive operation + /// using this receiver will return an error. + /// + /// If the supplied `timeout` is so large that Rust's `Instant` type can't represent this point + /// in the future this falls back to an indefinitely blocking receive operation. + /// + /// # Panics + /// + /// Panics if called after this receiver has been polled asynchronously. + #[cfg(feature = "std")] + pub fn recv_timeout(&self, timeout: Duration) -> Result { + match Instant::now().checked_add(timeout) { + Some(deadline) => self.recv_deadline(deadline), + None => self.recv_ref().map_err(|_| RecvTimeoutError::Disconnected), + } + } + + /// Like [`Receiver::recv`], but will not block longer than until `deadline`. Returns: + /// * `Ok(message)` if there was a message in the channel before the deadline was reached. + /// * `Err(Timeout)` if no message arrived on the channel before the deadline was reached. + /// * `Err(Disconnected)` if the sender was dropped before sending anything or if the message + /// has already been extracted by a previous receive call. + /// + /// If a message is returned, the channel is disconnected and any subsequent receive operation + /// using this receiver will return an error. + /// + /// # Panics + /// + /// Panics if called after this receiver has been polled asynchronously. + #[cfg(feature = "std")] + pub fn recv_deadline(&self, deadline: Instant) -> Result { + /// # Safety + /// + /// If the sender is unparking us after a message send, the message must already have been + /// written to the channel and an acquire memory barrier issued before calling this function + #[cold] + unsafe fn wait_for_unpark(channel: &Channel) -> Result { + loop { + thread::park(); + + // ORDERING: The callee has already synchronized with any message write + match channel.state.load(Relaxed) { + MESSAGE => { + // ORDERING: the sender has been dropped, so this update only + // needs to be visible to us + channel.state.store(DISCONNECTED, Relaxed); + break Ok(channel.take_message()); + } + DISCONNECTED => break Err(RecvTimeoutError::Disconnected), + // The sender is still unparking us. We continue on the empty state here since + // the current implementation eagerly sets the state to EMPTY upon timeout. + EMPTY => (), + _ => unreachable!(), + } + } + } + + self.start_recv_ref(RecvTimeoutError::Disconnected, |channel| { + loop { + match deadline.checked_duration_since(Instant::now()) { + Some(timeout) => { + thread::park_timeout(timeout); + + // ORDERING: synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender sent the message while we were parked. + MESSAGE => { + // ORDERING: the sender has been `mem::forget`-ed so this update + // only needs to be visible to us. + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we either are in the message state or were just in the + // message state + break Ok(unsafe { channel.take_message() }); + } + // The sender was dropped while we were parked. + DISCONNECTED => break Err(RecvTimeoutError::Disconnected), + // State did not change, spurious wakeup, park again. + RECEIVING | UNPARKING => (), + _ => unreachable!(), + } + } + None => { + // ORDERING: synchronize with the write of the message + match channel.state.swap(EMPTY, Acquire) { + // We reached the end of the timeout without receiving a message + RECEIVING => { + // SAFETY: we were in the receiving state and are now in the empty + // state, so the sender has not and will not try to read the waker, + // so we have exclusive access to drop it. + unsafe { channel.drop_waker() }; + + break Err(RecvTimeoutError::Timeout); + } + // The sender sent the message while we were parked. + MESSAGE => { + // Same safety and ordering as the Some branch + + channel.state.store(DISCONNECTED, Relaxed); + break Ok(unsafe { channel.take_message() }); + } + // The sender was dropped while we were parked. + DISCONNECTED => { + // ORDERING: we were originally in the disconnected state meaning + // that the sender is inactive and no longer observing the state, + // so we only need to change it back to DISCONNECTED for if the + // receiver is dropped or a recv* method is called again + channel.state.store(DISCONNECTED, Relaxed); + + break Err(RecvTimeoutError::Disconnected); + } + // The sender sent the message and started unparking us + UNPARKING => { + // We were in the UNPARKING state and are now in the EMPTY state. + // We wait to be properly unparked and to observe if the sender + // sets MESSAGE or DISCONNECTED state. + // SAFETY: The load above has synchronized with any message write. + break unsafe { wait_for_unpark(channel) }; + } + _ => unreachable!(), + } + } + } + } + }) + } + + /// Begins the process of receiving on the channel by reference. If the message is already + /// ready, or the sender has disconnected, then this function will return the appropriate + /// Result immediately. Otherwise, it will write the waker to memory, check to see if the + /// sender has finished or disconnected again, and then will call `finish`. `finish` is + /// thus responsible for cleaning up the channel's resources appropriately before it returns, + /// such as destroying the waker, for instance. + #[cfg(feature = "std")] + #[inline] + fn start_recv_ref( + &self, + disconnected_error: E, + finish: impl FnOnce(&Channel) -> Result, + ) -> Result { + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so `self.channel` is valid + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: synchronize with the write of the message + match channel.state.load(Acquire) { + // The sender is alive but has not sent anything yet. We prepare to park. + EMPTY => { + // Conditionally add a delay here to help the tests trigger the edge cases where + // the sender manages to be dropped or send something before we are able to store + // our waker object in the channel. + #[cfg(oneshot_test_delay)] + std::thread::sleep(std::time::Duration::from_millis(10)); + + // Write our waker instance to the channel. + // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not + // try to access the waker until it sees the state set to RECEIVING below + unsafe { channel.write_waker(ReceiverWaker::current_thread()) }; + + // ORDERING: we use release ordering on success so the sender can synchronize with + // our write of the waker. We use relaxed ordering on failure since the sender does + // not need to synchronize with our write and the individual match arms handle any + // additional synchronization + match channel + .state + .compare_exchange(EMPTY, RECEIVING, Release, Relaxed) + { + // We stored our waker, now we delegate to the callback to finish the receive + // operation + Ok(_) => finish(channel), + // The sender sent the message while we prepared to finish + Err(MESSAGE) => { + // See comments in `recv` for ordering and safety + + fence(Acquire); + + unsafe { channel.drop_waker() }; + + // ORDERING: the sender has been `mem::forget`-ed so this update only + // needs to be visible to us + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: The MESSAGE state tells us there is a correctly initialized + // message + Ok(unsafe { channel.take_message() }) + } + // The sender was dropped before sending anything while we prepared to park. + Err(DISCONNECTED) => { + // See comments in `recv` for safety + unsafe { channel.drop_waker() }; + Err(disconnected_error) + } + _ => unreachable!(), + } + } + // The sender sent the message. We take the message and mark the channel disconnected. + MESSAGE => { + // ORDERING: the sender has been `mem::forget`-ed so this update only needs to be + // visible to us + channel.state.store(DISCONNECTED, Relaxed); + + // SAFETY: we are in the message state so the message is valid + Ok(unsafe { channel.take_message() }) + } + // The sender was dropped before sending anything, or we already received the message. + DISCONNECTED => Err(disconnected_error), + // The receiver must have been `Future::poll`ed prior to this call. + #[cfg(feature = "async")] + RECEIVING | UNPARKING => panic!("{}", RECEIVER_USED_SYNC_AND_ASYNC_ERROR), + _ => unreachable!(), + } + } + + /// Consumes the Receiver, returning a raw pointer to the channel on the heap. + /// + /// This is intended to simplify using oneshot channels with some FFI code. The only safe thing + /// to do with the returned pointer is to later reconstruct the Receiver with + /// [Receiver::from_raw]. Memory will leak if the Receiver is never reconstructed. + pub fn into_raw(self) -> *mut () { + let raw = self.channel_ptr.as_ptr() as *mut (); + mem::forget(self); + raw + } + + /// Consumes a raw pointer from [Receiver::into_raw], recreating the Receiver. + /// + /// # Safety + /// + /// This pointer must have come from [`Receiver::into_raw`] with the same message type, `T`. + /// At most one Receiver must exist for a channel at any point in time. + /// Constructing multiple Receivers from the same raw pointer leads to undefined behavior. + pub unsafe fn from_raw(raw: *mut ()) -> Self { + Self { + channel_ptr: NonNull::new_unchecked(raw as *mut Channel), + } + } +} + +#[cfg(feature = "async")] +impl core::future::Future for Receiver { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + // SAFETY: the existence of the `self` parameter serves as a certificate that the receiver + // is still alive, meaning that even if the sender was dropped then it would have observed + // the fact that we're still alive and left the responsibility of deallocating the + // channel to us, so `self.channel` is valid + let channel = unsafe { self.channel_ptr.as_ref() }; + + // ORDERING: we use acquire ordering to synchronize with the store of the message. + match channel.state.load(Acquire) { + // The sender is alive but has not sent anything yet. + EMPTY => { + // SAFETY: We can't be in the forbidden states, and no waker in the channel. + unsafe { channel.write_async_waker(cx) } + } + // We were polled again while waiting for the sender. Replace the waker with the new one. + RECEIVING => { + // ORDERING: We use relaxed ordering on both success and failure since we have not + // written anything above that must be released, and the individual match arms + // handle any additional synchronization. + match channel + .state + .compare_exchange(RECEIVING, EMPTY, Relaxed, Relaxed) + { + // We successfully changed the state back to EMPTY. Replace the waker. + // This is the most likely branch to be taken, which is why we don't use any + // memory barriers in the compare_exchange above. + Ok(_) => { + // SAFETY: We wrote the waker in a previous call to poll. We do not need + // a memory barrier since the previous write here was by ourselves. + unsafe { channel.drop_waker() }; + // SAFETY: We can't be in the forbidden states, and no waker in the channel. + unsafe { channel.write_async_waker(cx) } + } + // The sender sent the message while we prepared to replace the waker. + // We take the message and mark the channel disconnected. + // The sender has already taken the waker. + Err(MESSAGE) => { + // ORDERING: Synchronize with the write of the message. This branch is + // unlikely to be taken. + channel.state.swap(DISCONNECTED, Acquire); + // SAFETY: The state tells us the sender has initialized the message. + Poll::Ready(Ok(unsafe { channel.take_message() })) + } + // The sender was dropped before sending anything while we prepared to park. + // The sender has taken the waker already. + Err(DISCONNECTED) => Poll::Ready(Err(RecvError)), + // The sender is currently waking us up. + Err(UNPARKING) => { + // We can't trust that the old waker that the sender has access to + // is honored by the async runtime at this point. So we wake ourselves + // up to get polled instantly again. + cx.waker().wake_by_ref(); + Poll::Pending + } + _ => unreachable!(), + } + } + // The sender sent the message. + MESSAGE => { + // ORDERING: the sender has been dropped so this update only needs to be + // visible to us + channel.state.store(DISCONNECTED, Relaxed); + Poll::Ready(Ok(unsafe { channel.take_message() })) + } + // The sender was dropped before sending anything, or we already received the message. + DISCONNECTED => Poll::Ready(Err(RecvError)), + // The sender has observed the RECEIVING state and is currently reading the waker from + // a previous poll. We need to loop here until we observe the MESSAGE or DISCONNECTED + // state. We busy loop here since we know the sender is done very soon. + UNPARKING => loop { + hint::spin_loop(); + // ORDERING: The load above has already synchronized with the write of the message. + match channel.state.load(Relaxed) { + MESSAGE => { + // ORDERING: the sender has been dropped, so this update only + // needs to be visible to us + channel.state.store(DISCONNECTED, Relaxed); + // SAFETY: We observed the MESSAGE state + break Poll::Ready(Ok(unsafe { channel.take_message() })); + } + DISCONNECTED => break Poll::Ready(Err(RecvError)), + UNPARKING => (), + _ => unreachable!(), + } + }, + _ => unreachable!(), + } + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + // SAFETY: since the receiving side is still alive the sender would have observed that and + // left deallocating the channel allocation to us. + let channel = unsafe { self.channel_ptr.as_ref() }; + + // Set the channel state to disconnected and read what state the receiver was in + match channel.state.swap(DISCONNECTED, Acquire) { + // The sender has not sent anything, nor is it dropped. + EMPTY => (), + // The sender already sent something. We must drop it, and free the channel. + MESSAGE => { + // SAFETY: we are in the message state so the message is initialized + unsafe { channel.drop_message() }; + + // SAFETY: see safety comment at top of function + unsafe { dealloc(self.channel_ptr) }; + } + // The receiver has been polled. + #[cfg(feature = "async")] + RECEIVING => { + // TODO: figure this out when async is fixed + unsafe { channel.drop_waker() }; + } + // The sender was already dropped. We are responsible for freeing the channel. + DISCONNECTED => { + // SAFETY: see safety comment at top of function + unsafe { dealloc(self.channel_ptr) }; + } + _ => unreachable!(), + } + } +} + +/// All the values that the `Channel::state` field can have during the lifetime of a channel. +mod states { + // These values are very explicitly chosen so that we can replace some cmpxchg calls with + // fetch_* calls. + + /// The initial channel state. Active while both endpoints are still alive, no message has been + /// sent, and the receiver is not receiving. + pub const EMPTY: u8 = 0b011; + /// A message has been sent to the channel, but the receiver has not yet read it. + pub const MESSAGE: u8 = 0b100; + /// No message has yet been sent on the channel, but the receiver is currently receiving. + pub const RECEIVING: u8 = 0b000; + #[cfg(any(feature = "std", feature = "async"))] + pub const UNPARKING: u8 = 0b001; + /// The channel has been closed. This means that either the sender or receiver has been dropped, + /// or the message sent to the channel has already been received. Since this is a oneshot + /// channel, it is disconnected after the one message it is supposed to hold has been + /// transmitted. + pub const DISCONNECTED: u8 = 0b010; +} +use states::*; + +/// Internal channel data structure structure. the `channel` method allocates and puts one instance +/// of this struct on the heap for each oneshot channel instance. The struct holds: +/// * The current state of the channel. +/// * The message in the channel. This memory is uninitialized until the message is sent. +/// * The waker instance for the thread or task that is currently receiving on this channel. +/// This memory is uninitialized until the receiver starts receiving. +struct Channel { + state: AtomicU8, + message: UnsafeCell>, + waker: UnsafeCell>, +} + +impl Channel { + pub fn new() -> Self { + Self { + state: AtomicU8::new(EMPTY), + message: UnsafeCell::new(MaybeUninit::uninit()), + waker: UnsafeCell::new(MaybeUninit::uninit()), + } + } + + #[inline(always)] + unsafe fn message(&self) -> &MaybeUninit { + #[cfg(loom)] + { + self.message.with(|ptr| &*ptr) + } + + #[cfg(not(loom))] + { + &*self.message.get() + } + } + + #[inline(always)] + unsafe fn with_message_mut(&self, op: F) + where + F: FnOnce(&mut MaybeUninit), + { + #[cfg(loom)] + { + self.message.with_mut(|ptr| op(&mut *ptr)) + } + + #[cfg(not(loom))] + { + op(&mut *self.message.get()) + } + } + + #[inline(always)] + #[cfg(any(feature = "std", feature = "async"))] + unsafe fn with_waker_mut(&self, op: F) + where + F: FnOnce(&mut MaybeUninit), + { + #[cfg(loom)] + { + self.waker.with_mut(|ptr| op(&mut *ptr)) + } + + #[cfg(not(loom))] + { + op(&mut *self.waker.get()) + } + } + + #[inline(always)] + unsafe fn write_message(&self, message: T) { + self.with_message_mut(|slot| slot.as_mut_ptr().write(message)); + } + + #[inline(always)] + unsafe fn take_message(&self) -> T { + #[cfg(loom)] + { + self.message.with(|ptr| ptr::read(ptr)).assume_init() + } + + #[cfg(not(loom))] + { + ptr::read(self.message.get()).assume_init() + } + } + + #[inline(always)] + unsafe fn drop_message(&self) { + self.with_message_mut(|slot| slot.assume_init_drop()); + } + + #[cfg(any(feature = "std", feature = "async"))] + #[inline(always)] + unsafe fn write_waker(&self, waker: ReceiverWaker) { + self.with_waker_mut(|slot| slot.as_mut_ptr().write(waker)); + } + + #[inline(always)] + unsafe fn take_waker(&self) -> ReceiverWaker { + #[cfg(loom)] + { + self.waker.with(|ptr| ptr::read(ptr)).assume_init() + } + + #[cfg(not(loom))] + { + ptr::read(self.waker.get()).assume_init() + } + } + + #[cfg(any(feature = "std", feature = "async"))] + #[inline(always)] + unsafe fn drop_waker(&self) { + self.with_waker_mut(|slot| slot.assume_init_drop()); + } + + /// # Safety + /// + /// * `Channel::waker` must not have a waker stored in it when calling this method. + /// * Channel state must not be RECEIVING or UNPARKING when calling this method. + #[cfg(feature = "async")] + unsafe fn write_async_waker(&self, cx: &mut task::Context<'_>) -> Poll> { + // Write our thread instance to the channel. + // SAFETY: we are not yet in the RECEIVING state, meaning that the sender will not + // try to access the waker until it sees the state set to RECEIVING below + self.write_waker(ReceiverWaker::task_waker(cx)); + + // ORDERING: we use release ordering on success so the sender can synchronize with + // our write of the waker. We use relaxed ordering on failure since the sender does + // not need to synchronize with our write and the individual match arms handle any + // additional synchronization + match self + .state + .compare_exchange(EMPTY, RECEIVING, Release, Relaxed) + { + // We stored our waker, now we return and let the sender wake us up + Ok(_) => Poll::Pending, + // The sender sent the message while we prepared to park. + // We take the message and mark the channel disconnected. + Err(MESSAGE) => { + // ORDERING: Synchronize with the write of the message. This branch is + // unlikely to be taken, so it's likely more efficient to use a fence here + // instead of AcqRel ordering on the compare_exchange operation + fence(Acquire); + + // SAFETY: we started in the EMPTY state and the sender switched us to the + // MESSAGE state. This means that it did not take the waker, so we're + // responsible for dropping it. + self.drop_waker(); + + // ORDERING: sender does not exist, so this update only needs to be visible to us + self.state.store(DISCONNECTED, Relaxed); + + // SAFETY: The MESSAGE state tells us there is a correctly initialized message + Poll::Ready(Ok(self.take_message())) + } + // The sender was dropped before sending anything while we prepared to park. + Err(DISCONNECTED) => { + // SAFETY: we started in the EMPTY state and the sender switched us to the + // DISCONNECTED state. This means that it did not take the waker, so we're + // responsible for dropping it. + self.drop_waker(); + Poll::Ready(Err(RecvError)) + } + _ => unreachable!(), + } + } +} + +enum ReceiverWaker { + /// The receiver is waiting synchronously. Its thread is parked. + #[cfg(feature = "std")] + Thread(thread::Thread), + /// The receiver is waiting asynchronously. Its task can be woken up with this `Waker`. + #[cfg(feature = "async")] + Task(task::Waker), + /// A little hack to not make this enum an uninhibitable type when no features are enabled. + #[cfg(not(any(feature = "async", feature = "std")))] + _Uninhabited, +} + +impl ReceiverWaker { + #[cfg(feature = "std")] + pub fn current_thread() -> Self { + Self::Thread(thread::current()) + } + + #[cfg(feature = "async")] + pub fn task_waker(cx: &task::Context<'_>) -> Self { + Self::Task(cx.waker().clone()) + } + + pub fn unpark(self) { + match self { + #[cfg(feature = "std")] + ReceiverWaker::Thread(thread) => thread.unpark(), + #[cfg(feature = "async")] + ReceiverWaker::Task(waker) => waker.wake(), + #[cfg(not(any(feature = "async", feature = "std")))] + ReceiverWaker::_Uninhabited => unreachable!(), + } + } +} + +#[cfg(not(loom))] +#[test] +fn receiver_waker_size() { + let expected: usize = match (cfg!(feature = "std"), cfg!(feature = "async")) { + (false, false) => 0, + (false, true) => 16, + (true, false) => 8, + (true, true) => 16, + }; + assert_eq!(mem::size_of::(), expected); +} + +#[cfg(all(feature = "std", feature = "async"))] +const RECEIVER_USED_SYNC_AND_ASYNC_ERROR: &str = + "Invalid to call a blocking receive method on oneshot::Receiver after it has been polled"; + +#[inline] +pub(crate) unsafe fn dealloc(channel: NonNull>) { + drop(Box::from_raw(channel.as_ptr())) +} diff --git a/src/loombox.rs b/src/loombox.rs new file mode 100644 index 0000000..615db30 --- /dev/null +++ b/src/loombox.rs @@ -0,0 +1,151 @@ +use core::{borrow, fmt, hash, mem, ptr}; +use loom::alloc; + +pub struct Box { + ptr: *mut T, +} + +impl Box { + pub fn new(value: T) -> Self { + let layout = alloc::Layout::new::(); + let ptr = unsafe { alloc::alloc(layout) } as *mut T; + unsafe { ptr::write(ptr, value) }; + Self { ptr } + } +} + +impl Box { + #[inline] + pub fn into_raw(b: Box) -> *mut T { + let ptr = b.ptr; + mem::forget(b); + ptr + } + + pub const unsafe fn from_raw(ptr: *mut T) -> Box { + Self { ptr } + } +} + +impl Drop for Box { + fn drop(&mut self) { + unsafe { + let size = mem::size_of_val(&*self.ptr); + let align = mem::align_of_val(&*self.ptr); + let layout = alloc::Layout::from_size_align(size, align).unwrap(); + ptr::drop_in_place(self.ptr); + alloc::dealloc(self.ptr as *mut u8, layout); + } + } +} + +unsafe impl Send for Box {} +unsafe impl Sync for Box {} + +impl core::ops::Deref for Box { + type Target = T; + + fn deref(&self) -> &T { + unsafe { &*self.ptr } + } +} + +impl core::ops::DerefMut for Box { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.ptr } + } +} + +impl borrow::Borrow for Box { + fn borrow(&self) -> &T { + &**self + } +} + +impl borrow::BorrowMut for Box { + fn borrow_mut(&mut self) -> &mut T { + &mut **self + } +} + +impl AsRef for Box { + fn as_ref(&self) -> &T { + &**self + } +} + +impl AsMut for Box { + fn as_mut(&mut self) -> &mut T { + &mut **self + } +} + +impl fmt::Display for Box { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl fmt::Debug for Box { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl Clone for Box { + #[inline] + fn clone(&self) -> Box { + Self::new(self.as_ref().clone()) + } +} + +impl PartialEq for Box { + #[inline] + fn eq(&self, other: &Box) -> bool { + PartialEq::eq(&**self, &**other) + } + + #[allow(clippy::partialeq_ne_impl)] + #[inline] + fn ne(&self, other: &Box) -> bool { + PartialEq::ne(&**self, &**other) + } +} + +impl Eq for Box {} + +impl PartialOrd for Box { + #[inline] + fn partial_cmp(&self, other: &Box) -> Option { + PartialOrd::partial_cmp(&**self, &**other) + } + #[inline] + fn lt(&self, other: &Box) -> bool { + PartialOrd::lt(&**self, &**other) + } + #[inline] + fn le(&self, other: &Box) -> bool { + PartialOrd::le(&**self, &**other) + } + #[inline] + fn ge(&self, other: &Box) -> bool { + PartialOrd::ge(&**self, &**other) + } + #[inline] + fn gt(&self, other: &Box) -> bool { + PartialOrd::gt(&**self, &**other) + } +} + +impl Ord for Box { + #[inline] + fn cmp(&self, other: &Box) -> core::cmp::Ordering { + Ord::cmp(&**self, &**other) + } +} + +impl hash::Hash for Box { + fn hash(&self, state: &mut H) { + (**self).hash(state); + } +} diff --git a/tests/assert_mem.rs b/tests/assert_mem.rs new file mode 100644 index 0000000..a993ad7 --- /dev/null +++ b/tests/assert_mem.rs @@ -0,0 +1,37 @@ +use oneshot::{Receiver, Sender}; +use std::mem; + +/// Just sanity check that both channel endpoints stay the size of a single pointer. +#[test] +fn channel_endpoints_single_pointer() { + const PTR_SIZE: usize = mem::size_of::<*const ()>(); + + assert_eq!(mem::size_of::>(), PTR_SIZE); + assert_eq!(mem::size_of::>(), PTR_SIZE); + + assert_eq!(mem::size_of::>(), PTR_SIZE); + assert_eq!(mem::size_of::>(), PTR_SIZE); + + assert_eq!(mem::size_of::>(), PTR_SIZE); + assert_eq!(mem::size_of::>(), PTR_SIZE); + + assert_eq!(mem::size_of::>>(), PTR_SIZE); + assert_eq!(mem::size_of::>>(), PTR_SIZE); +} + +/// Check that the `SendError` stays small. Useful to automatically detect if it is refactored +/// to become large. We do not want the stack requirement for calling `Sender::send` to grow. +#[test] +fn error_sizes() { + const PTR_SIZE: usize = mem::size_of::(); + + assert_eq!(mem::size_of::>(), PTR_SIZE); + assert_eq!(mem::size_of::>(), PTR_SIZE); + assert_eq!(mem::size_of::>(), PTR_SIZE); + + // The type returned from `Sender::send` is also just pointer sized + assert_eq!( + mem::size_of::>>(), + PTR_SIZE + ); +} diff --git a/tests/async.rs b/tests/async.rs new file mode 100644 index 0000000..e7633aa --- /dev/null +++ b/tests/async.rs @@ -0,0 +1,128 @@ +#![cfg(all(feature = "async", not(loom)))] + +use core::mem; +use core::time::Duration; + +mod helpers; +use helpers::DropCounter; + +#[tokio::test] +async fn send_before_await_tokio() { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + assert_eq!(receiver.await, Ok(19i128)); +} + +#[async_std::test] +async fn send_before_await_async_std() { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + assert_eq!(receiver.await, Ok(19i128)); +} + +#[tokio::test] +async fn await_with_dropped_sender_tokio() { + let (sender, receiver) = oneshot::channel::(); + mem::drop(sender); + receiver.await.unwrap_err(); +} + +#[async_std::test] +async fn await_with_dropped_sender_async_std() { + let (sender, receiver) = oneshot::channel::(); + mem::drop(sender); + receiver.await.unwrap_err(); +} + +#[tokio::test] +async fn await_before_send_tokio() { + let (sender, receiver) = oneshot::channel(); + let (message, counter) = DropCounter::new(79u128); + let t = tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(10)).await; + sender.send(message) + }); + let returned_message = receiver.await.unwrap(); + assert_eq!(counter.count(), 0); + assert_eq!(*returned_message.value(), 79u128); + mem::drop(returned_message); + assert_eq!(counter.count(), 1); + t.await.unwrap().unwrap(); +} + +#[async_std::test] +async fn await_before_send_async_std() { + let (sender, receiver) = oneshot::channel(); + let (message, counter) = DropCounter::new(79u128); + let t = async_std::task::spawn(async move { + async_std::task::sleep(Duration::from_millis(10)).await; + sender.send(message) + }); + let returned_message = receiver.await.unwrap(); + assert_eq!(counter.count(), 0); + assert_eq!(*returned_message.value(), 79u128); + mem::drop(returned_message); + assert_eq!(counter.count(), 1); + t.await.unwrap(); +} + +#[tokio::test] +async fn await_before_send_then_drop_sender_tokio() { + let (sender, receiver) = oneshot::channel::(); + let t = tokio::spawn(async { + tokio::time::sleep(Duration::from_millis(10)).await; + mem::drop(sender); + }); + assert!(receiver.await.is_err()); + t.await.unwrap(); +} + +#[async_std::test] +async fn await_before_send_then_drop_sender_async_std() { + let (sender, receiver) = oneshot::channel::(); + let t = async_std::task::spawn(async { + async_std::task::sleep(Duration::from_millis(10)).await; + mem::drop(sender); + }); + assert!(receiver.await.is_err()); + t.await; +} + +// Tests that the Receiver handles being used synchronously even after being polled +#[tokio::test] +async fn poll_future_and_then_try_recv() { + use core::future::Future; + use core::pin::Pin; + use core::task::{self, Poll}; + + struct StupidReceiverFuture(oneshot::Receiver<()>); + + impl Future for StupidReceiverFuture { + type Output = Result<(), oneshot::RecvError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let poll_result = Future::poll(Pin::new(&mut self.0), cx); + self.0.try_recv().expect_err("Should never be a message"); + poll_result + } + } + + let (sender, receiver) = oneshot::channel(); + let t = tokio::spawn(async { + tokio::time::sleep(Duration::from_millis(20)).await; + mem::drop(sender); + }); + StupidReceiverFuture(receiver).await.unwrap_err(); + t.await.unwrap(); +} + +#[tokio::test] +async fn poll_receiver_then_drop_it() { + let (sender, receiver) = oneshot::channel::<()>(); + // This will poll the receiver and then give up after 100 ms. + tokio::time::timeout(Duration::from_millis(100), receiver) + .await + .unwrap_err(); + // Make sure the receiver has been dropped by the runtime. + assert!(sender.send(()).is_err()); +} diff --git a/tests/future.rs b/tests/future.rs new file mode 100644 index 0000000..3895946 --- /dev/null +++ b/tests/future.rs @@ -0,0 +1,65 @@ +#![cfg(feature = "async")] + +use core::{future, mem, pin, task}; + +#[cfg(loom)] +pub use loom::sync::{Arc, Mutex}; +#[cfg(not(loom))] +pub use std::sync::{Arc, Mutex}; + +mod helpers; +use helpers::maybe_loom_model; + +#[test] +fn multiple_receiver_polls_keeps_only_latest_waker() { + #[derive(Default)] + struct MockWaker { + cloned: usize, + dropped: usize, + } + + fn clone_mock_waker(waker: *const ()) -> task::RawWaker { + let mock_waker = unsafe { Arc::from_raw(waker as *const Mutex) }; + mock_waker.lock().unwrap().cloned += 1; + let new_waker = + task::RawWaker::new(Arc::into_raw(mock_waker.clone()) as *const (), &VTABLE); + mem::forget(mock_waker); + new_waker + } + + fn drop_mock_waker(waker: *const ()) { + let mock_waker = unsafe { Arc::from_raw(waker as *const Mutex) }; + mock_waker.lock().unwrap().dropped += 1; + } + + const VTABLE: task::RawWakerVTable = + task::RawWakerVTable::new(clone_mock_waker, |_| (), |_| (), drop_mock_waker); + + maybe_loom_model(|| { + let mock_waker1 = Arc::new(Mutex::new(MockWaker::default())); + let raw_waker1 = + task::RawWaker::new(Arc::into_raw(mock_waker1.clone()) as *const (), &VTABLE); + let waker1 = unsafe { task::Waker::from_raw(raw_waker1) }; + let mut context1 = task::Context::from_waker(&waker1); + + let (_sender, mut receiver) = oneshot::channel::<()>(); + + let poll_result = future::Future::poll(pin::Pin::new(&mut receiver), &mut context1); + assert_eq!(poll_result, task::Poll::Pending); + assert_eq!(mock_waker1.lock().unwrap().cloned, 1); + assert_eq!(mock_waker1.lock().unwrap().dropped, 0); + + let mock_waker2 = Arc::new(Mutex::new(MockWaker::default())); + let raw_waker2 = + task::RawWaker::new(Arc::into_raw(mock_waker2.clone()) as *const (), &VTABLE); + let waker2 = unsafe { task::Waker::from_raw(raw_waker2) }; + let mut context2 = task::Context::from_waker(&waker2); + + let poll_result = future::Future::poll(pin::Pin::new(&mut receiver), &mut context2); + assert_eq!(poll_result, task::Poll::Pending); + assert_eq!(mock_waker2.lock().unwrap().cloned, 1); + assert_eq!(mock_waker2.lock().unwrap().dropped, 0); + assert_eq!(mock_waker1.lock().unwrap().cloned, 1); + assert_eq!(mock_waker1.lock().unwrap().dropped, 1); + }); +} diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs new file mode 100644 index 0000000..1b14539 --- /dev/null +++ b/tests/helpers/mod.rs @@ -0,0 +1,63 @@ +#![allow(dead_code)] + +extern crate alloc; + +#[cfg(not(loom))] +use alloc::sync::Arc; +#[cfg(not(loom))] +use core::sync::atomic::{AtomicUsize, Ordering::SeqCst}; +#[cfg(loom)] +use loom::sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, +}; + +#[cfg(loom)] +pub mod waker; + +pub fn maybe_loom_model(test: impl Fn() + Sync + Send + 'static) { + #[cfg(loom)] + loom::model(test); + #[cfg(not(loom))] + test(); +} + +pub struct DropCounter { + drop_count: Arc, + value: Option, +} + +pub struct DropCounterHandle(Arc); + +impl DropCounter { + pub fn new(value: T) -> (Self, DropCounterHandle) { + let drop_count = Arc::new(AtomicUsize::new(0)); + ( + Self { + drop_count: drop_count.clone(), + value: Some(value), + }, + DropCounterHandle(drop_count), + ) + } + + pub fn value(&self) -> &T { + self.value.as_ref().unwrap() + } + + pub fn into_value(mut self) -> T { + self.value.take().unwrap() + } +} + +impl DropCounterHandle { + pub fn count(&self) -> usize { + self.0.load(SeqCst) + } +} + +impl Drop for DropCounter { + fn drop(&mut self) { + self.drop_count.fetch_add(1, SeqCst); + } +} diff --git a/tests/helpers/waker.rs b/tests/helpers/waker.rs new file mode 100644 index 0000000..2e3f1be --- /dev/null +++ b/tests/helpers/waker.rs @@ -0,0 +1,64 @@ +//! Creates a Waker that can be observed from tests. + +use std::mem::forget; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::task::{RawWaker, RawWakerVTable, Waker}; + +#[derive(Default)] +pub struct WakerHandle { + clone_count: AtomicU32, + drop_count: AtomicU32, + wake_count: AtomicU32, +} + +impl WakerHandle { + pub fn clone_count(&self) -> u32 { + self.clone_count.load(Ordering::Relaxed) + } + + pub fn drop_count(&self) -> u32 { + self.drop_count.load(Ordering::Relaxed) + } + + pub fn wake_count(&self) -> u32 { + self.wake_count.load(Ordering::Relaxed) + } +} + +pub fn waker() -> (Waker, Arc) { + let waker_handle = Arc::new(WakerHandle::default()); + let waker_handle_ptr = Arc::into_raw(waker_handle.clone()); + let raw_waker = RawWaker::new(waker_handle_ptr as *const _, waker_vtable()); + (unsafe { Waker::from_raw(raw_waker) }, waker_handle) +} + +pub(super) fn waker_vtable() -> &'static RawWakerVTable { + &RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw) +} + +unsafe fn clone_raw(data: *const ()) -> RawWaker { + let handle: Arc = Arc::from_raw(data as *const _); + handle.clone_count.fetch_add(1, Ordering::Relaxed); + forget(handle.clone()); + forget(handle); + RawWaker::new(data, waker_vtable()) +} + +unsafe fn wake_raw(data: *const ()) { + let handle: Arc = Arc::from_raw(data as *const _); + handle.wake_count.fetch_add(1, Ordering::Relaxed); + handle.drop_count.fetch_add(1, Ordering::Relaxed); +} + +unsafe fn wake_by_ref_raw(data: *const ()) { + let handle: Arc = Arc::from_raw(data as *const _); + handle.wake_count.fetch_add(1, Ordering::Relaxed); + forget(handle) +} + +unsafe fn drop_raw(data: *const ()) { + let handle: Arc = Arc::from_raw(data as *const _); + handle.drop_count.fetch_add(1, Ordering::Relaxed); + drop(handle) +} diff --git a/tests/loom.rs b/tests/loom.rs new file mode 100644 index 0000000..a7625a4 --- /dev/null +++ b/tests/loom.rs @@ -0,0 +1,223 @@ +#![cfg(loom)] + +use oneshot::TryRecvError; + +use loom::hint; +use loom::thread; +#[cfg(feature = "async")] +use std::future::Future; +#[cfg(feature = "async")] +use std::pin::Pin; +#[cfg(feature = "async")] +use std::task::{self, Poll}; +#[cfg(feature = "std")] +use std::time::Duration; + +mod helpers; + +#[test] +fn try_recv() { + loom::model(|| { + let (sender, receiver) = oneshot::channel::(); + + let t = thread::spawn(move || loop { + match receiver.try_recv() { + Ok(msg) => break msg, + Err(TryRecvError::Empty) => hint::spin_loop(), + Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"), + } + }); + + assert!(sender.send(19).is_ok()); + assert_eq!(t.join().unwrap(), 19); + }) +} + +#[cfg(feature = "std")] +#[test] +fn send_recv_different_threads() { + loom::model(|| { + let (sender, receiver) = oneshot::channel(); + let t2 = thread::spawn(move || { + assert_eq!(receiver.recv_timeout(Duration::from_millis(1)), Ok(9)); + }); + let t1 = thread::spawn(move || { + sender.send(9u128).unwrap(); + }); + t1.join().unwrap(); + t2.join().unwrap(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_drop_sender_different_threads() { + loom::model(|| { + let (sender, receiver) = oneshot::channel::(); + let t2 = thread::spawn(move || { + assert!(receiver.recv_timeout(Duration::from_millis(0)).is_err()); + }); + let t1 = thread::spawn(move || { + drop(sender); + }); + t1.join().unwrap(); + t2.join().unwrap(); + }) +} + +#[cfg(feature = "async")] +#[test] +fn async_recv() { + loom::model(|| { + let (sender, receiver) = oneshot::channel::(); + let t1 = thread::spawn(move || { + sender.send(987).unwrap(); + }); + assert_eq!(loom::future::block_on(receiver), Ok(987)); + t1.join().unwrap(); + }) +} + +#[cfg(feature = "async")] +#[test] +fn send_then_poll() { + loom::model(|| { + let (sender, mut receiver) = oneshot::channel::(); + sender.send(1234).unwrap(); + + let (waker, waker_handle) = helpers::waker::waker(); + let mut context = task::Context::from_waker(&waker); + + assert_eq!( + Pin::new(&mut receiver).poll(&mut context), + Poll::Ready(Ok(1234)) + ); + assert_eq!(waker_handle.clone_count(), 0); + assert_eq!(waker_handle.drop_count(), 0); + assert_eq!(waker_handle.wake_count(), 0); + }) +} + +#[cfg(feature = "async")] +#[test] +fn poll_then_send() { + loom::model(|| { + let (sender, mut receiver) = oneshot::channel::(); + + let (waker, waker_handle) = helpers::waker::waker(); + let mut context = task::Context::from_waker(&waker); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 0); + assert_eq!(waker_handle.wake_count(), 0); + + sender.send(1234).unwrap(); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 1); + assert_eq!(waker_handle.wake_count(), 1); + + assert_eq!( + Pin::new(&mut receiver).poll(&mut context), + Poll::Ready(Ok(1234)) + ); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 1); + assert_eq!(waker_handle.wake_count(), 1); + }) +} + +#[cfg(feature = "async")] +#[test] +fn poll_with_different_wakers() { + loom::model(|| { + let (sender, mut receiver) = oneshot::channel::(); + + let (waker1, waker_handle1) = helpers::waker::waker(); + let mut context1 = task::Context::from_waker(&waker1); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context1), Poll::Pending); + assert_eq!(waker_handle1.clone_count(), 1); + assert_eq!(waker_handle1.drop_count(), 0); + assert_eq!(waker_handle1.wake_count(), 0); + + let (waker2, waker_handle2) = helpers::waker::waker(); + let mut context2 = task::Context::from_waker(&waker2); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context2), Poll::Pending); + assert_eq!(waker_handle1.clone_count(), 1); + assert_eq!(waker_handle1.drop_count(), 1); + assert_eq!(waker_handle1.wake_count(), 0); + + assert_eq!(waker_handle2.clone_count(), 1); + assert_eq!(waker_handle2.drop_count(), 0); + assert_eq!(waker_handle2.wake_count(), 0); + + // Sending should cause the waker from the latest poll to be woken up + sender.send(1234).unwrap(); + assert_eq!(waker_handle1.clone_count(), 1); + assert_eq!(waker_handle1.drop_count(), 1); + assert_eq!(waker_handle1.wake_count(), 0); + + assert_eq!(waker_handle2.clone_count(), 1); + assert_eq!(waker_handle2.drop_count(), 1); + assert_eq!(waker_handle2.wake_count(), 1); + }) +} + +#[cfg(feature = "async")] +#[test] +fn poll_then_try_recv() { + loom::model(|| { + let (_sender, mut receiver) = oneshot::channel::(); + + let (waker, waker_handle) = helpers::waker::waker(); + let mut context = task::Context::from_waker(&waker); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 0); + assert_eq!(waker_handle.wake_count(), 0); + + assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty)); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); + assert_eq!(waker_handle.clone_count(), 2); + assert_eq!(waker_handle.drop_count(), 1); + assert_eq!(waker_handle.wake_count(), 0); + }) +} + +#[cfg(feature = "async")] +#[test] +fn poll_then_try_recv_while_sending() { + loom::model(|| { + let (sender, mut receiver) = oneshot::channel::(); + + let (waker, waker_handle) = helpers::waker::waker(); + let mut context = task::Context::from_waker(&waker); + + assert_eq!(Pin::new(&mut receiver).poll(&mut context), Poll::Pending); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 0); + assert_eq!(waker_handle.wake_count(), 0); + + let t = thread::spawn(move || { + sender.send(1234).unwrap(); + }); + + let msg = loop { + match receiver.try_recv() { + Ok(msg) => break msg, + Err(TryRecvError::Empty) => hint::spin_loop(), + Err(TryRecvError::Disconnected) => panic!("Should not be disconnected"), + } + }; + assert_eq!(msg, 1234); + assert_eq!(waker_handle.clone_count(), 1); + assert_eq!(waker_handle.drop_count(), 1); + assert_eq!(waker_handle.wake_count(), 1); + + t.join().unwrap(); + }) +} diff --git a/tests/raw.rs b/tests/raw.rs new file mode 100644 index 0000000..e38dc45 --- /dev/null +++ b/tests/raw.rs @@ -0,0 +1,46 @@ +#![cfg(not(loom))] + +use oneshot::{channel, Receiver, Sender}; + +#[test] +fn test_raw_sender() { + let (sender, receiver) = channel::(); + let raw = sender.into_raw(); + let recreated = unsafe { Sender::::from_raw(raw) }; + recreated + .send(100) + .unwrap_or_else(|e| panic!("error sending after into_raw/from_raw roundtrip: {e}")); + assert_eq!(receiver.try_recv(), Ok(100)) +} + +#[test] +fn test_raw_receiver() { + let (sender, receiver) = channel::(); + let raw = receiver.into_raw(); + sender.send(100).unwrap(); + let recreated = unsafe { Receiver::::from_raw(raw) }; + assert_eq!( + recreated + .try_recv() + .unwrap_or_else(|e| panic!("error receiving after into_raw/from_raw roundtrip: {e}")), + 100 + ) +} + +#[test] +fn test_raw_sender_and_receiver() { + let (sender, receiver) = channel::(); + let raw_receiver = receiver.into_raw(); + let raw_sender = sender.into_raw(); + + let recreated_sender = unsafe { Sender::::from_raw(raw_sender) }; + recreated_sender.send(100).unwrap(); + + let recreated_receiver = unsafe { Receiver::::from_raw(raw_receiver) }; + assert_eq!( + recreated_receiver + .try_recv() + .unwrap_or_else(|e| panic!("error receiving after into_raw/from_raw roundtrip: {e}")), + 100 + ) +} diff --git a/tests/sync.rs b/tests/sync.rs new file mode 100644 index 0000000..c6ba081 --- /dev/null +++ b/tests/sync.rs @@ -0,0 +1,343 @@ +use core::mem; +use oneshot::TryRecvError; + +#[cfg(feature = "std")] +use oneshot::{RecvError, RecvTimeoutError}; +#[cfg(feature = "std")] +use std::time::{Duration, Instant}; + +#[cfg(feature = "std")] +mod thread { + #[cfg(loom)] + pub use loom::thread::spawn; + #[cfg(not(loom))] + pub use std::thread::{sleep, spawn}; + + #[cfg(loom)] + pub fn sleep(_timeout: core::time::Duration) { + loom::thread::yield_now() + } +} + +mod helpers; +use helpers::{maybe_loom_model, DropCounter}; + +#[test] +fn send_before_try_recv() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + + assert_eq!(receiver.try_recv(), Ok(19i128)); + assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected)); + #[cfg(feature = "std")] + { + assert_eq!(receiver.recv_ref(), Err(RecvError)); + assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err()); + } + }) +} + +#[cfg(feature = "std")] +#[test] +fn send_before_recv() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<()>(); + assert!(sender.send(()).is_ok()); + assert_eq!(receiver.recv(), Ok(())); + }); + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::(); + assert!(sender.send(19).is_ok()); + assert_eq!(receiver.recv(), Ok(19)); + }); + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::(); + assert!(sender.send(21).is_ok()); + assert_eq!(receiver.recv(), Ok(21)); + }); + // FIXME: This test does not work with loom. There is something that happens after the + // channel object becomes larger than ~500 bytes and that makes an atomic read from the state + // result in "signal: 10, SIGBUS: access to undefined memory" + #[cfg(not(loom))] + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::<[u8; 4096]>(); + assert!(sender.send([0b10101010; 4096]).is_ok()); + assert!(receiver.recv().unwrap()[..] == [0b10101010; 4096][..]); + }); +} + +#[cfg(feature = "std")] +#[test] +fn send_before_recv_ref() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + + assert_eq!(receiver.recv_ref(), Ok(19i128)); + assert_eq!(receiver.recv_ref(), Err(RecvError)); + assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected)); + assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err()); + }) +} + +#[cfg(feature = "std")] +#[test] +fn send_before_recv_timeout() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + + let start = Instant::now(); + let timeout = Duration::from_secs(1); + assert_eq!(receiver.recv_timeout(timeout), Ok(19i128)); + assert!(start.elapsed() < Duration::from_millis(100)); + + assert!(receiver.recv_timeout(timeout).is_err()); + assert!(receiver.try_recv().is_err()); + assert!(receiver.recv().is_err()); + }) +} + +#[test] +fn send_then_drop_receiver() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + assert!(sender.send(19i128).is_ok()); + mem::drop(receiver); + }) +} + +#[test] +fn send_with_dropped_receiver() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + mem::drop(receiver); + let send_error = sender.send(5u128).unwrap_err(); + assert_eq!(*send_error.as_inner(), 5); + assert_eq!(send_error.into_inner(), 5); + }) +} + +#[test] +fn try_recv_with_dropped_sender() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::(); + mem::drop(sender); + receiver.try_recv().unwrap_err(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_with_dropped_sender() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::(); + mem::drop(sender); + receiver.recv().unwrap_err(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_before_send() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + sender.send(9u128).unwrap(); + }); + assert_eq!(receiver.recv(), Ok(9)); + t.join().unwrap(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_timeout_before_send() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(2)); + sender.send(9u128).unwrap(); + }); + assert_eq!(receiver.recv_timeout(Duration::from_secs(1)), Ok(9)); + t.join().unwrap(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_before_send_then_drop_sender() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(10)); + mem::drop(sender); + }); + assert!(receiver.recv().is_err()); + t.join().unwrap(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_timeout_before_send_then_drop_sender() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::(); + let t = thread::spawn(move || { + thread::sleep(Duration::from_millis(10)); + mem::drop(sender); + }); + assert!(receiver.recv_timeout(Duration::from_secs(1)).is_err()); + t.join().unwrap(); + }) +} + +#[test] +fn try_recv() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::(); + assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty)); + mem::drop(sender) + }) +} + +#[cfg(feature = "std")] +#[test] +fn try_recv_then_drop_receiver() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel::(); + let t1 = thread::spawn(move || { + let _ = sender.send(42); + }); + let t2 = thread::spawn(move || { + assert!(matches!( + receiver.try_recv(), + Ok(42) | Err(TryRecvError::Empty) + )); + mem::drop(receiver); + }); + t1.join().unwrap(); + t2.join().unwrap(); + }) +} + +#[cfg(feature = "std")] +#[test] +fn recv_deadline_and_timeout_no_time() { + maybe_loom_model(|| { + let (_sender, receiver) = oneshot::channel::(); + + let start = Instant::now(); + assert_eq!( + receiver.recv_deadline(start), + Err(RecvTimeoutError::Timeout) + ); + assert!(start.elapsed() < Duration::from_millis(200)); + + let start = Instant::now(); + assert_eq!( + receiver.recv_timeout(Duration::from_millis(0)), + Err(RecvTimeoutError::Timeout) + ); + assert!(start.elapsed() < Duration::from_millis(200)); + }) +} + +// This test doesn't give meaningful results when run with oneshot_test_delay and loom +#[cfg(all(feature = "std", not(all(oneshot_test_delay, loom))))] +#[test] +fn recv_deadline_time_should_elapse() { + maybe_loom_model(|| { + let (_sender, receiver) = oneshot::channel::(); + + let start = Instant::now(); + #[cfg(not(loom))] + let timeout = Duration::from_millis(100); + #[cfg(loom)] + let timeout = Duration::from_millis(1); + assert_eq!( + receiver.recv_deadline(start + timeout), + Err(RecvTimeoutError::Timeout) + ); + assert!(start.elapsed() > timeout); + assert!(start.elapsed() < timeout * 3); + }) +} + +#[cfg(all(feature = "std", not(all(oneshot_test_delay, loom))))] +#[test] +fn recv_timeout_time_should_elapse() { + maybe_loom_model(|| { + let (_sender, receiver) = oneshot::channel::(); + + let start = Instant::now(); + #[cfg(not(loom))] + let timeout = Duration::from_millis(100); + #[cfg(loom)] + let timeout = Duration::from_millis(1); + + assert_eq!( + receiver.recv_timeout(timeout), + Err(RecvTimeoutError::Timeout) + ); + assert!(start.elapsed() > timeout); + assert!(start.elapsed() < timeout * 3); + }) +} + +#[cfg(not(loom))] +#[test] +fn non_send_type_can_be_used_on_same_thread() { + use std::ptr; + + #[derive(Debug, Eq, PartialEq)] + struct NotSend(*mut ()); + + let (sender, receiver) = oneshot::channel(); + sender.send(NotSend(ptr::null_mut())).unwrap(); + let reply = receiver.try_recv().unwrap(); + assert_eq!(reply, NotSend(ptr::null_mut())); +} + +#[test] +fn message_in_channel_dropped_on_receiver_drop() { + maybe_loom_model(|| { + let (sender, receiver) = oneshot::channel(); + let (message, counter) = DropCounter::new(()); + assert_eq!(counter.count(), 0); + sender.send(message).unwrap(); + assert_eq!(counter.count(), 0); + mem::drop(receiver); + assert_eq!(counter.count(), 1); + }) +} + +#[test] +fn send_error_drops_message_correctly() { + maybe_loom_model(|| { + let (sender, _) = oneshot::channel(); + let (message, counter) = DropCounter::new(()); + + let send_error = sender.send(message).unwrap_err(); + assert_eq!(counter.count(), 0); + mem::drop(send_error); + assert_eq!(counter.count(), 1); + }); +} + +#[test] +fn send_error_drops_message_correctly_on_into_inner() { + maybe_loom_model(|| { + let (sender, _) = oneshot::channel(); + let (message, counter) = DropCounter::new(()); + + let send_error = sender.send(message).unwrap_err(); + assert_eq!(counter.count(), 0); + let message = send_error.into_inner(); + assert_eq!(counter.count(), 0); + mem::drop(message); + assert_eq!(counter.count(), 1); + }); +} -- cgit v1.2.3