diff --git a/src/Std/Internal/Http/Data.lean b/src/Std/Internal/Http/Data.lean index bdc4c40805cb..45a355523bd8 100644 --- a/src/Std/Internal/Http/Data.lean +++ b/src/Std/Internal/Http/Data.lean @@ -14,6 +14,7 @@ public import Std.Internal.Http.Data.Status public import Std.Internal.Http.Data.Chunk public import Std.Internal.Http.Data.Headers public import Std.Internal.Http.Data.URI +public import Std.Internal.Http.Data.Body /-! # HTTP Data Types diff --git a/src/Std/Internal/Http/Data/Body.lean b/src/Std/Internal/Http/Data/Body.lean new file mode 100644 index 000000000000..31a0524d0541 --- /dev/null +++ b/src/Std/Internal/Http/Data/Body.lean @@ -0,0 +1,17 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +module + +prelude +public import Std.Internal.Async.ContextAsync +public import Std.Internal.Http.Data.Headers +public import Std.Internal.Http.Data.Body.Basic +public import Std.Internal.Http.Data.Body.Length +public import Std.Internal.Http.Data.Body.Reader +public import Std.Internal.Http.Data.Body.Writer +public import Std.Internal.Http.Data.Body.Stream +public import Std.Internal.Http.Data.Body.Empty +public import Std.Internal.Http.Data.Body.Full diff --git a/src/Std/Internal/Http/Data/Body/Basic.lean b/src/Std/Internal/Http/Data/Body/Basic.lean new file mode 100644 index 000000000000..32c0597777d3 --- /dev/null +++ b/src/Std/Internal/Http/Data/Body/Basic.lean @@ -0,0 +1,61 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +module + +prelude +public import Std.Internal.Async.ContextAsync +public import Std.Internal.Http.Data.Headers +public import Std.Internal.Http.Data.Body.Length + +public section + +/-! +# Body.Basic + +This module defines shared types for HTTP body handling. +-/ + +namespace Std.Http.Body + +set_option linter.all true + +/-- +Typeclass for types that can be converted to a `ByteArray`. +-/ +class ToByteArray (α : Type) where + + /-- + Transforms into a `ByteArray`. + -/ + toByteArray : α → ByteArray + +instance : ToByteArray ByteArray where + toByteArray := id + +instance : ToByteArray String where + toByteArray := String.toUTF8 + +/-- +Typeclass for types that can be decoded from a `ByteArray`. The conversion may fail with an error +message if the bytes are not valid for the target type. +-/ +class FromByteArray (α : Type) where + + /-- + Attempts to decode a `ByteArray` into the target type, returning an error message on failure. + -/ + fromByteArray : ByteArray → Except String α + +instance : FromByteArray ByteArray where + fromByteArray := .ok + +instance : FromByteArray String where + fromByteArray bs := + match String.fromUTF8? bs with + | some s => .ok s + | none => .error "invalid UTF-8 encoding" + +end Std.Http.Body diff --git a/src/Std/Internal/Http/Data/Body/Empty.lean b/src/Std/Internal/Http/Data/Body/Empty.lean new file mode 100644 index 000000000000..3e1fd34c784b --- /dev/null +++ b/src/Std/Internal/Http/Data/Body/Empty.lean @@ -0,0 +1,103 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +module + +prelude +public import Std.Internal.Async +public import Std.Internal.Http.Data.Request +public import Std.Internal.Http.Data.Response +public import Std.Internal.Http.Data.Body.Length +public import Std.Internal.Http.Data.Body.Reader +public import Std.Internal.Http.Data.Chunk + +public section + +/-! +# Body.Empty + +Represents an always-empty, already-closed body handle. +-/ + +namespace Std.Http.Body +open Std Internal IO Async + +set_option linter.all true + +/-- +An empty body handle. +-/ +structure Empty where +deriving Inhabited + +namespace Empty + +/-- +Receives from an empty body, always returning end-of-stream. +-/ +@[inline] +def recv (_ : Empty) : Async (Option Chunk) := + pure none + +/-- +Closes an empty body (no-op). +-/ +@[inline] +def close (_ : Empty) : Async Unit := + pure () + +/-- +Empty bodies are always closed for reading. +-/ +@[inline] +def isClosed (_ : Empty) : Async Bool := + pure true + +open Internal.IO.Async in + +/-- +Selector that immediately resolves with end-of-stream for an empty body. +-/ +@[inline] +def recvSelector (_ : Empty) : Selector (Option Chunk) where + tryFn := pure (some none) + registerFn waiter := do + let lose := pure () + let win promise := do + promise.resolve (.ok none) + waiter.race lose win + unregisterFn := pure () + +end Empty + +instance : Reader Empty where + recv := Empty.recv + close := Empty.close + isClosed := Empty.isClosed + recvSelector := Empty.recvSelector + +end Std.Http.Body + +namespace Std.Http.Request.Builder +open Internal.IO.Async + +/-- +Builds a request with an empty body. +-/ +def blank (builder : Builder) : Async (Request Body.Empty) := + pure <| builder.body {} + +end Std.Http.Request.Builder + +namespace Std.Http.Response.Builder +open Internal.IO.Async + +/-- +Builds a response with an empty body. +-/ +def blank (builder : Builder) : Async (Response Body.Empty) := + pure <| builder.body {} + +end Std.Http.Response.Builder diff --git a/src/Std/Internal/Http/Data/Body/Full.lean b/src/Std/Internal/Http/Data/Body/Full.lean new file mode 100644 index 000000000000..8979dca81313 --- /dev/null +++ b/src/Std/Internal/Http/Data/Body/Full.lean @@ -0,0 +1,266 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +module + +prelude +public import Std.Sync +public import Std.Internal.Async +public import Std.Internal.Http.Data.Request +public import Std.Internal.Http.Data.Response +public import Std.Internal.Http.Data.Body.Length +public import Std.Internal.Http.Data.Body.Reader +public import Std.Internal.Http.Data.Chunk +public import Init.Data.ByteArray + +public section + +/-! +# Body.Full + +A body backed by a fixed `ByteArray` held in a `Mutex`. + +The byte array is consumed at most once: the first call to `recv` or `tryRecv` atomically +takes the data and returns it as a single chunk; subsequent calls return `none` (end-of-stream). +Closing the body discards any unconsumed data. + +`Full` implements `Body.Writer`. The `Writer` instance is a no-op for sends since the content is +fixed at construction; it is provided so that `Full` can substitute for a streaming channel in +contexts that require a writable body handle. +-/ + +namespace Std.Http.Body +open Std Internal IO Async + +set_option linter.all true + +/-- +A body backed by a fixed, mutex-protected `ByteArray`. + +The data is consumed on the first read. Once consumed (or explicitly closed), the body +behaves as a closed, empty channel. +-/ +structure Full where + private mk :: + private state : Mutex (Option ByteArray) +deriving Nonempty + +namespace Full + +private def takeChunk [Monad m] [MonadLiftT (ST IO.RealWorld) m] : + AtomicT (Option ByteArray) m (Option Chunk) := do + match ← get with + | none => + pure none + | some data => + set (none : Option ByteArray) + if data.isEmpty then + pure none + else + pure (some (Chunk.ofByteArray data)) + +/-- +Creates a `Full` body from a `ByteArray`. +-/ +def ofByteArray (data : ByteArray) : Async Full := do + let state ← Mutex.new (some data) + return { state } + +/-- +Creates a `Full` body from a `String`. +-/ +def ofString (data : String) : Async Full := do + let state ← Mutex.new (some data.toUTF8) + return { state } + +/-- +Atomically takes the byte array and returns it as a chunk. +Returns `none` if the data has already been consumed or the body is closed. +-/ +def tryRecv (full : Full) : Async (Option Chunk) := + full.state.atomically do + takeChunk + +/-- +Receives the body data. Returns the full byte array on the first call as a single chunk, +then `none` on all subsequent calls. +-/ +def recv (full : Full) : Async (Option Chunk) := + full.tryRecv + +/-- +No-op send for a fixed full body. +-/ +@[inline] +def send (_ : Full) (_ : Chunk) (_incomplete : Bool := false) : Async Unit := + pure () + +/-- +Closes the body, discarding any unconsumed data. +-/ +def close (full : Full) : Async Unit := + full.state.atomically do + set (none : Option ByteArray) + +/-- +Returns `true` when the data has been consumed or the body has been closed. +-/ +def isClosed (full : Full) : Async Bool := + full.state.atomically do + return (← get).isNone + +/-- +A fixed full body never has consumer interest. +-/ +@[inline] +def hasInterest (_ : Full) : Async Bool := + pure false + +/-- +Returns known-size metadata based on current remaining bytes. +-/ +def getKnownSize (full : Full) : Async (Option Body.Length) := + full.state.atomically do + match ← get with + | none => pure (some (.fixed 0)) + | some data => pure (some (.fixed data.size)) + +/-- +No-op metadata setter for a fixed full body. +-/ +@[inline] +def setKnownSize (_ : Full) (_ : Option Body.Length) : Async Unit := + pure () + +open Internal.IO.Async in +/-- +Selector that immediately resolves to `false` for interest. +-/ +def interestSelector (_ : Full) : Selector Bool where + tryFn := pure (some false) + registerFn waiter := do + let lose := pure () + let win promise := do + promise.resolve (.ok false) + waiter.race lose win + unregisterFn := pure () + +open Internal.IO.Async in +/-- +Selector that immediately resolves to the remaining chunk (or EOF). +-/ +def recvSelector (full : Full) : Selector (Option Chunk) where + tryFn := do + let chunk ← full.state.atomically do + takeChunk + pure (some chunk) + registerFn waiter := do + let chunk ← full.state.atomically do + takeChunk + let lose := pure () + let win promise := do + promise.resolve (.ok chunk) + waiter.race lose win + unregisterFn := pure () + +end Full + +instance : Reader Full where + recv := Full.recv + close := Full.close + isClosed := Full.isClosed + recvSelector := Full.recvSelector + +end Std.Http.Body + +namespace Std.Http.Request.Builder +open Internal.IO.Async + +private def fromBytesCore + (builder : Builder) + (content : ByteArray) : + Async (Request Body.Full) := do + return builder.body (← Body.Full.ofByteArray content) + +/-- +Builds a request from raw bytes. +-/ +def fromBytes (builder : Builder) (content : ByteArray) : Async (Request Body.Full) := + fromBytesCore builder content + +/-- +Builds a request with a binary body. +-/ +def bytes (builder : Builder) (content : ByteArray) : Async (Request Body.Full) := do + let builder := builder.header Header.Name.contentType (Header.Value.ofString! "application/octet-stream") + fromBytesCore builder content + +/-- +Builds a request with a text body. +-/ +def text (builder : Builder) (content : String) : Async (Request Body.Full) := do + let builder := builder.header Header.Name.contentType (Header.Value.ofString! "text/plain; charset=utf-8") + fromBytesCore builder content.toUTF8 + +/-- +Builds a request with a JSON body. +-/ +def json (builder : Builder) (content : String) : Async (Request Body.Full) := do + let builder := builder.header Header.Name.contentType (Header.Value.ofString! "application/json") + fromBytesCore builder content.toUTF8 + +/-- +Builds a request with an HTML body. +-/ +def html (builder : Builder) (content : String) : Async (Request Body.Full) := do + let builder := builder.header Header.Name.contentType (Header.Value.ofString! "text/html; charset=utf-8") + fromBytesCore builder content.toUTF8 + +end Std.Http.Request.Builder + +namespace Std.Http.Response.Builder +open Internal.IO.Async + +private def fromBytesCore + (builder : Builder) + (content : ByteArray) : + Async (Response Body.Full) := do + return builder.body (← Body.Full.ofByteArray content) + +/-- +Builds a response from raw bytes. +-/ +def fromBytes (builder : Builder) (content : ByteArray) : Async (Response Body.Full) := + fromBytesCore builder content + +/-- +Builds a response with a binary body. +-/ +def bytes (builder : Builder) (content : ByteArray) : Async (Response Body.Full) := do + let builder := builder.header Header.Name.contentType (Header.Value.ofString! "application/octet-stream") + fromBytesCore builder content + +/-- +Builds a response with a text body. +-/ +def text (builder : Builder) (content : String) : Async (Response Body.Full) := do + let builder := builder.header Header.Name.contentType (Header.Value.ofString! "text/plain; charset=utf-8") + fromBytesCore builder content.toUTF8 + +/-- +Builds a response with a JSON body. +-/ +def json (builder : Builder) (content : String) : Async (Response Body.Full) := do + let builder := builder.header Header.Name.contentType (Header.Value.ofString! "application/json") + fromBytesCore builder content.toUTF8 + +/-- +Builds a response with an HTML body. +-/ +def html (builder : Builder) (content : String) : Async (Response Body.Full) := do + let builder := builder.header Header.Name.contentType (Header.Value.ofString! "text/html; charset=utf-8") + fromBytesCore builder content.toUTF8 + +end Std.Http.Response.Builder diff --git a/src/Std/Internal/Http/Data/Body/Length.lean b/src/Std/Internal/Http/Data/Body/Length.lean new file mode 100644 index 000000000000..593bb61811a8 --- /dev/null +++ b/src/Std/Internal/Http/Data/Body/Length.lean @@ -0,0 +1,49 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +module + +prelude +public import Init.Data.Repr + +public section + +/-! +# Body.Length + +This module defines the `Length` type, that represents the Content-Length or Transfer-Encoding +of an HTTP request or response. +-/ + +namespace Std.Http.Body + +set_option linter.all true + +/-- +Size of the body of a response or request. +-/ +inductive Length + /-- + Indicates that the HTTP message body uses **chunked transfer encoding**. + -/ + | chunked + + /-- + Indicates that the HTTP message body has a **fixed, known length**, as specified by the + `Content-Length` header. + -/ + | fixed (n : Nat) +deriving Repr, BEq + +namespace Length + +/-- +Checks if the `Length` is chunked. +-/ +def isChunked : Length → Bool + | .chunked => true + | _ => false + +end Std.Http.Body.Length diff --git a/src/Std/Internal/Http/Data/Body/Reader.lean b/src/Std/Internal/Http/Data/Body/Reader.lean new file mode 100644 index 000000000000..5de6e46f342f --- /dev/null +++ b/src/Std/Internal/Http/Data/Body/Reader.lean @@ -0,0 +1,63 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +module + +prelude +public import Std.Internal.Async +public import Std.Internal.Http.Data.Chunk +public import Std.Internal.Http.Data.Body.Basic +public import Std.Internal.Http.Data.Body.Stream + +public section + +/-! +# Body.Reader + +Reader typeclass for body-like values that can be consumed as chunk streams. +-/ + +namespace Std.Http.Body +open Std Internal IO Async + +set_option linter.all true + +/-- +Typeclass for values that can be read as HTTP body streams. +-/ +class Reader (α : Type) where + /-- + Receives the next body chunk. Returns `none` at end-of-stream. + -/ + recv : α → Async (Option Chunk) + + /-- + Closes the reader stream. + -/ + close : α → Async Unit + + /-- + Returns `true` when the reader stream is closed. + -/ + isClosed : α → Async Bool + + /-- + Selector that resolves when a chunk is available or EOF is reached. + -/ + recvSelector : α → Selector (Option Chunk) + +instance : Reader Incoming where + recv := Incoming.recv + close := Incoming.close + isClosed := Incoming.isClosed + recvSelector := Incoming.recvSelector + +instance : Reader Outgoing where + recv body := Reader.recv (Body.Internal.outgoingToIncoming body) + close body := Reader.close (Body.Internal.outgoingToIncoming body) + isClosed body := Reader.isClosed (Body.Internal.outgoingToIncoming body) + recvSelector body := Reader.recvSelector (Body.Internal.outgoingToIncoming body) + +end Std.Http.Body diff --git a/src/Std/Internal/Http/Data/Body/Stream.lean b/src/Std/Internal/Http/Data/Body/Stream.lean new file mode 100644 index 000000000000..bcb020ed7602 --- /dev/null +++ b/src/Std/Internal/Http/Data/Body/Stream.lean @@ -0,0 +1,661 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +module + +prelude +public import Std.Sync +public import Std.Internal.Async +public import Std.Internal.Http.Data.Request +public import Std.Internal.Http.Data.Response +public import Std.Internal.Http.Data.Chunk +public import Std.Internal.Http.Data.Body.Basic +public import Std.Internal.Http.Data.Body.Length +public import Init.Data.ByteArray + +public section + +/-! +# Body.Stream + +This module defines a zero-buffer rendezvous body channel split into two faces: + +- `Body.Outgoing`: producer side (send chunks) +- `Body.Incoming`: consumer side (receive chunks) + +There is no queue and no capacity. A send waits for a receiver and a receive waits for a sender. +At most one blocked producer and one blocked consumer are supported. +-/ + +namespace Std.Http.Body +open Std Internal IO Async + +set_option linter.all true + +namespace Channel + +open Internal.IO.Async in +private inductive Consumer where + | normal (promise : IO.Promise (Option Chunk)) + | select (finished : Waiter (Option Chunk)) + +private def Consumer.resolve (c : Consumer) (x : Option Chunk) : BaseIO Bool := do + match c with + | .normal promise => + promise.resolve x + return true + | .select waiter => + let lose := return false + let win promise := do + promise.resolve (.ok x) + return true + waiter.race lose win + +private structure Producer where + chunk : Chunk + + /-- + Resolved with `true` when consumed by a receiver, `false` when the channel closes. + -/ + done : IO.Promise Bool + +open Internal.IO.Async in +private def resolveInterestWaiter (waiter : Waiter Bool) (x : Bool) : BaseIO Bool := do + let lose := return false + let win promise := do + promise.resolve (.ok x) + return true + waiter.race lose win + +private structure State where + /-- + A single blocked producer waiting for a receiver. + -/ + pendingProducer : Option Producer + + /-- + A single blocked consumer waiting for a producer. + -/ + pendingConsumer : Option Consumer + + /-- + A waiter for `Outgoing.interestSelector`. + -/ + interestWaiter : Option (Internal.IO.Async.Waiter Bool) + + /-- + Whether the channel is closed. + -/ + closed : Bool + + /-- + Known size of the stream if available. + -/ + knownSize : Option Body.Length + + /-- + Buffered partial chunk data accumulated from `Outgoing.send ... (incomplete := true)`. + These partial pieces are collapsed and emitted as a single chunk on the next complete send. + -/ + pendingIncompleteChunk : Option Chunk := none +deriving Nonempty + +end Channel + +/-- +Receive-side face of a body channel. +-/ +structure Incoming where + private mk :: + private state : Mutex Channel.State +deriving Nonempty, TypeName + +/-- +Send-side face of a body channel. +-/ +structure Outgoing where + private mk :: + private state : Mutex Channel.State +deriving Nonempty, TypeName + +/-- +Creates a rendezvous body channel. +-/ +def mkChannel : Async (Outgoing × Incoming) := do + let state ← Mutex.new { + pendingProducer := none + pendingConsumer := none + interestWaiter := none + closed := false + knownSize := none + } + return ({ state }, { state }) + +namespace Channel + +private def decreaseKnownSize (knownSize : Option Body.Length) (chunk : Chunk) : Option Body.Length := + match knownSize with + | some (.fixed res) => some (Body.Length.fixed (res - chunk.data.size)) + | _ => knownSize + +private def pruneFinishedWaiters [Monad m] [MonadLiftT (ST IO.RealWorld) m] : + AtomicT State m Unit := do + let st ← get + + let pendingConsumer ← + match st.pendingConsumer with + | some (.select waiter) => + if ← waiter.checkFinished then + pure none + else + pure st.pendingConsumer + | _ => + pure st.pendingConsumer + + let interestWaiter ← + match st.interestWaiter with + | some waiter => + if ← waiter.checkFinished then + pure none + else + pure st.interestWaiter + | none => + pure none + + set { st with pendingConsumer, interestWaiter } + +private def signalInterest [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] : + AtomicT State m Unit := do + let st ← get + if let some waiter := st.interestWaiter then + discard <| resolveInterestWaiter waiter true + set { st with interestWaiter := none } + +private def recvReady' [Monad m] [MonadLiftT (ST IO.RealWorld) m] : + AtomicT State m Bool := do + let st ← get + return st.pendingProducer.isSome || st.closed + +private def hasInterest' [Monad m] [MonadLiftT (ST IO.RealWorld) m] : + AtomicT State m Bool := do + let st ← get + return st.pendingConsumer.isSome + +private def tryRecv' [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] : + AtomicT State m (Option Chunk) := do + let st ← get + if let some producer := st.pendingProducer then + set { + st with + pendingProducer := none + knownSize := decreaseKnownSize st.knownSize producer.chunk + } + discard <| producer.done.resolve true + return some producer.chunk + else + return none + +private def close' [Monad m] [MonadLiftT (ST IO.RealWorld) m] [MonadLiftT BaseIO m] : + AtomicT State m Unit := do + let st ← get + if st.closed then + return () + + if let some consumer := st.pendingConsumer then + discard <| consumer.resolve none + + if let some waiter := st.interestWaiter then + discard <| resolveInterestWaiter waiter false + + if let some producer := st.pendingProducer then + discard <| producer.done.resolve false + + set { + st with + pendingProducer := none + pendingConsumer := none + interestWaiter := none + pendingIncompleteChunk := none + closed := true + } + +end Channel + +namespace Incoming + +/-- +Attempts to receive a chunk from the channel without blocking. +Returns `some chunk` only when a producer is already waiting. +-/ +def tryRecv (incoming : Incoming) : Async (Option Chunk) := + incoming.state.atomically do + Channel.pruneFinishedWaiters + Channel.tryRecv' + +private def recv' (incoming : Incoming) : BaseIO (AsyncTask (Option Chunk)) := do + incoming.state.atomically do + Channel.pruneFinishedWaiters + + if let some chunk ← Channel.tryRecv' then + return AsyncTask.pure (some chunk) + + let st ← get + if st.closed then + return AsyncTask.pure none + + if st.pendingConsumer.isSome then + return Task.pure (.error (IO.Error.userError "only one blocked consumer is allowed")) + + let promise ← IO.Promise.new + set { st with pendingConsumer := some (.normal promise) } + Channel.signalInterest + return promise.result?.map (sync := true) fun + | none => .error (IO.Error.userError "the promise linked to the consumer was dropped") + | some res => .ok res + +/-- +Receives a chunk from the channel. Blocks until a producer sends one. +Returns `none` if the channel is closed and no producer is waiting. +-/ +def recv (incoming : Incoming) : Async (Option Chunk) := do + Async.ofAsyncTask (← recv' incoming) + +/-- +Closes the channel. +-/ +def close (incoming : Incoming) : Async Unit := + incoming.state.atomically do + Channel.close' + +/-- +Checks whether the channel is closed. +-/ +@[always_inline, inline] +def isClosed (incoming : Incoming) : Async Bool := + incoming.state.atomically do + return (← get).closed + +/-- +Gets the known size if available. +-/ +@[always_inline, inline] +def getKnownSize (incoming : Incoming) : Async (Option Body.Length) := + incoming.state.atomically do + return (← get).knownSize + +/-- +Sets known size metadata. +-/ +@[always_inline, inline] +def setKnownSize (incoming : Incoming) (size : Option Body.Length) : Async Unit := + incoming.state.atomically do + modify fun st => { st with knownSize := size } + +open Internal.IO.Async in +/-- +Creates a selector that resolves when a producer is waiting (or the channel closes). +-/ +def recvSelector (incoming : Incoming) : Selector (Option Chunk) where + tryFn := do + incoming.state.atomically do + Channel.pruneFinishedWaiters + if ← Channel.recvReady' then + return some (← Channel.tryRecv') + else + return none + + registerFn waiter := do + incoming.state.atomically do + Channel.pruneFinishedWaiters + if ← Channel.recvReady' then + let lose := return () + let win promise := do + promise.resolve (.ok (← Channel.tryRecv')) + waiter.race lose win + else + let st ← get + if st.pendingConsumer.isSome then + throw (.userError "only one blocked consumer is allowed") + + set { st with pendingConsumer := some (.select waiter) } + Channel.signalInterest + + unregisterFn := do + incoming.state.atomically do + Channel.pruneFinishedWaiters + +/-- +Iterates over chunks until the channel closes. +-/ +@[inline] +protected partial def forIn + {β : Type} (incoming : Incoming) (acc : β) + (step : Chunk → β → Async (ForInStep β)) : Async β := do + + let rec @[specialize] loop (incoming : Incoming) (acc : β) : Async β := do + if let some chunk ← incoming.recv then + match ← step chunk acc with + | .done res => return res + | .yield res => loop incoming res + else + return acc + + loop incoming acc + +/-- +Context-aware iteration over chunks until the channel closes. +-/ +@[inline] +protected partial def forIn' + {β : Type} (incoming : Incoming) (acc : β) + (step : Chunk → β → ContextAsync (ForInStep β)) : ContextAsync β := do + + let rec @[specialize] loop (incoming : Incoming) (acc : β) : ContextAsync β := do + let data ← Selectable.one #[ + .case incoming.recvSelector pure, + .case (← ContextAsync.doneSelector) (fun _ => pure none), + ] + + if let some chunk := data then + match ← step chunk acc with + | .done res => return res + | .yield res => loop incoming res + else + return acc + + loop incoming acc + +/-- +Reads all remaining chunks and decodes them into `α`. +-/ +partial def readAll + [FromByteArray α] + (incoming : Incoming) + (maximumSize : Option UInt64 := none) : + ContextAsync α := do + let rec loop (result : ByteArray) : ContextAsync ByteArray := do + let data ← Selectable.one #[ + .case incoming.recvSelector pure, + .case (← ContextAsync.doneSelector) (fun _ => pure none), + ] + + match data with + | none => return result + | some chunk => + let result := result ++ chunk.data + if let some max := maximumSize then + if result.size.toUInt64 > max then + throw (.userError s!"body exceeded maximum size of {max} bytes") + loop result + + let result ← loop ByteArray.empty + + match FromByteArray.fromByteArray result with + | .ok a => return a + | .error msg => throw (.userError msg) + +end Incoming + +namespace Outgoing + +private def collapseForSend + (outgoing : Outgoing) + (chunk : Chunk) + (incomplete : Bool) : BaseIO (Except IO.Error (Option Chunk)) := do + + outgoing.state.atomically do + Channel.pruneFinishedWaiters + let st ← get + + if st.closed then + return .error (.userError "channel closed") + + let merged := match st.pendingIncompleteChunk with + | some pending => + { + data := pending.data ++ chunk.data + extensions := if pending.extensions.isEmpty then chunk.extensions else pending.extensions + } + | none => chunk + + if incomplete then + set { st with pendingIncompleteChunk := some merged } + return .ok none + else + set { st with pendingIncompleteChunk := none } + return .ok (some merged) + +/- +Returns `some true` = delivered directly, `some false` = consumer race lost (retry), +`none` = producer installed, caller must await `done`. +-/ +private partial def send' (outgoing : Outgoing) (chunk : Chunk) : Async Unit := do + let done ← IO.Promise.new + + while true do + let result : Except IO.Error (Option Bool) ← outgoing.state.atomically do + Channel.pruneFinishedWaiters + let st ← get + + if st.closed then + return .error (IO.Error.userError "channel closed") + + if let some consumer := st.pendingConsumer then + let success ← consumer.resolve (some chunk) + + if success then + set { + st with + pendingConsumer := none + knownSize := Channel.decreaseKnownSize st.knownSize chunk + } + return .ok (some true) + else + set { st with pendingConsumer := none } + return .ok (some false) + else if st.pendingProducer.isSome then + return .error (IO.Error.userError "only one blocked producer is allowed") + else + set { st with pendingProducer := some { chunk, done } } + return .ok none + + match result with + | .error err => + throw err + | .ok (some true) => + return () + | .ok (some false) => + send' outgoing chunk + | .ok none => + match ← await done.result? with + | some true => return () + | _ => throw (IO.Error.userError "channel closed") + +/-- +Sends a chunk. + +If `incomplete := true`, the chunk is buffered and collapsed with subsequent chunks, and is not +delivered to the receiver yet. +If `incomplete := false`, any buffered incomplete pieces are collapsed with this chunk and the +single merged chunk is sent. +-/ +def send (outgoing : Outgoing) (chunk : Chunk) (incomplete : Bool := false) : Async Unit := do + match (← collapseForSend outgoing chunk incomplete) with + | .error err => throw err + | .ok none => pure () + | .ok (some toSend) => send' outgoing toSend + +/-- +Closes the channel. +-/ +def close (outgoing : Outgoing) : Async Unit := + outgoing.state.atomically do + Channel.close' + +/-- +Checks whether the channel is closed. +-/ +@[always_inline, inline] +def isClosed (outgoing : Outgoing) : Async Bool := + outgoing.state.atomically do + return (← get).closed + +/-- +Returns `true` when a consumer is currently blocked waiting for data. +-/ +def hasInterest (outgoing : Outgoing) : Async Bool := + outgoing.state.atomically do + Channel.pruneFinishedWaiters + Channel.hasInterest' + +/-- +Gets the known size if available. +-/ +@[always_inline, inline] +def getKnownSize (outgoing : Outgoing) : Async (Option Body.Length) := + outgoing.state.atomically do + return (← get).knownSize + +/-- +Sets known size metadata. +-/ +@[always_inline, inline] +def setKnownSize (outgoing : Outgoing) (size : Option Body.Length) : Async Unit := + outgoing.state.atomically do + modify fun st => { st with knownSize := size } + +open Internal.IO.Async in +/-- +Creates a selector that resolves when consumer interest is present. +Returns `true` when a consumer is waiting, `false` when the channel closes first. +-/ +def interestSelector (outgoing : Outgoing) : Selector Bool where + tryFn := do + outgoing.state.atomically do + Channel.pruneFinishedWaiters + let st ← get + if st.pendingConsumer.isSome then + return some true + else if st.closed then + return some false + else + return none + + registerFn waiter := do + outgoing.state.atomically do + Channel.pruneFinishedWaiters + let st ← get + + if st.pendingConsumer.isSome then + let lose := return () + let win promise := do + promise.resolve (.ok true) + waiter.race lose win + else if st.closed then + let lose := return () + let win promise := do + promise.resolve (.ok false) + waiter.race lose win + else if st.interestWaiter.isSome then + throw (.userError "only one blocked interest selector is allowed") + else + set { st with interestWaiter := some waiter } + + unregisterFn := do + outgoing.state.atomically do + Channel.pruneFinishedWaiters + +end Outgoing + +/- Internal conversions between channel faces. +Use these only in HTTP internals where body direction must be adapted. -/ +namespace Internal + +/-- +Reinterprets the receive-side handle as a send-side handle over the same channel. +-/ +@[always_inline, inline] +def incomingToOutgoing (incoming : Incoming) : Outgoing := + { state := incoming.state } + +/-- +Reinterprets the send-side handle as a receive-side handle over the same channel. +-/ +@[always_inline, inline] +def outgoingToIncoming (outgoing : Outgoing) : Incoming := + { state := outgoing.state } + +end Internal + +/-- +Creates a body from a producer function. +Returns the receive-side handle immediately and runs `gen` in a detached task. +The channel is always closed when `gen` returns or throws. +Errors from `gen` are not rethrown here; consumers observe end-of-stream via `recv = none`. +-/ +def stream (gen : Outgoing → Async Unit) : Async Incoming := do + let (outgoing, incoming) ← mkChannel + background <| do + try + gen outgoing + finally + outgoing.close + return incoming + +/-- +Creates a body from a fixed byte array. +-/ +def fromBytes (content : ByteArray) : Async Incoming := do + stream fun outgoing => do + outgoing.setKnownSize (some (.fixed content.size)) + if content.size > 0 then + outgoing.send (Chunk.ofByteArray content) + +/-- +Creates an empty body. +-/ +def empty : Async Incoming := do + let (outgoing, incoming) ← mkChannel + outgoing.setKnownSize (some (.fixed 0)) + outgoing.close + return incoming + +instance : ForIn Async Incoming Chunk where + forIn := Incoming.forIn + +instance : ForIn ContextAsync Incoming Chunk where + forIn := Incoming.forIn' + +end Std.Http.Body + +namespace Std.Http.Request.Builder +open Internal.IO.Async + +/-- +Builds a request with a streaming body generator. +-/ +def stream + (builder : Builder) + (gen : Body.Outgoing → Async Unit) : + Async (Request Body.Outgoing) := do + let incoming ← Body.stream gen + return Request.Builder.body builder (Body.Internal.incomingToOutgoing incoming) + +end Std.Http.Request.Builder + +namespace Std.Http.Response.Builder +open Internal.IO.Async + +/-- +Builds a response with a streaming body generator. +-/ +def stream + (builder : Builder) + (gen : Body.Outgoing → Async Unit) : + Async (Response Body.Outgoing) := do + let incoming ← Body.stream gen + return Response.Builder.body builder (Body.Internal.incomingToOutgoing incoming) + +end Std.Http.Response.Builder diff --git a/src/Std/Internal/Http/Data/Body/Writer.lean b/src/Std/Internal/Http/Data/Body/Writer.lean new file mode 100644 index 000000000000..064e09771a9e --- /dev/null +++ b/src/Std/Internal/Http/Data/Body/Writer.lean @@ -0,0 +1,227 @@ +/- +Copyright (c) 2025 Lean FRO, LLC. All rights reserved. +Released under Apache 2.0 license as described in the file LICENSE. +Authors: Sofia Rodrigues +-/ +module + +prelude +public import Std.Internal.Async +public import Std.Internal.Http.Data.Body.Length +public import Std.Internal.Http.Data.Chunk +public import Std.Internal.Http.Data.Body.Stream +public import Std.Internal.Http.Data.Body.Full +public import Std.Internal.Http.Data.Body.Empty + +public section + +/-! +# Body.Writer + +Writer typeclass for body-like values that can produce chunk streams. +-/ + +namespace Std.Http.Body +open Std Internal IO Async + +set_option linter.all true + +/-- +Typeclass for values that can be written as HTTP body streams. +-/ +class Writer (α : Type) where + /-- + Sends a body chunk. + -/ + send : α → Chunk → Bool → Async Unit + + /-- + Closes the writer stream. + -/ + close : α → Async Unit + + /-- + Returns `true` when the writer stream is closed. + -/ + isClosed : α → Async Bool + + /-- + Returns `true` when a consumer is waiting for data. + -/ + hasInterest : α → Async Bool + + /-- + Gets known stream size metadata, if available. + -/ + getKnownSize : α → Async (Option Body.Length) + + /-- + Sets known stream size metadata. + -/ + setKnownSize : α → Option Body.Length → Async Unit + + /-- + Selector that resolves when consumer interest appears. + -/ + interestSelector : α → Selector Bool + +namespace Writer + +/-- +Sends a chunk with `incomplete := false`. +-/ +@[inline] +def writeChunk [Writer α] (body : α) (chunk : Chunk) : Async Unit := + Writer.send body chunk false + +end Writer + +/-- +Union of writer-capable body variants. +-/ +inductive AnyBody where + /-- + Channel-backed streaming body writer. + -/ + | outgoing (body : Outgoing) + /-- + Fixed full-body writer handle. + -/ + | full (body : Full) + /-- + Always-empty writer handle. + -/ + | empty (body : Empty) + +instance : Coe Outgoing AnyBody where + coe := .outgoing + +instance : Coe Full AnyBody where + coe := .full + +instance : Coe Empty AnyBody where + coe := .empty + +instance : Coe (Response Empty) (Response AnyBody) where + coe f := { f with } + +instance : Coe (Response Full) (Response AnyBody) where + coe f := { f with } + +instance : Coe (Response Outgoing) (Response AnyBody) where + coe f := { f with } + +instance : Coe (ContextAsync (Response Empty)) (ContextAsync (Response AnyBody)) where + coe action := do + let response ← action + pure (response : Response AnyBody) + +instance : Coe (ContextAsync (Response Full)) (ContextAsync (Response AnyBody)) where + coe action := do + let response ← action + pure (response : Response AnyBody) + +instance : Coe (ContextAsync (Response Outgoing)) (ContextAsync (Response AnyBody)) where + coe action := do + let response ← action + pure (response : Response AnyBody) + +instance : Coe (Async (Response Empty)) (ContextAsync (Response AnyBody)) where + coe action := do + let response ← action + pure (response : Response AnyBody) + +instance : Coe (Async (Response Full)) (ContextAsync (Response AnyBody)) where + coe action := do + let response ← action + pure (response : Response AnyBody) + +instance : Coe (Async (Response Outgoing)) (ContextAsync (Response AnyBody)) where + coe action := do + let response ← action + pure (response : Response AnyBody) + +instance : Writer Outgoing where + send body chunk incomplete := Outgoing.send body chunk incomplete + close := Outgoing.close + isClosed := Outgoing.isClosed + hasInterest := Outgoing.hasInterest + getKnownSize := Outgoing.getKnownSize + setKnownSize := Outgoing.setKnownSize + interestSelector := Outgoing.interestSelector + +instance : Writer Full where + send body chunk incomplete := Full.send body chunk incomplete + close := Full.close + isClosed := Full.isClosed + hasInterest := Full.hasInterest + getKnownSize := Full.getKnownSize + setKnownSize := Full.setKnownSize + interestSelector := Full.interestSelector + +instance : Writer Empty where + send _ _ _ := throw <| .userError "cannot send" + close _ := pure () + isClosed _ := pure false + hasInterest _ := pure false + getKnownSize _ := pure (some (.fixed 0)) + setKnownSize _ _ := pure () + interestSelector _ := { + tryFn := pure (some false) + registerFn waiter := do + let lose := pure () + let win promise := do + promise.resolve (.ok false) + waiter.race lose win + unregisterFn := pure () + } + +instance : Writer AnyBody where + send + | .outgoing body, chunk, incomplete => Writer.send body chunk incomplete + | .full body, chunk, incomplete => Writer.send body chunk incomplete + | .empty body, chunk, incomplete => Writer.send body chunk incomplete + close + | .outgoing body => Writer.close body + | .full body => Writer.close body + | .empty body => Writer.close body + isClosed + | .outgoing body => Writer.isClosed body + | .full body => Writer.isClosed body + | .empty body => Writer.isClosed body + hasInterest + | .outgoing body => Writer.hasInterest body + | .full body => Writer.hasInterest body + | .empty body => Writer.hasInterest body + getKnownSize + | .outgoing body => Writer.getKnownSize body + | .full body => Writer.getKnownSize body + | .empty body => Writer.getKnownSize body + setKnownSize + | .outgoing body, size => Writer.setKnownSize body size + | .full body, size => Writer.setKnownSize body size + | .empty body, size => Writer.setKnownSize body size + interestSelector + | .outgoing body => Writer.interestSelector body + | .full body => Writer.interestSelector body + | .empty body => Writer.interestSelector body + +instance : Reader AnyBody where + recv + | .outgoing body => Reader.recv body + | .full body => Reader.recv body + | .empty body => Reader.recv body + close + | .outgoing body => Reader.close body + | .full body => Reader.close body + | .empty body => Reader.close body + isClosed + | .outgoing body => Reader.isClosed body + | .full body => Reader.isClosed body + | .empty body => Reader.isClosed body + recvSelector + | .outgoing body => Reader.recvSelector body + | .full body => Reader.recvSelector body + | .empty body => Reader.recvSelector body + +end Std.Http.Body diff --git a/tests/lean/run/async_http_body.lean b/tests/lean/run/async_http_body.lean new file mode 100644 index 000000000000..cc58f2045dcd --- /dev/null +++ b/tests/lean/run/async_http_body.lean @@ -0,0 +1,432 @@ +import Std.Internal.Http.Data.Body + +open Std.Internal.IO Async +open Std.Http +open Std.Http.Body + +/-! ## Channel tests -/ + +-- Test send and recv on channel + +def channelSendRecv : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + let chunk := Chunk.ofByteArray "hello".toUTF8 + + let sendTask ← async (t := AsyncTask) <| outgoing.send chunk + let result ← incoming.recv + + assert! result.isSome + assert! result.get!.data == "hello".toUTF8 + await sendTask + +#eval channelSendRecv.block + + +-- Test tryRecv on empty channel returns none + +def channelTryRecvEmpty : Async Unit := do + let (_outgoing, incoming) ← Body.mkChannel + let result ← incoming.tryRecv + assert! result.isNone + +#eval channelTryRecvEmpty.block + +-- Test tryRecv consumes a waiting producer + +def channelTryRecvWithPendingSend : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + + let sendTask ← async (t := AsyncTask) <| outgoing.send (Chunk.ofByteArray "data".toUTF8) + let mut result := none + let mut fuel := 100 + while result.isNone && fuel > 0 do + result ← incoming.tryRecv + if result.isNone then + let _ ← Selectable.one #[ + .case (← Selector.sleep 1) pure + ] + fuel := fuel - 1 + + assert! result.isSome + assert! result.get!.data == "data".toUTF8 + await sendTask + +#eval channelTryRecvWithPendingSend.block + +-- Test close sets closed flag + +def channelClose : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + assert! !(← outgoing.isClosed) + outgoing.close + assert! (← incoming.isClosed) + +#eval channelClose.block + +-- Test recv on closed channel returns none + +def channelRecvAfterClose : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + outgoing.close + let result ← incoming.recv + assert! result.isNone + +#eval channelRecvAfterClose.block + +-- Test for-in iteration collects chunks until close + +def channelForIn : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + + let producer ← async (t := AsyncTask) <| do + outgoing.send (Chunk.ofByteArray "a".toUTF8) + outgoing.send (Chunk.ofByteArray "b".toUTF8) + outgoing.close + + let mut acc : ByteArray := .empty + for chunk in incoming do + acc := acc ++ chunk.data + + assert! acc == "ab".toUTF8 + await producer + +#eval channelForIn.block + +-- Test chunk extensions are preserved + +def channelExtensions : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + let chunk := { data := "hello".toUTF8, extensions := #[(.mk "key", some (Chunk.ExtensionValue.ofString! "value"))] : Chunk } + + let sendTask ← async (t := AsyncTask) <| outgoing.send chunk + let result ← incoming.recv + + assert! result.isSome + assert! result.get!.extensions.size == 1 + assert! result.get!.extensions[0]! == (Chunk.ExtensionName.mk "key", some <| .ofString! "value") + await sendTask + +#eval channelExtensions.block + +-- Test known size metadata + +def channelKnownSize : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + outgoing.setKnownSize (some (.fixed 100)) + let size ← incoming.getKnownSize + assert! size == some (.fixed 100) + +#eval channelKnownSize.block + +-- Test known size decreases when a chunk is consumed + +def channelKnownSizeDecreases : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + outgoing.setKnownSize (some (.fixed 5)) + + let sendTask ← async (t := AsyncTask) <| outgoing.send (Chunk.ofByteArray "hello".toUTF8) + let _ ← incoming.recv + await sendTask + + let size ← incoming.getKnownSize + assert! size == some (.fixed 0) + +#eval channelKnownSizeDecreases.block + +-- Test only one blocked producer is allowed + +def channelSingleProducerRule : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + let send1 ← async (t := AsyncTask) <| outgoing.send (Chunk.ofByteArray "one".toUTF8) + + -- Yield so `send1` can occupy the single pending-producer slot. + let _ ← Selectable.one #[ + .case (← Selector.sleep 5) pure + ] + + let send2Failed ← + try + outgoing.send (Chunk.ofByteArray "two".toUTF8) + pure false + catch _ => + pure true + assert! send2Failed + + let first ← incoming.recv + assert! first.isSome + assert! first.get!.data == "one".toUTF8 + + await send1 + +#eval channelSingleProducerRule.block + +-- Test only one blocked consumer is allowed + +def channelSingleConsumerRule : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + + let recv1 ← async (t := AsyncTask) <| incoming.recv + + let hasInterest ← Selectable.one #[ + .case outgoing.interestSelector pure + ] + assert! hasInterest + + let recv2Failed ← + try + let _ ← incoming.recv + pure false + catch _ => + pure true + + assert! recv2Failed + + let sendTask ← async (t := AsyncTask) <| outgoing.send (Chunk.ofByteArray "ok".toUTF8) + let r1 ← await recv1 + + assert! r1.isSome + assert! r1.get!.data == "ok".toUTF8 + await sendTask + +#eval channelSingleConsumerRule.block + +-- Test hasInterest reflects blocked receiver state + +def channelHasInterest : Async Unit := do + let (outgoing, incoming) ← Body.mkChannel + assert! !(← outgoing.hasInterest) + + let recvTask ← async (t := AsyncTask) <| incoming.recv + + let hasInterest ← Selectable.one #[ + .case outgoing.interestSelector pure + ] + assert! hasInterest + assert! (← outgoing.hasInterest) + + let sendTask ← async (t := AsyncTask) <| outgoing.send (Chunk.ofByteArray "x".toUTF8) + let _ ← await recvTask + await sendTask + + assert! !(← outgoing.hasInterest) + +#eval channelHasInterest.block + +-- Test interestSelector resolves false when channel closes first + +def channelInterestSelectorClose : Async Unit := do + let (outgoing, _incoming) ← Body.mkChannel + + let waitInterest ← async (t := AsyncTask) <| + Selectable.one #[ + .case outgoing.interestSelector pure + ] + + outgoing.close + let interested ← await waitInterest + assert! interested == false + +#eval channelInterestSelectorClose.block + +/-! ## Full tests -/ + +-- Test Full.recv returns content once then EOF + +def fullRecvConsumesOnce : Async Unit := do + let full ← Body.Full.ofString "hello" + let first ← full.recv + let second ← full.recv + + assert! first.isSome + assert! first.get!.data == "hello".toUTF8 + assert! second.isNone + +#eval fullRecvConsumesOnce.block + +-- Test Full known-size metadata tracks consumption + +def fullKnownSizeLifecycle : Async Unit := do + let data := ByteArray.mk #[0x01, 0x02, 0x03, 0x04] + let full ← Body.Full.ofByteArray data + + assert! (← full.getKnownSize) == some (.fixed 4) + let chunk ← full.tryRecv + assert! chunk.isSome + assert! chunk.get!.data == data + assert! (← full.getKnownSize) == some (.fixed 0) + +#eval fullKnownSizeLifecycle.block + +-- Test Full.close discards remaining content + +def fullClose : Async Unit := do + let full ← Body.Full.ofString "bye" + assert! !(← full.isClosed) + full.close + assert! (← full.isClosed) + assert! (← full.tryRecv).isNone + +#eval fullClose.block + +-- Test Full interest API always reports no consumer interest + +def fullInterest : Async Unit := do + let full ← Body.Full.ofString "x" + assert! !(← full.hasInterest) + let interested ← Selectable.one #[ + .case full.interestSelector pure + ] + assert! interested == false + +#eval fullInterest.block + +/-! ## Empty tests -/ + +-- Test Empty writer metadata and interest behavior + +def emptyWriterBasics : Async Unit := do + let body : Body.Empty := {} + assert! (← Writer.getKnownSize body) == some (.fixed 0) + assert! !(← Writer.isClosed body) + assert! !(← Writer.hasInterest body) + + Writer.setKnownSize body (some (.fixed 99)) + assert! (← Writer.getKnownSize body) == some (.fixed 0) + + Writer.close body + let interested ← Selectable.one #[ + .case (Writer.interestSelector body) pure + ] + assert! interested == false + +#eval emptyWriterBasics.block + +-- Test Empty writer rejects send + +def emptyWriterSendFails : Async Unit := do + let body : Body.Empty := {} + let failed ← + try + Writer.send body (Chunk.ofByteArray "x".toUTF8) false + pure false + catch _ => + pure true + assert! failed + +#eval emptyWriterSendFails.block + +/-! ## Request.Builder body tests -/ + +private def recvBuiltBody (body : Body.Full) : Async (Option Chunk) := + body.recv + +private def emptyBodyKnownSize (body : Body.Empty) : Async (Option Body.Length) := + Writer.getKnownSize body + +-- Test Request.Builder.text sets correct headers + +def requestBuilderText : Async Unit := do + let req ← Request.post (.originForm! "/api") + |>.text "Hello, World!" + + assert! req.head.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/plain; charset=utf-8") + assert! req.head.headers.get? Header.Name.contentLength == none + + let body ← recvBuiltBody req.body + assert! body.isSome + assert! body.get!.data == "Hello, World!".toUTF8 + +#eval requestBuilderText.block + +-- Test Request.Builder.json sets correct headers + +def requestBuilderJson : Async Unit := do + let req ← Request.post (.originForm! "/api") + |>.json "{\"key\": \"value\"}" + + assert! req.head.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/json") + assert! req.head.headers.get? Header.Name.contentLength == none + let body ← recvBuiltBody req.body + assert! body.isSome + assert! body.get!.data == "{\"key\": \"value\"}".toUTF8 + +#eval requestBuilderJson.block + +-- Test Request.Builder.fromBytes sets body + +def requestBuilderFromBytes : Async Unit := do + let data := ByteArray.mk #[0x01, 0x02, 0x03] + let req ← Request.post (.originForm! "/api") + |>.fromBytes data + + assert! req.head.headers.get? Header.Name.contentLength == none + let body ← recvBuiltBody req.body + assert! body.isSome + assert! body.get!.data == data + +#eval requestBuilderFromBytes.block + +-- Test Request.Builder.blank creates empty body + +def requestBuilderNoBody : Async Unit := do + let req ← Request.get (.originForm! "/api") + |>.blank + + assert! (← emptyBodyKnownSize req.body) == some (.fixed 0) + +#eval requestBuilderNoBody.block + +/-! ## Response.Builder body tests -/ + +-- Test Response.Builder.text sets correct headers + +def responseBuilderText : Async Unit := do + let res ← Response.ok + |>.text "Hello, World!" + + assert! res.head.headers.get? Header.Name.contentType == some (Header.Value.ofString! "text/plain; charset=utf-8") + assert! res.head.headers.get? Header.Name.contentLength == none + + let body ← recvBuiltBody res.body + assert! body.isSome + assert! body.get!.data == "Hello, World!".toUTF8 + +#eval responseBuilderText.block + +-- Test Response.Builder.json sets correct headers + +def responseBuilderJson : Async Unit := do + let res ← Response.ok + |>.json "{\"status\": \"ok\"}" + + assert! res.head.headers.get? Header.Name.contentType == some (Header.Value.ofString! "application/json") + assert! res.head.headers.get? Header.Name.contentLength == none + let body ← recvBuiltBody res.body + assert! body.isSome + assert! body.get!.data == "{\"status\": \"ok\"}".toUTF8 + +#eval responseBuilderJson.block + +-- Test Response.Builder.fromBytes sets body + +def responseBuilderFromBytes : Async Unit := do + let data := ByteArray.mk #[0xaa, 0xbb] + let res ← Response.ok + |>.fromBytes data + + assert! res.head.headers.get? Header.Name.contentLength == none + let body ← recvBuiltBody res.body + assert! body.isSome + assert! body.get!.data == data + +#eval responseBuilderFromBytes.block + +-- Test Response.Builder.blank creates empty body + +def responseBuilderNoBody : Async Unit := do + let res ← Response.ok + |>.blank + + assert! (← emptyBodyKnownSize res.body) == some (.fixed 0) + +#eval responseBuilderNoBody.block