1  
//
1  
//
2  
// Copyright (c) 2026 Steve Gerbino
2  
// Copyright (c) 2026 Steve Gerbino
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14 -
#include <boost/capy/detail/void_to_monostate.hpp>
14 +
#include <boost/capy/detail/io_result_combinators.hpp>
15  
#include <boost/capy/concept/executor.hpp>
15  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/concept/io_awaitable.hpp>
16  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <coroutine>
17  
#include <coroutine>
18  
#include <boost/capy/ex/io_env.hpp>
18  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/ex/frame_allocator.hpp>
19  
#include <boost/capy/ex/frame_allocator.hpp>
20  
#include <boost/capy/task.hpp>
20  
#include <boost/capy/task.hpp>
21  

21  

22  
#include <array>
22  
#include <array>
23  
#include <atomic>
23  
#include <atomic>
24  
#include <exception>
24  
#include <exception>
25  
#include <optional>
25  
#include <optional>
26  
#include <ranges>
26  
#include <ranges>
27  
#include <stdexcept>
27  
#include <stdexcept>
28  
#include <stop_token>
28  
#include <stop_token>
29  
#include <tuple>
29  
#include <tuple>
30  
#include <type_traits>
30  
#include <type_traits>
31  
#include <utility>
31  
#include <utility>
32  
#include <vector>
32  
#include <vector>
33  

33  

34  
namespace boost {
34  
namespace boost {
35  
namespace capy {
35  
namespace capy {
36  

36  

37  
namespace detail {
37  
namespace detail {
38  

38  

39  
/** Holds the result of a single task within when_all.

    The slot starts empty, is filled through `set`, and is drained
    through `get`.

    @tparam T The stored result type. Must be move-constructible.
*/
template<typename T>
struct result_holder
{
    std::optional<T> value_;

    /** Store the task's result.

        The value is constructed in place, so `T` only has to be
        move-constructible (not move-assignable). This mirrors
        `when_all_homogeneous_state::set_result`.

        @param v The result to store.
    */
    void set(T v)
    {
        value_.emplace(std::move(v));
    }

    /** Move the stored result out of the holder.

        @pre `set` has been called. An empty holder is not checked.

        @return The stored value, moved out.
    */
    T get() &&
    {
        return std::move(*value_);
    }
};
 
63 -

 
64  

56  

65  
/** Core shared state for when_all operations.
57  
/** Core shared state for when_all operations.
66  

58  

67  
    Contains all members and methods common to both heterogeneous (variadic)
59  
    Contains all members and methods common to both heterogeneous (variadic)
68  
    and homogeneous (range) when_all implementations. State classes embed
60  
    and homogeneous (range) when_all implementations. State classes embed
69  
    this via composition to avoid CRTP destructor ordering issues.
61  
    this via composition to avoid CRTP destructor ordering issues.
70  

62  

71  
    @par Thread Safety
63  
    @par Thread Safety
72  
    Atomic operations protect exception capture and completion count.
64  
    Atomic operations protect exception capture and completion count.
73  
*/
65  
*/
74  
struct when_all_core
66  
struct when_all_core
75  
{
67  
{
76  
    std::atomic<std::size_t> remaining_count_;
68  
    std::atomic<std::size_t> remaining_count_;
77  

69  

78  
    // Exception storage - first error wins, others discarded
70  
    // Exception storage - first error wins, others discarded
79  
    std::atomic<bool> has_exception_{false};
71  
    std::atomic<bool> has_exception_{false};
80  
    std::exception_ptr first_exception_;
72  
    std::exception_ptr first_exception_;
81  

73  

82  
    std::stop_source stop_source_;
74  
    std::stop_source stop_source_;
83  

75  

84  
    // Bridges parent's stop token to our stop_source
76  
    // Bridges parent's stop token to our stop_source
85  
    struct stop_callback_fn
77  
    struct stop_callback_fn
86  
    {
78  
    {
87  
        std::stop_source* source_;
79  
        std::stop_source* source_;
88  
        void operator()() const { source_->request_stop(); }
80  
        void operator()() const { source_->request_stop(); }
89  
    };
81  
    };
90  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
82  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
91  
    std::optional<stop_callback_t> parent_stop_callback_;
83  
    std::optional<stop_callback_t> parent_stop_callback_;
92  

84  

93  
    std::coroutine_handle<> continuation_;
85  
    std::coroutine_handle<> continuation_;
94  
    io_env const* caller_env_ = nullptr;
86  
    io_env const* caller_env_ = nullptr;
95  

87  

96  
    explicit when_all_core(std::size_t count) noexcept
88  
    explicit when_all_core(std::size_t count) noexcept
97  
        : remaining_count_(count)
89  
        : remaining_count_(count)
98  
    {
90  
    {
99  
    }
91  
    }
100  

92  

101  
    /** Capture an exception (first one wins). */
93  
    /** Capture an exception (first one wins). */
102  
    void capture_exception(std::exception_ptr ep)
94  
    void capture_exception(std::exception_ptr ep)
103  
    {
95  
    {
104  
        bool expected = false;
96  
        bool expected = false;
105  
        if(has_exception_.compare_exchange_strong(
97  
        if(has_exception_.compare_exchange_strong(
106  
            expected, true, std::memory_order_relaxed))
98  
            expected, true, std::memory_order_relaxed))
107  
            first_exception_ = ep;
99  
            first_exception_ = ep;
108  
    }
100  
    }
109  
};
101  
};
110  

102  

111  
/** Shared state for heterogeneous when_all (variadic overload).
103  
/** Shared state for heterogeneous when_all (variadic overload).
112  

104  

113  
    @tparam Ts The result types of the tasks.
105  
    @tparam Ts The result types of the tasks.
114  
*/
106  
*/
115  
template<typename... Ts>
107  
template<typename... Ts>
116  
struct when_all_state
108  
struct when_all_state
117  
{
109  
{
118  
    static constexpr std::size_t task_count = sizeof...(Ts);
110  
    static constexpr std::size_t task_count = sizeof...(Ts);
119  

111  

120  
    when_all_core core_;
112  
    when_all_core core_;
121  
    std::tuple<result_holder<Ts>...> results_;
113  
    std::tuple<result_holder<Ts>...> results_;
122  
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
114  
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
123  

115  

 
116 +
    std::atomic<bool> has_error_{false};
 
117 +
    std::error_code first_error_;
 
118 +

124  
    when_all_state()
119  
    when_all_state()
125  
        : core_(task_count)
120  
        : core_(task_count)
126  
    {
121  
    {
127  
    }
122  
    }
 
123 +

 
124 +
    /** Record the first error (subsequent errors are discarded). */
 
125 +
    void record_error(std::error_code ec)
 
126 +
    {
 
127 +
        bool expected = false;
 
128 +
        if(has_error_.compare_exchange_strong(
 
129 +
            expected, true, std::memory_order_relaxed))
 
130 +
            first_error_ = ec;
 
131 +
    }
128  
};
132  
};
129  

133  

130  
/** Shared state for homogeneous when_all (range overload).
134  
/** Shared state for homogeneous when_all (range overload).
131  

135  

132 -
    Stores all results in a vector indexed by task position.
136 +
    Stores extracted io_result payloads in a vector indexed by task
 
137 +
    position. Tracks the first error_code for error propagation.
133  

138  

134 -
    @tparam T The common result type of all tasks.
139 +
    @tparam T The payload type extracted from io_result.
135  
*/
140  
*/
136  
template<typename T>
141  
template<typename T>
137  
struct when_all_homogeneous_state
142  
struct when_all_homogeneous_state
138  
{
143  
{
139  
    when_all_core core_;
144  
    when_all_core core_;
140  
    std::vector<std::optional<T>> results_;
145  
    std::vector<std::optional<T>> results_;
141  
    std::vector<std::coroutine_handle<>> runner_handles_;
146  
    std::vector<std::coroutine_handle<>> runner_handles_;
142  

147  

 
148 +
    std::atomic<bool> has_error_{false};
 
149 +
    std::error_code first_error_;
 
150 +

143  
    explicit when_all_homogeneous_state(std::size_t count)
151  
    explicit when_all_homogeneous_state(std::size_t count)
144  
        : core_(count)
152  
        : core_(count)
145  
        , results_(count)
153  
        , results_(count)
146  
        , runner_handles_(count)
154  
        , runner_handles_(count)
147  
    {
155  
    {
148  
    }
156  
    }
149  

157  

150  
    void set_result(std::size_t index, T value)
158  
    void set_result(std::size_t index, T value)
151  
    {
159  
    {
152  
        results_[index].emplace(std::move(value));
160  
        results_[index].emplace(std::move(value));
153  
    }
161  
    }
 
162 +

 
163 +
    /** Record the first error (subsequent errors are discarded). */
 
164 +
    void record_error(std::error_code ec)
 
165 +
    {
 
166 +
        bool expected = false;
 
167 +
        if(has_error_.compare_exchange_strong(
 
168 +
            expected, true, std::memory_order_relaxed))
 
169 +
            first_error_ = ec;
 
170 +
    }
154  
};
171  
};
155  

172  

156 -
/** Specialization for void tasks (no result storage). */
173 +
/** Specialization for void io_result children (no payload storage). */
157  
template<>
174  
template<>
158 -
struct when_all_homogeneous_state<void>
175 +
struct when_all_homogeneous_state<std::tuple<>>
159  
{
176  
{
160  
    when_all_core core_;
177  
    when_all_core core_;
161  
    std::vector<std::coroutine_handle<>> runner_handles_;
178  
    std::vector<std::coroutine_handle<>> runner_handles_;
162  

179  

 
180 +
    std::atomic<bool> has_error_{false};
 
181 +
    std::error_code first_error_;
 
182 +

163  
    explicit when_all_homogeneous_state(std::size_t count)
183  
    explicit when_all_homogeneous_state(std::size_t count)
164  
        : core_(count)
184  
        : core_(count)
165  
        , runner_handles_(count)
185  
        , runner_handles_(count)
166  
    {
186  
    {
167  
    }
187  
    }
 
188 +

 
189 +
    /** Record the first error (subsequent errors are discarded). */
 
190 +
    void record_error(std::error_code ec)
 
191 +
    {
 
192 +
        bool expected = false;
 
193 +
        if(has_error_.compare_exchange_strong(
 
194 +
            expected, true, std::memory_order_relaxed))
 
195 +
            first_error_ = ec;
 
196 +
    }
168  
};
197  
};
169  

198  

170  
/** Wrapper coroutine that intercepts task completion for when_all.
199  
/** Wrapper coroutine that intercepts task completion for when_all.
171  

200  

172  
    Parameterized on StateType to work with both heterogeneous (variadic)
201  
    Parameterized on StateType to work with both heterogeneous (variadic)
173  
    and homogeneous (range) state types. All state types expose their
202  
    and homogeneous (range) state types. All state types expose their
174  
    shared members through a `core_` member of type when_all_core.
203  
    shared members through a `core_` member of type when_all_core.
175  

204  

176  
    @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
205  
    @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
177  
*/
206  
*/
178  
template<typename StateType>
207  
template<typename StateType>
179  
struct when_all_runner
208  
struct when_all_runner
180  
{
209  
{
181  
    struct promise_type
210  
    struct promise_type
182  
    {
211  
    {
183  
        StateType* state_ = nullptr;
212  
        StateType* state_ = nullptr;
184  
        std::size_t index_ = 0;
213  
        std::size_t index_ = 0;
185  
        io_env env_;
214  
        io_env env_;
186  

215  

187  
        when_all_runner get_return_object() noexcept
216  
        when_all_runner get_return_object() noexcept
188  
        {
217  
        {
189  
            return when_all_runner(
218  
            return when_all_runner(
190  
                std::coroutine_handle<promise_type>::from_promise(*this));
219  
                std::coroutine_handle<promise_type>::from_promise(*this));
191  
        }
220  
        }
192  

221  

193  
        std::suspend_always initial_suspend() noexcept
222  
        std::suspend_always initial_suspend() noexcept
194  
        {
223  
        {
195  
            return {};
224  
            return {};
196  
        }
225  
        }
197  

226  

198  
        auto final_suspend() noexcept
227  
        auto final_suspend() noexcept
199  
        {
228  
        {
200  
            struct awaiter
229  
            struct awaiter
201  
            {
230  
            {
202  
                promise_type* p_;
231  
                promise_type* p_;
203  
                bool await_ready() const noexcept { return false; }
232  
                bool await_ready() const noexcept { return false; }
204  
                auto await_suspend(std::coroutine_handle<> h) noexcept
233  
                auto await_suspend(std::coroutine_handle<> h) noexcept
205  
                {
234  
                {
206  
                    auto& core = p_->state_->core_;
235  
                    auto& core = p_->state_->core_;
207  
                    auto* counter = &core.remaining_count_;
236  
                    auto* counter = &core.remaining_count_;
208  
                    auto* caller_env = core.caller_env_;
237  
                    auto* caller_env = core.caller_env_;
209  
                    auto cont = core.continuation_;
238  
                    auto cont = core.continuation_;
210  

239  

211  
                    h.destroy();
240  
                    h.destroy();
212  

241  

213  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
242  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
214  
                    if(remaining == 1)
243  
                    if(remaining == 1)
215  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
244  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
216  
                    return detail::symmetric_transfer(std::noop_coroutine());
245  
                    return detail::symmetric_transfer(std::noop_coroutine());
217  
                }
246  
                }
218  
                void await_resume() const noexcept {}
247  
                void await_resume() const noexcept {}
219  
            };
248  
            };
220  
            return awaiter{this};
249  
            return awaiter{this};
221  
        }
250  
        }
222  

251  

223  
        void return_void() noexcept {}
252  
        void return_void() noexcept {}
224  

253  

225  
        void unhandled_exception()
254  
        void unhandled_exception()
226  
        {
255  
        {
227  
            state_->core_.capture_exception(std::current_exception());
256  
            state_->core_.capture_exception(std::current_exception());
228  
            state_->core_.stop_source_.request_stop();
257  
            state_->core_.stop_source_.request_stop();
229  
        }
258  
        }
230  

259  

231  
        template<class Awaitable>
260  
        template<class Awaitable>
232  
        struct transform_awaiter
261  
        struct transform_awaiter
233  
        {
262  
        {
234  
            std::decay_t<Awaitable> a_;
263  
            std::decay_t<Awaitable> a_;
235  
            promise_type* p_;
264  
            promise_type* p_;
236  

265  

237  
            bool await_ready() { return a_.await_ready(); }
266  
            bool await_ready() { return a_.await_ready(); }
238  
            decltype(auto) await_resume() { return a_.await_resume(); }
267  
            decltype(auto) await_resume() { return a_.await_resume(); }
239  

268  

240  
            template<class Promise>
269  
            template<class Promise>
241  
            auto await_suspend(std::coroutine_handle<Promise> h)
270  
            auto await_suspend(std::coroutine_handle<Promise> h)
242  
            {
271  
            {
243  
                using R = decltype(a_.await_suspend(h, &p_->env_));
272  
                using R = decltype(a_.await_suspend(h, &p_->env_));
244  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
273  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
245  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
274  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
246  
                else
275  
                else
247  
                    return a_.await_suspend(h, &p_->env_);
276  
                    return a_.await_suspend(h, &p_->env_);
248  
            }
277  
            }
249  
        };
278  
        };
250  

279  

251  
        template<class Awaitable>
280  
        template<class Awaitable>
252  
        auto await_transform(Awaitable&& a)
281  
        auto await_transform(Awaitable&& a)
253  
        {
282  
        {
254  
            using A = std::decay_t<Awaitable>;
283  
            using A = std::decay_t<Awaitable>;
255  
            if constexpr (IoAwaitable<A>)
284  
            if constexpr (IoAwaitable<A>)
256  
            {
285  
            {
257  
                return transform_awaiter<Awaitable>{
286  
                return transform_awaiter<Awaitable>{
258  
                    std::forward<Awaitable>(a), this};
287  
                    std::forward<Awaitable>(a), this};
259  
            }
288  
            }
260  
            else
289  
            else
261  
            {
290  
            {
262  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
291  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
263  
            }
292  
            }
264  
        }
293  
        }
265  
    };
294  
    };
266  

295  

267  
    std::coroutine_handle<promise_type> h_;
296  
    std::coroutine_handle<promise_type> h_;
268  

297  

269  
    explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
298  
    explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
270  
        : h_(h)
299  
        : h_(h)
271  
    {
300  
    {
272  
    }
301  
    }
273  

302  

274  
    // Enable move for all clang versions - some versions need it
303  
    // Enable move for all clang versions - some versions need it
275  
    when_all_runner(when_all_runner&& other) noexcept
304  
    when_all_runner(when_all_runner&& other) noexcept
276  
        : h_(std::exchange(other.h_, nullptr))
305  
        : h_(std::exchange(other.h_, nullptr))
277  
    {
306  
    {
278  
    }
307  
    }
279  

308  

280  
    when_all_runner(when_all_runner const&) = delete;
309  
    when_all_runner(when_all_runner const&) = delete;
281  
    when_all_runner& operator=(when_all_runner const&) = delete;
310  
    when_all_runner& operator=(when_all_runner const&) = delete;
282  
    when_all_runner& operator=(when_all_runner&&) = delete;
311  
    when_all_runner& operator=(when_all_runner&&) = delete;
283  

312  

284  
    auto release() noexcept
313  
    auto release() noexcept
285  
    {
314  
    {
286  
        return std::exchange(h_, nullptr);
315  
        return std::exchange(h_, nullptr);
287  
    }
316  
    }
288  
};
317  
};
289  

318  

290 -
/** Create a runner coroutine for a single awaitable (variadic path).
319 +
/** Create an io_result-aware runner for a single awaitable (range path).
291  

320  

292 -
    Uses compile-time index for tuple-based result storage.
321 +
    Checks the error code, records errors and requests stop on failure,
 
322 +
    or extracts the payload on success.
293  
*/
323  
*/
294 -
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
324 +
template<IoAwaitable Awaitable, typename StateType>
295 -
when_all_runner<when_all_state<Ts...>>
325 +
when_all_runner<StateType>
296 -
make_when_all_runner(Awaitable inner, when_all_state<Ts...>* state)
326 +
make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
297  
{
327  
{
298 -
    using T = awaitable_result_t<Awaitable>;
328 +
    auto result = co_await std::move(inner);
299 -
    if constexpr (std::is_void_v<T>)
329 +

 
330 +
    if(result.ec)
300  
    {
331  
    {
301 -
        co_await std::move(inner);
332 +
        state->record_error(result.ec);
 
333 +
        state->core_.stop_source_.request_stop();
302  
    }
334  
    }
303  
    else
335  
    else
304  
    {
336  
    {
305 -
        std::get<Index>(state->results_).set(co_await std::move(inner));
337 +
        using PayloadT = io_result_payload_t<
 
338 +
            awaitable_result_t<Awaitable>>;
 
339 +
        if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
 
340 +
        {
 
341 +
            state->set_result(index,
 
342 +
                extract_io_payload(std::move(result)));
 
343 +
        }
306  
    }
344  
    }
307  
}
345  
}
308  

346  

309 -
/** Create a runner coroutine for a single awaitable (range path).
347 +
/** Create a runner for io_result children that requests stop on ec. */
310 -

348 +
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
311 -
    Uses runtime index for vector-based result storage.
349 +
when_all_runner<when_all_state<Ts...>>
312 -
*/
350 +
make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
313 -
template<IoAwaitable Awaitable, typename StateType>
 
314 -
when_all_runner<StateType>
 
315 -
make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
 
316  
{
351  
{
317 -
    using T = awaitable_result_t<Awaitable>;
352 +
    auto result = co_await std::move(inner);
318 -
    if constexpr (std::is_void_v<T>)
353 +
    auto ec = result.ec;
319 -
    {
354 +
    std::get<Index>(state->results_).set(std::move(result));
320 -
        co_await std::move(inner);
355 +

321 -
    }
356 +
    if(ec)
322 -
    else
 
323  
    {
357  
    {
324 -
        state->set_result(index, co_await std::move(inner));
358 +
        state->record_error(ec);
 
359 +
        state->core_.stop_source_.request_stop();
325  
    }
360  
    }
326  
}
361  
}
327  

362  

328 -
/** Internal awaitable that launches all variadic runner coroutines.
363 +
/** Launcher that uses io_result-aware runners. */
329 -

 
330 -
    CRITICAL: If the last task finishes synchronously then the parent
 
331 -
    coroutine resumes, destroying its frame, and destroying this object
 
332 -
    prior to the completion of await_suspend. Therefore, await_suspend
 
333 -
    must ensure `this` cannot be referenced after calling `launch_one`
 
334 -
    for the last time.
 
335 -
*/
 
336  
template<IoAwaitable... Awaitables>
364  
template<IoAwaitable... Awaitables>
337 -
class when_all_launcher
365 +
class when_all_io_launcher
338  
{
366  
{
339  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
367  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
340  

368  

341  
    std::tuple<Awaitables...>* awaitables_;
369  
    std::tuple<Awaitables...>* awaitables_;
342  
    state_type* state_;
370  
    state_type* state_;
343  

371  

344  
public:
372  
public:
345 -
    when_all_launcher(
373 +
    when_all_io_launcher(
346  
        std::tuple<Awaitables...>* awaitables,
374  
        std::tuple<Awaitables...>* awaitables,
347  
        state_type* state)
375  
        state_type* state)
348  
        : awaitables_(awaitables)
376  
        : awaitables_(awaitables)
349  
        , state_(state)
377  
        , state_(state)
350  
    {
378  
    {
351  
    }
379  
    }
352  

380  

353  
    bool await_ready() const noexcept
381  
    bool await_ready() const noexcept
354  
    {
382  
    {
355  
        return sizeof...(Awaitables) == 0;
383  
        return sizeof...(Awaitables) == 0;
356  
    }
384  
    }
357  

385  

358 -
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
386 +
    std::coroutine_handle<> await_suspend(
 
387 +
        std::coroutine_handle<> continuation, io_env const* caller_env)
359  
    {
388  
    {
360  
        state_->core_.continuation_ = continuation;
389  
        state_->core_.continuation_ = continuation;
361  
        state_->core_.caller_env_ = caller_env;
390  
        state_->core_.caller_env_ = caller_env;
362  

391  

363  
        if(caller_env->stop_token.stop_possible())
392  
        if(caller_env->stop_token.stop_possible())
364  
        {
393  
        {
365  
            state_->core_.parent_stop_callback_.emplace(
394  
            state_->core_.parent_stop_callback_.emplace(
366  
                caller_env->stop_token,
395  
                caller_env->stop_token,
367  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
396  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
368  

397  

369  
            if(caller_env->stop_token.stop_requested())
398  
            if(caller_env->stop_token.stop_requested())
370  
                state_->core_.stop_source_.request_stop();
399  
                state_->core_.stop_source_.request_stop();
371  
        }
400  
        }
372  

401  

373  
        auto token = state_->core_.stop_source_.get_token();
402  
        auto token = state_->core_.stop_source_.get_token();
374  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
403  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
375  
            (..., launch_one<Is>(caller_env->executor, token));
404  
            (..., launch_one<Is>(caller_env->executor, token));
376  
        }(std::index_sequence_for<Awaitables...>{});
405  
        }(std::index_sequence_for<Awaitables...>{});
377  

406  

378  
        return std::noop_coroutine();
407  
        return std::noop_coroutine();
379  
    }
408  
    }
380  

409  

381 -
    void await_resume() const noexcept
410 +
    void await_resume() const noexcept {}
382 -
    {
 
383 -
    }
 
384  

411  

385  
private:
412  
private:
386  
    template<std::size_t I>
413  
    template<std::size_t I>
387  
    void launch_one(executor_ref caller_ex, std::stop_token token)
414  
    void launch_one(executor_ref caller_ex, std::stop_token token)
388  
    {
415  
    {
389 -
        auto runner = make_when_all_runner<I>(
416 +
        auto runner = make_when_all_io_runner<I>(
390  
            std::move(std::get<I>(*awaitables_)), state_);
417  
            std::move(std::get<I>(*awaitables_)), state_);
391  

418  

392  
        auto h = runner.release();
419  
        auto h = runner.release();
393  
        h.promise().state_ = state_;
420  
        h.promise().state_ = state_;
394 -
        h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
421 +
        h.promise().env_ = io_env{caller_ex, token,
 
422 +
            state_->core_.caller_env_->frame_allocator};
395  

423  

396  
        std::coroutine_handle<> ch{h};
424  
        std::coroutine_handle<> ch{h};
397  
        state_->runner_handles_[I] = ch;
425  
        state_->runner_handles_[I] = ch;
398  
        state_->core_.caller_env_->executor.post(ch);
426  
        state_->core_.caller_env_->executor.post(ch);
399  
    }
427  
    }
400  
};
428  
};
401  

429  

402  
/** Helper to extract a single result from state.
430  
/** Helper to extract a single result from state.
403  
    This is a separate function to work around a GCC-11 ICE that occurs
431  
    This is a separate function to work around a GCC-11 ICE that occurs
404  
    when using nested immediately-invoked lambdas with pack expansion.
432  
    when using nested immediately-invoked lambdas with pack expansion.
405  
*/
433  
*/
406  
template<std::size_t I, typename... Ts>
434  
template<std::size_t I, typename... Ts>
407  
auto extract_single_result(when_all_state<Ts...>& state)
435  
auto extract_single_result(when_all_state<Ts...>& state)
408  
{
436  
{
409  
    return std::move(std::get<I>(state.results_)).get();
437  
    return std::move(std::get<I>(state.results_)).get();
410  
}
438  
}
411  

439  

412  
/** Extract all results from state as a tuple.
440  
/** Extract all results from state as a tuple.
413  
*/
441  
*/
414  
template<typename... Ts>
442  
template<typename... Ts>
415  
auto extract_results(when_all_state<Ts...>& state)
443  
auto extract_results(when_all_state<Ts...>& state)
416  
{
444  
{
417  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
445  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
418  
        return std::tuple(extract_single_result<Is>(state)...);
446  
        return std::tuple(extract_single_result<Is>(state)...);
419  
    }(std::index_sequence_for<Ts...>{});
447  
    }(std::index_sequence_for<Ts...>{});
420  
}
448  
}
421  

449  

422  
/** Launches all homogeneous runners concurrently.
450  
/** Launches all homogeneous runners concurrently.
423  

451  

424  
    Two-phase approach: create all runners first, then post all.
452  
    Two-phase approach: create all runners first, then post all.
425  
    This avoids lifetime issues if a task completes synchronously.
453  
    This avoids lifetime issues if a task completes synchronously.
426  
*/
454  
*/
427  
template<typename Range>
455  
template<typename Range>
428  
class when_all_homogeneous_launcher
456  
class when_all_homogeneous_launcher
429  
{
457  
{
430  
    using Awaitable = std::ranges::range_value_t<Range>;
458  
    using Awaitable = std::ranges::range_value_t<Range>;
431 -
    using T = awaitable_result_t<Awaitable>;
459 +
    using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
432  

460  

433  
    Range* range_;
461  
    Range* range_;
434 -
    when_all_homogeneous_state<T>* state_;
462 +
    when_all_homogeneous_state<PayloadT>* state_;
435  

463  

436  
public:
464  
public:
437  
    when_all_homogeneous_launcher(
465  
    when_all_homogeneous_launcher(
438  
        Range* range,
466  
        Range* range,
439 -
        when_all_homogeneous_state<T>* state)
467 +
        when_all_homogeneous_state<PayloadT>* state)
440  
        : range_(range)
468  
        : range_(range)
441  
        , state_(state)
469  
        , state_(state)
442  
    {
470  
    {
443  
    }
471  
    }
444  

472  

445  
    bool await_ready() const noexcept
473  
    bool await_ready() const noexcept
446  
    {
474  
    {
447  
        return std::ranges::empty(*range_);
475  
        return std::ranges::empty(*range_);
448  
    }
476  
    }
449  

477  

450  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
478  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
451  
    {
479  
    {
452  
        state_->core_.continuation_ = continuation;
480  
        state_->core_.continuation_ = continuation;
453  
        state_->core_.caller_env_ = caller_env;
481  
        state_->core_.caller_env_ = caller_env;
454  

482  

455  
        if(caller_env->stop_token.stop_possible())
483  
        if(caller_env->stop_token.stop_possible())
456  
        {
484  
        {
457  
            state_->core_.parent_stop_callback_.emplace(
485  
            state_->core_.parent_stop_callback_.emplace(
458  
                caller_env->stop_token,
486  
                caller_env->stop_token,
459  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
487  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
460  

488  

461  
            if(caller_env->stop_token.stop_requested())
489  
            if(caller_env->stop_token.stop_requested())
462  
                state_->core_.stop_source_.request_stop();
490  
                state_->core_.stop_source_.request_stop();
463  
        }
491  
        }
464  

492  

465  
        auto token = state_->core_.stop_source_.get_token();
493  
        auto token = state_->core_.stop_source_.get_token();
466  

494  

467  
        // Phase 1: Create all runners without dispatching.
495  
        // Phase 1: Create all runners without dispatching.
468  
        std::size_t index = 0;
496  
        std::size_t index = 0;
469  
        for(auto&& a : *range_)
497  
        for(auto&& a : *range_)
470  
        {
498  
        {
471  
            auto runner = make_when_all_homogeneous_runner(
499  
            auto runner = make_when_all_homogeneous_runner(
472  
                std::move(a), state_, index);
500  
                std::move(a), state_, index);
473  

501  

474  
            auto h = runner.release();
502  
            auto h = runner.release();
475  
            h.promise().state_ = state_;
503  
            h.promise().state_ = state_;
476  
            h.promise().index_ = index;
504  
            h.promise().index_ = index;
477  
            h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
505  
            h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
478  

506  

479  
            state_->runner_handles_[index] = std::coroutine_handle<>{h};
507  
            state_->runner_handles_[index] = std::coroutine_handle<>{h};
480  
            ++index;
508  
            ++index;
481  
        }
509  
        }
482  

510  

483  
        // Phase 2: Post all runners. Any may complete synchronously.
511  
        // Phase 2: Post all runners. Any may complete synchronously.
484  
        // After last post, state_ and this may be destroyed.
512  
        // After last post, state_ and this may be destroyed.
485  
        std::coroutine_handle<>* handles = state_->runner_handles_.data();
513  
        std::coroutine_handle<>* handles = state_->runner_handles_.data();
486  
        std::size_t count = state_->runner_handles_.size();
514  
        std::size_t count = state_->runner_handles_.size();
487  
        for(std::size_t i = 0; i < count; ++i)
515  
        for(std::size_t i = 0; i < count; ++i)
488  
            caller_env->executor.post(handles[i]);
516  
            caller_env->executor.post(handles[i]);
489  

517  

490  
        return std::noop_coroutine();
518  
        return std::noop_coroutine();
491  
    }
519  
    }
492  

520  

493  
    void await_resume() const noexcept
521  
    void await_resume() const noexcept
494  
    {
522  
    {
495  
    }
523  
    }
496  
};
524  
};
497  

525  

498  
} // namespace detail

/** Compute the when_all result tuple type.
528 +
/** Execute a range of io_result-returning awaitables concurrently.
501 -

 
502 -
    Void-returning tasks contribute std::monostate to preserve the
 
503 -
    task-index-to-result-index mapping, matching when_any's approach.
 
504 -

 
505 -
    Example: when_all_result_t<int, void, string> = std::tuple<int, std::monostate, string>
 
506 -
    Example: when_all_result_t<void, void> = std::tuple<std::monostate, std::monostate>
 
507 -
*/
 
508 -
template<typename... Ts>
 
509 -
using when_all_result_t = std::tuple<void_to_monostate_t<Ts>...>;
 
510 -

 
511 -
/** Execute multiple awaitables concurrently and collect their results.
 
512 -

 
513 -
    Launches all awaitables simultaneously and waits for all to complete
 
514 -
    before returning. Results are collected in input order. If any
 
515 -
    awaitable throws, cancellation is requested for siblings and the first
 
516 -
    exception is rethrown after all awaitables complete.
 
517 -

 
518 -
    @li All child awaitables run concurrently on the caller's executor
 
519 -
    @li Results are returned as a tuple in input order
 
520 -
    @li Void-returning awaitables contribute std::monostate to the
 
521 -
        result tuple, preserving the task-index-to-result-index mapping
 
522 -
    @li First exception wins; subsequent exceptions are discarded
 
523 -
    @li Stop is requested for siblings on first error
 
524 -
    @li Completes only after all children have finished
 
525 -

 
526 -
    @par Thread Safety
 
527 -
    The returned task must be awaited from a single execution context.
 
528 -
    Child awaitables execute concurrently but complete through the caller's
 
529 -
    executor.
 
530 -

 
531 -
    @param awaitables The awaitables to execute concurrently. Each must
 
532 -
        satisfy @ref IoAwaitable and is consumed (moved-from) when
 
533 -
        `when_all` is awaited.
 
534 -

 
535 -
    @return A task yielding a tuple of results in input order. Void tasks
 
536 -
        contribute std::monostate to preserve index correspondence.
 
537 -

 
538 -
    @par Example
 
539 -

 
540 -
    @code
 
541 -
    task<> example()
 
542 -
    {
 
543 -
        // Concurrent fetch, results collected in order
 
544 -
        auto [user, posts] = co_await when_all(
 
545 -
            fetch_user( id ),      // task<User>
 
546 -
            fetch_posts( id )      // task<std::vector<Post>>
 
547 -
        );
 
548 -

 
549 -
        // Void awaitables contribute monostate
 
550 -
        auto [a, _, b] = co_await when_all(
 
551 -
            fetch_int(),           // task<int>
 
552 -
            log_event( "start" ),  // task<void>  → monostate
 
553 -
            fetch_str()            // task<string>
 
554 -
        );
 
555 -
        // a is int, _ is monostate, b is string
 
556 -
    }
 
557 -
    @endcode
 
558 -

 
559 -
    @see IoAwaitable, task
 
560 -
*/
 
561 -
template<IoAwaitable... As>
 
562 -
[[nodiscard]] auto when_all(As... awaitables)
 
563 -
    -> task<when_all_result_t<awaitable_result_t<As>...>>
 
564 -
{
 
565 -
    // State is stored in the coroutine frame, using the frame allocator
 
566 -
    detail::when_all_state<awaitable_result_t<As>...> state;
 
567 -

 
568 -
    // Store awaitables in the frame
 
569 -
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
 
570 -

 
571 -
    // Launch all awaitables and wait for completion
 
572 -
    co_await detail::when_all_launcher<As...>(&awaitable_tuple, &state);
 
573 -

 
574 -
    // Propagate first exception if any.
 
575 -
    // Safe without explicit acquire: capture_exception() is sequenced-before
 
576 -
    // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
 
577 -
    // last task's decrement that resumes this coroutine.
 
578 -
    if(state.core_.first_exception_)
 
579 -
        std::rethrow_exception(state.core_.first_exception_);
 
580 -

 
581 -
    co_return detail::extract_results(state);
 
582 -
}
 
583 -

 
584 -
/** Execute a range of awaitables concurrently and collect their results.
 
585  

529  

586 -
    Launches all awaitables in the range simultaneously and waits for all
530 +
    Launches all awaitables simultaneously and waits for all to complete.
587 -
    to complete. Results are collected in a vector preserving input order.
531 +
    On success, extracted payloads are collected in a vector preserving
588 -
    If any awaitable throws, cancellation is requested for siblings and
532 +
    input order. The first error_code cancels siblings and is propagated
589 -
    the first exception is rethrown after all awaitables complete.
533 +
    in the outer io_result. Exceptions always beat error codes.
590  

534  

591  
    @li All child awaitables run concurrently on the caller's executor
535  
    @li All child awaitables run concurrently on the caller's executor
592 -
    @li Results are returned as a vector in input order
536 +
    @li Payloads are returned as a vector in input order
593 -
    @li First exception wins; subsequent exceptions are discarded
537 +
    @li First error_code wins and cancels siblings
594 -
    @li Stop is requested for siblings on first error
538 +
    @li Exception always beats error_code
595  
    @li Completes only after all children have finished
539  
    @li Completes only after all children have finished
596  

540  

597  
    @par Thread Safety
541  
    @par Thread Safety
598  
    The returned task must be awaited from a single execution context.
542  
    The returned task must be awaited from a single execution context.
599  
    Child awaitables execute concurrently but complete through the caller's
543  
    Child awaitables execute concurrently but complete through the caller's
600  
    executor.
544  
    executor.
601  

545  

602 -
    @param awaitables Range of awaitables to execute concurrently (must
546 +
    @param awaitables Range of io_result-returning awaitables to execute
603 -
        not be empty). Each element must satisfy @ref IoAwaitable and is
547 +
        concurrently (must not be empty).
604 -
        consumed (moved-from) when `when_all` is awaited.
 
605  

548  

606 -
    @return A task yielding a vector where each element is the result of
549 +
    @return A task yielding io_result<vector<PayloadT>> where PayloadT
607 -
        the corresponding awaitable, in input order.
550 +
        is the payload extracted from each child's io_result.
608  

551  

609  
    @throws std::invalid_argument if range is empty (thrown before
552  
    @throws std::invalid_argument if range is empty (thrown before
610  
        coroutine suspends).
553  
        coroutine suspends).
611  
    @throws Rethrows the first child exception after all children
554  
    @throws Rethrows the first child exception after all children
612 -
        complete.
555 +
        complete (exception beats error_code).
613  

556  

614  
    @par Example
557  
    @par Example
615  
    @code
558  
    @code
616  
    task<void> example()
559  
    task<void> example()
617  
    {
560  
    {
618 -
        std::vector<task<Response>> requests;
561 +
        std::vector<io_task<size_t>> reads;
619 -
        for (auto const& url : urls)
562 +
        for (auto& buf : buffers)
620 -
            requests.push_back(fetch(url));
563 +
            reads.push_back(stream.read_some(buf));
621  

564  

622 -
        auto responses = co_await when_all(std::move(requests));
565 +
        auto [ec, counts] = co_await when_all(std::move(reads));
 
566 +
        if (ec) { // handle error
 
567 +
        }
623  
    }
568  
    }
624  
    @endcode
569  
    @endcode
625  

570  

626  
    @see IoAwaitableRange, when_all
571  
    @see IoAwaitableRange, when_all
627  
*/
572  
*/
628  
template<IoAwaitableRange R>
573  
template<IoAwaitableRange R>
629 -
    requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
574 +
    requires detail::is_io_result_v<
 
575 +
        awaitable_result_t<std::ranges::range_value_t<R>>>
 
576 +
    && (!std::is_same_v<
 
577 +
            detail::io_result_payload_t<
 
578 +
                awaitable_result_t<std::ranges::range_value_t<R>>>,
 
579 +
            std::tuple<>>)
630  
[[nodiscard]] auto when_all(R&& awaitables)
580  
[[nodiscard]] auto when_all(R&& awaitables)
631 -
    -> task<std::vector<awaitable_result_t<std::ranges::range_value_t<R>>>>
581 +
    -> task<io_result<std::vector<
 
582 +
        detail::io_result_payload_t<
 
583 +
            awaitable_result_t<std::ranges::range_value_t<R>>>>>>
632  
{
584  
{
633  
    using Awaitable = std::ranges::range_value_t<R>;
585  
    using Awaitable = std::ranges::range_value_t<R>;
634 -
    using T = awaitable_result_t<Awaitable>;
586 +
    using PayloadT = detail::io_result_payload_t<
 
587 +
        awaitable_result_t<Awaitable>>;
635  
    using OwnedRange = std::remove_cvref_t<R>;
588  
    using OwnedRange = std::remove_cvref_t<R>;
636  

589  

637  
    auto count = std::ranges::size(awaitables);
590  
    auto count = std::ranges::size(awaitables);
638  
    if(count == 0)
591  
    if(count == 0)
639  
        throw std::invalid_argument("when_all requires at least one awaitable");
592  
        throw std::invalid_argument("when_all requires at least one awaitable");
640  

593  

641  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
594  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
642  

595  

643 -
    detail::when_all_homogeneous_state<T> state(count);
596 +
    detail::when_all_homogeneous_state<PayloadT> state(count);
644  

597  

645  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
598  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
646  
        &owned_awaitables, &state);
599  
        &owned_awaitables, &state);
647  

600  

648  
    if(state.core_.first_exception_)
601  
    if(state.core_.first_exception_)
649  
        std::rethrow_exception(state.core_.first_exception_);
602  
        std::rethrow_exception(state.core_.first_exception_);
650  

603  

651 -
    std::vector<T> results;
604 +
    if(state.has_error_.load(std::memory_order_relaxed))
 
605 +
        co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
 
606 +

 
607 +
    std::vector<PayloadT> results;
652  
    results.reserve(count);
608  
    results.reserve(count);
653  
    for(auto& opt : state.results_)
609  
    for(auto& opt : state.results_)
654  
        results.push_back(std::move(*opt));
610  
        results.push_back(std::move(*opt));
655  

611  

656 -
    co_return results;
612 +
    co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
657  
}
613  
}
658  

614  

659 -
/** Execute a range of void awaitables concurrently.
615 +
/** Execute a range of void io_result-returning awaitables concurrently.
660 -

 
661 -
    Launches all awaitables in the range simultaneously and waits for all
 
662 -
    to complete. Since all awaitables return void, no results are collected.
 
663 -
    If any awaitable throws, cancellation is requested for siblings and
 
664 -
    the first exception is rethrown after all awaitables complete.
 
665  

616  

666 -
    @li All child awaitables run concurrently on the caller's executor
617 +
    Launches all awaitables simultaneously and waits for all to complete.
667 -
    @li First exception wins; subsequent exceptions are discarded
618 +
    Since all awaitables return io_result<>, no payload values are
668 -
    @li Stop is requested for siblings on first error
619 +
    collected. The first error_code cancels siblings and is propagated.
669 -
    @li Completes only after all children have finished
620 +
    Exceptions always beat error codes.
670  

621  

671 -
    @par Thread Safety
622 +
    @param awaitables Range of io_result<>-returning awaitables to
672 -
    The returned task must be awaited from a single execution context.
623 +
        execute concurrently (must not be empty).
673 -
    Child awaitables execute concurrently but complete through the caller's
 
674 -
    executor.
 
675  

624  

676 -
    @param awaitables Range of void awaitables to execute concurrently
625 +
    @return A task yielding io_result<> whose ec is the first child
677 -
        (must not be empty).
626 +
        error, or default-constructed on success.
678  

627  

679 -
    @throws std::invalid_argument if range is empty (thrown before
628 +
    @throws std::invalid_argument if range is empty.
680 -
        coroutine suspends).
 
681  
    @throws Rethrows the first child exception after all children
629  
    @throws Rethrows the first child exception after all children
682 -
        complete.
630 +
        complete (exception beats error_code).
683  

631  

684  
    @par Example
632  
    @par Example
685  
    @code
633  
    @code
686  
    task<void> example()
634  
    task<void> example()
687  
    {
635  
    {
688 -
        std::vector<task<void>> jobs;
636 +
        std::vector<io_task<>> jobs;
689  
        for (int i = 0; i < n; ++i)
637  
        for (int i = 0; i < n; ++i)
690  
            jobs.push_back(process(i));
638  
            jobs.push_back(process(i));
691  

639  

692 -
        co_await when_all(std::move(jobs));
640 +
        auto [ec] = co_await when_all(std::move(jobs));
693  
    }
641  
    }
694  
    @endcode
642  
    @endcode
695  

643  

696  
    @see IoAwaitableRange, when_all
644  
    @see IoAwaitableRange, when_all
697  
*/
645  
*/
698  
template<IoAwaitableRange R>
646  
template<IoAwaitableRange R>
699 -
    requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>
647 +
    requires detail::is_io_result_v<
700 -
[[nodiscard]] auto when_all(R&& awaitables) -> task<void>
648 +
        awaitable_result_t<std::ranges::range_value_t<R>>>
 
649 +
    && std::is_same_v<
 
650 +
            detail::io_result_payload_t<
 
651 +
                awaitable_result_t<std::ranges::range_value_t<R>>>,
 
652 +
            std::tuple<>>
 
653 +
[[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
701  
{
654  
{
702  
    using OwnedRange = std::remove_cvref_t<R>;
655  
    using OwnedRange = std::remove_cvref_t<R>;
703  

656  

704  
    auto count = std::ranges::size(awaitables);
657  
    auto count = std::ranges::size(awaitables);
705  
    if(count == 0)
658  
    if(count == 0)
706  
        throw std::invalid_argument("when_all requires at least one awaitable");
659  
        throw std::invalid_argument("when_all requires at least one awaitable");
707  

660  

708  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
661  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
709  

662  

710 -
    detail::when_all_homogeneous_state<void> state(count);
663 +
    detail::when_all_homogeneous_state<std::tuple<>> state(count);
711  

664  

712  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
665  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
713  
        &owned_awaitables, &state);
666  
        &owned_awaitables, &state);
714  

667  

715  
    if(state.core_.first_exception_)
668  
    if(state.core_.first_exception_)
716  
        std::rethrow_exception(state.core_.first_exception_);
669  
        std::rethrow_exception(state.core_.first_exception_);
 
670 +

 
671 +
    if(state.has_error_.load(std::memory_order_relaxed))
 
672 +
        co_return io_result<>{state.first_error_};
 
673 +

 
674 +
    co_return io_result<>{};
 
675 +
}
 
676 +

 
677 +
/** Execute io_result-returning awaitables concurrently, inspecting error codes.
 
678 +

 
679 +
    Overload selected when all children return io_result<Ts...>.
 
680 +
    The error_code is lifted out of each child into a single outer
 
681 +
    io_result. On success all values are returned; on failure the
 
682 +
    first error_code wins.
 
683 +

 
684 +
    @par Exception Safety
 
685 +
    Exception always beats error_code. If any child throws, the
 
686 +
    exception is rethrown regardless of error_code results.
 
687 +

 
688 +
    @param awaitables One or more awaitables each returning
 
689 +
        io_result<Ts...>.
 
690 +

 
691 +
    @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
 
692 +
        follows the payload flattening rules.
 
693 +
*/
 
694 +
template<IoAwaitable... As>
 
695 +
    requires (sizeof...(As) > 0)
 
696 +
          && detail::all_io_result_awaitables<As...>
 
697 +
[[nodiscard]] auto when_all(As... awaitables)
 
698 +
    -> task<io_result<
 
699 +
        detail::io_result_payload_t<awaitable_result_t<As>>...>>
 
700 +
{
 
701 +
    using result_type = io_result<
 
702 +
        detail::io_result_payload_t<awaitable_result_t<As>>...>;
 
703 +

 
704 +
    detail::when_all_state<awaitable_result_t<As>...> state;
 
705 +
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
 
706 +

 
707 +
    co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
 
708 +

 
709 +
    // Exception always wins over error_code
 
710 +
    if(state.core_.first_exception_)
 
711 +
        std::rethrow_exception(state.core_.first_exception_);
 
712 +

 
713 +
    auto r = detail::build_when_all_io_result<result_type>(
 
714 +
        detail::extract_results(state));
 
715 +
    if(state.has_error_.load(std::memory_order_relaxed))
 
716 +
        r.ec = state.first_error_;
 
717 +
    co_return r;
717  
}
718  
}
718  

719  

719  
} // namespace capy
} // namespace boost

#endif