LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 91.6 % 166 152 14
Test Date: 2026-03-19 01:23:19 Functions: 90.4 % 146 132 14

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/capy
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      12                 : #define BOOST_CAPY_WHEN_ANY_HPP
      13                 : 
      14                 : #include <boost/capy/detail/config.hpp>
      15                 : #include <boost/capy/detail/io_result_combinators.hpp>
      16                 : #include <boost/capy/concept/executor.hpp>
      17                 : #include <boost/capy/concept/io_awaitable.hpp>
      18                 : #include <coroutine>
      19                 : #include <boost/capy/ex/executor_ref.hpp>
      20                 : #include <boost/capy/ex/frame_allocator.hpp>
      21                 : #include <boost/capy/ex/io_env.hpp>
      22                 : #include <boost/capy/task.hpp>
      23                 : 
      24                 : #include <array>
      25                 : #include <atomic>
      26                 : #include <exception>
      27                 : #include <mutex>
      28                 : #include <optional>
      29                 : #include <ranges>
      30                 : #include <stdexcept>
      31                 : #include <stop_token>
      32                 : #include <tuple>
      33                 : #include <type_traits>
      34                 : #include <utility>
      35                 : #include <variant>
      36                 : #include <vector>
      37                 : 
      38                 : /*
      39                 :    when_any - Race multiple io_result tasks, select first success
      40                 :    =============================================================
      41                 : 
      42                 :    OVERVIEW:
      43                 :    ---------
      44                 :    when_any launches N io_result-returning tasks concurrently. A task
      45                 :    wins by returning !ec; errors and exceptions do not win. Once a
      46                 :    winner is found, stop is requested for siblings and the winner's
      47                 :    payload is returned. If no winner exists (all fail), the first
      48                 :    error_code is returned or the last exception is rethrown.
      49                 : 
      50                 :    ARCHITECTURE:
      51                 :    -------------
      52                 :    The design mirrors when_all but with inverted completion semantics:
      53                 : 
      54                 :      when_all:  complete when remaining_count reaches 0 (all done)
      55                 :      when_any:  complete when has_winner becomes true (first done)
      56                 :                 BUT still wait for remaining_count to reach 0 for cleanup
      57                 : 
      58                 :    Key components:
      59                 :      - when_any_core:    Shared state tracking winner and completion
      60                 :      - when_any_io_runner: Wrapper coroutine for each child task
      61                 :      - when_any_io_launcher/when_any_io_homogeneous_launcher:
      62                 :                           Awaitables that start all runners concurrently
      63                 : 
      64                 :    CRITICAL INVARIANTS:
      65                 :    --------------------
      66                 :    1. Only a task returning !ec can become the winner (via atomic CAS)
      67                 :    2. All tasks must complete before parent resumes (cleanup safety)
      68                 :    3. Stop is requested immediately when winner is determined
      69                 :    4. Exceptions and errors do not claim winner status
      70                 : 
      71                 :    POSITIONAL VARIANT:
      72                 :    -------------------
      73                 :    The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
      74                 :    Index 0 is error_code (failure/no-winner). Index 1..N identifies the
      75                 :    winning child and carries its payload.
      76                 : 
      77                 :    RANGE OVERLOAD:
      78                 :    ---------------
      79                 :    The range overload returns variant<error_code, pair<size_t, T>> for
      80                 :    non-void children or variant<error_code, size_t> for void children.
      81                 : 
      82                 :    MEMORY MODEL:
      83                 :    -------------
      84                 :    Synchronization chain from winner's write to parent's read:
      85                 : 
      86                 :    1. Winner thread writes result_ (non-atomic)
      87                 :    2. Winner thread calls signal_completion() -> fetch_sub(acq_rel) on remaining_count_
      88                 :    3. Last task thread (may be winner or non-winner) calls signal_completion()
      89                 :       -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
      90                 :    4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
      91                 :    5. Parent coroutine resumes and reads result_
      92                 : 
      93                 :    Synchronization analysis:
      94                 :    - All fetch_sub operations on remaining_count_ form a release sequence
      95                 :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      96                 :      in the modification order of remaining_count_
      97                 :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
      98                 :      modification order, establishing happens-before from winner's writes
      99                 :    - Executor dispatch() is expected to provide queue-based synchronization
     100                 :      (release-on-post, acquire-on-execute) completing the chain to parent
     101                 :    - Even inline executors work (same thread = sequenced-before)
     102                 : 
     103                 :    EXCEPTION SEMANTICS:
     104                 :    --------------------
     105                 :    Exceptions do NOT claim winner status. If a child throws, the exception
     106                 :    is recorded but the combinator keeps waiting for a success. Only when
     107                 :    all children complete without a winner does the combinator check: if
     108                 :    any exception was recorded, it is rethrown (exception beats error_code).
     109                 : */
     110                 : 
     111                 : namespace boost {
     112                 : namespace capy {
     113                 : 
     114                 : namespace detail {
     115                 : 
     116                 : /** Core shared state for when_any operations.
     117                 : 
     118                 :     Contains all members and methods common to both heterogeneous (variadic)
     119                 :     and homogeneous (range) when_any implementations. State classes embed
     120                 :     this via composition to avoid CRTP destructor ordering issues.
     121                 : 
     122                 :     @par Thread Safety
     123                 :     Atomic operations protect winner selection and completion count.
     124                 : */
     125                 : struct when_any_core
     126                 : {
     127                 :     std::atomic<std::size_t> remaining_count_;
     128                 :     std::size_t winner_index_{0};
     129                 :     std::exception_ptr winner_exception_;
     130                 :     std::stop_source stop_source_;
     131                 : 
     132                 :     // Bridges parent's stop token to our stop_source
     133                 :     struct stop_callback_fn
     134                 :     {
     135                 :         std::stop_source* source_;
     136 HIT           1 :         void operator()() const noexcept { source_->request_stop(); }
     137                 :     };
     138                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     139                 :     std::optional<stop_callback_t> parent_stop_callback_;
     140                 : 
     141                 :     std::coroutine_handle<> continuation_;
     142                 :     io_env const* caller_env_ = nullptr;
     143                 : 
     144                 :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     145                 :     std::atomic<bool> has_winner_{false};
     146                 : 
     147              29 :     explicit when_any_core(std::size_t count) noexcept
     148              29 :         : remaining_count_(count)
     149                 :     {
     150              29 :     }
     151                 : 
     152                 :     /** Atomically claim winner status; exactly one task succeeds. */
     153              51 :     bool try_win(std::size_t index) noexcept
     154                 :     {
     155              51 :         bool expected = false;
     156              51 :         if(has_winner_.compare_exchange_strong(
     157                 :             expected, true, std::memory_order_acq_rel))
     158                 :         {
     159              21 :             winner_index_ = index;
     160              21 :             stop_source_.request_stop();
     161              21 :             return true;
     162                 :         }
     163              30 :         return false;
     164                 :     }
     165                 : 
     166                 :     /** @pre try_win() returned true. */
     167 MIS           0 :     void set_winner_exception(std::exception_ptr ep) noexcept
     168                 :     {
     169               0 :         winner_exception_ = ep;
     170               0 :     }
     171                 : 
     172                 :     // Runners signal completion directly via final_suspend; no member function needed.
     173                 : };
     174                 : 
     175                 : } // namespace detail
     176                 : 
     177                 : namespace detail {
     178                 : 
     179                 : // State for io_result-aware when_any: only !ec wins.
     180                 : template<typename... Ts>
     181                 : struct when_any_io_state
     182                 : {
     183                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     184                 :     using variant_type = std::variant<std::error_code, Ts...>;
     185                 : 
     186                 :     when_any_core core_;
     187                 :     std::optional<variant_type> result_;
     188                 :     std::array<std::coroutine_handle<>, task_count> runner_handles_{};
     189                 : 
     190                 :     // Last failure (error or exception) for the all-fail case.
     191                 :     // Last writer wins — no priority between errors and exceptions.
     192                 :     std::mutex failure_mu_;
     193                 :     std::error_code last_error_;
     194                 :     std::exception_ptr last_exception_;
     195                 : 
     196 HIT          14 :     when_any_io_state()
     197              14 :         : core_(task_count)
     198                 :     {
     199              14 :     }
     200                 : 
     201              12 :     void record_error(std::error_code ec)
     202                 :     {
     203              12 :         std::lock_guard lk(failure_mu_);
     204              12 :         last_error_ = ec;
     205              12 :         last_exception_ = nullptr;
     206              12 :     }
     207                 : 
     208               4 :     void record_exception(std::exception_ptr ep)
     209                 :     {
     210               4 :         std::lock_guard lk(failure_mu_);
     211               4 :         last_exception_ = ep;
     212               4 :         last_error_ = {};
     213               4 :     }
     214                 : };
     215                 : 
     216                 : // Wrapper coroutine for io_result-aware when_any children.
     217                 : // unhandled_exception records the exception but does NOT claim winner status.
     218                 : template<typename StateType>
     219                 : struct when_any_io_runner
     220                 : {
     221                 :     struct promise_type
     222                 :     {
     223                 :         StateType* state_ = nullptr;
     224                 :         std::size_t index_ = 0;
     225                 :         io_env env_;
     226                 : 
     227              78 :         when_any_io_runner get_return_object() noexcept
     228                 :         {
     229                 :             return when_any_io_runner(
     230              78 :                 std::coroutine_handle<promise_type>::from_promise(*this));
     231                 :         }
     232                 : 
     233              78 :         std::suspend_always initial_suspend() noexcept { return {}; }
     234                 : 
     235              78 :         auto final_suspend() noexcept
     236                 :         {
     237                 :             struct awaiter
     238                 :             {
     239                 :                 promise_type* p_;
     240              78 :                 bool await_ready() const noexcept { return false; }
     241              78 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     242                 :                 {
     243              78 :                     auto& core = p_->state_->core_;
     244              78 :                     auto* counter = &core.remaining_count_;
     245              78 :                     auto* caller_env = core.caller_env_;
     246              78 :                     auto cont = core.continuation_;
     247                 : 
     248              78 :                     h.destroy();
     249                 : 
     250              78 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     251              78 :                     if(remaining == 1)
     252              29 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     253              49 :                     return detail::symmetric_transfer(std::noop_coroutine());
     254                 :                 }
     255 MIS           0 :                 void await_resume() const noexcept {}
     256                 :             };
     257 HIT          78 :             return awaiter{this};
     258                 :         }
     259                 : 
     260              70 :         void return_void() noexcept {}
     261                 : 
     262                 :         // Exceptions do NOT win in io_result when_any
     263               8 :         void unhandled_exception()
     264                 :         {
     265               8 :             state_->record_exception(std::current_exception());
     266               8 :         }
     267                 : 
     268                 :         template<class Awaitable>
     269                 :         struct transform_awaiter
     270                 :         {
     271                 :             std::decay_t<Awaitable> a_;
     272                 :             promise_type* p_;
     273                 : 
     274              78 :             bool await_ready() { return a_.await_ready(); }
     275              78 :             decltype(auto) await_resume() { return a_.await_resume(); }
     276                 : 
     277                 :             template<class Promise>
     278              77 :             auto await_suspend(std::coroutine_handle<Promise> h)
     279                 :             {
     280                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     281                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
     282              77 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     283                 :                 else
     284                 :                     return a_.await_suspend(h, &p_->env_);
     285                 :             }
     286                 :         };
     287                 : 
     288                 :         template<class Awaitable>
     289              78 :         auto await_transform(Awaitable&& a)
     290                 :         {
     291                 :             using A = std::decay_t<Awaitable>;
     292                 :             if constexpr (IoAwaitable<A>)
     293                 :             {
     294                 :                 return transform_awaiter<Awaitable>{
     295             155 :                     std::forward<Awaitable>(a), this};
     296                 :             }
     297                 :             else
     298                 :             {
     299                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     300                 :             }
     301              77 :         }
     302                 :     };
     303                 : 
     304                 :     std::coroutine_handle<promise_type> h_;
     305                 : 
     306              78 :     explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
     307              78 :         : h_(h)
     308                 :     {
     309              78 :     }
     310                 : 
     311                 :     when_any_io_runner(when_any_io_runner&& other) noexcept
     312                 :         : h_(std::exchange(other.h_, nullptr))
     313                 :     {
     314                 :     }
     315                 : 
     316                 :     when_any_io_runner(when_any_io_runner const&) = delete;
     317                 :     when_any_io_runner& operator=(when_any_io_runner const&) = delete;
     318                 :     when_any_io_runner& operator=(when_any_io_runner&&) = delete;
     319                 : 
     320              78 :     auto release() noexcept
     321                 :     {
     322              78 :         return std::exchange(h_, nullptr);
     323                 :     }
     324                 : };
     325                 : 
     326                 : // Runner coroutine: only tries to win when the child returns !ec.
     327                 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
     328                 : when_any_io_runner<StateType>
     329              26 : make_when_any_io_runner(Awaitable inner, StateType* state)
     330                 : {
     331                 :     auto result = co_await std::move(inner);
     332                 : 
     333                 :     if(!result.ec)
     334                 :     {
     335                 :         // Success: try to claim winner
     336                 :         if(state->core_.try_win(I))
     337                 :         {
     338                 :             try
     339                 :             {
     340                 :                 state->result_.emplace(
     341                 :                     std::in_place_index<I + 1>,
     342                 :                     detail::extract_io_payload(std::move(result)));
     343                 :             }
     344                 :             catch(...)
     345                 :             {
     346                 :                 state->core_.set_winner_exception(std::current_exception());
     347                 :             }
     348                 :         }
     349                 :     }
     350                 :     else
     351                 :     {
     352                 :         // Error: record but don't win
     353                 :         state->record_error(result.ec);
     354                 :     }
     355              52 : }
     356                 : 
     357                 : // Launcher for io_result-aware when_any.
     358                 : template<IoAwaitable... Awaitables>
     359                 : class when_any_io_launcher
     360                 : {
     361                 :     using state_type = when_any_io_state<
     362                 :         io_result_payload_t<awaitable_result_t<Awaitables>>...>;
     363                 : 
     364                 :     std::tuple<Awaitables...>* tasks_;
     365                 :     state_type* state_;
     366                 : 
     367                 : public:
     368              14 :     when_any_io_launcher(
     369                 :         std::tuple<Awaitables...>* tasks,
     370                 :         state_type* state)
     371              14 :         : tasks_(tasks)
     372              14 :         , state_(state)
     373                 :     {
     374              14 :     }
     375                 : 
     376              14 :     bool await_ready() const noexcept
     377                 :     {
     378              14 :         return sizeof...(Awaitables) == 0;
     379                 :     }
     380                 : 
     381              14 :     std::coroutine_handle<> await_suspend(
     382                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     383                 :     {
     384              14 :         state_->core_.continuation_ = continuation;
     385              14 :         state_->core_.caller_env_ = caller_env;
     386                 : 
     387              14 :         if(caller_env->stop_token.stop_possible())
     388                 :         {
     389 MIS           0 :             state_->core_.parent_stop_callback_.emplace(
     390               0 :                 caller_env->stop_token,
     391               0 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     392                 : 
     393               0 :             if(caller_env->stop_token.stop_requested())
     394               0 :                 state_->core_.stop_source_.request_stop();
     395                 :         }
     396                 : 
     397 HIT          14 :         auto token = state_->core_.stop_source_.get_token();
     398              28 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     399              14 :             (..., launch_one<Is>(caller_env->executor, token));
     400              14 :         }(std::index_sequence_for<Awaitables...>{});
     401                 : 
     402              28 :         return std::noop_coroutine();
     403              14 :     }
     404                 : 
     405              14 :     void await_resume() const noexcept {}
     406                 : 
     407                 : private:
     408                 :     template<std::size_t I>
     409              26 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     410                 :     {
     411              26 :         auto runner = make_when_any_io_runner<I>(
     412              26 :             std::move(std::get<I>(*tasks_)), state_);
     413                 : 
     414              26 :         auto h = runner.release();
     415              26 :         h.promise().state_ = state_;
     416              26 :         h.promise().index_ = I;
     417              26 :         h.promise().env_ = io_env{caller_ex, token,
     418              26 :             state_->core_.caller_env_->frame_allocator};
     419                 : 
     420              26 :         std::coroutine_handle<> ch{h};
     421              26 :         state_->runner_handles_[I] = ch;
     422              26 :         caller_ex.post(ch);
     423              52 :     }
     424                 : };
     425                 : 
     426                 : /** Shared state for homogeneous io_result-aware when_any (range overload).
     427                 : 
     428                 :     @tparam T The payload type extracted from io_result.
     429                 : */
     430                 : template<typename T>
     431                 : struct when_any_io_homogeneous_state
     432                 : {
     433                 :     when_any_core core_;
     434                 :     std::optional<T> result_;
     435                 :     std::vector<std::coroutine_handle<>> runner_handles_;
     436                 : 
     437                 :     std::mutex failure_mu_;
     438                 :     std::error_code last_error_;
     439                 :     std::exception_ptr last_exception_;
     440                 : 
     441              13 :     explicit when_any_io_homogeneous_state(std::size_t count)
     442              13 :         : core_(count)
     443              39 :         , runner_handles_(count)
     444                 :     {
     445              13 :     }
     446                 : 
     447               6 :     void record_error(std::error_code ec)
     448                 :     {
     449               6 :         std::lock_guard lk(failure_mu_);
     450               6 :         last_error_ = ec;
     451               6 :         last_exception_ = nullptr;
     452               6 :     }
     453                 : 
     454               4 :     void record_exception(std::exception_ptr ep)
     455                 :     {
     456               4 :         std::lock_guard lk(failure_mu_);
     457               4 :         last_exception_ = ep;
     458               4 :         last_error_ = {};
     459               4 :     }
     460                 : };
     461                 : 
     462                 : /** Specialization for void io_result children (no payload storage). */
     463                 : template<>
     464                 : struct when_any_io_homogeneous_state<std::tuple<>>
     465                 : {
     466                 :     when_any_core core_;
     467                 :     std::vector<std::coroutine_handle<>> runner_handles_;
     468                 : 
     469                 :     std::mutex failure_mu_;
     470                 :     std::error_code last_error_;
     471                 :     std::exception_ptr last_exception_;
     472                 : 
     473               2 :     explicit when_any_io_homogeneous_state(std::size_t count)
     474               2 :         : core_(count)
     475               6 :         , runner_handles_(count)
     476                 :     {
     477               2 :     }
     478                 : 
     479               1 :     void record_error(std::error_code ec)
     480                 :     {
     481               1 :         std::lock_guard lk(failure_mu_);
     482               1 :         last_error_ = ec;
     483               1 :         last_exception_ = nullptr;
     484               1 :     }
     485                 : 
     486 MIS           0 :     void record_exception(std::exception_ptr ep)
     487                 :     {
     488               0 :         std::lock_guard lk(failure_mu_);
     489               0 :         last_exception_ = ep;
     490               0 :         last_error_ = {};
     491               0 :     }
     492                 : };
     493                 : 
     494                 : /** Create an io_result-aware runner for homogeneous when_any (range path).
     495                 : 
     496                 :     Only tries to win when the child returns !ec.
     497                 : */
     498                 : template<IoAwaitable Awaitable, typename StateType>
     499                 : when_any_io_runner<StateType>
     500 HIT          52 : make_when_any_io_homogeneous_runner(
     501                 :     Awaitable inner, StateType* state, std::size_t index)
     502                 : {
     503                 :     auto result = co_await std::move(inner);
     504                 : 
     505                 :     if(!result.ec)
     506                 :     {
     507                 :         if(state->core_.try_win(index))
     508                 :         {
     509                 :             using PayloadT = io_result_payload_t<
     510                 :                 awaitable_result_t<Awaitable>>;
     511                 :             if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
     512                 :             {
     513                 :                 try
     514                 :                 {
     515                 :                     state->result_.emplace(
     516                 :                         extract_io_payload(std::move(result)));
     517                 :                 }
     518                 :                 catch(...)
     519                 :                 {
     520                 :                     state->core_.set_winner_exception(
     521                 :                         std::current_exception());
     522                 :                 }
     523                 :             }
     524                 :         }
     525                 :     }
     526                 :     else
     527                 :     {
     528                 :         state->record_error(result.ec);
     529                 :     }
     530             104 : }
     531                 : 
     532                 : /** Launches all io_result-aware homogeneous runners concurrently. */
     533                 : template<IoAwaitableRange Range>
     534                 : class when_any_io_homogeneous_launcher
     535                 : {
     536                 :     using Awaitable = std::ranges::range_value_t<Range>;
     537                 :     using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
     538                 : 
     539                 :     Range* range_;
     540                 :     when_any_io_homogeneous_state<PayloadT>* state_;
     541                 : 
     542                 : public:
     543              15 :     when_any_io_homogeneous_launcher(
     544                 :         Range* range,
     545                 :         when_any_io_homogeneous_state<PayloadT>* state)
     546              15 :         : range_(range)
     547              15 :         , state_(state)
     548                 :     {
     549              15 :     }
     550                 : 
     551              15 :     bool await_ready() const noexcept
     552                 :     {
     553              15 :         return std::ranges::empty(*range_);
     554                 :     }
     555                 : 
     556              15 :     std::coroutine_handle<> await_suspend(
     557                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     558                 :     {
     559              15 :         state_->core_.continuation_ = continuation;
     560              15 :         state_->core_.caller_env_ = caller_env;
     561                 : 
     562              15 :         if(caller_env->stop_token.stop_possible())
     563                 :         {
     564               4 :             state_->core_.parent_stop_callback_.emplace(
     565               2 :                 caller_env->stop_token,
     566               2 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     567                 : 
     568               2 :             if(caller_env->stop_token.stop_requested())
     569               1 :                 state_->core_.stop_source_.request_stop();
     570                 :         }
     571                 : 
     572              15 :         auto token = state_->core_.stop_source_.get_token();
     573                 : 
     574                 :         // Phase 1: Create all runners without dispatching.
     575              15 :         std::size_t index = 0;
     576              67 :         for(auto&& a : *range_)
     577                 :         {
     578              52 :             auto runner = make_when_any_io_homogeneous_runner(
     579              52 :                 std::move(a), state_, index);
     580                 : 
     581              52 :             auto h = runner.release();
     582              52 :             h.promise().state_ = state_;
     583              52 :             h.promise().index_ = index;
     584              52 :             h.promise().env_ = io_env{caller_env->executor, token,
     585              52 :                 caller_env->frame_allocator};
     586                 : 
     587              52 :             state_->runner_handles_[index] = std::coroutine_handle<>{h};
     588              52 :             ++index;
     589                 :         }
     590                 : 
     591                 :         // Phase 2: Post all runners. Any may complete synchronously.
     592              15 :         std::coroutine_handle<>* handles = state_->runner_handles_.data();
     593              15 :         std::size_t count = state_->runner_handles_.size();
     594              67 :         for(std::size_t i = 0; i < count; ++i)
     595              52 :             caller_env->executor.post(handles[i]);
     596                 : 
     597              30 :         return std::noop_coroutine();
     598              67 :     }
     599                 : 
     600              15 :     void await_resume() const noexcept {}
     601                 : };
     602                 : 
     603                 : } // namespace detail
     604                 : 
     605                 : /** Race a range of io_result-returning awaitables (non-void payloads).
     606                 : 
     607                 :     Only a child returning !ec can win. Errors and exceptions do not
     608                 :     claim winner status. If all children fail, the last failure
     609                 :     is reported — either the last error_code at variant index 0,
     610                 :     or the last exception rethrown.
     611                 : 
     612                 :     @param awaitables Range of io_result-returning awaitables (must
     613                 :         not be empty).
     614                 : 
     615                 :     @return A task yielding variant<error_code, pair<size_t, PayloadT>>
     616                 :         where index 0 is failure and index 1 carries the winner's
     617                 :         index and payload.
     618                 : 
     619                 :     @throws std::invalid_argument if range is empty.
     620                 :     @throws Rethrows last exception when no winner and the last
     621                 :         failure was an exception.
     622                 : 
     623                 :     @par Example
     624                 :     @code
     625                 :     task<void> example()
     626                 :     {
     627                 :         std::vector<io_task<size_t>> reads;
     628                 :         for (auto& buf : buffers)
     629                 :             reads.push_back(stream.read_some(buf));
     630                 : 
     631                 :         auto result = co_await when_any(std::move(reads));
     632                 :         if (result.index() == 1)
     633                 :         {
     634                 :             auto [idx, n] = std::get<1>(result);
     635                 :         }
     636                 :     }
     637                 :     @endcode
     638                 : 
     639                 :     @see IoAwaitableRange, when_any
     640                 : */
     641                 : template<IoAwaitableRange R>
     642                 :     requires detail::is_io_result_v<
     643                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     644                 :     && (!std::is_same_v<
     645                 :             detail::io_result_payload_t<
     646                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     647                 :             std::tuple<>>)
     648              14 : [[nodiscard]] auto when_any(R&& awaitables)
     649                 :     -> task<std::variant<std::error_code,
     650                 :         std::pair<std::size_t,
     651                 :             detail::io_result_payload_t<
     652                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>>>>
     653                 : {
     654                 :     using Awaitable = std::ranges::range_value_t<R>;
     655                 :     using PayloadT = detail::io_result_payload_t<
     656                 :         awaitable_result_t<Awaitable>>;
     657                 :     using result_type = std::variant<std::error_code,
     658                 :         std::pair<std::size_t, PayloadT>>;
     659                 :     using OwnedRange = std::remove_cvref_t<R>;
     660                 : 
     661                 :     auto count = std::ranges::size(awaitables);
     662                 :     if(count == 0)
     663                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     664                 : 
     665                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     666                 : 
     667                 :     detail::when_any_io_homogeneous_state<PayloadT> state(count);
     668                 : 
     669                 :     co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
     670                 :         &owned_awaitables, &state);
     671                 : 
     672                 :     // Winner found
     673                 :     if(state.core_.has_winner_.load(std::memory_order_acquire))
     674                 :     {
     675                 :         if(state.core_.winner_exception_)
     676                 :             std::rethrow_exception(state.core_.winner_exception_);
     677                 :         co_return result_type{std::in_place_index<1>,
     678                 :             std::pair{state.core_.winner_index_, std::move(*state.result_)}};
     679                 :     }
     680                 : 
     681                 :     // No winner — report last failure
     682                 :     if(state.last_exception_)
     683                 :         std::rethrow_exception(state.last_exception_);
     684                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     685              28 : }
     686                 : 
     687                 : /** Race a range of void io_result-returning awaitables.
     688                 : 
     689                 :     Only a child returning !ec can win. Returns the winner's index
     690                 :     at variant index 1, or error_code at index 0 on all-fail.
     691                 : 
     692                 :     @param awaitables Range of io_result<>-returning awaitables (must
     693                 :         not be empty).
     694                 : 
     695                 :     @return A task yielding variant<error_code, size_t> where index 0
     696                 :         is failure and index 1 carries the winner's index.
     697                 : 
     698                 :     @throws std::invalid_argument if range is empty.
     699                 :     @throws Rethrows first exception when no winner and at least one
     700                 :         child threw.
     701                 : 
     702                 :     @par Example
     703                 :     @code
     704                 :     task<void> example()
     705                 :     {
     706                 :         std::vector<io_task<>> jobs;
     707                 :         jobs.push_back(background_work_a());
     708                 :         jobs.push_back(background_work_b());
     709                 : 
     710                 :         auto result = co_await when_any(std::move(jobs));
     711                 :         if (result.index() == 1)
     712                 :         {
     713                 :             auto winner = std::get<1>(result);
     714                 :         }
     715                 :     }
     716                 :     @endcode
     717                 : 
     718                 :     @see IoAwaitableRange, when_any
     719                 : */
     720                 : template<IoAwaitableRange R>
     721                 :     requires detail::is_io_result_v<
     722                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     723                 :     && std::is_same_v<
     724                 :             detail::io_result_payload_t<
     725                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     726                 :             std::tuple<>>
     727               2 : [[nodiscard]] auto when_any(R&& awaitables)
     728                 :     -> task<std::variant<std::error_code, std::size_t>>
     729                 : {
     730                 :     using OwnedRange = std::remove_cvref_t<R>;
     731                 :     using result_type = std::variant<std::error_code, std::size_t>;
     732                 : 
     733                 :     auto count = std::ranges::size(awaitables);
     734                 :     if(count == 0)
     735                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     736                 : 
     737                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     738                 : 
     739                 :     detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
     740                 : 
     741                 :     co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
     742                 :         &owned_awaitables, &state);
     743                 : 
     744                 :     // Winner found
     745                 :     if(state.core_.has_winner_.load(std::memory_order_acquire))
     746                 :     {
     747                 :         if(state.core_.winner_exception_)
     748                 :             std::rethrow_exception(state.core_.winner_exception_);
     749                 :         co_return result_type{std::in_place_index<1>,
     750                 :             state.core_.winner_index_};
     751                 :     }
     752                 : 
     753                 :     // No winner — report last failure
     754                 :     if(state.last_exception_)
     755                 :         std::rethrow_exception(state.last_exception_);
     756                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     757               4 : }
     758                 : 
     759                 : /** Race io_result-returning awaitables, selecting the first success.
     760                 : 
     761                 :     Overload selected when all children return io_result<Ts...>.
     762                 :     Only a child returning !ec can win. Errors and exceptions do
     763                 :     not claim winner status.
     764                 : 
     765                 :     @return A task yielding variant<error_code, R1, ..., Rn> where
     766                 :         index 0 is the failure/no-winner case and index i+1
     767                 :         identifies the winning child.
     768                 : */
     769                 : template<IoAwaitable... As>
     770                 :     requires (sizeof...(As) > 0)
     771                 :           && detail::all_io_result_awaitables<As...>
     772              14 : [[nodiscard]] auto when_any(As... as)
     773                 :     -> task<std::variant<
     774                 :         std::error_code,
     775                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>>
     776                 : {
     777                 :     using result_type = std::variant<
     778                 :         std::error_code,
     779                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>;
     780                 : 
     781                 :     detail::when_any_io_state<
     782                 :         detail::io_result_payload_t<awaitable_result_t<As>>...> state;
     783                 :     std::tuple<As...> awaitable_tuple(std::move(as)...);
     784                 : 
     785                 :     co_await detail::when_any_io_launcher<As...>(
     786                 :         &awaitable_tuple, &state);
     787                 : 
     788                 :     // Winner found: return their result
     789                 :     if(state.result_.has_value())
     790                 :         co_return std::move(*state.result_);
     791                 : 
     792                 :     // Winner claimed but payload construction failed
     793                 :     if(state.core_.winner_exception_)
     794                 :         std::rethrow_exception(state.core_.winner_exception_);
     795                 : 
     796                 :     // No winner — report last failure
     797                 :     if(state.last_exception_)
     798                 :         std::rethrow_exception(state.last_exception_);
     799                 :     co_return result_type{std::in_place_index<0>, state.last_error_};
     800              28 : }
     801                 : 
     802                 : } // namespace capy
     803                 : } // namespace boost
     804                 : 
     805                 : #endif
        

Generated by: LCOV version 2.3