From 385f50f14f30fc9678d39f68a1cc77da666c9757 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 3 Mar 2026 09:59:18 +0000 Subject: [PATCH 1/4] feat(testing): add utilities for feeding partial frames and fragments into in-process app - Introduced chunked-write drivers to write codec-encoded payloads in configurable byte-sized chunks, simulating partial-frame network reads. - Added fragment-feeding drivers that fragment raw payloads, encode fragments, and feed them through WireframeApp. - Provided multiple driver variants: owned app, mutable app, with capacity, returning raw frames or payloads. - Established comprehensive BDD test suite and unit tests covering chunked and fragmented feeding scenarios. - Updated docs/users-guide.md with new "Feeding partial frames and fragments" section including usage examples. - Marked roadmap item 8.5.1 as done in docs/roadmap.md. - Registered modules and re-exported public APIs in wireframe_testing crate. These utilities enable testing realistic network conditions where frames arrive across multiple reads or are fragmented, improving codec and application robustness validation. Co-authored-by: devboxerhub[bot] --- ...ding-partial-frames-into-in-process-app.md | 637 ++++++++++++++++++ docs/roadmap.md | 2 +- docs/users-guide.md | 61 ++ tests/features/partial_frame_feeding.feature | 24 + tests/fixtures/mod.rs | 1 + tests/fixtures/partial_frame_feeding.rs | 204 ++++++ tests/partial_frame_feeding.rs | 194 ++++++ tests/scenarios/mod.rs | 1 + .../partial_frame_feeding_scenarios.rs | 45 ++ tests/steps/mod.rs | 1 + tests/steps/partial_frame_feeding_steps.rs | 86 +++ wireframe_testing/src/helpers.rs | 15 + .../src/helpers/fragment_drive.rs | 312 +++++++++ .../src/helpers/partial_frame.rs | 304 +++++++++ wireframe_testing/src/lib.rs | 9 + 15 files changed, 1895 insertions(+), 1 deletion(-) create mode 100644 docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md create mode 100644 tests/features/partial_frame_feeding.feature create mode 100644 tests/fixtures/partial_frame_feeding.rs create mode 100644 tests/partial_frame_feeding.rs create mode 100644 tests/scenarios/partial_frame_feeding_scenarios.rs create mode 100644 tests/steps/partial_frame_feeding_steps.rs create mode 100644 wireframe_testing/src/helpers/fragment_drive.rs create mode 100644 wireframe_testing/src/helpers/partial_frame.rs diff --git a/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md b/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md new file mode 100644 index 00000000..b53595b6 --- /dev/null +++ b/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md @@ -0,0 +1,637 @@ +# 8.5.1 Add utilities for feeding partial frames or fragments into an in-process app + +This ExecPlan (execution plan) is a living document. The sections +`Constraints`, `Tolerances`, `Risks`, `Progress`, `Surprises & Discoveries`, +`Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work +proceeds. + +Status: DONE + +## Purpose / big picture + +The `wireframe_testing` crate already provides driver functions that feed +**complete** frames into an in-process `WireframeApp` via a `tokio::io::duplex` +stream. Every existing driver writes each frame atomically with `write_all()`, +so the codec decoder never has to buffer a partial read. Real networks do not +behave this way: a single codec frame can arrive across multiple TCP reads, and +a logical message may span multiple fragment frames. + +After this work, library consumers gain two new families of test helpers: + +1. **Chunked-write drivers** — encode payloads via a codec, then drip-feed the + resulting wire bytes in configurable chunk sizes (including one byte at a + time). This exercises the codec decoder's buffering logic under realistic + conditions. +2. **Fragment-feeding drivers** — accept a raw payload, fragment it with a + `Fragmenter`, encode each fragment into a codec frame via + `encode_fragment_payload`, and feed the resulting frames through the app. + This lets test authors verify that fragmented messages survive the full app + pipeline without manually constructing fragment wire bytes. + +Success is observable by running `make test` and seeing the new unit and BDD +tests pass (they will fail before the implementation and pass after). + +## Constraints + +- `wireframe_testing` is **not** a workspace member. Integration tests must + live in the main crate's `tests/` directory, not inside `wireframe_testing`. +- No single source file may exceed 400 lines. +- All code must pass `make check-fmt`, `make lint` (clippy with `-D warnings` + and all strict lints), and `make test`. +- Clippy strictness includes: no `[]` indexing (use `.get()`), no `assert!` / + `panic!` in Result-returning functions (return `Err`), no + `#[allow]`/`#[expect]` without `reason = "..."`, `doc_markdown` backticking, + `cast_possible_truncation` via `try_from`. +- en-GB-oxendict spelling in comments and documentation ("-ize" / "-yse" / + "-our"). +- Public functions require `///` doc comments with usage examples. +- BDD fixtures must use `rstest-bdd` v0.5.0, and step function parameter names + must match the fixture name exactly (e.g., `partial_frame_feeding_world`). +- Existing public APIs must not change signature. + +## Tolerances (exception triggers) + +- Scope: if the implementation requires changes to more than 18 files or 1200 + net lines, stop and escalate. +- Interface: if any existing public API signature must change, stop and + escalate. +- Dependencies: if a new external dependency is required in `Cargo.toml` beyond + what is already present, stop and escalate. +- Iterations: if tests still fail after 5 attempts at fixing, stop and + escalate. +- Ambiguity: if multiple valid interpretations exist and the choice materially + affects the outcome, stop and present options with trade-offs. + +## Risks + +- Risk: Fragment payload encoding depends on `bincode` serialization of + `FragmentHeader` which is an internal wire format. If the fragment adapter + layer intercepts before the driver sees the data, the test utility may need + to bypass the adapter. Severity: medium Likelihood: low Mitigation: Use the + existing public `encode_fragment_payload()` and `decode_fragment_payload()` + from `src/fragment/payload.rs` which are the canonical encode/decode path. Do + not bypass the adapter. + +- Risk: The 400-line file limit may be tight for the BDD world fixture if it + contains many helper methods. Severity: low Likelihood: medium Mitigation: + Split the world fixture into submodules (following the + `tests/fixtures/fragment/` pattern which already uses `mod reassembly`). + +- Risk: The chunked-write driver may cause deadlocks if the duplex buffer is + smaller than a single chunk and the server hasn't read yet. Severity: medium + Likelihood: low Mitigation: Chunk writes are interleaved with the server + running concurrently via `tokio::try_join!`. The duplex buffer provides + backpressure naturally. Document the minimum capacity requirement. + +## Progress + +- [x] Stage A: Scaffolding — create new module files and register them. +- [x] Stage B: Implement `partial_frame.rs` (chunked-write driver). +- [x] Stage C: Implement `fragment_drive.rs` (fragment-feeding driver). +- [x] Stage D: Wire up re-exports in `helpers.rs` and `lib.rs`. +- [x] Stage E: Write unit tests in `tests/partial_frame_feeding.rs`. +- [x] Stage F: Write BDD infrastructure (feature, fixture, steps, scenarios). +- [x] Stage G: Update `docs/users-guide.md` with new public API. +- [x] Stage H: Mark roadmap item 8.5.1 as done. +- [x] Stage I: Run full validation (`make check-fmt && make lint && make test`). + +## Surprises & discoveries + +- Fragment payloads are `FRAG`-prefixed raw bytes, not valid `Envelope` + serialisations. The app receives them via the codec but cannot route + them (no matching envelope), so it produces no response. Fragment-feeding + tests therefore verify successful I/O completion (no error) rather than + non-empty responses. This is a correct reflection of the pipeline: the + test utilities exercise the codec/transport layer, not the application + routing layer. + +## Decision log + +- Decision: Place chunked-write logic in a new `partial_frame.rs` module + alongside the existing `drive.rs`, rather than extending `drive_internal`. + Rationale: `drive_internal` is `pub(super)` and used by all existing drivers. + Adding chunked-write semantics to it would change the behaviour of every + driver. A parallel internal function keeps the existing code untouched. + Date/Author: 2026-03-01 + +- Decision: Fragment-feeding goes in `fragment_drive.rs`, composing + `encode_fragment_payload` + `encode_payloads_with_codec` + `drive_internal`. + Rationale: Follows the layering pattern of `codec_drive.rs` which composes + `encode_payloads_with_codec` + `drive_internal` + `decode_frames_with_codec`. + Fragment feeding is one layer above codec encoding. Date/Author: 2026-03-01 + +- Decision: BDD test suite named `partial_frame_feeding` (not + `partial_frames` or `chunked_driver`) to match the roadmap wording "feeding + partial frames". Rationale: Consistency with roadmap item title. Date/Author: + 2026-03-01 + +## Outcomes & retrospective + +All acceptance criteria met: + +- 9 unit tests pass in `tests/partial_frame_feeding.rs`. +- 4 BDD scenarios pass in + `tests/scenarios/partial_frame_feeding_scenarios.rs`. +- `make check-fmt`, `make lint`, and `make test` all exit 0. +- `docs/users-guide.md` contains a new "Feeding partial frames and + fragments" section documenting all 9 public functions. +- `docs/roadmap.md` item 8.5.1 is marked `[x]`. +- 14 files touched (7 created, 5 modified for registration/re-exports, + 2 documentation files updated), within the 18-file tolerance. + +## Context and orientation + +The wireframe project is a Rust async networking framework. The +`wireframe_testing` companion crate (at `wireframe_testing/`) provides +in-memory test drivers for `WireframeApp`. It is a `dev-dependency` of the main +crate, not a workspace member. + +Key directories and files: + +- `wireframe_testing/src/helpers/drive.rs` — base `drive_internal()` function + that creates a `tokio::io::duplex`, writes frames to the client half, and + collects response bytes. All higher-level drivers delegate here. +- `wireframe_testing/src/helpers/codec_drive.rs` — codec-aware drivers + (`drive_with_codec_payloads`, `drive_with_codec_frames`) that encode payloads + via a `FrameCodec`, transport via `drive_internal`, and decode responses. +- `wireframe_testing/src/helpers/codec_ext.rs` — `encode_payloads_with_codec` + and `decode_frames_with_codec` utility functions. +- `wireframe_testing/src/helpers.rs` — module root, re-exports all public + helpers, defines `TestSerializer` trait, constants `DEFAULT_CAPACITY` (4096), + `MAX_CAPACITY` (10 MB), `TEST_MAX_FRAME` (4096). +- `wireframe_testing/src/lib.rs` — crate root, re-exports from `helpers`. +- `src/fragment/fragmenter.rs` — `Fragmenter`, `FragmentBatch`, + `FragmentFrame` types for splitting payloads into fragments. +- `src/fragment/payload.rs` — `encode_fragment_payload(header, payload)` and + `decode_fragment_payload(payload)` for fragment wire encoding. +- `src/fragment/header.rs` — `FragmentHeader` with `message_id`, + `fragment_index`, `is_last_fragment`. +- `tests/fixtures/mod.rs` — fixture module index. +- `tests/steps/mod.rs` — step definition module index. +- `tests/scenarios/mod.rs` — scenario binding module index (includes + `#[path = "../steps/mod.rs"] pub(crate) mod steps;`). + +Established BDD pattern (exemplified by `codec_test_harness`): + +1. Feature file: `tests/features/.feature` (Gherkin). +2. World fixture: `tests/fixtures/.rs` (struct + `#[fixture]` + + helper methods; manual `Debug` impl if it holds `WireframeApp`). +3. Step definitions: `tests/steps/_steps.rs` (uses `#[given]`, + `#[when]`, `#[then]` from `rstest_bdd_macros`; async steps use + `tokio::runtime::Runtime::new()?.block_on()`). +4. Scenario bindings: `tests/scenarios/_scenarios.rs` (uses + `#[scenario(path = "...", name = "...")]`; + `#[expect(unused_variables, reason = "rstest-bdd wires steps + via parameters without using them directly")]`). +5. Module registration: add `mod ;` to `tests/fixtures/mod.rs`, + `mod _steps;` to `tests/steps/mod.rs`, `mod _scenarios;` to + `tests/scenarios/mod.rs`. + +## Plan of work + +### Stage A: Scaffolding + +Create empty module files and register them in their respective `mod.rs` files. + +New files to create: + +1. `wireframe_testing/src/helpers/partial_frame.rs` — chunked-write driver. +2. `wireframe_testing/src/helpers/fragment_drive.rs` — fragment-feeding + driver. +3. `tests/features/partial_frame_feeding.feature` — Gherkin scenarios. +4. `tests/fixtures/partial_frame_feeding.rs` — BDD world fixture. +5. `tests/steps/partial_frame_feeding_steps.rs` — BDD step definitions. +6. `tests/scenarios/partial_frame_feeding_scenarios.rs` — scenario bindings. +7. `tests/partial_frame_feeding.rs` — rstest unit tests (integration test + file in main crate). + +Files to modify for registration: + +1. `wireframe_testing/src/helpers.rs` — add `mod partial_frame; mod + fragment_drive;` and re-export public symbols. +2. `wireframe_testing/src/lib.rs` — re-export new public symbols. +3. `tests/fixtures/mod.rs` — add `pub mod partial_frame_feeding;`. +4. `tests/steps/mod.rs` — add `mod partial_frame_feeding_steps;`. +5. `tests/scenarios/mod.rs` — add `mod partial_frame_feeding_scenarios;`. + +### Stage B: Implement chunked-write driver (`partial_frame.rs`) + +This module provides a `drive_chunked_internal` function (analogous to +`drive_internal` in `drive.rs`) and public driver functions that compose codec +encoding with chunked writing. + +#### `drive_chunked_internal` (private to `helpers`) + +```rust +pub(super) async fn drive_chunked_internal( + server_fn: F, + wire_bytes: Vec, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result> +where + F: FnOnce(DuplexStream) -> Fut, + Fut: std::future::Future + Send, +``` + +Instead of writing each frame via `write_all`, this function concatenates all +wire bytes and writes them `chunk_size` bytes at a time, calling +`write_all(&chunk)` for each slice. This forces the codec decoder on the server +side to buffer partial frames across reads. + +The server-side panic handling mirrors `drive_internal` exactly (using +`catch_unwind` + `wireframe::panic::format_panic`). The client-side loop +replaces the per-frame `write_all` with a chunked iteration: + +```rust +let client_fut = async { + let total = wire_bytes.len(); + let step = chunk_size.get(); + let mut offset = 0; + while offset < total { + let end = (offset + step).min(total); + let chunk = wire_bytes.get(offset..end) + .ok_or_else(|| io::Error::other("chunk slice out of bounds"))?; + client.write_all(chunk).await?; + offset = end; + } + client.shutdown().await?; + + let mut buf = Vec::new(); + client.read_to_end(&mut buf).await?; + io::Result::Ok(buf) +}; +``` + +#### Public API: chunked codec drivers + +Following the naming pattern of `drive_with_codec_payloads`, the new functions +are: + +```rust +/// Drive `app` with payloads encoded by `codec`, writing wire bytes in +/// chunks of `chunk_size` to exercise partial-frame buffering. +pub async fn drive_with_partial_frames( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result>> + +/// Variant returning full decoded codec frames. +pub async fn drive_with_partial_codec_frames( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result> + +/// Mutable-app variant returning decoded payloads. +pub async fn drive_with_partial_frames_mut( + app: &mut WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result>> + +/// Variant with explicit duplex buffer capacity. +pub async fn drive_with_partial_frames_with_capacity( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result>> +``` + +Each function composes: `encode_payloads_with_codec` → flatten to single +`Vec` → `drive_chunked_internal` → `decode_frames_with_codec` → optionally +`extract_payloads`. + +### Stage C: Implement fragment-feeding driver (`fragment_drive.rs`) + +This module provides functions that fragment a payload and feed the resulting +fragment frames through the app. + +#### Public API + +```rust +/// Fragment `payload` using `fragmenter`, encode each fragment with +/// `encode_fragment_payload`, wrap in codec frames, and drive through `app`. +/// Returns decoded response payloads. +pub async fn drive_with_fragments( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result>> + +/// Variant returning full decoded codec frames. +pub async fn drive_with_fragment_frames( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result> + +/// Mutable-app variant. +pub async fn drive_with_fragments_mut( + app: &mut WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result>> + +/// Fragment and feed with configurable duplex capacity. +pub async fn drive_with_fragments_with_capacity( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, + capacity: usize, +) -> io::Result>> +``` + +Internal composition: `fragmenter.fragment_bytes(payload)` → for each +`FragmentFrame`: `encode_fragment_payload(header, payload)` → collect as +payloads → `encode_payloads_with_codec(codec, payloads)` → +`drive_internal(handler, encoded, capacity)` → +`decode_frames_with_codec(codec, raw)` → optionally `extract_payloads`. + +A combined helper that fragments AND feeds in chunks is also provided: + +```rust +/// Fragment `payload` and feed the resulting wire bytes in chunks of +/// `chunk_size`, exercising both fragmentation and partial-frame +/// buffering simultaneously. +pub async fn drive_with_partial_fragments( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, + chunk_size: NonZeroUsize, +) -> io::Result>> +``` + +### Stage D: Wire up re-exports + +In `wireframe_testing/src/helpers.rs`: + +- Add `mod partial_frame;` and `mod fragment_drive;`. +- Add `pub use partial_frame::{...};` for all public functions. +- Add `pub use fragment_drive::{...};` for all public functions. + +In `wireframe_testing/src/lib.rs`: + +- Add re-exports for all new public symbols to the existing `pub use + helpers::{…};` block. + +### Stage E: Unit tests (`tests/partial_frame_feeding.rs`) + +Integration test file in the main crate. Uses `rstest` for fixtures and +parameterised cases. Tests: + +1. **Chunked single-byte write round-trips** — encode a payload via + `HotlineFrameCodec`, feed one byte at a time via + `drive_with_partial_frames`, verify decoded payloads match input. +2. **Chunked multi-byte write round-trips** — same with `chunk_size = 7` + (deliberately misaligned with frame boundaries). +3. **Multiple payloads chunked** — encode two payloads, feed in 3-byte + chunks, verify both decoded correctly. +4. **Fragment round-trip** — fragment a 100-byte payload with + `max_fragment_size = 20`, feed via `drive_with_fragments`, verify response + received (app processes the fragment frames). +5. **Partial fragment round-trip** — fragment a payload AND feed in chunks + via `drive_with_partial_fragments`, verify response received. +6. **Mutable app reuse** — call `drive_with_partial_frames_mut` twice with + the same app, verify both succeed. + +### Stage F: BDD infrastructure + +#### Feature file: `tests/features/partial_frame_feeding.feature` + +```gherkin +@partial-frame-feeding +Feature: Partial frame and fragment feeding utilities + + Scenario: Single payload survives byte-at-a-time chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames + When a test payload is fed in 1-byte chunks + Then the decoded response payloads are non-empty + + Scenario: Multiple payloads survive misaligned chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames + When 2 test payloads are fed in 7-byte chunks + Then the decoded response contains 2 payloads + + Scenario: Fragmented payload is delivered as fragment frames + Given a wireframe app with a Hotline codec allowing 4096-byte frames + And a fragmenter capped at 20 bytes per fragment + When a 100-byte payload is fragmented and fed through the app + Then the app receives fragment frames + + Scenario: Fragmented payload survives chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames + And a fragmenter capped at 20 bytes per fragment + When a 100-byte payload is fragmented and fed in 3-byte chunks + Then the app receives fragment frames +``` + +#### World fixture: `tests/fixtures/partial_frame_feeding.rs` + +A `PartialFrameFeedingWorld` struct holding: + +- `codec: Option` +- `app: Option>` +- `fragmenter: Option` +- `response_payloads: Vec>` +- `response_frames: Vec` + +Manual `Debug` impl (because `WireframeApp` lacks `Debug`). Default impl. +`#[fixture] pub fn partial_frame_feeding_world()` fixture function. + +Helper methods: + +- `configure_app(max_frame_length)` — build app with `HotlineFrameCodec` + and a no-op route handler. +- `configure_fragmenter(max_payload)` — create `Fragmenter`. +- `drive_chunked(chunk_size)` — call `drive_with_partial_frames`. +- `drive_chunked_multiple(count, chunk_size)` — multiple payloads. +- `drive_fragmented(payload_len)` — call `drive_with_fragments`. +- `drive_partial_fragmented(payload_len, chunk_size)` — call + `drive_with_partial_fragments`. +- Assertion helpers: `assert_payloads_non_empty()`, + `assert_payload_count(n)`, `assert_received_fragments()`. + +#### Steps: `tests/steps/partial_frame_feeding_steps.rs` + +Step functions delegating to world methods, following the +`codec_test_harness_steps.rs` pattern. Async operations wrapped in +`tokio::runtime::Runtime::new()?.block_on(...)`. + +#### Scenarios: `tests/scenarios/partial_frame_feeding_scenarios.rs` + +One `#[scenario]` function per feature scenario, each with +`#[expect(unused_variables, reason = "rstest-bdd wires steps +via parameters without using them directly")]`. + +### Stage G: Update `docs/users-guide.md` + +Add a new subsection after the existing "Testing custom codecs with +`wireframe_testing`" section (around line 257) titled "Feeding partial frames +and fragments". Document all new public functions with a short code example +showing `drive_with_partial_frames` and `drive_with_fragments`. + +### Stage H: Mark roadmap done + +In `docs/roadmap.md`, change the 8.5.1 line from `- [ ]` to `- [x]`. + +### Stage I: Full validation + +Run `make check-fmt && make lint && make test` and verify all pass. + +## Concrete steps + +All commands run from the repository root `/home/user/project`. + +### Validation before changes + +```bash +set -o pipefail +make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log +make lint 2>&1 | tee /tmp/wireframe-lint.log +make test 2>&1 | tee /tmp/wireframe-test.log +``` + +Expected: all three exit 0. + +### After each stage + +```bash +set -o pipefail +make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log +make lint 2>&1 | tee /tmp/wireframe-lint.log +make test 2>&1 | tee /tmp/wireframe-test.log +``` + +Expected: all three exit 0. + +### Final validation + +```bash +set -o pipefail +make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log; echo "check-fmt exit: $?" +make lint 2>&1 | tee /tmp/wireframe-lint.log; echo "lint exit: $?" +make test 2>&1 | tee /tmp/wireframe-test.log; echo "test exit: $?" +``` + +Expected: all three report exit 0. + +## Validation and acceptance + +Quality criteria: + +- Tests: `make test` passes. New tests in `tests/partial_frame_feeding.rs` + and BDD scenarios in `tests/scenarios/partial_frame_feeding_scenarios.rs` all + pass. BDD feature file has at least 4 scenarios. +- Lint/typecheck: `make check-fmt` and `make lint` exit 0 with no warnings. +- Documentation: `docs/users-guide.md` contains a new section documenting + the partial-frame and fragment feeding utilities. +- Roadmap: `docs/roadmap.md` item 8.5.1 is marked `[x]`. + +Quality method: + +- `make check-fmt && make lint && make test` run from the repository root. +- Manual inspection that the new section exists in `docs/users-guide.md`. +- Manual inspection that `docs/roadmap.md` shows `[x]` for 8.5.1. + +## Idempotence and recovery + +All stages are additive (new files and new `pub use` lines). No existing code +is modified except for adding `mod` and `pub use` declarations. Stages can be +re-run by overwriting the created files. If a stage fails validation, fix the +issue and re-run that stage's validation. + +## Artifacts and notes + +Files created (7): + +- `wireframe_testing/src/helpers/partial_frame.rs` +- `wireframe_testing/src/helpers/fragment_drive.rs` +- `tests/features/partial_frame_feeding.feature` +- `tests/fixtures/partial_frame_feeding.rs` +- `tests/steps/partial_frame_feeding_steps.rs` +- `tests/scenarios/partial_frame_feeding_scenarios.rs` +- `tests/partial_frame_feeding.rs` + +Files modified (5): + +- `wireframe_testing/src/helpers.rs` +- `wireframe_testing/src/lib.rs` +- `tests/fixtures/mod.rs` +- `tests/steps/mod.rs` +- `tests/scenarios/mod.rs` + +Documentation files modified (2): + +- `docs/users-guide.md` +- `docs/roadmap.md` + +Total: 14 files touched. + +## Interfaces and dependencies + +All dependencies are already present in `wireframe_testing/Cargo.toml` +(`tokio`, `wireframe`, `bytes`, `futures`, `tokio-util`) and the main crate's +`Cargo.toml` (`rstest`, `rstest-bdd`, `wireframe_testing`). No new dependencies +are required. + +### New public types and functions in `wireframe_testing` + +In `wireframe_testing/src/helpers/partial_frame.rs`: + +```rust +use std::num::NonZeroUsize; + +// Internal (pub(super)): +pub(super) async fn drive_chunked_internal( + server_fn: F, + wire_bytes: Vec, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result>; + +// Public: +pub async fn drive_with_partial_frames(...) -> io::Result>>; +pub async fn drive_with_partial_frames_with_capacity(...) -> io::Result>>; +pub async fn drive_with_partial_frames_mut(...) -> io::Result>>; +pub async fn drive_with_partial_codec_frames(...) -> io::Result>; +``` + +In `wireframe_testing/src/helpers/fragment_drive.rs`: + +```rust +pub async fn drive_with_fragments(...) -> io::Result>>; +pub async fn drive_with_fragments_with_capacity(...) -> io::Result>>; +pub async fn drive_with_fragments_mut(...) -> io::Result>>; +pub async fn drive_with_fragment_frames(...) -> io::Result>; +pub async fn drive_with_partial_fragments(...) -> io::Result>>; +``` + +### Reused existing functions + +- `drive_internal` from `wireframe_testing/src/helpers/drive.rs` — used by + `fragment_drive.rs` for non-chunked fragment feeding. +- `encode_payloads_with_codec` from + `wireframe_testing/src/helpers/codec_ext.rs` — used by both new modules for + codec encoding. +- `decode_frames_with_codec` from + `wireframe_testing/src/helpers/codec_ext.rs` — used by both new modules for + response decoding. +- `extract_payloads` from `wireframe_testing/src/helpers/codec_ext.rs` — + used to convert frames to payload byte vectors. +- `encode_fragment_payload` from `src/fragment/payload.rs` — used by + `fragment_drive.rs` to encode fragment headers and payloads. +- `Fragmenter`, `FragmentBatch`, `FragmentFrame` from + `src/fragment/fragmenter.rs` — used by `fragment_drive.rs`. +- `DEFAULT_CAPACITY` from `wireframe_testing/src/helpers.rs`. diff --git a/docs/roadmap.md b/docs/roadmap.md index 41710933..2df39e97 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -313,7 +313,7 @@ and standardized per-connection memory budgets. ### 8.5. Testkit utilities -- [ ] 8.5.1. Add utilities for feeding partial frames or fragments into an +- [x] 8.5.1. Add utilities for feeding partial frames or fragments into an in-process app. - [ ] 8.5.2. Add slow reader and writer simulation for back-pressure testing. - [ ] 8.5.3. Add deterministic assertion helpers for reassembly outcomes. diff --git a/docs/users-guide.md b/docs/users-guide.md index 72769ac4..a5bc24c2 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -305,6 +305,67 @@ Available fixture functions: - `correlated_hotline_wire` — frames sharing a transaction ID. - `sequential_hotline_wire` — frames with incrementing transaction IDs. +#### Feeding partial frames and fragments + +Real networks rarely deliver a complete codec frame in a single TCP read. +The `wireframe_testing` crate provides drivers that simulate these +conditions so you can exercise your codec's buffering logic in tests. + +**Chunked-write drivers** encode payloads via a codec, concatenate the wire +bytes, and write them in configurable chunk sizes (including one byte at a +time): + +```rust,no_run +use std::num::NonZeroUsize; +use wireframe::app::WireframeApp; +use wireframe::codec::examples::HotlineFrameCodec; +use wireframe_testing::drive_with_partial_frames; + +let codec = HotlineFrameCodec::new(4096); +let app = WireframeApp::new()?.with_codec(codec.clone()); +let chunk = NonZeroUsize::new(1).expect("non-zero"); +let payloads = + drive_with_partial_frames(app, &codec, vec![vec![1, 2, 3]], chunk) + .await?; +``` + +Available chunked-write driver functions: + +- `drive_with_partial_frames` / `drive_with_partial_frames_with_capacity` — + owned app, returns payload bytes. +- `drive_with_partial_frames_mut` — mutable app reference, returns payload + bytes. +- `drive_with_partial_codec_frames` — owned app, returns decoded `F::Frame` + values. + +**Fragment-feeding drivers** accept a raw payload, fragment it with a +`Fragmenter`, encode each fragment into a codec frame, and feed the frames +through the app: + +```rust,no_run +use std::num::NonZeroUsize; +use wireframe::app::WireframeApp; +use wireframe::codec::examples::HotlineFrameCodec; +use wireframe::fragment::Fragmenter; +use wireframe_testing::drive_with_fragments; + +let codec = HotlineFrameCodec::new(4096); +let app = WireframeApp::new()?.with_codec(codec.clone()); +let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +let payloads = + drive_with_fragments(app, &codec, &fragmenter, vec![0; 100]).await?; +``` + +Available fragment-feeding driver functions: + +- `drive_with_fragments` / `drive_with_fragments_with_capacity` — owned + app, returns payload bytes. +- `drive_with_fragments_mut` — mutable app reference, returns payload + bytes. +- `drive_with_fragment_frames` — owned app, returns decoded `F::Frame` + values. +- `drive_with_partial_fragments` — fragment AND feed in chunks, + exercising both fragmentation and partial-frame buffering simultaneously. #### Zero-copy payload extraction For performance-critical codecs, use `Bytes` instead of `Vec` for payload diff --git a/tests/features/partial_frame_feeding.feature b/tests/features/partial_frame_feeding.feature new file mode 100644 index 00000000..0531d77e --- /dev/null +++ b/tests/features/partial_frame_feeding.feature @@ -0,0 +1,24 @@ +@partial-frame-feeding +Feature: Partial frame and fragment feeding utilities + + Scenario: Single payload survives byte-at-a-time chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames for partial feeding + When a test payload is fed in 1-byte chunks + Then the partial feeding response payloads are non-empty + + Scenario: Multiple payloads survive misaligned chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames for partial feeding + When 2 test payloads are fed in 7-byte chunks + Then the partial feeding response contains 2 payloads + + Scenario: Fragmented payload is delivered as fragment frames + Given a wireframe app with a Hotline codec allowing 4096-byte frames for partial feeding + And a fragmenter capped at 20 bytes per fragment for partial feeding + When a 100-byte payload is fragmented and fed through the app + Then the fragment feeding completes without error + + Scenario: Fragmented payload survives chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames for partial feeding + And a fragmenter capped at 20 bytes per fragment for partial feeding + When a 100-byte payload is fragmented and fed in 3-byte chunks + Then the fragment feeding completes without error diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index 54c63950..90ea6ebb 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -30,6 +30,7 @@ pub mod message_assembly; pub mod message_assembly_inbound; pub mod multi_packet; pub mod panic; +pub mod partial_frame_feeding; pub mod request_parts; pub mod serializer_boundaries; pub mod stream_end; diff --git a/tests/fixtures/partial_frame_feeding.rs b/tests/fixtures/partial_frame_feeding.rs new file mode 100644 index 00000000..c26ea721 --- /dev/null +++ b/tests/fixtures/partial_frame_feeding.rs @@ -0,0 +1,204 @@ +//! BDD world fixture for partial frame and fragment feeding scenarios. +//! +//! Tracks application configuration, fragmenter state, and collected +//! responses across behavioural test steps. + +use std::{num::NonZeroUsize, sync::Arc}; + +use futures::future::BoxFuture; +use rstest::fixture; +use wireframe::{ + app::{Envelope, WireframeApp}, + codec::examples::HotlineFrameCodec, + fragment::Fragmenter, + serializer::{BincodeSerializer, Serializer}, +}; +/// Re-export `TestResult` from `wireframe_testing` for use in steps. +pub use wireframe_testing::TestResult; + +/// BDD world holding the app, codec, fragmenter, and collected responses. +/// +/// `WireframeApp` does not implement `Debug`, so this type provides a manual +/// implementation that redacts the app field. +#[derive(Default)] +pub struct PartialFrameFeedingWorld { + codec: Option, + app: Option>, + fragmenter: Option, + response_payloads: Vec>, + fragment_feeding_completed: bool, +} + +impl std::fmt::Debug for PartialFrameFeedingWorld { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PartialFrameFeedingWorld") + .field("codec", &self.codec) + .field("app", &self.app.as_ref().map(|_| "..")) + .field("fragmenter", &self.fragmenter) + .field("response_payloads", &self.response_payloads.len()) + .field( + "fragment_feeding_completed", + &self.fragment_feeding_completed, + ) + .finish() + } +} + +/// Fixture for partial frame feeding scenarios used by rstest-bdd steps. +/// +/// Note: rustfmt collapses simple fixtures into one line, which triggers +/// `unused_braces`, so keep `rustfmt::skip`. +#[rustfmt::skip] +#[fixture] +pub fn partial_frame_feeding_world() -> PartialFrameFeedingWorld { + PartialFrameFeedingWorld::default() +} + +impl PartialFrameFeedingWorld { + /// Configure the app with a `HotlineFrameCodec`. + /// + /// # Errors + /// Returns an error if the app or route registration fails. + pub fn configure_app(&mut self, max_frame_length: usize) -> TestResult { + let codec = HotlineFrameCodec::new(max_frame_length); + let app = WireframeApp::::new()? + .with_codec(codec.clone()) + .route( + 1, + Arc::new(|_: &Envelope| -> BoxFuture<'static, ()> { Box::pin(async {}) }), + )?; + self.codec = Some(codec); + self.app = Some(app); + Ok(()) + } + + /// Configure a fragmenter with the given maximum payload per fragment. + /// + /// # Errors + /// Returns an error if the payload cap is zero. + pub fn configure_fragmenter(&mut self, max_payload: usize) -> TestResult { + let cap = NonZeroUsize::new(max_payload).ok_or("fragment payload cap must be non-zero")?; + self.fragmenter = Some(Fragmenter::new(cap)); + Ok(()) + } + + /// Drive the app with a single payload chunked into `chunk_size` pieces. + /// + /// # Errors + /// Returns an error if the app or codec is not configured, or driving fails. + pub async fn drive_chunked(&mut self, chunk_size: usize) -> TestResult { + let app = self.app.take().ok_or("app not configured")?; + let codec = self.codec.as_ref().ok_or("codec not configured")?; + let chunk = NonZeroUsize::new(chunk_size).ok_or("chunk size must be non-zero")?; + + let env = Envelope::new(1, Some(7), b"bdd-chunked".to_vec()); + let serialized = BincodeSerializer.serialize(&env)?; + + self.response_payloads = + wireframe_testing::drive_with_partial_frames(app, codec, vec![serialized], chunk) + .await?; + Ok(()) + } + + /// Drive the app with `count` payloads chunked into `chunk_size` pieces. + /// + /// # Errors + /// Returns an error if the app or codec is not configured, or driving fails. + pub async fn drive_chunked_multiple(&mut self, count: usize, chunk_size: usize) -> TestResult { + let app = self.app.take().ok_or("app not configured")?; + let codec = self.codec.as_ref().ok_or("codec not configured")?; + let chunk = NonZeroUsize::new(chunk_size).ok_or("chunk size must be non-zero")?; + + let mut payloads = Vec::with_capacity(count); + for i in 0..count { + let env = Envelope::new(1, Some(7), vec![u8::try_from(i).unwrap_or(0)]); + payloads.push(BincodeSerializer.serialize(&env)?); + } + + self.response_payloads = + wireframe_testing::drive_with_partial_frames(app, codec, payloads, chunk).await?; + Ok(()) + } + + /// Drive the app with a fragmented payload. + /// + /// # Errors + /// Returns an error if the app, codec, or fragmenter is not configured. + pub async fn drive_fragmented(&mut self, payload_len: usize) -> TestResult { + let app = self.app.take().ok_or("app not configured")?; + let codec = self.codec.as_ref().ok_or("codec not configured")?; + let fragmenter = self + .fragmenter + .as_ref() + .ok_or("fragmenter not configured")?; + + let _payloads = + wireframe_testing::drive_with_fragments(app, codec, fragmenter, vec![0; payload_len]) + .await?; + self.fragment_feeding_completed = true; + Ok(()) + } + + /// Drive the app with a fragmented payload fed in chunks. + /// + /// # Errors + /// Returns an error if the app, codec, or fragmenter is not configured. + pub async fn drive_partial_fragmented( + &mut self, + payload_len: usize, + chunk_size: usize, + ) -> TestResult { + let app = self.app.take().ok_or("app not configured")?; + let codec = self.codec.as_ref().ok_or("codec not configured")?; + let fragmenter = self + .fragmenter + .as_ref() + .ok_or("fragmenter not configured")?; + let chunk = NonZeroUsize::new(chunk_size).ok_or("chunk size must be non-zero")?; + + let _payloads = wireframe_testing::drive_with_partial_fragments( + app, + codec, + fragmenter, + vec![0; payload_len], + chunk, + ) + .await?; + self.fragment_feeding_completed = true; + Ok(()) + } + + /// Assert that fragment feeding completed without error. + /// + /// # Errors + /// Returns an error if fragment feeding has not been attempted or failed. + pub fn assert_fragment_feeding_completed(&self) -> TestResult { + if !self.fragment_feeding_completed { + return Err("fragment feeding has not completed successfully".into()); + } + Ok(()) + } + + /// Assert that response payloads are non-empty. + /// + /// # Errors + /// Returns an error if no response payloads were collected. + pub fn assert_payloads_non_empty(&self) -> TestResult { + if self.response_payloads.is_empty() { + return Err("expected non-empty response payloads".into()); + } + Ok(()) + } + + /// Assert that the response contains exactly `expected` payloads. + /// + /// # Errors + /// Returns an error if the payload count does not match. + pub fn assert_payload_count(&self, expected: usize) -> TestResult { + let actual = self.response_payloads.len(); + if actual != expected { + return Err(format!("expected {expected} response payloads, got {actual}").into()); + } + Ok(()) + } +} diff --git a/tests/partial_frame_feeding.rs b/tests/partial_frame_feeding.rs new file mode 100644 index 00000000..5804c541 --- /dev/null +++ b/tests/partial_frame_feeding.rs @@ -0,0 +1,194 @@ +//! Integration tests for partial-frame and fragment feeding utilities in +//! `wireframe_testing`. +#![cfg(not(loom))] + +use std::{io, num::NonZeroUsize, sync::Arc}; + +use futures::future::BoxFuture; +use wireframe::{ + app::{Envelope, WireframeApp}, + codec::examples::HotlineFrameCodec, + fragment::Fragmenter, + serializer::{BincodeSerializer, Serializer}, +}; +use wireframe_testing::{ + drive_with_fragment_frames, + drive_with_fragments, + drive_with_fragments_mut, + drive_with_partial_codec_frames, + drive_with_partial_fragments, + drive_with_partial_frames, + drive_with_partial_frames_mut, +}; + +fn hotline_codec() -> HotlineFrameCodec { HotlineFrameCodec::new(4096) } + +fn build_echo_app( + codec: HotlineFrameCodec, +) -> io::Result> { + WireframeApp::::new() + .map_err(|e| io::Error::other(format!("app init: {e}")))? + .with_codec(codec) + .route( + 1, + Arc::new(|_: &Envelope| -> BoxFuture<'static, ()> { Box::pin(async {}) }), + ) + .map_err(|e| io::Error::other(format!("route: {e}"))) +} + +fn serialize_envelope(payload: &[u8]) -> io::Result> { + let env = Envelope::new(1, Some(7), payload.to_vec()); + BincodeSerializer + .serialize(&env) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("serialize: {e}"))) +} + +// --------------------------------------------------------------------------- +// Chunked-write (partial frame) tests +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn partial_frames_single_byte_chunks() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let serialized = serialize_envelope(&[10, 20, 30])?; + let chunk = NonZeroUsize::new(1).ok_or_else(|| io::Error::other("non-zero"))?; + + let payloads = drive_with_partial_frames(app, &codec, vec![serialized], chunk).await?; + + if payloads.is_empty() { + return Err(io::Error::other("expected non-empty response payloads")); + } + Ok(()) +} + +#[tokio::test] +async fn partial_frames_misaligned_chunks() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let serialized = serialize_envelope(&[1, 2, 3, 4, 5])?; + let chunk = NonZeroUsize::new(7).ok_or_else(|| io::Error::other("non-zero"))?; + + let payloads = drive_with_partial_frames(app, &codec, vec![serialized], chunk).await?; + + if payloads.is_empty() { + return Err(io::Error::other("expected non-empty response payloads")); + } + Ok(()) +} + +#[tokio::test] +async fn partial_frames_multiple_payloads() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let p1 = serialize_envelope(&[1])?; + let p2 = serialize_envelope(&[2])?; + let chunk = NonZeroUsize::new(3).ok_or_else(|| io::Error::other("non-zero"))?; + + let payloads = drive_with_partial_frames(app, &codec, vec![p1, p2], chunk).await?; + + if payloads.len() != 2 { + return Err(io::Error::other(format!( + "expected 2 response payloads, got {}", + payloads.len() + ))); + } + Ok(()) +} + +#[tokio::test] +async fn partial_codec_frames_preserves_metadata() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let serialized = serialize_envelope(&[42])?; + let chunk = NonZeroUsize::new(5).ok_or_else(|| io::Error::other("non-zero"))?; + + let frames = drive_with_partial_codec_frames(app, &codec, vec![serialized], chunk).await?; + + let frame = frames + .first() + .ok_or_else(|| io::Error::other("expected at least one response frame"))?; + if frame.transaction_id != 0 { + return Err(io::Error::other(format!( + "wrap_payload should assign transaction_id 0, got {}", + frame.transaction_id + ))); + } + Ok(()) +} + +#[tokio::test] +async fn partial_frames_mut_allows_reuse() -> io::Result<()> { + let codec = hotline_codec(); + let mut app = build_echo_app(codec.clone())?; + let serialized = serialize_envelope(&[1])?; + let chunk = NonZeroUsize::new(2).ok_or_else(|| io::Error::other("non-zero"))?; + + let first = + drive_with_partial_frames_mut(&mut app, &codec, vec![serialized.clone()], chunk).await?; + if first.is_empty() { + return Err(io::Error::other("first call should produce output")); + } + + let second = drive_with_partial_frames_mut(&mut app, &codec, vec![serialized], chunk).await?; + if second.is_empty() { + return Err(io::Error::other("second call should produce output")); + } + Ok(()) +} + +// --------------------------------------------------------------------------- +// Fragment feeding tests +// --------------------------------------------------------------------------- + +// Fragment payloads are FRAG-prefixed raw bytes, not valid Envelope +// serializations. The app receives and processes them but does not produce a +// routed response. Verifying no I/O error confirms the full transport pipeline +// (fragment → encode → transport → decode) works end to end. + +#[tokio::test] +async fn fragment_round_trip() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let cap = NonZeroUsize::new(20).ok_or_else(|| io::Error::other("non-zero"))?; + let fragmenter = Fragmenter::new(cap); + + let _payloads = drive_with_fragments(app, &codec, &fragmenter, vec![0; 100]).await?; + Ok(()) +} + +#[tokio::test] +async fn fragment_frames_returns_codec_frames() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let cap = NonZeroUsize::new(20).ok_or_else(|| io::Error::other("non-zero"))?; + let fragmenter = Fragmenter::new(cap); + + let _frames = drive_with_fragment_frames(app, &codec, &fragmenter, vec![0; 50]).await?; + Ok(()) +} + +#[tokio::test] +async fn fragment_mut_allows_reuse() -> io::Result<()> { + let codec = hotline_codec(); + let mut app = build_echo_app(codec.clone())?; + let cap = NonZeroUsize::new(30).ok_or_else(|| io::Error::other("non-zero"))?; + let fragmenter = Fragmenter::new(cap); + + let _first = drive_with_fragments_mut(&mut app, &codec, &fragmenter, vec![0; 50]).await?; + let _second = drive_with_fragments_mut(&mut app, &codec, &fragmenter, vec![0; 30]).await?; + Ok(()) +} + +#[tokio::test] +async fn partial_fragments_combines_both() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let cap = NonZeroUsize::new(20).ok_or_else(|| io::Error::other("non-zero"))?; + let fragmenter = Fragmenter::new(cap); + let chunk = NonZeroUsize::new(3).ok_or_else(|| io::Error::other("non-zero"))?; + + let _payloads = + drive_with_partial_fragments(app, &codec, &fragmenter, vec![0; 100], chunk).await?; + Ok(()) +} diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index e01e615e..5058aec3 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -33,6 +33,7 @@ mod message_assembly_inbound_scenarios; mod message_assembly_scenarios; mod multi_packet_scenarios; mod panic_scenarios; +mod partial_frame_feeding_scenarios; mod request_parts_scenarios; mod serializer_boundaries_scenarios; mod stream_end_scenarios; diff --git a/tests/scenarios/partial_frame_feeding_scenarios.rs b/tests/scenarios/partial_frame_feeding_scenarios.rs new file mode 100644 index 00000000..7153212e --- /dev/null +++ b/tests/scenarios/partial_frame_feeding_scenarios.rs @@ -0,0 +1,45 @@ +//! Scenario tests for partial frame and fragment feeding behaviours. + +use rstest_bdd_macros::scenario; + +use crate::fixtures::partial_frame_feeding::*; + +#[scenario( + path = "tests/features/partial_frame_feeding.feature", + name = "Single payload survives byte-at-a-time chunked delivery" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn single_payload_byte_chunks(partial_frame_feeding_world: PartialFrameFeedingWorld) {} + +#[scenario( + path = "tests/features/partial_frame_feeding.feature", + name = "Multiple payloads survive misaligned chunked delivery" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn multiple_payloads_misaligned(partial_frame_feeding_world: PartialFrameFeedingWorld) {} + +#[scenario( + path = "tests/features/partial_frame_feeding.feature", + name = "Fragmented payload is delivered as fragment frames" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn fragmented_payload_delivery(partial_frame_feeding_world: PartialFrameFeedingWorld) {} + +#[scenario( + path = "tests/features/partial_frame_feeding.feature", + name = "Fragmented payload survives chunked delivery" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn fragmented_payload_chunked(partial_frame_feeding_world: PartialFrameFeedingWorld) {} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index fd2fc2a8..4c3e66de 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -29,6 +29,7 @@ mod message_assembly_inbound_steps; mod message_assembly_steps; mod multi_packet_steps; mod panic_steps; +mod partial_frame_feeding_steps; mod request_parts_steps; mod serializer_boundaries_steps; mod stream_end_steps; diff --git a/tests/steps/partial_frame_feeding_steps.rs b/tests/steps/partial_frame_feeding_steps.rs new file mode 100644 index 00000000..fe4010a2 --- /dev/null +++ b/tests/steps/partial_frame_feeding_steps.rs @@ -0,0 +1,86 @@ +//! Step definitions for partial frame and fragment feeding behavioural tests. + +use rstest_bdd_macros::{given, then, when}; + +use crate::fixtures::partial_frame_feeding::{PartialFrameFeedingWorld, TestResult}; + +#[given( + "a wireframe app with a Hotline codec allowing {max_frame_length:usize}-byte frames for \ + partial feeding" +)] +fn given_app_with_codec( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + max_frame_length: usize, +) -> TestResult { + partial_frame_feeding_world.configure_app(max_frame_length) +} + +#[given("a fragmenter capped at {max_payload:usize} bytes per fragment for partial feeding")] +fn given_fragmenter( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + max_payload: usize, +) -> TestResult { + partial_frame_feeding_world.configure_fragmenter(max_payload) +} + +#[when("a test payload is fed in {chunk_size:usize}-byte chunks")] +fn when_single_payload_chunked( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + chunk_size: usize, +) -> TestResult { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(partial_frame_feeding_world.drive_chunked(chunk_size)) +} + +#[when("{count:usize} test payloads are fed in {chunk_size:usize}-byte chunks")] +fn when_multiple_payloads_chunked( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + count: usize, + chunk_size: usize, +) -> TestResult { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(partial_frame_feeding_world.drive_chunked_multiple(count, chunk_size)) +} + +#[when("a {payload_len:usize}-byte payload is fragmented and fed through the app")] +fn when_fragmented( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + payload_len: usize, +) -> TestResult { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(partial_frame_feeding_world.drive_fragmented(payload_len)) +} + +#[when( + "a {payload_len:usize}-byte payload is fragmented and fed in {chunk_size:usize}-byte chunks" +)] +fn when_partial_fragmented( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + payload_len: usize, + chunk_size: usize, +) -> TestResult { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(partial_frame_feeding_world.drive_partial_fragmented(payload_len, chunk_size)) +} + +#[then("the partial feeding response payloads are non-empty")] +fn then_payloads_non_empty( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, +) -> TestResult { + partial_frame_feeding_world.assert_payloads_non_empty() +} + +#[then("the partial feeding response contains {expected:usize} payloads")] +fn then_payload_count( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + expected: usize, +) -> TestResult { + partial_frame_feeding_world.assert_payload_count(expected) +} + +#[then("the fragment feeding completes without error")] +fn then_fragment_completed( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, +) -> TestResult { + partial_frame_feeding_world.assert_fragment_feeding_completed() +} diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 516970ee..cfa33cb1 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -14,6 +14,8 @@ mod codec_drive; mod codec_ext; mod codec_fixtures; mod drive; +mod fragment_drive; +mod partial_frame; mod payloads; mod runtime; @@ -80,5 +82,18 @@ pub use drive::{ drive_with_frames_with_capacity, drive_with_frames_with_capacity_mut, }; +pub use fragment_drive::{ + drive_with_fragment_frames, + drive_with_fragments, + drive_with_fragments_mut, + drive_with_fragments_with_capacity, + drive_with_partial_fragments, +}; +pub use partial_frame::{ + drive_with_partial_codec_frames, + drive_with_partial_frames, + drive_with_partial_frames_mut, + drive_with_partial_frames_with_capacity, +}; pub use payloads::{drive_with_bincode, drive_with_payloads, drive_with_payloads_mut}; pub use runtime::{run_app, run_with_duplex_server}; diff --git a/wireframe_testing/src/helpers/fragment_drive.rs b/wireframe_testing/src/helpers/fragment_drive.rs new file mode 100644 index 00000000..6bfb99e8 --- /dev/null +++ b/wireframe_testing/src/helpers/fragment_drive.rs @@ -0,0 +1,312 @@ +//! Fragment-aware in-memory driving helpers. +//! +//! These functions fragment a payload using a [`Fragmenter`], encode each +//! fragment via [`encode_fragment_payload`], wrap the results in codec frames, +//! and feed them through a [`WireframeApp`]. This allows test authors to +//! verify that fragmented messages survive the full application pipeline +//! without manually constructing fragment wire bytes. + +use std::{io, num::NonZeroUsize}; + +use tokio::io::DuplexStream; +use wireframe::{ + app::{Packet, WireframeApp}, + codec::FrameCodec, + fragment::{Fragmenter, encode_fragment_payload}, +}; + +use super::{ + DEFAULT_CAPACITY, + TestSerializer, + codec_ext::{decode_frames_with_codec, encode_payloads_with_codec, extract_payloads}, + drive::drive_internal, + partial_frame::drive_chunked_internal, +}; + +// --------------------------------------------------------------------------- +// Shared fragment encoding +// --------------------------------------------------------------------------- + +/// Fragment `payload` and encode each fragment into a codec-ready payload. +fn fragment_and_encode(fragmenter: &Fragmenter, payload: Vec) -> io::Result>> { + let batch = fragmenter.fragment_bytes(payload).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("fragmentation failed: {err}"), + ) + })?; + batch + .into_iter() + .map(|frame| { + let (header, body) = frame.into_parts(); + encode_fragment_payload(header, &body).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("fragment encoding failed: {err}"), + ) + }) + }) + .collect() +} + +// --------------------------------------------------------------------------- +// Shared internal helper +// --------------------------------------------------------------------------- + +/// Fragment, encode, transport, and decode — returning full codec frames. +async fn drive_fragments_internal( + handler: H, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, + capacity: usize, +) -> io::Result> +where + F: FrameCodec, + H: FnOnce(DuplexStream) -> Fut, + Fut: std::future::Future + Send, +{ + let fragment_payloads = fragment_and_encode(fragmenter, payload)?; + let encoded = encode_payloads_with_codec(codec, fragment_payloads)?; + let raw = drive_internal(handler, encoded, capacity).await?; + decode_frames_with_codec(codec, raw) +} + +// --------------------------------------------------------------------------- +// Payload-level drivers (return Vec>) +// --------------------------------------------------------------------------- + +/// Fragment `payload`, encode each fragment into a codec frame, and drive +/// through `app`. Returns decoded response payloads. +/// +/// # Errors +/// +/// Returns any I/O, fragmentation, or codec error encountered during +/// encoding, transport, or decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe::fragment::Fragmenter; +/// # use wireframe_testing::drive_with_fragments; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +/// let payloads = drive_with_fragments(app, &codec, &fragmenter, vec![0; 50]).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_fragments( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + drive_with_fragments_with_capacity(app, codec, fragmenter, payload, DEFAULT_CAPACITY).await +} + +/// Fragment and feed with a duplex buffer of `capacity` bytes. +/// +/// # Errors +/// +/// Returns any I/O, fragmentation, or codec error encountered during +/// encoding, transport, or decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe::fragment::Fragmenter; +/// # use wireframe_testing::drive_with_fragments_with_capacity; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +/// let payloads = +/// drive_with_fragments_with_capacity(app, &codec, &fragmenter, vec![0; 50], 8192).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_fragments_with_capacity( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, + capacity: usize, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + let frames = drive_fragments_internal( + |server| async move { app.handle_connection(server).await }, + codec, + fragmenter, + payload, + capacity, + ) + .await?; + Ok(extract_payloads::(&frames)) +} + +/// Fragment and feed through a mutable `app`. +/// +/// The mutable reference allows the app instance to be reused across +/// successive calls. +/// +/// # Errors +/// +/// Returns any I/O, fragmentation, or codec error encountered during +/// encoding, transport, or decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe::fragment::Fragmenter; +/// # use wireframe_testing::drive_with_fragments_mut; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let mut app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +/// let payloads = drive_with_fragments_mut(&mut app, &codec, &fragmenter, vec![0; 50]).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_fragments_mut( + app: &mut WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + let frames = drive_fragments_internal( + |server| async move { app.handle_connection(server).await }, + codec, + fragmenter, + payload, + DEFAULT_CAPACITY, + ) + .await?; + Ok(extract_payloads::(&frames)) +} + +// --------------------------------------------------------------------------- +// Frame-level driver (returns Vec) +// --------------------------------------------------------------------------- + +/// Fragment and feed through `app`, returning decoded response frames. +/// +/// Unlike the payload-level drivers, this variant returns the full codec +/// frames so tests can inspect frame-level metadata. +/// +/// # Errors +/// +/// Returns any I/O, fragmentation, or codec error encountered during +/// encoding, transport, or decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe::fragment::Fragmenter; +/// # use wireframe_testing::drive_with_fragment_frames; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +/// let frames = drive_with_fragment_frames(app, &codec, &fragmenter, vec![0; 50]).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_fragment_frames( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + drive_fragments_internal( + |server| async move { app.handle_connection(server).await }, + codec, + fragmenter, + payload, + DEFAULT_CAPACITY, + ) + .await +} + +// --------------------------------------------------------------------------- +// Combined: fragment + chunked delivery +// --------------------------------------------------------------------------- + +/// Fragment `payload` and feed the wire bytes in chunks of `chunk_size`, +/// exercising both fragmentation and partial-frame buffering simultaneously. +/// +/// # Errors +/// +/// Returns any I/O, fragmentation, or codec error encountered during +/// encoding, transport, or decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe::fragment::Fragmenter; +/// # use wireframe_testing::drive_with_partial_fragments; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +/// let chunk = NonZeroUsize::new(3).expect("non-zero"); +/// let payloads = +/// drive_with_partial_fragments(app, &codec, &fragmenter, vec![0; 50], chunk).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_partial_fragments( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, + chunk_size: NonZeroUsize, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + let fragment_payloads = fragment_and_encode(fragmenter, payload)?; + let encoded = encode_payloads_with_codec(codec, fragment_payloads)?; + let wire_bytes: Vec = encoded.into_iter().flatten().collect(); + let raw = drive_chunked_internal( + |server| async move { app.handle_connection(server).await }, + wire_bytes, + chunk_size, + DEFAULT_CAPACITY, + ) + .await?; + let frames = decode_frames_with_codec(codec, raw)?; + Ok(extract_payloads::(&frames)) +} diff --git a/wireframe_testing/src/helpers/partial_frame.rs b/wireframe_testing/src/helpers/partial_frame.rs new file mode 100644 index 00000000..b7fcc34f --- /dev/null +++ b/wireframe_testing/src/helpers/partial_frame.rs @@ -0,0 +1,304 @@ +//! Chunked-write in-memory driving helpers. +//! +//! These functions extend the frame-oriented drivers in [`super::drive`] with +//! configurable chunk sizes, forcing the codec decoder on the server side to +//! buffer partial frames across reads. This exercises realistic network +//! conditions where a single codec frame may arrive across multiple TCP reads. + +use std::{io, num::NonZeroUsize}; + +use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream, duplex}; +use wireframe::{ + app::{Packet, WireframeApp}, + codec::FrameCodec, +}; + +use super::{ + DEFAULT_CAPACITY, + TestSerializer, + codec_ext::{decode_frames_with_codec, encode_payloads_with_codec, extract_payloads}, +}; + +/// Drive a server function by writing `wire_bytes` in chunks of +/// `chunk_size` bytes, forcing partial-frame reads on the server side. +/// +/// This mirrors [`super::drive::drive_internal`] but replaces per-frame +/// `write_all` calls with a chunked iteration that slices the concatenated +/// wire bytes into fixed-size pieces. +/// +/// ```rust +/// use std::num::NonZeroUsize; +/// +/// use tokio::io::{AsyncWriteExt, DuplexStream}; +/// use wireframe_testing::helpers::partial_frame::drive_chunked_internal; +/// +/// async fn echo(mut s: DuplexStream) { let _ = s.write_all(&[1, 2]).await; } +/// +/// # async fn demo() -> std::io::Result<()> { +/// let out = drive_chunked_internal(echo, vec![0], NonZeroUsize::new(1).unwrap(), 64).await?; +/// assert_eq!(out, [1, 2]); +/// # Ok(()) +/// # } +/// ``` +pub(super) async fn drive_chunked_internal( + server_fn: F, + wire_bytes: Vec, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result> +where + F: FnOnce(DuplexStream) -> Fut, + Fut: std::future::Future + Send, +{ + let (mut client, server) = duplex(capacity); + + let server_fut = async { + use futures::FutureExt as _; + let result = std::panic::AssertUnwindSafe(server_fn(server)) + .catch_unwind() + .await; + match result { + Ok(()) => Ok(()), + Err(panic) => { + let panic_msg = wireframe::panic::format_panic(&panic); + Err(io::Error::new( + io::ErrorKind::Other, + format!("server task failed: {panic_msg}"), + )) + } + } + }; + + let client_fut = async { + let total = wire_bytes.len(); + let step = chunk_size.get(); + let mut offset = 0; + while offset < total { + let end = (offset + step).min(total); + let chunk = wire_bytes + .get(offset..end) + .ok_or_else(|| io::Error::other("chunk slice out of bounds"))?; + client.write_all(chunk).await?; + offset = end; + } + client.shutdown().await?; + + let mut buf = Vec::new(); + client.read_to_end(&mut buf).await?; + io::Result::Ok(buf) + }; + + let ((), buf) = tokio::try_join!(server_fut, client_fut)?; + Ok(buf) +} + +// --------------------------------------------------------------------------- +// Shared internal helper +// --------------------------------------------------------------------------- + +/// Encode payloads, chunk the wire bytes, drive the server, and decode +/// response frames. +async fn drive_partial_frames_internal( + handler: H, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result> +where + F: FrameCodec, + H: FnOnce(DuplexStream) -> Fut, + Fut: std::future::Future + Send, +{ + let encoded = encode_payloads_with_codec(codec, payloads)?; + let wire_bytes: Vec = encoded.into_iter().flatten().collect(); + let raw = drive_chunked_internal(handler, wire_bytes, chunk_size, capacity).await?; + decode_frames_with_codec(codec, raw) +} + +// --------------------------------------------------------------------------- +// Payload-level drivers (return Vec>) +// --------------------------------------------------------------------------- + +/// Drive `app` with payloads encoded by `codec`, writing wire bytes in +/// chunks of `chunk_size` to exercise partial-frame buffering. +/// +/// Each input payload is encoded through the codec, and the resulting wire +/// bytes are concatenated and written `chunk_size` bytes at a time. The +/// server's responses are decoded and returned as payload byte vectors. +/// +/// # Errors +/// +/// Returns any I/O or codec error encountered during encoding, transport, or +/// decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe_testing::drive_with_partial_frames; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let chunk = NonZeroUsize::new(1).expect("non-zero"); +/// let payloads = drive_with_partial_frames(app, &codec, vec![vec![1]], chunk).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_partial_frames( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + drive_with_partial_frames_with_capacity(app, codec, payloads, chunk_size, DEFAULT_CAPACITY) + .await +} + +/// Drive `app` with payloads in chunks using a duplex buffer of `capacity` +/// bytes. +/// +/// # Errors +/// +/// Returns any I/O or codec error encountered during encoding, transport, or +/// decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe_testing::drive_with_partial_frames_with_capacity; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let chunk = NonZeroUsize::new(3).expect("non-zero"); +/// let payloads = +/// drive_with_partial_frames_with_capacity(app, &codec, vec![vec![1]], chunk, 8192).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_partial_frames_with_capacity( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + let frames = drive_partial_frames_internal( + |server| async move { app.handle_connection(server).await }, + codec, + payloads, + chunk_size, + capacity, + ) + .await?; + Ok(extract_payloads::(&frames)) +} + +/// Drive a mutable `app` with payloads in chunks of `chunk_size`. +/// +/// The mutable reference allows the app instance to be reused across +/// successive calls. +/// +/// # Errors +/// +/// Returns any I/O or codec error encountered during encoding, transport, or +/// decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe_testing::drive_with_partial_frames_mut; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let mut app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let chunk = NonZeroUsize::new(5).expect("non-zero"); +/// let payloads = drive_with_partial_frames_mut(&mut app, &codec, vec![vec![1]], chunk).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_partial_frames_mut( + app: &mut WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + let frames = drive_partial_frames_internal( + |server| async move { app.handle_connection(server).await }, + codec, + payloads, + chunk_size, + DEFAULT_CAPACITY, + ) + .await?; + Ok(extract_payloads::(&frames)) +} + +// --------------------------------------------------------------------------- +// Frame-level driver (returns Vec) +// --------------------------------------------------------------------------- + +/// Drive `app` with payloads in chunks and return decoded response frames. +/// +/// Unlike the payload-level drivers, this variant returns the full codec +/// frames so tests can inspect frame-level metadata such as transaction +/// identifiers or sequence numbers. +/// +/// # Errors +/// +/// Returns any I/O or codec error encountered during encoding, transport, or +/// decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe_testing::drive_with_partial_codec_frames; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let chunk = NonZeroUsize::new(2).expect("non-zero"); +/// let frames = drive_with_partial_codec_frames(app, &codec, vec![vec![1]], chunk).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_partial_codec_frames( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + drive_partial_frames_internal( + |server| async move { app.handle_connection(server).await }, + codec, + payloads, + chunk_size, + DEFAULT_CAPACITY, + ) + .await +} diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index 150bafd8..ca56ffc8 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -43,12 +43,21 @@ pub use helpers::{ drive_with_codec_payloads_mut, drive_with_codec_payloads_with_capacity, drive_with_codec_payloads_with_capacity_mut, + drive_with_fragment_frames, + drive_with_fragments, + drive_with_fragments_mut, + drive_with_fragments_with_capacity, drive_with_frame, drive_with_frame_mut, drive_with_frame_with_capacity, drive_with_frames, drive_with_frames_mut, drive_with_frames_with_capacity, + drive_with_partial_codec_frames, + drive_with_partial_fragments, + drive_with_partial_frames, + drive_with_partial_frames_mut, + drive_with_partial_frames_with_capacity, drive_with_payloads, drive_with_payloads_mut, encode_frame, From f226a14c20f7519801191cba50005e1466069913 Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 3 Mar 2026 20:02:52 +0000 Subject: [PATCH 2/4] refactor(helpers): introduce ChunkConfig and FragmentRequest structs for config - Added ChunkConfig to combine chunk size and duplex buffer capacity in partial_frame. - Added FragmentRequest to bundle fragmenter, payload, and capacity in fragment_drive. - Updated relevant functions to use these structs for clearer and more flexible configuration. - Refactored partial frame tests for reusability using a helper test function. Co-authored-by: devboxerhub[bot] --- tests/partial_frame_feeding.rs | 24 ++++------ .../src/helpers/fragment_drive.rs | 48 +++++++++++++------ .../src/helpers/partial_frame.rs | 46 ++++++++++++++---- 3 files changed, 80 insertions(+), 38 deletions(-) diff --git a/tests/partial_frame_feeding.rs b/tests/partial_frame_feeding.rs index 5804c541..afe5fc9b 100644 --- a/tests/partial_frame_feeding.rs +++ b/tests/partial_frame_feeding.rs @@ -47,12 +47,11 @@ fn serialize_envelope(payload: &[u8]) -> io::Result> { // Chunked-write (partial frame) tests // --------------------------------------------------------------------------- -#[tokio::test] -async fn partial_frames_single_byte_chunks() -> io::Result<()> { +async fn test_partial_frames_with_chunk(payload: &[u8], chunk_size: usize) -> io::Result<()> { let codec = hotline_codec(); let app = build_echo_app(codec.clone())?; - let serialized = serialize_envelope(&[10, 20, 30])?; - let chunk = NonZeroUsize::new(1).ok_or_else(|| io::Error::other("non-zero"))?; + let serialized = serialize_envelope(payload)?; + let chunk = NonZeroUsize::new(chunk_size).ok_or_else(|| io::Error::other("non-zero"))?; let payloads = drive_with_partial_frames(app, &codec, vec![serialized], chunk).await?; @@ -63,18 +62,13 @@ async fn partial_frames_single_byte_chunks() -> io::Result<()> { } #[tokio::test] -async fn partial_frames_misaligned_chunks() -> io::Result<()> { - let codec = hotline_codec(); - let app = build_echo_app(codec.clone())?; - let serialized = serialize_envelope(&[1, 2, 3, 4, 5])?; - let chunk = NonZeroUsize::new(7).ok_or_else(|| io::Error::other("non-zero"))?; - - let payloads = drive_with_partial_frames(app, &codec, vec![serialized], chunk).await?; +async fn partial_frames_single_byte_chunks() -> io::Result<()> { + test_partial_frames_with_chunk(&[10, 20, 30], 1).await +} - if payloads.is_empty() { - return Err(io::Error::other("expected non-empty response payloads")); - } - Ok(()) +#[tokio::test] +async fn partial_frames_misaligned_chunks() -> io::Result<()> { + test_partial_frames_with_chunk(&[1, 2, 3, 4, 5], 7).await } #[tokio::test] diff --git a/wireframe_testing/src/helpers/fragment_drive.rs b/wireframe_testing/src/helpers/fragment_drive.rs index 6bfb99e8..d83ec07f 100644 --- a/wireframe_testing/src/helpers/fragment_drive.rs +++ b/wireframe_testing/src/helpers/fragment_drive.rs @@ -53,22 +53,48 @@ fn fragment_and_encode(fragmenter: &Fragmenter, payload: Vec) -> io::Result< // Shared internal helper // --------------------------------------------------------------------------- +/// Bundles the fragmenter, payload, and duplex buffer capacity needed by +/// [`drive_fragments_internal`]. +struct FragmentRequest<'a> { + /// Fragmenter that splits the payload into fragment frames. + fragmenter: &'a Fragmenter, + /// Raw payload bytes to fragment and feed. + payload: Vec, + /// Duplex stream buffer capacity. + capacity: usize, +} + +impl<'a> FragmentRequest<'a> { + /// Create a request with the default duplex buffer capacity. + fn new(fragmenter: &'a Fragmenter, payload: Vec) -> Self { + Self { + fragmenter, + payload, + capacity: DEFAULT_CAPACITY, + } + } + + /// Override the duplex buffer capacity. + fn with_capacity(mut self, capacity: usize) -> Self { + self.capacity = capacity; + self + } +} + /// Fragment, encode, transport, and decode — returning full codec frames. async fn drive_fragments_internal( handler: H, codec: &F, - fragmenter: &Fragmenter, - payload: Vec, - capacity: usize, + request: FragmentRequest<'_>, ) -> io::Result> where F: FrameCodec, H: FnOnce(DuplexStream) -> Fut, Fut: std::future::Future + Send, { - let fragment_payloads = fragment_and_encode(fragmenter, payload)?; + let fragment_payloads = fragment_and_encode(request.fragmenter, request.payload)?; let encoded = encode_payloads_with_codec(codec, fragment_payloads)?; - let raw = drive_internal(handler, encoded, capacity).await?; + let raw = drive_internal(handler, encoded, request.capacity).await?; decode_frames_with_codec(codec, raw) } @@ -151,9 +177,7 @@ where let frames = drive_fragments_internal( |server| async move { app.handle_connection(server).await }, codec, - fragmenter, - payload, - capacity, + FragmentRequest::new(fragmenter, payload).with_capacity(capacity), ) .await?; Ok(extract_payloads::(&frames)) @@ -198,9 +222,7 @@ where let frames = drive_fragments_internal( |server| async move { app.handle_connection(server).await }, codec, - fragmenter, - payload, - DEFAULT_CAPACITY, + FragmentRequest::new(fragmenter, payload), ) .await?; Ok(extract_payloads::(&frames)) @@ -249,9 +271,7 @@ where drive_fragments_internal( |server| async move { app.handle_connection(server).await }, codec, - fragmenter, - payload, - DEFAULT_CAPACITY, + FragmentRequest::new(fragmenter, payload), ) .await } diff --git a/wireframe_testing/src/helpers/partial_frame.rs b/wireframe_testing/src/helpers/partial_frame.rs index b7fcc34f..2cf020e8 100644 --- a/wireframe_testing/src/helpers/partial_frame.rs +++ b/wireframe_testing/src/helpers/partial_frame.rs @@ -19,6 +19,37 @@ use super::{ codec_ext::{decode_frames_with_codec, encode_payloads_with_codec, extract_payloads}, }; +/// Configuration for chunked-write delivery. +/// +/// Combines the chunk size (how many bytes to write per call) with the +/// duplex buffer capacity used by the in-memory stream. +#[derive(Debug, Clone, Copy)] +pub(super) struct ChunkConfig { + /// Number of bytes per write call. + pub chunk_size: NonZeroUsize, + /// Duplex stream buffer capacity. + pub capacity: usize, +} + +impl ChunkConfig { + /// Create a configuration with the given chunk size and the default + /// duplex buffer capacity. + pub fn new(chunk_size: NonZeroUsize) -> Self { + Self { + chunk_size, + capacity: DEFAULT_CAPACITY, + } + } + + /// Create a configuration with an explicit duplex buffer capacity. + pub fn with_capacity(chunk_size: NonZeroUsize, capacity: usize) -> Self { + Self { + chunk_size, + capacity, + } + } +} + /// Drive a server function by writing `wire_bytes` in chunks of /// `chunk_size` bytes, forcing partial-frame reads on the server side. /// @@ -102,8 +133,7 @@ async fn drive_partial_frames_internal( handler: H, codec: &F, payloads: Vec>, - chunk_size: NonZeroUsize, - capacity: usize, + config: ChunkConfig, ) -> io::Result> where F: FrameCodec, @@ -112,7 +142,8 @@ where { let encoded = encode_payloads_with_codec(codec, payloads)?; let wire_bytes: Vec = encoded.into_iter().flatten().collect(); - let raw = drive_chunked_internal(handler, wire_bytes, chunk_size, capacity).await?; + let raw = + drive_chunked_internal(handler, wire_bytes, config.chunk_size, config.capacity).await?; decode_frames_with_codec(codec, raw) } @@ -200,8 +231,7 @@ where |server| async move { app.handle_connection(server).await }, codec, payloads, - chunk_size, - capacity, + ChunkConfig::with_capacity(chunk_size, capacity), ) .await?; Ok(extract_payloads::(&frames)) @@ -246,8 +276,7 @@ where |server| async move { app.handle_connection(server).await }, codec, payloads, - chunk_size, - DEFAULT_CAPACITY, + ChunkConfig::new(chunk_size), ) .await?; Ok(extract_payloads::(&frames)) @@ -297,8 +326,7 @@ where |server| async move { app.handle_connection(server).await }, codec, payloads, - chunk_size, - DEFAULT_CAPACITY, + ChunkConfig::new(chunk_size), ) .await } From 80d14d6ada16230ee84139bf91fd1a6c59039c4a Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 3 Mar 2026 22:53:12 +0000 Subject: [PATCH 3/4] feat(wireframe_testing): wrap fragment payloads in serialized Envelopes Fragment payloads are now wrapped inside serialized `Envelope` packets before being fed through the codec and application pipeline. This ensures the application's deserializer accepts these fragment frames instead of accumulating deserialization failures and closing connections. - Updated fragment_and_encode to wrap `FRAG`-prefixed fragment bytes into `Envelope` packets serialized with `BincodeSerializer`. - Adjusted fragment feeding tests to verify envelope deserialization and responses. - Improved documentation and examples in wireframe_testing helpers to reflect the new envelope wrapping. - Added default route ID for wrapped fragments matching typical app route registrations. This enhancement makes fragment feeding tests more accurate by fully simulating the expected codec and application message framing. Co-authored-by: devboxerhub[bot] --- ...ding-partial-frames-into-in-process-app.md | 20 +++--- docs/users-guide.md | 2 +- tests/fixtures/partial_frame_feeding.rs | 4 +- tests/partial_frame_feeding.rs | 61 +++++++++++++++--- .../src/helpers/fragment_drive.rs | 63 ++++++++++++++----- .../src/helpers/partial_frame.rs | 19 +++--- 6 files changed, 123 insertions(+), 46 deletions(-) diff --git a/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md b/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md index b53595b6..b1b0a632 100644 --- a/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md +++ b/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md @@ -28,8 +28,9 @@ After this work, library consumers gain two new families of test helpers: This lets test authors verify that fragmented messages survive the full app pipeline without manually constructing fragment wire bytes. -Success is observable by running `make test` and seeing the new unit and BDD -tests pass (they will fail before the implementation and pass after). +Success is observable by running `make test` and seeing the new unit and +behaviour-driven development (BDD) tests pass (they will fail before the +implementation and pass after). ## Constraints @@ -97,13 +98,14 @@ tests pass (they will fail before the implementation and pass after). ## Surprises & discoveries -- Fragment payloads are `FRAG`-prefixed raw bytes, not valid `Envelope` - serialisations. The app receives them via the codec but cannot route - them (no matching envelope), so it produces no response. Fragment-feeding - tests therefore verify successful I/O completion (no error) rather than - non-empty responses. This is a correct reflection of the pipeline: the - test utilities exercise the codec/transport layer, not the application - routing layer. +- Fragment payloads are `FRAG`-prefixed raw bytes produced by + `encode_fragment_payload`. These must be wrapped inside a serialized + `Envelope` before codec framing so the application's deserializer + accepts them. Without wrapping, accumulating deserialization failures + would close the connection after 10 frames (see + `MAX_DESER_FAILURES` in `src/app/inbound_handler.rs`). + `fragment_and_encode` now creates an `Envelope` with route ID 1 for + each fragment and serializes it with `BincodeSerializer`. ## Decision log diff --git a/docs/users-guide.md b/docs/users-guide.md index a5bc24c2..89e8c75e 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -309,7 +309,7 @@ Available fixture functions: Real networks rarely deliver a complete codec frame in a single TCP read. The `wireframe_testing` crate provides drivers that simulate these -conditions so you can exercise your codec's buffering logic in tests. +conditions so that codec buffering logic can be exercised in tests. **Chunked-write drivers** encode payloads via a codec, concatenate the wire bytes, and write them in configurable chunk sizes (including one byte at a diff --git a/tests/fixtures/partial_frame_feeding.rs b/tests/fixtures/partial_frame_feeding.rs index c26ea721..a3a61477 100644 --- a/tests/fixtures/partial_frame_feeding.rs +++ b/tests/fixtures/partial_frame_feeding.rs @@ -111,7 +111,9 @@ impl PartialFrameFeedingWorld { let mut payloads = Vec::with_capacity(count); for i in 0..count { - let env = Envelope::new(1, Some(7), vec![u8::try_from(i).unwrap_or(0)]); + let byte = u8::try_from(i) + .map_err(|_| format!("payload index {i} exceeds u8 range; use count <= 256"))?; + let env = Envelope::new(1, Some(7), vec![byte]); payloads.push(BincodeSerializer.serialize(&env)?); } diff --git a/tests/partial_frame_feeding.rs b/tests/partial_frame_feeding.rs index afe5fc9b..f01cdbd0 100644 --- a/tests/partial_frame_feeding.rs +++ b/tests/partial_frame_feeding.rs @@ -47,17 +47,35 @@ fn serialize_envelope(payload: &[u8]) -> io::Result> { // Chunked-write (partial frame) tests // --------------------------------------------------------------------------- +fn deserialize_envelope(bytes: &[u8]) -> io::Result { + let (env, _) = BincodeSerializer + .deserialize::(bytes) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("deserialize: {e}")))?; + Ok(env) +} + async fn test_partial_frames_with_chunk(payload: &[u8], chunk_size: usize) -> io::Result<()> { let codec = hotline_codec(); let app = build_echo_app(codec.clone())?; let serialized = serialize_envelope(payload)?; let chunk = NonZeroUsize::new(chunk_size).ok_or_else(|| io::Error::other("non-zero"))?; - let payloads = drive_with_partial_frames(app, &codec, vec![serialized], chunk).await?; + let response = drive_with_partial_frames(app, &codec, vec![serialized], chunk).await?; - if payloads.is_empty() { + if response.is_empty() { return Err(io::Error::other("expected non-empty response payloads")); } + + let expected = Envelope::new(1, Some(7), payload.to_vec()); + for (idx, bytes) in response.iter().enumerate() { + let env = deserialize_envelope(bytes)?; + if env != expected { + return Err(io::Error::other(format!( + "envelope mismatch at index {idx}: expected {expected:?}, got {env:?}" + ))); + } + } + Ok(()) } @@ -79,12 +97,37 @@ async fn partial_frames_multiple_payloads() -> io::Result<()> { let p2 = serialize_envelope(&[2])?; let chunk = NonZeroUsize::new(3).ok_or_else(|| io::Error::other("non-zero"))?; - let payloads = drive_with_partial_frames(app, &codec, vec![p1, p2], chunk).await?; + let response = drive_with_partial_frames(app, &codec, vec![p1, p2], chunk).await?; - if payloads.len() != 2 { + if response.len() != 2 { return Err(io::Error::other(format!( "expected 2 response payloads, got {}", - payloads.len() + response.len() + ))); + } + + let expected_first = Envelope::new(1, Some(7), vec![1]); + let expected_second = Envelope::new(1, Some(7), vec![2]); + + let first = deserialize_envelope( + response + .first() + .ok_or_else(|| io::Error::other("missing first payload"))?, + )?; + let second = deserialize_envelope( + response + .get(1) + .ok_or_else(|| io::Error::other("missing second payload"))?, + )?; + + if first != expected_first { + return Err(io::Error::other(format!( + "first payload mismatch: expected {expected_first:?}, got {first:?}" + ))); + } + if second != expected_second { + return Err(io::Error::other(format!( + "second payload mismatch: expected {expected_second:?}, got {second:?}" ))); } Ok(()) @@ -135,10 +178,10 @@ async fn partial_frames_mut_allows_reuse() -> io::Result<()> { // Fragment feeding tests // --------------------------------------------------------------------------- -// Fragment payloads are FRAG-prefixed raw bytes, not valid Envelope -// serializations. The app receives and processes them but does not produce a -// routed response. Verifying no I/O error confirms the full transport pipeline -// (fragment → encode → transport → decode) works end to end. +// Fragment payloads are FRAG-prefixed bytes wrapped in serialized Envelopes. +// The app deserializes each frame successfully, but the no-op route handler +// does not produce a response. Verifying no I/O error confirms the full +// transport pipeline (fragment → envelope → encode → transport → decode). #[tokio::test] async fn fragment_round_trip() -> io::Result<()> { diff --git a/wireframe_testing/src/helpers/fragment_drive.rs b/wireframe_testing/src/helpers/fragment_drive.rs index d83ec07f..f8dd9d59 100644 --- a/wireframe_testing/src/helpers/fragment_drive.rs +++ b/wireframe_testing/src/helpers/fragment_drive.rs @@ -1,18 +1,20 @@ //! Fragment-aware in-memory driving helpers. //! //! These functions fragment a payload using a [`Fragmenter`], encode each -//! fragment via [`encode_fragment_payload`], wrap the results in codec frames, -//! and feed them through a [`WireframeApp`]. This allows test authors to -//! verify that fragmented messages survive the full application pipeline -//! without manually constructing fragment wire bytes. +//! fragment via [`encode_fragment_payload`], wrap the `FRAG`-prefixed bytes +//! inside a serialized [`Envelope`] packet, and feed the results through a +//! [`WireframeApp`] as codec frames. Wrapping in an `Envelope` ensures the +//! application's deserializer accepts the frames instead of accumulating +//! consecutive deserialization failures. use std::{io, num::NonZeroUsize}; use tokio::io::DuplexStream; use wireframe::{ - app::{Packet, WireframeApp}, + app::{Envelope, Packet, WireframeApp}, codec::FrameCodec, fragment::{Fragmenter, encode_fragment_payload}, + serializer::{BincodeSerializer, Serializer}, }; use super::{ @@ -27,8 +29,19 @@ use super::{ // Shared fragment encoding // --------------------------------------------------------------------------- -/// Fragment `payload` and encode each fragment into a codec-ready payload. -fn fragment_and_encode(fragmenter: &Fragmenter, payload: Vec) -> io::Result>> { +/// Fragment `payload`, encode each fragment, and wrap the result in +/// serialized [`Envelope`] packets so the app's deserializer accepts them. +/// +/// Each fragment is encoded via [`encode_fragment_payload`] into +/// `FRAG`-prefixed bytes, then placed as the `payload` field of an +/// `Envelope` and serialized with [`BincodeSerializer`]. This matches +/// the inbound path the application expects: deserialize the codec frame +/// payload as an `Envelope`, then hand it to the fragment reassembler. +fn fragment_and_encode( + fragmenter: &Fragmenter, + payload: Vec, + route_id: u32, +) -> io::Result>> { let batch = fragmenter.fragment_bytes(payload).map_err(|err| { io::Error::new( io::ErrorKind::InvalidData, @@ -39,11 +52,18 @@ fn fragment_and_encode(fragmenter: &Fragmenter, payload: Vec) -> io::Result< .into_iter() .map(|frame| { let (header, body) = frame.into_parts(); - encode_fragment_payload(header, &body).map_err(|err| { + let frag_bytes = encode_fragment_payload(header, &body).map_err(|err| { io::Error::new( io::ErrorKind::InvalidData, format!("fragment encoding failed: {err}"), ) + })?; + let env = Envelope::new(route_id, None, frag_bytes); + BincodeSerializer.serialize(&env).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("envelope serialization failed: {err}"), + ) }) }) .collect() @@ -53,6 +73,11 @@ fn fragment_and_encode(fragmenter: &Fragmenter, payload: Vec) -> io::Result< // Shared internal helper // --------------------------------------------------------------------------- +/// Default route identifier used when wrapping fragment payloads in +/// [`Envelope`] packets. Matches the route typically registered by test +/// applications built with `WireframeApp::route(1, ...)`. +const DEFAULT_FRAGMENT_ROUTE_ID: u32 = 1; + /// Bundles the fragmenter, payload, and duplex buffer capacity needed by /// [`drive_fragments_internal`]. struct FragmentRequest<'a> { @@ -60,6 +85,8 @@ struct FragmentRequest<'a> { fragmenter: &'a Fragmenter, /// Raw payload bytes to fragment and feed. payload: Vec, + /// Route identifier for the wrapping [`Envelope`]. + route_id: u32, /// Duplex stream buffer capacity. capacity: usize, } @@ -70,6 +97,7 @@ impl<'a> FragmentRequest<'a> { Self { fragmenter, payload, + route_id: DEFAULT_FRAGMENT_ROUTE_ID, capacity: DEFAULT_CAPACITY, } } @@ -92,8 +120,9 @@ where H: FnOnce(DuplexStream) -> Fut, Fut: std::future::Future + Send, { - let fragment_payloads = fragment_and_encode(request.fragmenter, request.payload)?; - let encoded = encode_payloads_with_codec(codec, fragment_payloads)?; + let serialized_envelopes = + fragment_and_encode(request.fragmenter, request.payload, request.route_id)?; + let encoded = encode_payloads_with_codec(codec, serialized_envelopes)?; let raw = drive_internal(handler, encoded, request.capacity).await?; decode_frames_with_codec(codec, raw) } @@ -119,7 +148,7 @@ where /// # async fn demo() -> std::io::Result<()> { /// let codec = HotlineFrameCodec::new(4096); /// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); -/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).expect("non-zero")); /// let payloads = drive_with_fragments(app, &codec, &fragmenter, vec![0; 50]).await?; /// # Ok(()) /// # } @@ -155,7 +184,7 @@ where /// # async fn demo() -> std::io::Result<()> { /// let codec = HotlineFrameCodec::new(4096); /// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); -/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).expect("non-zero")); /// let payloads = /// drive_with_fragments_with_capacity(app, &codec, &fragmenter, vec![0; 50], 8192).await?; /// # Ok(()) @@ -202,7 +231,7 @@ where /// # async fn demo() -> std::io::Result<()> { /// let codec = HotlineFrameCodec::new(4096); /// let mut app = WireframeApp::new().expect("app").with_codec(codec.clone()); -/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).expect("non-zero")); /// let payloads = drive_with_fragments_mut(&mut app, &codec, &fragmenter, vec![0; 50]).await?; /// # Ok(()) /// # } @@ -251,7 +280,7 @@ where /// # async fn demo() -> std::io::Result<()> { /// let codec = HotlineFrameCodec::new(4096); /// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); -/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).expect("non-zero")); /// let frames = drive_with_fragment_frames(app, &codec, &fragmenter, vec![0; 50]).await?; /// # Ok(()) /// # } @@ -297,7 +326,7 @@ where /// # async fn demo() -> std::io::Result<()> { /// let codec = HotlineFrameCodec::new(4096); /// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); -/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).expect("non-zero")); /// let chunk = NonZeroUsize::new(3).expect("non-zero"); /// let payloads = /// drive_with_partial_fragments(app, &codec, &fragmenter, vec![0; 50], chunk).await?; @@ -317,8 +346,8 @@ where E: Packet, F: FrameCodec, { - let fragment_payloads = fragment_and_encode(fragmenter, payload)?; - let encoded = encode_payloads_with_codec(codec, fragment_payloads)?; + let serialized_envelopes = fragment_and_encode(fragmenter, payload, DEFAULT_FRAGMENT_ROUTE_ID)?; + let encoded = encode_payloads_with_codec(codec, serialized_envelopes)?; let wire_bytes: Vec = encoded.into_iter().flatten().collect(); let raw = drive_chunked_internal( |server| async move { app.handle_connection(server).await }, diff --git a/wireframe_testing/src/helpers/partial_frame.rs b/wireframe_testing/src/helpers/partial_frame.rs index 2cf020e8..a3bca30a 100644 --- a/wireframe_testing/src/helpers/partial_frame.rs +++ b/wireframe_testing/src/helpers/partial_frame.rs @@ -57,19 +57,20 @@ impl ChunkConfig { /// `write_all` calls with a chunked iteration that slices the concatenated /// wire bytes into fixed-size pieces. /// -/// ```rust -/// use std::num::NonZeroUsize; -/// -/// use tokio::io::{AsyncWriteExt, DuplexStream}; -/// use wireframe_testing::helpers::partial_frame::drive_chunked_internal; +/// This function is `pub(super)` and not exported from the crate. Use one +/// of the public `drive_with_partial_*` wrappers instead. /// +/// ```rust,ignore /// async fn echo(mut s: DuplexStream) { let _ = s.write_all(&[1, 2]).await; } /// -/// # async fn demo() -> std::io::Result<()> { -/// let out = drive_chunked_internal(echo, vec![0], NonZeroUsize::new(1).unwrap(), 64).await?; +/// let out = drive_chunked_internal( +/// echo, +/// vec![0], +/// NonZeroUsize::new(1).expect("non-zero"), +/// 64, +/// ) +/// .await?; /// assert_eq!(out, [1, 2]); -/// # Ok(()) -/// # } /// ``` pub(super) async fn drive_chunked_internal( server_fn: F, From c3abe80b510c3b1d892e2ae054b66910c45f6d4a Mon Sep 17 00:00:00 2001 From: Leynos Date: Tue, 3 Mar 2026 23:09:43 +0000 Subject: [PATCH 4/4] test(partial_frame_feeding): update test terminology and assertions for fragment feeding - Corrected spelling of 'parameterised' to 'parameterized' in comments. - Updated test scenario assertions from 'app receives fragment frames' to 'fragment feeding completed'. - Replaced `response_frames` field with `fragment_feeding_completed` boolean in test fixture. - Changed assertion helpers to check for fragment feeding completion instead of received fragments. These changes improve clarity and accuracy of partial frame feeding tests. Co-authored-by: devboxerhub[bot] --- ...s-for-feeding-partial-frames-into-in-process-app.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md b/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md index b1b0a632..01b1df0d 100644 --- a/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md +++ b/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md @@ -391,7 +391,7 @@ In `wireframe_testing/src/lib.rs`: ### Stage E: Unit tests (`tests/partial_frame_feeding.rs`) Integration test file in the main crate. Uses `rstest` for fixtures and -parameterised cases. Tests: +parameterized cases. Tests: 1. **Chunked single-byte write round-trips** — encode a payload via `HotlineFrameCodec`, feed one byte at a time via @@ -430,13 +430,13 @@ Feature: Partial frame and fragment feeding utilities Given a wireframe app with a Hotline codec allowing 4096-byte frames And a fragmenter capped at 20 bytes per fragment When a 100-byte payload is fragmented and fed through the app - Then the app receives fragment frames + Then fragment feeding completed Scenario: Fragmented payload survives chunked delivery Given a wireframe app with a Hotline codec allowing 4096-byte frames And a fragmenter capped at 20 bytes per fragment When a 100-byte payload is fragmented and fed in 3-byte chunks - Then the app receives fragment frames + Then fragment feeding completed ``` #### World fixture: `tests/fixtures/partial_frame_feeding.rs` @@ -447,7 +447,7 @@ A `PartialFrameFeedingWorld` struct holding: - `app: Option>` - `fragmenter: Option` - `response_payloads: Vec>` -- `response_frames: Vec` +- `fragment_feeding_completed: bool` Manual `Debug` impl (because `WireframeApp` lacks `Debug`). Default impl. `#[fixture] pub fn partial_frame_feeding_world()` fixture function. @@ -463,7 +463,7 @@ Helper methods: - `drive_partial_fragmented(payload_len, chunk_size)` — call `drive_with_partial_fragments`. - Assertion helpers: `assert_payloads_non_empty()`, - `assert_payload_count(n)`, `assert_received_fragments()`. + `assert_payload_count(n)`, `assert_fragment_feeding_completed()`. #### Steps: `tests/steps/partial_frame_feeding_steps.rs`