LCOV - code coverage report
Current view: top level - capy - when_all.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 95.1 % 162 154 8
Test Date: 2026-03-19 01:23:19 Functions: 92.4 % 422 390 32

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_WHEN_ALL_HPP
      11                 : #define BOOST_CAPY_WHEN_ALL_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/io_result_combinators.hpp>
      15                 : #include <boost/capy/concept/executor.hpp>
      16                 : #include <boost/capy/concept/io_awaitable.hpp>
      17                 : #include <coroutine>
      18                 : #include <boost/capy/ex/io_env.hpp>
      19                 : #include <boost/capy/ex/frame_allocator.hpp>
      20                 : #include <boost/capy/task.hpp>
      21                 : 
      22                 : #include <array>
      23                 : #include <atomic>
      24                 : #include <exception>
      25                 : #include <optional>
      26                 : #include <ranges>
      27                 : #include <stdexcept>
      28                 : #include <stop_token>
      29                 : #include <tuple>
      30                 : #include <type_traits>
      31                 : #include <utility>
      32                 : #include <vector>
      33                 : 
      34                 : namespace boost {
      35                 : namespace capy {
      36                 : 
      37                 : namespace detail {
      38                 : 
      39                 : /** Holds the result of a single task within when_all.
      40                 : */
      41                 : template<typename T>
      42                 : struct result_holder
      43                 : {
      44                 :     std::optional<T> value_;
      45                 : 
      46 HIT          81 :     void set(T v)
      47                 :     {
      48              81 :         value_ = std::move(v);
      49              81 :     }
      50                 : 
      51              69 :     T get() &&
      52                 :     {
      53              69 :         return std::move(*value_);
      54                 :     }
      55                 : };
      56                 : 
      57                 : /** Core shared state for when_all operations.
      58                 : 
      59                 :     Contains all members and methods common to both heterogeneous (variadic)
      60                 :     and homogeneous (range) when_all implementations. State classes embed
      61                 :     this via composition to avoid CRTP destructor ordering issues.
      62                 : 
      63                 :     @par Thread Safety
      64                 :     Atomic operations protect exception capture and completion count.
      65                 : */
      66                 : struct when_all_core
      67                 : {
      68                 :     std::atomic<std::size_t> remaining_count_;
      69                 : 
      70                 :     // Exception storage - first error wins, others discarded
      71                 :     std::atomic<bool> has_exception_{false};
      72                 :     std::exception_ptr first_exception_;
      73                 : 
      74                 :     std::stop_source stop_source_;
      75                 : 
      76                 :     // Bridges parent's stop token to our stop_source
      77                 :     struct stop_callback_fn
      78                 :     {
      79                 :         std::stop_source* source_;
      80 MIS           0 :         void operator()() const { source_->request_stop(); }
      81                 :     };
      82                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
      83                 :     std::optional<stop_callback_t> parent_stop_callback_;
      84                 : 
      85                 :     std::coroutine_handle<> continuation_;
      86                 :     io_env const* caller_env_ = nullptr;
      87                 : 
      88 HIT          71 :     explicit when_all_core(std::size_t count) noexcept
      89              71 :         : remaining_count_(count)
      90                 :     {
      91              71 :     }
      92                 : 
      93                 :     /** Capture an exception (first one wins). */
      94              17 :     void capture_exception(std::exception_ptr ep)
      95                 :     {
      96              17 :         bool expected = false;
      97              17 :         if(has_exception_.compare_exchange_strong(
      98                 :             expected, true, std::memory_order_relaxed))
      99              16 :             first_exception_ = ep;
     100              17 :     }
     101                 : };
     102                 : 
     103                 : /** Shared state for heterogeneous when_all (variadic overload).
     104                 : 
     105                 :     @tparam Ts The result types of the tasks.
     106                 : */
     107                 : template<typename... Ts>
     108                 : struct when_all_state
     109                 : {
     110                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     111                 : 
     112                 :     when_all_core core_;
     113                 :     std::tuple<result_holder<Ts>...> results_;
     114                 :     std::array<std::coroutine_handle<>, task_count> runner_handles_{};
     115                 : 
     116                 :     std::atomic<bool> has_error_{false};
     117                 :     std::error_code first_error_;
     118                 : 
     119              49 :     when_all_state()
     120              49 :         : core_(task_count)
     121                 :     {
     122              49 :     }
     123                 : 
     124                 :     /** Record the first error (subsequent errors are discarded). */
     125              43 :     void record_error(std::error_code ec)
     126                 :     {
     127              43 :         bool expected = false;
     128              43 :         if(has_error_.compare_exchange_strong(
     129                 :             expected, true, std::memory_order_relaxed))
     130              29 :             first_error_ = ec;
     131              43 :     }
     132                 : };
     133                 : 
     134                 : /** Shared state for homogeneous when_all (range overload).
     135                 : 
     136                 :     Stores extracted io_result payloads in a vector indexed by task
     137                 :     position. Tracks the first error_code for error propagation.
     138                 : 
     139                 :     @tparam T The payload type extracted from io_result.
     140                 : */
     141                 : template<typename T>
     142                 : struct when_all_homogeneous_state
     143                 : {
     144                 :     when_all_core core_;
     145                 :     std::vector<std::optional<T>> results_;
     146                 :     std::vector<std::coroutine_handle<>> runner_handles_;
     147                 : 
     148                 :     std::atomic<bool> has_error_{false};
     149                 :     std::error_code first_error_;
     150                 : 
     151              11 :     explicit when_all_homogeneous_state(std::size_t count)
     152              11 :         : core_(count)
     153              22 :         , results_(count)
     154              33 :         , runner_handles_(count)
     155                 :     {
     156              11 :     }
     157                 : 
     158              16 :     void set_result(std::size_t index, T value)
     159                 :     {
     160              16 :         results_[index].emplace(std::move(value));
     161              16 :     }
     162                 : 
     163                 :     /** Record the first error (subsequent errors are discarded). */
     164               7 :     void record_error(std::error_code ec)
     165                 :     {
     166               7 :         bool expected = false;
     167               7 :         if(has_error_.compare_exchange_strong(
     168                 :             expected, true, std::memory_order_relaxed))
     169               5 :             first_error_ = ec;
     170               7 :     }
     171                 : };
     172                 : 
     173                 : /** Specialization for void io_result children (no payload storage). */
     174                 : template<>
     175                 : struct when_all_homogeneous_state<std::tuple<>>
     176                 : {
     177                 :     when_all_core core_;
     178                 :     std::vector<std::coroutine_handle<>> runner_handles_;
     179                 : 
     180                 :     std::atomic<bool> has_error_{false};
     181                 :     std::error_code first_error_;
     182                 : 
     183               3 :     explicit when_all_homogeneous_state(std::size_t count)
     184               3 :         : core_(count)
     185               9 :         , runner_handles_(count)
     186                 :     {
     187               3 :     }
     188                 : 
     189                 :     /** Record the first error (subsequent errors are discarded). */
     190               1 :     void record_error(std::error_code ec)
     191                 :     {
     192               1 :         bool expected = false;
     193               1 :         if(has_error_.compare_exchange_strong(
     194                 :             expected, true, std::memory_order_relaxed))
     195               1 :             first_error_ = ec;
     196               1 :     }
     197                 : };
     198                 : 
     199                 : /** Wrapper coroutine that intercepts task completion for when_all.
     200                 : 
     201                 :     Parameterized on StateType to work with both heterogeneous (variadic)
     202                 :     and homogeneous (range) state types. All state types expose their
     203                 :     shared members through a `core_` member of type when_all_core.
     204                 : 
     205                 :     @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
     206                 : */
     207                 : template<typename StateType>
     208                 : struct when_all_runner
     209                 : {
     210                 :     struct promise_type
     211                 :     {
     212                 :         StateType* state_ = nullptr;
     213                 :         std::size_t index_ = 0;
     214                 :         io_env env_;
     215                 : 
     216             143 :         when_all_runner get_return_object() noexcept
     217                 :         {
     218                 :             return when_all_runner(
     219             143 :                 std::coroutine_handle<promise_type>::from_promise(*this));
     220                 :         }
     221                 : 
     222             143 :         std::suspend_always initial_suspend() noexcept
     223                 :         {
     224             143 :             return {};
     225                 :         }
     226                 : 
     227             143 :         auto final_suspend() noexcept
     228                 :         {
     229                 :             struct awaiter
     230                 :             {
     231                 :                 promise_type* p_;
     232             143 :                 bool await_ready() const noexcept { return false; }
     233             143 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     234                 :                 {
     235             143 :                     auto& core = p_->state_->core_;
     236             143 :                     auto* counter = &core.remaining_count_;
     237             143 :                     auto* caller_env = core.caller_env_;
     238             143 :                     auto cont = core.continuation_;
     239                 : 
     240             143 :                     h.destroy();
     241                 : 
     242             143 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     243             143 :                     if(remaining == 1)
     244              71 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     245              72 :                     return detail::symmetric_transfer(std::noop_coroutine());
     246                 :                 }
     247 MIS           0 :                 void await_resume() const noexcept {}
     248                 :             };
     249 HIT         143 :             return awaiter{this};
     250                 :         }
     251                 : 
     252             126 :         void return_void() noexcept {}
     253                 : 
     254              17 :         void unhandled_exception()
     255                 :         {
     256              17 :             state_->core_.capture_exception(std::current_exception());
     257              17 :             state_->core_.stop_source_.request_stop();
     258              17 :         }
     259                 : 
     260                 :         template<class Awaitable>
     261                 :         struct transform_awaiter
     262                 :         {
     263                 :             std::decay_t<Awaitable> a_;
     264                 :             promise_type* p_;
     265                 : 
     266             143 :             bool await_ready() { return a_.await_ready(); }
     267             143 :             decltype(auto) await_resume() { return a_.await_resume(); }
     268                 : 
     269                 :             template<class Promise>
     270             142 :             auto await_suspend(std::coroutine_handle<Promise> h)
     271                 :             {
     272                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     273                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
     274             142 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     275                 :                 else
     276                 :                     return a_.await_suspend(h, &p_->env_);
     277                 :             }
     278                 :         };
     279                 : 
     280                 :         template<class Awaitable>
     281             143 :         auto await_transform(Awaitable&& a)
     282                 :         {
     283                 :             using A = std::decay_t<Awaitable>;
     284                 :             if constexpr (IoAwaitable<A>)
     285                 :             {
     286                 :                 return transform_awaiter<Awaitable>{
     287             286 :                     std::forward<Awaitable>(a), this};
     288                 :             }
     289                 :             else
     290                 :             {
     291                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     292                 :             }
     293             143 :         }
     294                 :     };
     295                 : 
     296                 :     std::coroutine_handle<promise_type> h_;
     297                 : 
     298             143 :     explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
     299             143 :         : h_(h)
     300                 :     {
     301             143 :     }
     302                 : 
     303                 :     // Enable move for all clang versions - some versions need it
     304                 :     when_all_runner(when_all_runner&& other) noexcept
     305                 :         : h_(std::exchange(other.h_, nullptr))
     306                 :     {
     307                 :     }
     308                 : 
     309                 :     when_all_runner(when_all_runner const&) = delete;
     310                 :     when_all_runner& operator=(when_all_runner const&) = delete;
     311                 :     when_all_runner& operator=(when_all_runner&&) = delete;
     312                 : 
     313             143 :     auto release() noexcept
     314                 :     {
     315             143 :         return std::exchange(h_, nullptr);
     316                 :     }
     317                 : };
     318                 : 
     319                 : /** Create an io_result-aware runner for a single awaitable (range path).
     320                 : 
     321                 :     Checks the error code, records errors and requests stop on failure,
     322                 :     or extracts the payload on success.
     323                 : */
     324                 : template<IoAwaitable Awaitable, typename StateType>
     325                 : when_all_runner<StateType>
     326              32 : make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
     327                 : {
     328                 :     auto result = co_await std::move(inner);
     329                 : 
     330                 :     if(result.ec)
     331                 :     {
     332                 :         state->record_error(result.ec);
     333                 :         state->core_.stop_source_.request_stop();
     334                 :     }
     335                 :     else
     336                 :     {
     337                 :         using PayloadT = io_result_payload_t<
     338                 :             awaitable_result_t<Awaitable>>;
     339                 :         if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
     340                 :         {
     341                 :             state->set_result(index,
     342                 :                 extract_io_payload(std::move(result)));
     343                 :         }
     344                 :     }
     345              64 : }
     346                 : 
     347                 : /** Create a runner for io_result children that requests stop on ec. */
     348                 : template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
     349                 : when_all_runner<when_all_state<Ts...>>
     350              95 : make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
     351                 : {
     352                 :     auto result = co_await std::move(inner);
     353                 :     auto ec = result.ec;
     354                 :     std::get<Index>(state->results_).set(std::move(result));
     355                 : 
     356                 :     if(ec)
     357                 :     {
     358                 :         state->record_error(ec);
     359                 :         state->core_.stop_source_.request_stop();
     360                 :     }
     361             190 : }
     362                 : 
     363                 : /** Launcher that uses io_result-aware runners. */
     364                 : template<IoAwaitable... Awaitables>
     365                 : class when_all_io_launcher
     366                 : {
     367                 :     using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
     368                 : 
     369                 :     std::tuple<Awaitables...>* awaitables_;
     370                 :     state_type* state_;
     371                 : 
     372                 : public:
     373              49 :     when_all_io_launcher(
     374                 :         std::tuple<Awaitables...>* awaitables,
     375                 :         state_type* state)
     376              49 :         : awaitables_(awaitables)
     377              49 :         , state_(state)
     378                 :     {
     379              49 :     }
     380                 : 
     381              49 :     bool await_ready() const noexcept
     382                 :     {
     383              49 :         return sizeof...(Awaitables) == 0;
     384                 :     }
     385                 : 
     386              49 :     std::coroutine_handle<> await_suspend(
     387                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     388                 :     {
     389              49 :         state_->core_.continuation_ = continuation;
     390              49 :         state_->core_.caller_env_ = caller_env;
     391                 : 
     392              49 :         if(caller_env->stop_token.stop_possible())
     393                 :         {
     394 MIS           0 :             state_->core_.parent_stop_callback_.emplace(
     395               0 :                 caller_env->stop_token,
     396               0 :                 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
     397                 : 
     398               0 :             if(caller_env->stop_token.stop_requested())
     399               0 :                 state_->core_.stop_source_.request_stop();
     400                 :         }
     401                 : 
     402 HIT          49 :         auto token = state_->core_.stop_source_.get_token();
     403              46 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     404              49 :             (..., launch_one<Is>(caller_env->executor, token));
     405              49 :         }(std::index_sequence_for<Awaitables...>{});
     406                 : 
     407              98 :         return std::noop_coroutine();
     408              49 :     }
     409                 : 
     410              49 :     void await_resume() const noexcept {}
     411                 : 
     412                 : private:
     413                 :     template<std::size_t I>
     414              95 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     415                 :     {
     416              95 :         auto runner = make_when_all_io_runner<I>(
     417              95 :             std::move(std::get<I>(*awaitables_)), state_);
     418                 : 
     419              95 :         auto h = runner.release();
     420              95 :         h.promise().state_ = state_;
     421              95 :         h.promise().env_ = io_env{caller_ex, token,
     422              95 :             state_->core_.caller_env_->frame_allocator};
     423                 : 
     424              95 :         std::coroutine_handle<> ch{h};
     425              95 :         state_->runner_handles_[I] = ch;
     426              95 :         state_->core_.caller_env_->executor.post(ch);
     427             190 :     }
     428                 : };
     429                 : 
     430                 : /** Helper to extract a single result from state.
     431                 :     This is a separate function to work around a GCC-11 ICE that occurs
     432                 :     when using nested immediately-invoked lambdas with pack expansion.
     433                 : */
     434                 : template<std::size_t I, typename... Ts>
     435              69 : auto extract_single_result(when_all_state<Ts...>& state)
     436                 : {
     437              69 :     return std::move(std::get<I>(state.results_)).get();
     438                 : }
     439                 : 
     440                 : /** Extract all results from state as a tuple.
     441                 : */
     442                 : template<typename... Ts>
     443              36 : auto extract_results(when_all_state<Ts...>& state)
     444                 : {
     445              55 :     return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     446              36 :         return std::tuple(extract_single_result<Is>(state)...);
     447              72 :     }(std::index_sequence_for<Ts...>{});
     448                 : }
     449                 : 
     450                 : /** Launches all homogeneous runners concurrently.
     451                 : 
     452                 :     Two-phase approach: create all runners first, then post all.
     453                 :     This avoids lifetime issues if a task completes synchronously.
     454                 : */
     455                 : template<typename Range>
     456                 : class when_all_homogeneous_launcher
     457                 : {
     458                 :     using Awaitable = std::ranges::range_value_t<Range>;
     459                 :     using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
     460                 : 
     461                 :     Range* range_;
     462                 :     when_all_homogeneous_state<PayloadT>* state_;
     463                 : 
     464                 : public:
     465              14 :     when_all_homogeneous_launcher(
     466                 :         Range* range,
     467                 :         when_all_homogeneous_state<PayloadT>* state)
     468              14 :         : range_(range)
     469              14 :         , state_(state)
     470                 :     {
     471              14 :     }
     472                 : 
     473              14 :     bool await_ready() const noexcept
     474                 :     {
     475              14 :         return std::ranges::empty(*range_);
     476                 :     }
     477                 : 
     478              14 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     479                 :     {
     480              14 :         state_->core_.continuation_ = continuation;
     481              14 :         state_->core_.caller_env_ = caller_env;
     482                 : 
     483              14 :         if(caller_env->stop_token.stop_possible())
     484                 :         {
     485               2 :             state_->core_.parent_stop_callback_.emplace(
     486               1 :                 caller_env->stop_token,
     487               1 :                 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
     488                 : 
     489               1 :             if(caller_env->stop_token.stop_requested())
     490 MIS           0 :                 state_->core_.stop_source_.request_stop();
     491                 :         }
     492                 : 
     493 HIT          14 :         auto token = state_->core_.stop_source_.get_token();
     494                 : 
     495                 :         // Phase 1: Create all runners without dispatching.
     496              14 :         std::size_t index = 0;
     497              46 :         for(auto&& a : *range_)
     498                 :         {
     499              32 :             auto runner = make_when_all_homogeneous_runner(
     500              32 :                 std::move(a), state_, index);
     501                 : 
     502              32 :             auto h = runner.release();
     503              32 :             h.promise().state_ = state_;
     504              32 :             h.promise().index_ = index;
     505              32 :             h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
     506                 : 
     507              32 :             state_->runner_handles_[index] = std::coroutine_handle<>{h};
     508              32 :             ++index;
     509                 :         }
     510                 : 
     511                 :         // Phase 2: Post all runners. Any may complete synchronously.
     512                 :         // After last post, state_ and this may be destroyed.
     513              14 :         std::coroutine_handle<>* handles = state_->runner_handles_.data();
     514              14 :         std::size_t count = state_->runner_handles_.size();
     515              46 :         for(std::size_t i = 0; i < count; ++i)
     516              32 :             caller_env->executor.post(handles[i]);
     517                 : 
     518              28 :         return std::noop_coroutine();
     519              46 :     }
     520                 : 
     521              14 :     void await_resume() const noexcept
     522                 :     {
     523              14 :     }
     524                 : };
     525                 : 
     526                 : } // namespace detail
     527                 : 
     528                 : /** Execute a range of io_result-returning awaitables concurrently.
     529                 : 
     530                 :     Launches all awaitables simultaneously and waits for all to complete.
     531                 :     On success, extracted payloads are collected in a vector preserving
     532                 :     input order. The first error_code cancels siblings and is propagated
     533                 :     in the outer io_result. Exceptions always beat error codes.
     534                 : 
     535                 :     @li All child awaitables run concurrently on the caller's executor
     536                 :     @li Payloads are returned as a vector in input order
     537                 :     @li First error_code wins and cancels siblings
     538                 :     @li Exception always beats error_code
     539                 :     @li Completes only after all children have finished
     540                 : 
     541                 :     @par Thread Safety
     542                 :     The returned task must be awaited from a single execution context.
     543                 :     Child awaitables execute concurrently but complete through the caller's
     544                 :     executor.
     545                 : 
     546                 :     @param awaitables Range of io_result-returning awaitables to execute
     547                 :         concurrently (must not be empty).
     548                 : 
     549                 :     @return A task yielding io_result<vector<PayloadT>> where PayloadT
     550                 :         is the payload extracted from each child's io_result.
     551                 : 
     552                 :     @throws std::invalid_argument if range is empty (thrown before
     553                 :         coroutine suspends).
     554                 :     @throws Rethrows the first child exception after all children
     555                 :         complete (exception beats error_code).
     556                 : 
     557                 :     @par Example
     558                 :     @code
     559                 :     task<void> example()
     560                 :     {
     561                 :         std::vector<io_task<size_t>> reads;
     562                 :         for (auto& buf : buffers)
     563                 :             reads.push_back(stream.read_some(buf));
     564                 : 
     565                 :         auto [ec, counts] = co_await when_all(std::move(reads));
     566                 :         if (ec) { // handle error
     567                 :         }
     568                 :     }
     569                 :     @endcode
     570                 : 
     571                 :     @see IoAwaitableRange, when_all
     572                 : */
     573                 : template<IoAwaitableRange R>
     574                 :     requires detail::is_io_result_v<
     575                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     576                 :     && (!std::is_same_v<
     577                 :             detail::io_result_payload_t<
     578                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     579                 :             std::tuple<>>)
     580              12 : [[nodiscard]] auto when_all(R&& awaitables)
     581                 :     -> task<io_result<std::vector<
     582                 :         detail::io_result_payload_t<
     583                 :             awaitable_result_t<std::ranges::range_value_t<R>>>>>>
     584                 : {
     585                 :     using Awaitable = std::ranges::range_value_t<R>;
     586                 :     using PayloadT = detail::io_result_payload_t<
     587                 :         awaitable_result_t<Awaitable>>;
     588                 :     using OwnedRange = std::remove_cvref_t<R>;
     589                 : 
     590                 :     auto count = std::ranges::size(awaitables);
     591                 :     if(count == 0)
     592                 :         throw std::invalid_argument("when_all requires at least one awaitable");
     593                 : 
     594                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     595                 : 
     596                 :     detail::when_all_homogeneous_state<PayloadT> state(count);
     597                 : 
     598                 :     co_await detail::when_all_homogeneous_launcher<OwnedRange>(
     599                 :         &owned_awaitables, &state);
     600                 : 
     601                 :     if(state.core_.first_exception_)
     602                 :         std::rethrow_exception(state.core_.first_exception_);
     603                 : 
     604                 :     if(state.has_error_.load(std::memory_order_relaxed))
     605                 :         co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
     606                 : 
     607                 :     std::vector<PayloadT> results;
     608                 :     results.reserve(count);
     609                 :     for(auto& opt : state.results_)
     610                 :         results.push_back(std::move(*opt));
     611                 : 
     612                 :     co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
     613              24 : }
     614                 : 
     615                 : /** Execute a range of void io_result-returning awaitables concurrently.
     616                 : 
     617                 :     Launches all awaitables simultaneously and waits for all to complete.
     618                 :     Since all awaitables return io_result<>, no payload values are
     619                 :     collected. The first error_code cancels siblings and is propagated.
     620                 :     Exceptions always beat error codes.
     621                 : 
     622                 :     @param awaitables Range of io_result<>-returning awaitables to
     623                 :         execute concurrently (must not be empty).
     624                 : 
     625                 :     @return A task yielding io_result<> whose ec is the first child
     626                 :         error, or default-constructed on success.
     627                 : 
     628                 :     @throws std::invalid_argument if range is empty.
     629                 :     @throws Rethrows the first child exception after all children
     630                 :         complete (exception beats error_code).
     631                 : 
     632                 :     @par Example
     633                 :     @code
     634                 :     task<void> example()
     635                 :     {
     636                 :         std::vector<io_task<>> jobs;
     637                 :         for (int i = 0; i < n; ++i)
     638                 :             jobs.push_back(process(i));
     639                 : 
     640                 :         auto [ec] = co_await when_all(std::move(jobs));
     641                 :     }
     642                 :     @endcode
     643                 : 
     644                 :     @see IoAwaitableRange, when_all
     645                 : */
     646                 : template<IoAwaitableRange R>
     647                 :     requires detail::is_io_result_v<
     648                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     649                 :     && std::is_same_v<
     650                 :             detail::io_result_payload_t<
     651                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     652                 :             std::tuple<>>
     653               4 : [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
     654                 : {
     655                 :     using OwnedRange = std::remove_cvref_t<R>;
     656                 : 
     657                 :     auto count = std::ranges::size(awaitables);
     658                 :     if(count == 0)
     659                 :         throw std::invalid_argument("when_all requires at least one awaitable");
     660                 : 
     661                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables);
     662                 : 
     663                 :     detail::when_all_homogeneous_state<std::tuple<>> state(count);
     664                 : 
     665                 :     co_await detail::when_all_homogeneous_launcher<OwnedRange>(
     666                 :         &owned_awaitables, &state);
     667                 : 
     668                 :     if(state.core_.first_exception_)
     669                 :         std::rethrow_exception(state.core_.first_exception_);
     670                 : 
     671                 :     if(state.has_error_.load(std::memory_order_relaxed))
     672                 :         co_return io_result<>{state.first_error_};
     673                 : 
     674                 :     co_return io_result<>{};
     675               8 : }
     676                 : 
     677                 : /** Execute io_result-returning awaitables concurrently, inspecting error codes.
     678                 : 
     679                 :     Overload selected when all children return io_result<Ts...>.
     680                 :     The error_code is lifted out of each child into a single outer
     681                 :     io_result. On success all values are returned; on failure the
     682                 :     first error_code wins.
     683                 : 
     684                 :     @par Exception Safety
     685                 :     Exception always beats error_code. If any child throws, the
     686                 :     exception is rethrown regardless of error_code results.
     687                 : 
     688                 :     @param awaitables One or more awaitables each returning
     689                 :         io_result<Ts...>.
     690                 : 
     691                 :     @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
     692                 :         follows the payload flattening rules.
     693                 : */
     694                 : template<IoAwaitable... As>
     695                 :     requires (sizeof...(As) > 0)
     696                 :           && detail::all_io_result_awaitables<As...>
     697              49 : [[nodiscard]] auto when_all(As... awaitables)
     698                 :     -> task<io_result<
     699                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>>
     700                 : {
     701                 :     using result_type = io_result<
     702                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>;
     703                 : 
     704                 :     detail::when_all_state<awaitable_result_t<As>...> state;
     705                 :     std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
     706                 : 
     707                 :     co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
     708                 : 
     709                 :     // Exception always wins over error_code
     710                 :     if(state.core_.first_exception_)
     711                 :         std::rethrow_exception(state.core_.first_exception_);
     712                 : 
     713                 :     auto r = detail::build_when_all_io_result<result_type>(
     714                 :         detail::extract_results(state));
     715                 :     if(state.has_error_.load(std::memory_order_relaxed))
     716                 :         r.ec = state.first_error_;
     717                 :     co_return r;
     718              98 : }
     719                 : 
     720                 : } // namespace capy
     721                 : } // namespace boost
     722                 : 
     723                 : #endif
        

Generated by: LCOV version 2.3