diff --git a/docs/spec-alignment.md b/docs/spec-alignment.md new file mode 100644 index 0000000..751f681 --- /dev/null +++ b/docs/spec-alignment.md @@ -0,0 +1,63 @@ +# Spec alignment tracker + +This document tracks alignment between the upstream Component Model spec/reference (in `ref/component-model`) and this libraryโ€™s C++ entrypoints. + +- Upstream references: + - `ref/component-model/design/mvp/CanonicalABI.md` + - `ref/component-model/design/mvp/Concurrency.md` + - `ref/component-model/design/mvp/canonical-abi/definitions.py` +- C++ namespace: `cmcpp` + +## Canonical built-ins inventory + +| Spec built-in | C++ entrypoint | Status | Notes | +|---|---|---|---| +| `context.get` | `cmcpp::canon_context_get` | implemented | | +| `context.set` | `cmcpp::canon_context_set` | implemented | | +| `backpressure.inc` | `cmcpp::canon_backpressure_inc` | implemented | | +| `backpressure.dec` | `cmcpp::canon_backpressure_dec` | implemented | | +| `backpressure.set` | `cmcpp::canon_backpressure_set` | implemented | Spec notes this as deprecated in favor of `backpressure.{inc,dec}`. 
| +| `thread.yield` | `cmcpp::canon_thread_yield` | implemented | | +| `thread.yield-to` | `cmcpp::canon_thread_yield_to` | implemented | | +| `thread.resume-later` | `cmcpp::canon_thread_resume_later` | implemented | | +| `thread.index` | `cmcpp::canon_thread_index` | implemented | | +| `thread.suspend` | `cmcpp::canon_thread_suspend` | implemented | | +| `task.return` | `cmcpp::canon_task_return` | implemented | | +| `task.cancel` | `cmcpp::canon_task_cancel` | implemented | | +| `task.wait` | `cmcpp::canon_task_wait` | implemented | | +| `waitable-set.new` | `cmcpp::canon_waitable_set_new` | implemented | | +| `waitable-set.drop` | `cmcpp::canon_waitable_set_drop` | implemented | | +| `waitable-set.wait` | `cmcpp::canon_waitable_set_wait` | implemented | | +| `waitable-set.poll` | `cmcpp::canon_waitable_set_poll` | implemented | | +| `waitable.join` | `cmcpp::canon_waitable_join` | implemented | | +| `stream.new` | `cmcpp::canon_stream_new` | implemented | | +| `stream.read` | `cmcpp::canon_stream_read` | implemented | | +| `stream.write` | `cmcpp::canon_stream_write` | implemented | | +| `stream.cancel-read` | `cmcpp::canon_stream_cancel_read` | implemented | | +| `stream.cancel-write` | `cmcpp::canon_stream_cancel_write` | implemented | | +| `stream.drop-readable` | `cmcpp::canon_stream_drop_readable` | implemented | | +| `stream.drop-writable` | `cmcpp::canon_stream_drop_writable` | implemented | | +| `future.new` | `cmcpp::canon_future_new` | implemented | | +| `future.read` | `cmcpp::canon_future_read` | implemented | | +| `future.write` | `cmcpp::canon_future_write` | implemented | | +| `future.cancel-read` | `cmcpp::canon_future_cancel_read` | implemented | | +| `future.cancel-write` | `cmcpp::canon_future_cancel_write` | implemented | | +| `future.drop-readable` | `cmcpp::canon_future_drop_readable` | implemented | | +| `future.drop-writable` | `cmcpp::canon_future_drop_writable` | implemented | | +| `resource.new` | `cmcpp::canon_resource_new` | 
implemented | | +| `resource.rep` | `cmcpp::canon_resource_rep` | implemented | | +| `resource.drop` | `cmcpp::canon_resource_drop` | implemented | | +| `error-context.new` | `cmcpp::canon_error_context_new` | implemented | | +| `error-context.debug-message` | `cmcpp::canon_error_context_debug_message` | implemented | | +| `error-context.drop` | `cmcpp::canon_error_context_drop` | implemented | | + +## Thread built-ins (upstream) + +These are referenced by the upstream spec/reference but do not currently have direct C++ canonical built-in entrypoints in this repo: + +- `thread.new_ref` / `thread.new-ref` (implemented: `cmcpp::canon_thread_new_ref`) +- `thread.new-indirect` (implemented: `cmcpp::canon_thread_new_indirect`) +- `thread.spawn-ref` (implemented: `cmcpp::canon_thread_spawn_ref`) +- `thread.spawn-indirect` (implemented: `cmcpp::canon_thread_spawn_indirect`) +- `thread.switch-to` (implemented: `cmcpp::canon_thread_switch_to`) +- `thread.available-parallelism` (implemented: `cmcpp::canon_thread_available_parallelism`) diff --git a/include/cmcpp/context.hpp b/include/cmcpp/context.hpp index d7b9b6f..9e01144 100644 --- a/include/cmcpp/context.hpp +++ b/include/cmcpp/context.hpp @@ -12,10 +12,13 @@ #include #include #include +#include +#include #include #include #include #include +#include #include #if __has_include() #include @@ -34,6 +37,7 @@ namespace cmcpp using GuestPostReturn = std::function; using GuestCallback = std::function; using HostUnicodeConversion = std::function(void *dest, uint32_t dest_byte_len, const void *src, uint32_t src_byte_len, Encoding from_encoding, Encoding to_encoding)>; + using ReclaimBuffer = std::function; // Canonical ABI Options --- class LiftOptions @@ -51,6 +55,12 @@ namespace cmcpp } }; + enum class SuspendResult : uint32_t + { + NOT_CANCELLED = 0, + CANCELLED = 1, + }; + class LiftLowerOptions : public LiftOptions { public: @@ -197,14 +207,29 @@ namespace cmcpp class WaitableSet; + class ThreadEntry : public TableEntry + { 
+ public: + explicit ThreadEntry(std::shared_ptr thread) : thread_(std::move(thread)) {} + + const std::shared_ptr &thread() const + { + return thread_; + } + + private: + std::shared_ptr thread_; + }; + class Waitable : public TableEntry { public: Waitable() = default; - void set_pending_event(const Event &event) + void set_pending_event(const Event &event, ReclaimBuffer reclaim = {}) { pending_event_ = event; + pending_reclaim_ = std::move(reclaim); } bool has_pending_event() const @@ -216,14 +241,21 @@ namespace cmcpp { auto trap_cx = make_trap_context(trap); trap_if(trap_cx, !pending_event_.has_value(), "waitable pending event missing"); + auto reclaim = std::move(pending_reclaim_); Event event = *pending_event_; pending_event_.reset(); + pending_reclaim_ = {}; + if (reclaim) + { + reclaim(); + } return event; } void clear_pending_event() { pending_event_.reset(); + pending_reclaim_ = {}; } void join(WaitableSet *set, const HostTrap &trap); @@ -237,6 +269,7 @@ namespace cmcpp private: std::optional pending_event_; + ReclaimBuffer pending_reclaim_; WaitableSet *wset_ = nullptr; }; @@ -467,9 +500,11 @@ namespace cmcpp enum class CopyState : uint8_t { - Idle = 0, - Copying = 1, - Done = 2 + IDLE = 1, + SYNC_COPYING = 2, + ASYNC_COPYING = 3, + CANCELLING_COPY = 4, + DONE = 5 }; inline uint32_t pack_copy_result(CopyResult result, uint32_t progress) @@ -493,447 +528,833 @@ namespace cmcpp trap_if(trap_cx, expected.type != actual.type, "future descriptor type mismatch"); } - class ReadableStreamEnd; - class WritableStreamEnd; + using OnCopy = std::function; + using OnCopyDone = std::function; - struct SharedStreamState + class BufferGuestImpl { - explicit SharedStreamState(const StreamDescriptor &desc) : descriptor(desc) {} - - StreamDescriptor descriptor; - std::deque> queue; - bool readable_dropped = false; - bool writable_dropped = false; - - struct PendingRead - { - std::shared_ptr cx; - uint32_t ptr = 0; - uint32_t requested = 0; - uint32_t progress = 0; - 
uint32_t handle_index = 0; - ReadableStreamEnd *endpoint = nullptr; - }; + public: + static constexpr uint32_t MAX_LENGTH = (1u << 28) - 1; - std::optional pending_read; - }; + virtual ~BufferGuestImpl() = default; - inline void copy_into_queue(const std::shared_ptr &cx, uint32_t ptr, uint32_t count, SharedStreamState &state, const HostTrap &trap) - { - if (count == 0) + BufferGuestImpl(uint32_t elem_size, uint32_t alignment, std::shared_ptr cx, uint32_t ptr, uint32_t length, const HostTrap &trap) + : elem_size_(elem_size), alignment_(alignment), cx_(std::move(cx)), ptr_(ptr), length_(length) { - return; - } - ensure_memory_range(*cx, ptr, count, state.descriptor.alignment, state.descriptor.element_size); - auto *src = cx->opts.memory.data() + ptr; - for (uint32_t i = 0; i < count; ++i) - { - std::vector bytes(state.descriptor.element_size); - std::memcpy(bytes.data(), src + i * state.descriptor.element_size, state.descriptor.element_size); - state.queue.push_back(std::move(bytes)); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, length_ > MAX_LENGTH, "buffer length overflow"); + trap_if(trap_cx, !cx_, "lift/lower context required"); + if (length_ > 0) + { + ensure_memory_range(*cx_, ptr_, length_, alignment_, elem_size_); + } } - } - inline uint32_t copy_from_queue(const std::shared_ptr &cx, - uint32_t ptr, - uint32_t offset, - uint32_t max_count, - SharedStreamState &state, - const HostTrap &trap) - { - if (max_count == 0) + uint32_t remain() const { - return 0; + return length_ - progress_; } - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, state.queue.size() > std::numeric_limits::max(), "stream queue size overflow"); - uint32_t available = std::min(max_count, static_cast(state.queue.size())); - if (available == 0) + + bool is_zero_length() const { - return 0; + return length_ == 0; } - ensure_memory_range(*cx, ptr, offset + available, state.descriptor.alignment, state.descriptor.element_size); - auto *dest = cx->opts.memory.data() + 
ptr + offset * state.descriptor.element_size; - for (uint32_t i = 0; i < available; ++i) + + uint32_t progress() const { - const auto &bytes = state.queue.front(); - trap_if(trap_cx, bytes.size() != state.descriptor.element_size, "stream element size mismatch"); - std::memcpy(dest + i * state.descriptor.element_size, bytes.data(), state.descriptor.element_size); - state.queue.pop_front(); + return progress_; } - return available; - } - inline void satisfy_pending_read(SharedStreamState &state, const HostTrap &trap); + protected: + uint32_t elem_size_ = 0; + uint32_t alignment_ = 1; + std::shared_ptr cx_; + uint32_t ptr_ = 0; + uint32_t length_ = 0; + uint32_t progress_ = 0; + }; - class ReadableStreamEnd : public Waitable + class ReadableBufferGuestImpl : public BufferGuestImpl { public: - explicit ReadableStreamEnd(std::shared_ptr shared) : shared_(std::move(shared)) {} + using BufferGuestImpl::BufferGuestImpl; - const StreamDescriptor &descriptor() const + std::vector read(uint32_t n, const HostTrap &trap) { - return shared_->descriptor; + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, n > remain(), "buffer read past end"); + std::vector bytes(static_cast(n) * elem_size_); + if (n > 0) + { + uint32_t read_ptr = ptr_ + progress_ * elem_size_; + ensure_memory_range(*cx_, read_ptr, n, alignment_, elem_size_); + std::memcpy(bytes.data(), cx_->opts.memory.data() + read_ptr, bytes.size()); + } + progress_ += n; + return bytes; } - - uint32_t read(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, uint32_t n, bool sync, const HostTrap &trap); - uint32_t cancel(bool sync, const HostTrap &trap); - void drop(const HostTrap &trap); - void complete_async(const std::shared_ptr &cx, uint32_t handle_index, CopyResult result, uint32_t progress, const HostTrap &trap); - - private: - std::shared_ptr shared_; - CopyState state_ = CopyState::Idle; }; - class WritableStreamEnd : public Waitable + class WritableBufferGuestImpl : public BufferGuestImpl { 
public: - explicit WritableStreamEnd(std::shared_ptr shared) : shared_(std::move(shared)) {} + using BufferGuestImpl::BufferGuestImpl; - const StreamDescriptor &descriptor() const + void write(const std::vector &bytes, const HostTrap &trap) { - return shared_->descriptor; + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, elem_size_ == 0, "invalid element size"); + trap_if(trap_cx, bytes.size() % elem_size_ != 0, "buffer write size mismatch"); + uint32_t n = static_cast(bytes.size() / elem_size_); + trap_if(trap_cx, n > remain(), "buffer write past end"); + if (n > 0) + { + uint32_t write_ptr = ptr_ + progress_ * elem_size_; + ensure_memory_range(*cx_, write_ptr, n, alignment_, elem_size_); + std::memcpy(cx_->opts.memory.data() + write_ptr, bytes.data(), bytes.size()); + } + progress_ += n; } - - uint32_t write(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, uint32_t n, const HostTrap &trap); - uint32_t cancel(bool sync, const HostTrap &trap); - void drop(const HostTrap &trap); - - private: - std::shared_ptr shared_; - CopyState state_ = CopyState::Idle; }; - inline uint32_t ReadableStreamEnd::read(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, uint32_t n, bool sync, const HostTrap &trap) + struct SharedStreamState { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, !shared_, "stream state missing"); - trap_if(trap_cx, shared_->descriptor.element_size == 0, "invalid stream descriptor"); - trap_if(trap_cx, state_ != CopyState::Idle, "stream read busy"); - - uint32_t consumed = copy_from_queue(cx, ptr, 0, n, *shared_, trap); - if (consumed > 0 || n == 0) - { - set_pending_event({EventCode::STREAM_READ, handle_index, pack_copy_result(CopyResult::Completed, consumed)}); - auto event = get_pending_event(trap); - state_ = CopyState::Idle; - return event.payload; - } + explicit SharedStreamState(const StreamDescriptor &desc) : descriptor(desc) {} - if (shared_->writable_dropped) - { - 
set_pending_event({EventCode::STREAM_READ, handle_index, pack_copy_result(CopyResult::Dropped, 0)}); - auto event = get_pending_event(trap); - state_ = CopyState::Done; - return event.payload; - } + StreamDescriptor descriptor; + bool dropped = false; - trap_if(trap_cx, sync, "sync stream read would block"); - shared_->pending_read = SharedStreamState::PendingRead{cx, ptr, n, 0, handle_index, this}; - state_ = CopyState::Copying; - return BLOCKED; - } + std::shared_ptr pending_buffer; + OnCopy pending_on_copy; + OnCopyDone pending_on_copy_done; - inline uint32_t ReadableStreamEnd::cancel(bool sync, const HostTrap &trap) - { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, state_ != CopyState::Copying, "no pending stream read"); - trap_if(trap_cx, !shared_ || !shared_->pending_read, "no pending stream read"); + std::mutex mu; + std::condition_variable cv; - auto pending = std::move(*shared_->pending_read); - shared_->pending_read.reset(); - auto payload = pack_copy_result(CopyResult::Cancelled, pending.progress); - set_pending_event({EventCode::STREAM_READ, pending.handle_index, payload}); - state_ = CopyState::Done; + void notify_all() + { + cv.notify_all(); + } - if (sync) + template + void wait_until(Pred pred) { - auto event = get_pending_event(trap); - return event.payload; + std::unique_lock lock(mu); + cv.wait(lock, std::move(pred)); } - if (pending.cx) + + void reset_pending() { - pending.cx->notify_async_event(EventCode::STREAM_READ, pending.handle_index, payload); + pending_buffer.reset(); + pending_on_copy = {}; + pending_on_copy_done = {}; } - return BLOCKED; - } - inline void ReadableStreamEnd::drop(const HostTrap &trap) - { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, state_ == CopyState::Copying, "cannot drop pending stream read"); - trap_if(trap_cx, shared_ && shared_->pending_read.has_value(), "pending read must complete before drop"); - if (shared_) + void set_pending(const std::shared_ptr &buffer, OnCopy on_copy, 
OnCopyDone on_copy_done) { - shared_->readable_dropped = true; + pending_buffer = buffer; + pending_on_copy = std::move(on_copy); + pending_on_copy_done = std::move(on_copy_done); } - state_ = CopyState::Done; - Waitable::drop(trap); - } - inline void ReadableStreamEnd::complete_async(const std::shared_ptr &cx, uint32_t handle_index, CopyResult result, uint32_t progress, const HostTrap &trap) - { - auto payload = pack_copy_result(result, progress); - set_pending_event({EventCode::STREAM_READ, handle_index, payload}); - state_ = (result == CopyResult::Completed) ? CopyState::Idle : CopyState::Done; - if (cx) + void reset_and_notify_pending(CopyResult result) { - cx->notify_async_event(EventCode::STREAM_READ, handle_index, payload); + auto on_copy_done = std::move(pending_on_copy_done); + reset_pending(); + if (on_copy_done) + { + on_copy_done(result); + } } - } - inline void satisfy_pending_read(SharedStreamState &state, const HostTrap &trap) - { - if (!state.pending_read) + void cancel() { - return; + if (pending_buffer) + { + reset_and_notify_pending(CopyResult::Cancelled); + } } - auto &pending = *state.pending_read; - uint32_t remaining = pending.requested - pending.progress; - uint32_t consumed = copy_from_queue(pending.cx, pending.ptr, pending.progress, remaining, state, trap); - pending.progress += consumed; - if (pending.progress >= pending.requested) + + void drop() { - pending.endpoint->complete_async(pending.cx, pending.handle_index, CopyResult::Completed, pending.progress, trap); - state.pending_read.reset(); + if (dropped) + { + return; + } + dropped = true; + if (pending_buffer) + { + reset_and_notify_pending(CopyResult::Dropped); + } } - } - inline uint32_t WritableStreamEnd::write(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, uint32_t n, const HostTrap &trap) - { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, !shared_, "stream state missing"); - trap_if(trap_cx, shared_->descriptor.element_size == 0, "invalid stream 
descriptor"); - trap_if(trap_cx, state_ != CopyState::Idle, "stream write busy"); + void read(const std::shared_ptr &dst, OnCopy on_copy, OnCopyDone on_copy_done, const HostTrap &trap) + { + std::scoped_lock lock(mu); + if (dropped) + { + on_copy_done(CopyResult::Dropped); + return; + } + if (!pending_buffer) + { + set_pending(dst, std::move(on_copy), std::move(on_copy_done)); + return; + } - copy_into_queue(cx, ptr, n, *shared_, trap); - satisfy_pending_read(*shared_, trap); + auto src = std::dynamic_pointer_cast(pending_buffer); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, !src, "stream pending buffer type mismatch"); - set_pending_event({EventCode::STREAM_WRITE, handle_index, pack_copy_result(CopyResult::Completed, n)}); - auto event = get_pending_event(trap); - state_ = CopyState::Idle; - return event.payload; - } + if (src->remain() > 0) + { + if (dst->remain() > 0) + { + uint32_t n = std::min(dst->remain(), src->remain()); + dst->write(src->read(n, trap), trap); + if (pending_on_copy) + { + pending_on_copy([this]() + { + std::scoped_lock reclaim_lock(mu); + reset_pending(); }); + } + } + on_copy_done(CopyResult::Completed); + return; + } - inline uint32_t WritableStreamEnd::cancel(bool, const HostTrap &trap) - { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, true, "no pending stream write"); - return BLOCKED; - } + reset_and_notify_pending(CopyResult::Completed); + set_pending(dst, std::move(on_copy), std::move(on_copy_done)); + } - inline void WritableStreamEnd::drop(const HostTrap &trap) - { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, state_ == CopyState::Copying, "cannot drop pending stream write"); - if (shared_) + void write(const std::shared_ptr &src, OnCopy on_copy, OnCopyDone on_copy_done, const HostTrap &trap) { - if (shared_->pending_read) + std::scoped_lock lock(mu); + if (dropped) { - auto pending = std::move(*shared_->pending_read); - shared_->pending_read.reset(); - 
pending.endpoint->complete_async(pending.cx, pending.handle_index, CopyResult::Dropped, pending.progress, trap); + on_copy_done(CopyResult::Dropped); + return; + } + if (!pending_buffer) + { + set_pending(src, std::move(on_copy), std::move(on_copy_done)); + return; } - shared_->writable_dropped = true; - } - state_ = CopyState::Done; - Waitable::drop(trap); - } - - class ReadableFutureEnd; - class WritableFutureEnd; - struct SharedFutureState - { - explicit SharedFutureState(const FutureDescriptor &desc) : descriptor(desc), value(desc.element_size) {} + auto dst = std::dynamic_pointer_cast(pending_buffer); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, !dst, "stream pending buffer type mismatch"); - FutureDescriptor descriptor; - bool readable_dropped = false; - bool writable_dropped = false; - bool value_ready = false; - std::vector value; + if (dst->remain() > 0) + { + if (src->remain() > 0) + { + uint32_t n = std::min(src->remain(), dst->remain()); + dst->write(src->read(n, trap), trap); + if (pending_on_copy) + { + pending_on_copy( + [this]() + { + std::unique_lock reclaim_lock(mu, std::try_to_lock); + if (!reclaim_lock.owns_lock()) + { + return; + } + reset_pending(); + }); + } + } + on_copy_done(CopyResult::Completed); + return; + } - struct PendingRead - { - std::shared_ptr cx; - uint32_t ptr = 0; - uint32_t handle_index = 0; - ReadableFutureEnd *endpoint = nullptr; - }; + if (src->is_zero_length() && dst->is_zero_length()) + { + on_copy_done(CopyResult::Completed); + return; + } - std::optional pending_read; + reset_and_notify_pending(CopyResult::Completed); + set_pending(src, std::move(on_copy), std::move(on_copy_done)); + } }; - class ReadableFutureEnd : public Waitable + class CopyEnd : public Waitable { public: - explicit ReadableFutureEnd(std::shared_ptr shared) : shared_(std::move(shared)) {} - - const FutureDescriptor &descriptor() const + bool copying() const { - return shared_->descriptor; + switch (state_) + { + case CopyState::IDLE: 
+ case CopyState::DONE: + return false; + case CopyState::SYNC_COPYING: + case CopyState::ASYNC_COPYING: + case CopyState::CANCELLING_COPY: + return true; + } + return false; } - uint32_t read(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, bool sync, const HostTrap &trap); - uint32_t cancel(bool sync, const HostTrap &trap); - void drop(const HostTrap &trap); - void complete_async(const std::shared_ptr &cx, uint32_t handle_index, CopyResult result, uint32_t progress, const HostTrap &trap); - - private: - std::shared_ptr shared_; - CopyState state_ = CopyState::Idle; + protected: + CopyState state_ = CopyState::IDLE; }; - class WritableFutureEnd : public Waitable + class ReadableStreamEnd final : public CopyEnd { public: - explicit WritableFutureEnd(std::shared_ptr shared) : shared_(std::move(shared)) {} + explicit ReadableStreamEnd(std::shared_ptr shared) : shared_(std::move(shared)) {} - const FutureDescriptor &descriptor() const + const StreamDescriptor &descriptor() const { return shared_->descriptor; } - uint32_t write(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, const HostTrap &trap); - uint32_t cancel(bool sync, const HostTrap &trap); - void drop(const HostTrap &trap); + uint32_t read(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, uint32_t n, bool sync, const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, !shared_, "stream state missing"); + trap_if(trap_cx, state_ != CopyState::IDLE, "stream read not idle"); - private: - std::shared_ptr shared_; - CopyState state_ = CopyState::Idle; - }; + auto buffer = std::make_shared(shared_->descriptor.element_size, shared_->descriptor.alignment, cx, ptr, n, trap); - inline uint32_t ReadableFutureEnd::read(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, bool sync, const HostTrap &trap) - { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, !shared_, "future state missing"); - trap_if(trap_cx, 
shared_->descriptor.element_size == 0, "invalid future descriptor"); - trap_if(trap_cx, state_ != CopyState::Idle, "future read busy"); + auto make_payload = [buffer](CopyResult result) -> uint32_t + { + return pack_copy_result(result, buffer->progress()); + }; - if (shared_->value_ready) - { - ensure_memory_range(*cx, ptr, 1, shared_->descriptor.alignment, shared_->descriptor.element_size); - std::memcpy(cx->opts.memory.data() + ptr, shared_->value.data(), shared_->descriptor.element_size); - set_pending_event({EventCode::FUTURE_READ, handle_index, pack_copy_result(CopyResult::Completed, 1)}); + OnCopy on_copy = [this, handle_index, cx, make_payload](ReclaimBuffer reclaim) + { + bool notify = (state_ == CopyState::ASYNC_COPYING || state_ == CopyState::CANCELLING_COPY); + uint32_t payload = make_payload(CopyResult::Completed); + set_pending_event({EventCode::STREAM_READ, handle_index, payload}, std::move(reclaim)); + state_ = CopyState::IDLE; + if (shared_) + { + shared_->notify_all(); + } + if (cx && notify) + { + cx->notify_async_event(EventCode::STREAM_READ, handle_index, payload); + } + }; + + OnCopyDone on_copy_done = [this, handle_index, cx, make_payload](CopyResult result) + { + bool notify = (state_ == CopyState::ASYNC_COPYING || state_ == CopyState::CANCELLING_COPY); + uint32_t payload = make_payload(result); + set_pending_event({EventCode::STREAM_READ, handle_index, payload}); + state_ = (result == CopyResult::Dropped) ? 
CopyState::DONE : CopyState::IDLE; + if (shared_) + { + shared_->notify_all(); + } + if (cx && notify) + { + cx->notify_async_event(EventCode::STREAM_READ, handle_index, payload); + } + }; + + shared_->read(buffer, std::move(on_copy), std::move(on_copy_done), trap); + + if (!has_pending_event()) + { + if (sync) + { + state_ = CopyState::SYNC_COPYING; + shared_->wait_until([this]() + { return has_pending_event(); }); + } + else + { + state_ = CopyState::ASYNC_COPYING; + return BLOCKED; + } + } auto event = get_pending_event(trap); - state_ = CopyState::Idle; return event.payload; } - if (shared_->writable_dropped) + uint32_t cancel(bool sync, const HostTrap &trap) { - set_pending_event({EventCode::FUTURE_READ, handle_index, pack_copy_result(CopyResult::Dropped, 0)}); - auto event = get_pending_event(trap); - state_ = CopyState::Done; - return event.payload; - } + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, state_ != CopyState::ASYNC_COPYING, "stream cancel requires async copy"); + state_ = CopyState::CANCELLING_COPY; - trap_if(trap_cx, sync, "sync future read would block"); - shared_->pending_read = SharedFutureState::PendingRead{cx, ptr, handle_index, this}; - state_ = CopyState::Copying; - return BLOCKED; - } + if (!has_pending_event() && shared_) + { + shared_->cancel(); + } - inline uint32_t ReadableFutureEnd::cancel(bool sync, const HostTrap &trap) - { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, state_ != CopyState::Copying, "no pending future read"); - trap_if(trap_cx, !shared_ || !shared_->pending_read, "no pending future read"); + if (!has_pending_event()) + { + if (sync) + { + shared_->wait_until([this]() + { return has_pending_event(); }); + } + else + { + return BLOCKED; + } + } - auto pending = std::move(*shared_->pending_read); - shared_->pending_read.reset(); - auto payload = pack_copy_result(CopyResult::Cancelled, 0); - set_pending_event({EventCode::FUTURE_READ, pending.handle_index, payload}); - state_ = 
CopyState::Done; + if (!sync) + { + return BLOCKED; + } - if (sync) - { auto event = get_pending_event(trap); return event.payload; } - if (pending.cx) - { - pending.cx->notify_async_event(EventCode::FUTURE_READ, pending.handle_index, payload); - } - return BLOCKED; - } - inline void ReadableFutureEnd::drop(const HostTrap &trap) - { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, state_ == CopyState::Copying, "cannot drop pending future read"); - trap_if(trap_cx, shared_ && shared_->pending_read.has_value(), "pending future read must complete before drop"); - if (shared_) + void drop(const HostTrap &trap) { - shared_->readable_dropped = true; + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, copying(), "cannot drop stream end while copying"); + if (shared_) + { + shared_->drop(); + } + state_ = CopyState::DONE; + Waitable::drop(trap); } - state_ = CopyState::Done; - Waitable::drop(trap); - } - inline void ReadableFutureEnd::complete_async(const std::shared_ptr &cx, uint32_t handle_index, CopyResult result, uint32_t progress, const HostTrap &trap) - { - auto payload = pack_copy_result(result, progress); - set_pending_event({EventCode::FUTURE_READ, handle_index, payload}); - state_ = (result == CopyResult::Completed) ? 
CopyState::Idle : CopyState::Done; - if (cx) - { - cx->notify_async_event(EventCode::FUTURE_READ, handle_index, payload); - } - } + private: + std::shared_ptr shared_; + }; - inline uint32_t WritableFutureEnd::write(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, const HostTrap &trap) + class WritableStreamEnd final : public CopyEnd { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, !shared_, "future state missing"); - trap_if(trap_cx, shared_->descriptor.element_size == 0, "invalid future descriptor"); - trap_if(trap_cx, shared_->value_ready, "future already resolved"); - - ensure_memory_range(*cx, ptr, 1, shared_->descriptor.alignment, shared_->descriptor.element_size); - std::memcpy(shared_->value.data(), cx->opts.memory.data() + ptr, shared_->descriptor.element_size); - shared_->value_ready = true; + public: + explicit WritableStreamEnd(std::shared_ptr shared) : shared_(std::move(shared)) {} - if (shared_->pending_read) + const StreamDescriptor &descriptor() const { - auto pending = std::move(*shared_->pending_read); - shared_->pending_read.reset(); - ensure_memory_range(*pending.cx, pending.ptr, 1, shared_->descriptor.alignment, shared_->descriptor.element_size); - std::memcpy(pending.cx->opts.memory.data() + pending.ptr, shared_->value.data(), shared_->descriptor.element_size); - pending.endpoint->complete_async(pending.cx, pending.handle_index, CopyResult::Completed, 1, trap); + return shared_->descriptor; } - set_pending_event({EventCode::FUTURE_WRITE, handle_index, pack_copy_result(CopyResult::Completed, 1)}); - auto event = get_pending_event(trap); - state_ = CopyState::Idle; - return event.payload; - } + uint32_t write(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, uint32_t n, bool sync, const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, !shared_, "stream state missing"); + trap_if(trap_cx, state_ != CopyState::IDLE, "stream write not idle"); - inline uint32_t 
WritableFutureEnd::cancel(bool, const HostTrap &trap) - { - auto trap_cx = make_trap_context(trap); - trap_if(trap_cx, true, "no pending future write"); - return BLOCKED; - } + auto buffer = std::make_shared(shared_->descriptor.element_size, shared_->descriptor.alignment, cx, ptr, n, trap); + auto make_payload = [buffer](CopyResult result) -> uint32_t + { + return pack_copy_result(result, buffer->progress()); + }; - inline void WritableFutureEnd::drop(const HostTrap &trap) - { - if (shared_ && !shared_->value_ready) - { - if (shared_->pending_read) + OnCopy on_copy = [this, handle_index, cx, make_payload](ReclaimBuffer reclaim) { - auto pending = std::move(*shared_->pending_read); - shared_->pending_read.reset(); - pending.endpoint->complete_async(pending.cx, pending.handle_index, CopyResult::Dropped, 0, trap); - } - shared_->writable_dropped = true; - } - state_ = CopyState::Done; - Waitable::drop(trap); - } + bool notify = (state_ == CopyState::ASYNC_COPYING || state_ == CopyState::CANCELLING_COPY); + uint32_t payload = make_payload(CopyResult::Completed); + set_pending_event({EventCode::STREAM_WRITE, handle_index, payload}, std::move(reclaim)); + state_ = CopyState::IDLE; + if (shared_) + { + shared_->notify_all(); + } + if (cx && notify) + { + cx->notify_async_event(EventCode::STREAM_WRITE, handle_index, payload); + } + }; - struct ResourceType - { + OnCopyDone on_copy_done = [this, handle_index, cx, make_payload](CopyResult result) + { + bool notify = (state_ == CopyState::ASYNC_COPYING || state_ == CopyState::CANCELLING_COPY); + uint32_t payload = make_payload(result); + set_pending_event({EventCode::STREAM_WRITE, handle_index, payload}); + state_ = (result == CopyResult::Dropped) ? 
CopyState::DONE : CopyState::IDLE; + if (shared_) + { + shared_->notify_all(); + } + if (cx && notify) + { + cx->notify_async_event(EventCode::STREAM_WRITE, handle_index, payload); + } + }; + + shared_->write(buffer, std::move(on_copy), std::move(on_copy_done), trap); + + if (!has_pending_event()) + { + if (sync) + { + state_ = CopyState::SYNC_COPYING; + shared_->wait_until([this]() + { return has_pending_event(); }); + } + else + { + state_ = CopyState::ASYNC_COPYING; + return BLOCKED; + } + } + auto event = get_pending_event(trap); + return event.payload; + } + + uint32_t cancel(bool sync, const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, state_ != CopyState::ASYNC_COPYING, "stream cancel requires async copy"); + state_ = CopyState::CANCELLING_COPY; + + if (!has_pending_event() && shared_) + { + shared_->cancel(); + } + + if (!has_pending_event()) + { + if (sync) + { + shared_->wait_until([this]() + { return has_pending_event(); }); + } + else + { + return BLOCKED; + } + } + + if (!sync) + { + return BLOCKED; + } + + auto event = get_pending_event(trap); + return event.payload; + } + + void drop(const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, copying(), "cannot drop stream end while copying"); + if (shared_) + { + shared_->drop(); + } + state_ = CopyState::DONE; + Waitable::drop(trap); + } + + private: + std::shared_ptr shared_; + }; + + struct SharedFutureState + { + explicit SharedFutureState(const FutureDescriptor &desc) : descriptor(desc) {} + + FutureDescriptor descriptor; + bool dropped = false; + + std::shared_ptr pending_buffer; + OnCopyDone pending_on_copy_done; + + std::mutex mu; + std::condition_variable cv; + + void notify_all() + { + cv.notify_all(); + } + + template + void wait_until(Pred pred) + { + std::unique_lock lock(mu); + cv.wait(lock, std::move(pred)); + } + + void reset_pending() + { + pending_buffer.reset(); + pending_on_copy_done = {}; + } + + void 
set_pending(const std::shared_ptr &buffer, OnCopyDone on_copy_done) + { + pending_buffer = buffer; + pending_on_copy_done = std::move(on_copy_done); + } + + void reset_and_notify_pending(CopyResult result) + { + auto on_copy_done = std::move(pending_on_copy_done); + reset_pending(); + if (on_copy_done) + { + on_copy_done(result); + } + } + + void cancel() + { + if (pending_buffer) + { + reset_and_notify_pending(CopyResult::Cancelled); + } + } + + void drop() + { + if (dropped) + { + return; + } + dropped = true; + if (pending_buffer) + { + reset_and_notify_pending(CopyResult::Dropped); + } + } + + void read(const std::shared_ptr &dst, OnCopyDone on_copy_done, const HostTrap &trap) + { + std::scoped_lock lock(mu); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, dst->remain() != 1, "future read length must be 1"); + + if (dropped) + { + on_copy_done(CopyResult::Dropped); + return; + } + + if (!pending_buffer) + { + set_pending(dst, std::move(on_copy_done)); + return; + } + + auto src = std::dynamic_pointer_cast(pending_buffer); + trap_if(trap_cx, !src, "future pending buffer type mismatch"); + dst->write(src->read(1, trap), trap); + reset_and_notify_pending(CopyResult::Completed); + on_copy_done(CopyResult::Completed); + } + + void write(const std::shared_ptr &src, OnCopyDone on_copy_done, const HostTrap &trap) + { + std::scoped_lock lock(mu); + if (dropped) + { + on_copy_done(CopyResult::Dropped); + return; + } + + if (!pending_buffer) + { + set_pending(src, std::move(on_copy_done)); + return; + } + + auto dst = std::dynamic_pointer_cast(pending_buffer); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, !dst, "future pending buffer type mismatch"); + dst->write(src->read(1, trap), trap); + reset_and_notify_pending(CopyResult::Completed); + on_copy_done(CopyResult::Completed); + } + }; + + class ReadableFutureEnd final : public CopyEnd + { + public: + explicit ReadableFutureEnd(std::shared_ptr shared) : shared_(std::move(shared)) {} + + const 
FutureDescriptor &descriptor() const + { + return shared_->descriptor; + } + + uint32_t read(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, bool sync, const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, !shared_, "future state missing"); + trap_if(trap_cx, state_ != CopyState::IDLE, "future read not idle"); + + auto buffer = std::make_shared(shared_->descriptor.element_size, shared_->descriptor.alignment, cx, ptr, 1, trap); + OnCopyDone on_copy_done = [this, handle_index, cx](CopyResult result) + { + bool notify = (state_ == CopyState::ASYNC_COPYING || state_ == CopyState::CANCELLING_COPY); + set_pending_event({EventCode::FUTURE_READ, handle_index, static_cast(result)}); + state_ = (result == CopyResult::Cancelled) ? CopyState::IDLE : CopyState::DONE; + if (shared_) + { + shared_->notify_all(); + } + if (cx && notify) + { + cx->notify_async_event(EventCode::FUTURE_READ, handle_index, static_cast(result)); + } + }; + + shared_->read(buffer, std::move(on_copy_done), trap); + + if (!has_pending_event()) + { + if (sync) + { + state_ = CopyState::SYNC_COPYING; + shared_->wait_until([this]() + { return has_pending_event(); }); + } + else + { + state_ = CopyState::ASYNC_COPYING; + return BLOCKED; + } + } + auto event = get_pending_event(trap); + return event.payload; + } + + uint32_t cancel(bool sync, const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, state_ != CopyState::ASYNC_COPYING, "future cancel requires async copy"); + state_ = CopyState::CANCELLING_COPY; + if (!has_pending_event() && shared_) + { + shared_->cancel(); + } + if (!has_pending_event()) + { + if (sync) + { + shared_->wait_until([this]() + { return has_pending_event(); }); + } + else + { + return BLOCKED; + } + } + if (!sync) + { + return BLOCKED; + } + auto event = get_pending_event(trap); + return event.payload; + } + + void drop(const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, 
copying(), "cannot drop future end while copying"); + if (shared_) + { + shared_->drop(); + } + state_ = CopyState::DONE; + Waitable::drop(trap); + } + + private: + std::shared_ptr shared_; + }; + + class WritableFutureEnd final : public CopyEnd + { + public: + explicit WritableFutureEnd(std::shared_ptr shared) : shared_(std::move(shared)) {} + + const FutureDescriptor &descriptor() const + { + return shared_->descriptor; + } + + uint32_t write(const std::shared_ptr &cx, uint32_t handle_index, uint32_t ptr, bool sync, const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, !shared_, "future state missing"); + trap_if(trap_cx, state_ != CopyState::IDLE, "future write not idle"); + + auto buffer = std::make_shared(shared_->descriptor.element_size, shared_->descriptor.alignment, cx, ptr, 1, trap); + OnCopyDone on_copy_done = [this, handle_index, cx](CopyResult result) + { + bool notify = (state_ == CopyState::ASYNC_COPYING || state_ == CopyState::CANCELLING_COPY); + set_pending_event({EventCode::FUTURE_WRITE, handle_index, static_cast(result)}); + // For future.write, COMPLETED and DROPPED end the write. + state_ = (result == CopyResult::Cancelled) ? 
CopyState::IDLE : CopyState::DONE; + if (shared_) + { + shared_->notify_all(); + } + if (cx && notify) + { + cx->notify_async_event(EventCode::FUTURE_WRITE, handle_index, static_cast(result)); + } + }; + + shared_->write(buffer, std::move(on_copy_done), trap); + + if (!has_pending_event()) + { + if (sync) + { + state_ = CopyState::SYNC_COPYING; + shared_->wait_until([this]() + { return has_pending_event(); }); + } + else + { + state_ = CopyState::ASYNC_COPYING; + return BLOCKED; + } + } + auto event = get_pending_event(trap); + return event.payload; + } + + uint32_t cancel(bool sync, const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, state_ != CopyState::ASYNC_COPYING, "future cancel requires async copy"); + state_ = CopyState::CANCELLING_COPY; + if (!has_pending_event() && shared_) + { + shared_->cancel(); + } + if (!has_pending_event()) + { + return BLOCKED; + } + if (!sync) + { + return BLOCKED; + } + auto event = get_pending_event(trap); + return event.payload; + } + + void drop(const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, state_ != CopyState::DONE, "writable future end must be done before drop"); + if (shared_) + { + shared_->drop(); + } + Waitable::drop(trap); + } + + private: + std::shared_ptr shared_; + }; + + struct ResourceType + { ComponentInstance *impl = nullptr; std::function dtor; @@ -1088,27 +1509,6 @@ namespace cmcpp inst.backpressure -= 1; } - class ContextLocalStorage - { - public: - static constexpr uint32_t LENGTH = 1; - - ContextLocalStorage() = default; - - void set(uint32_t index, int32_t value) - { - storage_[index] = value; - } - - int32_t get(uint32_t index) const - { - return storage_[index]; - } - - private: - std::array storage_{}; - }; - class Task : public std::enable_shared_from_this { public: @@ -1224,7 +1624,7 @@ namespace cmcpp thread_->request_cancellation(); } - bool suspend_until(Thread::ReadyFn ready, bool cancellable) + bool 
suspend_until(Thread::ReadyFn ready, bool cancellable, bool force_yield = false) { if (cancellable && state_ == State::CancelDelivered) { @@ -1239,7 +1639,7 @@ namespace cmcpp { return false; } - bool completed = thread_->suspend_until(std::move(ready), cancellable); + bool completed = thread_->suspend_until(std::move(ready), cancellable, force_yield); if (!completed && cancellable && state_ == State::PendingCancel) { state_ = State::CancelDelivered; @@ -1247,9 +1647,9 @@ namespace cmcpp return completed; } - Event yield_until(Thread::ReadyFn ready, bool cancellable) + Event yield_until(Thread::ReadyFn ready, bool cancellable, bool force_yield = false) { - if (!suspend_until(std::move(ready), cancellable)) + if (!suspend_until(std::move(ready), cancellable, force_yield)) { return {EventCode::TASK_CANCELLED, 0, 0}; } @@ -1283,16 +1683,6 @@ namespace cmcpp return state_; } - ContextLocalStorage &context() - { - return context_; - } - - const ContextLocalStorage &context() const - { - return context_; - } - ComponentInstance *component_instance() const { return inst_; @@ -1316,6 +1706,11 @@ namespace cmcpp } } + bool may_block() const + { + return !opts_.sync || state_ == State::Resolved; + } + private: bool needs_exclusive() const { @@ -1345,7 +1740,6 @@ namespace cmcpp uint32_t num_borrows_ = 0; std::shared_ptr thread_; State state_ = State::Initial; - ContextLocalStorage context_{}; }; inline void canon_task_return(Task &task, std::vector result, const HostTrap &trap) @@ -1370,20 +1764,225 @@ namespace cmcpp task.cancel(trap); } - inline uint32_t canon_yield(bool cancellable, Task &task, const HostTrap &trap) + inline uint32_t canon_thread_yield(bool cancellable, Task &task, const HostTrap &trap) { if (auto *inst = task.component_instance()) { ensure_may_leave(*inst, trap); } - if (task.state() == Task::State::CancelDelivered || task.state() == Task::State::PendingCancel) + if (!task.may_block()) { - return 1u; + return static_cast(SuspendResult::NOT_CANCELLED); } 
+ auto event = task.yield_until([] { return true; }, cancellable); - return event.code == EventCode::TASK_CANCELLED ? 1u : 0u; + switch (event.code) + { + case EventCode::NONE: + return static_cast(SuspendResult::NOT_CANCELLED); + case EventCode::TASK_CANCELLED: + return static_cast(SuspendResult::CANCELLED); + default: + return static_cast(SuspendResult::NOT_CANCELLED); + } + } + + inline void canon_thread_resume_later(Task &task, uint32_t thread_index, const HostTrap &trap) + { + auto *inst = task.component_instance(); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, inst == nullptr, "thread.resume-later missing component instance"); + ensure_may_leave(*inst, trap); + + auto entry = inst->table.get(thread_index, trap); + auto other_thread = entry->thread(); + trap_if(trap_cx, !other_thread, "thread.resume-later null thread"); + trap_if(trap_cx, !other_thread->suspended(), "thread not suspended"); + other_thread->resume_later(); + } + + inline uint32_t canon_thread_yield_to(bool cancellable, Task &task, uint32_t thread_index, const HostTrap &trap) + { + auto *inst = task.component_instance(); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, inst == nullptr, "thread.yield-to missing component instance"); + ensure_may_leave(*inst, trap); + + auto entry = inst->table.get(thread_index, trap); + auto other_thread = entry->thread(); + trap_if(trap_cx, !other_thread, "thread.yield-to null thread"); + trap_if(trap_cx, !other_thread->suspended(), "thread not suspended"); + + // Make the other thread runnable. 
+ other_thread->resume_later(); + + if (!cancellable) + { + return static_cast(SuspendResult::NOT_CANCELLED); + } + if (task.state() == Task::State::CancelDelivered || task.state() == Task::State::PendingCancel) + { + return static_cast(SuspendResult::CANCELLED); + } + return static_cast(SuspendResult::NOT_CANCELLED); + } + + inline uint32_t canon_thread_switch_to(bool cancellable, Task &task, uint32_t thread_index, const HostTrap &trap) + { + auto *inst = task.component_instance(); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, inst == nullptr, "thread.switch-to missing component instance"); + ensure_may_leave(*inst, trap); + + auto entry = inst->table.get(thread_index, trap); + auto other_thread = entry->thread(); + trap_if(trap_cx, !other_thread, "thread.switch-to null thread"); + trap_if(trap_cx, !other_thread->suspended(), "thread not suspended"); + + // Make the other thread runnable. + other_thread->resume_later(); + + // Approximate a switch by forcing this thread to yield for at least one tick. 
+ task.suspend_until([] + { return true; }, + cancellable, + true); + + if (!cancellable) + { + return static_cast(SuspendResult::NOT_CANCELLED); + } + if (task.state() == Task::State::CancelDelivered || task.state() == Task::State::PendingCancel) + { + return static_cast(SuspendResult::CANCELLED); + } + return static_cast(SuspendResult::NOT_CANCELLED); + } + + inline uint32_t canon_thread_new_ref(bool /*shared*/, Task &task, std::function callee, uint32_t c, const HostTrap &trap) + { + auto *inst = task.component_instance(); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, inst == nullptr, "thread.new-ref missing component instance"); + ensure_may_leave(*inst, trap); + trap_if(trap_cx, inst->store == nullptr, "thread.new-ref missing store"); + trap_if(trap_cx, !callee, "thread.new-ref null callee"); + + auto thread = Thread::create_suspended( + *inst->store, + [callee = std::move(callee), c](bool) + { + callee(c); + return false; + }, + true, + {}); + trap_if(trap_cx, !thread || !thread->suspended(), "thread.new-ref failed to create suspended thread"); + + uint32_t index = inst->table.add(std::make_shared(thread), trap); + thread->set_index(index); + return index; + } + + inline uint32_t canon_thread_new_indirect(bool /*shared*/, Task &task, const std::vector> &table, uint32_t fi, uint32_t c, const HostTrap &trap) + { + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, fi >= table.size(), "thread.new-indirect out of bounds"); + auto callee = table[fi]; + trap_if(trap_cx, !callee, "thread.new-indirect null callee"); + return canon_thread_new_ref(false, task, std::move(callee), c, trap); + } + + inline uint32_t canon_thread_spawn_ref(bool shared, Task &task, std::function callee, uint32_t c, const HostTrap &trap) + { + uint32_t index = canon_thread_new_ref(shared, task, std::move(callee), c, trap); + canon_thread_resume_later(task, index, trap); + return index; + } + + inline uint32_t canon_thread_spawn_indirect(bool shared, Task &task, const 
std::vector> &table, uint32_t fi, uint32_t c, const HostTrap &trap) + { + uint32_t index = canon_thread_new_indirect(shared, task, table, fi, c, trap); + canon_thread_resume_later(task, index, trap); + return index; + } + + inline uint32_t canon_thread_available_parallelism(bool shared, Task &task, const HostTrap &trap) + { + auto *inst = task.component_instance(); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, inst == nullptr, "thread.available-parallelism missing component instance"); + ensure_may_leave(*inst, trap); + + if (!shared) + { + return 1; + } + auto hc = std::thread::hardware_concurrency(); + return hc == 0 ? 1u : hc; + } + + inline uint32_t canon_thread_index(Task &task, const HostTrap &trap) + { + auto *inst = task.component_instance(); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, inst == nullptr, "thread.index missing component instance"); + ensure_may_leave(*inst, trap); + + auto thread = task.thread(); + trap_if(trap_cx, !thread, "thread missing"); + auto index = thread->index(); + trap_if(trap_cx, !index.has_value(), "thread index missing"); + return *index; + } + + inline uint32_t canon_thread_suspend(bool cancellable, Task &task, const HostTrap &trap) + { + auto *inst = task.component_instance(); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, inst == nullptr, "thread.suspend missing component instance"); + ensure_may_leave(*inst, trap); + trap_if(trap_cx, !task.may_block(), "thread.suspend may not block"); + + // Force a yield of this thread for at least one tick. 
+ task.suspend_until([] + { return true; }, + cancellable, + true); + + if (cancellable && (task.state() == Task::State::CancelDelivered || task.state() == Task::State::PendingCancel)) + { + return static_cast(SuspendResult::CANCELLED); + } + return static_cast(SuspendResult::NOT_CANCELLED); + } + + inline uint32_t canon_task_wait(bool /*cancellable*/, + GuestMemory mem, + Task &task, + uint32_t waitable_set_handle, + uint32_t event_ptr, + const HostTrap &trap) + { + auto *inst = task.component_instance(); + auto trap_cx = make_trap_context(trap); + trap_if(trap_cx, inst == nullptr, "task.wait missing component instance"); + ensure_may_leave(*inst, trap); + + auto wset = inst->table.get(waitable_set_handle, trap); + wset->begin_wait(); + if (!wset->has_pending_event()) + { + wset->end_wait(); + write_event_fields(mem, event_ptr, 0, 0, trap); + return BLOCKED; + } + auto event = wset->take_pending_event(trap); + wset->end_wait(); + write_event_fields(mem, event_ptr, event.index, event.payload, trap); + return static_cast(event.code); } struct Subtask : Waitable @@ -1458,7 +2057,9 @@ namespace cmcpp } auto trap_cx = make_trap_context(trap); trap_if(trap_cx, index >= ContextLocalStorage::LENGTH, "context index out of bounds"); - return task.context().get(index); + auto thread = task.thread(); + trap_if(trap_cx, !thread, "thread missing"); + return thread->context().get(index); } inline void canon_context_set(Task &task, uint32_t index, int32_t value, const HostTrap &trap) @@ -1469,7 +2070,9 @@ namespace cmcpp } auto trap_cx = make_trap_context(trap); trap_if(trap_cx, index >= ContextLocalStorage::LENGTH, "context index out of bounds"); - task.context().set(index, value); + auto thread = task.thread(); + trap_if(trap_cx, !thread, "thread missing"); + thread->context().set(index, value); } inline uint32_t canon_waitable_set_new(ComponentInstance &inst, const HostTrap &trap) @@ -1573,7 +2176,8 @@ namespace cmcpp trap_if(trap_cx, !cx, "lift/lower context required"); auto 
writable = inst.table.get(writable_index, trap); validate_descriptor(descriptor, writable->descriptor(), trap); - return writable->write(cx, writable_index, ptr, n, trap); + bool sync = cx->is_sync(); + return writable->write(cx, writable_index, ptr, n, sync, trap); } inline uint32_t canon_stream_cancel_read(ComponentInstance &inst, uint32_t readable_index, bool sync, const HostTrap &trap) @@ -1645,7 +2249,8 @@ namespace cmcpp trap_if(trap_cx, !cx, "lift/lower context required"); auto writable = inst.table.get(writable_index, trap); validate_descriptor(descriptor, writable->descriptor(), trap); - return writable->write(cx, writable_index, ptr, trap); + bool sync = cx->is_sync(); + return writable->write(cx, writable_index, ptr, sync, trap); } inline uint32_t canon_future_cancel_read(ComponentInstance &inst, uint32_t readable_index, bool sync, const HostTrap &trap) diff --git a/include/cmcpp/runtime.hpp b/include/cmcpp/runtime.hpp index 524188a..2d79c45 100644 --- a/include/cmcpp/runtime.hpp +++ b/include/cmcpp/runtime.hpp @@ -3,7 +3,9 @@ #include #include +#include #include +#include #include #include #include @@ -17,6 +19,33 @@ namespace cmcpp class Store; struct ComponentInstance; + class ContextLocalStorage + { + public: + // Number of int32_t "slots" available for context-local data. + // + // Slot 0: reserved for cmcpp runtime/internal bookkeeping associated + // with the current execution context. + // Slot 1: additional per-context storage, intended for extensibility + // (e.g., user-defined or future runtime data). 
+ static constexpr uint32_t LENGTH = 2; + + ContextLocalStorage() = default; + + void set(uint32_t index, int32_t value) + { + storage_[index] = value; + } + + int32_t get(uint32_t index) const + { + return storage_[index]; + } + + private: + std::array storage_{}; + }; + struct Supertask; using SupertaskPtr = std::shared_ptr; @@ -31,6 +60,7 @@ namespace cmcpp using CancelFn = std::function; static std::shared_ptr create(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable = false, CancelFn on_cancel = {}); + static std::shared_ptr create_suspended(Store &store, ResumeFn resume, bool cancellable = false, CancelFn on_cancel = {}); Thread(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable, CancelFn on_cancel); @@ -41,6 +71,12 @@ namespace cmcpp bool cancelled() const; bool completed() const; + void set_index(uint32_t index); + std::optional index() const; + + bool suspended() const; + void resume_later(); + bool suspend_until(ReadyFn ready, bool cancellable, bool force_yield = false); void set_ready(ReadyFn ready); void set_allow_cancellation(bool allow); @@ -48,9 +84,20 @@ namespace cmcpp void set_in_event_loop(bool value); bool in_event_loop() const; + ContextLocalStorage &context() + { + return context_; + } + + const ContextLocalStorage &context() const + { + return context_; + } + private: enum class State { + Suspended, Pending, Running, Completed @@ -66,6 +113,8 @@ namespace cmcpp bool cancellable_; bool cancelled_; bool in_event_loop_; + ContextLocalStorage context_{}; + std::optional index_; mutable std::mutex mutex_; State state_; std::atomic reschedule_requested_{false}; @@ -137,6 +186,16 @@ namespace cmcpp return thread; } + inline std::shared_ptr Thread::create_suspended(Store &store, ResumeFn resume, bool cancellable, CancelFn on_cancel) + { + auto thread = std::shared_ptr(new Thread(store, nullptr, std::move(resume), cancellable, std::move(on_cancel))); + { + std::lock_guard lock(thread->mutex_); + thread->state_ = 
State::Suspended; + } + return thread; + } + inline Thread::Thread(Store &store, ReadyFn ready, ResumeFn resume, bool cancellable, CancelFn on_cancel) : store_(&store), ready_(std::move(ready)), @@ -157,6 +216,10 @@ namespace cmcpp { return false; } + if (cancelled_ && cancellable_) + { + return true; + } if (!ready_) { return true; @@ -172,7 +235,7 @@ namespace cmcpp { std::lock_guard lock(mutex_); - if (state_ != State::Pending) + if (state_ != State::Pending && state_ != State::Suspended) { return; } @@ -229,6 +292,44 @@ namespace cmcpp return state_ == State::Completed; } + inline void Thread::set_index(uint32_t index) + { + std::lock_guard lock(mutex_); + index_ = index; + } + + inline std::optional Thread::index() const + { + std::lock_guard lock(mutex_); + return index_; + } + + inline bool Thread::suspended() const + { + std::lock_guard lock(mutex_); + return state_ == State::Suspended; + } + + inline void Thread::resume_later() + { + auto self = shared_from_this(); + { + std::lock_guard lock(mutex_); + if (state_ != State::Suspended) + { + return; + } + ready_ = []() + { + return true; + }; + cancellable_ = false; + cancelled_ = false; + state_ = State::Pending; + } + store_->schedule(self); + } + inline bool Thread::suspend_until(ReadyFn ready, bool cancellable, bool force_yield) { bool ready_now = false; diff --git a/include/cmcpp/util.hpp b/include/cmcpp/util.hpp index 76c30d2..da844ba 100644 --- a/include/cmcpp/util.hpp +++ b/include/cmcpp/util.hpp @@ -3,10 +3,6 @@ #include "context.hpp" -#include -#include -#include - namespace cmcpp { const bool DETERMINISTIC_PROFILE = false; diff --git a/ref/component-model b/ref/component-model index 4626bdf..0b951b7 160000 --- a/ref/component-model +++ b/ref/component-model @@ -1 +1 @@ -Subproject commit 4626bdf5b3afac3ecfbfb040f36ccd95c5c549a6 +Subproject commit 0b951b72b3f4019edf386a93585782587d221a0c diff --git a/ref/wasm-micro-runtime b/ref/wasm-micro-runtime index df90804..3f4145e 160000 --- 
a/ref/wasm-micro-runtime +++ b/ref/wasm-micro-runtime @@ -1 +1 @@ -Subproject commit df908048de3990c9a74056d44c0870c5efb9dd04 +Subproject commit 3f4145e6914d5cfc717a257aa660605490f26908 diff --git a/ref/wit-bindgen b/ref/wit-bindgen index 3a2114d..6f67b5c 160000 --- a/ref/wit-bindgen +++ b/ref/wit-bindgen @@ -1 +1 @@ -Subproject commit 3a2114dd74544b0c71d99fb0e97d2edb56d0a5a6 +Subproject commit 6f67b5c2fe63e2e417b552f9f7bb6edc26e4955c diff --git a/samples/wamr/README.md b/samples/wamr/README.md new file mode 100644 index 0000000..622f897 --- /dev/null +++ b/samples/wamr/README.md @@ -0,0 +1,332 @@ +# WAMR Component Model Sample + +This sample demonstrates how to use the Component Model C++ library with WAMR (WebAssembly Micro Runtime) to interact between C++ host code and WebAssembly guest modules. + +## Overview + +The sample showcases: +- **Host Function Registration**: Exporting C++ functions to WebAssembly modules +- **Guest Function Invocation**: Calling WebAssembly functions from C++ code +- **Component Model Types**: Working with all major Component Model types including: + - Primitive types (bool, integers, floats, strings) + - Composite types (tuples, records, lists) + - Advanced types (variants, options, results, enums, flags) +- **Memory Management**: Proper handling of WebAssembly linear memory +- **Error Handling**: Comprehensive error checking and resource cleanup + +## Architecture + +``` +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ C++ Host โ”‚ โ”‚ Component Model โ”‚ โ”‚ WASM Guest โ”‚ +โ”‚ Application โ”‚โ—„โ”€โ”€โ–บโ”‚ C++ Library โ”‚โ—„โ”€โ”€โ–บโ”‚ Module โ”‚ +โ”‚ โ”‚ โ”‚ โ”‚ โ”‚ โ”‚ +โ”‚ โ€ข Host funcs โ”‚ โ”‚ โ€ข Type mapping โ”‚ โ”‚ โ€ข Guest funcs โ”‚ +โ”‚ โ€ข Memory mgmt โ”‚ โ”‚ โ€ข ABI handling โ”‚ โ”‚ โ€ข Exports โ”‚ +โ”‚ โ€ข Error handlingโ”‚ โ”‚ โ€ข Serialization โ”‚ โ”‚ โ€ข Imports โ”‚ 
+โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +``` + +## Prerequisites + +- **WAMR**: WebAssembly Micro Runtime (installed via vcpkg) +- **C++20**: Modern C++ compiler with C++20 support +- **CMake**: Build system +- **Component Model WASM**: The sample.wasm file (built from samples/wasm) + +## Building + +From the project root: + +```bash +mkdir -p build +cd build +cmake .. +cmake --build . --target wamr --parallel +``` + +This will: +1. Build the WASM guest module (`samples/wasm` โ†’ `sample.wasm`) +2. Build the WAMR host application (`samples/wamr` โ†’ `wamr`) + +## Running + +```bash +cd build/samples/wamr +./wamr +``` + +### Expected Output + +The sample produces well-organized output demonstrating each Component Model feature: + +``` +WAMR Component Model C++ Sample +=============================== +Starting WAMR runtime initialization... +WAMR runtime initialized successfully +Successfully loaded WASM file (104637 bytes) +Successfully loaded WASM module +Successfully instantiated WASM module + +=== Testing Guest Functions === + +--- String Functions --- +call_reverse("Hello World!"): !DLROW OLLEH +call_reverse(call_reverse("Hello World!")): HELLO WORLD! + +--- Variant Functions --- +variant_func((uint32_t)40)80 +variant_func((bool_t)true)0 +variant_func((bool_t)false)1 + +--- Option Functions --- +option_func((uint32_t)40).has_value()1 +option_func((uint32_t)40).value()80 +option_func(std::nullopt).has_value()0 + +--- Void Functions --- +Hello, Void_Func! 
+ +--- Result Functions --- +ok_func result: 42 +err_func result: error + +--- Boolean Functions --- +call_and(false, false): 0 +call_and(false, true): 0 +call_and(true, false): 0 +call_and(true, true): 1 + +--- Float Functions --- +call_add(3.1, 0.2): 3.3 +call_add(1.5, 2.5): 4 +call_add(DBL_MAX, 0.0): 8.98847e+307 +call_add(DBL_MAX / 2, DBL_MAX / 2): 1.79769e+308 + +--- Complex String Functions --- +call_lots result: 42 + +--- Tuple Functions --- +call_reverse_tuple({false, "Hello World!"}): !DLROW OLLEH, 1 + +--- List Functions --- +call_list_filter result: 2 + +--- Enum Functions --- +enum_func(e::a): 0 +enum_func(e::b): 1 +enum_func(e::c): 2 + +=== Cleanup and Summary === +Sample completed successfully! +``` + +## Code Structure + +### Host Functions (C++ โ†’ WASM) + +Host functions are C++ functions exported to the WebAssembly module: + +```cpp +// Simple host function +void_t void_func() { + std::cout << "Hello from host!" << std::endl; +} + +// Host function with parameters and return value +cmcpp::bool_t and_func(cmcpp::bool_t a, cmcpp::bool_t b) { + return a && b; +} + +// Register host functions +NativeSymbol booleans_symbol[] = { + host_function("and", and_func), +}; +wasm_runtime_register_natives_raw("example:sample/booleans", + booleans_symbol, + sizeof(booleans_symbol) / sizeof(NativeSymbol)); +``` + +### Guest Functions (WASM โ†’ C++) + +Guest functions are WebAssembly functions called from C++ code: + +```cpp +// Create a callable wrapper for a WASM function +auto call_reverse = guest_function( + module_inst, exec_env, liftLowerContext, + "example:sample/strings#reverse" +); + +// Call the WASM function +auto result = call_reverse("Hello World!"); +std::cout << "Result: " << result << std::endl; +``` + +### Component Model Types + +The sample demonstrates all major Component Model types: + +```cpp +// Primitive types +bool_t, uint32_t, float64_t, string_t + +// Composite types +tuple_t +list_t +variant_t +option_t +result_t +enum_t +``` + +## Key 
Components + +### 1. Runtime Initialization + +```cpp +// Initialize WAMR runtime +wasm_runtime_init(); + +// Load and instantiate module +module = wasm_runtime_load(buffer, size, error_buf, sizeof(error_buf)); +module_inst = wasm_runtime_instantiate(module, stack_size, heap_size, + error_buf, sizeof(error_buf)); +``` + +### 2. Memory Management + +```cpp +// Set up guest memory reallocation function +GuestRealloc realloc = [exec_env, cabi_realloc](int original_ptr, + int original_size, + int alignment, + int new_size) -> int { + uint32_t argv[4] = {original_ptr, original_size, alignment, new_size}; + wasm_runtime_call_wasm(exec_env, cabi_realloc, 4, argv); + return argv[0]; +}; +``` + +### 3. Context Setup + +```cpp +// Create lift/lower context for type conversions +LiftLowerOptions opts(Encoding::Utf8, memory_span, realloc); +LiftLowerContext liftLowerContext(trap, convert, opts); +``` + +### 4. Error Handling + +The sample includes comprehensive error checking: +- File I/O errors +- WAMR initialization failures +- Module loading/instantiation errors +- Function lookup failures +- Runtime execution errors + +## WebAssembly Interface Types (WIT) + +The sample uses the following WIT definition (`samples/wasm/sample.wit`): + +```wit +package example:sample; + +interface booleans { + and: func(a: bool, b: bool) -> bool; +} + +interface strings { + reverse: func(a: string) -> string; + lots: func(p1: string, p2: string, /* ... */) -> u32; +} + +// ... more interfaces + +world sample { + export booleans; + export strings; + // ... more exports + + import logging; + import booleans; + // ... 
more imports +} +``` + +## Performance Considerations + +- **Memory Management**: The sample uses efficient memory allocation with proper cleanup +- **Type Safety**: Strong typing prevents ABI mismatches at compile time +- **Zero-copy**: String and binary data can be shared without copying when possible +- **Flat Values**: Simple types are passed directly without heap allocation + +## Troubleshooting + +### Common Issues + +1. **WASM file not found** + - Ensure `sample.wasm` is built: `cmake --build . --target wasm` + - Check file path in error message + +2. **Library linking errors** + - Verify WAMR is installed via vcpkg + - Check CMake finds the libraries correctly + +3. **Runtime crashes** + - Enable debug symbols: `CMAKE_BUILD_TYPE=Debug` + - Check WAMR error messages in `error_buf` + +4. **Function not found** + - Verify function name matches WIT specification exactly + - Check exports in the WASM module + +### Debug Mode + +Build with debug information for better error messages: + +```bash +cmake -DCMAKE_BUILD_TYPE=Debug .. +cmake --build . 
--target wamr +``` + +## Recent Improvements + +This sample has been enhanced with: + +### Code Quality +- **Comprehensive Error Handling**: Added proper error checking for all WAMR operations +- **Resource Management**: Improved cleanup and memory management +- **Configuration Constants**: Added configurable parameters for stack/heap sizes +- **Better Diagnostics**: Enhanced logging and status messages + +### Build System +- **Improved CMakeLists.txt**: Better library detection and error reporting +- **Build Scripts**: Added convenient `build.sh` and `run.sh` scripts +- **Cross-platform Support**: Better handling of library paths and linking + +### Documentation +- **Organized Output**: Added section headers for different test categories +- **Comprehensive README**: Detailed documentation with examples and troubleshooting +- **Code Comments**: Better inline documentation and explanations + +### Testing Coverage +- **All Component Model Types**: Comprehensive testing of primitives, composites, and advanced types +- **Error Scenarios**: Testing of error conditions and edge cases +- **Performance Examples**: Demonstrations of efficient memory usage patterns + +## Related Files + +- **`main.cpp`**: Main sample application +- **`CMakeLists.txt`**: Build configuration +- **`../wasm/sample.wit`**: WebAssembly Interface Types definition +- **`../wasm/main.cpp`**: Guest-side implementation +- **`../../include/wamr.hpp`**: WAMR integration header +- **`../../include/cmcpp/`**: Component Model C++ library headers + +## Further Reading + +- [WebAssembly Component Model Specification](https://github.com/WebAssembly/component-model) +- [WAMR Documentation](https://github.com/bytecodealliance/wasm-micro-runtime) +- [Component Model C++ Library Documentation](../../README.md) diff --git a/samples/wamr/main.cpp b/samples/wamr/main.cpp index 8c627e3..88974a7 100644 --- a/samples/wamr/main.cpp +++ b/samples/wamr/main.cpp @@ -72,6 +72,53 @@ char *read_wasm_binary_to_buffer(const 
std::filesystem::path &filename, uint32_t return buffer; } +void_t void_func() +{ + std::cout << "Hello, Void_Func!" << std::endl; +} +NativeSymbol root_symbol[] = { + host_function("void-func", void_func), +}; + +bool_t and_func(bool_t a, bool_t b) +{ + return a && b; +} +NativeSymbol booleans_symbol[] = { + host_function("and", and_func), +}; + +float64_t add(float64_t a, float64_t b) +{ + return a + b; +} +NativeSymbol floats_symbol[] = { + host_function("add", add), +}; + +string_t reverse(const string_t &a) +{ + std::string result = a; + std::reverse(result.begin(), result.end()); + return result; +} +uint32_t lots(const string_t &p1, const string_t &p2, const string_t &p3, const string_t &p4, const string_t &p5, const string_t &p6, const string_t &p7, const string_t &p8, const string_t &p9, const string_t &p10, const string_t &p11, const string_t &p12, const string_t &p13, const string_t &p14, const string_t &p15, const string_t &p16, const string_t &p17) +{ + return p1.length() + p2.length() + p3.length() + p4.length() + p5.length() + p6.length() + p7.length() + p8.length() + p9.length() + p10.length() + p11.length() + p12.length() + p13.length() + p14.length() + p15.length() + p16.length() + p17.length(); +} +NativeSymbol strings_symbol[] = { + host_function("reverse", reverse), + host_function("lots", lots), +}; + +void_t log_u32(uint32_t a, string_t b) +{ + std::cout << "wasm-log: " << b << a << std::endl; +} +NativeSymbol logging_symbol[] = { + host_function("log-u32", log_u32), +}; + int main(int argc, char **argv) { static_cast(argc); diff --git a/test/host-util.cpp b/test/host-util.cpp index 3af544f..18f69b6 100644 --- a/test/host-util.cpp +++ b/test/host-util.cpp @@ -38,7 +38,8 @@ std::pair convert(void *dest, uint32_t dest_byte_len, const void if (from_encoding == to_encoding) { assert(dest_byte_len >= src_byte_len); - if (src_byte_len > 0){ + if (src_byte_len > 0) + { std::memcpy(dest, src, src_byte_len); return 
std::make_pair(dest, src_byte_len); } diff --git a/test/host-util.hpp b/test/host-util.hpp index 587c7ee..5a29d0e 100644 --- a/test/host-util.hpp +++ b/test/host-util.hpp @@ -59,11 +59,13 @@ inline std::unique_ptr createLiftLowerContext(Heap *heap, Enco inline std::unique_ptr createLiftLowerContext(Heap *heap, CanonicalOptions options) { - std::unique_ptr instanceContext = std::make_unique(trap, convert, - [heap](int original_ptr, int original_size, int alignment, int new_size) -> int - { - return heap->realloc(original_ptr, original_size, alignment, new_size); - }); + std::unique_ptr instanceContext = std::make_unique( + trap, + convert, + [heap](int original_ptr, int original_size, int alignment, int new_size) -> int + { + return heap->realloc(original_ptr, original_size, alignment, new_size); + }); options.memory = heap->memory; return instanceContext->createLiftLowerContext(std::move(options)); } diff --git a/test/main.cpp b/test/main.cpp index b0476ba..1d837c9 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -12,16 +12,59 @@ using namespace cmcpp; #include #include #include +#include +#include #include #include #include #include #include +#include // #include #define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN #include +TEST_CASE("context.get and context.set use thread-local storage") +{ + Store store; + ComponentInstance inst; + inst.store = &store; + + auto thread = Thread::create( + store, + []() + { + return true; + }, + [](bool) + { + return false; + }); + + Task task1(inst); + Task task2(inst); + task1.set_thread(thread); + task2.set_thread(thread); + + canon_context_set(task1, 0, 123, trap); + CHECK(canon_context_get(task2, 0, trap) == 123); + + auto thread2 = Thread::create( + store, + []() + { + return true; + }, + [](bool) + { + return false; + }); + Task task3(inst); + task3.set_thread(thread2); + CHECK(canon_context_get(task3, 0, trap) == 0); +} + TEST_CASE("Async runtime schedules threads") { Store store; @@ -207,6 +250,61 @@ TEST_CASE("Async runtime 
propagates cancellation") CHECK(thread->completed()); } +TEST_CASE("Cancellation wakes suspended cancellable thread") +{ + Store store; + + bool resumed = false; + bool was_cancelled = false; + bool first_resume = true; + + auto gate = std::make_shared>(false); + + std::shared_ptr thread; + + thread = Thread::create( + store, + []() + { + return true; + }, + [&thread, &resumed, &was_cancelled, &first_resume, gate](bool cancelled) + { + if (first_resume) + { + first_resume = false; + bool completed = thread->suspend_until( + [gate]() + { + return gate->load(); + }, + true); + CHECK_FALSE(completed); + return true; + } + + resumed = true; + was_cancelled = cancelled; + return false; + }, + true, + {}); + + CHECK(store.pending_size() == 1); + + store.tick(); + CHECK_FALSE(resumed); + CHECK_FALSE(thread->ready()); + + auto call = Call::from_thread(thread); + call.request_cancellation(); + store.tick(); + + CHECK(resumed); + CHECK(was_cancelled); + CHECK(thread->completed()); +} + TEST_CASE("Thread suspend_until supports force yield gating") { Store store; @@ -311,7 +409,7 @@ TEST_CASE("Backpressure counters and may_leave guards") CHECK_NOTHROW(canon_waitable_set_new(inst, trap)); } -TEST_CASE("Context locals provide per-task storage") +TEST_CASE("Context locals provide per-thread storage") { ComponentInstance inst; HostTrap trap = [](const char *msg) @@ -319,14 +417,33 @@ TEST_CASE("Context locals provide per-task storage") throw std::runtime_error(msg ? 
msg : "trap"); }; + Store store; + inst.store = &store; + + auto thread = Thread::create( + store, + []() + { + return true; + }, + [](bool) + { + return false; + }); + Task task(inst); + task.set_thread(thread); - CHECK(ContextLocalStorage::LENGTH == 1); + CHECK(ContextLocalStorage::LENGTH == 2); CHECK(canon_context_get(task, 0, trap) == 0); + CHECK(canon_context_get(task, 1, trap) == 0); canon_context_set(task, 0, 42, trap); CHECK(canon_context_get(task, 0, trap) == 42); + canon_context_set(task, 1, 7, trap); + CHECK(canon_context_get(task, 1, trap) == 7); + CHECK_THROWS(canon_context_get(task, ContextLocalStorage::LENGTH, trap)); CHECK_THROWS(canon_context_set(task, ContextLocalStorage::LENGTH, 99, trap)); @@ -485,8 +602,8 @@ TEST_CASE("Task yield, cancel, and return") { CHECK(was_cancelled); REQUIRE(cancel_task->enter(trap)); - auto event_code = canon_yield(true, *cancel_task, trap); - CHECK(event_code == 1); + auto suspend_result = canon_thread_yield(true, *cancel_task, trap); + CHECK(suspend_result == static_cast(SuspendResult::CANCELLED)); canon_task_cancel(*cancel_task, trap); cancel_task->exit(); return false; @@ -517,30 +634,590 @@ TEST_CASE("Task yield, cancel, and return") auto return_gate = std::make_shared>(true); auto return_thread = Thread::create( store, - [return_gate]() + [return_gate]() + { + return return_gate->load(); + }, + [return_task, &trap](bool was_cancelled) + { + CHECK_FALSE(was_cancelled); + REQUIRE(return_task->enter(trap)); + auto suspend_result = canon_thread_yield(false, *return_task, trap); + CHECK(suspend_result == static_cast(SuspendResult::NOT_CANCELLED)); + std::vector payload; + payload.emplace_back(int32_t(42)); + canon_task_return(*return_task, std::move(payload), trap); + return_task->exit(); + return false; + }); + + return_task->set_thread(return_thread); + store.tick(); + + CHECK(resolved_called); + REQUIRE(resolved_value.has_value()); + REQUIRE(resolved_value->size() == 1); + CHECK(std::any_cast((*resolved_value)[0]) 
== 42); + CHECK(store.pending_size() == 0); +} + +TEST_CASE("thread.yield returns suspend result") +{ + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + SUBCASE("not cancelled") + { + Store store; + ComponentInstance inst; + inst.store = &store; + + CanonicalOptions async_opts; + async_opts.sync = false; + + auto task = std::make_shared(inst, async_opts); + auto thread = Thread::create( + store, + []() + { + return true; + }, + [task, &trap](bool) + { + CHECK(canon_thread_yield(true, *task, trap) == static_cast(SuspendResult::NOT_CANCELLED)); + return false; + }, + true, + {}); + task->set_thread(thread); + + store.tick(); + store.tick(); + CHECK(store.pending_size() == 0); + } + + SUBCASE("cancelled") + { + Store store; + ComponentInstance inst; + inst.store = &store; + + CanonicalOptions async_opts; + async_opts.sync = false; + + auto task = std::make_shared(inst, async_opts); + auto thread = Thread::create( + store, + []() + { + return true; + }, + [task, &trap](bool) + { + task->request_cancellation(); + CHECK(task->state() == Task::State::CancelDelivered); + CHECK(canon_thread_yield(true, *task, trap) == static_cast(SuspendResult::CANCELLED)); + return false; + }, + true, + {}); + task->set_thread(thread); + + store.tick(); + CHECK(store.pending_size() == 0); + } +} + +TEST_CASE("task.wait returns blocked or event") +{ + ComponentInstance inst; + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + auto desc = make_stream_descriptor(); + uint64_t handles = canon_stream_new(inst, desc, trap); + uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + + uint32_t waitable_set = canon_waitable_set_new(inst, trap); + canon_waitable_join(inst, readable, waitable_set, trap); + + Heap heap(128); + GuestMemory mem(heap.memory.data(), heap.memory.size()); + uint32_t event_ptr = 32; + + Task task(inst); + + auto code = canon_task_wait(false, mem, task, waitable_set, event_ptr, trap); + CHECK(code == BLOCKED); + uint32_t p1 = 123; + uint32_t p2 = 456; + std::memcpy(&p1, heap.memory.data() + event_ptr, sizeof(uint32_t)); + std::memcpy(&p2, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); + CHECK(p1 == 0); + CHECK(p2 == 0); + + auto waitable = inst.table.get(readable, trap); + waitable->set_pending_event({EventCode::STREAM_READ, readable, 0xCAFE'BEEFu}); + code = canon_task_wait(false, mem, task, waitable_set, event_ptr, trap); + CHECK(code == static_cast(EventCode::STREAM_READ)); + std::memcpy(&p1, heap.memory.data() + event_ptr, sizeof(uint32_t)); + std::memcpy(&p2, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); + CHECK(p1 == readable); + CHECK(p2 == 0xCAFE'BEEFu); + + canon_stream_drop_readable(inst, readable, trap); + canon_waitable_set_drop(inst, waitable_set, trap); +} + +TEST_CASE("thread.yield-to resumes suspended thread") +{ + Store store; + ComponentInstance inst; + inst.store = &store; + + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + CanonicalOptions async_opts; + async_opts.sync = false; + + auto ran_other = std::make_shared(false); + + // Create a suspended thread and put it in the instance table. 
+ auto other = Thread::create_suspended( + store, + [ran_other](bool) + { + *ran_other = true; + return false; + }, + true, + {}); + REQUIRE(other->suspended()); + uint32_t other_index = inst.table.add(std::make_shared(other), trap); + + auto task = std::make_shared(inst, async_opts); + auto current = Thread::create( + store, + []() + { + return true; + }, + [task, other_index, ran_other, &trap](bool) + { + REQUIRE_FALSE(*ran_other); + CHECK(canon_thread_yield_to(true, *task, other_index, trap) == static_cast(SuspendResult::NOT_CANCELLED)); + return false; + }, + true, + {}); + task->set_thread(current); + + store.tick(); + CHECK_FALSE(*ran_other); + store.tick(); + CHECK(*ran_other); + store.tick(); + CHECK(store.pending_size() == 0); +} + +TEST_CASE("thread.yield-to traps if target not suspended") +{ + Store store; + ComponentInstance inst; + inst.store = &store; + + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + CanonicalOptions async_opts; + async_opts.sync = false; + + // Not suspended: created pending. + auto gate = std::make_shared>(false); + auto other = Thread::create( + store, + [gate]() + { + return gate->load(); + }, + [](bool) + { + return false; + }, + true, + {}); + uint32_t other_index = inst.table.add(std::make_shared(other), trap); + + auto task = std::make_shared(inst, async_opts); + auto current = Thread::create( + store, + []() + { + return true; + }, + [task, other_index, &trap](bool) + { + CHECK_THROWS(canon_thread_yield_to(true, *task, other_index, trap)); + return false; + }, + true, + {}); + task->set_thread(current); + + store.tick(); + store.tick(); + gate->store(true); + store.tick(); + CHECK(store.pending_size() == 0); +} + +TEST_CASE("thread.new-indirect creates a suspended thread") +{ + Store store; + ComponentInstance inst; + inst.store = &store; + + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + CanonicalOptions async_opts; + async_opts.sync = false; + + auto ran = std::make_shared(false); + std::vector> table; + table.push_back({}); + table.push_back([ran](uint32_t c) + { + CHECK(c == 123u); + *ran = true; }); + + auto task = std::make_shared(inst, async_opts); + auto current = Thread::create( + store, + []() + { + return true; + }, + [task, &inst, &trap, &table, ran](bool) + { + uint32_t idx = canon_thread_new_indirect(false, *task, table, 1, 123, trap); + auto entry = inst.table.get(idx, trap); + REQUIRE(entry->thread()); + CHECK(entry->thread()->suspended()); + + canon_thread_resume_later(*task, idx, trap); + return false; + }, + true, + {}); + task->set_thread(current); + + store.tick(); + CHECK_FALSE(*ran); + store.tick(); + CHECK(*ran); + store.tick(); + CHECK(store.pending_size() == 0); + + CHECK_THROWS(canon_thread_new_indirect(false, *task, table, 999, 0, trap)); + CHECK_THROWS(canon_thread_new_indirect(false, *task, table, 0, 0, trap)); +} + +TEST_CASE("thread.new-ref and spawn-ref create threads") +{ + Store store; + ComponentInstance inst; + inst.store = &store; + + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + CanonicalOptions async_opts; + async_opts.sync = false; + + auto ran_new = std::make_shared(false); + auto ran_spawn = std::make_shared(false); + + auto task = std::make_shared(inst, async_opts); + auto current = Thread::create( + store, + []() + { + return true; + }, + [task, &inst, &trap, ran_new, ran_spawn](bool) + { + uint32_t idx_new = canon_thread_new_ref(false, *task, [ran_new](uint32_t c) + { + CHECK(c == 7u); + *ran_new = true; }, 7, trap); + auto new_entry = inst.table.get(idx_new, trap); + REQUIRE(new_entry->thread()); + CHECK(new_entry->thread()->suspended()); + canon_thread_resume_later(*task, idx_new, trap); + + uint32_t idx_spawn = canon_thread_spawn_ref(false, *task, [ran_spawn](uint32_t c) + { + CHECK(c == 9u); + *ran_spawn = true; }, 9, trap); + auto spawn_entry = inst.table.get(idx_spawn, trap); + REQUIRE(spawn_entry->thread()); + CHECK_FALSE(spawn_entry->thread()->suspended()); + return false; + }, + true, + {}); + task->set_thread(current); + + store.tick(); + CHECK_FALSE(*ran_new); + CHECK_FALSE(*ran_spawn); + store.tick(); + CHECK(*ran_new); + store.tick(); + CHECK(*ran_spawn); + store.tick(); + CHECK(store.pending_size() == 0); +} + +TEST_CASE("thread.spawn-indirect resumes and runs") +{ + Store store; + ComponentInstance inst; + inst.store = &store; + + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + CanonicalOptions async_opts; + async_opts.sync = false; + + auto ran_spawn = std::make_shared(false); + std::vector> table; + table.push_back({}); + table.push_back([ran_spawn](uint32_t c) + { + CHECK(c == 55u); + *ran_spawn = true; }); + + auto task = std::make_shared(inst, async_opts); + auto current = Thread::create( + store, + []() + { + return true; + }, + [task, &inst, &trap, &table](bool) + { + uint32_t idx = canon_thread_spawn_indirect(false, *task, table, 1, 55, trap); + auto entry = inst.table.get(idx, trap); + REQUIRE(entry->thread()); + CHECK_FALSE(entry->thread()->suspended()); + return false; + }, + true, + {}); + task->set_thread(current); + + store.tick(); + CHECK_FALSE(*ran_spawn); + store.tick(); + CHECK(*ran_spawn); + store.tick(); + CHECK(store.pending_size() == 0); +} + +TEST_CASE("thread.available-parallelism returns at least 1") +{ + Store store; + ComponentInstance inst; + inst.store = &store; + + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + CanonicalOptions async_opts; + async_opts.sync = false; + + Task task(inst, async_opts); + + CHECK(canon_thread_available_parallelism(false, task, trap) == 1); + CHECK(canon_thread_available_parallelism(true, task, trap) >= 1); +} + +TEST_CASE("thread.switch-to schedules a suspended thread") +{ + Store store; + ComponentInstance inst; + inst.store = &store; + + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + CanonicalOptions async_opts; + async_opts.sync = false; + + bool ran_other = false; + bool ran_second = false; + bool first = true; + + auto other = Thread::create_suspended( + store, + [&ran_other](bool) + { + ran_other = true; + return false; + }, + true, + {}); + uint32_t other_index = inst.table.add(std::make_shared(other), trap); + + auto task = std::make_shared(inst, async_opts); + auto current = Thread::create( + store, + []() + { + return true; + }, + [task, &trap, other_index, &ran_other, &ran_second, &first](bool) + { + REQUIRE(task->enter(trap)); + if (first) + { + first = false; + CHECK(canon_thread_switch_to(false, *task, other_index, trap) == static_cast(SuspendResult::NOT_CANCELLED)); + task->exit(); + return true; + } + ran_second = true; + task->exit(); + return false; + }, + true, + {}); + task->set_thread(current); + + store.tick(); + CHECK_FALSE(ran_other); + CHECK_FALSE(ran_second); + store.tick(); + CHECK(ran_other); + CHECK_FALSE(ran_second); + store.tick(); + CHECK_FALSE(ran_second); + store.tick(); + CHECK(ran_second); + store.tick(); + CHECK(store.pending_size() == 0); +} + +TEST_CASE("thread.index returns current thread index") +{ + Store store; + ComponentInstance inst; + inst.store = &store; + + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + CanonicalOptions async_opts; + async_opts.sync = false; + + auto task = std::make_shared(inst, async_opts); + + auto thread = Thread::create( + store, + []() + { + return true; + }, + [task, &trap](bool) + { + CHECK_THROWS(canon_thread_index(*task, trap)); + task->thread()->set_index(1234); + CHECK(canon_thread_index(*task, trap) == 1234u); + return false; + }, + true, + {}); + + task->set_thread(thread); + store.tick(); + CHECK(store.pending_size() == 0); +} + +TEST_CASE("thread.suspend forces a yield") +{ + Store store; + ComponentInstance inst; + inst.store = &store; + + HostTrap trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + CanonicalOptions async_opts; + async_opts.sync = false; + + auto task = std::make_shared(inst, async_opts); + bool second_resume = false; + bool first_resume = true; + + auto thread = Thread::create( + store, + []() { - return return_gate->load(); + return true; }, - [return_task, &trap](bool was_cancelled) + [task, &trap, &second_resume, &first_resume](bool) { - CHECK_FALSE(was_cancelled); - REQUIRE(return_task->enter(trap)); - auto event_code = canon_yield(false, *return_task, trap); - CHECK(event_code == 0); - std::vector payload; - payload.emplace_back(int32_t(42)); - canon_task_return(*return_task, std::move(payload), trap); - return_task->exit(); + REQUIRE(task->enter(trap)); + if (first_resume) + { + first_resume = false; + CHECK(canon_thread_suspend(false, *task, trap) == static_cast(SuspendResult::NOT_CANCELLED)); + task->exit(); + return true; + } + + second_resume = true; + task->exit(); return false; - }); + }, + true, + {}); - return_task->set_thread(return_thread); - store.tick(); + task->set_thread(thread); - CHECK(resolved_called); - REQUIRE(resolved_value.has_value()); - REQUIRE(resolved_value->size() == 1); - CHECK(std::any_cast((*resolved_value)[0]) == 42); + store.tick(); + CHECK_FALSE(second_resume); + store.tick(); + CHECK_FALSE(second_resume); + 
store.tick(); + CHECK(second_resume); + store.tick(); CHECK(store.pending_size() == 0); } @@ -631,7 +1308,7 @@ TEST_CASE("Canonical options control lift/lower callbacks") CHECK(blocked == BLOCKED); CHECK_FALSE(callback_called); - writable.write(cx, 2, write_ptr, 1, trap); + writable.write(cx, 2, write_ptr, 1, false, trap); CHECK(callback_called); CHECK(observed_code == EventCode::STREAM_READ); @@ -760,6 +1437,182 @@ TEST_CASE("Canonical callbacks surface waitable events") canon_waitable_set_drop(inst, waitable_set, trap); } +TEST_CASE("context.hpp: trap_if throws when no HostTrap") +{ + LiftLowerContext cx = make_trap_context(HostTrap{}); + CHECK_THROWS_AS(trap_if(cx, true, "boom"), std::runtime_error); +} + +TEST_CASE("context.hpp: normalize_alignment clamps and defaults") +{ + CHECK(normalize_alignment(0) == 1); + CHECK(normalize_alignment(1) == 1); + CHECK(normalize_alignment(256) == 255); +} + +TEST_CASE("context.hpp: write_event_fields traps on out-of-bounds") +{ + std::array bytes{}; + GuestMemory mem(bytes.data(), bytes.size()); + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + CHECK_THROWS(write_event_fields(mem, 0, 1, 2, host_trap)); +} + +TEST_CASE("context.hpp: WaitableSet take_pending_event trap paths") +{ + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + SUBCASE("empty waitable set traps") + { + WaitableSet set; + CHECK_THROWS(set.take_pending_event(host_trap)); + } + + SUBCASE("missing event traps") + { + WaitableSet set; + Waitable w; + w.join(&set, host_trap); + CHECK_THROWS(set.take_pending_event(host_trap)); + } +} + +TEST_CASE("context.hpp: SharedStreamState branch coverage") +{ + Heap heap(128); + CanonicalOptions options; + auto cx = std::shared_ptr(createLiftLowerContext(&heap, options).release(), [](LiftLowerContext *ptr) + { delete ptr; }); + + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + auto descriptor = make_stream_descriptor(); + + SUBCASE("drop causes immediate Dropped") + { + auto shared = std::make_shared(descriptor); + shared->drop(); + + auto dst = std::make_shared(descriptor.element_size, descriptor.alignment, cx, 0, 1, host_trap); + bool done_called = false; + shared->read(dst, {}, [&](CopyResult result) + { + done_called = true; + CHECK(result == CopyResult::Dropped); }, host_trap); + CHECK(done_called); + } + + SUBCASE("pending readable exhausted triggers reset_and_notify_pending") + { + auto shared = std::make_shared(descriptor); + + auto src_zero = std::make_shared(descriptor.element_size, descriptor.alignment, cx, 0, 0, host_trap); + bool src_done_called = false; + shared->write(src_zero, {}, [&](CopyResult result) + { + src_done_called = true; + CHECK(result == CopyResult::Completed); }, host_trap); + + auto dst = std::make_shared(descriptor.element_size, descriptor.alignment, cx, 16, 1, host_trap); + bool dst_done_called = false; + shared->read(dst, {}, [&](CopyResult result) + { + dst_done_called = true; + CHECK(result == CopyResult::Completed); }, host_trap); + + CHECK(src_done_called); + CHECK_FALSE(dst_done_called); + CHECK(shared->pending_buffer == dst); + } + + SUBCASE("both buffers zero-length short-circuits") + { + auto shared = std::make_shared(descriptor); + + auto dst_zero = std::make_shared(descriptor.element_size, descriptor.alignment, cx, 0, 0, host_trap); + shared->read(dst_zero, {}, [](CopyResult) + { FAIL("unexpected on_copy_done for first caller"); }, host_trap); + + auto src_zero = std::make_shared(descriptor.element_size, descriptor.alignment, cx, 0, 0, host_trap); + bool done_called = false; + shared->write(src_zero, {}, [&](CopyResult result) + { + done_called = true; + CHECK(result == CopyResult::Completed); }, host_trap); + CHECK(done_called); + } + + SUBCASE("reclaim buffer resets pending after lock released") + { + auto shared = std::make_shared(descriptor); + + uint32_t read_ptr = 
0; + uint32_t write_ptr = 32; + heap.memory[write_ptr] = 0xAB; + + auto dst = std::make_shared(descriptor.element_size, descriptor.alignment, cx, read_ptr, 1, host_trap); + auto src = std::make_shared(descriptor.element_size, descriptor.alignment, cx, write_ptr, 1, host_trap); + + ReclaimBuffer reclaim; + shared->read(dst, [&](ReclaimBuffer r) + { reclaim = std::move(r); }, [](CopyResult) + { FAIL("unexpected on_copy_done for first caller"); }, host_trap); + + bool done_called = false; + shared->write(src, {}, [&](CopyResult result) + { + done_called = true; + CHECK(result == CopyResult::Completed); }, host_trap); + + REQUIRE(done_called); + REQUIRE(reclaim); + CHECK(shared->pending_buffer != nullptr); + + reclaim(); + CHECK(shared->pending_buffer == nullptr); + } + + SUBCASE("reclaim buffer resets pending after read-side lock released") + { + auto shared = std::make_shared(descriptor); + + uint32_t read_ptr = 0; + uint32_t write_ptr = 32; + heap.memory[write_ptr] = 0xCD; + + auto dst = std::make_shared(descriptor.element_size, descriptor.alignment, cx, read_ptr, 1, host_trap); + auto src = std::make_shared(descriptor.element_size, descriptor.alignment, cx, write_ptr, 1, host_trap); + + ReclaimBuffer reclaim; + shared->write(src, [&](ReclaimBuffer r) + { reclaim = std::move(r); }, [](CopyResult) + { FAIL("unexpected on_copy_done for first caller"); }, host_trap); + + bool done_called = false; + shared->read(dst, {}, [&](CopyResult result) + { + done_called = true; + CHECK(result == CopyResult::Completed); }, host_trap); + + REQUIRE(done_called); + REQUIRE(reclaim); + CHECK(shared->pending_buffer != nullptr); + + reclaim(); + CHECK(shared->pending_buffer == nullptr); + } +} + TEST_CASE("Resource handle lifecycle mirrors canonical definitions") { ComponentInstance resource_impl; @@ -1279,42 +2132,205 @@ TEST_CASE("NaN Canonicalization - Python Reference Parity") {0x3ff0000000000000, 0x3ff0000000000000}, // 1.0 }; - for (const auto &[in_bits, expected_bits] : 
f64_tests) + for (const auto &[in_bits, expected_bits] : f64_tests) + { + float64_t in_f = bits_to_f64(in_bits); + + // Test lift_flat path + auto lifted = lift_flat(*cx, CoreValueIter({in_f})); + uint64_t lifted_bits = f64_to_bits(lifted); + + if (std::isnan(in_f)) + { + CHECK(std::isnan(lifted)); + CHECK(lifted_bits == expected_bits); + } + else + { + CHECK(lifted_bits == expected_bits); + } + + // Test load path + std::memcpy(heap.memory.data(), &in_bits, sizeof(in_bits)); + float64_t loaded = load(*cx, 0); + uint64_t loaded_bits = f64_to_bits(loaded); + + if (std::isnan(in_f)) + { + CHECK(std::isnan(loaded)); + CHECK(loaded_bits == expected_bits); + } + else + { + CHECK(loaded_bits == expected_bits); + } + } +} + +TEST_CASE("Waitable set surfaces stream readiness") +{ + ComponentInstance inst; + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + auto desc = make_stream_descriptor(); + uint64_t handles = canon_stream_new(inst, desc, host_trap); + uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + uint32_t writable = static_cast(handles >> 32); + + uint32_t waitable_set = canon_waitable_set_new(inst, host_trap); + canon_waitable_join(inst, readable, waitable_set, host_trap); + + Heap heap(256); + CanonicalOptions options; + options.sync = false; + auto cx = std::shared_ptr(createLiftLowerContext(&heap, options).release(), [](LiftLowerContext *ptr) + { delete ptr; }); + + uint32_t read_ptr = 0; + uint32_t write_ptr = 32; + uint32_t event_ptr = 128; + + int32_t to_write[2] = {42, 87}; + std::memcpy(heap.memory.data() + write_ptr, to_write, sizeof(to_write)); + + auto blocked = canon_stream_read(inst, desc, readable, cx, read_ptr, 2, false, host_trap); + CHECK(blocked == BLOCKED); + + GuestMemory mem(heap.memory.data(), heap.memory.size()); + auto code = canon_waitable_set_poll(false, mem, inst, waitable_set, event_ptr, host_trap); + CHECK(code == static_cast(EventCode::NONE)); + + auto write_payload = 
canon_stream_write(inst, desc, writable, cx, write_ptr, 2, host_trap); + CHECK((write_payload & 0xF) == static_cast(CopyResult::Completed)); + auto write_count = write_payload >> 4; + CHECK(write_count == 2); + + code = canon_waitable_set_poll(false, mem, inst, waitable_set, event_ptr, host_trap); + CHECK(code == static_cast(EventCode::STREAM_READ)); + + uint32_t reported_index = 0; + uint32_t payload = 0; + std::memcpy(&reported_index, heap.memory.data() + event_ptr, sizeof(uint32_t)); + std::memcpy(&payload, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); + CHECK(reported_index == readable); + CHECK((payload & 0xF) == static_cast(CopyResult::Completed)); + auto read_count = payload >> 4; + CHECK(read_count == 2); + + int32_t read_values[2] = {0, 0}; + std::memcpy(read_values, heap.memory.data() + read_ptr, sizeof(read_values)); + CHECK(read_values[0] == 42); + CHECK(read_values[1] == 87); + + canon_stream_drop_readable(inst, readable, host_trap); + canon_stream_drop_writable(inst, writable, host_trap); + canon_waitable_set_drop(inst, waitable_set, host_trap); +} + +TEST_CASE("Stream cancellation posts events") +{ + ComponentInstance inst; + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + auto desc = make_stream_descriptor(); + uint64_t handles = canon_stream_new(inst, desc, host_trap); + uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + + uint32_t waitable_set = canon_waitable_set_new(inst, host_trap); + canon_waitable_join(inst, readable, waitable_set, host_trap); + + Heap heap(128); + CanonicalOptions options; + options.sync = false; + auto cx = std::shared_ptr(createLiftLowerContext(&heap, options).release(), [](LiftLowerContext *ptr) + { delete ptr; }); + + uint32_t read_ptr = 0; + uint32_t event_ptr = 64; + + auto blocked = canon_stream_read(inst, desc, readable, cx, read_ptr, 1, false, host_trap); + CHECK(blocked == BLOCKED); + + auto cancel_payload = canon_stream_cancel_read(inst, readable, false, host_trap); + CHECK(cancel_payload == BLOCKED); + + GuestMemory mem(heap.memory.data(), heap.memory.size()); + auto code = canon_waitable_set_poll(false, mem, inst, waitable_set, event_ptr, host_trap); + CHECK(code == static_cast(EventCode::STREAM_READ)); + + uint32_t payload = 0; + std::memcpy(&payload, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); + CHECK((payload & 0xF) == static_cast(CopyResult::Cancelled)); + auto cancel_count = payload >> 4; + CHECK(cancel_count == 0); + + canon_stream_drop_readable(inst, readable, host_trap); + canon_waitable_set_drop(inst, waitable_set, host_trap); +} + +TEST_CASE("Stream cancel requires pending async copy") +{ + ComponentInstance inst; + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + auto desc = make_stream_descriptor(); + uint64_t handles = canon_stream_new(inst, desc, host_trap); + uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + uint32_t writable = static_cast(handles >> 32); + + CHECK_THROWS(canon_stream_cancel_read(inst, readable, false, host_trap)); + CHECK_THROWS(canon_stream_cancel_write(inst, writable, false, host_trap)); + + canon_stream_drop_readable(inst, readable, host_trap); + canon_stream_drop_writable(inst, writable, host_trap); +} + +TEST_CASE("Stream sync read blocks until write") +{ + Heap heap(256); + CanonicalOptions options; + options.sync = true; + auto cx = std::shared_ptr(createLiftLowerContext(&heap, options).release(), [](LiftLowerContext *ptr) + { delete ptr; }); + + HostTrap host_trap = [](const char *msg) { - float64_t in_f = bits_to_f64(in_bits); + throw std::runtime_error(msg ? msg : "trap"); + }; - // Test lift_flat path - auto lifted = lift_flat(*cx, CoreValueIter({in_f})); - uint64_t lifted_bits = f64_to_bits(lifted); + auto descriptor = make_stream_descriptor(); + auto shared_state = std::make_shared(descriptor); + ReadableStreamEnd readable(shared_state); + WritableStreamEnd writable(shared_state); - if (std::isnan(in_f)) - { - CHECK(std::isnan(lifted)); - CHECK(lifted_bits == expected_bits); - } - else - { - CHECK(lifted_bits == expected_bits); - } + uint32_t read_ptr = 0; + uint32_t write_ptr = 64; + heap.memory[write_ptr] = 0x7B; - // Test load path - std::memcpy(heap.memory.data(), &in_bits, sizeof(in_bits)); - float64_t loaded = load(*cx, 0); - uint64_t loaded_bits = f64_to_bits(loaded); + std::atomic read_result{0}; + std::thread t([&]() + { read_result = readable.read(cx, 1, read_ptr, 1, true, host_trap); }); - if (std::isnan(in_f)) - { - CHECK(std::isnan(loaded)); - CHECK(loaded_bits == expected_bits); - } - else - { - CHECK(loaded_bits == expected_bits); - } - } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + auto write_payload = writable.write(cx, 
2, write_ptr, 1, true, host_trap); + CHECK((write_payload & 0xF) == static_cast(CopyResult::Completed)); + + t.join(); + CHECK(read_result.load() == pack_copy_result(CopyResult::Completed, 1)); + CHECK(heap.memory[read_ptr] == 0x7B); } -TEST_CASE("Waitable set surfaces stream readiness") +TEST_CASE("Stream cancel-write posts events") { ComponentInstance inst; HostTrap host_trap = [](const char *msg) @@ -1324,11 +2340,10 @@ TEST_CASE("Waitable set surfaces stream readiness") auto desc = make_stream_descriptor(); uint64_t handles = canon_stream_new(inst, desc, host_trap); - uint32_t readable = static_cast(handles & 0xFFFFFFFFu); uint32_t writable = static_cast(handles >> 32); uint32_t waitable_set = canon_waitable_set_new(inst, host_trap); - canon_waitable_join(inst, readable, waitable_set, host_trap); + canon_waitable_join(inst, writable, waitable_set, host_trap); Heap heap(256); CanonicalOptions options; @@ -1336,48 +2351,31 @@ TEST_CASE("Waitable set surfaces stream readiness") auto cx = std::shared_ptr(createLiftLowerContext(&heap, options).release(), [](LiftLowerContext *ptr) { delete ptr; }); - uint32_t read_ptr = 0; uint32_t write_ptr = 32; uint32_t event_ptr = 128; + int32_t value = 123; + std::memcpy(heap.memory.data() + write_ptr, &value, sizeof(value)); - int32_t to_write[2] = {42, 87}; - std::memcpy(heap.memory.data() + write_ptr, to_write, sizeof(to_write)); - - auto blocked = canon_stream_read(inst, desc, readable, cx, read_ptr, 2, false, host_trap); + auto blocked = canon_stream_write(inst, desc, writable, cx, write_ptr, 1, host_trap); CHECK(blocked == BLOCKED); + auto cancel_blocked = canon_stream_cancel_write(inst, writable, false, host_trap); + CHECK(cancel_blocked == BLOCKED); + GuestMemory mem(heap.memory.data(), heap.memory.size()); auto code = canon_waitable_set_poll(false, mem, inst, waitable_set, event_ptr, host_trap); - CHECK(code == static_cast(EventCode::NONE)); - - auto write_payload = canon_stream_write(inst, desc, writable, cx, 
write_ptr, 2, host_trap); - CHECK((write_payload & 0xF) == static_cast(CopyResult::Completed)); - auto write_count = write_payload >> 4; - CHECK(write_count == 2); - - code = canon_waitable_set_poll(false, mem, inst, waitable_set, event_ptr, host_trap); - CHECK(code == static_cast(EventCode::STREAM_READ)); + CHECK(code == static_cast(EventCode::STREAM_WRITE)); - uint32_t reported_index = 0; uint32_t payload = 0; - std::memcpy(&reported_index, heap.memory.data() + event_ptr, sizeof(uint32_t)); std::memcpy(&payload, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); - CHECK(reported_index == readable); - CHECK((payload & 0xF) == static_cast(CopyResult::Completed)); - auto read_count = payload >> 4; - CHECK(read_count == 2); - - int32_t read_values[2] = {0, 0}; - std::memcpy(read_values, heap.memory.data() + read_ptr, sizeof(read_values)); - CHECK(read_values[0] == 42); - CHECK(read_values[1] == 87); + CHECK((payload & 0xF) == static_cast(CopyResult::Cancelled)); + CHECK((payload >> 4) == 0); - canon_stream_drop_readable(inst, readable, host_trap); canon_stream_drop_writable(inst, writable, host_trap); canon_waitable_set_drop(inst, waitable_set, host_trap); } -TEST_CASE("Stream cancellation posts events") +TEST_CASE("Stream read can be retried after cancellation") { ComponentInstance inst; HostTrap host_trap = [](const char *msg) @@ -1388,36 +2386,53 @@ TEST_CASE("Stream cancellation posts events") auto desc = make_stream_descriptor(); uint64_t handles = canon_stream_new(inst, desc, host_trap); uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + uint32_t writable = static_cast(handles >> 32); uint32_t waitable_set = canon_waitable_set_new(inst, host_trap); canon_waitable_join(inst, readable, waitable_set, host_trap); - Heap heap(128); + Heap heap(256); CanonicalOptions options; options.sync = false; auto cx = std::shared_ptr(createLiftLowerContext(&heap, options).release(), [](LiftLowerContext *ptr) { delete ptr; }); uint32_t read_ptr = 0; - 
uint32_t event_ptr = 64; + uint32_t write_ptr = 64; + uint32_t event_ptr = 160; auto blocked = canon_stream_read(inst, desc, readable, cx, read_ptr, 1, false, host_trap); CHECK(blocked == BLOCKED); - auto cancel_payload = canon_stream_cancel_read(inst, readable, false, host_trap); - CHECK(cancel_payload == BLOCKED); + auto cancel_result = canon_stream_cancel_read(inst, readable, false, host_trap); + CHECK(cancel_result == BLOCKED); GuestMemory mem(heap.memory.data(), heap.memory.size()); auto code = canon_waitable_set_poll(false, mem, inst, waitable_set, event_ptr, host_trap); CHECK(code == static_cast(EventCode::STREAM_READ)); + // Start a new read after cancellation. + blocked = canon_stream_read(inst, desc, readable, cx, read_ptr, 1, false, host_trap); + CHECK(blocked == BLOCKED); + + int32_t value = 1234; + std::memcpy(heap.memory.data() + write_ptr, &value, sizeof(value)); + (void)canon_stream_write(inst, desc, writable, cx, write_ptr, 1, host_trap); + + code = canon_waitable_set_poll(false, mem, inst, waitable_set, event_ptr, host_trap); + CHECK(code == static_cast(EventCode::STREAM_READ)); + uint32_t payload = 0; std::memcpy(&payload, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); - CHECK((payload & 0xF) == static_cast(CopyResult::Cancelled)); - auto cancel_count = payload >> 4; - CHECK(cancel_count == 0); + CHECK((payload & 0xF) == static_cast(CopyResult::Completed)); + CHECK((payload >> 4) == 1); + + int32_t observed = 0; + std::memcpy(&observed, heap.memory.data() + read_ptr, sizeof(observed)); + CHECK(observed == value); canon_stream_drop_readable(inst, readable, host_trap); + canon_stream_drop_writable(inst, writable, host_trap); canon_waitable_set_drop(inst, waitable_set, host_trap); } @@ -1454,9 +2469,7 @@ TEST_CASE("Future lifecycle completes") std::memcpy(heap.memory.data() + write_ptr, &value, sizeof(int32_t)); auto write_payload = canon_future_write(inst, desc, writable, cx, write_ptr, host_trap); - CHECK((write_payload & 
0xF) == static_cast(CopyResult::Completed)); - auto write_count = write_payload >> 4; - CHECK(write_count == 1); + CHECK(write_payload == static_cast(CopyResult::Completed)); GuestMemory mem(heap.memory.data(), heap.memory.size()); auto code = canon_waitable_set_poll(false, mem, inst, waitable_set, event_ptr, host_trap); @@ -1464,9 +2477,7 @@ TEST_CASE("Future lifecycle completes") uint32_t payload = 0; std::memcpy(&payload, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); - CHECK((payload & 0xF) == static_cast(CopyResult::Completed)); - auto read_count = payload >> 4; - CHECK(read_count == 1); + CHECK(payload == static_cast(CopyResult::Completed)); int32_t observed = 0; std::memcpy(&observed, heap.memory.data() + read_ptr, sizeof(int32_t)); @@ -1477,6 +2488,306 @@ TEST_CASE("Future lifecycle completes") canon_waitable_set_drop(inst, waitable_set, host_trap); } +TEST_CASE("Future read can be retried after cancellation") +{ + ComponentInstance inst; + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + auto desc = make_future_descriptor(); + uint64_t handles = canon_future_new(inst, desc, host_trap); + uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + uint32_t writable = static_cast(handles >> 32); + + uint32_t waitable_set = canon_waitable_set_new(inst, host_trap); + canon_waitable_join(inst, readable, waitable_set, host_trap); + + Heap heap(256); + CanonicalOptions options; + options.sync = false; + auto cx = std::shared_ptr(createLiftLowerContext(&heap, options).release(), [](LiftLowerContext *ptr) + { delete ptr; }); + + uint32_t read_ptr = 0; + uint32_t write_ptr = 32; + uint32_t event_ptr = 96; + + auto blocked = canon_future_read(inst, desc, readable, cx, read_ptr, false, host_trap); + CHECK(blocked == BLOCKED); + + auto cancel_result = canon_future_cancel_read(inst, readable, false, host_trap); + CHECK(cancel_result == BLOCKED); + + GuestMemory mem(heap.memory.data(), heap.memory.size()); + auto code = canon_waitable_set_poll(false, mem, inst, waitable_set, event_ptr, host_trap); + CHECK(code == static_cast(EventCode::FUTURE_READ)); + + blocked = canon_future_read(inst, desc, readable, cx, read_ptr, false, host_trap); + CHECK(blocked == BLOCKED); + + int32_t value = 4321; + std::memcpy(heap.memory.data() + write_ptr, &value, sizeof(value)); + (void)canon_future_write(inst, desc, writable, cx, write_ptr, host_trap); + + code = canon_waitable_set_poll(false, mem, inst, waitable_set, event_ptr, host_trap); + CHECK(code == static_cast(EventCode::FUTURE_READ)); + + uint32_t payload = 0; + std::memcpy(&payload, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); + CHECK(payload == static_cast(CopyResult::Completed)); + + int32_t observed = 0; + std::memcpy(&observed, heap.memory.data() + read_ptr, sizeof(observed)); + CHECK(observed == value); + + canon_future_drop_readable(inst, readable, host_trap); + canon_future_drop_writable(inst, writable, host_trap); + canon_waitable_set_drop(inst, waitable_set, 
host_trap); +} + +TEST_CASE("Future cancel requires pending async copy") +{ + ComponentInstance inst; + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + auto desc = make_future_descriptor(); + uint64_t handles = canon_future_new(inst, desc, host_trap); + uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + uint32_t writable = static_cast(handles >> 32); + + CHECK_THROWS(canon_future_cancel_read(inst, readable, false, host_trap)); + CHECK_THROWS(canon_future_cancel_write(inst, writable, false, host_trap)); + + canon_future_drop_readable(inst, readable, host_trap); + CHECK_THROWS(canon_future_drop_writable(inst, writable, host_trap)); +} + +TEST_CASE("Future sync read blocks until write") +{ + ComponentInstance inst; + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + auto desc = make_future_descriptor(); + uint64_t handles = canon_future_new(inst, desc, host_trap); + uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + uint32_t writable = static_cast(handles >> 32); + + Heap heap(256); + CanonicalOptions options; + options.sync = true; + auto cx = std::shared_ptr(createLiftLowerContext(&heap, options).release(), [](LiftLowerContext *ptr) + { delete ptr; }); + + uint32_t read_ptr = 0; + uint32_t write_ptr = 32; + + std::atomic read_result{0}; + std::thread t([&]() + { read_result = canon_future_read(inst, desc, readable, cx, read_ptr, true, host_trap); }); + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + int32_t value = 777; + std::memcpy(heap.memory.data() + write_ptr, &value, sizeof(value)); + auto write_payload = canon_future_write(inst, desc, writable, cx, write_ptr, host_trap); + CHECK(write_payload == static_cast(CopyResult::Completed)); + + t.join(); + CHECK(read_result.load() == static_cast(CopyResult::Completed)); + int32_t observed = 0; + std::memcpy(&observed, heap.memory.data() + read_ptr, sizeof(observed)); + 
CHECK(observed == value); + + canon_future_drop_readable(inst, readable, host_trap); + canon_future_drop_writable(inst, writable, host_trap); +} + +TEST_CASE("Future/Stream self-copy - Python Reference Parity") +{ + // Mirrors ref/component-model/design/mvp/canonical-abi/run_tests.py::test_self_copy + ComponentInstance inst; + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? msg : "trap"); + }; + + Heap heap(256); + + CanonicalOptions sync_opts; + sync_opts.sync = true; + auto sync_cx = std::shared_ptr(createLiftLowerContext(&heap, sync_opts).release(), [](LiftLowerContext *ptr) + { delete ptr; }); + + CanonicalOptions async_opts; + async_opts.sync = false; + auto async_cx = std::shared_ptr(createLiftLowerContext(&heap, async_opts).release(), [](LiftLowerContext *ptr) + { delete ptr; }); + + uint32_t waitable_set = canon_waitable_set_new(inst, host_trap); + GuestMemory mem(heap.memory.data(), heap.memory.size()); + + // ---- Future self-copy ---- + { + auto desc = make_future_descriptor(); + uint64_t handles = canon_future_new(inst, desc, host_trap); + uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + uint32_t writable = static_cast(handles >> 32); + + uint32_t ptr = 0; + // Start an async write (no reader yet) => BLOCKED + auto write_ret = canon_future_write(inst, desc, writable, async_cx, ptr, host_trap); + CHECK(write_ret == BLOCKED); + + // Now read in async mode; should complete immediately from pending write + auto read_ret = canon_future_read(inst, desc, readable, async_cx, ptr, false, host_trap); + CHECK(read_ret == static_cast(CopyResult::Completed)); + canon_future_drop_readable(inst, readable, host_trap); + + canon_waitable_join(inst, writable, waitable_set, host_trap); + uint32_t event_ptr = 32; + auto event = canon_waitable_set_wait(true, mem, inst, waitable_set, event_ptr, host_trap); + CHECK(event == static_cast(EventCode::FUTURE_WRITE)); + + uint32_t recorded_index = 0; + uint32_t recorded_payload = 0; + 
std::memcpy(&recorded_index, heap.memory.data() + event_ptr, sizeof(uint32_t)); + std::memcpy(&recorded_payload, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); + CHECK(recorded_index == writable); + CHECK(recorded_payload == static_cast(CopyResult::Completed)); + + canon_future_drop_writable(inst, writable, host_trap); + } + + // ---- Stream self-copy ---- + { + auto desc = make_stream_descriptor(); + uint64_t handles = canon_stream_new(inst, desc, host_trap); + uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + uint32_t writable = static_cast(handles >> 32); + + // Prepare 3 elements to write. + uint32_t write_ptr = 0; + int32_t values[3] = {1, 2, 3}; + std::memcpy(heap.memory.data() + write_ptr, values, sizeof(values)); + + // Start async write => BLOCKED + auto write_ret = canon_stream_write(inst, desc, writable, async_cx, write_ptr, 3, host_trap); + CHECK(write_ret == BLOCKED); + + // Read 1 element; should complete immediately + uint32_t read_ptr = 64; + auto read_payload1 = canon_stream_read(inst, desc, readable, async_cx, read_ptr, 1, false, host_trap); + CHECK((read_payload1 & 0xF) == static_cast(CopyResult::Completed)); + CHECK((read_payload1 >> 4) == 1); + + int32_t observed1 = 0; + std::memcpy(&observed1, heap.memory.data() + read_ptr, sizeof(observed1)); + CHECK(observed1 == 1); + + // Read up to 4 more; only 2 remain. 
+ auto read_payload2 = canon_stream_read(inst, desc, readable, async_cx, read_ptr, 4, false, host_trap); + CHECK((read_payload2 & 0xF) == static_cast(CopyResult::Completed)); + CHECK((read_payload2 >> 4) == 2); + + int32_t observed2[2] = {0, 0}; + std::memcpy(observed2, heap.memory.data() + read_ptr, sizeof(observed2)); + CHECK(observed2[0] == 2); + CHECK(observed2[1] == 3); + + canon_stream_drop_readable(inst, readable, host_trap); + + canon_waitable_join(inst, writable, waitable_set, host_trap); + uint32_t event_ptr = 96; + auto event = canon_waitable_set_wait(true, mem, inst, waitable_set, event_ptr, host_trap); + CHECK(event == static_cast(EventCode::STREAM_WRITE)); + + uint32_t recorded_index = 0; + uint32_t recorded_payload = 0; + std::memcpy(&recorded_index, heap.memory.data() + event_ptr, sizeof(uint32_t)); + std::memcpy(&recorded_payload, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); + CHECK(recorded_index == writable); + CHECK((recorded_payload & 0xF) == static_cast(CopyResult::Dropped)); + CHECK((recorded_payload >> 4) == 3); + + canon_stream_drop_writable(inst, writable, host_trap); + } + + canon_waitable_set_drop(inst, waitable_set, host_trap); +} + +TEST_CASE("Stream host partial reads/writes - Python Reference Parity") +{ + // Covers the partial progress + drop behavior exercised by the Python reference (see test_self_copy). + ComponentInstance inst; + HostTrap host_trap = [](const char *msg) + { + throw std::runtime_error(msg ? 
msg : "trap"); + }; + + auto desc = make_stream_descriptor(); + uint64_t handles = canon_stream_new(inst, desc, host_trap); + uint32_t readable = static_cast(handles & 0xFFFFFFFFu); + uint32_t writable = static_cast(handles >> 32); + + uint32_t waitable_set = canon_waitable_set_new(inst, host_trap); + canon_waitable_join(inst, writable, waitable_set, host_trap); + + Heap heap(256); + CanonicalOptions options; + options.sync = false; + auto cx = std::shared_ptr(createLiftLowerContext(&heap, options).release(), [](LiftLowerContext *ptr) + { delete ptr; }); + GuestMemory mem(heap.memory.data(), heap.memory.size()); + + uint32_t write_ptr = 0; + uint32_t read_ptr = 64; + uint32_t event_ptr = 128; + + const uint8_t bytes[5] = {10, 20, 30, 40, 50}; + std::memcpy(heap.memory.data() + write_ptr, bytes, sizeof(bytes)); + + // Start write of 5 bytes; no pending read => BLOCKED + auto wret = canon_stream_write(inst, desc, writable, cx, write_ptr, 5, host_trap); + CHECK(wret == BLOCKED); + + // Read 2 bytes; should complete immediately. + auto rpay1 = canon_stream_read(inst, desc, readable, cx, read_ptr, 2, false, host_trap); + CHECK((rpay1 & 0xF) == static_cast(CopyResult::Completed)); + CHECK((rpay1 >> 4) == 2); + CHECK(heap.memory[read_ptr + 0] == 10); + CHECK(heap.memory[read_ptr + 1] == 20); + + // Read remaining 3 bytes. + auto rpay2 = canon_stream_read(inst, desc, readable, cx, read_ptr, 10, false, host_trap); + CHECK((rpay2 & 0xF) == static_cast(CopyResult::Completed)); + CHECK((rpay2 >> 4) == 3); + CHECK(heap.memory[read_ptr + 0] == 30); + CHECK(heap.memory[read_ptr + 1] == 40); + CHECK(heap.memory[read_ptr + 2] == 50); + + // Per the reference behavior, the writer remains pending and observes Dropped when the readable end is dropped. 
+ canon_stream_drop_readable(inst, readable, host_trap); + + auto code = canon_waitable_set_wait(true, mem, inst, waitable_set, event_ptr, host_trap); + CHECK(code == static_cast(EventCode::STREAM_WRITE)); + + uint32_t payload = 0; + std::memcpy(&payload, heap.memory.data() + event_ptr + sizeof(uint32_t), sizeof(uint32_t)); + CHECK((payload & 0xF) == static_cast(CopyResult::Dropped)); + CHECK((payload >> 4) == 5); + + canon_stream_drop_writable(inst, writable, host_trap); + canon_waitable_set_drop(inst, waitable_set, host_trap); +} + const char *const hw = "hello World!"; const char *const hw8 = "hello ไธ–็•Œ-๐ŸŒ-!"; const char16_t *hw16 = u"hello ไธ–็•Œ-๐ŸŒ-!"; @@ -2138,53 +3449,65 @@ TEST_CASE("Type Conversions and Boundary Values - Python Reference Parity") // U32 boundary values { + constexpr int32_t i32_max = std::bit_cast(uint32_t{0x7fffffff}); + constexpr int32_t i32_min = std::bit_cast(uint32_t{0x80000000}); + auto check_u32 = [&](int32_t input, uint32_t expected) { WasmValVector flat = {input}; auto result = lift_flat(*cx, flat); CHECK(result == expected); }; - check_u32((1 << 31) - 1, (1u << 31) - 1); - check_u32(1 << 31, 1u << 31); + check_u32(i32_max, (uint32_t{1} << 31) - 1); + check_u32(i32_min, (uint32_t{1} << 31)); check_u32(-1, 0xFFFFFFFF); } // S32 boundary values { + constexpr int32_t i32_max = std::bit_cast(uint32_t{0x7fffffff}); + constexpr int32_t i32_min = std::bit_cast(uint32_t{0x80000000}); + auto check_s32 = [&](int32_t input, int32_t expected) { WasmValVector flat = {input}; auto result = lift_flat(*cx, flat); CHECK(result == expected); }; - check_s32((1 << 31) - 1, (1 << 31) - 1); - check_s32(1 << 31, -(1 << 31)); // INT32_MIN + check_s32(i32_max, i32_max); + check_s32(i32_min, i32_min); // INT32_MIN check_s32(-1, -1); } // U64 boundary values { + constexpr int64_t i64_max = std::bit_cast(uint64_t{0x7fffffffffffffffULL}); + constexpr int64_t i64_min = std::bit_cast(uint64_t{0x8000000000000000ULL}); + auto check_u64 = [&](int64_t input, 
uint64_t expected) { WasmValVector flat = {input}; auto result = lift_flat(*cx, flat); CHECK(result == expected); }; - check_u64((1LL << 63) - 1, (1ULL << 63) - 1); - check_u64(1LL << 63, 1ULL << 63); + check_u64(i64_max, (uint64_t{1} << 63) - 1); + check_u64(i64_min, (uint64_t{1} << 63)); check_u64(-1, 0xFFFFFFFFFFFFFFFFULL); } // S64 boundary values { + constexpr int64_t i64_max = std::bit_cast(uint64_t{0x7fffffffffffffffULL}); + constexpr int64_t i64_min = std::bit_cast(uint64_t{0x8000000000000000ULL}); + auto check_s64 = [&](int64_t input, int64_t expected) { WasmValVector flat = {input}; auto result = lift_flat(*cx, flat); CHECK(result == expected); }; - check_s64((1LL << 63) - 1, (1LL << 63) - 1); - check_s64(1LL << 63, -(1LL << 63)); // INT64_MIN + check_s64(i64_max, i64_max); + check_s64(i64_min, i64_min); // INT64_MIN check_s64(-1, -1); }