TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
12 : #define BOOST_CAPY_WHEN_ANY_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/detail/io_result_combinators.hpp>
16 : #include <boost/capy/concept/executor.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <coroutine>
19 : #include <boost/capy/ex/executor_ref.hpp>
20 : #include <boost/capy/ex/frame_allocator.hpp>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/task.hpp>
23 :
24 : #include <array>
25 : #include <atomic>
26 : #include <exception>
27 : #include <mutex>
28 : #include <optional>
29 : #include <ranges>
30 : #include <stdexcept>
31 : #include <stop_token>
32 : #include <tuple>
33 : #include <type_traits>
34 : #include <utility>
35 : #include <variant>
36 : #include <vector>
37 :
38 : /*
39 : when_any - Race multiple io_result tasks, select first success
40 : =============================================================
41 :
42 : OVERVIEW:
43 : ---------
44 : when_any launches N io_result-returning tasks concurrently. A task
45 : wins by returning !ec; errors and exceptions do not win. Once a
46 : winner is found, stop is requested for siblings and the winner's
47 : payload is returned. If no winner exists (all fail), the first
48 : error_code is returned or the last exception is rethrown.
49 :
50 : ARCHITECTURE:
51 : -------------
52 : The design mirrors when_all but with inverted completion semantics:
53 :
54 : when_all: complete when remaining_count reaches 0 (all done)
55 : when_any: complete when has_winner becomes true (first done)
56 : BUT still wait for remaining_count to reach 0 for cleanup
57 :
58 : Key components:
59 : - when_any_core: Shared state tracking winner and completion
60 : - when_any_io_runner: Wrapper coroutine for each child task
61 : - when_any_io_launcher/when_any_io_homogeneous_launcher:
62 : Awaitables that start all runners concurrently
63 :
64 : CRITICAL INVARIANTS:
65 : --------------------
66 : 1. Only a task returning !ec can become the winner (via atomic CAS)
67 : 2. All tasks must complete before parent resumes (cleanup safety)
68 : 3. Stop is requested immediately when winner is determined
69 : 4. Exceptions and errors do not claim winner status
70 :
71 : POSITIONAL VARIANT:
72 : -------------------
73 : The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
74 : Index 0 is error_code (failure/no-winner). Index 1..N identifies the
75 : winning child and carries its payload.
76 :
77 : RANGE OVERLOAD:
78 : ---------------
79 : The range overload returns variant<error_code, pair<size_t, T>> for
80 : non-void children or variant<error_code, size_t> for void children.
81 :
82 : MEMORY MODEL:
83 : -------------
84 : Synchronization chain from winner's write to parent's read:
85 :
86 : 1. Winner thread writes result_ (non-atomic)
87 : 2. Winner thread calls signal_completion() -> fetch_sub(acq_rel) on remaining_count_
88 : 3. Last task thread (may be winner or non-winner) calls signal_completion()
89 : -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
90 : 4. Last task returns caller_ex_.dispatch(continuation_) via symmetric transfer
91 : 5. Parent coroutine resumes and reads result_
92 :
93 : Synchronization analysis:
94 : - All fetch_sub operations on remaining_count_ form a release sequence
95 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
96 : in the modification order of remaining_count_
97 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
98 : modification order, establishing happens-before from winner's writes
99 : - Executor dispatch() is expected to provide queue-based synchronization
100 : (release-on-post, acquire-on-execute) completing the chain to parent
101 : - Even inline executors work (same thread = sequenced-before)
102 :
103 : EXCEPTION SEMANTICS:
104 : --------------------
105 : Exceptions do NOT claim winner status. If a child throws, the exception
106 : is recorded but the combinator keeps waiting for a success. Only when
107 : all children complete without a winner does the combinator check: if
108 : any exception was recorded, it is rethrown (exception beats error_code).
109 : */
110 :
111 : namespace boost {
112 : namespace capy {
113 :
114 : namespace detail {
115 :
116 : /** Core shared state for when_any operations.
117 :
118 : Contains all members and methods common to both heterogeneous (variadic)
119 : and homogeneous (range) when_any implementations. State classes embed
120 : this via composition to avoid CRTP destructor ordering issues.
121 :
122 : @par Thread Safety
123 : Atomic operations protect winner selection and completion count.
124 : */
125 : struct when_any_core
126 : {
127 : std::atomic<std::size_t> remaining_count_;
128 : std::size_t winner_index_{0};
129 : std::exception_ptr winner_exception_;
130 : std::stop_source stop_source_;
131 :
132 : // Bridges parent's stop token to our stop_source
133 : struct stop_callback_fn
134 : {
135 : std::stop_source* source_;
136 HIT 1 : void operator()() const noexcept { source_->request_stop(); }
137 : };
138 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
139 : std::optional<stop_callback_t> parent_stop_callback_;
140 :
141 : std::coroutine_handle<> continuation_;
142 : io_env const* caller_env_ = nullptr;
143 :
144 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
145 : std::atomic<bool> has_winner_{false};
146 :
147 29 : explicit when_any_core(std::size_t count) noexcept
148 29 : : remaining_count_(count)
149 : {
150 29 : }
151 :
152 : /** Atomically claim winner status; exactly one task succeeds. */
153 51 : bool try_win(std::size_t index) noexcept
154 : {
155 51 : bool expected = false;
156 51 : if(has_winner_.compare_exchange_strong(
157 : expected, true, std::memory_order_acq_rel))
158 : {
159 21 : winner_index_ = index;
160 21 : stop_source_.request_stop();
161 21 : return true;
162 : }
163 30 : return false;
164 : }
165 :
166 : /** @pre try_win() returned true. */
167 MIS 0 : void set_winner_exception(std::exception_ptr ep) noexcept
168 : {
169 0 : winner_exception_ = ep;
170 0 : }
171 :
172 : // Runners signal completion directly via final_suspend; no member function needed.
173 : };
174 :
175 : } // namespace detail
176 :
177 : namespace detail {
178 :
179 : // State for io_result-aware when_any: only !ec wins.
180 : template<typename... Ts>
181 : struct when_any_io_state
182 : {
183 : static constexpr std::size_t task_count = sizeof...(Ts);
184 : using variant_type = std::variant<std::error_code, Ts...>;
185 :
186 : when_any_core core_;
187 : std::optional<variant_type> result_;
188 : std::array<std::coroutine_handle<>, task_count> runner_handles_{};
189 :
190 : // Last failure (error or exception) for the all-fail case.
191 : // Last writer wins — no priority between errors and exceptions.
192 : std::mutex failure_mu_;
193 : std::error_code last_error_;
194 : std::exception_ptr last_exception_;
195 :
196 HIT 14 : when_any_io_state()
197 14 : : core_(task_count)
198 : {
199 14 : }
200 :
201 12 : void record_error(std::error_code ec)
202 : {
203 12 : std::lock_guard lk(failure_mu_);
204 12 : last_error_ = ec;
205 12 : last_exception_ = nullptr;
206 12 : }
207 :
208 4 : void record_exception(std::exception_ptr ep)
209 : {
210 4 : std::lock_guard lk(failure_mu_);
211 4 : last_exception_ = ep;
212 4 : last_error_ = {};
213 4 : }
214 : };
215 :
216 : // Wrapper coroutine for io_result-aware when_any children.
217 : // unhandled_exception records the exception but does NOT claim winner status.
218 : template<typename StateType>
219 : struct when_any_io_runner
220 : {
221 : struct promise_type
222 : {
223 : StateType* state_ = nullptr;
224 : std::size_t index_ = 0;
225 : io_env env_;
226 :
227 78 : when_any_io_runner get_return_object() noexcept
228 : {
229 : return when_any_io_runner(
230 78 : std::coroutine_handle<promise_type>::from_promise(*this));
231 : }
232 :
233 78 : std::suspend_always initial_suspend() noexcept { return {}; }
234 :
235 78 : auto final_suspend() noexcept
236 : {
237 : struct awaiter
238 : {
239 : promise_type* p_;
240 78 : bool await_ready() const noexcept { return false; }
241 78 : auto await_suspend(std::coroutine_handle<> h) noexcept
242 : {
243 78 : auto& core = p_->state_->core_;
244 78 : auto* counter = &core.remaining_count_;
245 78 : auto* caller_env = core.caller_env_;
246 78 : auto cont = core.continuation_;
247 :
248 78 : h.destroy();
249 :
250 78 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
251 78 : if(remaining == 1)
252 29 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
253 49 : return detail::symmetric_transfer(std::noop_coroutine());
254 : }
255 MIS 0 : void await_resume() const noexcept {}
256 : };
257 HIT 78 : return awaiter{this};
258 : }
259 :
260 70 : void return_void() noexcept {}
261 :
262 : // Exceptions do NOT win in io_result when_any
263 8 : void unhandled_exception()
264 : {
265 8 : state_->record_exception(std::current_exception());
266 8 : }
267 :
268 : template<class Awaitable>
269 : struct transform_awaiter
270 : {
271 : std::decay_t<Awaitable> a_;
272 : promise_type* p_;
273 :
274 78 : bool await_ready() { return a_.await_ready(); }
275 78 : decltype(auto) await_resume() { return a_.await_resume(); }
276 :
277 : template<class Promise>
278 77 : auto await_suspend(std::coroutine_handle<Promise> h)
279 : {
280 : using R = decltype(a_.await_suspend(h, &p_->env_));
281 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
282 77 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
283 : else
284 : return a_.await_suspend(h, &p_->env_);
285 : }
286 : };
287 :
288 : template<class Awaitable>
289 78 : auto await_transform(Awaitable&& a)
290 : {
291 : using A = std::decay_t<Awaitable>;
292 : if constexpr (IoAwaitable<A>)
293 : {
294 : return transform_awaiter<Awaitable>{
295 155 : std::forward<Awaitable>(a), this};
296 : }
297 : else
298 : {
299 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
300 : }
301 77 : }
302 : };
303 :
304 : std::coroutine_handle<promise_type> h_;
305 :
306 78 : explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
307 78 : : h_(h)
308 : {
309 78 : }
310 :
311 : when_any_io_runner(when_any_io_runner&& other) noexcept
312 : : h_(std::exchange(other.h_, nullptr))
313 : {
314 : }
315 :
316 : when_any_io_runner(when_any_io_runner const&) = delete;
317 : when_any_io_runner& operator=(when_any_io_runner const&) = delete;
318 : when_any_io_runner& operator=(when_any_io_runner&&) = delete;
319 :
320 78 : auto release() noexcept
321 : {
322 78 : return std::exchange(h_, nullptr);
323 : }
324 : };
325 :
326 : // Runner coroutine: only tries to win when the child returns !ec.
327 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
328 : when_any_io_runner<StateType>
329 26 : make_when_any_io_runner(Awaitable inner, StateType* state)
330 : {
331 : auto result = co_await std::move(inner);
332 :
333 : if(!result.ec)
334 : {
335 : // Success: try to claim winner
336 : if(state->core_.try_win(I))
337 : {
338 : try
339 : {
340 : state->result_.emplace(
341 : std::in_place_index<I + 1>,
342 : detail::extract_io_payload(std::move(result)));
343 : }
344 : catch(...)
345 : {
346 : state->core_.set_winner_exception(std::current_exception());
347 : }
348 : }
349 : }
350 : else
351 : {
352 : // Error: record but don't win
353 : state->record_error(result.ec);
354 : }
355 52 : }
356 :
357 : // Launcher for io_result-aware when_any.
358 : template<IoAwaitable... Awaitables>
359 : class when_any_io_launcher
360 : {
361 : using state_type = when_any_io_state<
362 : io_result_payload_t<awaitable_result_t<Awaitables>>...>;
363 :
364 : std::tuple<Awaitables...>* tasks_;
365 : state_type* state_;
366 :
367 : public:
368 14 : when_any_io_launcher(
369 : std::tuple<Awaitables...>* tasks,
370 : state_type* state)
371 14 : : tasks_(tasks)
372 14 : , state_(state)
373 : {
374 14 : }
375 :
376 14 : bool await_ready() const noexcept
377 : {
378 14 : return sizeof...(Awaitables) == 0;
379 : }
380 :
381 14 : std::coroutine_handle<> await_suspend(
382 : std::coroutine_handle<> continuation, io_env const* caller_env)
383 : {
384 14 : state_->core_.continuation_ = continuation;
385 14 : state_->core_.caller_env_ = caller_env;
386 :
387 14 : if(caller_env->stop_token.stop_possible())
388 : {
389 MIS 0 : state_->core_.parent_stop_callback_.emplace(
390 0 : caller_env->stop_token,
391 0 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
392 :
393 0 : if(caller_env->stop_token.stop_requested())
394 0 : state_->core_.stop_source_.request_stop();
395 : }
396 :
397 HIT 14 : auto token = state_->core_.stop_source_.get_token();
398 28 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
399 14 : (..., launch_one<Is>(caller_env->executor, token));
400 14 : }(std::index_sequence_for<Awaitables...>{});
401 :
402 28 : return std::noop_coroutine();
403 14 : }
404 :
405 14 : void await_resume() const noexcept {}
406 :
407 : private:
408 : template<std::size_t I>
409 26 : void launch_one(executor_ref caller_ex, std::stop_token token)
410 : {
411 26 : auto runner = make_when_any_io_runner<I>(
412 26 : std::move(std::get<I>(*tasks_)), state_);
413 :
414 26 : auto h = runner.release();
415 26 : h.promise().state_ = state_;
416 26 : h.promise().index_ = I;
417 26 : h.promise().env_ = io_env{caller_ex, token,
418 26 : state_->core_.caller_env_->frame_allocator};
419 :
420 26 : std::coroutine_handle<> ch{h};
421 26 : state_->runner_handles_[I] = ch;
422 26 : caller_ex.post(ch);
423 52 : }
424 : };
425 :
426 : /** Shared state for homogeneous io_result-aware when_any (range overload).
427 :
428 : @tparam T The payload type extracted from io_result.
429 : */
430 : template<typename T>
431 : struct when_any_io_homogeneous_state
432 : {
433 : when_any_core core_;
434 : std::optional<T> result_;
435 : std::vector<std::coroutine_handle<>> runner_handles_;
436 :
437 : std::mutex failure_mu_;
438 : std::error_code last_error_;
439 : std::exception_ptr last_exception_;
440 :
441 13 : explicit when_any_io_homogeneous_state(std::size_t count)
442 13 : : core_(count)
443 39 : , runner_handles_(count)
444 : {
445 13 : }
446 :
447 6 : void record_error(std::error_code ec)
448 : {
449 6 : std::lock_guard lk(failure_mu_);
450 6 : last_error_ = ec;
451 6 : last_exception_ = nullptr;
452 6 : }
453 :
454 4 : void record_exception(std::exception_ptr ep)
455 : {
456 4 : std::lock_guard lk(failure_mu_);
457 4 : last_exception_ = ep;
458 4 : last_error_ = {};
459 4 : }
460 : };
461 :
462 : /** Specialization for void io_result children (no payload storage). */
463 : template<>
464 : struct when_any_io_homogeneous_state<std::tuple<>>
465 : {
466 : when_any_core core_;
467 : std::vector<std::coroutine_handle<>> runner_handles_;
468 :
469 : std::mutex failure_mu_;
470 : std::error_code last_error_;
471 : std::exception_ptr last_exception_;
472 :
473 2 : explicit when_any_io_homogeneous_state(std::size_t count)
474 2 : : core_(count)
475 6 : , runner_handles_(count)
476 : {
477 2 : }
478 :
479 1 : void record_error(std::error_code ec)
480 : {
481 1 : std::lock_guard lk(failure_mu_);
482 1 : last_error_ = ec;
483 1 : last_exception_ = nullptr;
484 1 : }
485 :
486 MIS 0 : void record_exception(std::exception_ptr ep)
487 : {
488 0 : std::lock_guard lk(failure_mu_);
489 0 : last_exception_ = ep;
490 0 : last_error_ = {};
491 0 : }
492 : };
493 :
494 : /** Create an io_result-aware runner for homogeneous when_any (range path).
495 :
496 : Only tries to win when the child returns !ec.
497 : */
498 : template<IoAwaitable Awaitable, typename StateType>
499 : when_any_io_runner<StateType>
500 HIT 52 : make_when_any_io_homogeneous_runner(
501 : Awaitable inner, StateType* state, std::size_t index)
502 : {
503 : auto result = co_await std::move(inner);
504 :
505 : if(!result.ec)
506 : {
507 : if(state->core_.try_win(index))
508 : {
509 : using PayloadT = io_result_payload_t<
510 : awaitable_result_t<Awaitable>>;
511 : if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
512 : {
513 : try
514 : {
515 : state->result_.emplace(
516 : extract_io_payload(std::move(result)));
517 : }
518 : catch(...)
519 : {
520 : state->core_.set_winner_exception(
521 : std::current_exception());
522 : }
523 : }
524 : }
525 : }
526 : else
527 : {
528 : state->record_error(result.ec);
529 : }
530 104 : }
531 :
532 : /** Launches all io_result-aware homogeneous runners concurrently. */
533 : template<IoAwaitableRange Range>
534 : class when_any_io_homogeneous_launcher
535 : {
536 : using Awaitable = std::ranges::range_value_t<Range>;
537 : using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
538 :
539 : Range* range_;
540 : when_any_io_homogeneous_state<PayloadT>* state_;
541 :
542 : public:
543 15 : when_any_io_homogeneous_launcher(
544 : Range* range,
545 : when_any_io_homogeneous_state<PayloadT>* state)
546 15 : : range_(range)
547 15 : , state_(state)
548 : {
549 15 : }
550 :
551 15 : bool await_ready() const noexcept
552 : {
553 15 : return std::ranges::empty(*range_);
554 : }
555 :
556 15 : std::coroutine_handle<> await_suspend(
557 : std::coroutine_handle<> continuation, io_env const* caller_env)
558 : {
559 15 : state_->core_.continuation_ = continuation;
560 15 : state_->core_.caller_env_ = caller_env;
561 :
562 15 : if(caller_env->stop_token.stop_possible())
563 : {
564 4 : state_->core_.parent_stop_callback_.emplace(
565 2 : caller_env->stop_token,
566 2 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
567 :
568 2 : if(caller_env->stop_token.stop_requested())
569 1 : state_->core_.stop_source_.request_stop();
570 : }
571 :
572 15 : auto token = state_->core_.stop_source_.get_token();
573 :
574 : // Phase 1: Create all runners without dispatching.
575 15 : std::size_t index = 0;
576 67 : for(auto&& a : *range_)
577 : {
578 52 : auto runner = make_when_any_io_homogeneous_runner(
579 52 : std::move(a), state_, index);
580 :
581 52 : auto h = runner.release();
582 52 : h.promise().state_ = state_;
583 52 : h.promise().index_ = index;
584 52 : h.promise().env_ = io_env{caller_env->executor, token,
585 52 : caller_env->frame_allocator};
586 :
587 52 : state_->runner_handles_[index] = std::coroutine_handle<>{h};
588 52 : ++index;
589 : }
590 :
591 : // Phase 2: Post all runners. Any may complete synchronously.
592 15 : std::coroutine_handle<>* handles = state_->runner_handles_.data();
593 15 : std::size_t count = state_->runner_handles_.size();
594 67 : for(std::size_t i = 0; i < count; ++i)
595 52 : caller_env->executor.post(handles[i]);
596 :
597 30 : return std::noop_coroutine();
598 67 : }
599 :
600 15 : void await_resume() const noexcept {}
601 : };
602 :
603 : } // namespace detail
604 :
605 : /** Race a range of io_result-returning awaitables (non-void payloads).
606 :
607 : Only a child returning !ec can win. Errors and exceptions do not
608 : claim winner status. If all children fail, the last failure
609 : is reported — either the last error_code at variant index 0,
610 : or the last exception rethrown.
611 :
612 : @param awaitables Range of io_result-returning awaitables (must
613 : not be empty).
614 :
615 : @return A task yielding variant<error_code, pair<size_t, PayloadT>>
616 : where index 0 is failure and index 1 carries the winner's
617 : index and payload.
618 :
619 : @throws std::invalid_argument if range is empty.
620 : @throws Rethrows last exception when no winner and the last
621 : failure was an exception.
622 :
623 : @par Example
624 : @code
625 : task<void> example()
626 : {
627 : std::vector<io_task<size_t>> reads;
628 : for (auto& buf : buffers)
629 : reads.push_back(stream.read_some(buf));
630 :
631 : auto result = co_await when_any(std::move(reads));
632 : if (result.index() == 1)
633 : {
634 : auto [idx, n] = std::get<1>(result);
635 : }
636 : }
637 : @endcode
638 :
639 : @see IoAwaitableRange, when_any
640 : */
641 : template<IoAwaitableRange R>
642 : requires detail::is_io_result_v<
643 : awaitable_result_t<std::ranges::range_value_t<R>>>
644 : && (!std::is_same_v<
645 : detail::io_result_payload_t<
646 : awaitable_result_t<std::ranges::range_value_t<R>>>,
647 : std::tuple<>>)
648 14 : [[nodiscard]] auto when_any(R&& awaitables)
649 : -> task<std::variant<std::error_code,
650 : std::pair<std::size_t,
651 : detail::io_result_payload_t<
652 : awaitable_result_t<std::ranges::range_value_t<R>>>>>>
653 : {
654 : using Awaitable = std::ranges::range_value_t<R>;
655 : using PayloadT = detail::io_result_payload_t<
656 : awaitable_result_t<Awaitable>>;
657 : using result_type = std::variant<std::error_code,
658 : std::pair<std::size_t, PayloadT>>;
659 : using OwnedRange = std::remove_cvref_t<R>;
660 :
661 : auto count = std::ranges::size(awaitables);
662 : if(count == 0)
663 : throw std::invalid_argument("when_any requires at least one awaitable");
664 :
665 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
666 :
667 : detail::when_any_io_homogeneous_state<PayloadT> state(count);
668 :
669 : co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
670 : &owned_awaitables, &state);
671 :
672 : // Winner found
673 : if(state.core_.has_winner_.load(std::memory_order_acquire))
674 : {
675 : if(state.core_.winner_exception_)
676 : std::rethrow_exception(state.core_.winner_exception_);
677 : co_return result_type{std::in_place_index<1>,
678 : std::pair{state.core_.winner_index_, std::move(*state.result_)}};
679 : }
680 :
681 : // No winner — report last failure
682 : if(state.last_exception_)
683 : std::rethrow_exception(state.last_exception_);
684 : co_return result_type{std::in_place_index<0>, state.last_error_};
685 28 : }
686 :
687 : /** Race a range of void io_result-returning awaitables.
688 :
689 : Only a child returning !ec can win. Returns the winner's index
690 : at variant index 1, or error_code at index 0 on all-fail.
691 :
692 : @param awaitables Range of io_result<>-returning awaitables (must
693 : not be empty).
694 :
695 : @return A task yielding variant<error_code, size_t> where index 0
696 : is failure and index 1 carries the winner's index.
697 :
698 : @throws std::invalid_argument if range is empty.
699 : @throws Rethrows first exception when no winner and at least one
700 : child threw.
701 :
702 : @par Example
703 : @code
704 : task<void> example()
705 : {
706 : std::vector<io_task<>> jobs;
707 : jobs.push_back(background_work_a());
708 : jobs.push_back(background_work_b());
709 :
710 : auto result = co_await when_any(std::move(jobs));
711 : if (result.index() == 1)
712 : {
713 : auto winner = std::get<1>(result);
714 : }
715 : }
716 : @endcode
717 :
718 : @see IoAwaitableRange, when_any
719 : */
720 : template<IoAwaitableRange R>
721 : requires detail::is_io_result_v<
722 : awaitable_result_t<std::ranges::range_value_t<R>>>
723 : && std::is_same_v<
724 : detail::io_result_payload_t<
725 : awaitable_result_t<std::ranges::range_value_t<R>>>,
726 : std::tuple<>>
727 2 : [[nodiscard]] auto when_any(R&& awaitables)
728 : -> task<std::variant<std::error_code, std::size_t>>
729 : {
730 : using OwnedRange = std::remove_cvref_t<R>;
731 : using result_type = std::variant<std::error_code, std::size_t>;
732 :
733 : auto count = std::ranges::size(awaitables);
734 : if(count == 0)
735 : throw std::invalid_argument("when_any requires at least one awaitable");
736 :
737 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
738 :
739 : detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
740 :
741 : co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
742 : &owned_awaitables, &state);
743 :
744 : // Winner found
745 : if(state.core_.has_winner_.load(std::memory_order_acquire))
746 : {
747 : if(state.core_.winner_exception_)
748 : std::rethrow_exception(state.core_.winner_exception_);
749 : co_return result_type{std::in_place_index<1>,
750 : state.core_.winner_index_};
751 : }
752 :
753 : // No winner — report last failure
754 : if(state.last_exception_)
755 : std::rethrow_exception(state.last_exception_);
756 : co_return result_type{std::in_place_index<0>, state.last_error_};
757 4 : }
758 :
759 : /** Race io_result-returning awaitables, selecting the first success.
760 :
761 : Overload selected when all children return io_result<Ts...>.
762 : Only a child returning !ec can win. Errors and exceptions do
763 : not claim winner status.
764 :
765 : @return A task yielding variant<error_code, R1, ..., Rn> where
766 : index 0 is the failure/no-winner case and index i+1
767 : identifies the winning child.
768 : */
769 : template<IoAwaitable... As>
770 : requires (sizeof...(As) > 0)
771 : && detail::all_io_result_awaitables<As...>
772 14 : [[nodiscard]] auto when_any(As... as)
773 : -> task<std::variant<
774 : std::error_code,
775 : detail::io_result_payload_t<awaitable_result_t<As>>...>>
776 : {
777 : using result_type = std::variant<
778 : std::error_code,
779 : detail::io_result_payload_t<awaitable_result_t<As>>...>;
780 :
781 : detail::when_any_io_state<
782 : detail::io_result_payload_t<awaitable_result_t<As>>...> state;
783 : std::tuple<As...> awaitable_tuple(std::move(as)...);
784 :
785 : co_await detail::when_any_io_launcher<As...>(
786 : &awaitable_tuple, &state);
787 :
788 : // Winner found: return their result
789 : if(state.result_.has_value())
790 : co_return std::move(*state.result_);
791 :
792 : // Winner claimed but payload construction failed
793 : if(state.core_.winner_exception_)
794 : std::rethrow_exception(state.core_.winner_exception_);
795 :
796 : // No winner — report last failure
797 : if(state.last_exception_)
798 : std::rethrow_exception(state.last_exception_);
799 : co_return result_type{std::in_place_index<0>, state.last_error_};
800 28 : }
801 :
802 : } // namespace capy
803 : } // namespace boost
804 :
805 : #endif
|