diff --git a/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md b/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md new file mode 100644 index 00000000..01b1df0d --- /dev/null +++ b/docs/execplans/8-5-1-utilities-for-feeding-partial-frames-into-in-process-app.md @@ -0,0 +1,639 @@ +# 8.5.1 Add utilities for feeding partial frames or fragments into an in-process app + +This ExecPlan (execution plan) is a living document. The sections +`Constraints`, `Tolerances`, `Risks`, `Progress`, `Surprises & Discoveries`, +`Decision Log`, and `Outcomes & Retrospective` must be kept up to date as work +proceeds. + +Status: DONE + +## Purpose / big picture + +The `wireframe_testing` crate already provides driver functions that feed +**complete** frames into an in-process `WireframeApp` via a `tokio::io::duplex` +stream. Every existing driver writes each frame atomically with `write_all()`, +so the codec decoder never has to buffer a partial read. Real networks do not +behave this way: a single codec frame can arrive across multiple TCP reads, and +a logical message may span multiple fragment frames. + +After this work, library consumers gain two new families of test helpers: + +1. **Chunked-write drivers** — encode payloads via a codec, then drip-feed the + resulting wire bytes in configurable chunk sizes (including one byte at a + time). This exercises the codec decoder's buffering logic under realistic + conditions. +2. **Fragment-feeding drivers** — accept a raw payload, fragment it with a + `Fragmenter`, encode each fragment into a codec frame via + `encode_fragment_payload`, and feed the resulting frames through the app. + This lets test authors verify that fragmented messages survive the full app + pipeline without manually constructing fragment wire bytes. + +Success is observable by running `make test` and seeing the new unit and +behaviour-driven development (BDD) tests pass (they will fail before the +implementation and pass after). + +## Constraints + +- `wireframe_testing` is **not** a workspace member. Integration tests must + live in the main crate's `tests/` directory, not inside `wireframe_testing`. +- No single source file may exceed 400 lines. +- All code must pass `make check-fmt`, `make lint` (clippy with `-D warnings` + and all strict lints), and `make test`. +- Clippy strictness includes: no `[]` indexing (use `.get()`), no `assert!` / + `panic!` in Result-returning functions (return `Err`), no + `#[allow]`/`#[expect]` without `reason = "..."`, `doc_markdown` backticking, + `cast_possible_truncation` via `try_from`. +- en-GB-oxendict spelling in comments and documentation ("-ize" / "-yse" / + "-our"). +- Public functions require `///` doc comments with usage examples. +- BDD fixtures must use `rstest-bdd` v0.5.0, and step function parameter names + must match the fixture name exactly (e.g., `partial_frame_feeding_world`). +- Existing public APIs must not change signature. + +## Tolerances (exception triggers) + +- Scope: if the implementation requires changes to more than 18 files or 1200 + net lines, stop and escalate. +- Interface: if any existing public API signature must change, stop and + escalate. +- Dependencies: if a new external dependency is required in `Cargo.toml` beyond + what is already present, stop and escalate. +- Iterations: if tests still fail after 5 attempts at fixing, stop and + escalate. +- Ambiguity: if multiple valid interpretations exist and the choice materially + affects the outcome, stop and present options with trade-offs. + +## Risks + +- Risk: Fragment payload encoding depends on `bincode` serialization of + `FragmentHeader` which is an internal wire format. If the fragment adapter + layer intercepts before the driver sees the data, the test utility may need + to bypass the adapter. Severity: medium Likelihood: low Mitigation: Use the + existing public `encode_fragment_payload()` and `decode_fragment_payload()` + from `src/fragment/payload.rs` which are the canonical encode/decode path. Do + not bypass the adapter. + +- Risk: The 400-line file limit may be tight for the BDD world fixture if it + contains many helper methods. Severity: low Likelihood: medium Mitigation: + Split the world fixture into submodules (following the + `tests/fixtures/fragment/` pattern which already uses `mod reassembly`). + +- Risk: The chunked-write driver may cause deadlocks if the duplex buffer is + smaller than a single chunk and the server hasn't read yet. Severity: medium + Likelihood: low Mitigation: Chunk writes are interleaved with the server + running concurrently via `tokio::try_join!`. The duplex buffer provides + backpressure naturally. Document the minimum capacity requirement. + +## Progress + +- [x] Stage A: Scaffolding — create new module files and register them. +- [x] Stage B: Implement `partial_frame.rs` (chunked-write driver). +- [x] Stage C: Implement `fragment_drive.rs` (fragment-feeding driver). +- [x] Stage D: Wire up re-exports in `helpers.rs` and `lib.rs`. +- [x] Stage E: Write unit tests in `tests/partial_frame_feeding.rs`. +- [x] Stage F: Write BDD infrastructure (feature, fixture, steps, scenarios). +- [x] Stage G: Update `docs/users-guide.md` with new public API. +- [x] Stage H: Mark roadmap item 8.5.1 as done. +- [x] Stage I: Run full validation (`make check-fmt && make lint && make test`). + +## Surprises & discoveries + +- Fragment payloads are `FRAG`-prefixed raw bytes produced by + `encode_fragment_payload`. These must be wrapped inside a serialized + `Envelope` before codec framing so the application's deserializer + accepts them. Without wrapping, accumulating deserialization failures + would close the connection after 10 frames (see + `MAX_DESER_FAILURES` in `src/app/inbound_handler.rs`). + `fragment_and_encode` now creates an `Envelope` with route ID 1 for + each fragment and serializes it with `BincodeSerializer`. + +## Decision log + +- Decision: Place chunked-write logic in a new `partial_frame.rs` module + alongside the existing `drive.rs`, rather than extending `drive_internal`. + Rationale: `drive_internal` is `pub(super)` and used by all existing drivers. + Adding chunked-write semantics to it would change the behaviour of every + driver. A parallel internal function keeps the existing code untouched. + Date/Author: 2026-03-01 + +- Decision: Fragment-feeding goes in `fragment_drive.rs`, composing + `encode_fragment_payload` + `encode_payloads_with_codec` + `drive_internal`. + Rationale: Follows the layering pattern of `codec_drive.rs` which composes + `encode_payloads_with_codec` + `drive_internal` + `decode_frames_with_codec`. + Fragment feeding is one layer above codec encoding. Date/Author: 2026-03-01 + +- Decision: BDD test suite named `partial_frame_feeding` (not + `partial_frames` or `chunked_driver`) to match the roadmap wording "feeding + partial frames". Rationale: Consistency with roadmap item title. Date/Author: + 2026-03-01 + +## Outcomes & retrospective + +All acceptance criteria met: + +- 9 unit tests pass in `tests/partial_frame_feeding.rs`. +- 4 BDD scenarios pass in + `tests/scenarios/partial_frame_feeding_scenarios.rs`. +- `make check-fmt`, `make lint`, and `make test` all exit 0. +- `docs/users-guide.md` contains a new "Feeding partial frames and + fragments" section documenting all 9 public functions. +- `docs/roadmap.md` item 8.5.1 is marked `[x]`. +- 14 files touched (7 created, 5 modified for registration/re-exports, + 2 documentation files updated), within the 18-file tolerance. + +## Context and orientation + +The wireframe project is a Rust async networking framework. The +`wireframe_testing` companion crate (at `wireframe_testing/`) provides +in-memory test drivers for `WireframeApp`. It is a `dev-dependency` of the main +crate, not a workspace member. + +Key directories and files: + +- `wireframe_testing/src/helpers/drive.rs` — base `drive_internal()` function + that creates a `tokio::io::duplex`, writes frames to the client half, and + collects response bytes. All higher-level drivers delegate here. +- `wireframe_testing/src/helpers/codec_drive.rs` — codec-aware drivers + (`drive_with_codec_payloads`, `drive_with_codec_frames`) that encode payloads + via a `FrameCodec`, transport via `drive_internal`, and decode responses. +- `wireframe_testing/src/helpers/codec_ext.rs` — `encode_payloads_with_codec` + and `decode_frames_with_codec` utility functions. +- `wireframe_testing/src/helpers.rs` — module root, re-exports all public + helpers, defines `TestSerializer` trait, constants `DEFAULT_CAPACITY` (4096), + `MAX_CAPACITY` (10 MB), `TEST_MAX_FRAME` (4096). +- `wireframe_testing/src/lib.rs` — crate root, re-exports from `helpers`. +- `src/fragment/fragmenter.rs` — `Fragmenter`, `FragmentBatch`, + `FragmentFrame` types for splitting payloads into fragments. +- `src/fragment/payload.rs` — `encode_fragment_payload(header, payload)` and + `decode_fragment_payload(payload)` for fragment wire encoding. +- `src/fragment/header.rs` — `FragmentHeader` with `message_id`, + `fragment_index`, `is_last_fragment`. +- `tests/fixtures/mod.rs` — fixture module index. +- `tests/steps/mod.rs` — step definition module index. +- `tests/scenarios/mod.rs` — scenario binding module index (includes + `#[path = "../steps/mod.rs"] pub(crate) mod steps;`). + +Established BDD pattern (exemplified by `codec_test_harness`): + +1. Feature file: `tests/features/.feature` (Gherkin). +2. World fixture: `tests/fixtures/.rs` (struct + `#[fixture]` + + helper methods; manual `Debug` impl if it holds `WireframeApp`). +3. Step definitions: `tests/steps/_steps.rs` (uses `#[given]`, + `#[when]`, `#[then]` from `rstest_bdd_macros`; async steps use + `tokio::runtime::Runtime::new()?.block_on()`). +4. Scenario bindings: `tests/scenarios/_scenarios.rs` (uses + `#[scenario(path = "...", name = "...")]`; + `#[expect(unused_variables, reason = "rstest-bdd wires steps + via parameters without using them directly")]`). +5. Module registration: add `mod ;` to `tests/fixtures/mod.rs`, + `mod _steps;` to `tests/steps/mod.rs`, `mod _scenarios;` to + `tests/scenarios/mod.rs`. + +## Plan of work + +### Stage A: Scaffolding + +Create empty module files and register them in their respective `mod.rs` files. + +New files to create: + +1. `wireframe_testing/src/helpers/partial_frame.rs` — chunked-write driver. +2. `wireframe_testing/src/helpers/fragment_drive.rs` — fragment-feeding + driver. +3. `tests/features/partial_frame_feeding.feature` — Gherkin scenarios. +4. `tests/fixtures/partial_frame_feeding.rs` — BDD world fixture. +5. `tests/steps/partial_frame_feeding_steps.rs` — BDD step definitions. +6. `tests/scenarios/partial_frame_feeding_scenarios.rs` — scenario bindings. +7. `tests/partial_frame_feeding.rs` — rstest unit tests (integration test + file in main crate). + +Files to modify for registration: + +1. `wireframe_testing/src/helpers.rs` — add `mod partial_frame; mod + fragment_drive;` and re-export public symbols. +2. `wireframe_testing/src/lib.rs` — re-export new public symbols. +3. `tests/fixtures/mod.rs` — add `pub mod partial_frame_feeding;`. +4. `tests/steps/mod.rs` — add `mod partial_frame_feeding_steps;`. +5. `tests/scenarios/mod.rs` — add `mod partial_frame_feeding_scenarios;`. + +### Stage B: Implement chunked-write driver (`partial_frame.rs`) + +This module provides a `drive_chunked_internal` function (analogous to +`drive_internal` in `drive.rs`) and public driver functions that compose codec +encoding with chunked writing. + +#### `drive_chunked_internal` (private to `helpers`) + +```rust +pub(super) async fn drive_chunked_internal( + server_fn: F, + wire_bytes: Vec, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result> +where + F: FnOnce(DuplexStream) -> Fut, + Fut: std::future::Future + Send, +``` + +Instead of writing each frame via `write_all`, this function concatenates all +wire bytes and writes them `chunk_size` bytes at a time, calling +`write_all(&chunk)` for each slice. This forces the codec decoder on the server +side to buffer partial frames across reads. + +The server-side panic handling mirrors `drive_internal` exactly (using +`catch_unwind` + `wireframe::panic::format_panic`). The client-side loop +replaces the per-frame `write_all` with a chunked iteration: + +```rust +let client_fut = async { + let total = wire_bytes.len(); + let step = chunk_size.get(); + let mut offset = 0; + while offset < total { + let end = (offset + step).min(total); + let chunk = wire_bytes.get(offset..end) + .ok_or_else(|| io::Error::other("chunk slice out of bounds"))?; + client.write_all(chunk).await?; + offset = end; + } + client.shutdown().await?; + + let mut buf = Vec::new(); + client.read_to_end(&mut buf).await?; + io::Result::Ok(buf) +}; +``` + +#### Public API: chunked codec drivers + +Following the naming pattern of `drive_with_codec_payloads`, the new functions +are: + +```rust +/// Drive `app` with payloads encoded by `codec`, writing wire bytes in +/// chunks of `chunk_size` to exercise partial-frame buffering. +pub async fn drive_with_partial_frames( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result>> + +/// Variant returning full decoded codec frames. +pub async fn drive_with_partial_codec_frames( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result> + +/// Mutable-app variant returning decoded payloads. +pub async fn drive_with_partial_frames_mut( + app: &mut WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result>> + +/// Variant with explicit duplex buffer capacity. +pub async fn drive_with_partial_frames_with_capacity( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result>> +``` + +Each function composes: `encode_payloads_with_codec` → flatten to single +`Vec` → `drive_chunked_internal` → `decode_frames_with_codec` → optionally +`extract_payloads`. + +### Stage C: Implement fragment-feeding driver (`fragment_drive.rs`) + +This module provides functions that fragment a payload and feed the resulting +fragment frames through the app. + +#### Public API + +```rust +/// Fragment `payload` using `fragmenter`, encode each fragment with +/// `encode_fragment_payload`, wrap in codec frames, and drive through `app`. +/// Returns decoded response payloads. +pub async fn drive_with_fragments( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result>> + +/// Variant returning full decoded codec frames. +pub async fn drive_with_fragment_frames( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result> + +/// Mutable-app variant. +pub async fn drive_with_fragments_mut( + app: &mut WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result>> + +/// Fragment and feed with configurable duplex capacity. +pub async fn drive_with_fragments_with_capacity( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, + capacity: usize, +) -> io::Result>> +``` + +Internal composition: `fragmenter.fragment_bytes(payload)` → for each +`FragmentFrame`: `encode_fragment_payload(header, payload)` → collect as +payloads → `encode_payloads_with_codec(codec, payloads)` → +`drive_internal(handler, encoded, capacity)` → +`decode_frames_with_codec(codec, raw)` → optionally `extract_payloads`. + +A combined helper that fragments AND feeds in chunks is also provided: + +```rust +/// Fragment `payload` and feed the resulting wire bytes in chunks of +/// `chunk_size`, exercising both fragmentation and partial-frame +/// buffering simultaneously. +pub async fn drive_with_partial_fragments( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, + chunk_size: NonZeroUsize, +) -> io::Result>> +``` + +### Stage D: Wire up re-exports + +In `wireframe_testing/src/helpers.rs`: + +- Add `mod partial_frame;` and `mod fragment_drive;`. +- Add `pub use partial_frame::{...};` for all public functions. +- Add `pub use fragment_drive::{...};` for all public functions. + +In `wireframe_testing/src/lib.rs`: + +- Add re-exports for all new public symbols to the existing `pub use + helpers::{…};` block. + +### Stage E: Unit tests (`tests/partial_frame_feeding.rs`) + +Integration test file in the main crate. Uses `rstest` for fixtures and +parameterized cases. Tests: + +1. **Chunked single-byte write round-trips** — encode a payload via + `HotlineFrameCodec`, feed one byte at a time via + `drive_with_partial_frames`, verify decoded payloads match input. +2. **Chunked multi-byte write round-trips** — same with `chunk_size = 7` + (deliberately misaligned with frame boundaries). +3. **Multiple payloads chunked** — encode two payloads, feed in 3-byte + chunks, verify both decoded correctly. +4. **Fragment round-trip** — fragment a 100-byte payload with + `max_fragment_size = 20`, feed via `drive_with_fragments`, verify response + received (app processes the fragment frames). +5. **Partial fragment round-trip** — fragment a payload AND feed in chunks + via `drive_with_partial_fragments`, verify response received. +6. **Mutable app reuse** — call `drive_with_partial_frames_mut` twice with + the same app, verify both succeed. + +### Stage F: BDD infrastructure + +#### Feature file: `tests/features/partial_frame_feeding.feature` + +```gherkin +@partial-frame-feeding +Feature: Partial frame and fragment feeding utilities + + Scenario: Single payload survives byte-at-a-time chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames + When a test payload is fed in 1-byte chunks + Then the decoded response payloads are non-empty + + Scenario: Multiple payloads survive misaligned chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames + When 2 test payloads are fed in 7-byte chunks + Then the decoded response contains 2 payloads + + Scenario: Fragmented payload is delivered as fragment frames + Given a wireframe app with a Hotline codec allowing 4096-byte frames + And a fragmenter capped at 20 bytes per fragment + When a 100-byte payload is fragmented and fed through the app + Then fragment feeding completed + + Scenario: Fragmented payload survives chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames + And a fragmenter capped at 20 bytes per fragment + When a 100-byte payload is fragmented and fed in 3-byte chunks + Then fragment feeding completed +``` + +#### World fixture: `tests/fixtures/partial_frame_feeding.rs` + +A `PartialFrameFeedingWorld` struct holding: + +- `codec: Option` +- `app: Option>` +- `fragmenter: Option` +- `response_payloads: Vec>` +- `fragment_feeding_completed: bool` + +Manual `Debug` impl (because `WireframeApp` lacks `Debug`). Default impl. +`#[fixture] pub fn partial_frame_feeding_world()` fixture function. + +Helper methods: + +- `configure_app(max_frame_length)` — build app with `HotlineFrameCodec` + and a no-op route handler. +- `configure_fragmenter(max_payload)` — create `Fragmenter`. +- `drive_chunked(chunk_size)` — call `drive_with_partial_frames`. +- `drive_chunked_multiple(count, chunk_size)` — multiple payloads. +- `drive_fragmented(payload_len)` — call `drive_with_fragments`. +- `drive_partial_fragmented(payload_len, chunk_size)` — call + `drive_with_partial_fragments`. +- Assertion helpers: `assert_payloads_non_empty()`, + `assert_payload_count(n)`, `assert_fragment_feeding_completed()`. + +#### Steps: `tests/steps/partial_frame_feeding_steps.rs` + +Step functions delegating to world methods, following the +`codec_test_harness_steps.rs` pattern. Async operations wrapped in +`tokio::runtime::Runtime::new()?.block_on(...)`. + +#### Scenarios: `tests/scenarios/partial_frame_feeding_scenarios.rs` + +One `#[scenario]` function per feature scenario, each with +`#[expect(unused_variables, reason = "rstest-bdd wires steps +via parameters without using them directly")]`. + +### Stage G: Update `docs/users-guide.md` + +Add a new subsection after the existing "Testing custom codecs with +`wireframe_testing`" section (around line 257) titled "Feeding partial frames +and fragments". Document all new public functions with a short code example +showing `drive_with_partial_frames` and `drive_with_fragments`. + +### Stage H: Mark roadmap done + +In `docs/roadmap.md`, change the 8.5.1 line from `- [ ]` to `- [x]`. + +### Stage I: Full validation + +Run `make check-fmt && make lint && make test` and verify all pass. + +## Concrete steps + +All commands run from the repository root `/home/user/project`. + +### Validation before changes + +```bash +set -o pipefail +make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log +make lint 2>&1 | tee /tmp/wireframe-lint.log +make test 2>&1 | tee /tmp/wireframe-test.log +``` + +Expected: all three exit 0. + +### After each stage + +```bash +set -o pipefail +make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log +make lint 2>&1 | tee /tmp/wireframe-lint.log +make test 2>&1 | tee /tmp/wireframe-test.log +``` + +Expected: all three exit 0. + +### Final validation + +```bash +set -o pipefail +make check-fmt 2>&1 | tee /tmp/wireframe-check-fmt.log; echo "check-fmt exit: $?" +make lint 2>&1 | tee /tmp/wireframe-lint.log; echo "lint exit: $?" +make test 2>&1 | tee /tmp/wireframe-test.log; echo "test exit: $?" +``` + +Expected: all three report exit 0. + +## Validation and acceptance + +Quality criteria: + +- Tests: `make test` passes. New tests in `tests/partial_frame_feeding.rs` + and BDD scenarios in `tests/scenarios/partial_frame_feeding_scenarios.rs` all + pass. BDD feature file has at least 4 scenarios. +- Lint/typecheck: `make check-fmt` and `make lint` exit 0 with no warnings. +- Documentation: `docs/users-guide.md` contains a new section documenting + the partial-frame and fragment feeding utilities. +- Roadmap: `docs/roadmap.md` item 8.5.1 is marked `[x]`. + +Quality method: + +- `make check-fmt && make lint && make test` run from the repository root. +- Manual inspection that the new section exists in `docs/users-guide.md`. +- Manual inspection that `docs/roadmap.md` shows `[x]` for 8.5.1. + +## Idempotence and recovery + +All stages are additive (new files and new `pub use` lines). No existing code +is modified except for adding `mod` and `pub use` declarations. Stages can be +re-run by overwriting the created files. If a stage fails validation, fix the +issue and re-run that stage's validation. + +## Artifacts and notes + +Files created (7): + +- `wireframe_testing/src/helpers/partial_frame.rs` +- `wireframe_testing/src/helpers/fragment_drive.rs` +- `tests/features/partial_frame_feeding.feature` +- `tests/fixtures/partial_frame_feeding.rs` +- `tests/steps/partial_frame_feeding_steps.rs` +- `tests/scenarios/partial_frame_feeding_scenarios.rs` +- `tests/partial_frame_feeding.rs` + +Files modified (5): + +- `wireframe_testing/src/helpers.rs` +- `wireframe_testing/src/lib.rs` +- `tests/fixtures/mod.rs` +- `tests/steps/mod.rs` +- `tests/scenarios/mod.rs` + +Documentation files modified (2): + +- `docs/users-guide.md` +- `docs/roadmap.md` + +Total: 14 files touched. + +## Interfaces and dependencies + +All dependencies are already present in `wireframe_testing/Cargo.toml` +(`tokio`, `wireframe`, `bytes`, `futures`, `tokio-util`) and the main crate's +`Cargo.toml` (`rstest`, `rstest-bdd`, `wireframe_testing`). No new dependencies +are required. + +### New public types and functions in `wireframe_testing` + +In `wireframe_testing/src/helpers/partial_frame.rs`: + +```rust +use std::num::NonZeroUsize; + +// Internal (pub(super)): +pub(super) async fn drive_chunked_internal( + server_fn: F, + wire_bytes: Vec, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result>; + +// Public: +pub async fn drive_with_partial_frames(...) -> io::Result>>; +pub async fn drive_with_partial_frames_with_capacity(...) -> io::Result>>; +pub async fn drive_with_partial_frames_mut(...) -> io::Result>>; +pub async fn drive_with_partial_codec_frames(...) -> io::Result>; +``` + +In `wireframe_testing/src/helpers/fragment_drive.rs`: + +```rust +pub async fn drive_with_fragments(...) -> io::Result>>; +pub async fn drive_with_fragments_with_capacity(...) -> io::Result>>; +pub async fn drive_with_fragments_mut(...) -> io::Result>>; +pub async fn drive_with_fragment_frames(...) -> io::Result>; +pub async fn drive_with_partial_fragments(...) -> io::Result>>; +``` + +### Reused existing functions + +- `drive_internal` from `wireframe_testing/src/helpers/drive.rs` — used by + `fragment_drive.rs` for non-chunked fragment feeding. +- `encode_payloads_with_codec` from + `wireframe_testing/src/helpers/codec_ext.rs` — used by both new modules for + codec encoding. +- `decode_frames_with_codec` from + `wireframe_testing/src/helpers/codec_ext.rs` — used by both new modules for + response decoding. +- `extract_payloads` from `wireframe_testing/src/helpers/codec_ext.rs` — + used to convert frames to payload byte vectors. +- `encode_fragment_payload` from `src/fragment/payload.rs` — used by + `fragment_drive.rs` to encode fragment headers and payloads. +- `Fragmenter`, `FragmentBatch`, `FragmentFrame` from + `src/fragment/fragmenter.rs` — used by `fragment_drive.rs`. +- `DEFAULT_CAPACITY` from `wireframe_testing/src/helpers.rs`. diff --git a/docs/roadmap.md b/docs/roadmap.md index 41710933..2df39e97 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -313,7 +313,7 @@ and standardized per-connection memory budgets. ### 8.5. Testkit utilities -- [ ] 8.5.1. Add utilities for feeding partial frames or fragments into an +- [x] 8.5.1. Add utilities for feeding partial frames or fragments into an in-process app. - [ ] 8.5.2. Add slow reader and writer simulation for back-pressure testing. - [ ] 8.5.3. Add deterministic assertion helpers for reassembly outcomes. diff --git a/docs/users-guide.md b/docs/users-guide.md index 72769ac4..89e8c75e 100644 --- a/docs/users-guide.md +++ b/docs/users-guide.md @@ -305,6 +305,67 @@ Available fixture functions: - `correlated_hotline_wire` — frames sharing a transaction ID. - `sequential_hotline_wire` — frames with incrementing transaction IDs. +#### Feeding partial frames and fragments + +Real networks rarely deliver a complete codec frame in a single TCP read. +The `wireframe_testing` crate provides drivers that simulate these +conditions so that codec buffering logic can be exercised in tests. + +**Chunked-write drivers** encode payloads via a codec, concatenate the wire +bytes, and write them in configurable chunk sizes (including one byte at a +time): + +```rust,no_run +use std::num::NonZeroUsize; +use wireframe::app::WireframeApp; +use wireframe::codec::examples::HotlineFrameCodec; +use wireframe_testing::drive_with_partial_frames; + +let codec = HotlineFrameCodec::new(4096); +let app = WireframeApp::new()?.with_codec(codec.clone()); +let chunk = NonZeroUsize::new(1).expect("non-zero"); +let payloads = + drive_with_partial_frames(app, &codec, vec![vec![1, 2, 3]], chunk) + .await?; +``` + +Available chunked-write driver functions: + +- `drive_with_partial_frames` / `drive_with_partial_frames_with_capacity` — + owned app, returns payload bytes. +- `drive_with_partial_frames_mut` — mutable app reference, returns payload + bytes. +- `drive_with_partial_codec_frames` — owned app, returns decoded `F::Frame` + values. + +**Fragment-feeding drivers** accept a raw payload, fragment it with a +`Fragmenter`, encode each fragment into a codec frame, and feed the frames +through the app: + +```rust,no_run +use std::num::NonZeroUsize; +use wireframe::app::WireframeApp; +use wireframe::codec::examples::HotlineFrameCodec; +use wireframe::fragment::Fragmenter; +use wireframe_testing::drive_with_fragments; + +let codec = HotlineFrameCodec::new(4096); +let app = WireframeApp::new()?.with_codec(codec.clone()); +let fragmenter = Fragmenter::new(NonZeroUsize::new(20).unwrap()); +let payloads = + drive_with_fragments(app, &codec, &fragmenter, vec![0; 100]).await?; +``` + +Available fragment-feeding driver functions: + +- `drive_with_fragments` / `drive_with_fragments_with_capacity` — owned + app, returns payload bytes. +- `drive_with_fragments_mut` — mutable app reference, returns payload + bytes. +- `drive_with_fragment_frames` — owned app, returns decoded `F::Frame` + values. +- `drive_with_partial_fragments` — fragment AND feed in chunks, + exercising both fragmentation and partial-frame buffering simultaneously. #### Zero-copy payload extraction For performance-critical codecs, use `Bytes` instead of `Vec` for payload diff --git a/tests/features/partial_frame_feeding.feature b/tests/features/partial_frame_feeding.feature new file mode 100644 index 00000000..0531d77e --- /dev/null +++ b/tests/features/partial_frame_feeding.feature @@ -0,0 +1,24 @@ +@partial-frame-feeding +Feature: Partial frame and fragment feeding utilities + + Scenario: Single payload survives byte-at-a-time chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames for partial feeding + When a test payload is fed in 1-byte chunks + Then the partial feeding response payloads are non-empty + + Scenario: Multiple payloads survive misaligned chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames for partial feeding + When 2 test payloads are fed in 7-byte chunks + Then the partial feeding response contains 2 payloads + + Scenario: Fragmented payload is delivered as fragment frames + Given a wireframe app with a Hotline codec allowing 4096-byte frames for partial feeding + And a fragmenter capped at 20 bytes per fragment for partial feeding + When a 100-byte payload is fragmented and fed through the app + Then the fragment feeding completes without error + + Scenario: Fragmented payload survives chunked delivery + Given a wireframe app with a Hotline codec allowing 4096-byte frames for partial feeding + And a fragmenter capped at 20 bytes per fragment for partial feeding + When a 100-byte payload is fragmented and fed in 3-byte chunks + Then the fragment feeding completes without error diff --git a/tests/fixtures/mod.rs b/tests/fixtures/mod.rs index 54c63950..90ea6ebb 100644 --- a/tests/fixtures/mod.rs +++ b/tests/fixtures/mod.rs @@ -30,6 +30,7 @@ pub mod message_assembly; pub mod message_assembly_inbound; pub mod multi_packet; pub mod panic; +pub mod partial_frame_feeding; pub mod request_parts; pub mod serializer_boundaries; pub mod stream_end; diff --git a/tests/fixtures/partial_frame_feeding.rs b/tests/fixtures/partial_frame_feeding.rs new file mode 100644 index 00000000..a3a61477 --- /dev/null +++ b/tests/fixtures/partial_frame_feeding.rs @@ -0,0 +1,206 @@ +//! BDD world fixture for partial frame and fragment feeding scenarios. +//! +//! Tracks application configuration, fragmenter state, and collected +//! responses across behavioural test steps. + +use std::{num::NonZeroUsize, sync::Arc}; + +use futures::future::BoxFuture; +use rstest::fixture; +use wireframe::{ + app::{Envelope, WireframeApp}, + codec::examples::HotlineFrameCodec, + fragment::Fragmenter, + serializer::{BincodeSerializer, Serializer}, +}; +/// Re-export `TestResult` from `wireframe_testing` for use in steps. +pub use wireframe_testing::TestResult; + +/// BDD world holding the app, codec, fragmenter, and collected responses. +/// +/// `WireframeApp` does not implement `Debug`, so this type provides a manual +/// implementation that redacts the app field. +#[derive(Default)] +pub struct PartialFrameFeedingWorld { + codec: Option, + app: Option>, + fragmenter: Option, + response_payloads: Vec>, + fragment_feeding_completed: bool, +} + +impl std::fmt::Debug for PartialFrameFeedingWorld { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PartialFrameFeedingWorld") + .field("codec", &self.codec) + .field("app", &self.app.as_ref().map(|_| "..")) + .field("fragmenter", &self.fragmenter) + .field("response_payloads", &self.response_payloads.len()) + .field( + "fragment_feeding_completed", + &self.fragment_feeding_completed, + ) + .finish() + } +} + +/// Fixture for partial frame feeding scenarios used by rstest-bdd steps. +/// +/// Note: rustfmt collapses simple fixtures into one line, which triggers +/// `unused_braces`, so keep `rustfmt::skip`. +#[rustfmt::skip] +#[fixture] +pub fn partial_frame_feeding_world() -> PartialFrameFeedingWorld { + PartialFrameFeedingWorld::default() +} + +impl PartialFrameFeedingWorld { + /// Configure the app with a `HotlineFrameCodec`. + /// + /// # Errors + /// Returns an error if the app or route registration fails. + pub fn configure_app(&mut self, max_frame_length: usize) -> TestResult { + let codec = HotlineFrameCodec::new(max_frame_length); + let app = WireframeApp::::new()? + .with_codec(codec.clone()) + .route( + 1, + Arc::new(|_: &Envelope| -> BoxFuture<'static, ()> { Box::pin(async {}) }), + )?; + self.codec = Some(codec); + self.app = Some(app); + Ok(()) + } + + /// Configure a fragmenter with the given maximum payload per fragment. + /// + /// # Errors + /// Returns an error if the payload cap is zero. + pub fn configure_fragmenter(&mut self, max_payload: usize) -> TestResult { + let cap = NonZeroUsize::new(max_payload).ok_or("fragment payload cap must be non-zero")?; + self.fragmenter = Some(Fragmenter::new(cap)); + Ok(()) + } + + /// Drive the app with a single payload chunked into `chunk_size` pieces. + /// + /// # Errors + /// Returns an error if the app or codec is not configured, or driving fails. + pub async fn drive_chunked(&mut self, chunk_size: usize) -> TestResult { + let app = self.app.take().ok_or("app not configured")?; + let codec = self.codec.as_ref().ok_or("codec not configured")?; + let chunk = NonZeroUsize::new(chunk_size).ok_or("chunk size must be non-zero")?; + + let env = Envelope::new(1, Some(7), b"bdd-chunked".to_vec()); + let serialized = BincodeSerializer.serialize(&env)?; + + self.response_payloads = + wireframe_testing::drive_with_partial_frames(app, codec, vec![serialized], chunk) + .await?; + Ok(()) + } + + /// Drive the app with `count` payloads chunked into `chunk_size` pieces. + /// + /// # Errors + /// Returns an error if the app or codec is not configured, or driving fails. + pub async fn drive_chunked_multiple(&mut self, count: usize, chunk_size: usize) -> TestResult { + let app = self.app.take().ok_or("app not configured")?; + let codec = self.codec.as_ref().ok_or("codec not configured")?; + let chunk = NonZeroUsize::new(chunk_size).ok_or("chunk size must be non-zero")?; + + let mut payloads = Vec::with_capacity(count); + for i in 0..count { + let byte = u8::try_from(i) + .map_err(|_| format!("payload index {i} exceeds u8 range; use count <= 256"))?; + let env = Envelope::new(1, Some(7), vec![byte]); + payloads.push(BincodeSerializer.serialize(&env)?); + } + + self.response_payloads = + wireframe_testing::drive_with_partial_frames(app, codec, payloads, chunk).await?; + Ok(()) + } + + /// Drive the app with a fragmented payload. + /// + /// # Errors + /// Returns an error if the app, codec, or fragmenter is not configured. + pub async fn drive_fragmented(&mut self, payload_len: usize) -> TestResult { + let app = self.app.take().ok_or("app not configured")?; + let codec = self.codec.as_ref().ok_or("codec not configured")?; + let fragmenter = self + .fragmenter + .as_ref() + .ok_or("fragmenter not configured")?; + + let _payloads = + wireframe_testing::drive_with_fragments(app, codec, fragmenter, vec![0; payload_len]) + .await?; + self.fragment_feeding_completed = true; + Ok(()) + } + + /// Drive the app with a fragmented payload fed in chunks. + /// + /// # Errors + /// Returns an error if the app, codec, or fragmenter is not configured. + pub async fn drive_partial_fragmented( + &mut self, + payload_len: usize, + chunk_size: usize, + ) -> TestResult { + let app = self.app.take().ok_or("app not configured")?; + let codec = self.codec.as_ref().ok_or("codec not configured")?; + let fragmenter = self + .fragmenter + .as_ref() + .ok_or("fragmenter not configured")?; + let chunk = NonZeroUsize::new(chunk_size).ok_or("chunk size must be non-zero")?; + + let _payloads = wireframe_testing::drive_with_partial_fragments( + app, + codec, + fragmenter, + vec![0; payload_len], + chunk, + ) + .await?; + self.fragment_feeding_completed = true; + Ok(()) + } + + /// Assert that fragment feeding completed without error. + /// + /// # Errors + /// Returns an error if fragment feeding has not been attempted or failed. + pub fn assert_fragment_feeding_completed(&self) -> TestResult { + if !self.fragment_feeding_completed { + return Err("fragment feeding has not completed successfully".into()); + } + Ok(()) + } + + /// Assert that response payloads are non-empty. + /// + /// # Errors + /// Returns an error if no response payloads were collected. + pub fn assert_payloads_non_empty(&self) -> TestResult { + if self.response_payloads.is_empty() { + return Err("expected non-empty response payloads".into()); + } + Ok(()) + } + + /// Assert that the response contains exactly `expected` payloads. + /// + /// # Errors + /// Returns an error if the payload count does not match. + pub fn assert_payload_count(&self, expected: usize) -> TestResult { + let actual = self.response_payloads.len(); + if actual != expected { + return Err(format!("expected {expected} response payloads, got {actual}").into()); + } + Ok(()) + } +} diff --git a/tests/partial_frame_feeding.rs b/tests/partial_frame_feeding.rs new file mode 100644 index 00000000..f01cdbd0 --- /dev/null +++ b/tests/partial_frame_feeding.rs @@ -0,0 +1,231 @@ +//! Integration tests for partial-frame and fragment feeding utilities in +//! `wireframe_testing`. +#![cfg(not(loom))] + +use std::{io, num::NonZeroUsize, sync::Arc}; + +use futures::future::BoxFuture; +use wireframe::{ + app::{Envelope, WireframeApp}, + codec::examples::HotlineFrameCodec, + fragment::Fragmenter, + serializer::{BincodeSerializer, Serializer}, +}; +use wireframe_testing::{ + drive_with_fragment_frames, + drive_with_fragments, + drive_with_fragments_mut, + drive_with_partial_codec_frames, + drive_with_partial_fragments, + drive_with_partial_frames, + drive_with_partial_frames_mut, +}; + +fn hotline_codec() -> HotlineFrameCodec { HotlineFrameCodec::new(4096) } + +fn build_echo_app( + codec: HotlineFrameCodec, +) -> io::Result> { + WireframeApp::::new() + .map_err(|e| io::Error::other(format!("app init: {e}")))? + .with_codec(codec) + .route( + 1, + Arc::new(|_: &Envelope| -> BoxFuture<'static, ()> { Box::pin(async {}) }), + ) + .map_err(|e| io::Error::other(format!("route: {e}"))) +} + +fn serialize_envelope(payload: &[u8]) -> io::Result> { + let env = Envelope::new(1, Some(7), payload.to_vec()); + BincodeSerializer + .serialize(&env) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("serialize: {e}"))) +} + +// --------------------------------------------------------------------------- +// Chunked-write (partial frame) tests +// --------------------------------------------------------------------------- + +fn deserialize_envelope(bytes: &[u8]) -> io::Result { + let (env, _) = BincodeSerializer + .deserialize::(bytes) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("deserialize: {e}")))?; + Ok(env) +} + +async fn test_partial_frames_with_chunk(payload: &[u8], chunk_size: usize) -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let serialized = serialize_envelope(payload)?; + let chunk = NonZeroUsize::new(chunk_size).ok_or_else(|| io::Error::other("non-zero"))?; + + let response = drive_with_partial_frames(app, &codec, vec![serialized], chunk).await?; + + if response.is_empty() { + return Err(io::Error::other("expected non-empty response payloads")); + } + + let expected = Envelope::new(1, Some(7), payload.to_vec()); + for (idx, bytes) in response.iter().enumerate() { + let env = deserialize_envelope(bytes)?; + if env != expected { + return Err(io::Error::other(format!( + "envelope mismatch at index {idx}: expected {expected:?}, got {env:?}" + ))); + } + } + + Ok(()) +} + +#[tokio::test] +async fn partial_frames_single_byte_chunks() -> io::Result<()> { + test_partial_frames_with_chunk(&[10, 20, 30], 1).await +} + +#[tokio::test] +async fn partial_frames_misaligned_chunks() -> io::Result<()> { + test_partial_frames_with_chunk(&[1, 2, 3, 4, 5], 7).await +} + +#[tokio::test] +async fn partial_frames_multiple_payloads() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let p1 = serialize_envelope(&[1])?; + let p2 = serialize_envelope(&[2])?; + let chunk = NonZeroUsize::new(3).ok_or_else(|| io::Error::other("non-zero"))?; + + let response = drive_with_partial_frames(app, &codec, vec![p1, p2], chunk).await?; + + if response.len() != 2 { + return Err(io::Error::other(format!( + "expected 2 response payloads, got {}", + response.len() + ))); + } + + let expected_first = Envelope::new(1, Some(7), vec![1]); + let expected_second = Envelope::new(1, Some(7), vec![2]); + + let first = deserialize_envelope( + response + .first() + .ok_or_else(|| io::Error::other("missing first payload"))?, + )?; + let second = deserialize_envelope( + response + .get(1) + .ok_or_else(|| io::Error::other("missing second payload"))?, + )?; + + if first != expected_first { + return Err(io::Error::other(format!( + "first payload mismatch: expected {expected_first:?}, got {first:?}" + ))); + } + if second != expected_second { + return Err(io::Error::other(format!( + "second payload mismatch: expected {expected_second:?}, got {second:?}" + ))); + } + Ok(()) +} + +#[tokio::test] +async fn partial_codec_frames_preserves_metadata() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let serialized = serialize_envelope(&[42])?; + let chunk = NonZeroUsize::new(5).ok_or_else(|| io::Error::other("non-zero"))?; + + let frames = drive_with_partial_codec_frames(app, &codec, vec![serialized], chunk).await?; + + let frame = frames + .first() + .ok_or_else(|| io::Error::other("expected at least one response frame"))?; + if frame.transaction_id != 0 { + return Err(io::Error::other(format!( + "wrap_payload should assign transaction_id 0, got {}", + frame.transaction_id + ))); + } + Ok(()) +} + +#[tokio::test] +async fn partial_frames_mut_allows_reuse() -> io::Result<()> { + let codec = hotline_codec(); + let mut app = build_echo_app(codec.clone())?; + let serialized = serialize_envelope(&[1])?; + let chunk = NonZeroUsize::new(2).ok_or_else(|| io::Error::other("non-zero"))?; + + let first = + drive_with_partial_frames_mut(&mut app, &codec, vec![serialized.clone()], chunk).await?; + if first.is_empty() { + return Err(io::Error::other("first call should produce output")); + } + + let second = drive_with_partial_frames_mut(&mut app, &codec, vec![serialized], chunk).await?; + if second.is_empty() { + return Err(io::Error::other("second call should produce output")); + } + Ok(()) +} + +// --------------------------------------------------------------------------- +// Fragment feeding tests +// --------------------------------------------------------------------------- + +// Fragment payloads are FRAG-prefixed bytes wrapped in serialized Envelopes. +// The app deserializes each frame successfully, but the no-op route handler +// does not produce a response. Verifying no I/O error confirms the full +// transport pipeline (fragment → envelope → encode → transport → decode). + +#[tokio::test] +async fn fragment_round_trip() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let cap = NonZeroUsize::new(20).ok_or_else(|| io::Error::other("non-zero"))?; + let fragmenter = Fragmenter::new(cap); + + let _payloads = drive_with_fragments(app, &codec, &fragmenter, vec![0; 100]).await?; + Ok(()) +} + +#[tokio::test] +async fn fragment_frames_returns_codec_frames() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let cap = NonZeroUsize::new(20).ok_or_else(|| io::Error::other("non-zero"))?; + let fragmenter = Fragmenter::new(cap); + + let _frames = drive_with_fragment_frames(app, &codec, &fragmenter, vec![0; 50]).await?; + Ok(()) +} + +#[tokio::test] +async fn fragment_mut_allows_reuse() -> io::Result<()> { + let codec = hotline_codec(); + let mut app = build_echo_app(codec.clone())?; + let cap = NonZeroUsize::new(30).ok_or_else(|| io::Error::other("non-zero"))?; + let fragmenter = Fragmenter::new(cap); + + let _first = drive_with_fragments_mut(&mut app, &codec, &fragmenter, vec![0; 50]).await?; + let _second = drive_with_fragments_mut(&mut app, &codec, &fragmenter, vec![0; 30]).await?; + Ok(()) +} + +#[tokio::test] +async fn partial_fragments_combines_both() -> io::Result<()> { + let codec = hotline_codec(); + let app = build_echo_app(codec.clone())?; + let cap = NonZeroUsize::new(20).ok_or_else(|| io::Error::other("non-zero"))?; + let fragmenter = Fragmenter::new(cap); + let chunk = NonZeroUsize::new(3).ok_or_else(|| io::Error::other("non-zero"))?; + + let _payloads = + drive_with_partial_fragments(app, &codec, &fragmenter, vec![0; 100], chunk).await?; + Ok(()) +} diff --git a/tests/scenarios/mod.rs b/tests/scenarios/mod.rs index e01e615e..5058aec3 100644 --- a/tests/scenarios/mod.rs +++ b/tests/scenarios/mod.rs @@ -33,6 +33,7 @@ mod message_assembly_inbound_scenarios; mod message_assembly_scenarios; mod multi_packet_scenarios; mod panic_scenarios; +mod partial_frame_feeding_scenarios; mod request_parts_scenarios; mod serializer_boundaries_scenarios; mod stream_end_scenarios; diff --git a/tests/scenarios/partial_frame_feeding_scenarios.rs b/tests/scenarios/partial_frame_feeding_scenarios.rs new file mode 100644 index 00000000..7153212e --- /dev/null +++ b/tests/scenarios/partial_frame_feeding_scenarios.rs @@ -0,0 +1,45 @@ +//! Scenario tests for partial frame and fragment feeding behaviours. + +use rstest_bdd_macros::scenario; + +use crate::fixtures::partial_frame_feeding::*; + +#[scenario( + path = "tests/features/partial_frame_feeding.feature", + name = "Single payload survives byte-at-a-time chunked delivery" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn single_payload_byte_chunks(partial_frame_feeding_world: PartialFrameFeedingWorld) {} + +#[scenario( + path = "tests/features/partial_frame_feeding.feature", + name = "Multiple payloads survive misaligned chunked delivery" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn multiple_payloads_misaligned(partial_frame_feeding_world: PartialFrameFeedingWorld) {} + +#[scenario( + path = "tests/features/partial_frame_feeding.feature", + name = "Fragmented payload is delivered as fragment frames" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn fragmented_payload_delivery(partial_frame_feeding_world: PartialFrameFeedingWorld) {} + +#[scenario( + path = "tests/features/partial_frame_feeding.feature", + name = "Fragmented payload survives chunked delivery" +)] +#[expect( + unused_variables, + reason = "rstest-bdd wires steps via parameters without using them directly" +)] +fn fragmented_payload_chunked(partial_frame_feeding_world: PartialFrameFeedingWorld) {} diff --git a/tests/steps/mod.rs b/tests/steps/mod.rs index fd2fc2a8..4c3e66de 100644 --- a/tests/steps/mod.rs +++ b/tests/steps/mod.rs @@ -29,6 +29,7 @@ mod message_assembly_inbound_steps; mod message_assembly_steps; mod multi_packet_steps; mod panic_steps; +mod partial_frame_feeding_steps; mod request_parts_steps; mod serializer_boundaries_steps; mod stream_end_steps; diff --git a/tests/steps/partial_frame_feeding_steps.rs b/tests/steps/partial_frame_feeding_steps.rs new file mode 100644 index 00000000..fe4010a2 --- /dev/null +++ b/tests/steps/partial_frame_feeding_steps.rs @@ -0,0 +1,86 @@ +//! Step definitions for partial frame and fragment feeding behavioural tests. + +use rstest_bdd_macros::{given, then, when}; + +use crate::fixtures::partial_frame_feeding::{PartialFrameFeedingWorld, TestResult}; + +#[given( + "a wireframe app with a Hotline codec allowing {max_frame_length:usize}-byte frames for \ + partial feeding" +)] +fn given_app_with_codec( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + max_frame_length: usize, +) -> TestResult { + partial_frame_feeding_world.configure_app(max_frame_length) +} + +#[given("a fragmenter capped at {max_payload:usize} bytes per fragment for partial feeding")] +fn given_fragmenter( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + max_payload: usize, +) -> TestResult { + partial_frame_feeding_world.configure_fragmenter(max_payload) +} + +#[when("a test payload is fed in {chunk_size:usize}-byte chunks")] +fn when_single_payload_chunked( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + chunk_size: usize, +) -> TestResult { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(partial_frame_feeding_world.drive_chunked(chunk_size)) +} + +#[when("{count:usize} test payloads are fed in {chunk_size:usize}-byte chunks")] +fn when_multiple_payloads_chunked( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + count: usize, + chunk_size: usize, +) -> TestResult { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(partial_frame_feeding_world.drive_chunked_multiple(count, chunk_size)) +} + +#[when("a {payload_len:usize}-byte payload is fragmented and fed through the app")] +fn when_fragmented( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + payload_len: usize, +) -> TestResult { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(partial_frame_feeding_world.drive_fragmented(payload_len)) +} + +#[when( + "a {payload_len:usize}-byte payload is fragmented and fed in {chunk_size:usize}-byte chunks" +)] +fn when_partial_fragmented( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + payload_len: usize, + chunk_size: usize, +) -> TestResult { + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(partial_frame_feeding_world.drive_partial_fragmented(payload_len, chunk_size)) +} + +#[then("the partial feeding response payloads are non-empty")] +fn then_payloads_non_empty( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, +) -> TestResult { + partial_frame_feeding_world.assert_payloads_non_empty() +} + +#[then("the partial feeding response contains {expected:usize} payloads")] +fn then_payload_count( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, + expected: usize, +) -> TestResult { + partial_frame_feeding_world.assert_payload_count(expected) +} + +#[then("the fragment feeding completes without error")] +fn then_fragment_completed( + partial_frame_feeding_world: &mut PartialFrameFeedingWorld, +) -> TestResult { + partial_frame_feeding_world.assert_fragment_feeding_completed() +} diff --git a/wireframe_testing/src/helpers.rs b/wireframe_testing/src/helpers.rs index 516970ee..cfa33cb1 100644 --- a/wireframe_testing/src/helpers.rs +++ b/wireframe_testing/src/helpers.rs @@ -14,6 +14,8 @@ mod codec_drive; mod codec_ext; mod codec_fixtures; mod drive; +mod fragment_drive; +mod partial_frame; mod payloads; mod runtime; @@ -80,5 +82,18 @@ pub use drive::{ drive_with_frames_with_capacity, drive_with_frames_with_capacity_mut, }; +pub use fragment_drive::{ + drive_with_fragment_frames, + drive_with_fragments, + drive_with_fragments_mut, + drive_with_fragments_with_capacity, + drive_with_partial_fragments, +}; +pub use partial_frame::{ + drive_with_partial_codec_frames, + drive_with_partial_frames, + drive_with_partial_frames_mut, + drive_with_partial_frames_with_capacity, +}; pub use payloads::{drive_with_bincode, drive_with_payloads, drive_with_payloads_mut}; pub use runtime::{run_app, run_with_duplex_server}; diff --git a/wireframe_testing/src/helpers/fragment_drive.rs b/wireframe_testing/src/helpers/fragment_drive.rs new file mode 100644 index 00000000..f8dd9d59 --- /dev/null +++ b/wireframe_testing/src/helpers/fragment_drive.rs @@ -0,0 +1,361 @@ +//! Fragment-aware in-memory driving helpers. +//! +//! These functions fragment a payload using a [`Fragmenter`], encode each +//! fragment via [`encode_fragment_payload`], wrap the `FRAG`-prefixed bytes +//! inside a serialized [`Envelope`] packet, and feed the results through a +//! [`WireframeApp`] as codec frames. Wrapping in an `Envelope` ensures the +//! application's deserializer accepts the frames instead of accumulating +//! consecutive deserialization failures. + +use std::{io, num::NonZeroUsize}; + +use tokio::io::DuplexStream; +use wireframe::{ + app::{Envelope, Packet, WireframeApp}, + codec::FrameCodec, + fragment::{Fragmenter, encode_fragment_payload}, + serializer::{BincodeSerializer, Serializer}, +}; + +use super::{ + DEFAULT_CAPACITY, + TestSerializer, + codec_ext::{decode_frames_with_codec, encode_payloads_with_codec, extract_payloads}, + drive::drive_internal, + partial_frame::drive_chunked_internal, +}; + +// --------------------------------------------------------------------------- +// Shared fragment encoding +// --------------------------------------------------------------------------- + +/// Fragment `payload`, encode each fragment, and wrap the result in +/// serialized [`Envelope`] packets so the app's deserializer accepts them. +/// +/// Each fragment is encoded via [`encode_fragment_payload`] into +/// `FRAG`-prefixed bytes, then placed as the `payload` field of an +/// `Envelope` and serialized with [`BincodeSerializer`]. This matches +/// the inbound path the application expects: deserialize the codec frame +/// payload as an `Envelope`, then hand it to the fragment reassembler. +fn fragment_and_encode( + fragmenter: &Fragmenter, + payload: Vec, + route_id: u32, +) -> io::Result>> { + let batch = fragmenter.fragment_bytes(payload).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("fragmentation failed: {err}"), + ) + })?; + batch + .into_iter() + .map(|frame| { + let (header, body) = frame.into_parts(); + let frag_bytes = encode_fragment_payload(header, &body).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("fragment encoding failed: {err}"), + ) + })?; + let env = Envelope::new(route_id, None, frag_bytes); + BincodeSerializer.serialize(&env).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("envelope serialization failed: {err}"), + ) + }) + }) + .collect() +} + +// --------------------------------------------------------------------------- +// Shared internal helper +// --------------------------------------------------------------------------- + +/// Default route identifier used when wrapping fragment payloads in +/// [`Envelope`] packets. Matches the route typically registered by test +/// applications built with `WireframeApp::route(1, ...)`. +const DEFAULT_FRAGMENT_ROUTE_ID: u32 = 1; + +/// Bundles the fragmenter, payload, and duplex buffer capacity needed by +/// [`drive_fragments_internal`]. +struct FragmentRequest<'a> { + /// Fragmenter that splits the payload into fragment frames. + fragmenter: &'a Fragmenter, + /// Raw payload bytes to fragment and feed. + payload: Vec, + /// Route identifier for the wrapping [`Envelope`]. + route_id: u32, + /// Duplex stream buffer capacity. + capacity: usize, +} + +impl<'a> FragmentRequest<'a> { + /// Create a request with the default duplex buffer capacity. + fn new(fragmenter: &'a Fragmenter, payload: Vec) -> Self { + Self { + fragmenter, + payload, + route_id: DEFAULT_FRAGMENT_ROUTE_ID, + capacity: DEFAULT_CAPACITY, + } + } + + /// Override the duplex buffer capacity. + fn with_capacity(mut self, capacity: usize) -> Self { + self.capacity = capacity; + self + } +} + +/// Fragment, encode, transport, and decode — returning full codec frames. +async fn drive_fragments_internal( + handler: H, + codec: &F, + request: FragmentRequest<'_>, +) -> io::Result> +where + F: FrameCodec, + H: FnOnce(DuplexStream) -> Fut, + Fut: std::future::Future + Send, +{ + let serialized_envelopes = + fragment_and_encode(request.fragmenter, request.payload, request.route_id)?; + let encoded = encode_payloads_with_codec(codec, serialized_envelopes)?; + let raw = drive_internal(handler, encoded, request.capacity).await?; + decode_frames_with_codec(codec, raw) +} + +// --------------------------------------------------------------------------- +// Payload-level drivers (return Vec>) +// --------------------------------------------------------------------------- + +/// Fragment `payload`, encode each fragment into a codec frame, and drive +/// through `app`. Returns decoded response payloads. +/// +/// # Errors +/// +/// Returns any I/O, fragmentation, or codec error encountered during +/// encoding, transport, or decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe::fragment::Fragmenter; +/// # use wireframe_testing::drive_with_fragments; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).expect("non-zero")); +/// let payloads = drive_with_fragments(app, &codec, &fragmenter, vec![0; 50]).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_fragments( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + drive_with_fragments_with_capacity(app, codec, fragmenter, payload, DEFAULT_CAPACITY).await +} + +/// Fragment and feed with a duplex buffer of `capacity` bytes. +/// +/// # Errors +/// +/// Returns any I/O, fragmentation, or codec error encountered during +/// encoding, transport, or decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe::fragment::Fragmenter; +/// # use wireframe_testing::drive_with_fragments_with_capacity; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).expect("non-zero")); +/// let payloads = +/// drive_with_fragments_with_capacity(app, &codec, &fragmenter, vec![0; 50], 8192).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_fragments_with_capacity( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, + capacity: usize, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + let frames = drive_fragments_internal( + |server| async move { app.handle_connection(server).await }, + codec, + FragmentRequest::new(fragmenter, payload).with_capacity(capacity), + ) + .await?; + Ok(extract_payloads::(&frames)) +} + +/// Fragment and feed through a mutable `app`. +/// +/// The mutable reference allows the app instance to be reused across +/// successive calls. +/// +/// # Errors +/// +/// Returns any I/O, fragmentation, or codec error encountered during +/// encoding, transport, or decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe::fragment::Fragmenter; +/// # use wireframe_testing::drive_with_fragments_mut; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let mut app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).expect("non-zero")); +/// let payloads = drive_with_fragments_mut(&mut app, &codec, &fragmenter, vec![0; 50]).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_fragments_mut( + app: &mut WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + let frames = drive_fragments_internal( + |server| async move { app.handle_connection(server).await }, + codec, + FragmentRequest::new(fragmenter, payload), + ) + .await?; + Ok(extract_payloads::(&frames)) +} + +// --------------------------------------------------------------------------- +// Frame-level driver (returns Vec) +// --------------------------------------------------------------------------- + +/// Fragment and feed through `app`, returning decoded response frames. +/// +/// Unlike the payload-level drivers, this variant returns the full codec +/// frames so tests can inspect frame-level metadata. +/// +/// # Errors +/// +/// Returns any I/O, fragmentation, or codec error encountered during +/// encoding, transport, or decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe::fragment::Fragmenter; +/// # use wireframe_testing::drive_with_fragment_frames; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).expect("non-zero")); +/// let frames = drive_with_fragment_frames(app, &codec, &fragmenter, vec![0; 50]).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_fragment_frames( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + drive_fragments_internal( + |server| async move { app.handle_connection(server).await }, + codec, + FragmentRequest::new(fragmenter, payload), + ) + .await +} + +// --------------------------------------------------------------------------- +// Combined: fragment + chunked delivery +// --------------------------------------------------------------------------- + +/// Fragment `payload` and feed the wire bytes in chunks of `chunk_size`, +/// exercising both fragmentation and partial-frame buffering simultaneously. +/// +/// # Errors +/// +/// Returns any I/O, fragmentation, or codec error encountered during +/// encoding, transport, or decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe::fragment::Fragmenter; +/// # use wireframe_testing::drive_with_partial_fragments; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let fragmenter = Fragmenter::new(NonZeroUsize::new(20).expect("non-zero")); +/// let chunk = NonZeroUsize::new(3).expect("non-zero"); +/// let payloads = +/// drive_with_partial_fragments(app, &codec, &fragmenter, vec![0; 50], chunk).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_partial_fragments( + app: WireframeApp, + codec: &F, + fragmenter: &Fragmenter, + payload: Vec, + chunk_size: NonZeroUsize, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + let serialized_envelopes = fragment_and_encode(fragmenter, payload, DEFAULT_FRAGMENT_ROUTE_ID)?; + let encoded = encode_payloads_with_codec(codec, serialized_envelopes)?; + let wire_bytes: Vec = encoded.into_iter().flatten().collect(); + let raw = drive_chunked_internal( + |server| async move { app.handle_connection(server).await }, + wire_bytes, + chunk_size, + DEFAULT_CAPACITY, + ) + .await?; + let frames = decode_frames_with_codec(codec, raw)?; + Ok(extract_payloads::(&frames)) +} diff --git a/wireframe_testing/src/helpers/partial_frame.rs b/wireframe_testing/src/helpers/partial_frame.rs new file mode 100644 index 00000000..a3bca30a --- /dev/null +++ b/wireframe_testing/src/helpers/partial_frame.rs @@ -0,0 +1,333 @@ +//! Chunked-write in-memory driving helpers. +//! +//! These functions extend the frame-oriented drivers in [`super::drive`] with +//! configurable chunk sizes, forcing the codec decoder on the server side to +//! buffer partial frames across reads. This exercises realistic network +//! conditions where a single codec frame may arrive across multiple TCP reads. + +use std::{io, num::NonZeroUsize}; + +use tokio::io::{AsyncReadExt, AsyncWriteExt, DuplexStream, duplex}; +use wireframe::{ + app::{Packet, WireframeApp}, + codec::FrameCodec, +}; + +use super::{ + DEFAULT_CAPACITY, + TestSerializer, + codec_ext::{decode_frames_with_codec, encode_payloads_with_codec, extract_payloads}, +}; + +/// Configuration for chunked-write delivery. +/// +/// Combines the chunk size (how many bytes to write per call) with the +/// duplex buffer capacity used by the in-memory stream. +#[derive(Debug, Clone, Copy)] +pub(super) struct ChunkConfig { + /// Number of bytes per write call. + pub chunk_size: NonZeroUsize, + /// Duplex stream buffer capacity. + pub capacity: usize, +} + +impl ChunkConfig { + /// Create a configuration with the given chunk size and the default + /// duplex buffer capacity. + pub fn new(chunk_size: NonZeroUsize) -> Self { + Self { + chunk_size, + capacity: DEFAULT_CAPACITY, + } + } + + /// Create a configuration with an explicit duplex buffer capacity. + pub fn with_capacity(chunk_size: NonZeroUsize, capacity: usize) -> Self { + Self { + chunk_size, + capacity, + } + } +} + +/// Drive a server function by writing `wire_bytes` in chunks of +/// `chunk_size` bytes, forcing partial-frame reads on the server side. +/// +/// This mirrors [`super::drive::drive_internal`] but replaces per-frame +/// `write_all` calls with a chunked iteration that slices the concatenated +/// wire bytes into fixed-size pieces. +/// +/// This function is `pub(super)` and not exported from the crate. Use one +/// of the public `drive_with_partial_*` wrappers instead. +/// +/// ```rust,ignore +/// async fn echo(mut s: DuplexStream) { let _ = s.write_all(&[1, 2]).await; } +/// +/// let out = drive_chunked_internal( +/// echo, +/// vec![0], +/// NonZeroUsize::new(1).expect("non-zero"), +/// 64, +/// ) +/// .await?; +/// assert_eq!(out, [1, 2]); +/// ``` +pub(super) async fn drive_chunked_internal( + server_fn: F, + wire_bytes: Vec, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result> +where + F: FnOnce(DuplexStream) -> Fut, + Fut: std::future::Future + Send, +{ + let (mut client, server) = duplex(capacity); + + let server_fut = async { + use futures::FutureExt as _; + let result = std::panic::AssertUnwindSafe(server_fn(server)) + .catch_unwind() + .await; + match result { + Ok(()) => Ok(()), + Err(panic) => { + let panic_msg = wireframe::panic::format_panic(&panic); + Err(io::Error::new( + io::ErrorKind::Other, + format!("server task failed: {panic_msg}"), + )) + } + } + }; + + let client_fut = async { + let total = wire_bytes.len(); + let step = chunk_size.get(); + let mut offset = 0; + while offset < total { + let end = (offset + step).min(total); + let chunk = wire_bytes + .get(offset..end) + .ok_or_else(|| io::Error::other("chunk slice out of bounds"))?; + client.write_all(chunk).await?; + offset = end; + } + client.shutdown().await?; + + let mut buf = Vec::new(); + client.read_to_end(&mut buf).await?; + io::Result::Ok(buf) + }; + + let ((), buf) = tokio::try_join!(server_fut, client_fut)?; + Ok(buf) +} + +// --------------------------------------------------------------------------- +// Shared internal helper +// --------------------------------------------------------------------------- + +/// Encode payloads, chunk the wire bytes, drive the server, and decode +/// response frames. +async fn drive_partial_frames_internal( + handler: H, + codec: &F, + payloads: Vec>, + config: ChunkConfig, +) -> io::Result> +where + F: FrameCodec, + H: FnOnce(DuplexStream) -> Fut, + Fut: std::future::Future + Send, +{ + let encoded = encode_payloads_with_codec(codec, payloads)?; + let wire_bytes: Vec = encoded.into_iter().flatten().collect(); + let raw = + drive_chunked_internal(handler, wire_bytes, config.chunk_size, config.capacity).await?; + decode_frames_with_codec(codec, raw) +} + +// --------------------------------------------------------------------------- +// Payload-level drivers (return Vec>) +// --------------------------------------------------------------------------- + +/// Drive `app` with payloads encoded by `codec`, writing wire bytes in +/// chunks of `chunk_size` to exercise partial-frame buffering. +/// +/// Each input payload is encoded through the codec, and the resulting wire +/// bytes are concatenated and written `chunk_size` bytes at a time. The +/// server's responses are decoded and returned as payload byte vectors. +/// +/// # Errors +/// +/// Returns any I/O or codec error encountered during encoding, transport, or +/// decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe_testing::drive_with_partial_frames; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let chunk = NonZeroUsize::new(1).expect("non-zero"); +/// let payloads = drive_with_partial_frames(app, &codec, vec![vec![1]], chunk).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_partial_frames( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + drive_with_partial_frames_with_capacity(app, codec, payloads, chunk_size, DEFAULT_CAPACITY) + .await +} + +/// Drive `app` with payloads in chunks using a duplex buffer of `capacity` +/// bytes. +/// +/// # Errors +/// +/// Returns any I/O or codec error encountered during encoding, transport, or +/// decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe_testing::drive_with_partial_frames_with_capacity; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let chunk = NonZeroUsize::new(3).expect("non-zero"); +/// let payloads = +/// drive_with_partial_frames_with_capacity(app, &codec, vec![vec![1]], chunk, 8192).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_partial_frames_with_capacity( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, + capacity: usize, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + let frames = drive_partial_frames_internal( + |server| async move { app.handle_connection(server).await }, + codec, + payloads, + ChunkConfig::with_capacity(chunk_size, capacity), + ) + .await?; + Ok(extract_payloads::(&frames)) +} + +/// Drive a mutable `app` with payloads in chunks of `chunk_size`. +/// +/// The mutable reference allows the app instance to be reused across +/// successive calls. +/// +/// # Errors +/// +/// Returns any I/O or codec error encountered during encoding, transport, or +/// decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe_testing::drive_with_partial_frames_mut; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let mut app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let chunk = NonZeroUsize::new(5).expect("non-zero"); +/// let payloads = drive_with_partial_frames_mut(&mut app, &codec, vec![vec![1]], chunk).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_partial_frames_mut( + app: &mut WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result>> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + let frames = drive_partial_frames_internal( + |server| async move { app.handle_connection(server).await }, + codec, + payloads, + ChunkConfig::new(chunk_size), + ) + .await?; + Ok(extract_payloads::(&frames)) +} + +// --------------------------------------------------------------------------- +// Frame-level driver (returns Vec) +// --------------------------------------------------------------------------- + +/// Drive `app` with payloads in chunks and return decoded response frames. +/// +/// Unlike the payload-level drivers, this variant returns the full codec +/// frames so tests can inspect frame-level metadata such as transaction +/// identifiers or sequence numbers. +/// +/// # Errors +/// +/// Returns any I/O or codec error encountered during encoding, transport, or +/// decoding. +/// +/// ```rust +/// # use std::num::NonZeroUsize; +/// # use wireframe::app::WireframeApp; +/// # use wireframe::codec::examples::HotlineFrameCodec; +/// # use wireframe_testing::drive_with_partial_codec_frames; +/// # async fn demo() -> std::io::Result<()> { +/// let codec = HotlineFrameCodec::new(4096); +/// let app = WireframeApp::new().expect("app").with_codec(codec.clone()); +/// let chunk = NonZeroUsize::new(2).expect("non-zero"); +/// let frames = drive_with_partial_codec_frames(app, &codec, vec![vec![1]], chunk).await?; +/// # Ok(()) +/// # } +/// ``` +pub async fn drive_with_partial_codec_frames( + app: WireframeApp, + codec: &F, + payloads: Vec>, + chunk_size: NonZeroUsize, +) -> io::Result> +where + S: TestSerializer, + C: Send + 'static, + E: Packet, + F: FrameCodec, +{ + drive_partial_frames_internal( + |server| async move { app.handle_connection(server).await }, + codec, + payloads, + ChunkConfig::new(chunk_size), + ) + .await +} diff --git a/wireframe_testing/src/lib.rs b/wireframe_testing/src/lib.rs index 150bafd8..ca56ffc8 100644 --- a/wireframe_testing/src/lib.rs +++ b/wireframe_testing/src/lib.rs @@ -43,12 +43,21 @@ pub use helpers::{ drive_with_codec_payloads_mut, drive_with_codec_payloads_with_capacity, drive_with_codec_payloads_with_capacity_mut, + drive_with_fragment_frames, + drive_with_fragments, + drive_with_fragments_mut, + drive_with_fragments_with_capacity, drive_with_frame, drive_with_frame_mut, drive_with_frame_with_capacity, drive_with_frames, drive_with_frames_mut, drive_with_frames_with_capacity, + drive_with_partial_codec_frames, + drive_with_partial_fragments, + drive_with_partial_frames, + drive_with_partial_frames_mut, + drive_with_partial_frames_with_capacity, drive_with_payloads, drive_with_payloads_mut, encode_frame,