aboutsummaryrefslogtreecommitdiff
path: root/src/task.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/task.rs')
-rw-r--r--src/task.rs97
1 files changed, 65 insertions, 32 deletions
diff --git a/src/task.rs b/src/task.rs
index 8ecd746..178b28e 100644
--- a/src/task.rs
+++ b/src/task.rs
@@ -1,6 +1,6 @@
use core::fmt;
use core::future::Future;
-use core::marker::{PhantomData, Unpin};
+use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use core::ptr::NonNull;
@@ -8,6 +8,8 @@ use core::sync::atomic::Ordering;
use core::task::{Context, Poll};
use crate::header::Header;
+use crate::raw::Panic;
+use crate::runnable::ScheduleInfo;
use crate::state::*;
/// A spawned task.
@@ -44,25 +46,25 @@ use crate::state::*;
/// assert_eq!(future::block_on(task), 3);
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
-pub struct Task<T> {
+pub struct Task<T, M = ()> {
/// A raw task pointer.
pub(crate) ptr: NonNull<()>,
- /// A marker capturing generic type `T`.
- pub(crate) _marker: PhantomData<T>,
+ /// A marker capturing generic types `T` and `M`.
+ pub(crate) _marker: PhantomData<(T, M)>,
}
-unsafe impl<T: Send> Send for Task<T> {}
-unsafe impl<T> Sync for Task<T> {}
+unsafe impl<T: Send, M: Send + Sync> Send for Task<T, M> {}
+unsafe impl<T, M: Send + Sync> Sync for Task<T, M> {}
-impl<T> Unpin for Task<T> {}
+impl<T, M> Unpin for Task<T, M> {}
#[cfg(feature = "std")]
-impl<T> std::panic::UnwindSafe for Task<T> {}
+impl<T, M> std::panic::UnwindSafe for Task<T, M> {}
#[cfg(feature = "std")]
-impl<T> std::panic::RefUnwindSafe for Task<T> {}
+impl<T, M> std::panic::RefUnwindSafe for Task<T, M> {}
-impl<T> Task<T> {
+impl<T, M> Task<T, M> {
/// Detaches the task to let it keep running in the background.
///
/// # Examples
@@ -173,14 +175,14 @@ impl<T> Task<T> {
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task.fallible()), None);
/// ```
- pub fn fallible(self) -> FallibleTask<T> {
+ pub fn fallible(self) -> FallibleTask<T, M> {
FallibleTask { task: self }
}
/// Puts the task in canceled state.
fn set_canceled(&mut self) {
let ptr = self.ptr.as_ptr();
- let header = ptr as *const Header;
+ let header = ptr as *const Header<M>;
unsafe {
let mut state = (*header).state.load(Ordering::Acquire);
@@ -209,7 +211,7 @@ impl<T> Task<T> {
// If the task is not scheduled nor running, schedule it one more time so
// that its future gets dropped by the executor.
if state & (SCHEDULED | RUNNING) == 0 {
- ((*header).vtable.schedule)(ptr);
+ ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
}
// Notify the awaiter that the task has been closed.
@@ -226,9 +228,9 @@ impl<T> Task<T> {
}
/// Puts the task in detached state.
- fn set_detached(&mut self) -> Option<T> {
+ fn set_detached(&mut self) -> Option<Result<T, Panic>> {
let ptr = self.ptr.as_ptr();
- let header = ptr as *const Header;
+ let header = ptr as *const Header<M>;
unsafe {
// A place where the output will be stored in case it needs to be dropped.
@@ -256,8 +258,10 @@ impl<T> Task<T> {
) {
Ok(_) => {
// Read the output.
- output =
- Some((((*header).vtable.get_output)(ptr) as *mut T).read());
+ output = Some(
+ (((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>)
+ .read(),
+ );
// Update the state variable because we're continuing the loop.
state |= CLOSED;
@@ -286,7 +290,7 @@ impl<T> Task<T> {
// schedule dropping its future or destroy it.
if state & !(REFERENCE - 1) == 0 {
if state & CLOSED == 0 {
- ((*header).vtable.schedule)(ptr);
+ ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false));
} else {
((*header).vtable.destroy)(ptr);
}
@@ -316,7 +320,7 @@ impl<T> Task<T> {
/// 4. It is completed and the `Task` gets dropped.
fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
let ptr = self.ptr.as_ptr();
- let header = ptr as *const Header;
+ let header = ptr as *const Header<M>;
unsafe {
let mut state = (*header).state.load(Ordering::Acquire);
@@ -382,8 +386,22 @@ impl<T> Task<T> {
}
// Take the output from the task.
- let output = ((*header).vtable.get_output)(ptr) as *mut T;
- return Poll::Ready(Some(output.read()));
+ let output = ((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>;
+ let output = output.read();
+
+ // Propagate the panic if the task panicked.
+ let output = match output {
+ Ok(output) => output,
+ Err(panic) => {
+ #[cfg(feature = "std")]
+ std::panic::resume_unwind(panic);
+
+ #[cfg(not(feature = "std"))]
+ match panic {}
+ }
+ };
+
+ return Poll::Ready(Some(output));
}
Err(s) => state = s,
}
@@ -391,9 +409,9 @@ impl<T> Task<T> {
}
}
- fn header(&self) -> &Header {
+ fn header(&self) -> &Header<M> {
let ptr = self.ptr.as_ptr();
- let header = ptr as *const Header;
+ let header = ptr as *const Header<M>;
unsafe { &*header }
}
@@ -402,23 +420,31 @@ impl<T> Task<T> {
/// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
pub fn is_finished(&self) -> bool {
let ptr = self.ptr.as_ptr();
- let header = ptr as *const Header;
+ let header = ptr as *const Header<M>;
unsafe {
let state = (*header).state.load(Ordering::Acquire);
state & (CLOSED | COMPLETED) != 0
}
}
+
+ /// Get the metadata associated with this task.
+ ///
+ /// Tasks can be created with a metadata object associated with them; by default, this
+ /// is a `()` value. See the [`Builder::metadata()`] method for more information.
+ pub fn metadata(&self) -> &M {
+ &self.header().metadata
+ }
}
-impl<T> Drop for Task<T> {
+impl<T, M> Drop for Task<T, M> {
fn drop(&mut self) {
self.set_canceled();
self.set_detached();
}
}
-impl<T> Future for Task<T> {
+impl<T, M> Future for Task<T, M> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -429,7 +455,7 @@ impl<T> Future for Task<T> {
}
}
-impl<T> fmt::Debug for Task<T> {
+impl<T, M: fmt::Debug> fmt::Debug for Task<T, M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Task")
.field("header", self.header())
@@ -446,11 +472,11 @@ impl<T> fmt::Debug for Task<T> {
/// This can be useful to avoid the panic produced when polling the `Task`
/// future if the executor dropped its `Runnable`.
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
-pub struct FallibleTask<T> {
- task: Task<T>,
+pub struct FallibleTask<T, M = ()> {
+ task: Task<T, M>,
}
-impl<T> FallibleTask<T> {
+impl<T, M> FallibleTask<T, M> {
/// Detaches the task to let it keep running in the background.
///
/// # Examples
@@ -513,9 +539,16 @@ impl<T> FallibleTask<T> {
pub async fn cancel(self) -> Option<T> {
self.task.cancel().await
}
+
+ /// Returns `true` if the current task is finished.
+ ///
+ /// Note that in a multithreaded environment, this task can change finish immediately after calling this function.
+ pub fn is_finished(&self) -> bool {
+ self.task.is_finished()
+ }
}
-impl<T> Future for FallibleTask<T> {
+impl<T, M> Future for FallibleTask<T, M> {
type Output = Option<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@@ -523,7 +556,7 @@ impl<T> Future for FallibleTask<T> {
}
}
-impl<T> fmt::Debug for FallibleTask<T> {
+impl<T, M: fmt::Debug> fmt::Debug for FallibleTask<T, M> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FallibleTask")
.field("header", self.task.header())