diff --git a/include/exec/at_coroutine_exit.hpp b/include/exec/at_coroutine_exit.hpp index 4118079be..a54668d63 100644 --- a/include/exec/at_coroutine_exit.hpp +++ b/include/exec/at_coroutine_exit.hpp @@ -18,12 +18,13 @@ // The original idea is taken from libunifex and adapted to stdexec. -#include - #include "../stdexec/execution.hpp" #include "any_sender_of.hpp" +#include +#include + namespace experimental::execution { namespace __at_coro_exit @@ -79,8 +80,8 @@ namespace experimental::execution template requires sender_to<_Sender, __receiver<_Receiver>> - auto - connect(_Receiver __rcvr) && noexcept -> connect_result_t<_Sender, __receiver<_Receiver>> + auto connect(_Receiver __rcvr) && noexcept // + -> connect_result_t<_Sender, __receiver<_Receiver>> { return STDEXEC::connect(static_cast<_Sender&&>(__sender_), __receiver<_Receiver>{static_cast<_Receiver&&>(__rcvr)}); @@ -137,6 +138,12 @@ namespace experimental::execution : __coro_(std::exchange(__that.__coro_, {})) {} + ~__task() + { + if (__coro_) + __coro_.destroy(); + } + [[nodiscard]] static constexpr auto await_ready() noexcept -> bool { @@ -148,7 +155,7 @@ namespace experimental::execution //! coroutine exit; i.e., the coroutine that is co_await-ing the result of calling //! at_coroutine_exit. template <__has_continuation _Promise> - auto await_suspend(__std::coroutine_handle<_Promise> __parent) noexcept -> bool + auto await_suspend(__std::coroutine_handle<_Promise> __parent) -> bool { // Set the cleanup task's scheduler to the parent coroutine's scheduler. __coro_.promise().__scheduler_ = get_start_scheduler(get_env(__parent.promise())); @@ -163,11 +170,13 @@ namespace experimental::execution auto await_resume() noexcept -> std::tuple<_Ts&...> { + // Release the cleanup coroutine. It is now responsible for destroying itself in + // its final suspend. return std::exchange(__coro_, {}).promise().__args_; } private: - struct __final_awaitable + struct __final_awaiter { static constexpr auto await_ready() noexcept -> bool { @@ -183,7 +192,7 @@ namespace experimental::execution return STDEXEC_CORO_DESTROY_AND_CONTINUE(__h, __coro); } - void await_resume() const noexcept {} + static constexpr void await_resume() noexcept {} }; struct __env @@ -211,7 +220,7 @@ namespace experimental::execution } [[nodiscard]] - auto final_suspend() noexcept -> __final_awaitable + auto final_suspend() noexcept -> __final_awaiter { return {}; } diff --git a/include/exec/task.hpp b/include/exec/task.hpp index 5f86c7975..c73746769 100644 --- a/include/exec/task.hpp +++ b/include/exec/task.hpp @@ -23,6 +23,7 @@ #include "../stdexec/__detail/__meta.hpp" #include "../stdexec/__detail/__optional.hpp" #include "../stdexec/__detail/__variant.hpp" +#include "../stdexec/coroutine.hpp" #include "../stdexec/execution.hpp" #include "../stdexec/functional.hpp" @@ -42,10 +43,42 @@ namespace experimental::execution // The required set_value_t() scheduler-sender completion signature is added in // any_receiver_ref::any_sender::any_scheduler. - using __any_scheduler_completions = + using __any_scheduler_completions_t = completion_signatures; - using __any_scheduler = any_scheduler>>; + using __any_scheduler_impl_t = + any_scheduler>>; + + // A scheduler concept that does not check for copyability since that creates a cycle + // in the type system. + template + concept __semi_scheduler = requires(_Scheduler& __sched) { + typename _Scheduler::scheduler_concept; + requires __std::derived_from; + { schedule(__sched) } -> sender; + }; + + struct __any_scheduler + { + using scheduler_concept = scheduler_t; + + template <__not_same_as<__any_scheduler> _Scheduler> + requires __semi_scheduler<_Scheduler> + constexpr __any_scheduler(_Scheduler __sched) noexcept + : __impl_(std::forward<_Scheduler>(__sched)) + {} + + bool operator==(__any_scheduler const & __other) const noexcept = default; + + [[nodiscard]] + auto schedule() const + { + return __impl_.schedule(); + } + + private: + __any_scheduler_impl_t __impl_; + }; static_assert(scheduler<__any_scheduler>); @@ -358,13 +391,13 @@ namespace experimental::execution { // Resuming the continuation of the parent coroutine will cause it to continue // executing on the new scheduler. - __parent_.resume(); + STDEXEC::__coroutine_resume_nothrow(__parent_); } void set_error(std::exception_ptr __eptr) noexcept { __eptr_ = std::move(__eptr); - __parent_.resume(); + STDEXEC::__coroutine_resume_nothrow(__parent_); } void set_stopped() noexcept @@ -373,7 +406,7 @@ namespace experimental::execution // a promise that can handle the stopped signal. The coroutine referred to by // __continuation_ will never be resumed. __std::coroutine_handle<> __unwind = __parent_.promise().unhandled_stopped(); - __unwind.resume(); + STDEXEC::__coroutine_resume_nothrow(__unwind); } [[nodiscard]] @@ -505,7 +538,7 @@ namespace experimental::execution using __scheduler_t = __call_result_or_t; - struct __final_awaitable + struct __final_awaiter { static constexpr auto await_ready() noexcept -> bool { @@ -535,7 +568,7 @@ namespace experimental::execution return {}; } - constexpr auto final_suspend() noexcept -> __final_awaitable + constexpr auto final_suspend() noexcept -> __final_awaiter { return {}; } diff --git a/include/stdexec/__detail/__as_awaitable.hpp b/include/stdexec/__detail/__as_awaitable.hpp index c209ff8db..1da30ec74 100644 --- a/include/stdexec/__detail/__as_awaitable.hpp +++ b/include/stdexec/__detail/__as_awaitable.hpp @@ -100,10 +100,10 @@ namespace STDEXEC && __completes_inline_for; template - struct __sender_awaitable_base; + struct __sender_awaiter_base; template - struct __sender_awaitable_base<_Value, true> + struct __sender_awaiter_base<_Value, true> { static constexpr auto await_ready() noexcept -> bool { @@ -126,16 +126,26 @@ namespace STDEXEC return static_cast<__reference_t>(__var::__get<0>(__result_)); } - __std::coroutine_handle<> __continuation_; - __expected_t<_Value> __result_{__no_init}; + [[nodiscard]] + constexpr auto __get_continuation() const noexcept -> __std::coroutine_handle<> + { + // If the operation was stopped (__result_ is valueless), we should use the + // unhandled_stopped() continuation. Otherwise, should resume the __continuation_ + // as normal. + return __result_.__is_valueless() ? __continuation_.unhandled_stopped() + : __continuation_.handle(); + } + + __coroutine_handle<> __continuation_; + __expected_t<_Value> __result_{__no_init}; }; // When the sender is not statically known to complete inline, we need to use atomic // state to guard against too many inline completions causing a stack overflow. template - struct __sender_awaitable_base<_Value, false> : __sender_awaitable_base<_Value, true> + struct __sender_awaiter_base<_Value, false> : __sender_awaiter_base<_Value, true> { - __std::atomic __ready_{true}; + __std::atomic __refcount_{2}; std::thread::id const __starting_thread_{std::this_thread::get_id()}; }; @@ -170,13 +180,13 @@ namespace STDEXEC std::make_exception_ptr(static_cast<_Error&&>(__err))); } - __sender_awaitable_base<_Value, true>& __awaiter_; + __sender_awaiter_base<_Value, true>& __awaiter_; }; template struct __sync_receiver : __receiver_base<_Value> { - using __awaiter_t = __sender_awaitable_base<_Value, true>; + using __awaiter_t = __sender_awaiter_base<_Value, true>; constexpr explicit __sync_receiver(__awaiter_t& __awaiter) noexcept : __receiver_base<_Value>{__awaiter} @@ -184,15 +194,16 @@ namespace STDEXEC void set_stopped() noexcept { - // no-op: the __result_ variant will remain engaged with the monostate - // alternative, which signals that the operation was stopped. + // no-op: the __result_ variant will remain valueless, which signals that the + // operation was stopped. } // Forward get_env query to the coroutine promise [[nodiscard]] constexpr auto get_env() const noexcept -> env_of_t<_Promise&> { - auto __hcoro = STDEXEC::__coroutine_handle_cast<_Promise>(this->__awaiter_.__continuation_); + auto __hcoro = STDEXEC::__coroutine_handle_cast<_Promise>( + this->__awaiter_.__continuation_.handle()); return STDEXEC::get_env(__hcoro.promise()); } }; @@ -201,7 +212,7 @@ namespace STDEXEC template struct __async_receiver : __sync_receiver<_Promise, _Value> { - using __awaiter_t = __sender_awaitable_base<_Value, false>; + using __awaiter_t = __sender_awaiter_base<_Value, false>; constexpr explicit __async_receiver(__awaiter_t& __awaiter) noexcept : __sync_receiver<_Promise, _Value>{__awaiter} @@ -223,55 +234,37 @@ namespace STDEXEC constexpr void set_stopped() noexcept { - STDEXEC_TRY - { - // Resuming the stopped continuation unwinds the coroutine stack until we reach - // a promise that can handle the stopped signal. The coroutine referred to by - // __continuation_ will never be resumed. - auto __hcoro = STDEXEC::__coroutine_handle_cast<_Promise>( - this->__awaiter_.__continuation_); - __std::coroutine_handle<> __unwind = __hcoro.promise().unhandled_stopped(); - __unwind.resume(); - } - STDEXEC_CATCH_ALL - { - this->__awaiter_.__result_.template emplace<1>(std::current_exception()); - this->__awaiter_.__continuation_.resume(); - } + __done(); } private: void __done() noexcept { - // If __ready_ is still false when executing the CAS it means the started - // operation completed before await_suspend checked whether the operation - // completed. In this case resuming execution is handled by await_suspend. - // Otherwise, the execution needs to be resumed from here. - auto& __awaiter = static_cast<__awaiter_t&>(this->__awaiter_); - - if (std::this_thread::get_id() != __awaiter.__starting_thread_) + auto& __awaiter = static_cast<__awaiter_t&>(this->__awaiter_); + bool const __on_other_thread = std::this_thread::get_id() != __awaiter.__starting_thread_; + // Cross-thread receiver decrements by 2; same-thread receiver decrements by 1. + // This encodes the cross-thread information directly in the refcount so that + // await_suspend can determine the correct action from a single fetch_sub without + // any secondary reads that could race with frame destruction. + int const __decrement = __on_other_thread ? 2 : 1; + int const __old_refs = __awaiter.__refcount_.fetch_sub(__decrement, + __std::memory_order_acq_rel); + if (__on_other_thread && __old_refs == 2) { - // If we're completing on a different thread than the one that started the - // operation, we know we are completing asynchronously, so we need to resume - // the continuation from here. - __awaiter.__continuation_.resume(); - return; + // We decremented first on a different thread. await_suspend will observe + // old==0 and return noop_coroutine; it will call notify_one to signal us. + // Wait until await_suspend has decremented, then resume the continuation. + __awaiter.__refcount_.wait(0, __std::memory_order_acquire); + STDEXEC::__coroutine_resume_nothrow(__awaiter.__get_continuation()); } - - bool __expected = false; - bool const __was_ready = - !__awaiter.__ready_.compare_exchange_strong(__expected, - true, - __std::memory_order_release, - __std::memory_order_acquire); - if (__was_ready) + else if (__old_refs == 1) { - // We get here if __ready_ was true when the CAS was executed. It got set to - // true in await_suspend() immediately after the operation was started, which - // implies that this completion is happening asynchronously, so we need to - // resume the continuation from here. - __awaiter.__continuation_.resume(); + // await_suspend already decremented first (refcount was 1 when we decremented, + // meaning await_suspend had already brought it from 2 to 1). Resume now. + STDEXEC::__coroutine_resume_nothrow(__awaiter.__get_continuation()); } + // else __old_refs == 2 and !__on_other_thread (same-thread receiver went first): + // await_suspend will see old_refcount==1 and resume inline. } }; @@ -282,60 +275,58 @@ namespace STDEXEC using __async_receiver_t = __async_receiver<_Promise, __value_t<_Sender, _Promise>>; ////////////////////////////////////////////////////////////////////////////////////// - // __sender_awaitable: awaitable type returned by as_awaitable when given a sender + // __sender_awaiter: awaitable type returned by as_awaitable when given a sender // that does not have an as_awaitable member function template > _Sender> - struct __sender_awaitable : __sender_awaitable_base<__value_t<_Sender, _Promise>, false> + struct __sender_awaiter : __sender_awaiter_base<__value_t<_Sender, _Promise>, false> { using __value_t = __as_awaitable::__value_t<_Sender, _Promise>; - constexpr explicit __sender_awaitable(_Sender&& __sndr, - __std::coroutine_handle<_Promise> __hcoro) + constexpr explicit __sender_awaiter(_Sender&& __sndr, + __std::coroutine_handle<_Promise> __hcoro) noexcept(__nothrow_connectable<_Sender, __receiver_t>) - : __sender_awaitable_base<__value_t, false>{__hcoro} + : __sender_awaiter_base<__value_t, false>{__hcoro} , __opstate_(STDEXEC::connect(static_cast<_Sender&&>(__sndr), __receiver_t(*this))) {} - ~__sender_awaitable() + ~__sender_awaiter() { - // TODO: This wait here is only needed because of the existence of - // exec::at_coroutine_exit, which can cause the destructor of the sender_awaitable - // to run while start() is still executing. We should consider removing - // exec::at_coroutine_exit or fixing it. - this->__ready_.wait(false, __std::memory_order_acquire); + // Refcount ends at 0 (same-thread completion) or -1 (cross-thread completion). + STDEXEC_ASSERT(this->__refcount_.load() <= 0); } constexpr auto - await_suspend([[maybe_unused]] __std::coroutine_handle<_Promise> __hcoro) noexcept -> bool + await_suspend([[maybe_unused]] __std::coroutine_handle<> __continuation) noexcept + -> __std::coroutine_handle<> { - STDEXEC_ASSERT(this->__continuation_ == __hcoro); - - this->__ready_.store(false, __std::memory_order_release); + STDEXEC_ASSERT(this->__continuation_.handle() == __continuation); // Start the operation. STDEXEC::start(__opstate_); - auto __expected = false; - bool const __was_ready = - !this->__ready_.compare_exchange_strong(__expected, - true, - __std::memory_order_release, - __std::memory_order_acquire); - this->__ready_.notify_one(); + int const __old_refcount = this->__refcount_.fetch_sub(1, __std::memory_order_acq_rel); - if (__was_ready) + if (__old_refcount == 1) + { + // If the refcount was 1 before the decrement, then the operation has already + // completed on the same thread and we are responsible for resuming the + // continuation. Otherwise, we can let the receiver resume the continuation when + // the operation completes. + return this->__get_continuation(); + } + else if (__old_refcount == 0) { - // The operation completed inline with set_value or set_error, so we can just - // resume the current coroutine. await_resume will either return the value or - // throw as appropriate. - return false; + // The receiver already decremented by 2 on another thread. It will resume the + // continuation on the correct (other) thread. Return noop_coroutine so we do + // not resume here. + this->__refcount_.notify_one(); + return std::noop_coroutine(); } else { - // If __ready_ was still false when executing the CAS, then the operation did not - // complete inline. The continuation will be resumed when the operation - // completes, so we return a noop_coroutine to suspend the current coroutine. - return true; + // Receiver hasn't completed yet (old == 2). Suspend and let the receiver resume + // the continuation when the operation completes. + return std::noop_coroutine(); } } @@ -348,22 +339,22 @@ namespace STDEXEC // in await_suspend. template > _Sender> requires __completes_inline<_Sender, env_of_t<_Promise&>> - struct __sender_awaitable<_Promise, _Sender> - : __sender_awaitable_base<__value_t<_Sender, _Promise>, true> + struct __sender_awaiter<_Promise, _Sender> + : __sender_awaiter_base<__value_t<_Sender, _Promise>, true> { using __value_t = __as_awaitable::__value_t<_Sender, _Promise>; - constexpr explicit __sender_awaitable(_Sender&& sndr, - __std::coroutine_handle<_Promise> __hcoro) + constexpr explicit __sender_awaiter(_Sender&& __sndr, + __std::coroutine_handle<_Promise> __hcoro) noexcept(__nothrow_move_constructible<_Sender>) - : __sender_awaitable_base<__value_t, true>{__hcoro} - , __sndr_(static_cast<_Sender&&>(sndr)) + : __sender_awaiter_base<__value_t, true>{__hcoro} + , __sndr_(static_cast<_Sender&&>(__sndr)) {} - auto await_suspend([[maybe_unused]] __std::coroutine_handle<_Promise> __hcoro) - -> STDEXEC_PP_IF(STDEXEC_GCC(), bool, __std::coroutine_handle<>) + auto await_suspend([[maybe_unused]] __std::coroutine_handle<> __continuation) + -> __std::coroutine_handle<> { - STDEXEC_ASSERT(this->__continuation_ == __hcoro); + STDEXEC_ASSERT(this->__continuation_.handle() == __continuation); { auto __opstate = STDEXEC::connect(static_cast<_Sender&&>(__sndr_), __receiver_t(*this)); // The following call to start will complete synchronously, writing its result @@ -371,23 +362,7 @@ namespace STDEXEC STDEXEC::start(__opstate); } - if (this->__result_.__is_valueless()) - { - // The operation completed with set_stopped, so we need to call - // unhandled_stopped() on the promise to propagate the stop signal. That will - // result in the coroutine being torn down, so beware. We then resume the - // returned coroutine handle (which may be a noop_coroutine). - return STDEXEC_PP_IF(STDEXEC_GCC(), - (__hcoro.promise().unhandled_stopped().resume(), true), - __hcoro.promise().unhandled_stopped()); - } - else - { - // The operation completed with set_value or set_error, so we can just resume - // the current coroutine. await_resume will either return the value or throw as - // appropriate. - return STDEXEC_PP_IF(STDEXEC_GCC(), false, __hcoro); - } + return this->__get_continuation(); } private: @@ -397,8 +372,8 @@ namespace STDEXEC template STDEXEC_HOST_DEVICE_DEDUCTION_GUIDE - __sender_awaitable(_Sender&&, __std::coroutine_handle<_Promise>) - -> __sender_awaitable<_Promise, _Sender>; + __sender_awaiter(_Sender&&, __std::coroutine_handle<_Promise>) + -> __sender_awaiter<_Promise, _Sender>; template concept __awaitable_adapted_sender = sender_in<_Sender, env_of_t<_Promise&>> @@ -450,7 +425,7 @@ namespace STDEXEC inline constexpr auto __with_sender = // [] _Tp>(_Tp&& __t, _Promise& __promise) - STDEXEC_AUTO_RETURN(__sender_awaitable{ + STDEXEC_AUTO_RETURN(__sender_awaiter{ __as_awaitable::__adapt_sender_for_await( STDEXEC::transform_sender(static_cast<_Tp&&>(__t), STDEXEC::get_env(__promise))), __std::coroutine_handle<_Promise>::from_promise(__promise)}); diff --git a/include/stdexec/__detail/__awaitable.hpp b/include/stdexec/__detail/__awaitable.hpp index 76c268bd6..e3c234820 100644 --- a/include/stdexec/__detail/__awaitable.hpp +++ b/include/stdexec/__detail/__awaitable.hpp @@ -45,7 +45,7 @@ namespace STDEXEC _Promise &__promise) -> decltype(auto) { return __promise.await_transform(static_cast<_Awaitable &&>(__awaitable)); }, - [](_Awaitable &&__awaitable, __ignore = {}) -> decltype(auto) + [](_Awaitable &&__awaitable, __ignore = {}) noexcept -> decltype(auto) { return static_cast<_Awaitable &&>(__awaitable); }}; template diff --git a/include/stdexec/__detail/__connect_awaitable.hpp b/include/stdexec/__detail/__connect_awaitable.hpp index 443d4b36c..7dae2e0b3 100644 --- a/include/stdexec/__detail/__connect_awaitable.hpp +++ b/include/stdexec/__detail/__connect_awaitable.hpp @@ -22,7 +22,9 @@ #include "__concepts.hpp" #include "__config.hpp" #include "__env.hpp" +#include "__manual_lifetime.hpp" #include "__receivers.hpp" +#include "__scope.hpp" #include @@ -99,7 +101,8 @@ namespace STDEXEC } [[nodiscard]] - constexpr auto get_env() const noexcept -> env_of_t<_Receiver> + // constexpr + auto get_env() const noexcept -> env_of_t<_Receiver> { return STDEXEC::get_env(__get_opstate().__rcvr_); } @@ -132,7 +135,7 @@ namespace STDEXEC { constexpr auto& __awaiter() noexcept { - return static_cast<_Derived*>(this)->__awaiter_; + return static_cast<_Derived*>(this)->__awaiter_.__get(); } constexpr auto await_ready() noexcept(noexcept(__awaiter().await_ready())) -> bool @@ -181,40 +184,36 @@ namespace STDEXEC // clause when the result of __get_awaitable or __get_awaiter is immovable; it *seems* // like direct initialization of a member with the result of a function ought to trigger // C++17's mandatory copy elision, and both Clang and MSVC accept that code, but using - // a union with in-place new works around the issue. - new (static_cast(std::addressof(__awaitable_))) - __awaitable_t(__get_awaitable(static_cast<_Awaitable&&>(__source), __coro.promise())); - new (static_cast(std::addressof(__awaiter_))) - __awaiter_t(__get_awaiter(static_cast<__awaitable_t&&>(__awaitable_))); + // __manual_lifetime works around the issue. + __awaitable_.__construct_from(__get_awaitable, + static_cast<_Awaitable&&>(__source), + __coro.promise()); + auto __guard = __scope_guard{[&]() noexcept { __awaitable_.__destroy(); }}; + + __awaiter_.__construct_from(__get_awaiter, + static_cast<__awaitable_t&&>(__awaitable_.__get())); + __guard.__dismiss(); } ~__state() { // make sure to destroy in the reverse order of construction - std::destroy_at(std::addressof(__awaiter_)); - std::destroy_at(std::addressof(__awaitable_)); + __awaiter_.__destroy(); + __awaitable_.__destroy(); } - union - { - STDEXEC_ATTRIBUTE(no_unique_address) - __awaitable_t __awaitable_; - }; + STDEXEC_ATTRIBUTE(no_unique_address) + __manual_lifetime<__awaitable_t> __awaitable_; - union - { - STDEXEC_ATTRIBUTE(no_unique_address) - __awaiter_t __awaiter_; - }; + STDEXEC_ATTRIBUTE(no_unique_address) + __manual_lifetime<__awaiter_t> __awaiter_; }; STDEXEC_ATTRIBUTE(no_unique_address) _Awaitable __source_awaitable_; - union - { - STDEXEC_ATTRIBUTE(no_unique_address) - __state __awaiter_; - }; + + STDEXEC_ATTRIBUTE(no_unique_address) + __manual_lifetime<__state> __awaiter_; template <__not_decays_to<__awaitable_state> _Awaitable2> explicit __awaitable_state(_Awaitable2&& __awaitable) @@ -222,16 +221,14 @@ namespace STDEXEC : __source_awaitable_(static_cast<_Awaitable2&&>(__awaitable)) {} - ~__awaitable_state() {} - constexpr void construct(__std::coroutine_handle<_Promise> __coro) noexcept(__is_nothrow) { - std::construct_at(&__awaiter_, static_cast<_Awaitable&&>(__source_awaitable_), __coro); + __awaiter_.__construct(static_cast<_Awaitable&&>(__source_awaitable_), __coro); } constexpr void destroy() noexcept { - std::destroy_at(&__awaiter_); + __awaiter_.__destroy(); } }; @@ -250,43 +247,30 @@ namespace STDEXEC struct __state : __awaitable_wrapper<__state> { - __state(_Awaitable&& __source, __std::coroutine_handle<_Promise> __coro) - noexcept(__is_nothrow) + __state(_Awaitable&& __source, __std::coroutine_handle<_Promise>) noexcept(__is_nothrow) { // GCC doesn't like initializing __awaiter_ in the member initializer clause when the // result of __get_awaiter is immovable; it *seems* like direct initialization of a // member with the result of a function ought to trigger C++17's mandatory copy elision, // and both Clang and MSVC accept that code, but using a union with in-place new works // around the issue. - new (static_cast(std::addressof(__awaiter_))) - __awaiter_t(__get_awaiter(static_cast<_Awaitable&&>(__source))); - - [[maybe_unused]] - auto&& __awaitable = __get_awaitable(static_cast<_Awaitable&&>(__source), - __coro.promise()); - - STDEXEC_ASSERT(std::addressof(__awaitable) == std::addressof(__source)); + __awaiter_.__construct_from(__get_awaiter, static_cast<_Awaitable&&>(__source)); } ~__state() { - std::destroy_at(std::addressof(__awaiter_)); + __awaiter_.__destroy(); } - union - { - STDEXEC_ATTRIBUTE(no_unique_address) - __awaiter_t __awaiter_; - }; + STDEXEC_ATTRIBUTE(no_unique_address) + __manual_lifetime<__awaiter_t> __awaiter_; }; STDEXEC_ATTRIBUTE(no_unique_address) _Awaitable __source_awaitable_; - union - { - STDEXEC_ATTRIBUTE(no_unique_address) - __state __awaiter_; - }; + + STDEXEC_ATTRIBUTE(no_unique_address) + __manual_lifetime<__state> __awaiter_; template <__not_decays_to<__awaitable_state> _Awaitable2> explicit __awaitable_state(_Awaitable2&& __awaitable) @@ -294,16 +278,14 @@ namespace STDEXEC : __source_awaitable_(static_cast<_Awaitable2&&>(__awaitable)) {} - ~__awaitable_state() {} - constexpr void construct(__std::coroutine_handle<_Promise> __coro) noexcept(__is_nothrow) { - std::construct_at(&__awaiter_, static_cast<_Awaitable&&>(__source_awaitable_), __coro); + __awaiter_.__construct(static_cast<_Awaitable&&>(__source_awaitable_), __coro); } constexpr void destroy() noexcept { - std::destroy_at(&__awaiter_); + __awaiter_.__destroy(); } }; @@ -330,34 +312,30 @@ namespace STDEXEC // member with the result of a function ought to trigger C++17's mandatory copy elision, // and both Clang and MSVC accept that code, but using a union with in-place new works // around the issue. - new (static_cast(std::addressof(__awaiter_))) - __awaiter_t(__get_awaitable(static_cast<_Awaitable&&>(__source), __coro.promise())); + __awaiter_.__construct_from(__get_awaitable, + static_cast<_Awaitable&&>(__source), + __coro.promise()); [[maybe_unused]] - auto&& __awaiter = __get_awaiter(static_cast<__awaiter_t&&>(__awaiter_)); + auto&& __awaiter = __get_awaiter(static_cast<__awaiter_t&&>(__awaiter_.__get())); - STDEXEC_ASSERT(std::addressof(__awaiter) == std::addressof(__awaiter_)); + STDEXEC_ASSERT(std::addressof(__awaiter) == std::addressof(__awaiter_.__get())); } ~__state() { - std::destroy_at(std::addressof(__awaiter_)); + __awaiter_.__destroy(); } - union - { - STDEXEC_ATTRIBUTE(no_unique_address) - __awaiter_t __awaiter_; - }; + STDEXEC_ATTRIBUTE(no_unique_address) + __manual_lifetime<__awaiter_t> __awaiter_; }; STDEXEC_ATTRIBUTE(no_unique_address) _Awaitable __source_awaitable_; - union - { - STDEXEC_ATTRIBUTE(no_unique_address) - __state __awaiter_; - }; + + STDEXEC_ATTRIBUTE(no_unique_address) + __manual_lifetime<__state> __awaiter_; template <__not_decays_to<__awaitable_state> _Awaitable2> explicit __awaitable_state(_Awaitable2&& __awaitable) @@ -365,16 +343,14 @@ namespace STDEXEC : __source_awaitable_(static_cast<_Awaitable2&&>(__awaitable)) {} - ~__awaitable_state() {} - constexpr void construct(__std::coroutine_handle<_Promise> __coro) noexcept(__is_nothrow) { - std::construct_at(&__awaiter_, static_cast<_Awaitable&&>(__source_awaitable_), __coro); + __awaiter_.__construct(static_cast<_Awaitable&&>(__source_awaitable_), __coro); } constexpr void destroy() noexcept { - std::destroy_at(&__awaiter_); + __awaiter_.__destroy(); } }; @@ -388,13 +364,19 @@ namespace STDEXEC // _Awaitable has neither a distinct awaiter, nor a distinct awaitable // so we don't need separate storage for either STDEXEC_ATTRIBUTE(no_unique_address) - _Awaitable __awaiter_; + __manual_lifetime<_Awaitable> __awaiter_; template <__not_decays_to<__awaitable_state> _Awaitable2> explicit __awaitable_state(_Awaitable2&& __awaitable) noexcept(__nothrow_constructible_from<_Awaitable, _Awaitable2>) - : __awaiter_(static_cast<_Awaitable2&&>(__awaitable)) - {} + { + __awaiter_.__construct(static_cast<_Awaitable2&&>(__awaitable)); + } + + ~__awaitable_state() + { + __awaiter_.__destroy(); + } static constexpr void construct(__std::coroutine_handle<_Promise>) noexcept { @@ -462,17 +444,7 @@ namespace STDEXEC { static_assert(__std::convertible_to<__suspend_result_t, __std::coroutine_handle<>>); auto __resume_target = __awaiter_.await_suspend(__coro); - STDEXEC_TRY - { - __resume_target.resume(); - } - STDEXEC_CATCH_ALL - { - STDEXEC_ASSERT(false - && "about to deliberately commit UB in response to a misbehaving " - "awaitable"); - __std::unreachable(); - } + STDEXEC::__coroutine_resume_nothrow(__resume_target); return; } } @@ -492,10 +464,7 @@ namespace STDEXEC } private: - using __promise_t = __promise<_Awaitable, _Receiver>; - using __awaitable_t = __result_of<__get_awaitable, _Awaitable, __promise_t&>; - using __awaiter_t = __awaiter_of_t<__awaitable_t>; - + using __promise_t = __promise<_Awaitable, _Receiver>; friend __promise_t; static auto __co_impl(__opstate& __op) noexcept -> __std::coroutine_handle<__promise_t> diff --git a/include/stdexec/__detail/__sender_concepts.hpp b/include/stdexec/__detail/__sender_concepts.hpp index 8e1e1644e..a3101933d 100644 --- a/include/stdexec/__detail/__sender_concepts.hpp +++ b/include/stdexec/__detail/__sender_concepts.hpp @@ -142,7 +142,7 @@ namespace STDEXEC static_assert(__std::move_constructible<__decay_t<_Sender>>, "The sender type is not move-constructible."); } - else if constexpr (!__std::constructible_from<__decay_t<_Sender>, _Sender>) + else if constexpr (!__decay_copyable<_Sender>) { static_assert(__decay_copyable<_Sender>, "The sender cannot be decay-copied. Did you forget a std::move?"); diff --git a/include/stdexec/__detail/__task.hpp b/include/stdexec/__detail/__task.hpp index ac9c1fce8..1189f2722 100644 --- a/include/stdexec/__detail/__task.hpp +++ b/include/stdexec/__detail/__task.hpp @@ -23,6 +23,7 @@ #include "__optional.hpp" #include "__schedulers.hpp" #include "__task_scheduler.hpp" +#include "__with_awaitable_senders.hpp" #include #include @@ -48,6 +49,12 @@ namespace STDEXEC __result_.emplace(static_cast<_Value&&>(__value)); } + [[nodiscard]] + constexpr auto __result() noexcept -> _Ty& + { + return *__result_; + } + __optional<_Ty> __result_{}; }; @@ -55,6 +62,7 @@ namespace STDEXEC struct __promise_base { constexpr void return_void() {} + constexpr void __result() {} }; constexpr size_t __divmod(size_t __total_size, size_t __chunk_size) noexcept @@ -197,10 +205,25 @@ namespace STDEXEC template using __stop_callback_box_t = __stop_callback_box, _StopSource>; - inline constexpr auto __throw_error = __overload{ - []([[maybe_unused]] auto&& __error) { STDEXEC_THROW((decltype(__error)&&) __error); }, - []([[maybe_unused]] std::error_code __ec) { STDEXEC_THROW(std::system_error(__ec)); }, - []([[maybe_unused]] std::exception_ptr __eptr) { std::rethrow_exception(__eptr); }}; + inline constexpr struct __throw_error_t + { + template + [[noreturn]] + void operator()([[maybe_unused]] _Error&& __error) const + { + STDEXEC_THROW(static_cast<_Error&&>(__error)); + } + [[noreturn]] + void operator()([[maybe_unused]] std::error_code __ec) const + { + STDEXEC_THROW(std::system_error(__ec)); + } + [[noreturn]] + void operator()([[maybe_unused]] std::exception_ptr __eptr) const + { + std::rethrow_exception(__eptr); + } + } __throw_error{}; } // namespace __task //////////////////////////////////////////////////////////////////////////////// @@ -221,8 +244,6 @@ namespace STDEXEC class [[nodiscard]] task { struct __promise; - template - struct __opstate; template using __own_env_t = __minvoke_or_q<__task::__environment_type, env<>, _TaskEnv, _Env>; public: @@ -248,40 +269,19 @@ namespace STDEXEC __coro_.destroy(); } - template - constexpr auto connect(_Rcvr rcvr) && -> __opstate<_Rcvr> - { - static_assert(__task::__has_compatible_allocator, allocator_type>, - "The receiver's environment must contain an allocator that is compatible with " - "the task's allocator type. Alternatively, the task's allocator type must be " - "default constructible."); - STDEXEC_ASSERT(__coro_); - return __opstate<_Rcvr>(static_cast(*this), static_cast<_Rcvr&&>(rcvr)); - } - - template - static consteval auto get_completion_signatures() noexcept - { - return __completions_t{}; - } - [[nodiscard]] constexpr auto get_env() const noexcept { return __attrs{}; } -# if !STDEXEC_GCC() // This transforms a task into an __awaiter that can perform symmetric transfer when - // co_awaited. It is disabled on GCC due to - // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=94794, which causes unbounded stack - // growth. + // co_awaited. template constexpr auto as_awaitable(_ParentPromise& __parent) && noexcept { return __awaiter<_ParentPromise>(static_cast(*this), __parent); } -# endif private: using __on_stopped_t = __forward_stop_request; @@ -308,8 +308,6 @@ namespace STDEXEC completion_signatures<__single_value_sig_t<_Ty>, set_stopped_t()>, error_types>; - static constexpr void __sink(task) noexcept {} - template [[nodiscard]] static auto __mk_alloc(_Env const & __env) noexcept -> allocator_type @@ -332,8 +330,8 @@ namespace STDEXEC template [[nodiscard]] - static auto - __mk_sched(_Env const & __env, allocator_type const & __alloc) noexcept -> start_scheduler_type + static auto __mk_sched(_Env const & __env, allocator_type const & __alloc) noexcept // + -> start_scheduler_type { // NOT TO SPEC: try constructing the scheduler with the allocator if possible. if constexpr (__task::__has_scheduler_compatible_with<_Env, @@ -392,10 +390,10 @@ namespace STDEXEC } } - struct __opstate_base : private allocator_type + struct __awaiter_base : private allocator_type { template - constexpr explicit __opstate_base(task&& __task, + constexpr explicit __awaiter_base(task&& __task, _Env const & __env, _OwnEnv const & __own_env) noexcept : allocator_type(__mk_alloc(__env)) @@ -420,7 +418,7 @@ namespace STDEXEC } } - STDEXEC_IMMOVABLE(__opstate_base); + STDEXEC_IMMOVABLE(__awaiter_base); virtual auto __completed() noexcept -> __std::coroutine_handle<> = 0; virtual auto __canceled() noexcept -> __std::coroutine_handle<> = 0; @@ -451,12 +449,12 @@ namespace STDEXEC template struct STDEXEC_ATTRIBUTE(empty_bases) __awaiter final : __own_env_box> - , __opstate_base + , __awaiter_base , __stop_callback_box_t> { constexpr explicit __awaiter(task&& __task, _ParentPromise& __parent) noexcept : __awaiter::__own_env_box{__mk_own_env(STDEXEC::get_env(__parent))} - , __opstate_base(static_cast(__task), STDEXEC::get_env(__parent), this->__own_env_) + , __awaiter_base(static_cast(__task), STDEXEC::get_env(__parent), this->__own_env_) , __parent_(__parent) {} @@ -465,54 +463,49 @@ namespace STDEXEC return false; } - constexpr auto await_suspend(__std::coroutine_handle<_ParentPromise> __h) + constexpr auto await_suspend(__std::coroutine_handle<_ParentPromise> __continuation) noexcept(__nothrow_callback_registration>) -> __std::coroutine_handle<> { - auto& __task_promise = this->__handle().promise(); + auto& __task_promise = this->__handle().promise(); + __task_promise.__state_ = this; + __task_promise.set_continuation(__continuation); // If the following throws, the coroutine is immediately resumed and the exception // is rethrown at the suspension point. - this->__register_callback(STDEXEC::get_env(__h.promise()), __task_promise.__stop_); - __task_promise.__state_ = this; - __continuation_ = __h; + this->__register_callback(STDEXEC::get_env(__continuation.promise()), + __task_promise.__stop_); return this->__handle(); } constexpr auto await_resume() -> _Ty { // Destroy the coroutine after moving the result/error out of it + [[maybe_unused]] auto __task = std::move(this->__task_); if (!this->__errors_.__is_valueless()) { __visit(__task::__throw_error, std::move(this->__errors_)); + __std::unreachable(); } - else if constexpr (__same_as<_Ty, void>) - { - return; - } - else - { - return static_cast<_Ty&&>(*__task.__coro_.promise().__result_); - } - __std::unreachable(); + using __rvalue_ref_t = std::add_rvalue_reference_t<_Ty>; + return static_cast<__rvalue_ref_t>(__task.__coro_.promise().__result()); } [[nodiscard]] auto __completed() noexcept -> __std::coroutine_handle<> final { this->__reset_callback(); - return __continuation_; + return this->__handle().promise().continuation().handle(); } [[nodiscard]] auto __canceled() noexcept -> __std::coroutine_handle<> final { this->__reset_callback(); - return __parent_.unhandled_stopped(); + return this->__handle().promise().continuation().unhandled_stopped(); } - __std::coroutine_handle<> __continuation_; - _ParentPromise& __parent_; + _ParentPromise& __parent_; }; struct __attrs @@ -548,108 +541,12 @@ namespace STDEXEC _Rcvr __rcvr_; }; - //////////////////////////////////////////////////////////////////////////////////////// - // task::__opstate - template - template - struct STDEXEC_ATTRIBUTE(empty_bases) task<_Ty, _TaskEnv>::__opstate final - : __rcvr_box<_Rcvr> // holds the receiver so that we can pass __opstate_base a reference to it - , __own_env_box> - , __stop_callback_box_t> - , __opstate_base - { - public: - using operation_state_concept = operation_state_tag; - - explicit __opstate(task&& __task, _Rcvr&& __rcvr) noexcept - : __rcvr_box<_Rcvr>{static_cast<_Rcvr&&>(__rcvr)} - , __opstate::__own_env_box{__mk_own_env(STDEXEC::get_env(this->__rcvr_))} - , __opstate_base(static_cast(__task), - STDEXEC::get_env(this->__rcvr_), - this->__own_env_) - {} - - void start() & noexcept - { - STDEXEC_TRY - { - // Register a stop callback if needed - this->__register_callback(STDEXEC::get_env(this->__rcvr_), - this->__handle().promise().__stop_); - this->__handle().resume(); - } - STDEXEC_CATCH_ALL - { - if constexpr (__nothrow_callback_registration>) - { - // no-op - } - else if constexpr (__mapply<__mcontains, - error_types>::value) - { - STDEXEC::set_error(static_cast<_Rcvr&&>(this->__rcvr_), std::current_exception()); - } - else - { - STDEXEC::__die("Starting the task failed due to an exception being thrown while " - "registering a stop callback, but the task's error_types does not " - "include std::exception_ptr, so the exception cannot be propagated."); - } - } - } - - private: - auto __completed() noexcept -> __std::coroutine_handle<> final - { - STDEXEC_TRY - { - this->__reset_callback(); - - if (!this->__errors_.__is_valueless()) - { - // Move the errors out of the promise before destroying the coroutine. - auto __errors = std::move(this->__errors_); - __sink(static_cast(this->__task_)); - __visit(STDEXEC::set_error, std::move(__errors), static_cast<_Rcvr&&>(this->__rcvr_)); - } - else if constexpr (__same_as<_Ty, void>) - { - __sink(static_cast(this->__task_)); - STDEXEC::set_value(static_cast<_Rcvr&&>(this->__rcvr_)); - } - else - { - // Move the result out of the promise before destroying the coroutine. - _Ty __result = static_cast<_Ty&&>(*this->__handle().promise().__result_); - __sink(static_cast(this->__task_)); - STDEXEC::set_value(static_cast<_Rcvr&&>(this->__rcvr_), static_cast<_Ty&&>(__result)); - } - } - STDEXEC_CATCH_ALL - { - if constexpr (!__nothrow_move_constructible<_Ty> - || !__nothrow_move_constructible<__error_variant_t>) - { - __sink(static_cast(this->__task_)); - STDEXEC::set_error(static_cast<_Rcvr&&>(this->__rcvr_), std::current_exception()); - } - } - return std::noop_coroutine(); - } - - auto __canceled() noexcept -> __std::coroutine_handle<> final - { - this->__reset_callback(); - __sink(static_cast(this->__task_)); - STDEXEC::set_stopped(static_cast<_Rcvr&&>(this->__rcvr_)); - return std::noop_coroutine(); - } - }; - //////////////////////////////////////////////////////////////////////////////////////// // task::promise_type template - struct task<_Ty, _TaskEnv>::__promise : __task::__promise_base<_Ty> + struct STDEXEC_ATTRIBUTE(empty_bases) task<_Ty, _TaskEnv>::__promise + : __task::__promise_base<_Ty> + , with_awaitable_senders<__promise> { __promise() noexcept = default; @@ -669,7 +566,7 @@ namespace STDEXEC return __completed_awaiter{}; } - void unhandled_exception() + void unhandled_exception() noexcept { if constexpr (!__mapply<__mcontains, __error_variant_t>::value) { @@ -684,7 +581,7 @@ namespace STDEXEC } [[nodiscard]] - auto unhandled_stopped() noexcept -> __std::coroutine_handle<> + auto unhandled_stopped() const noexcept -> __std::coroutine_handle<> { return __state_->__canceled(); } @@ -773,9 +670,7 @@ namespace STDEXEC private: template friend struct __awaiter; - template - friend struct __opstate; - friend struct __opstate_base; + friend struct __awaiter_base; struct __completed_awaiter { @@ -835,7 +730,7 @@ namespace STDEXEC }; __stop_variant_t __stop_{__no_init}; - __opstate_base* __state_ = nullptr; + __awaiter_base* __state_ = nullptr; }; #endif // !STDEXEC_NO_STDCPP_COROUTINES() } // namespace STDEXEC diff --git a/include/stdexec/__detail/__with_awaitable_senders.hpp b/include/stdexec/__detail/__with_awaitable_senders.hpp index 0b4831d97..722328891 100644 --- a/include/stdexec/__detail/__with_awaitable_senders.hpp +++ b/include/stdexec/__detail/__with_awaitable_senders.hpp @@ -31,13 +31,13 @@ namespace STDEXEC { struct __with_awaitable_senders { - template + template <__not_same_as _OtherPromise> constexpr void set_continuation(__std::coroutine_handle<_OtherPromise> __hcoro) noexcept { - static_assert(!__same_as<_OtherPromise, void>); __continuation_ = __hcoro; } + // NOT TO SPEC: constexpr void set_continuation(__coroutine_handle<> __continuation) noexcept { __continuation_ = __continuation; @@ -50,7 +50,7 @@ namespace STDEXEC } [[nodiscard]] - constexpr auto unhandled_stopped() noexcept -> __std::coroutine_handle<> + constexpr auto unhandled_stopped() const noexcept -> __std::coroutine_handle<> { return __continuation_.unhandled_stopped(); } diff --git a/include/stdexec/coroutine.hpp b/include/stdexec/coroutine.hpp index b4590656d..f65f93b95 100644 --- a/include/stdexec/coroutine.hpp +++ b/include/stdexec/coroutine.hpp @@ -18,6 +18,7 @@ #include "__detail/__awaitable.hpp" // IWYU pragma: export #include "__detail/__concepts.hpp" #include "__detail/__config.hpp" +#include "__detail/__utility.hpp" #include @@ -32,6 +33,21 @@ namespace STDEXEC return __std::coroutine_handle<_Tp>::from_address(__h.address()); } + STDEXEC_ATTRIBUTE(always_inline) + void __coroutine_resume_nothrow(__std::coroutine_handle<> __h) noexcept // + { + STDEXEC_TRY + { + STDEXEC_ASSERT(__h); + __h.resume(); + } + STDEXEC_CATCH_ALL + { + STDEXEC_ASSERT(!"Coroutine resume threw an exception!"); + __std::unreachable(); + } + } + // A coroutine handle that also supports unhandled_stopped() for propagating stop // signals through co_awaits of senders. template @@ -47,7 +63,11 @@ namespace STDEXEC constexpr __coroutine_handle(__std::coroutine_handle<_Promise> __coro) noexcept : __std::coroutine_handle<>(__coro) { - if constexpr (requires(_Promise& __promise) { __promise.unhandled_stopped(); }) + constexpr bool __has_unhandled_stopped = requires { __coro.promise().unhandled_stopped(); }; + static_assert(__has_unhandled_stopped, + "Coroutine promises used with senders must implement unhandled_stopped()"); + + if constexpr (__has_unhandled_stopped) { __stopped_callback_ = [](void* __address) noexcept -> __std::coroutine_handle<> { @@ -104,13 +124,13 @@ namespace STDEXEC [[nodiscard]] constexpr auto promise() const noexcept -> _Promise& { - return __std::coroutine_handle<_Promise>::from_address(address()).promise(); + return STDEXEC::__coroutine_handle_cast<_Promise>(*this).promise(); } [[nodiscard]] constexpr auto handle() const noexcept -> __std::coroutine_handle<_Promise> { - return __std::coroutine_handle<_Promise>::from_address(address()); + return STDEXEC::__coroutine_handle_cast<_Promise>(*this); } [[nodiscard]] diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 006e1d6f2..e14a0469d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -18,7 +18,7 @@ check_compiler_flag(CXX -Wno-c2y-extensions HAVE_C2Y_EXTENSIONS_WARNING) function(add_compile_diagnostics TARGET) if(CMAKE_CXX_COMPILER_ID STREQUAL Clang OR CMAKE_CXX_COMPILER_ID STREQUAL GNU) - target_compile_options(${TARGET} PRIVATE -Wall -Wextra -Wpedantic -Werror -Wfatal-errors $<$:-Wno-c2y-extensions>) + target_compile_options(${TARGET} PRIVATE -Wall -Wextra -Wpedantic -Werror $<$:-Wno-c2y-extensions>) endif() endfunction() diff --git a/test/exec/test_task.cpp b/test/exec/test_task.cpp index 23288d268..e64e662a5 100644 --- a/test/exec/test_task.cpp +++ b/test/exec/test_task.cpp @@ -122,6 +122,9 @@ namespace TEST_CASE("Test stickiness with two single threads", "[types][sticky][task]") { + [[maybe_unused]] + // repeat this test 1000 times because it can expose race conditions + int i = GENERATE(repeat(1000, values({1}))); exec::single_thread_context context1; exec::single_thread_context context2; ex::scheduler auto scheduler1 = context1.get_scheduler(); @@ -136,6 +139,9 @@ namespace TEST_CASE("Test stickiness with two single threads with on", "[types][sticky][task]") { + [[maybe_unused]] + // repeat this test 1000 times because it can expose race conditions + int i = GENERATE(repeat(1000, values({1}))); exec::single_thread_context context1; exec::single_thread_context context2; ex::scheduler auto scheduler1 = context1.get_scheduler(); @@ -152,6 +158,9 @@ namespace TEST_CASE("Test stickiness with two single threads with sender", "[types][sticky][task]") { + [[maybe_unused]] + // repeat this test 1000 times because it can expose race conditions + int i = GENERATE(repeat(1000, values({1}))); exec::single_thread_context context1; exec::single_thread_context context2; ex::scheduler auto scheduler1 = context1.get_scheduler(); @@ -168,6 +177,9 @@ namespace TEST_CASE("Test stickiness with two single threads with sender with starts_on", "[types][sticky][task]") { + [[maybe_unused]] + // repeat this test 1000 times because it can expose race conditions + int i = GENERATE(repeat(1000, values({1}))); exec::single_thread_context context1; exec::single_thread_context context2; ex::scheduler auto scheduler1 = context1.get_scheduler(); @@ -186,6 +198,9 @@ namespace TEST_CASE("Use two inline schedulers", "[types][sticky][task]") { + [[maybe_unused]] + // repeat this test 1000 times because it can expose race conditions + int i = GENERATE(repeat(1000, values({1}))); ex::scheduler auto scheduler1 = STDEXEC::inline_scheduler{}; ex::scheduler auto scheduler2 = STDEXEC::inline_scheduler{}; ex::sync_wait(ex::when_all(ex::schedule(scheduler1) | ex::then([] { thread_id = 0; }), diff --git a/test/stdexec/types/test_task.cpp b/test/stdexec/types/test_task.cpp index 0c6fd3ff2..2fb808238 100644 --- a/test/stdexec/types/test_task.cpp +++ b/test/stdexec/types/test_task.cpp @@ -22,6 +22,7 @@ # include # include +# include # include # include @@ -244,6 +245,11 @@ namespace CHECK(i == 42); } + // In debug GCC builds, this test can cause a stack overflow due to + // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=94794, results in a symmetric + // transfer failing to be a tail call. +# if !STDEXEC_GCC() \ + || (defined(__OPTIMIZE__) && !defined(__SANITIZE_ADDRESS__) && !defined(__SANITIZE_THREAD__)) auto sync() -> ex::task { co_return 42; @@ -277,6 +283,7 @@ namespace auto [i] = ex::sync_wait(std::move(t)).value(); CHECK(i == 84'000'042); } +# endif struct my_env { @@ -334,6 +341,74 @@ namespace // ex::sync_wait(std::move(t)); // } + struct inline_affine_stopped_sender + { + using sender_concept = ex::sender_tag; + using completion_signatures = ex::completion_signatures; + + template + struct operation + { + Receiver rcvr_; + + void start() & noexcept + { + ex::set_stopped(std::move(rcvr_)); + } + }; + + template + auto connect(Receiver rcvr) && -> operation + { + return {std::move(rcvr)}; + } + + struct attrs + { + [[nodiscard]] + static constexpr auto query(ex::__get_completion_behavior_t) noexcept + { + return ex::__completion_behavior::__inline_completion + | ex::__completion_behavior::__asynchronous_affine; + } + }; + + [[nodiscard]] + auto get_env() const noexcept -> attrs + { + return {}; + } + }; + + TEST_CASE("task co_awaiting inline|async_affine stopped sender does not deadlock", + "[types][task]") + { + auto res = ex::sync_wait( + []() -> ex::task + { + co_await inline_affine_stopped_sender{}; + FAIL("Expected co_awaiting inline_affine_stopped_sender to stop the task"); + co_return 42; + }()); + CHECK(!res.has_value()); + } + + TEST_CASE("repro for NVIDIA/stdexec#2041", "[types][task]") + { + auto task = []() -> ex::task + { + co_return; + }; + auto pool = exec::static_thread_pool(1); + auto scope = ex::counting_scope(); + for (int i = 0; i < 1000; ++i) + { + ex::spawn(ex::starts_on(pool.get_scheduler(), task()) | ex::upon_error([](auto) noexcept {}), + scope.get_token()); + } + ex::sync_wait(scope.join()); + } + // TODO: add tests for stop token support in task } // anonymous namespace