diff options
author | Jeff Vander Stoep <jeffv@google.com> | 2023-02-16 08:11:46 +0000 |
---|---|---|
committer | Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com> | 2023-02-16 08:11:46 +0000 |
commit | ba8e3c645e64f514e55e72fadec1c9d0ca77d099 (patch) | |
tree | 891d5f9cf99250a6597cbba176ae3a55c5ca5b1c | |
parent | 2803a603612e7ea859eb320fb0f643197183cf0e (diff) | |
parent | 65c1ccd9611a8cb9f823e84b55a3e097625c90aa (diff) | |
download | futures-util-ba8e3c645e64f514e55e72fadec1c9d0ca77d099.tar.gz |
Upgrade futures-util to 0.3.26 am: 65c1ccd961
Original change: https://android-review.googlesource.com/c/platform/external/rust/crates/futures-util/+/2438159
Change-Id: I9f892cc729ba21bc7866a4dec4497cf285dfaf07
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
-rw-r--r-- | .cargo_vcs_info.json | 2 | ||||
-rw-r--r-- | Android.bp | 4 | ||||
-rw-r--r-- | Cargo.toml | 15 | ||||
-rw-r--r-- | Cargo.toml.orig | 15 | ||||
-rw-r--r-- | METADATA | 10 | ||||
-rw-r--r-- | src/future/either.rs | 58 | ||||
-rw-r--r-- | src/future/future/shared.rs | 42 | ||||
-rw-r--r-- | src/future/select_all.rs | 1 | ||||
-rw-r--r-- | src/future/try_join_all.rs | 14 | ||||
-rw-r--r-- | src/sink/unfold.rs | 5 | ||||
-rw-r--r-- | src/stream/futures_ordered.rs | 18 | ||||
-rw-r--r-- | src/stream/futures_unordered/mod.rs | 3 | ||||
-rw-r--r-- | src/stream/stream/buffered.rs | 12 | ||||
-rw-r--r-- | src/stream/stream/chain.rs | 3 | ||||
-rw-r--r-- | src/stream/stream/mod.rs | 3 | ||||
-rw-r--r-- | src/stream/stream/ready_chunks.rs | 48 |
16 files changed, 161 insertions, 92 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json index bd04ae0..dde4c7f 100644 --- a/.cargo_vcs_info.json +++ b/.cargo_vcs_info.json @@ -1,6 +1,6 @@ { "git": { - "sha1": "77d82198c5afd04af3e760a6aa50b7e875289fc3" + "sha1": "5e3693a350f96244151081d2c030208cd15f9572" }, "path_in_vcs": "futures-util" }
\ No newline at end of file @@ -42,7 +42,7 @@ rust_test { host_supported: true, crate_name: "futures_util", cargo_env_compat: true, - cargo_pkg_version: "0.3.25", + cargo_pkg_version: "0.3.26", srcs: ["src/lib.rs"], test_suites: ["general-tests"], auto_gen_config: true, @@ -86,7 +86,7 @@ rust_library { host_supported: true, crate_name: "futures_util", cargo_env_compat: true, - cargo_pkg_version: "0.3.25", + cargo_pkg_version: "0.3.26", srcs: ["src/lib.rs"], edition: "2018", features: [ @@ -13,7 +13,7 @@ edition = "2018" rust-version = "1.45" name = "futures-util" -version = "0.3.25" +version = "0.3.26" description = """ Common utilities and extension traits for the futures-rs library. """ @@ -30,33 +30,33 @@ rustdoc-args = [ ] [dependencies.futures-channel] -version = "0.3.25" +version = "0.3.26" features = ["std"] optional = true default-features = false [dependencies.futures-core] -version = "0.3.25" +version = "0.3.26" default-features = false [dependencies.futures-io] -version = "0.3.25" +version = "0.3.26" features = ["std"] optional = true default-features = false [dependencies.futures-macro] -version = "=0.3.25" +version = "=0.3.26" optional = true default-features = false [dependencies.futures-sink] -version = "0.3.25" +version = "0.3.26" optional = true default-features = false [dependencies.futures-task] -version = "0.3.25" +version = "0.3.26" default-features = false [dependencies.futures_01] @@ -120,6 +120,7 @@ io-compat = [ "compat", "tokio-io", ] +portable-atomic = ["futures-core/portable-atomic"] sink = ["futures-sink"] std = [ "alloc", diff --git a/Cargo.toml.orig b/Cargo.toml.orig index aeecf0f..95c3dee 100644 --- a/Cargo.toml.orig +++ b/Cargo.toml.orig @@ -1,6 +1,6 @@ [package] name = "futures-util" -version = "0.3.25" +version = "0.3.26" edition = "2018" rust-version = "1.45" license = "MIT OR Apache-2.0" @@ -21,6 +21,7 @@ io-compat = ["io", "compat", "tokio-io"] sink = ["futures-sink"] io = ["std", "futures-io", "memchr"] channel = ["std", "futures-channel"] +portable-atomic = ["futures-core/portable-atomic"] # Unstable features # These features are outside of the normal semver guarantees and require the @@ -34,12 +35,12 @@ write-all-vectored = ["io"] cfg-target-has-atomic = [] [dependencies] -futures-core = { path = "../futures-core", version = "0.3.25", default-features = false } -futures-task = { path = "../futures-task", version = "0.3.25", default-features = false } -futures-channel = { path = "../futures-channel", version = "0.3.25", default-features = false, features = ["std"], optional = true } -futures-io = { path = "../futures-io", version = "0.3.25", default-features = false, features = ["std"], optional = true } -futures-sink = { path = "../futures-sink", version = "0.3.25", default-features = false, optional = true } -futures-macro = { path = "../futures-macro", version = "=0.3.25", default-features = false, optional = true } +futures-core = { path = "../futures-core", version = "0.3.26", default-features = false } +futures-task = { path = "../futures-task", version = "0.3.26", default-features = false } +futures-channel = { path = "../futures-channel", version = "0.3.26", default-features = false, features = ["std"], optional = true } +futures-io = { path = "../futures-io", version = "0.3.26", default-features = false, features = ["std"], optional = true } +futures-sink = { path = "../futures-sink", version = "0.3.26", default-features = false, optional = true } +futures-macro = { path = "../futures-macro", version = "=0.3.26", default-features = false, optional = true } slab = { version = "0.4.2", optional = true } memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } @@ -11,13 +11,13 @@ third_party { } url { type: ARCHIVE - value: "https://static.crates.io/crates/futures-util/futures-util-0.3.25.crate" + value: "https://static.crates.io/crates/futures-util/futures-util-0.3.26.crate" } - version: "0.3.25" + version: "0.3.26" license_type: NOTICE last_upgrade_date { - year: 2022 - month: 12 - day: 12 + year: 2023 + month: 2 + day: 15 } } diff --git a/src/future/either.rs b/src/future/either.rs index 9602de7..27e5064 100644 --- a/src/future/either.rs +++ b/src/future/either.rs @@ -33,11 +33,31 @@ pub enum Either<A, B> { } impl<A, B> Either<A, B> { - fn project(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> { + /// Convert `Pin<&Either<A, B>>` to `Either<Pin<&A>, Pin<&B>>`, + /// pinned projections of the inner variants. + pub fn as_pin_ref(self: Pin<&Self>) -> Either<Pin<&A>, Pin<&B>> { + // SAFETY: We can use `new_unchecked` because the `inner` parts are + // guaranteed to be pinned, as they come from `self` which is pinned. unsafe { - match self.get_unchecked_mut() { - Either::Left(a) => Either::Left(Pin::new_unchecked(a)), - Either::Right(b) => Either::Right(Pin::new_unchecked(b)), + match *Pin::get_ref(self) { + Either::Left(ref inner) => Either::Left(Pin::new_unchecked(inner)), + Either::Right(ref inner) => Either::Right(Pin::new_unchecked(inner)), + } + } + } + + /// Convert `Pin<&mut Either<A, B>>` to `Either<Pin<&mut A>, Pin<&mut B>>`, + /// pinned projections of the inner variants. + pub fn as_pin_mut(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> { + // SAFETY: `get_unchecked_mut` is fine because we don't move anything. + // We can use `new_unchecked` because the `inner` parts are guaranteed + // to be pinned, as they come from `self` which is pinned, and we never + // offer an unpinned `&mut A` or `&mut B` through `Pin<&mut Self>`. We + // also don't have an implementation of `Drop`, nor manual `Unpin`. + unsafe { + match *Pin::get_unchecked_mut(self) { + Either::Left(ref mut inner) => Either::Left(Pin::new_unchecked(inner)), + Either::Right(ref mut inner) => Either::Right(Pin::new_unchecked(inner)), } } } @@ -85,7 +105,7 @@ where type Output = A::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll(cx), Either::Right(x) => x.poll(cx), } @@ -113,7 +133,7 @@ where type Item = A::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_next(cx), Either::Right(x) => x.poll_next(cx), } @@ -149,28 +169,28 @@ where type Error = A::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_ready(cx), Either::Right(x) => x.poll_ready(cx), } } fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.start_send(item), Either::Right(x) => x.start_send(item), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_flush(cx), Either::Right(x) => x.poll_flush(cx), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_close(cx), Either::Right(x) => x.poll_close(cx), } @@ -198,7 +218,7 @@ mod if_std { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_read(cx, buf), Either::Right(x) => x.poll_read(cx, buf), } @@ -209,7 +229,7 @@ mod if_std { cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_read_vectored(cx, bufs), Either::Right(x) => x.poll_read_vectored(cx, bufs), } @@ -226,7 +246,7 @@ mod if_std { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_write(cx, buf), Either::Right(x) => x.poll_write(cx, buf), } @@ -237,21 +257,21 @@ mod if_std { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_write_vectored(cx, bufs), Either::Right(x) => x.poll_write_vectored(cx, bufs), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_flush(cx), Either::Right(x) => x.poll_flush(cx), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_close(cx), Either::Right(x) => x.poll_close(cx), } @@ -268,7 +288,7 @@ mod if_std { cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<Result<u64>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_seek(cx, pos), Either::Right(x) => x.poll_seek(cx, pos), } @@ -281,14 +301,14 @@ mod if_std { B: AsyncBufRead, { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_fill_buf(cx), Either::Right(x) => x.poll_fill_buf(cx), } } fn consume(self: Pin<&mut Self>, amt: usize) { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.consume(amt), Either::Right(x) => x.consume(amt), } diff --git a/src/future/future/shared.rs b/src/future/future/shared.rs index 9859315..ecd1b42 100644 --- a/src/future/future/shared.rs +++ b/src/future/future/shared.rs @@ -4,7 +4,9 @@ use futures_core::task::{Context, Poll, Waker}; use slab::Slab; use std::cell::UnsafeCell; use std::fmt; +use std::hash::Hasher; use std::pin::Pin; +use std::ptr; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Acquire, SeqCst}; use std::sync::{Arc, Mutex, Weak}; @@ -103,7 +105,6 @@ impl<Fut: Future> Shared<Fut> { impl<Fut> Shared<Fut> where Fut: Future, - Fut::Output: Clone, { /// Returns [`Some`] containing a reference to this [`Shared`]'s output if /// it has already been computed by a clone or [`None`] if it hasn't been @@ -139,6 +140,7 @@ where /// This method by itself is safe, but using it correctly requires extra care. Another thread /// can change the strong count at any time, including potentially between calling this method /// and acting on the result. + #[allow(clippy::unnecessary_safety_doc)] pub fn strong_count(&self) -> Option<usize> { self.inner.as_ref().map(|arc| Arc::strong_count(arc)) } @@ -152,15 +154,44 @@ where /// This method by itself is safe, but using it correctly requires extra care. Another thread /// can change the weak count at any time, including potentially between calling this method /// and acting on the result. + #[allow(clippy::unnecessary_safety_doc)] pub fn weak_count(&self) -> Option<usize> { self.inner.as_ref().map(|arc| Arc::weak_count(arc)) } + + /// Hashes the internal state of this `Shared` in a way that's compatible with `ptr_eq`. + pub fn ptr_hash<H: Hasher>(&self, state: &mut H) { + match self.inner.as_ref() { + Some(arc) => { + state.write_u8(1); + ptr::hash(Arc::as_ptr(arc), state); + } + None => { + state.write_u8(0); + } + } + } + + /// Returns `true` if the two `Shared`s point to the same future (in a vein similar to + /// `Arc::ptr_eq`). + /// + /// Returns `false` if either `Shared` has terminated. + pub fn ptr_eq(&self, rhs: &Self) -> bool { + let lhs = match self.inner.as_ref() { + Some(lhs) => lhs, + None => return false, + }; + let rhs = match rhs.inner.as_ref() { + Some(rhs) => rhs, + None => return false, + }; + Arc::ptr_eq(lhs, rhs) + } } impl<Fut> Inner<Fut> where Fut: Future, - Fut::Output: Clone, { /// Safety: callers must first ensure that `self.inner.state` /// is `COMPLETE` @@ -170,6 +201,13 @@ where FutureOrOutput::Future(_) => unreachable!(), } } +} + +impl<Fut> Inner<Fut> +where + Fut: Future, + Fut::Output: Clone, +{ /// Registers the current task to receive a wakeup when we are awoken. fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) { let mut wakers_guard = self.notifier.wakers.lock().unwrap(); diff --git a/src/future/select_all.rs b/src/future/select_all.rs index 07d65ca..0a51d0d 100644 --- a/src/future/select_all.rs +++ b/src/future/select_all.rs @@ -58,6 +58,7 @@ impl<Fut: Future + Unpin> Future for SelectAll<Fut> { }); match item { Some((idx, res)) => { + #[allow(clippy::let_underscore_future)] let _ = self.inner.swap_remove(idx); let rest = mem::take(&mut self.inner); Poll::Ready((res, idx, rest)) diff --git a/src/future/try_join_all.rs b/src/future/try_join_all.rs index 25fcfcb..506f450 100644 --- a/src/future/try_join_all.rs +++ b/src/future/try_join_all.rs @@ -77,6 +77,20 @@ where /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. /// +/// # See Also +/// +/// `try_join_all` will switch to the more powerful [`FuturesOrdered`] for performance +/// reasons if the number of futures is large. You may want to look into using it or +/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. +/// +/// Some examples for additional functionality provided by these are: +/// +/// * Adding new futures to the set even after it has been started. +/// +/// * Only polling the specific futures that have been woken. In cases where +/// you have a lot of futures this will result in much more efficient polling. +/// +/// /// # Examples /// /// ``` diff --git a/src/sink/unfold.rs b/src/sink/unfold.rs index 330a068..dea1307 100644 --- a/src/sink/unfold.rs +++ b/src/sink/unfold.rs @@ -73,7 +73,10 @@ where this.state.set(UnfoldState::Value { value: state }); Ok(()) } - Err(err) => Err(err), + Err(err) => { + this.state.set(UnfoldState::Empty); + Err(err) + } } } else { Ok(()) diff --git a/src/stream/futures_ordered.rs b/src/stream/futures_ordered.rs index f1c93fd..618bf1b 100644 --- a/src/stream/futures_ordered.rs +++ b/src/stream/futures_ordered.rs @@ -19,7 +19,7 @@ pin_project! { struct OrderWrapper<T> { #[pin] data: T, // A future or a future's output - index: usize, + index: isize, } } @@ -58,7 +58,7 @@ where /// An unbounded queue of futures. /// -/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order +/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order /// on top of the set of futures. While futures in the set will race to /// completion in parallel, results will only be returned in the order their /// originating futures were added to the queue. @@ -95,8 +95,8 @@ where pub struct FuturesOrdered<T: Future> { in_progress_queue: FuturesUnordered<OrderWrapper<T>>, queued_outputs: BinaryHeap<OrderWrapper<T::Output>>, - next_incoming_index: usize, - next_outgoing_index: usize, + next_incoming_index: isize, + next_outgoing_index: isize, } impl<T: Future> Unpin for FuturesOrdered<T> {} @@ -160,13 +160,9 @@ impl<Fut: Future> FuturesOrdered<Fut> { /// task notifications. This future will be the next future to be returned /// complete. pub fn push_front(&mut self, future: Fut) { - if self.next_outgoing_index == 0 { - self.push_back(future) - } else { - let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; - self.next_outgoing_index -= 1; - self.in_progress_queue.push(wrapped); - } + let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; + self.next_outgoing_index -= 1; + self.in_progress_queue.push(wrapped); } } diff --git a/src/stream/futures_unordered/mod.rs b/src/stream/futures_unordered/mod.rs index 5e995fd..6b5804d 100644 --- a/src/stream/futures_unordered/mod.rs +++ b/src/stream/futures_unordered/mod.rs @@ -33,6 +33,9 @@ use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue}; /// A set of futures which may complete in any order. /// +/// See [`FuturesOrdered`](crate::stream::FuturesOrdered) for a version of this +/// type that preserves a FIFO order. +/// /// This structure is optimized to manage a large number of futures. /// Futures managed by [`FuturesUnordered`] will only be polled when they /// generate wake-up notifications. This reduces the required amount of work diff --git a/src/stream/stream/buffered.rs b/src/stream/stream/buffered.rs index 8ca0391..5854eb7 100644 --- a/src/stream/stream/buffered.rs +++ b/src/stream/stream/buffered.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, FuturesOrdered, StreamExt}; +use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt}; use core::fmt; use core::pin::Pin; use futures_core::future::Future; @@ -95,6 +95,16 @@ where } } +impl<St> FusedStream for Buffered<St> +where + St: Stream, + St::Item: Future, +{ + fn is_terminated(&self) -> bool { + self.stream.is_done() && self.in_progress_queue.is_terminated() + } +} + // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Item> Sink<Item> for Buffered<S> diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index c5da35e..36ff1e5 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -50,8 +50,9 @@ where if let Some(item) = ready!(first.poll_next(cx)) { return Poll::Ready(Some(item)); } + + this.first.set(None); } - this.first.set(None); this.second.poll_next(cx) } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index a823fab..bb5e249 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1513,8 +1513,7 @@ pub trait StreamExt: Stream { /// be immediately returned. /// /// If the underlying stream ended and only a partial vector was created, - /// it'll be returned. Additionally if an error happens from the underlying - /// stream then the currently buffered items will be yielded. + /// it will be returned. /// /// This method is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. diff --git a/src/stream/stream/ready_chunks.rs b/src/stream/stream/ready_chunks.rs index 49116d4..192054c 100644 --- a/src/stream/stream/ready_chunks.rs +++ b/src/stream/stream/ready_chunks.rs @@ -1,6 +1,5 @@ -use crate::stream::Fuse; +use crate::stream::{Fuse, StreamExt}; use alloc::vec::Vec; -use core::mem; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -15,7 +14,6 @@ pin_project! { pub struct ReadyChunks<St: Stream> { #[pin] stream: Fuse<St>, - items: Vec<St::Item>, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 } } @@ -24,11 +22,7 @@ impl<St: Stream> ReadyChunks<St> { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); - Self { - stream: super::Fuse::new(stream), - items: Vec::with_capacity(capacity), - cap: capacity, - } + Self { stream: stream.fuse(), cap: capacity } } delegate_access_inner!(stream, St, (.)); @@ -40,40 +34,33 @@ impl<St: Stream> Stream for ReadyChunks<St> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); + let mut items: Vec<St::Item> = Vec::new(); + loop { match this.stream.as_mut().poll_next(cx) { // Flush all collected data if underlying stream doesn't contain // more ready values Poll::Pending => { - return if this.items.is_empty() { - Poll::Pending - } else { - Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap)))) - } + return if items.is_empty() { Poll::Pending } else { Poll::Ready(Some(items)) } } // Push the ready item into the buffer and check whether it is full. // If so, replace our buffer with a new and empty one and return // the full one. Poll::Ready(Some(item)) => { - this.items.push(item); - if this.items.len() >= *this.cap { - return Poll::Ready(Some(mem::replace( - this.items, - Vec::with_capacity(*this.cap), - ))); + if items.is_empty() { + items.reserve(*this.cap); + } + items.push(item); + if items.len() >= *this.cap { + return Poll::Ready(Some(items)); } } // Since the underlying stream ran out of values, return what we // have buffered, if we have anything. Poll::Ready(None) => { - let last = if this.items.is_empty() { - None - } else { - let full_buf = mem::take(this.items); - Some(full_buf) - }; + let last = if items.is_empty() { None } else { Some(items) }; return Poll::Ready(last); } @@ -82,20 +69,15 @@ impl<St: Stream> Stream for ReadyChunks<St> { } fn size_hint(&self) -> (usize, Option<usize>) { - let chunk_len = usize::from(!self.items.is_empty()); let (lower, upper) = self.stream.size_hint(); - let lower = (lower / self.cap).saturating_add(chunk_len); - let upper = match upper { - Some(x) => x.checked_add(chunk_len), - None => None, - }; + let lower = lower / self.cap; (lower, upper) } } -impl<St: FusedStream> FusedStream for ReadyChunks<St> { +impl<St: Stream> FusedStream for ReadyChunks<St> { fn is_terminated(&self) -> bool { - self.stream.is_terminated() && self.items.is_empty() + self.stream.is_terminated() } } |