TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ALL_HPP
11 : #define BOOST_CAPY_WHEN_ALL_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/io_result_combinators.hpp>
15 : #include <boost/capy/concept/executor.hpp>
16 : #include <boost/capy/concept/io_awaitable.hpp>
17 : #include <coroutine>
18 : #include <boost/capy/ex/io_env.hpp>
19 : #include <boost/capy/ex/frame_allocator.hpp>
20 : #include <boost/capy/task.hpp>
21 :
22 : #include <array>
23 : #include <atomic>
24 : #include <exception>
25 : #include <optional>
26 : #include <ranges>
27 : #include <stdexcept>
28 : #include <stop_token>
29 : #include <tuple>
30 : #include <type_traits>
31 : #include <utility>
32 : #include <vector>
33 :
34 : namespace boost {
35 : namespace capy {
36 :
37 : namespace detail {
38 :
39 : /** Holds the result of a single task within when_all.
40 : */
41 : template<typename T>
42 : struct result_holder
43 : {
44 : std::optional<T> value_;
45 :
46 HIT 81 : void set(T v)
47 : {
48 81 : value_ = std::move(v);
49 81 : }
50 :
51 69 : T get() &&
52 : {
53 69 : return std::move(*value_);
54 : }
55 : };
56 :
57 : /** Core shared state for when_all operations.
58 :
59 : Contains all members and methods common to both heterogeneous (variadic)
60 : and homogeneous (range) when_all implementations. State classes embed
61 : this via composition to avoid CRTP destructor ordering issues.
62 :
63 : @par Thread Safety
64 : Atomic operations protect exception capture and completion count.
65 : */
66 : struct when_all_core
67 : {
68 : std::atomic<std::size_t> remaining_count_;
69 :
70 : // Exception storage - first error wins, others discarded
71 : std::atomic<bool> has_exception_{false};
72 : std::exception_ptr first_exception_;
73 :
74 : std::stop_source stop_source_;
75 :
76 : // Bridges parent's stop token to our stop_source
77 : struct stop_callback_fn
78 : {
79 : std::stop_source* source_;
80 MIS 0 : void operator()() const { source_->request_stop(); }
81 : };
82 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
83 : std::optional<stop_callback_t> parent_stop_callback_;
84 :
85 : std::coroutine_handle<> continuation_;
86 : io_env const* caller_env_ = nullptr;
87 :
88 HIT 71 : explicit when_all_core(std::size_t count) noexcept
89 71 : : remaining_count_(count)
90 : {
91 71 : }
92 :
93 : /** Capture an exception (first one wins). */
94 17 : void capture_exception(std::exception_ptr ep)
95 : {
96 17 : bool expected = false;
97 17 : if(has_exception_.compare_exchange_strong(
98 : expected, true, std::memory_order_relaxed))
99 16 : first_exception_ = ep;
100 17 : }
101 : };
102 :
103 : /** Shared state for heterogeneous when_all (variadic overload).
104 :
105 : @tparam Ts The result types of the tasks.
106 : */
107 : template<typename... Ts>
108 : struct when_all_state
109 : {
110 : static constexpr std::size_t task_count = sizeof...(Ts);
111 :
112 : when_all_core core_;
113 : std::tuple<result_holder<Ts>...> results_;
114 : std::array<std::coroutine_handle<>, task_count> runner_handles_{};
115 :
116 : std::atomic<bool> has_error_{false};
117 : std::error_code first_error_;
118 :
119 49 : when_all_state()
120 49 : : core_(task_count)
121 : {
122 49 : }
123 :
124 : /** Record the first error (subsequent errors are discarded). */
125 43 : void record_error(std::error_code ec)
126 : {
127 43 : bool expected = false;
128 43 : if(has_error_.compare_exchange_strong(
129 : expected, true, std::memory_order_relaxed))
130 29 : first_error_ = ec;
131 43 : }
132 : };
133 :
134 : /** Shared state for homogeneous when_all (range overload).
135 :
136 : Stores extracted io_result payloads in a vector indexed by task
137 : position. Tracks the first error_code for error propagation.
138 :
139 : @tparam T The payload type extracted from io_result.
140 : */
141 : template<typename T>
142 : struct when_all_homogeneous_state
143 : {
144 : when_all_core core_;
145 : std::vector<std::optional<T>> results_;
146 : std::vector<std::coroutine_handle<>> runner_handles_;
147 :
148 : std::atomic<bool> has_error_{false};
149 : std::error_code first_error_;
150 :
151 11 : explicit when_all_homogeneous_state(std::size_t count)
152 11 : : core_(count)
153 22 : , results_(count)
154 33 : , runner_handles_(count)
155 : {
156 11 : }
157 :
158 16 : void set_result(std::size_t index, T value)
159 : {
160 16 : results_[index].emplace(std::move(value));
161 16 : }
162 :
163 : /** Record the first error (subsequent errors are discarded). */
164 7 : void record_error(std::error_code ec)
165 : {
166 7 : bool expected = false;
167 7 : if(has_error_.compare_exchange_strong(
168 : expected, true, std::memory_order_relaxed))
169 5 : first_error_ = ec;
170 7 : }
171 : };
172 :
173 : /** Specialization for void io_result children (no payload storage). */
174 : template<>
175 : struct when_all_homogeneous_state<std::tuple<>>
176 : {
177 : when_all_core core_;
178 : std::vector<std::coroutine_handle<>> runner_handles_;
179 :
180 : std::atomic<bool> has_error_{false};
181 : std::error_code first_error_;
182 :
183 3 : explicit when_all_homogeneous_state(std::size_t count)
184 3 : : core_(count)
185 9 : , runner_handles_(count)
186 : {
187 3 : }
188 :
189 : /** Record the first error (subsequent errors are discarded). */
190 1 : void record_error(std::error_code ec)
191 : {
192 1 : bool expected = false;
193 1 : if(has_error_.compare_exchange_strong(
194 : expected, true, std::memory_order_relaxed))
195 1 : first_error_ = ec;
196 1 : }
197 : };
198 :
199 : /** Wrapper coroutine that intercepts task completion for when_all.
200 :
201 : Parameterized on StateType to work with both heterogeneous (variadic)
202 : and homogeneous (range) state types. All state types expose their
203 : shared members through a `core_` member of type when_all_core.
204 :
205 : @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
206 : */
207 : template<typename StateType>
208 : struct when_all_runner
209 : {
210 : struct promise_type
211 : {
212 : StateType* state_ = nullptr;
213 : std::size_t index_ = 0;
214 : io_env env_;
215 :
216 143 : when_all_runner get_return_object() noexcept
217 : {
218 : return when_all_runner(
219 143 : std::coroutine_handle<promise_type>::from_promise(*this));
220 : }
221 :
222 143 : std::suspend_always initial_suspend() noexcept
223 : {
224 143 : return {};
225 : }
226 :
227 143 : auto final_suspend() noexcept
228 : {
229 : struct awaiter
230 : {
231 : promise_type* p_;
232 143 : bool await_ready() const noexcept { return false; }
233 143 : auto await_suspend(std::coroutine_handle<> h) noexcept
234 : {
235 143 : auto& core = p_->state_->core_;
236 143 : auto* counter = &core.remaining_count_;
237 143 : auto* caller_env = core.caller_env_;
238 143 : auto cont = core.continuation_;
239 :
240 143 : h.destroy();
241 :
242 143 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
243 143 : if(remaining == 1)
244 71 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
245 72 : return detail::symmetric_transfer(std::noop_coroutine());
246 : }
247 MIS 0 : void await_resume() const noexcept {}
248 : };
249 HIT 143 : return awaiter{this};
250 : }
251 :
252 126 : void return_void() noexcept {}
253 :
254 17 : void unhandled_exception()
255 : {
256 17 : state_->core_.capture_exception(std::current_exception());
257 17 : state_->core_.stop_source_.request_stop();
258 17 : }
259 :
260 : template<class Awaitable>
261 : struct transform_awaiter
262 : {
263 : std::decay_t<Awaitable> a_;
264 : promise_type* p_;
265 :
266 143 : bool await_ready() { return a_.await_ready(); }
267 143 : decltype(auto) await_resume() { return a_.await_resume(); }
268 :
269 : template<class Promise>
270 142 : auto await_suspend(std::coroutine_handle<Promise> h)
271 : {
272 : using R = decltype(a_.await_suspend(h, &p_->env_));
273 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
274 142 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
275 : else
276 : return a_.await_suspend(h, &p_->env_);
277 : }
278 : };
279 :
280 : template<class Awaitable>
281 143 : auto await_transform(Awaitable&& a)
282 : {
283 : using A = std::decay_t<Awaitable>;
284 : if constexpr (IoAwaitable<A>)
285 : {
286 : return transform_awaiter<Awaitable>{
287 286 : std::forward<Awaitable>(a), this};
288 : }
289 : else
290 : {
291 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
292 : }
293 143 : }
294 : };
295 :
296 : std::coroutine_handle<promise_type> h_;
297 :
298 143 : explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
299 143 : : h_(h)
300 : {
301 143 : }
302 :
303 : // Enable move for all clang versions - some versions need it
304 : when_all_runner(when_all_runner&& other) noexcept
305 : : h_(std::exchange(other.h_, nullptr))
306 : {
307 : }
308 :
309 : when_all_runner(when_all_runner const&) = delete;
310 : when_all_runner& operator=(when_all_runner const&) = delete;
311 : when_all_runner& operator=(when_all_runner&&) = delete;
312 :
313 143 : auto release() noexcept
314 : {
315 143 : return std::exchange(h_, nullptr);
316 : }
317 : };
318 :
319 : /** Create an io_result-aware runner for a single awaitable (range path).
320 :
321 : Checks the error code, records errors and requests stop on failure,
322 : or extracts the payload on success.
323 : */
324 : template<IoAwaitable Awaitable, typename StateType>
325 : when_all_runner<StateType>
326 32 : make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
327 : {
328 : auto result = co_await std::move(inner);
329 :
330 : if(result.ec)
331 : {
332 : state->record_error(result.ec);
333 : state->core_.stop_source_.request_stop();
334 : }
335 : else
336 : {
337 : using PayloadT = io_result_payload_t<
338 : awaitable_result_t<Awaitable>>;
339 : if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
340 : {
341 : state->set_result(index,
342 : extract_io_payload(std::move(result)));
343 : }
344 : }
345 64 : }
346 :
347 : /** Create a runner for io_result children that requests stop on ec. */
348 : template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
349 : when_all_runner<when_all_state<Ts...>>
350 95 : make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
351 : {
352 : auto result = co_await std::move(inner);
353 : auto ec = result.ec;
354 : std::get<Index>(state->results_).set(std::move(result));
355 :
356 : if(ec)
357 : {
358 : state->record_error(ec);
359 : state->core_.stop_source_.request_stop();
360 : }
361 190 : }
362 :
363 : /** Launcher that uses io_result-aware runners. */
364 : template<IoAwaitable... Awaitables>
365 : class when_all_io_launcher
366 : {
367 : using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
368 :
369 : std::tuple<Awaitables...>* awaitables_;
370 : state_type* state_;
371 :
372 : public:
373 49 : when_all_io_launcher(
374 : std::tuple<Awaitables...>* awaitables,
375 : state_type* state)
376 49 : : awaitables_(awaitables)
377 49 : , state_(state)
378 : {
379 49 : }
380 :
381 49 : bool await_ready() const noexcept
382 : {
383 49 : return sizeof...(Awaitables) == 0;
384 : }
385 :
386 49 : std::coroutine_handle<> await_suspend(
387 : std::coroutine_handle<> continuation, io_env const* caller_env)
388 : {
389 49 : state_->core_.continuation_ = continuation;
390 49 : state_->core_.caller_env_ = caller_env;
391 :
392 49 : if(caller_env->stop_token.stop_possible())
393 : {
394 MIS 0 : state_->core_.parent_stop_callback_.emplace(
395 0 : caller_env->stop_token,
396 0 : when_all_core::stop_callback_fn{&state_->core_.stop_source_});
397 :
398 0 : if(caller_env->stop_token.stop_requested())
399 0 : state_->core_.stop_source_.request_stop();
400 : }
401 :
402 HIT 49 : auto token = state_->core_.stop_source_.get_token();
403 46 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
404 49 : (..., launch_one<Is>(caller_env->executor, token));
405 49 : }(std::index_sequence_for<Awaitables...>{});
406 :
407 98 : return std::noop_coroutine();
408 49 : }
409 :
410 49 : void await_resume() const noexcept {}
411 :
412 : private:
413 : template<std::size_t I>
414 95 : void launch_one(executor_ref caller_ex, std::stop_token token)
415 : {
416 95 : auto runner = make_when_all_io_runner<I>(
417 95 : std::move(std::get<I>(*awaitables_)), state_);
418 :
419 95 : auto h = runner.release();
420 95 : h.promise().state_ = state_;
421 95 : h.promise().env_ = io_env{caller_ex, token,
422 95 : state_->core_.caller_env_->frame_allocator};
423 :
424 95 : std::coroutine_handle<> ch{h};
425 95 : state_->runner_handles_[I] = ch;
426 95 : state_->core_.caller_env_->executor.post(ch);
427 190 : }
428 : };
429 :
430 : /** Helper to extract a single result from state.
431 : This is a separate function to work around a GCC-11 ICE that occurs
432 : when using nested immediately-invoked lambdas with pack expansion.
433 : */
434 : template<std::size_t I, typename... Ts>
435 69 : auto extract_single_result(when_all_state<Ts...>& state)
436 : {
437 69 : return std::move(std::get<I>(state.results_)).get();
438 : }
439 :
440 : /** Extract all results from state as a tuple.
441 : */
442 : template<typename... Ts>
443 36 : auto extract_results(when_all_state<Ts...>& state)
444 : {
445 55 : return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
446 36 : return std::tuple(extract_single_result<Is>(state)...);
447 72 : }(std::index_sequence_for<Ts...>{});
448 : }
449 :
450 : /** Launches all homogeneous runners concurrently.
451 :
452 : Two-phase approach: create all runners first, then post all.
453 : This avoids lifetime issues if a task completes synchronously.
454 : */
455 : template<typename Range>
456 : class when_all_homogeneous_launcher
457 : {
458 : using Awaitable = std::ranges::range_value_t<Range>;
459 : using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
460 :
461 : Range* range_;
462 : when_all_homogeneous_state<PayloadT>* state_;
463 :
464 : public:
465 14 : when_all_homogeneous_launcher(
466 : Range* range,
467 : when_all_homogeneous_state<PayloadT>* state)
468 14 : : range_(range)
469 14 : , state_(state)
470 : {
471 14 : }
472 :
473 14 : bool await_ready() const noexcept
474 : {
475 14 : return std::ranges::empty(*range_);
476 : }
477 :
478 14 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
479 : {
480 14 : state_->core_.continuation_ = continuation;
481 14 : state_->core_.caller_env_ = caller_env;
482 :
483 14 : if(caller_env->stop_token.stop_possible())
484 : {
485 2 : state_->core_.parent_stop_callback_.emplace(
486 1 : caller_env->stop_token,
487 1 : when_all_core::stop_callback_fn{&state_->core_.stop_source_});
488 :
489 1 : if(caller_env->stop_token.stop_requested())
490 MIS 0 : state_->core_.stop_source_.request_stop();
491 : }
492 :
493 HIT 14 : auto token = state_->core_.stop_source_.get_token();
494 :
495 : // Phase 1: Create all runners without dispatching.
496 14 : std::size_t index = 0;
497 46 : for(auto&& a : *range_)
498 : {
499 32 : auto runner = make_when_all_homogeneous_runner(
500 32 : std::move(a), state_, index);
501 :
502 32 : auto h = runner.release();
503 32 : h.promise().state_ = state_;
504 32 : h.promise().index_ = index;
505 32 : h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
506 :
507 32 : state_->runner_handles_[index] = std::coroutine_handle<>{h};
508 32 : ++index;
509 : }
510 :
511 : // Phase 2: Post all runners. Any may complete synchronously.
512 : // After last post, state_ and this may be destroyed.
513 14 : std::coroutine_handle<>* handles = state_->runner_handles_.data();
514 14 : std::size_t count = state_->runner_handles_.size();
515 46 : for(std::size_t i = 0; i < count; ++i)
516 32 : caller_env->executor.post(handles[i]);
517 :
518 28 : return std::noop_coroutine();
519 46 : }
520 :
521 14 : void await_resume() const noexcept
522 : {
523 14 : }
524 : };
525 :
526 : } // namespace detail
527 :
528 : /** Execute a range of io_result-returning awaitables concurrently.
529 :
530 : Launches all awaitables simultaneously and waits for all to complete.
531 : On success, extracted payloads are collected in a vector preserving
532 : input order. The first error_code cancels siblings and is propagated
533 : in the outer io_result. Exceptions always beat error codes.
534 :
535 : @li All child awaitables run concurrently on the caller's executor
536 : @li Payloads are returned as a vector in input order
537 : @li First error_code wins and cancels siblings
538 : @li Exception always beats error_code
539 : @li Completes only after all children have finished
540 :
541 : @par Thread Safety
542 : The returned task must be awaited from a single execution context.
543 : Child awaitables execute concurrently but complete through the caller's
544 : executor.
545 :
546 : @param awaitables Range of io_result-returning awaitables to execute
547 : concurrently (must not be empty).
548 :
549 : @return A task yielding io_result<vector<PayloadT>> where PayloadT
550 : is the payload extracted from each child's io_result.
551 :
552 : @throws std::invalid_argument if range is empty (thrown before
553 : coroutine suspends).
554 : @throws Rethrows the first child exception after all children
555 : complete (exception beats error_code).
556 :
557 : @par Example
558 : @code
559 : task<void> example()
560 : {
561 : std::vector<io_task<size_t>> reads;
562 : for (auto& buf : buffers)
563 : reads.push_back(stream.read_some(buf));
564 :
565 : auto [ec, counts] = co_await when_all(std::move(reads));
566 : if (ec) { // handle error
567 : }
568 : }
569 : @endcode
570 :
571 : @see IoAwaitableRange, when_all
572 : */
573 : template<IoAwaitableRange R>
574 : requires detail::is_io_result_v<
575 : awaitable_result_t<std::ranges::range_value_t<R>>>
576 : && (!std::is_same_v<
577 : detail::io_result_payload_t<
578 : awaitable_result_t<std::ranges::range_value_t<R>>>,
579 : std::tuple<>>)
580 12 : [[nodiscard]] auto when_all(R&& awaitables)
581 : -> task<io_result<std::vector<
582 : detail::io_result_payload_t<
583 : awaitable_result_t<std::ranges::range_value_t<R>>>>>>
584 : {
585 : using Awaitable = std::ranges::range_value_t<R>;
586 : using PayloadT = detail::io_result_payload_t<
587 : awaitable_result_t<Awaitable>>;
588 : using OwnedRange = std::remove_cvref_t<R>;
589 :
590 : auto count = std::ranges::size(awaitables);
591 : if(count == 0)
592 : throw std::invalid_argument("when_all requires at least one awaitable");
593 :
594 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
595 :
596 : detail::when_all_homogeneous_state<PayloadT> state(count);
597 :
598 : co_await detail::when_all_homogeneous_launcher<OwnedRange>(
599 : &owned_awaitables, &state);
600 :
601 : if(state.core_.first_exception_)
602 : std::rethrow_exception(state.core_.first_exception_);
603 :
604 : if(state.has_error_.load(std::memory_order_relaxed))
605 : co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
606 :
607 : std::vector<PayloadT> results;
608 : results.reserve(count);
609 : for(auto& opt : state.results_)
610 : results.push_back(std::move(*opt));
611 :
612 : co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
613 24 : }
614 :
615 : /** Execute a range of void io_result-returning awaitables concurrently.
616 :
617 : Launches all awaitables simultaneously and waits for all to complete.
618 : Since all awaitables return io_result<>, no payload values are
619 : collected. The first error_code cancels siblings and is propagated.
620 : Exceptions always beat error codes.
621 :
622 : @param awaitables Range of io_result<>-returning awaitables to
623 : execute concurrently (must not be empty).
624 :
625 : @return A task yielding io_result<> whose ec is the first child
626 : error, or default-constructed on success.
627 :
628 : @throws std::invalid_argument if range is empty.
629 : @throws Rethrows the first child exception after all children
630 : complete (exception beats error_code).
631 :
632 : @par Example
633 : @code
634 : task<void> example()
635 : {
636 : std::vector<io_task<>> jobs;
637 : for (int i = 0; i < n; ++i)
638 : jobs.push_back(process(i));
639 :
640 : auto [ec] = co_await when_all(std::move(jobs));
641 : }
642 : @endcode
643 :
644 : @see IoAwaitableRange, when_all
645 : */
646 : template<IoAwaitableRange R>
647 : requires detail::is_io_result_v<
648 : awaitable_result_t<std::ranges::range_value_t<R>>>
649 : && std::is_same_v<
650 : detail::io_result_payload_t<
651 : awaitable_result_t<std::ranges::range_value_t<R>>>,
652 : std::tuple<>>
653 4 : [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
654 : {
655 : using OwnedRange = std::remove_cvref_t<R>;
656 :
657 : auto count = std::ranges::size(awaitables);
658 : if(count == 0)
659 : throw std::invalid_argument("when_all requires at least one awaitable");
660 :
661 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
662 :
663 : detail::when_all_homogeneous_state<std::tuple<>> state(count);
664 :
665 : co_await detail::when_all_homogeneous_launcher<OwnedRange>(
666 : &owned_awaitables, &state);
667 :
668 : if(state.core_.first_exception_)
669 : std::rethrow_exception(state.core_.first_exception_);
670 :
671 : if(state.has_error_.load(std::memory_order_relaxed))
672 : co_return io_result<>{state.first_error_};
673 :
674 : co_return io_result<>{};
675 8 : }
676 :
677 : /** Execute io_result-returning awaitables concurrently, inspecting error codes.
678 :
679 : Overload selected when all children return io_result<Ts...>.
680 : The error_code is lifted out of each child into a single outer
681 : io_result. On success all values are returned; on failure the
682 : first error_code wins.
683 :
684 : @par Exception Safety
685 : Exception always beats error_code. If any child throws, the
686 : exception is rethrown regardless of error_code results.
687 :
688 : @param awaitables One or more awaitables each returning
689 : io_result<Ts...>.
690 :
691 : @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
692 : follows the payload flattening rules.
693 : */
694 : template<IoAwaitable... As>
695 : requires (sizeof...(As) > 0)
696 : && detail::all_io_result_awaitables<As...>
697 49 : [[nodiscard]] auto when_all(As... awaitables)
698 : -> task<io_result<
699 : detail::io_result_payload_t<awaitable_result_t<As>>...>>
700 : {
701 : using result_type = io_result<
702 : detail::io_result_payload_t<awaitable_result_t<As>>...>;
703 :
704 : detail::when_all_state<awaitable_result_t<As>...> state;
705 : std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
706 :
707 : co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
708 :
709 : // Exception always wins over error_code
710 : if(state.core_.first_exception_)
711 : std::rethrow_exception(state.core_.first_exception_);
712 :
713 : auto r = detail::build_when_all_io_result<result_type>(
714 : detail::extract_results(state));
715 : if(state.has_error_.load(std::memory_order_relaxed))
716 : r.ec = state.first_error_;
717 : co_return r;
718 98 : }
719 :
720 : } // namespace capy
721 : } // namespace boost
722 :
723 : #endif
|