1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/void_to_monostate.hpp>
14  
#include <boost/capy/detail/void_to_monostate.hpp>
15  
#include <boost/capy/concept/executor.hpp>
15  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/concept/io_awaitable.hpp>
16  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <coroutine>
17  
#include <coroutine>
18  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/ex/frame_allocator.hpp>
19  
#include <boost/capy/ex/frame_allocator.hpp>
20  
#include <boost/capy/task.hpp>
20  
#include <boost/capy/task.hpp>
21  

21  

22  
#include <array>
22  
#include <array>
23  
#include <atomic>
23  
#include <atomic>
24  
#include <exception>
24  
#include <exception>
25  
#include <optional>
25  
#include <optional>
26  
#include <stop_token>
26  
#include <stop_token>
27  
#include <tuple>
27  
#include <tuple>
28  
#include <type_traits>
28  
#include <type_traits>
29  
#include <utility>
29  
#include <utility>
30  

30  

31  
namespace boost {
31  
namespace boost {
32  
namespace capy {
32  
namespace capy {
33  

33  

34  
namespace detail {
34  
namespace detail {
35  

35  

36  
/** Holds the result of a single task within when_all.
36  
/** Holds the result of a single task within when_all.
37  
*/
37  
*/
38  
template<typename T>
38  
template<typename T>
39  
struct result_holder
39  
struct result_holder
40  
{
40  
{
41  
    std::optional<T> value_;
41  
    std::optional<T> value_;
42  

42  

43  
    void set(T v)
43  
    void set(T v)
44  
    {
44  
    {
45  
        value_ = std::move(v);
45  
        value_ = std::move(v);
46  
    }
46  
    }
47  

47  

48  
    T get() &&
48  
    T get() &&
49  
    {
49  
    {
50  
        return std::move(*value_);
50  
        return std::move(*value_);
51  
    }
51  
    }
52  
};
52  
};
53  

53  

54  
/** Specialization for void tasks - returns monostate to preserve index mapping.
54  
/** Specialization for void tasks - returns monostate to preserve index mapping.
55  
*/
55  
*/
56  
template<>
56  
template<>
57  
struct result_holder<void>
57  
struct result_holder<void>
58  
{
58  
{
59  
    std::monostate get() && { return {}; }
59  
    std::monostate get() && { return {}; }
60  
};
60  
};
61  

61  

62  
/** Shared state for when_all operation.
62  
/** Shared state for when_all operation.
63  

63  

64  
    @tparam Ts The result types of the tasks.
64  
    @tparam Ts The result types of the tasks.
65  
*/
65  
*/
66  
template<typename... Ts>
66  
template<typename... Ts>
67  
struct when_all_state
67  
struct when_all_state
68  
{
68  
{
69  
    static constexpr std::size_t task_count = sizeof...(Ts);
69  
    static constexpr std::size_t task_count = sizeof...(Ts);
70  

70  

71  
    // Completion tracking - when_all waits for all children
71  
    // Completion tracking - when_all waits for all children
72  
    std::atomic<std::size_t> remaining_count_;
72  
    std::atomic<std::size_t> remaining_count_;
73  

73  

74  
    // Result storage in input order
74  
    // Result storage in input order
75  
    std::tuple<result_holder<Ts>...> results_;
75  
    std::tuple<result_holder<Ts>...> results_;
76  

76  

77  
    // Runner handles - destroyed in await_resume while allocator is valid
77  
    // Runner handles - destroyed in await_resume while allocator is valid
78  
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
78  
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
79  

79  

80  
    // Exception storage - first error wins, others discarded
80  
    // Exception storage - first error wins, others discarded
81  
    std::atomic<bool> has_exception_{false};
81  
    std::atomic<bool> has_exception_{false};
82  
    std::exception_ptr first_exception_;
82  
    std::exception_ptr first_exception_;
83  

83  

84  
    // Stop propagation - on error, request stop for siblings
84  
    // Stop propagation - on error, request stop for siblings
85  
    std::stop_source stop_source_;
85  
    std::stop_source stop_source_;
86  

86  

87  
    // Connects parent's stop_token to our stop_source
87  
    // Connects parent's stop_token to our stop_source
88  
    struct stop_callback_fn
88  
    struct stop_callback_fn
89  
    {
89  
    {
90  
        std::stop_source* source_;
90  
        std::stop_source* source_;
91  
        void operator()() const { source_->request_stop(); }
91  
        void operator()() const { source_->request_stop(); }
92  
    };
92  
    };
93  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
93  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
94  
    std::optional<stop_callback_t> parent_stop_callback_;
94  
    std::optional<stop_callback_t> parent_stop_callback_;
95  

95  

96  
    // Parent resumption
96  
    // Parent resumption
97  
    std::coroutine_handle<> continuation_;
97  
    std::coroutine_handle<> continuation_;
98  
    io_env const* caller_env_ = nullptr;
98  
    io_env const* caller_env_ = nullptr;
99  

99  

100  
    when_all_state()
100  
    when_all_state()
101  
        : remaining_count_(task_count)
101  
        : remaining_count_(task_count)
102  
    {
102  
    {
103  
    }
103  
    }
104  

104  

105  
    // Runners self-destruct in final_suspend. No destruction needed here.
105  
    // Runners self-destruct in final_suspend. No destruction needed here.
106  

106  

107  
    /** Capture an exception (first one wins).
107  
    /** Capture an exception (first one wins).
108  
    */
108  
    */
109  
    void capture_exception(std::exception_ptr ep)
109  
    void capture_exception(std::exception_ptr ep)
110  
    {
110  
    {
111  
        bool expected = false;
111  
        bool expected = false;
112  
        if(has_exception_.compare_exchange_strong(
112  
        if(has_exception_.compare_exchange_strong(
113  
            expected, true, std::memory_order_relaxed))
113  
            expected, true, std::memory_order_relaxed))
114  
            first_exception_ = ep;
114  
            first_exception_ = ep;
115  
    }
115  
    }
116  

116  

117  
};
117  
};
118  

118  

119  
/** Wrapper coroutine that intercepts task completion.
119  
/** Wrapper coroutine that intercepts task completion.
120  

120  

121  
    This runner awaits its assigned task and stores the result in
121  
    This runner awaits its assigned task and stores the result in
122  
    the shared state, or captures the exception and requests stop.
122  
    the shared state, or captures the exception and requests stop.
123  
*/
123  
*/
124  
template<typename T, typename... Ts>
124  
template<typename T, typename... Ts>
125  
struct when_all_runner
125  
struct when_all_runner
126  
{
126  
{
127  
    struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
127  
    struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
128  
    {
128  
    {
129  
        when_all_state<Ts...>* state_ = nullptr;
129  
        when_all_state<Ts...>* state_ = nullptr;
130  
        io_env env_;
130  
        io_env env_;
131  

131  

132  
        when_all_runner get_return_object()
132  
        when_all_runner get_return_object()
133  
        {
133  
        {
134  
            return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
134  
            return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
135  
        }
135  
        }
136  

136  

137  
        std::suspend_always initial_suspend() noexcept
137  
        std::suspend_always initial_suspend() noexcept
138  
        {
138  
        {
139  
            return {};
139  
            return {};
140  
        }
140  
        }
141  

141  

142  
        auto final_suspend() noexcept
142  
        auto final_suspend() noexcept
143  
        {
143  
        {
144  
            struct awaiter
144  
            struct awaiter
145  
            {
145  
            {
146  
                promise_type* p_;
146  
                promise_type* p_;
147  

147  

148  
                bool await_ready() const noexcept
148  
                bool await_ready() const noexcept
149  
                {
149  
                {
150  
                    return false;
150  
                    return false;
151  
                }
151  
                }
152  

152  

153  
                auto await_suspend(std::coroutine_handle<> h) noexcept
153  
                auto await_suspend(std::coroutine_handle<> h) noexcept
154  
                {
154  
                {
155  
                    // Extract everything needed before self-destruction.
155  
                    // Extract everything needed before self-destruction.
156  
                    auto* state = p_->state_;
156  
                    auto* state = p_->state_;
157  
                    auto* counter = &state->remaining_count_;
157  
                    auto* counter = &state->remaining_count_;
158  
                    auto* caller_env = state->caller_env_;
158  
                    auto* caller_env = state->caller_env_;
159  
                    auto cont = state->continuation_;
159  
                    auto cont = state->continuation_;
160  

160  

161  
                    h.destroy();
161  
                    h.destroy();
162  

162  

163  
                    // If last runner, dispatch parent for symmetric transfer.
163  
                    // If last runner, dispatch parent for symmetric transfer.
164  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
164  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
165  
                    if(remaining == 1)
165  
                    if(remaining == 1)
166  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
166  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
167  
                    return detail::symmetric_transfer(std::noop_coroutine());
167  
                    return detail::symmetric_transfer(std::noop_coroutine());
168  
                }
168  
                }
169  

169  

170  
                void await_resume() const noexcept
170  
                void await_resume() const noexcept
171  
                {
171  
                {
172  
                }
172  
                }
173  
            };
173  
            };
174  
            return awaiter{this};
174  
            return awaiter{this};
175  
        }
175  
        }
176  

176  

177  
        void return_void()
177  
        void return_void()
178  
        {
178  
        {
179  
        }
179  
        }
180  

180  

181  
        void unhandled_exception()
181  
        void unhandled_exception()
182  
        {
182  
        {
183  
            state_->capture_exception(std::current_exception());
183  
            state_->capture_exception(std::current_exception());
184  
            // Request stop for sibling tasks
184  
            // Request stop for sibling tasks
185  
            state_->stop_source_.request_stop();
185  
            state_->stop_source_.request_stop();
186  
        }
186  
        }
187  

187  

188  
        template<class Awaitable>
188  
        template<class Awaitable>
189  
        struct transform_awaiter
189  
        struct transform_awaiter
190  
        {
190  
        {
191  
            std::decay_t<Awaitable> a_;
191  
            std::decay_t<Awaitable> a_;
192  
            promise_type* p_;
192  
            promise_type* p_;
193  

193  

194  
            bool await_ready()
194  
            bool await_ready()
195  
            {
195  
            {
196  
                return a_.await_ready();
196  
                return a_.await_ready();
197  
            }
197  
            }
198  

198  

199  
            decltype(auto) await_resume()
199  
            decltype(auto) await_resume()
200  
            {
200  
            {
201  
                return a_.await_resume();
201  
                return a_.await_resume();
202  
            }
202  
            }
203  

203  

204  
            template<class Promise>
204  
            template<class Promise>
205  
            auto await_suspend(std::coroutine_handle<Promise> h)
205  
            auto await_suspend(std::coroutine_handle<Promise> h)
206  
            {
206  
            {
207  
                using R = decltype(a_.await_suspend(h, &p_->env_));
207  
                using R = decltype(a_.await_suspend(h, &p_->env_));
208  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
208  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
209  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
209  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
210  
                else
210  
                else
211  
                    return a_.await_suspend(h, &p_->env_);
211  
                    return a_.await_suspend(h, &p_->env_);
212  
            }
212  
            }
213  
        };
213  
        };
214  

214  

215  
        template<class Awaitable>
215  
        template<class Awaitable>
216  
        auto await_transform(Awaitable&& a)
216  
        auto await_transform(Awaitable&& a)
217  
        {
217  
        {
218  
            using A = std::decay_t<Awaitable>;
218  
            using A = std::decay_t<Awaitable>;
219  
            if constexpr (IoAwaitable<A>)
219  
            if constexpr (IoAwaitable<A>)
220  
            {
220  
            {
221  
                return transform_awaiter<Awaitable>{
221  
                return transform_awaiter<Awaitable>{
222  
                    std::forward<Awaitable>(a), this};
222  
                    std::forward<Awaitable>(a), this};
223  
            }
223  
            }
224  
            else
224  
            else
225  
            {
225  
            {
226  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
226  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
227  
            }
227  
            }
228  
        }
228  
        }
229  
    };
229  
    };
230  

230  

231  
    std::coroutine_handle<promise_type> h_;
231  
    std::coroutine_handle<promise_type> h_;
232  

232  

233  
    explicit when_all_runner(std::coroutine_handle<promise_type> h)
233  
    explicit when_all_runner(std::coroutine_handle<promise_type> h)
234  
        : h_(h)
234  
        : h_(h)
235  
    {
235  
    {
236  
    }
236  
    }
237  

237  

238  
    // Enable move for all clang versions - some versions need it
238  
    // Enable move for all clang versions - some versions need it
239  
    when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
239  
    when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
240  

240  

241  
    // Non-copyable
241  
    // Non-copyable
242  
    when_all_runner(when_all_runner const&) = delete;
242  
    when_all_runner(when_all_runner const&) = delete;
243  
    when_all_runner& operator=(when_all_runner const&) = delete;
243  
    when_all_runner& operator=(when_all_runner const&) = delete;
244  
    when_all_runner& operator=(when_all_runner&&) = delete;
244  
    when_all_runner& operator=(when_all_runner&&) = delete;
245  

245  

246  
    auto release() noexcept
246  
    auto release() noexcept
247  
    {
247  
    {
248  
        return std::exchange(h_, nullptr);
248  
        return std::exchange(h_, nullptr);
249  
    }
249  
    }
250  
};
250  
};
251  

251  

252  
/** Create a runner coroutine for a single awaitable.
252  
/** Create a runner coroutine for a single awaitable.
253  

253  

254  
    Awaitable is passed directly to ensure proper coroutine frame storage.
254  
    Awaitable is passed directly to ensure proper coroutine frame storage.
255  
*/
255  
*/
256  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
256  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
257  
when_all_runner<awaitable_result_t<Awaitable>, Ts...>
257  
when_all_runner<awaitable_result_t<Awaitable>, Ts...>
258  
make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state)
258  
make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state)
259  
{
259  
{
260  
    using T = awaitable_result_t<Awaitable>;
260  
    using T = awaitable_result_t<Awaitable>;
261  
    if constexpr (std::is_void_v<T>)
261  
    if constexpr (std::is_void_v<T>)
262  
    {
262  
    {
263  
        co_await std::move(inner);
263  
        co_await std::move(inner);
264  
    }
264  
    }
265  
    else
265  
    else
266  
    {
266  
    {
267  
        std::get<Index>(state->results_).set(co_await std::move(inner));
267  
        std::get<Index>(state->results_).set(co_await std::move(inner));
268  
    }
268  
    }
269  
}
269  
}
270  

270  

271  
/** Internal awaitable that launches all runner coroutines and waits.
271  
/** Internal awaitable that launches all runner coroutines and waits.
272  

272  

273  
    This awaitable is used inside the when_all coroutine to handle
273  
    This awaitable is used inside the when_all coroutine to handle
274  
    the concurrent execution of child awaitables.
274  
    the concurrent execution of child awaitables.
275  
*/
275  
*/
276  
template<IoAwaitable... Awaitables>
276  
template<IoAwaitable... Awaitables>
277  
class when_all_launcher
277  
class when_all_launcher
278  
{
278  
{
279  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
279  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
280  

280  

281  
    std::tuple<Awaitables...>* awaitables_;
281  
    std::tuple<Awaitables...>* awaitables_;
282  
    state_type* state_;
282  
    state_type* state_;
283  

283  

284  
public:
284  
public:
285  
    when_all_launcher(
285  
    when_all_launcher(
286  
        std::tuple<Awaitables...>* awaitables,
286  
        std::tuple<Awaitables...>* awaitables,
287  
        state_type* state)
287  
        state_type* state)
288  
        : awaitables_(awaitables)
288  
        : awaitables_(awaitables)
289  
        , state_(state)
289  
        , state_(state)
290  
    {
290  
    {
291  
    }
291  
    }
292  

292  

293  
    bool await_ready() const noexcept
293  
    bool await_ready() const noexcept
294  
    {
294  
    {
295  
        return sizeof...(Awaitables) == 0;
295  
        return sizeof...(Awaitables) == 0;
296  
    }
296  
    }
297  

297  

298  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
298  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
299  
    {
299  
    {
300  
        state_->continuation_ = continuation;
300  
        state_->continuation_ = continuation;
301  
        state_->caller_env_ = caller_env;
301  
        state_->caller_env_ = caller_env;
302  

302  

303  
        // Forward parent's stop requests to children
303  
        // Forward parent's stop requests to children
304  
        if(caller_env->stop_token.stop_possible())
304  
        if(caller_env->stop_token.stop_possible())
305  
        {
305  
        {
306  
            state_->parent_stop_callback_.emplace(
306  
            state_->parent_stop_callback_.emplace(
307  
                caller_env->stop_token,
307  
                caller_env->stop_token,
308  
                typename state_type::stop_callback_fn{&state_->stop_source_});
308  
                typename state_type::stop_callback_fn{&state_->stop_source_});
309  

309  

310  
            if(caller_env->stop_token.stop_requested())
310  
            if(caller_env->stop_token.stop_requested())
311  
                state_->stop_source_.request_stop();
311  
                state_->stop_source_.request_stop();
312  
        }
312  
        }
313  

313  

314  
        // CRITICAL: If the last task finishes synchronously then the parent
314  
        // CRITICAL: If the last task finishes synchronously then the parent
315  
        // coroutine resumes, destroying its frame, and destroying this object
315  
        // coroutine resumes, destroying its frame, and destroying this object
316  
        // prior to the completion of await_suspend. Therefore, await_suspend
316  
        // prior to the completion of await_suspend. Therefore, await_suspend
317  
        // must ensure `this` cannot be referenced after calling `launch_one`
317  
        // must ensure `this` cannot be referenced after calling `launch_one`
318  
        // for the last time.
318  
        // for the last time.
319  
        auto token = state_->stop_source_.get_token();
319  
        auto token = state_->stop_source_.get_token();
320  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
320  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
321  
            (..., launch_one<Is>(caller_env->executor, token));
321  
            (..., launch_one<Is>(caller_env->executor, token));
322  
        }(std::index_sequence_for<Awaitables...>{});
322  
        }(std::index_sequence_for<Awaitables...>{});
323  

323  

324  
        // Let signal_completion() handle resumption
324  
        // Let signal_completion() handle resumption
325  
        return std::noop_coroutine();
325  
        return std::noop_coroutine();
326  
    }
326  
    }
327  

327  

328  
    void await_resume() const noexcept
328  
    void await_resume() const noexcept
329  
    {
329  
    {
330  
        // Results are extracted by the when_all coroutine from state
330  
        // Results are extracted by the when_all coroutine from state
331  
    }
331  
    }
332  

332  

333  
private:
333  
private:
334  
    template<std::size_t I>
334  
    template<std::size_t I>
335  
    void launch_one(executor_ref caller_ex, std::stop_token token)
335  
    void launch_one(executor_ref caller_ex, std::stop_token token)
336  
    {
336  
    {
337  
        auto runner = make_when_all_runner<I>(
337  
        auto runner = make_when_all_runner<I>(
338  
            std::move(std::get<I>(*awaitables_)), state_);
338  
            std::move(std::get<I>(*awaitables_)), state_);
339  

339  

340  
        auto h = runner.release();
340  
        auto h = runner.release();
341  
        h.promise().state_ = state_;
341  
        h.promise().state_ = state_;
342  
        h.promise().env_ = io_env{caller_ex, token, state_->caller_env_->frame_allocator};
342  
        h.promise().env_ = io_env{caller_ex, token, state_->caller_env_->frame_allocator};
343  

343  

344  
        std::coroutine_handle<> ch{h};
344  
        std::coroutine_handle<> ch{h};
345  
        state_->runner_handles_[I] = ch;
345  
        state_->runner_handles_[I] = ch;
346  
        state_->caller_env_->executor.post(ch);
346  
        state_->caller_env_->executor.post(ch);
347  
    }
347  
    }
348  
};
348  
};
349  

349  

350  
/** Helper to extract a single result from state.
350  
/** Helper to extract a single result from state.
351  
    This is a separate function to work around a GCC-11 ICE that occurs
351  
    This is a separate function to work around a GCC-11 ICE that occurs
352  
    when using nested immediately-invoked lambdas with pack expansion.
352  
    when using nested immediately-invoked lambdas with pack expansion.
353  
*/
353  
*/
354  
template<std::size_t I, typename... Ts>
354  
template<std::size_t I, typename... Ts>
355  
auto extract_single_result(when_all_state<Ts...>& state)
355  
auto extract_single_result(when_all_state<Ts...>& state)
356  
{
356  
{
357  
    return std::move(std::get<I>(state.results_)).get();
357  
    return std::move(std::get<I>(state.results_)).get();
358  
}
358  
}
359  

359  

360  
/** Extract all results from state as a tuple.
360  
/** Extract all results from state as a tuple.
361  
*/
361  
*/
362  
template<typename... Ts>
362  
template<typename... Ts>
363  
auto extract_results(when_all_state<Ts...>& state)
363  
auto extract_results(when_all_state<Ts...>& state)
364  
{
364  
{
365  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
365  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
366  
        return std::tuple(extract_single_result<Is>(state)...);
366  
        return std::tuple(extract_single_result<Is>(state)...);
367  
    }(std::index_sequence_for<Ts...>{});
367  
    }(std::index_sequence_for<Ts...>{});
368  
}
368  
}
369  

369  

370  
} // namespace detail
370  
} // namespace detail
371  

371  

372  
/** Compute the when_all result tuple type.
372  
/** Compute the when_all result tuple type.
373  

373  

374  
    Void-returning tasks contribute std::monostate to preserve the
374  
    Void-returning tasks contribute std::monostate to preserve the
375  
    task-index-to-result-index mapping, matching when_any's approach.
375  
    task-index-to-result-index mapping, matching when_any's approach.
376  

376  

377  
    Example: when_all_result_t<int, void, string> = std::tuple<int, std::monostate, string>
377  
    Example: when_all_result_t<int, void, string> = std::tuple<int, std::monostate, string>
378  
    Example: when_all_result_t<void, void> = std::tuple<std::monostate, std::monostate>
378  
    Example: when_all_result_t<void, void> = std::tuple<std::monostate, std::monostate>
379  
*/
379  
*/
380  
template<typename... Ts>
380  
template<typename... Ts>
381  
using when_all_result_t = std::tuple<void_to_monostate_t<Ts>...>;
381  
using when_all_result_t = std::tuple<void_to_monostate_t<Ts>...>;
382  

382  

383  
/** Execute multiple awaitables concurrently and collect their results.
383  
/** Execute multiple awaitables concurrently and collect their results.
384  

384  

385  
    Launches all awaitables simultaneously and waits for all to complete
385  
    Launches all awaitables simultaneously and waits for all to complete
386  
    before returning. Results are collected in input order. If any
386  
    before returning. Results are collected in input order. If any
387  
    awaitable throws, cancellation is requested for siblings and the first
387  
    awaitable throws, cancellation is requested for siblings and the first
388  
    exception is rethrown after all awaitables complete.
388  
    exception is rethrown after all awaitables complete.
389  

389  

390  
    @li All child awaitables run concurrently on the caller's executor
390  
    @li All child awaitables run concurrently on the caller's executor
391  
    @li Results are returned as a tuple in input order
391  
    @li Results are returned as a tuple in input order
392  
    @li Void-returning awaitables contribute std::monostate to the
392  
    @li Void-returning awaitables contribute std::monostate to the
393  
        result tuple, preserving the task-index-to-result-index mapping
393  
        result tuple, preserving the task-index-to-result-index mapping
394  
    @li First exception wins; subsequent exceptions are discarded
394  
    @li First exception wins; subsequent exceptions are discarded
395  
    @li Stop is requested for siblings on first error
395  
    @li Stop is requested for siblings on first error
396  
    @li Completes only after all children have finished
396  
    @li Completes only after all children have finished
397  

397  

398  
    @par Thread Safety
398  
    @par Thread Safety
399  
    The returned task must be awaited from a single execution context.
399  
    The returned task must be awaited from a single execution context.
400  
    Child awaitables execute concurrently but complete through the caller's
400  
    Child awaitables execute concurrently but complete through the caller's
401  
    executor.
401  
    executor.
402  

402  

403  
    @param awaitables The awaitables to execute concurrently. Each must
403  
    @param awaitables The awaitables to execute concurrently. Each must
404  
        satisfy @ref IoAwaitable and is consumed (moved-from) when
404  
        satisfy @ref IoAwaitable and is consumed (moved-from) when
405  
        `when_all` is awaited.
405  
        `when_all` is awaited.
406  

406  

407  
    @return A task yielding a tuple of results in input order. Void tasks
407  
    @return A task yielding a tuple of results in input order. Void tasks
408  
        contribute std::monostate to preserve index correspondence.
408  
        contribute std::monostate to preserve index correspondence.
409  

409  

410  
    @par Example
410  
    @par Example
411  

411  

412  
    @code
412  
    @code
413  
    task<> example()
413  
    task<> example()
414  
    {
414  
    {
415  
        // Concurrent fetch, results collected in order
415  
        // Concurrent fetch, results collected in order
416  
        auto [user, posts] = co_await when_all(
416  
        auto [user, posts] = co_await when_all(
417  
            fetch_user( id ),      // task<User>
417  
            fetch_user( id ),      // task<User>
418  
            fetch_posts( id )      // task<std::vector<Post>>
418  
            fetch_posts( id )      // task<std::vector<Post>>
419  
        );
419  
        );
420  

420  

421  
        // Void awaitables contribute monostate
421  
        // Void awaitables contribute monostate
422  
        auto [a, _, b] = co_await when_all(
422  
        auto [a, _, b] = co_await when_all(
423  
            fetch_int(),           // task<int>
423  
            fetch_int(),           // task<int>
424  
            log_event( "start" ),  // task<void>  → monostate
424  
            log_event( "start" ),  // task<void>  → monostate
425  
            fetch_str()            // task<string>
425  
            fetch_str()            // task<string>
426  
        );
426  
        );
427  
        // a is int, _ is monostate, b is string
427  
        // a is int, _ is monostate, b is string
428  
    }
428  
    }
429  
    @endcode
429  
    @endcode
430  

430  

431  
    @see IoAwaitable, task
431  
    @see IoAwaitable, task
432  
*/
432  
*/
433  
template<IoAwaitable... As>
433  
template<IoAwaitable... As>
434  
[[nodiscard]] auto when_all(As... awaitables)
434  
[[nodiscard]] auto when_all(As... awaitables)
435  
    -> task<when_all_result_t<awaitable_result_t<As>...>>
435  
    -> task<when_all_result_t<awaitable_result_t<As>...>>
436  
{
436  
{
437  
    // State is stored in the coroutine frame, using the frame allocator
437  
    // State is stored in the coroutine frame, using the frame allocator
438  
    detail::when_all_state<awaitable_result_t<As>...> state;
438  
    detail::when_all_state<awaitable_result_t<As>...> state;
439  

439  

440  
    // Store awaitables in the frame
440  
    // Store awaitables in the frame
441  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
441  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
442  

442  

443  
    // Launch all awaitables and wait for completion
443  
    // Launch all awaitables and wait for completion
444  
    co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);
444  
    co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);
445  

445  

446  
    // Propagate first exception if any.
446  
    // Propagate first exception if any.
447  
    // Safe without explicit acquire: capture_exception() is sequenced-before
447  
    // Safe without explicit acquire: capture_exception() is sequenced-before
448  
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
448  
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
449  
    // last task's decrement that resumes this coroutine.
449  
    // last task's decrement that resumes this coroutine.
450  
    if(state.first_exception_)
450  
    if(state.first_exception_)
451  
        std::rethrow_exception(state.first_exception_);
451  
        std::rethrow_exception(state.first_exception_);
452  

452  

453  
    co_return detail::extract_results(state);
453  
    co_return detail::extract_results(state);
454  
}
454  
}
455  

455  

456  
} // namespace capy
456  
} // namespace capy
457  
} // namespace boost
457  
} // namespace boost
458  

458  

459  
#endif
459  
#endif