Rollup merge of #91721 - danielhenrymantilla:patch-1, r=joshtriplett
Minor improvements to `future::join!`'s implementation This is a follow-up from #91645, regarding [some remarks I made](https://rust-lang.zulipchat.com/#narrow/stream/187312-wg-async-foundations/topic/join!/near/264293660). Mainly: - it hides the recursive munching through a private `macro`, to avoid leaking such details (a corollary is getting rid of the need to use ``@`` to disambiguate); - it uses a `match` binding, _outside_ the `async move` block, to better match the semantics from function-like syntax; - it pre-pins the future before calling into `poll_fn`, since `poll_fn`, alone, cannot guarantee that its capture does not move (to clarify: I believe the previous code was sound, thanks to the outer layer of `async`. But I find it clearer / more robust to refactorings this way 🙂). - it uses `@ibraheemdev's` very neat `.ready()?`; - it renames `Took` to `Taken` for consistency with `Done` (tiny nit 😄). ~~TODO~~Done: - [x] Add unit tests to enforce the function-like `:value` semantics are respected. r? `@nrc`
This commit is contained in:
commit
ed81098fcc
@ -1,4 +1,4 @@
|
||||
#![allow(unused_imports)] // items are used by the macro
|
||||
#![allow(unused_imports, unused_macros)] // items are used by the macro
|
||||
|
||||
use crate::cell::UnsafeCell;
|
||||
use crate::future::{poll_fn, Future};
|
||||
@ -45,59 +45,104 @@
|
||||
/// # };
|
||||
/// ```
|
||||
#[unstable(feature = "future_join", issue = "91642")]
|
||||
pub macro join {
|
||||
( $($fut:expr),* $(,)?) => {
|
||||
join! { @count: (), @futures: {}, @rest: ($($fut,)*) }
|
||||
},
|
||||
// Recurse until we have the position of each future in the tuple
|
||||
(
|
||||
// A token for each future that has been expanded: "_ _ _"
|
||||
@count: ($($count:tt)*),
|
||||
// Futures and their positions in the tuple: "{ a => (_), b => (_ _)) }"
|
||||
@futures: { $($fut:tt)* },
|
||||
// Take a future from @rest to expand
|
||||
@rest: ($current:expr, $($rest:tt)*)
|
||||
) => {
|
||||
join! {
|
||||
@count: ($($count)* _),
|
||||
@futures: { $($fut)* $current => ($($count)*), },
|
||||
@rest: ($($rest)*)
|
||||
}
|
||||
},
|
||||
// Now generate the output future
|
||||
(
|
||||
@count: ($($count:tt)*),
|
||||
@futures: {
|
||||
$( $(@$f:tt)? $fut:expr => ( $($pos:tt)* ), )*
|
||||
},
|
||||
@rest: ()
|
||||
) => {
|
||||
async move {
|
||||
let mut futures = ( $( MaybeDone::Future($fut), )* );
|
||||
pub macro join( $($fut:expr),+ $(,)? ) {
|
||||
// Funnel through an internal macro not to leak implementation details.
|
||||
join_internal! {
|
||||
current_position: []
|
||||
futures_and_positions: []
|
||||
munching: [ $($fut)+ ]
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME(danielhenrymantilla): a private macro should need no stability guarantee.
|
||||
#[unstable(feature = "future_join", issue = "91642")]
|
||||
/// To be able to *name* the i-th future in the tuple (say we want the .4-th),
|
||||
/// the following trick will be used: `let (_, _, _, _, it, ..) = tuple;`
|
||||
/// In order to do that, we need to generate a `i`-long repetition of `_`,
|
||||
/// for each i-th fut. Hence the recursive muncher approach.
|
||||
macro join_internal {
|
||||
// Recursion step: map each future with its "position" (underscore count).
|
||||
(
|
||||
// Accumulate a token for each future that has been expanded: "_ _ _".
|
||||
current_position: [
|
||||
$($underscores:tt)*
|
||||
]
|
||||
// Accumulate Futures and their positions in the tuple: `_0th () _1st ( _ ) …`.
|
||||
futures_and_positions: [
|
||||
$($acc:tt)*
|
||||
]
|
||||
// Munch one future.
|
||||
munching: [
|
||||
$current:tt
|
||||
$($rest:tt)*
|
||||
]
|
||||
) => (
|
||||
join_internal! {
|
||||
current_position: [
|
||||
$($underscores)*
|
||||
_
|
||||
]
|
||||
futures_and_positions: [
|
||||
$($acc)*
|
||||
$current ( $($underscores)* )
|
||||
]
|
||||
munching: [
|
||||
$($rest)*
|
||||
]
|
||||
}
|
||||
),
|
||||
|
||||
// End of recursion: generate the output future.
|
||||
(
|
||||
current_position: $_:tt
|
||||
futures_and_positions: [
|
||||
$(
|
||||
$fut_expr:tt ( $($pos:tt)* )
|
||||
)*
|
||||
]
|
||||
// Nothing left to munch.
|
||||
munching: []
|
||||
) => (
|
||||
match ( $( MaybeDone::Future($fut_expr), )* ) { futures => async {
|
||||
let mut futures = futures;
|
||||
// SAFETY: this is `pin_mut!`.
|
||||
let mut futures = unsafe { Pin::new_unchecked(&mut futures) };
|
||||
poll_fn(move |cx| {
|
||||
let mut done = true;
|
||||
|
||||
// For each `fut`, pin-project to it, and poll it.
|
||||
$(
|
||||
let ( $($pos,)* fut, .. ) = &mut futures;
|
||||
|
||||
// SAFETY: The futures are never moved
|
||||
done &= unsafe { Pin::new_unchecked(fut).poll(cx).is_ready() };
|
||||
// SAFETY: pinning projection
|
||||
let fut = unsafe {
|
||||
futures.as_mut().map_unchecked_mut(|it| {
|
||||
let ( $($pos,)* fut, .. ) = it;
|
||||
fut
|
||||
})
|
||||
};
|
||||
// Despite how tempting it may be to `let () = fut.poll(cx).ready()?;`
|
||||
// doing so would defeat the point of `join!`: to start polling eagerly all
|
||||
// of the futures, to allow parallelizing the waits.
|
||||
done &= fut.poll(cx).is_ready();
|
||||
)*
|
||||
|
||||
if done {
|
||||
// Extract all the outputs
|
||||
Poll::Ready(($({
|
||||
let ( $($pos,)* fut, .. ) = &mut futures;
|
||||
|
||||
fut.take_output().unwrap()
|
||||
}),*))
|
||||
} else {
|
||||
Poll::Pending
|
||||
if !done {
|
||||
return Poll::Pending;
|
||||
}
|
||||
// All ready; time to extract all the outputs.
|
||||
|
||||
// SAFETY: `.take_output()` does not break the `Pin` invariants for that `fut`.
|
||||
let futures = unsafe {
|
||||
futures.as_mut().get_unchecked_mut()
|
||||
};
|
||||
Poll::Ready(
|
||||
($(
|
||||
{
|
||||
let ( $($pos,)* fut, .. ) = &mut *futures;
|
||||
fut.take_output().unwrap()
|
||||
}
|
||||
),*) // <- no trailing comma since we don't want 1-tuples.
|
||||
)
|
||||
}).await
|
||||
}
|
||||
}
|
||||
}}
|
||||
),
|
||||
}
|
||||
|
||||
/// Future used by `join!` that stores it's output to
|
||||
@ -109,14 +154,14 @@
|
||||
pub enum MaybeDone<F: Future> {
|
||||
Future(F),
|
||||
Done(F::Output),
|
||||
Took,
|
||||
Taken,
|
||||
}
|
||||
|
||||
#[unstable(feature = "future_join", issue = "91642")]
|
||||
impl<F: Future> MaybeDone<F> {
|
||||
pub fn take_output(&mut self) -> Option<F::Output> {
|
||||
match &*self {
|
||||
MaybeDone::Done(_) => match mem::replace(self, Self::Took) {
|
||||
match *self {
|
||||
MaybeDone::Done(_) => match mem::replace(self, Self::Taken) {
|
||||
MaybeDone::Done(val) => Some(val),
|
||||
_ => unreachable!(),
|
||||
},
|
||||
@ -132,13 +177,14 @@ impl<F: Future> Future for MaybeDone<F> {
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
// SAFETY: pinning in structural for `f`
|
||||
unsafe {
|
||||
match self.as_mut().get_unchecked_mut() {
|
||||
MaybeDone::Future(f) => match Pin::new_unchecked(f).poll(cx) {
|
||||
Poll::Ready(val) => self.set(Self::Done(val)),
|
||||
Poll::Pending => return Poll::Pending,
|
||||
},
|
||||
// Do not mix match ergonomics with unsafe.
|
||||
match *self.as_mut().get_unchecked_mut() {
|
||||
MaybeDone::Future(ref mut f) => {
|
||||
let val = Pin::new_unchecked(f).poll(cx).ready()?;
|
||||
self.set(Self::Done(val));
|
||||
}
|
||||
MaybeDone::Done(_) => {}
|
||||
MaybeDone::Took => unreachable!(),
|
||||
MaybeDone::Taken => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -64,6 +64,40 @@ fn test_join() {
|
||||
});
|
||||
}
|
||||
|
||||
/// Tests that `join!(…)` behaves "like a function": evaluating its arguments
|
||||
/// before applying any of its own logic.
|
||||
///
|
||||
/// _e.g._, `join!(async_fn(&borrowed), …)` does not consume `borrowed`;
|
||||
/// and `join!(opt_fut?, …)` does let that `?` refer to the callsite scope.
|
||||
mod test_join_function_like_value_arg_semantics {
|
||||
use super::*;
|
||||
|
||||
async fn async_fn(_: impl Sized) {}
|
||||
|
||||
// no need to _run_ this test, just to compile it.
|
||||
fn _join_does_not_unnecessarily_move_mentioned_bindings() {
|
||||
let not_copy = vec![()];
|
||||
let _ = join!(async_fn(¬_copy)); // should not move `not_copy`
|
||||
let _ = ¬_copy; // OK
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn join_lets_control_flow_effects_such_as_try_flow_through() {
|
||||
let maybe_fut = None;
|
||||
if false {
|
||||
*&mut { maybe_fut } = Some(async {});
|
||||
loop {}
|
||||
}
|
||||
assert!(Option::is_none(&try { join!(maybe_fut?, async { unreachable!() }) }));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn join_is_able_to_handle_temporaries() {
|
||||
let _ = join!(async_fn(&String::from("temporary")));
|
||||
let () = block_on(join!(async_fn(&String::from("temporary"))));
|
||||
}
|
||||
}
|
||||
|
||||
fn block_on(fut: impl Future) {
|
||||
struct Waker;
|
||||
impl Wake for Waker {
|
||||
|
@ -49,6 +49,7 @@
|
||||
#![feature(str_internals)]
|
||||
#![feature(test)]
|
||||
#![feature(trusted_len)]
|
||||
#![feature(try_blocks)]
|
||||
#![feature(try_trait_v2)]
|
||||
#![feature(slice_internals)]
|
||||
#![feature(slice_partition_dedup)]
|
||||
|
Loading…
Reference in New Issue
Block a user