1  
//
1  
//
2  
// Copyright (c) 2026 Michael Vandeberg
2  
// Copyright (c) 2026 Michael Vandeberg
 
3 +
// Copyright (c) 2026 Steve Gerbino
3  
//
4  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
7  
//
7  
// Official repository: https://github.com/cppalliance/capy
8  
// Official repository: https://github.com/cppalliance/capy
8  
//
9  
//
9  

10  

10  
#ifndef BOOST_CAPY_WHEN_ANY_HPP
11  
#ifndef BOOST_CAPY_WHEN_ANY_HPP
11  
#define BOOST_CAPY_WHEN_ANY_HPP
12  
#define BOOST_CAPY_WHEN_ANY_HPP
12  

13  

13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/config.hpp>
14 -
#include <boost/capy/detail/void_to_monostate.hpp>
15 +
#include <boost/capy/detail/io_result_combinators.hpp>
15  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <coroutine>
18  
#include <coroutine>
18  
#include <boost/capy/ex/executor_ref.hpp>
19  
#include <boost/capy/ex/executor_ref.hpp>
19  
#include <boost/capy/ex/frame_allocator.hpp>
20  
#include <boost/capy/ex/frame_allocator.hpp>
20  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/ex/io_env.hpp>
21  
#include <boost/capy/task.hpp>
22  
#include <boost/capy/task.hpp>
22  

23  

23  
#include <array>
24  
#include <array>
24  
#include <atomic>
25  
#include <atomic>
25  
#include <exception>
26  
#include <exception>
 
27 +
#include <mutex>
26  
#include <optional>
28  
#include <optional>
27  
#include <ranges>
29  
#include <ranges>
28  
#include <stdexcept>
30  
#include <stdexcept>
29  
#include <stop_token>
31  
#include <stop_token>
30  
#include <tuple>
32  
#include <tuple>
31  
#include <type_traits>
33  
#include <type_traits>
32  
#include <utility>
34  
#include <utility>
33  
#include <variant>
35  
#include <variant>
34  
#include <vector>
36  
#include <vector>
35  

37  

36  
/*
38  
/*
37 -
   when_any - Race multiple tasks, return first completion
39 +
   when_any - Race multiple io_result tasks, select first success
38 -
   ========================================================
40 +
   =============================================================
39  

41  

40  
   OVERVIEW:
42  
   OVERVIEW:
41  
   ---------
43  
   ---------
42 -
   when_any launches N tasks concurrently and completes when the FIRST task
44 +
   when_any launches N io_result-returning tasks concurrently. A task
43 -
   finishes (success or failure). It then requests stop for all siblings and
45 +
   wins by returning !ec; errors and exceptions do not win. Once a
44 -
   waits for them to acknowledge before returning.
46 +
   winner is found, stop is requested for siblings and the winner's
 
47 +
   payload is returned. If no winner exists (all fail), the first
 
48 +
   error_code is returned or the last exception is rethrown.
45  

49  

46  
   ARCHITECTURE:
50  
   ARCHITECTURE:
47  
   -------------
51  
   -------------
48  
   The design mirrors when_all but with inverted completion semantics:
52  
   The design mirrors when_all but with inverted completion semantics:
49  

53  

50  
     when_all:  complete when remaining_count reaches 0 (all done)
54  
     when_all:  complete when remaining_count reaches 0 (all done)
51  
     when_any:  complete when has_winner becomes true (first done)
55  
     when_any:  complete when has_winner becomes true (first done)
52  
                BUT still wait for remaining_count to reach 0 for cleanup
56  
                BUT still wait for remaining_count to reach 0 for cleanup
53  

57  

54  
   Key components:
58  
   Key components:
55 -
     - when_any_state:    Shared state tracking winner and completion
59 +
     - when_any_core:    Shared state tracking winner and completion
56 -
     - when_any_runner:   Wrapper coroutine for each child task
60 +
     - when_any_io_runner: Wrapper coroutine for each child task
57 -
     - when_any_launcher: Awaitable that starts all runners concurrently
61 +
     - when_any_io_launcher/when_any_io_homogeneous_launcher:
 
62 +
                          Awaitables that start all runners concurrently
58  

63  

59  
   CRITICAL INVARIANTS:
64  
   CRITICAL INVARIANTS:
60  
   --------------------
65  
   --------------------
61 -
   1. Exactly one task becomes the winner (via atomic compare_exchange)
66 +
   1. Only a task returning !ec can become the winner (via atomic CAS)
62  
   2. All tasks must complete before parent resumes (cleanup safety)
67  
   2. All tasks must complete before parent resumes (cleanup safety)
63  
   3. Stop is requested immediately when winner is determined
68  
   3. Stop is requested immediately when winner is determined
64 -
   4. Only the winner's result/exception is stored
69 +
   4. Exceptions and errors do not claim winner status
65  

70  

66  
   POSITIONAL VARIANT:
71  
   POSITIONAL VARIANT:
67  
   -------------------
72  
   -------------------
68 -
   The variadic overload returns a std::variant with one alternative per
73 +
   The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
69 -
   input task, preserving positional correspondence. Use .index() on
74 +
   Index 0 is error_code (failure/no-winner). Index 1..N identifies the
70 -
   the variant to identify which task won.
75 +
   winning child and carries its payload.
71 -

 
72 -
   Example: when_any(task<int>, task<string>, task<int>)
 
73 -
     - Raw types after void->monostate: int, string, int
 
74 -
     - Result variant: std::variant<int, string, int>
 
75 -
     - variant.index() tells you which task won (0, 1, or 2)
 
76  

76  

77 -
   VOID HANDLING:
77 +
   RANGE OVERLOAD:
78 -
   --------------
78 +
   ---------------
79 -
   void tasks contribute std::monostate to the variant.
79 +
   The range overload returns variant<error_code, pair<size_t, T>> for
80 -
   All-void tasks result in: variant<monostate, monostate, monostate>
80 +
   non-void children or variant<error_code, size_t> for void children.
81  

81  

82  
   MEMORY MODEL:
82  
   MEMORY MODEL:
83  
   -------------
83  
   -------------
84  
   Synchronization chain from winner's write to parent's read:
84  
   Synchronization chain from winner's write to parent's read:
85  

85  

86 -
   1. Winner thread writes result_/winner_exception_ (non-atomic)
86 +
   1. Winner thread writes result_ (non-atomic)
87 -
   2. Winner thread calls signal_completion() → fetch_sub(acq_rel) on remaining_count_
87 +
   2. Winner thread calls signal_completion() -> fetch_sub(acq_rel) on remaining_count_
88  
   3. Last task thread (may be winner or non-winner) calls signal_completion()
88  
   3. Last task thread (may be winner or non-winner) calls signal_completion()
89 -
      → fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
89 +
      -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
90  
   4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
90  
   4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
91 -
   5. Parent coroutine resumes and reads result_/winner_exception_
91 +
   5. Parent coroutine resumes and reads result_
92  

92  

93  
   Synchronization analysis:
93  
   Synchronization analysis:
94  
   - All fetch_sub operations on remaining_count_ form a release sequence
94  
   - All fetch_sub operations on remaining_count_ form a release sequence
95  
   - Winner's fetch_sub releases; subsequent fetch_sub operations participate
95  
   - Winner's fetch_sub releases; subsequent fetch_sub operations participate
96  
     in the modification order of remaining_count_
96  
     in the modification order of remaining_count_
97  
   - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
97  
   - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
98  
     modification order, establishing happens-before from winner's writes
98  
     modification order, establishing happens-before from winner's writes
99  
   - Executor dispatch() is expected to provide queue-based synchronization
99  
   - Executor dispatch() is expected to provide queue-based synchronization
100  
     (release-on-post, acquire-on-execute) completing the chain to parent
100  
     (release-on-post, acquire-on-execute) completing the chain to parent
101  
   - Even inline executors work (same thread = sequenced-before)
101  
   - Even inline executors work (same thread = sequenced-before)
102 -
   Alternative considered: Adding winner_ready_ atomic (set with release after
 
103 -
   storing winner data, acquired before reading) would make synchronization
 
104 -
   self-contained and not rely on executor implementation details. Current
 
105 -
   approach is correct but requires careful reasoning about release sequences
 
106 -
   and executor behavior.
 
107 -

 
108  

102  

109  
   EXCEPTION SEMANTICS:
103  
   EXCEPTION SEMANTICS:
110  
   --------------------
104  
   --------------------
111 -
   Unlike when_all (which captures first exception, discards others), when_any
105 +
   Exceptions do NOT claim winner status. If a child throws, the exception
112 -
   treats exceptions as valid completions. If the winning task threw, that
106 +
   is recorded but the combinator keeps waiting for a success. Only when
113 -
   exception is rethrown. Exceptions from non-winners are silently discarded.
107 +
   all children complete without a winner does the combinator check: if
 
108 +
   any exception was recorded, it is rethrown (exception beats error_code).
114  
*/
109  
*/
115  

110  

116  
namespace boost {
111  
namespace boost {
117  
namespace capy {
112  
namespace capy {
118  

113  

119  
namespace detail {
114  
namespace detail {
120  

115  

121  
/** Core shared state for when_any operations.
116  
/** Core shared state for when_any operations.
122  

117  

123  
    Contains all members and methods common to both heterogeneous (variadic)
118  
    Contains all members and methods common to both heterogeneous (variadic)
124  
    and homogeneous (range) when_any implementations. State classes embed
119  
    and homogeneous (range) when_any implementations. State classes embed
125  
    this via composition to avoid CRTP destructor ordering issues.
120  
    this via composition to avoid CRTP destructor ordering issues.
126  

121  

127  
    @par Thread Safety
122  
    @par Thread Safety
128  
    Atomic operations protect winner selection and completion count.
123  
    Atomic operations protect winner selection and completion count.
129  
*/
124  
*/
130  
struct when_any_core
125  
struct when_any_core
131  
{
126  
{
132  
    std::atomic<std::size_t> remaining_count_;
127  
    std::atomic<std::size_t> remaining_count_;
133  
    std::size_t winner_index_{0};
128  
    std::size_t winner_index_{0};
134  
    std::exception_ptr winner_exception_;
129  
    std::exception_ptr winner_exception_;
135  
    std::stop_source stop_source_;
130  
    std::stop_source stop_source_;
136  

131  

137  
    // Bridges parent's stop token to our stop_source
132  
    // Bridges parent's stop token to our stop_source
138  
    struct stop_callback_fn
133  
    struct stop_callback_fn
139  
    {
134  
    {
140  
        std::stop_source* source_;
135  
        std::stop_source* source_;
141  
        void operator()() const noexcept { source_->request_stop(); }
136  
        void operator()() const noexcept { source_->request_stop(); }
142  
    };
137  
    };
143  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
138  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
144  
    std::optional<stop_callback_t> parent_stop_callback_;
139  
    std::optional<stop_callback_t> parent_stop_callback_;
145  

140  

146  
    std::coroutine_handle<> continuation_;
141  
    std::coroutine_handle<> continuation_;
147  
    io_env const* caller_env_ = nullptr;
142  
    io_env const* caller_env_ = nullptr;
148  

143  

149  
    // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
144  
    // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
150  
    std::atomic<bool> has_winner_{false};
145  
    std::atomic<bool> has_winner_{false};
151  

146  

152  
    explicit when_any_core(std::size_t count) noexcept
147  
    explicit when_any_core(std::size_t count) noexcept
153  
        : remaining_count_(count)
148  
        : remaining_count_(count)
154  
    {
149  
    {
155  
    }
150  
    }
156  

151  

157  
    /** Atomically claim winner status; exactly one task succeeds. */
152  
    /** Atomically claim winner status; exactly one task succeeds. */
158  
    bool try_win(std::size_t index) noexcept
153  
    bool try_win(std::size_t index) noexcept
159  
    {
154  
    {
160  
        bool expected = false;
155  
        bool expected = false;
161  
        if(has_winner_.compare_exchange_strong(
156  
        if(has_winner_.compare_exchange_strong(
162  
            expected, true, std::memory_order_acq_rel))
157  
            expected, true, std::memory_order_acq_rel))
163  
        {
158  
        {
164  
            winner_index_ = index;
159  
            winner_index_ = index;
165  
            stop_source_.request_stop();
160  
            stop_source_.request_stop();
166  
            return true;
161  
            return true;
167  
        }
162  
        }
168  
        return false;
163  
        return false;
169  
    }
164  
    }
170  

165  

171  
    /** @pre try_win() returned true. */
166  
    /** @pre try_win() returned true. */
172  
    void set_winner_exception(std::exception_ptr ep) noexcept
167  
    void set_winner_exception(std::exception_ptr ep) noexcept
173  
    {
168  
    {
174  
        winner_exception_ = ep;
169  
        winner_exception_ = ep;
175  
    }
170  
    }
176  

171  

177  
    // Runners signal completion directly via final_suspend; no member function needed.
172  
    // Runners signal completion directly via final_suspend; no member function needed.
178  
};
173  
};
179  

174  

180 -
/** Shared state for heterogeneous when_any operation.
175 +
} // namespace detail
181 -

 
182 -
    Coordinates winner selection, result storage, and completion tracking
 
183 -
    for all child tasks in a when_any operation. Uses composition with
 
184 -
    when_any_core for shared functionality.
 
185  

176  

186 -
    @par Lifetime
177 +
namespace detail {
187 -
    Allocated on the parent coroutine's frame, outlives all runners.
 
188  

178  

189 -
    @tparam Ts Task result types.
179 +
// State for io_result-aware when_any: only !ec wins.
190 -
*/
 
191  
template<typename... Ts>
180  
template<typename... Ts>
192 -
struct when_any_state
181 +
struct when_any_io_state
193  
{
182  
{
194  
    static constexpr std::size_t task_count = sizeof...(Ts);
183  
    static constexpr std::size_t task_count = sizeof...(Ts);
195 -
    using variant_type = std::variant<void_to_monostate_t<Ts>...>;
184 +
    using variant_type = std::variant<std::error_code, Ts...>;
196  

185  

197  
    when_any_core core_;
186  
    when_any_core core_;
198  
    std::optional<variant_type> result_;
187  
    std::optional<variant_type> result_;
199  
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
188  
    std::array<std::coroutine_handle<>, task_count> runner_handles_{};
200  

189  

201 -
    when_any_state()
190 +
    // Last failure (error or exception) for the all-fail case.
 
191 +
    // Last writer wins — no priority between errors and exceptions.
 
192 +
    std::mutex failure_mu_;
 
193 +
    std::error_code last_error_;
 
194 +
    std::exception_ptr last_exception_;
 
195 +

 
196 +
    when_any_io_state()
202  
        : core_(task_count)
197  
        : core_(task_count)
203  
    {
198  
    {
204  
    }
199  
    }
205  

200  

206 -
    // Runners self-destruct in final_suspend. No destruction needed here.
201 +
    void record_error(std::error_code ec)
207 -

 
208 -
    /** @pre core_.try_win() returned true.
 
209 -
        @note Uses in_place_index (not type) for positional variant access.
 
210 -
    */
 
211 -
    template<std::size_t I, typename T>
 
212 -
    void set_winner_result(T value)
 
213 -
        noexcept(std::is_nothrow_move_constructible_v<T>)
 
214  
    {
202  
    {
215 -
        result_.emplace(std::in_place_index<I>, std::move(value));
203 +
        std::lock_guard lk(failure_mu_);
 
204 +
        last_error_ = ec;
 
205 +
        last_exception_ = nullptr;
216  
    }
206  
    }
217  

207  

218 -
    /** @pre core_.try_win() returned true. */
208 +
    void record_exception(std::exception_ptr ep)
219 -
    template<std::size_t I>
 
220 -
    void set_winner_void() noexcept
 
221  
    {
209  
    {
222 -
        result_.emplace(std::in_place_index<I>, std::monostate{});
210 +
        std::lock_guard lk(failure_mu_);
 
211 +
        last_exception_ = ep;
 
212 +
        last_error_ = {};
223  
    }
213  
    }
224  
};
214  
};
225  

215  

226 -
/** Wrapper coroutine that runs a single child task for when_any.
216 +
// Wrapper coroutine for io_result-aware when_any children.
227 -

217 +
// unhandled_exception records the exception but does NOT claim winner status.
228 -
    Propagates executor/stop_token to the child, attempts to claim winner
 
229 -
    status on completion, and signals completion for cleanup coordination.
 
230 -

 
231 -
    @tparam StateType The state type (when_any_state or when_any_homogeneous_state).
 
232 -
*/
 
233  
template<typename StateType>
218  
template<typename StateType>
234 -
struct when_any_runner
219 +
struct when_any_io_runner
235  
{
220  
{
236 -
    struct promise_type // : frame_allocating_base  // DISABLED FOR TESTING
221 +
    struct promise_type
237  
    {
222  
    {
238  
        StateType* state_ = nullptr;
223  
        StateType* state_ = nullptr;
239  
        std::size_t index_ = 0;
224  
        std::size_t index_ = 0;
240  
        io_env env_;
225  
        io_env env_;
241  

226  

242 -
        when_any_runner get_return_object() noexcept
227 +
        when_any_io_runner get_return_object() noexcept
243  
        {
228  
        {
244 -
            return when_any_runner(std::coroutine_handle<promise_type>::from_promise(*this));
229 +
            return when_any_io_runner(
 
230 +
                std::coroutine_handle<promise_type>::from_promise(*this));
245  
        }
231  
        }
246  

232  

247 -
        // Starts suspended; launcher sets up state/ex/token then resumes
233 +
        std::suspend_always initial_suspend() noexcept { return {}; }
248 -
        std::suspend_always initial_suspend() noexcept
 
249 -
        {
 
250 -
            return {};
 
251 -
        }
 
252  

234  

253  
        auto final_suspend() noexcept
235  
        auto final_suspend() noexcept
254  
        {
236  
        {
255  
            struct awaiter
237  
            struct awaiter
256  
            {
238  
            {
257  
                promise_type* p_;
239  
                promise_type* p_;
258  
                bool await_ready() const noexcept { return false; }
240  
                bool await_ready() const noexcept { return false; }
259  
                auto await_suspend(std::coroutine_handle<> h) noexcept
241  
                auto await_suspend(std::coroutine_handle<> h) noexcept
260 -
                    // Extract everything needed before self-destruction.
 
261  
                {
242  
                {
262  
                    auto& core = p_->state_->core_;
243  
                    auto& core = p_->state_->core_;
263  
                    auto* counter = &core.remaining_count_;
244  
                    auto* counter = &core.remaining_count_;
264  
                    auto* caller_env = core.caller_env_;
245  
                    auto* caller_env = core.caller_env_;
265  
                    auto cont = core.continuation_;
246  
                    auto cont = core.continuation_;
266  

247  

267  
                    h.destroy();
248  
                    h.destroy();
268 -
                    // If last runner, dispatch parent for symmetric transfer.
 
269  

249  

270  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
250  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
271  
                    if(remaining == 1)
251  
                    if(remaining == 1)
272  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
252  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
273  
                    return detail::symmetric_transfer(std::noop_coroutine());
253  
                    return detail::symmetric_transfer(std::noop_coroutine());
274  
                }
254  
                }
275  
                void await_resume() const noexcept {}
255  
                void await_resume() const noexcept {}
276  
            };
256  
            };
277  
            return awaiter{this};
257  
            return awaiter{this};
278  
        }
258  
        }
279  

259  

280  
        void return_void() noexcept {}
260  
        void return_void() noexcept {}
281  

261  

282 -
        // Exceptions are valid completions in when_any (unlike when_all)
262 +
        // Exceptions do NOT win in io_result when_any
283  
        void unhandled_exception()
263  
        void unhandled_exception()
284  
        {
264  
        {
285 -
            if(state_->core_.try_win(index_))
265 +
            state_->record_exception(std::current_exception());
286 -
                state_->core_.set_winner_exception(std::current_exception());
 
287  
        }
266  
        }
288 -
        /** Injects executor and stop token into child awaitables. */
 
289  

267  

290  
        template<class Awaitable>
268  
        template<class Awaitable>
291  
        struct transform_awaiter
269  
        struct transform_awaiter
292  
        {
270  
        {
293  
            std::decay_t<Awaitable> a_;
271  
            std::decay_t<Awaitable> a_;
294  
            promise_type* p_;
272  
            promise_type* p_;
295  

273  

296  
            bool await_ready() { return a_.await_ready(); }
274  
            bool await_ready() { return a_.await_ready(); }
297 -
            auto await_resume() { return a_.await_resume(); }
275 +
            decltype(auto) await_resume() { return a_.await_resume(); }
298  

276  

299  
            template<class Promise>
277  
            template<class Promise>
300  
            auto await_suspend(std::coroutine_handle<Promise> h)
278  
            auto await_suspend(std::coroutine_handle<Promise> h)
301  
            {
279  
            {
302  
                using R = decltype(a_.await_suspend(h, &p_->env_));
280  
                using R = decltype(a_.await_suspend(h, &p_->env_));
303  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
281  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
304  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
282  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
305  
                else
283  
                else
306  
                    return a_.await_suspend(h, &p_->env_);
284  
                    return a_.await_suspend(h, &p_->env_);
307  
            }
285  
            }
308  
        };
286  
        };
309  

287  

310  
        template<class Awaitable>
288  
        template<class Awaitable>
311  
        auto await_transform(Awaitable&& a)
289  
        auto await_transform(Awaitable&& a)
312  
        {
290  
        {
313  
            using A = std::decay_t<Awaitable>;
291  
            using A = std::decay_t<Awaitable>;
314  
            if constexpr (IoAwaitable<A>)
292  
            if constexpr (IoAwaitable<A>)
315  
            {
293  
            {
316  
                return transform_awaiter<Awaitable>{
294  
                return transform_awaiter<Awaitable>{
317  
                    std::forward<Awaitable>(a), this};
295  
                    std::forward<Awaitable>(a), this};
318  
            }
296  
            }
319  
            else
297  
            else
320  
            {
298  
            {
321  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
299  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
322  
            }
300  
            }
323  
        }
301  
        }
324  
    };
302  
    };
325  

303  

326  
    std::coroutine_handle<promise_type> h_;
304  
    std::coroutine_handle<promise_type> h_;
327  

305  

328 -
    explicit when_any_runner(std::coroutine_handle<promise_type> h) noexcept
306 +
    explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
329  
        : h_(h)
307  
        : h_(h)
330  
    {
308  
    {
331  
    }
309  
    }
332  

310  

333 -
    // Enable move for all clang versions - some versions need it
311 +
    when_any_io_runner(when_any_io_runner&& other) noexcept
334 -
    when_any_runner(when_any_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
312 +
        : h_(std::exchange(other.h_, nullptr))
 
313 +
    {
 
314 +
    }
335  

315  

336 -
    // Non-copyable
316 +
    when_any_io_runner(when_any_io_runner const&) = delete;
337 -
    when_any_runner(when_any_runner const&) = delete;
317 +
    when_any_io_runner& operator=(when_any_io_runner const&) = delete;
338 -
    when_any_runner& operator=(when_any_runner const&) = delete;
318 +
    when_any_io_runner& operator=(when_any_io_runner&&) = delete;
339 -
    when_any_runner& operator=(when_any_runner&&) = delete;
 
340  

319  

341  
    auto release() noexcept
320  
    auto release() noexcept
342  
    {
321  
    {
343  
        return std::exchange(h_, nullptr);
322  
        return std::exchange(h_, nullptr);
344  
    }
323  
    }
345  
};
324  
};
346  

325  

347 -
/** Indexed overload for heterogeneous when_any (compile-time index).
326 +
// Runner coroutine: only tries to win when the child returns !ec.
348 -

 
349 -
    Uses compile-time index I for variant construction via in_place_index.
 
350 -
    Called from when_any_launcher::launch_one<I>().
 
351 -
*/
 
352  
template<std::size_t I, IoAwaitable Awaitable, typename StateType>
327  
template<std::size_t I, IoAwaitable Awaitable, typename StateType>
353 -
when_any_runner<StateType>
328 +
when_any_io_runner<StateType>
354 -
make_when_any_runner(Awaitable inner, StateType* state)
329 +
make_when_any_io_runner(Awaitable inner, StateType* state)
355  
{
330  
{
356 -
    using T = awaitable_result_t<Awaitable>;
331 +
    auto result = co_await std::move(inner);
357 -
    if constexpr (std::is_void_v<T>)
332 +

358 -
    {
333 +
    if(!result.ec)
359 -
        co_await std::move(inner);
 
360 -
        if(state->core_.try_win(I))
 
361 -
            state->template set_winner_void<I>();
 
362 -
    }
 
363 -
    else
 
364  
    {
334  
    {
365 -
        auto result = co_await std::move(inner);
335 +
        // Success: try to claim winner
366  
        if(state->core_.try_win(I))
336  
        if(state->core_.try_win(I))
367  
        {
337  
        {
368  
            try
338  
            try
369  
            {
339  
            {
370 -
                state->template set_winner_result<I>(std::move(result));
340 +
                state->result_.emplace(
 
341 +
                    std::in_place_index<I + 1>,
 
342 +
                    detail::extract_io_payload(std::move(result)));
371  
            }
343  
            }
372  
            catch(...)
344  
            catch(...)
373  
            {
345  
            {
374  
                state->core_.set_winner_exception(std::current_exception());
346  
                state->core_.set_winner_exception(std::current_exception());
375  
            }
347  
            }
376  
        }
348  
        }
377 -
}
 
378 -

 
379 -
/** Runtime-index overload for homogeneous when_any (range path).
 
380 -

 
381 -
    Uses requires-expressions to detect state capabilities:
 
382 -
    - set_winner_void(): for heterogeneous void tasks (stores monostate)
 
383 -
    - set_winner_result(): for non-void tasks
 
384 -
    - Neither: for homogeneous void tasks (no result storage)
 
385 -
*/
 
386 -
template<IoAwaitable Awaitable, typename StateType>
 
387 -
when_any_runner<StateType>
 
388 -
make_when_any_runner(Awaitable inner, StateType* state, std::size_t index)
 
389 -
{
 
390 -
    using T = awaitable_result_t<Awaitable>;
 
391 -
    if constexpr (std::is_void_v<T>)
 
392 -
    {
 
393 -
        co_await std::move(inner);
 
394 -
        if(state->core_.try_win(index))
 
395 -
        {
 
396 -
            if constexpr (requires { state->set_winner_void(); })
 
397 -
                state->set_winner_void();
 
398 -
        }
 
399 -
    }
 
400  
    }
349  
    }
401  
    else
350  
    else
402  
    {
351  
    {
403 -
        auto result = co_await std::move(inner);
352 +
        // Error: record but don't win
404 -
        if(state->core_.try_win(index))
353 +
        state->record_error(result.ec);
405 -
        {
 
406 -
            try
 
407 -
            {
 
408 -
                state->set_winner_result(std::move(result));
 
409 -
            }
 
410 -
            catch(...)
 
411 -
            {
 
412 -
                state->core_.set_winner_exception(std::current_exception());
 
413 -
            }
 
414 -
        }
 
415  
    }
354  
    }
416  
}
355  
}
417  

356  

418 -
/** Launches all runners concurrently; see await_suspend for lifetime concerns. */
357 +
// Launcher for io_result-aware when_any.
419  
template<IoAwaitable... Awaitables>
358  
template<IoAwaitable... Awaitables>
420 -
class when_any_launcher
359 +
class when_any_io_launcher
421  
{
360  
{
422 -
    using state_type = when_any_state<awaitable_result_t<Awaitables>...>;
361 +
    using state_type = when_any_io_state<
 
362 +
        io_result_payload_t<awaitable_result_t<Awaitables>>...>;
423  

363  

424  
    std::tuple<Awaitables...>* tasks_;
364  
    std::tuple<Awaitables...>* tasks_;
425  
    state_type* state_;
365  
    state_type* state_;
426  

366  

427  
public:
367  
public:
428 -
    when_any_launcher(
368 +
    when_any_io_launcher(
429  
        std::tuple<Awaitables...>* tasks,
369  
        std::tuple<Awaitables...>* tasks,
430  
        state_type* state)
370  
        state_type* state)
431  
        : tasks_(tasks)
371  
        : tasks_(tasks)
432  
        , state_(state)
372  
        , state_(state)
433  
    {
373  
    {
434  
    }
374  
    }
435  

375  

436  
    bool await_ready() const noexcept
376  
    bool await_ready() const noexcept
437  
    {
377  
    {
438  
        return sizeof...(Awaitables) == 0;
378  
        return sizeof...(Awaitables) == 0;
439  
    }
379  
    }
440  

380  

441 -
    /** CRITICAL: If the last task finishes synchronously, parent resumes and
381 +
    std::coroutine_handle<> await_suspend(
442 -
        destroys this object before await_suspend returns. Must not reference
382 +
        std::coroutine_handle<> continuation, io_env const* caller_env)
443 -
        `this` after the final launch_one call.
 
444 -
    */
 
445 -
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
 
446  
    {
383  
    {
447  
        state_->core_.continuation_ = continuation;
384  
        state_->core_.continuation_ = continuation;
448  
        state_->core_.caller_env_ = caller_env;
385  
        state_->core_.caller_env_ = caller_env;
449  

386  

450  
        if(caller_env->stop_token.stop_possible())
387  
        if(caller_env->stop_token.stop_possible())
451  
        {
388  
        {
452  
            state_->core_.parent_stop_callback_.emplace(
389  
            state_->core_.parent_stop_callback_.emplace(
453  
                caller_env->stop_token,
390  
                caller_env->stop_token,
454  
                when_any_core::stop_callback_fn{&state_->core_.stop_source_});
391  
                when_any_core::stop_callback_fn{&state_->core_.stop_source_});
455  

392  

456  
            if(caller_env->stop_token.stop_requested())
393  
            if(caller_env->stop_token.stop_requested())
457  
                state_->core_.stop_source_.request_stop();
394  
                state_->core_.stop_source_.request_stop();
458  
        }
395  
        }
459  

396  

460  
        auto token = state_->core_.stop_source_.get_token();
397  
        auto token = state_->core_.stop_source_.get_token();
461  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
398  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
462  
            (..., launch_one<Is>(caller_env->executor, token));
399  
            (..., launch_one<Is>(caller_env->executor, token));
463  
        }(std::index_sequence_for<Awaitables...>{});
400  
        }(std::index_sequence_for<Awaitables...>{});
464  

401  

465  
        return std::noop_coroutine();
402  
        return std::noop_coroutine();
466  
    }
403  
    }
467  

404  

468 -
    void await_resume() const noexcept
405 +
    void await_resume() const noexcept {}
469 -
    {
 
470 -
    }
 
471  

406  

472 -
    /** @pre Ex::dispatch() and std::coroutine_handle<>::resume() must not throw (handle may leak). */
 
473  
private:
407  
private:
474  
    template<std::size_t I>
408  
    template<std::size_t I>
475  
    void launch_one(executor_ref caller_ex, std::stop_token token)
409  
    void launch_one(executor_ref caller_ex, std::stop_token token)
476  
    {
410  
    {
477 -
        auto runner = make_when_any_runner<I>(
411 +
        auto runner = make_when_any_io_runner<I>(
478  
            std::move(std::get<I>(*tasks_)), state_);
412  
            std::move(std::get<I>(*tasks_)), state_);
479  

413  

480  
        auto h = runner.release();
414  
        auto h = runner.release();
481  
        h.promise().state_ = state_;
415  
        h.promise().state_ = state_;
482  
        h.promise().index_ = I;
416  
        h.promise().index_ = I;
483 -
        h.promise().env_ = io_env{caller_ex, token, state_->core_.caller_env_->frame_allocator};
417 +
        h.promise().env_ = io_env{caller_ex, token,
 
418 +
            state_->core_.caller_env_->frame_allocator};
484  

419  

485  
        std::coroutine_handle<> ch{h};
420  
        std::coroutine_handle<> ch{h};
486  
        state_->runner_handles_[I] = ch;
421  
        state_->runner_handles_[I] = ch;
487  
        caller_ex.post(ch);
422  
        caller_ex.post(ch);
488  
    }
423  
    }
489  
};
424  
};
490  

425  

491 -
} // namespace detail
426 +
/** Shared state for homogeneous io_result-aware when_any (range overload).
492 -

 
493 -
/** Wait for the first awaitable to complete.
 
494 -

 
495 -
    Races multiple heterogeneous awaitables concurrently and returns when the
 
496 -
    first one completes. The result is a variant with one alternative per
 
497 -
    input task, preserving positional correspondence.
 
498 -

 
499 -
    @par Suspends
 
500 -
    The calling coroutine suspends when co_await is invoked. All awaitables
 
501 -
    are launched concurrently and execute in parallel. The coroutine resumes
 
502 -
    only after all awaitables have completed, even though the winner is
 
503 -
    determined by the first to finish.
 
504 -

 
505 -
    @par Completion Conditions
 
506 -
    @li Winner is determined when the first awaitable completes (success or exception)
 
507 -
    @li Only one task can claim winner status via atomic compare-exchange
 
508 -
    @li Once a winner exists, stop is requested for all remaining siblings
 
509 -
    @li Parent coroutine resumes only after all siblings acknowledge completion
 
510 -
    @li The winner's result is returned; if the winner threw, the exception is rethrown
 
511 -

 
512 -
    @par Cancellation Semantics
 
513 -
    Cancellation is supported via stop_token propagated through the
 
514 -
    IoAwaitable protocol:
 
515 -
    @li Each child awaitable receives a stop_token derived from a shared stop_source
 
516 -
    @li When the parent's stop token is activated, the stop is forwarded to all children
 
517 -
    @li When a winner is determined, stop_source_.request_stop() is called immediately
 
518 -
    @li Siblings must handle cancellation gracefully and complete before parent resumes
 
519 -
    @li Stop requests are cooperative; tasks must check and respond to them
 
520 -

 
521 -
    @par Concurrency/Overlap
 
522 -
    All awaitables are launched concurrently before any can complete.
 
523 -
    The launcher iterates through the arguments, starting each task on the
 
524 -
    caller's executor. Tasks may execute in parallel on multi-threaded
 
525 -
    executors or interleave on single-threaded executors. There is no
 
526 -
    guaranteed ordering of task completion.
 
527 -

 
528 -
    @par Notable Error Conditions
 
529 -
    @li Winner exception: if the winning task threw, that exception is rethrown
 
530 -
    @li Non-winner exceptions: silently discarded (only winner's result matters)
 
531 -
    @li Cancellation: tasks may complete via cancellation without throwing
 
532 -

 
533 -
    @par Example
 
534 -
    @code
 
535 -
    task<void> example() {
 
536 -
        auto result = co_await when_any(
 
537 -
            fetch_int(),      // task<int>
 
538 -
            fetch_string()    // task<std::string>
 
539 -
        );
 
540 -
        // result.index() is 0 or 1
 
541 -
        if (result.index() == 0)
 
542 -
            std::cout << "Got int: " << std::get<0>(result) << "\n";
 
543 -
        else
 
544 -
            std::cout << "Got string: " << std::get<1>(result) << "\n";
 
545 -
    }
 
546 -
    @endcode
 
547 -

 
548 -
    @param as Awaitables to race concurrently (at least one required; each
 
549 -
        must satisfy IoAwaitable).
 
550 -
    @return A task yielding a std::variant with one alternative per awaitable.
 
551 -
        Use .index() to identify the winner. Void awaitables contribute
 
552 -
        std::monostate.
 
553 -

 
554 -
    @throws Rethrows the winner's exception if the winning task threw an exception.
 
555 -

 
556 -
    @par Remarks
 
557 -
    Awaitables are moved into the coroutine frame; original objects become
 
558 -
    empty after the call. The variant preserves one alternative per input
 
559 -
    task. Use .index() to determine which awaitable completed first.
 
560 -
    Void awaitables contribute std::monostate to the variant.
 
561 -

 
562 -
    @see when_all, IoAwaitable
 
563 -
*/
 
564 -
template<IoAwaitable... As>
 
565 -
    requires (sizeof...(As) > 0)
 
566 -
[[nodiscard]] auto when_any(As... as)
 
567 -
    -> task<std::variant<void_to_monostate_t<awaitable_result_t<As>>...>>
 
568 -
{
 
569 -
    detail::when_any_state<awaitable_result_t<As>...> state;
 
570 -
    std::tuple<As...> awaitable_tuple(std::move(as)...);
 
571 -

 
572 -
    co_await detail::when_any_launcher<As...>(&awaitable_tuple, &state);
 
573 -

 
574 -
    if(state.core_.winner_exception_)
 
575 -
        std::rethrow_exception(state.core_.winner_exception_);
 
576 -

 
577 -
    co_return std::move(*state.result_);
 
578 -
}
 
579 -

 
580 -
namespace detail {
 
581 -

 
582 -
/** Shared state for homogeneous when_any (range overload).
 
583  

427  

584 -
    Uses composition with when_any_core for shared functionality.
428 +
    @tparam T The payload type extracted from io_result.
585 -
    Simpler than heterogeneous: optional<T> instead of variant, vector
 
586 -
    instead of array for runner handles.
 
587  
*/
429  
*/
588  
template<typename T>
430  
template<typename T>
589 -
struct when_any_homogeneous_state
431 +
struct when_any_io_homogeneous_state
590  
{
432  
{
591  
    when_any_core core_;
433  
    when_any_core core_;
592  
    std::optional<T> result_;
434  
    std::optional<T> result_;
593  
    std::vector<std::coroutine_handle<>> runner_handles_;
435  
    std::vector<std::coroutine_handle<>> runner_handles_;
594  

436  

595 -
    explicit when_any_homogeneous_state(std::size_t count)
437 +
    std::mutex failure_mu_;
 
438 +
    std::error_code last_error_;
 
439 +
    std::exception_ptr last_exception_;
 
440 +

 
441 +
    explicit when_any_io_homogeneous_state(std::size_t count)
596  
        : core_(count)
442  
        : core_(count)
597  
        , runner_handles_(count)
443  
        , runner_handles_(count)
598  
    {
444  
    {
599  
    }
445  
    }
600  

446  

601 -
    // Runners self-destruct in final_suspend. No destruction needed here.
447 +
    void record_error(std::error_code ec)
 
448 +
    {
 
449 +
        std::lock_guard lk(failure_mu_);
 
450 +
        last_error_ = ec;
 
451 +
        last_exception_ = nullptr;
 
452 +
    }
602  

453  

603 -
    /** @pre core_.try_win() returned true. */
454 +
    void record_exception(std::exception_ptr ep)
604 -
    void set_winner_result(T value)
 
605 -
        noexcept(std::is_nothrow_move_constructible_v<T>)
 
606  
    {
455  
    {
607 -
        result_.emplace(std::move(value));
456 +
        std::lock_guard lk(failure_mu_);
 
457 +
        last_exception_ = ep;
 
458 +
        last_error_ = {};
608  
    }
459  
    }
609  
};
460  
};
610  

461  

611 -
/** Specialization for void tasks (no result storage needed). */
462 +
/** Specialization for void io_result children (no payload storage). */
612  
template<>
463  
template<>
613 -
struct when_any_homogeneous_state<void>
464 +
struct when_any_io_homogeneous_state<std::tuple<>>
614  
{
465  
{
615  
    when_any_core core_;
466  
    when_any_core core_;
616  
    std::vector<std::coroutine_handle<>> runner_handles_;
467  
    std::vector<std::coroutine_handle<>> runner_handles_;
617  

468  

618 -
    explicit when_any_homogeneous_state(std::size_t count)
469 +
    std::mutex failure_mu_;
 
470 +
    std::error_code last_error_;
 
471 +
    std::exception_ptr last_exception_;
 
472 +

 
473 +
    explicit when_any_io_homogeneous_state(std::size_t count)
619  
        : core_(count)
474  
        : core_(count)
620  
        , runner_handles_(count)
475  
        , runner_handles_(count)
621  
    {
476  
    {
622  
    }
477  
    }
623  

478  

624 -
    // Runners self-destruct in final_suspend. No destruction needed here.
479 +
    void record_error(std::error_code ec)
 
480 +
    {
 
481 +
        std::lock_guard lk(failure_mu_);
 
482 +
        last_error_ = ec;
 
483 +
        last_exception_ = nullptr;
 
484 +
    }
625  

485  

626 -
    // No set_winner_result - void tasks have no result to store
486 +
    void record_exception(std::exception_ptr ep)
 
487 +
    {
 
488 +
        std::lock_guard lk(failure_mu_);
 
489 +
        last_exception_ = ep;
 
490 +
        last_error_ = {};
 
491 +
    }
627  
};
492  
};
628  

493  

629 -
/** Launches all runners concurrently; see await_suspend for lifetime concerns. */
494 +
/** Create an io_result-aware runner for homogeneous when_any (range path).
 
495 +

 
496 +
    Only tries to win when the child returns !ec.
 
497 +
*/
 
498 +
template<IoAwaitable Awaitable, typename StateType>
 
499 +
when_any_io_runner<StateType>
 
500 +
make_when_any_io_homogeneous_runner(
 
501 +
    Awaitable inner, StateType* state, std::size_t index)
 
502 +
{
 
503 +
    auto result = co_await std::move(inner);
 
504 +

 
505 +
    if(!result.ec)
 
506 +
    {
 
507 +
        if(state->core_.try_win(index))
 
508 +
        {
 
509 +
            using PayloadT = io_result_payload_t<
 
510 +
                awaitable_result_t<Awaitable>>;
 
511 +
            if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
 
512 +
            {
 
513 +
                try
 
514 +
                {
 
515 +
                    state->result_.emplace(
 
516 +
                        extract_io_payload(std::move(result)));
 
517 +
                }
 
518 +
                catch(...)
 
519 +
                {
 
520 +
                    state->core_.set_winner_exception(
 
521 +
                        std::current_exception());
 
522 +
                }
 
523 +
            }
 
524 +
        }
 
525 +
    }
 
526 +
    else
 
527 +
    {
 
528 +
        state->record_error(result.ec);
 
529 +
    }
 
530 +
}
 
531 +

 
532 +
/** Launches all io_result-aware homogeneous runners concurrently. */
630  
template<IoAwaitableRange Range>
533  
template<IoAwaitableRange Range>
631 -
class when_any_homogeneous_launcher
534 +
class when_any_io_homogeneous_launcher
632  
{
535  
{
633  
    using Awaitable = std::ranges::range_value_t<Range>;
536  
    using Awaitable = std::ranges::range_value_t<Range>;
634 -
    using T = awaitable_result_t<Awaitable>;
537 +
    using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
635  

538  

636  
    Range* range_;
539  
    Range* range_;
637 -
    when_any_homogeneous_state<T>* state_;
540 +
    when_any_io_homogeneous_state<PayloadT>* state_;
638  

541  

639  
public:
542  
public:
640 -
    when_any_homogeneous_launcher(
543 +
    when_any_io_homogeneous_launcher(
641  
        Range* range,
544  
        Range* range,
642 -
        when_any_homogeneous_state<T>* state)
545 +
        when_any_io_homogeneous_state<PayloadT>* state)
643  
        : range_(range)
546  
        : range_(range)
644  
        , state_(state)
547  
        , state_(state)
645  
    {
548  
    {
646  
    }
549  
    }
647  

550  

648  
    bool await_ready() const noexcept
551  
    bool await_ready() const noexcept
649  
    {
552  
    {
650  
        return std::ranges::empty(*range_);
553  
        return std::ranges::empty(*range_);
651  
    }
554  
    }
652  

555  

653 -
    /** CRITICAL: If the last task finishes synchronously, parent resumes and
556 +
    std::coroutine_handle<> await_suspend(
654 -
        destroys this object before await_suspend returns. Must not reference
557 +
        std::coroutine_handle<> continuation, io_env const* caller_env)
655 -
        `this` after dispatching begins.
 
656 -

 
657 -
        Two-phase approach:
 
658 -
        1. Create all runners (safe - no dispatch yet)
 
659 -
        2. Dispatch all runners (any may complete synchronously)
 
660 -
    */
 
661 -
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
 
662  
    {
558  
    {
663  
        state_->core_.continuation_ = continuation;
559  
        state_->core_.continuation_ = continuation;
664  
        state_->core_.caller_env_ = caller_env;
560  
        state_->core_.caller_env_ = caller_env;
665  

561  

666  
        if(caller_env->stop_token.stop_possible())
562  
        if(caller_env->stop_token.stop_possible())
667  
        {
563  
        {
668  
            state_->core_.parent_stop_callback_.emplace(
564  
            state_->core_.parent_stop_callback_.emplace(
669  
                caller_env->stop_token,
565  
                caller_env->stop_token,
670  
                when_any_core::stop_callback_fn{&state_->core_.stop_source_});
566  
                when_any_core::stop_callback_fn{&state_->core_.stop_source_});
671  

567  

672  
            if(caller_env->stop_token.stop_requested())
568  
            if(caller_env->stop_token.stop_requested())
673  
                state_->core_.stop_source_.request_stop();
569  
                state_->core_.stop_source_.request_stop();
674  
        }
570  
        }
675  

571  

676  
        auto token = state_->core_.stop_source_.get_token();
572  
        auto token = state_->core_.stop_source_.get_token();
677  

573  

678 -
        // This iterates over *range_ safely because no runners execute yet.
 
679  
        // Phase 1: Create all runners without dispatching.
574  
        // Phase 1: Create all runners without dispatching.
680  
        std::size_t index = 0;
575  
        std::size_t index = 0;
681  
        for(auto&& a : *range_)
576  
        for(auto&& a : *range_)
682  
        {
577  
        {
683 -
            auto runner = make_when_any_runner(
578 +
            auto runner = make_when_any_io_homogeneous_runner(
684  
                std::move(a), state_, index);
579  
                std::move(a), state_, index);
685  

580  

686  
            auto h = runner.release();
581  
            auto h = runner.release();
687  
            h.promise().state_ = state_;
582  
            h.promise().state_ = state_;
688  
            h.promise().index_ = index;
583  
            h.promise().index_ = index;
689 -
            h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
584 +
            h.promise().env_ = io_env{caller_env->executor, token,
 
585 +
                caller_env->frame_allocator};
690  

586  

691  
            state_->runner_handles_[index] = std::coroutine_handle<>{h};
587  
            state_->runner_handles_[index] = std::coroutine_handle<>{h};
692  
            ++index;
588  
            ++index;
693  
        }
589  
        }
694  

590  

695 -
        // After last post, state_ and this may be destroyed.
 
696 -
        // Use raw pointer/count captured before posting.
 
697  
        // Phase 2: Post all runners. Any may complete synchronously.
591  
        // Phase 2: Post all runners. Any may complete synchronously.
698  
        std::coroutine_handle<>* handles = state_->runner_handles_.data();
592  
        std::coroutine_handle<>* handles = state_->runner_handles_.data();
699  
        std::size_t count = state_->runner_handles_.size();
593  
        std::size_t count = state_->runner_handles_.size();
700  
        for(std::size_t i = 0; i < count; ++i)
594  
        for(std::size_t i = 0; i < count; ++i)
701  
            caller_env->executor.post(handles[i]);
595  
            caller_env->executor.post(handles[i]);
702  

596  

703  
        return std::noop_coroutine();
597  
        return std::noop_coroutine();
704  
    }
598  
    }
705  

599  

706 -
    void await_resume() const noexcept
600 +
    void await_resume() const noexcept {}
707 -
    {
 
708 -
    }
 
709  
};
601  
};
710  

602  

711  
} // namespace detail
603  
} // namespace detail
712  

604  

713 -
/** Wait for the first awaitable to complete (range overload).
605 +
/** Race a range of io_result-returning awaitables (non-void payloads).
714 -

 
715 -
    Races a range of awaitables with the same result type. Accepts any
 
716 -
    sized input range of IoAwaitable types, enabling use with arrays,
 
717 -
    spans, or custom containers.
 
718 -

 
719 -
    @par Suspends
 
720 -
    The calling coroutine suspends when co_await is invoked. All awaitables
 
721 -
    in the range are launched concurrently and execute in parallel. The
 
722 -
    coroutine resumes only after all awaitables have completed, even though
 
723 -
    the winner is determined by the first to finish.
 
724  

606  

725 -
    @par Completion Conditions
607 +
    Only a child returning !ec can win. Errors and exceptions do not
726 -
    @li Winner is determined when the first awaitable completes (success or exception)
608 +
    claim winner status. If all children fail, the last failure
727 -
    @li Only one task can claim winner status via atomic compare-exchange
609 +
    is reported — either the last error_code at variant index 0,
728 -
    @li Once a winner exists, stop is requested for all remaining siblings
610 +
    or the last exception rethrown.
729 -
    @li Parent coroutine resumes only after all siblings acknowledge completion
 
730 -
    @li The winner's index and result are returned; if the winner threw, the exception is rethrown
 
731  

611  

732 -
    @par Cancellation Semantics
612 +
    @param awaitables Range of io_result-returning awaitables (must
733 -
    Cancellation is supported via stop_token propagated through the
613 +
        not be empty).
734 -
    IoAwaitable protocol:
 
735 -
    @li Each child awaitable receives a stop_token derived from a shared stop_source
 
736 -
    @li When the parent's stop token is activated, the stop is forwarded to all children
 
737 -
    @li When a winner is determined, stop_source_.request_stop() is called immediately
 
738 -
    @li Siblings must handle cancellation gracefully and complete before parent resumes
 
739 -
    @li Stop requests are cooperative; tasks must check and respond to them
 
740  

614  

741 -
    @par Concurrency/Overlap
615 +
    @return A task yielding variant<error_code, pair<size_t, PayloadT>>
742 -
    All awaitables are launched concurrently before any can complete.
616 +
        where index 0 is failure and index 1 carries the winner's
743 -
    The launcher iterates through the range, starting each task on the
617 +
        index and payload.
744 -
    caller's executor. Tasks may execute in parallel on multi-threaded
 
745 -
    executors or interleave on single-threaded executors. There is no
 
746 -
    guaranteed ordering of task completion.
 
747  

618  

748 -
    @par Notable Error Conditions
619 +
    @throws std::invalid_argument if range is empty.
749 -
    @li Empty range: throws std::invalid_argument immediately (not via co_return)
620 +
    @throws Rethrows last exception when no winner and the last
750 -
    @li Winner exception: if the winning task threw, that exception is rethrown
621 +
        failure was an exception.
751 -
    @li Non-winner exceptions: silently discarded (only winner's result matters)
 
752 -
    @li Cancellation: tasks may complete via cancellation without throwing
 
753  

622  

754  
    @par Example
623  
    @par Example
755  
    @code
624  
    @code
756 -
    task<void> example() {
625 +
    task<void> example()
757 -
        std::array<task<Response>, 3> requests = {
626 +
    {
758 -
            fetch_from_server(0),
627 +
        std::vector<io_task<size_t>> reads;
759 -
            fetch_from_server(1),
628 +
        for (auto& buf : buffers)
760 -
            fetch_from_server(2)
629 +
            reads.push_back(stream.read_some(buf));
761 -
        };
 
762 -

 
763 -
        auto [index, response] = co_await when_any(std::move(requests));
 
764 -
    }
 
765 -
    @endcode
 
766 -

 
767 -
    @par Example with Vector
 
768 -
    @code
 
769 -
    task<Response> fetch_fastest(std::vector<Server> const& servers) {
 
770 -
        std::vector<task<Response>> requests;
 
771 -
        for (auto const& server : servers)
 
772 -
            requests.push_back(fetch_from(server));
 
773  

630  

774 -
        auto [index, response] = co_await when_any(std::move(requests));
631 +
        auto result = co_await when_any(std::move(reads));
775 -
        co_return response;
632 +
        if (result.index() == 1)
 
633 +
        {
 
634 +
            auto [idx, n] = std::get<1>(result);
 
635 +
        }
776  
    }
636  
    }
777  
    @endcode
637  
    @endcode
778  

638  

779 -
    @tparam R Range type satisfying IoAwaitableRange.
639 +
    @see IoAwaitableRange, when_any
780 -
    @param awaitables Range of awaitables to race concurrently (must not be empty).
 
781 -
    @return A task yielding a pair of (winner_index, result).
 
782 -

 
783 -
    @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
 
784 -
    @throws Rethrows the winner's exception if the winning task threw an exception.
 
785 -

 
786 -
    @par Remarks
 
787 -
    Elements are moved from the range; for lvalue ranges, the original
 
788 -
    container will have moved-from elements after this call. The range
 
789 -
    is moved onto the coroutine frame to ensure lifetime safety. Unlike
 
790 -
    the variadic overload, no variant wrapper is needed since all tasks
 
791 -
    share the same return type.
 
792 -

 
793 -
    @see when_any, IoAwaitableRange
 
794  
*/
640  
*/
795  
template<IoAwaitableRange R>
641  
template<IoAwaitableRange R>
796 -
    requires (!std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>)
642 +
    requires detail::is_io_result_v<
 
643 +
        awaitable_result_t<std::ranges::range_value_t<R>>>
 
644 +
    && (!std::is_same_v<
 
645 +
            detail::io_result_payload_t<
 
646 +
                awaitable_result_t<std::ranges::range_value_t<R>>>,
 
647 +
            std::tuple<>>)
797  
[[nodiscard]] auto when_any(R&& awaitables)
648  
[[nodiscard]] auto when_any(R&& awaitables)
798 -
    -> task<std::pair<std::size_t, awaitable_result_t<std::ranges::range_value_t<R>>>>
649 +
    -> task<std::variant<std::error_code,
 
650 +
        std::pair<std::size_t,
 
651 +
            detail::io_result_payload_t<
 
652 +
                awaitable_result_t<std::ranges::range_value_t<R>>>>>>
799  
{
653  
{
800  
    using Awaitable = std::ranges::range_value_t<R>;
654  
    using Awaitable = std::ranges::range_value_t<R>;
801 -
    using T = awaitable_result_t<Awaitable>;
655 +
    using PayloadT = detail::io_result_payload_t<
802 -
    using result_type = std::pair<std::size_t, T>;
656 +
        awaitable_result_t<Awaitable>>;
 
657 +
    using result_type = std::variant<std::error_code,
 
658 +
        std::pair<std::size_t, PayloadT>>;
803  
    using OwnedRange = std::remove_cvref_t<R>;
659  
    using OwnedRange = std::remove_cvref_t<R>;
804  

660  

805  
    auto count = std::ranges::size(awaitables);
661  
    auto count = std::ranges::size(awaitables);
806  
    if(count == 0)
662  
    if(count == 0)
807  
        throw std::invalid_argument("when_any requires at least one awaitable");
663  
        throw std::invalid_argument("when_any requires at least one awaitable");
808 -
    // Move/copy range onto coroutine frame to ensure lifetime
 
809  

664  

810  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
665  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
811  

666  

812 -
    detail::when_any_homogeneous_state<T> state(count);
667 +
    detail::when_any_io_homogeneous_state<PayloadT> state(count);
813  

668  

814 -
    co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
669 +
    co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
 
670 +
        &owned_awaitables, &state);
815  

671  

816 -
    if(state.core_.winner_exception_)
672 +
    // Winner found
817 -
        std::rethrow_exception(state.core_.winner_exception_);
673 +
    if(state.core_.has_winner_.load(std::memory_order_acquire))
 
674 +
    {
 
675 +
        if(state.core_.winner_exception_)
 
676 +
            std::rethrow_exception(state.core_.winner_exception_);
 
677 +
        co_return result_type{std::in_place_index<1>,
 
678 +
            std::pair{state.core_.winner_index_, std::move(*state.result_)}};
 
679 +
    }
818  

680  

819 -
    co_return result_type{state.core_.winner_index_, std::move(*state.result_)};
681 +
    // No winner — report last failure
 
682 +
    if(state.last_exception_)
 
683 +
        std::rethrow_exception(state.last_exception_);
 
684 +
    co_return result_type{std::in_place_index<0>, state.last_error_};
820  
}
685  
}
821  

686  

822 -
/** Wait for the first awaitable to complete (void range overload).
687 +
/** Race a range of void io_result-returning awaitables.
823 -

 
824 -
    Races a range of void-returning awaitables. Since void awaitables have
 
825 -
    no result value, only the winner's index is returned.
 
826 -

 
827 -
    @par Suspends
 
828 -
    The calling coroutine suspends when co_await is invoked. All awaitables
 
829 -
    in the range are launched concurrently and execute in parallel. The
 
830 -
    coroutine resumes only after all awaitables have completed, even though
 
831 -
    the winner is determined by the first to finish.
 
832  

688  

833 -
    @par Completion Conditions
689 +
    Only a child returning !ec can win. Returns the winner's index
834 -
    @li Winner is determined when the first awaitable completes (success or exception)
690 +
    at variant index 1, or error_code at index 0 on all-fail.
835 -
    @li Only one task can claim winner status via atomic compare-exchange
 
836 -
    @li Once a winner exists, stop is requested for all remaining siblings
 
837 -
    @li Parent coroutine resumes only after all siblings acknowledge completion
 
838 -
    @li The winner's index is returned; if the winner threw, the exception is rethrown
 
839  

691  

840 -
    @par Cancellation Semantics
692 +
    @param awaitables Range of io_result<>-returning awaitables (must
841 -
    Cancellation is supported via stop_token propagated through the
693 +
        not be empty).
842 -
    IoAwaitable protocol:
 
843 -
    @li Each child awaitable receives a stop_token derived from a shared stop_source
 
844 -
    @li When the parent's stop token is activated, the stop is forwarded to all children
 
845 -
    @li When a winner is determined, stop_source_.request_stop() is called immediately
 
846 -
    @li Siblings must handle cancellation gracefully and complete before parent resumes
 
847 -
    @li Stop requests are cooperative; tasks must check and respond to them
 
848  

694  

849 -
    @par Concurrency/Overlap
695 +
    @return A task yielding variant<error_code, size_t> where index 0
850 -
    All awaitables are launched concurrently before any can complete.
696 +
        is failure and index 1 carries the winner's index.
851 -
    The launcher iterates through the range, starting each task on the
 
852 -
    caller's executor. Tasks may execute in parallel on multi-threaded
 
853 -
    executors or interleave on single-threaded executors. There is no
 
854 -
    guaranteed ordering of task completion.
 
855  

697  

856 -
    @par Notable Error Conditions
698 +
    @throws std::invalid_argument if range is empty.
857 -
    @li Empty range: throws std::invalid_argument immediately (not via co_return)
699 +
    @throws Rethrows first exception when no winner and at least one
858 -
    @li Winner exception: if the winning task threw, that exception is rethrown
700 +
        child threw.
859 -
    @li Non-winner exceptions: silently discarded (only winner's result matters)
 
860 -
    @li Cancellation: tasks may complete via cancellation without throwing
 
861  

701  

862  
    @par Example
702  
    @par Example
863  
    @code
703  
    @code
864 -
    task<void> example() {
704 +
    task<void> example()
865 -
        std::vector<task<void>> tasks;
705 +
    {
866 -
        for (int i = 0; i < 5; ++i)
706 +
        std::vector<io_task<>> jobs;
867 -
            tasks.push_back(background_work(i));
707 +
        jobs.push_back(background_work_a());
868 -

708 +
        jobs.push_back(background_work_b());
869 -
        std::size_t winner = co_await when_any(std::move(tasks));
 
870 -
        // winner is the index of the first task to complete
 
871 -
    }
 
872 -
    @endcode
 
873 -

 
874 -
    @par Example with Timeout
 
875 -
    @code
 
876 -
    task<void> with_timeout() {
 
877 -
        std::vector<task<void>> tasks;
 
878 -
        tasks.push_back(long_running_operation());
 
879 -
        tasks.push_back(delay(std::chrono::seconds(5)));
 
880  

709  

881 -
        std::size_t winner = co_await when_any(std::move(tasks));
710 +
        auto result = co_await when_any(std::move(jobs));
882 -
        if (winner == 1) {
711 +
        if (result.index() == 1)
883 -
            // Timeout occurred
712 +
        {
 
713 +
            auto winner = std::get<1>(result);
884  
        }
714  
        }
885  
    }
715  
    }
886  
    @endcode
716  
    @endcode
887  

717  

888 -
    @tparam R Range type satisfying IoAwaitableRange with void result.
718 +
    @see IoAwaitableRange, when_any
889 -
    @param awaitables Range of void awaitables to race concurrently (must not be empty).
 
890 -
    @return A task yielding the winner's index (zero-based).
 
891 -

 
892 -
    @throws std::invalid_argument if range is empty (thrown before coroutine suspends).
 
893 -
    @throws Rethrows the winner's exception if the winning task threw an exception.
 
894 -

 
895 -
    @par Remarks
 
896 -
    Elements are moved from the range; for lvalue ranges, the original
 
897 -
    container will have moved-from elements after this call. The range
 
898 -
    is moved onto the coroutine frame to ensure lifetime safety. Unlike
 
899 -
    the non-void overload, no result storage is needed since void tasks
 
900 -
    produce no value.
 
901 -

 
902 -
    @see when_any, IoAwaitableRange
 
903  
*/
719  
*/
904  
template<IoAwaitableRange R>
720  
template<IoAwaitableRange R>
905 -
    requires std::is_void_v<awaitable_result_t<std::ranges::range_value_t<R>>>
721 +
    requires detail::is_io_result_v<
906 -
[[nodiscard]] auto when_any(R&& awaitables) -> task<std::size_t>
722 +
        awaitable_result_t<std::ranges::range_value_t<R>>>
 
723 +
    && std::is_same_v<
 
724 +
            detail::io_result_payload_t<
 
725 +
                awaitable_result_t<std::ranges::range_value_t<R>>>,
 
726 +
            std::tuple<>>
 
727 +
[[nodiscard]] auto when_any(R&& awaitables)
 
728 +
    -> task<std::variant<std::error_code, std::size_t>>
907  
{
729  
{
908  
    using OwnedRange = std::remove_cvref_t<R>;
730  
    using OwnedRange = std::remove_cvref_t<R>;
 
731 +
    using result_type = std::variant<std::error_code, std::size_t>;
909  

732  

910  
    auto count = std::ranges::size(awaitables);
733  
    auto count = std::ranges::size(awaitables);
911  
    if(count == 0)
734  
    if(count == 0)
912  
        throw std::invalid_argument("when_any requires at least one awaitable");
735  
        throw std::invalid_argument("when_any requires at least one awaitable");
913 -
    // Move/copy range onto coroutine frame to ensure lifetime
 
914  

736  

915  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
737  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
916  

738  

917 -
    detail::when_any_homogeneous_state<void> state(count);
739 +
    detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
918  

740  

919 -
    co_await detail::when_any_homogeneous_launcher<OwnedRange>(&owned_awaitables, &state);
741 +
    co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
 
742 +
        &owned_awaitables, &state);
920  

743  

 
744 +
    // Winner found
 
745 +
    if(state.core_.has_winner_.load(std::memory_order_acquire))
 
746 +
    {
 
747 +
        if(state.core_.winner_exception_)
 
748 +
            std::rethrow_exception(state.core_.winner_exception_);
 
749 +
        co_return result_type{std::in_place_index<1>,
 
750 +
            state.core_.winner_index_};
 
751 +
    }
 
752 +

 
753 +
    // No winner — report last failure
 
754 +
    if(state.last_exception_)
 
755 +
        std::rethrow_exception(state.last_exception_);
 
756 +
    co_return result_type{std::in_place_index<0>, state.last_error_};
 
757 +
}
 
758 +

 
759 +
/** Race io_result-returning awaitables, selecting the first success.
 
760 +

 
761 +
    Overload selected when all children return io_result<Ts...>.
 
762 +
    Only a child returning !ec can win. Errors and exceptions do
 
763 +
    not claim winner status.
 
764 +

 
765 +
    @return A task yielding variant<error_code, R1, ..., Rn> where
 
766 +
        index 0 is the failure/no-winner case and index i+1
 
767 +
        identifies the winning child.
 
768 +
*/
 
769 +
template<IoAwaitable... As>
 
770 +
    requires (sizeof...(As) > 0)
 
771 +
          && detail::all_io_result_awaitables<As...>
 
772 +
[[nodiscard]] auto when_any(As... as)
 
773 +
    -> task<std::variant<
 
774 +
        std::error_code,
 
775 +
        detail::io_result_payload_t<awaitable_result_t<As>>...>>
 
776 +
{
 
777 +
    using result_type = std::variant<
 
778 +
        std::error_code,
 
779 +
        detail::io_result_payload_t<awaitable_result_t<As>>...>;
 
780 +

 
781 +
    detail::when_any_io_state<
 
782 +
        detail::io_result_payload_t<awaitable_result_t<As>>...> state;
 
783 +
    std::tuple<As...> awaitable_tuple(std::move(as)...);
 
784 +

 
785 +
    co_await detail::when_any_io_launcher<As...>(
 
786 +
        &awaitable_tuple, &state);
 
787 +

 
788 +
    // Winner found: return their result
 
789 +
    if(state.result_.has_value())
 
790 +
        co_return std::move(*state.result_);
 
791 +

 
792 +
    // Winner claimed but payload construction failed
921  
    if(state.core_.winner_exception_)
793  
    if(state.core_.winner_exception_)
922  
        std::rethrow_exception(state.core_.winner_exception_);
794  
        std::rethrow_exception(state.core_.winner_exception_);
923  

795  

924 -
    co_return state.core_.winner_index_;
796 +
    // No winner — report last failure
 
797 +
    if(state.last_exception_)
 
798 +
        std::rethrow_exception(state.last_exception_);
 
799 +
    co_return result_type{std::in_place_index<0>, state.last_error_};
925  
}
800  
}
926  

801  

927  
} // namespace capy
802  
} // namespace capy
928  
} // namespace boost
803  
} // namespace boost
929  

804  

930  
#endif
805  
#endif