diff options
Diffstat (limited to 'src/task.rs')
-rw-r--r-- | src/task.rs | 97 |
1 files changed, 65 insertions, 32 deletions
diff --git a/src/task.rs b/src/task.rs index 8ecd746..178b28e 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,6 +1,6 @@ use core::fmt; use core::future::Future; -use core::marker::{PhantomData, Unpin}; +use core::marker::PhantomData; use core::mem; use core::pin::Pin; use core::ptr::NonNull; @@ -8,6 +8,8 @@ use core::sync::atomic::Ordering; use core::task::{Context, Poll}; use crate::header::Header; +use crate::raw::Panic; +use crate::runnable::ScheduleInfo; use crate::state::*; /// A spawned task. @@ -44,25 +46,25 @@ use crate::state::*; /// assert_eq!(future::block_on(task), 3); /// ``` #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] -pub struct Task<T> { +pub struct Task<T, M = ()> { /// A raw task pointer. pub(crate) ptr: NonNull<()>, - /// A marker capturing generic type `T`. - pub(crate) _marker: PhantomData<T>, + /// A marker capturing generic types `T` and `M`. + pub(crate) _marker: PhantomData<(T, M)>, } -unsafe impl<T: Send> Send for Task<T> {} -unsafe impl<T> Sync for Task<T> {} +unsafe impl<T: Send, M: Send + Sync> Send for Task<T, M> {} +unsafe impl<T, M: Send + Sync> Sync for Task<T, M> {} -impl<T> Unpin for Task<T> {} +impl<T, M> Unpin for Task<T, M> {} #[cfg(feature = "std")] -impl<T> std::panic::UnwindSafe for Task<T> {} +impl<T, M> std::panic::UnwindSafe for Task<T, M> {} #[cfg(feature = "std")] -impl<T> std::panic::RefUnwindSafe for Task<T> {} +impl<T, M> std::panic::RefUnwindSafe for Task<T, M> {} -impl<T> Task<T> { +impl<T, M> Task<T, M> { /// Detaches the task to let it keep running in the background. /// /// # Examples @@ -173,14 +175,14 @@ impl<T> Task<T> { /// // Wait for the task's output. /// assert_eq!(future::block_on(task.fallible()), None); /// ``` - pub fn fallible(self) -> FallibleTask<T> { + pub fn fallible(self) -> FallibleTask<T, M> { FallibleTask { task: self } } /// Puts the task in canceled state. fn set_canceled(&mut self) { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header<M>; unsafe { let mut state = (*header).state.load(Ordering::Acquire); @@ -209,7 +211,7 @@ impl<T> Task<T> { // If the task is not scheduled nor running, schedule it one more time so // that its future gets dropped by the executor. if state & (SCHEDULED | RUNNING) == 0 { - ((*header).vtable.schedule)(ptr); + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); } // Notify the awaiter that the task has been closed. @@ -226,9 +228,9 @@ impl<T> Task<T> { } /// Puts the task in detached state. - fn set_detached(&mut self) -> Option<T> { + fn set_detached(&mut self) -> Option<Result<T, Panic>> { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header<M>; unsafe { // A place where the output will be stored in case it needs to be dropped. @@ -256,8 +258,10 @@ impl<T> Task<T> { ) { Ok(_) => { // Read the output. - output = - Some((((*header).vtable.get_output)(ptr) as *mut T).read()); + output = Some( + (((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>) + .read(), + ); // Update the state variable because we're continuing the loop. state |= CLOSED; @@ -286,7 +290,7 @@ impl<T> Task<T> { // schedule dropping its future or destroy it. if state & !(REFERENCE - 1) == 0 { if state & CLOSED == 0 { - ((*header).vtable.schedule)(ptr); + ((*header).vtable.schedule)(ptr, ScheduleInfo::new(false)); } else { ((*header).vtable.destroy)(ptr); } @@ -316,7 +320,7 @@ impl<T> Task<T> { /// 4. It is completed and the `Task` gets dropped. fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header<M>; unsafe { let mut state = (*header).state.load(Ordering::Acquire); @@ -382,8 +386,22 @@ impl<T> Task<T> { } // Take the output from the task. - let output = ((*header).vtable.get_output)(ptr) as *mut T; - return Poll::Ready(Some(output.read())); + let output = ((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>; + let output = output.read(); + + // Propagate the panic if the task panicked. + let output = match output { + Ok(output) => output, + Err(panic) => { + #[cfg(feature = "std")] + std::panic::resume_unwind(panic); + + #[cfg(not(feature = "std"))] + match panic {} + } + }; + + return Poll::Ready(Some(output)); } Err(s) => state = s, } @@ -391,9 +409,9 @@ impl<T> Task<T> { } } - fn header(&self) -> &Header { + fn header(&self) -> &Header<M> { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header<M>; unsafe { &*header } } @@ -402,23 +420,31 @@ impl<T> Task<T> { /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. pub fn is_finished(&self) -> bool { let ptr = self.ptr.as_ptr(); - let header = ptr as *const Header; + let header = ptr as *const Header<M>; unsafe { let state = (*header).state.load(Ordering::Acquire); state & (CLOSED | COMPLETED) != 0 } } + + /// Get the metadata associated with this task. + /// + /// Tasks can be created with a metadata object associated with them; by default, this + /// is a `()` value. See the [`Builder::metadata()`] method for more information. + pub fn metadata(&self) -> &M { + &self.header().metadata + } } -impl<T> Drop for Task<T> { +impl<T, M> Drop for Task<T, M> { fn drop(&mut self) { self.set_canceled(); self.set_detached(); } } -impl<T> Future for Task<T> { +impl<T, M> Future for Task<T, M> { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { @@ -429,7 +455,7 @@ impl<T> Future for Task<T> { } } -impl<T> fmt::Debug for Task<T> { +impl<T, M: fmt::Debug> fmt::Debug for Task<T, M> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Task") .field("header", self.header()) @@ -446,11 +472,11 @@ impl<T> fmt::Debug for Task<T> { /// This can be useful to avoid the panic produced when polling the `Task` /// future if the executor dropped its `Runnable`. #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] -pub struct FallibleTask<T> { - task: Task<T>, +pub struct FallibleTask<T, M = ()> { + task: Task<T, M>, } -impl<T> FallibleTask<T> { +impl<T, M> FallibleTask<T, M> { /// Detaches the task to let it keep running in the background. /// /// # Examples @@ -513,9 +539,16 @@ impl<T> FallibleTask<T> { pub async fn cancel(self) -> Option<T> { self.task.cancel().await } + + /// Returns `true` if the current task is finished. + /// + /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. + pub fn is_finished(&self) -> bool { + self.task.is_finished() + } } -impl<T> Future for FallibleTask<T> { +impl<T, M> Future for FallibleTask<T, M> { type Output = Option<T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { @@ -523,7 +556,7 @@ impl<T> Future for FallibleTask<T> { } } -impl<T> fmt::Debug for FallibleTask<T> { +impl<T, M: fmt::Debug> fmt::Debug for FallibleTask<T, M> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FallibleTask") .field("header", self.task.header()) |