44# Copyright, 2025, by Samuel Williams.
55
66require "protocol/http"
7+ require "protocol/http/body/wrapper"
78require "zlib"
89
910module Protocol
@@ -12,56 +13,36 @@ module GRPC
1213 module Body
1314 # Represents a readable body for gRPC messages with length-prefixed framing.
1415 # This is the standard readable body for gRPC - all gRPC responses use message framing.
15- class ReadableBody
16+ # Wraps the underlying HTTP body and transforms raw chunks into decoded gRPC messages.
17+ class ReadableBody < Protocol ::HTTP ::Body ::Wrapper
18+ def self . wrap ( message , **options )
19+ if body = message . body
20+ message . body = self . new ( body , **options )
21+ end
22+
23+ return message . body
24+ end
25+
1626 # Initialize a new readable body for gRPC messages.
1727 # @parameter body [Protocol::HTTP::Body::Readable] The underlying HTTP body
1828 # @parameter message_class [Class | Nil] Protobuf message class with .decode method.
1929 # If `nil`, returns raw binary data (useful for channel adapters)
2030 # @parameter encoding [String | Nil] Compression encoding (from grpc-encoding header)
2131 def initialize ( body , message_class : nil , encoding : nil )
22- @ body = body
32+ super ( body )
2333 @message_class = message_class
2434 @encoding = encoding
2535 @buffer = String . new . force_encoding ( Encoding ::BINARY )
26- @closed = false
2736 end
2837
29- # @attribute [Protocol::HTTP::Body::Readable] The underlying HTTP body.
30- attr_reader :body
31-
3238 # @attribute [String | Nil] The compression encoding.
3339 attr_reader :encoding
3440
35- # Close the input body.
36- # @parameter error [Exception | Nil] Optional error that caused the close
37- # @returns [Nil]
38- def close ( error = nil )
39- @closed = true
40-
41- if @body
42- @body . close ( error )
43- @body = nil
44- end
45-
46- nil
47- end
48-
49- # Check if the stream has been closed.
50- # @returns [Boolean] `true` if the stream is closed, `false` otherwise
51- def closed?
52- @closed or @body . nil?
53- end
54-
55- # Check if there are any input chunks remaining.
56- # @returns [Boolean] `true` if the body is empty, `false` otherwise
57- def empty?
58- @body . nil?
59- end
60-
6141 # Read the next gRPC message.
42+ # Overrides Wrapper#read to transform raw HTTP body chunks into decoded gRPC messages.
6243 # @returns [Object | String | Nil] Decoded message, raw binary, or `Nil` if stream ended
6344 def read
64- return nil if closed ?
45+ return nil if @body . nil? || @body . empty ?
6546
6647 # Read 5-byte prefix: 1 byte compression flag + 4 bytes length
6748 prefix = read_exactly ( 5 )
@@ -87,24 +68,6 @@ def read
8768 end
8869 end
8970
90- # Enumerate all messages until finished, then invoke {close}.
91- # @yields {|message| ...} The block to call with each message.
92- def each
93- return to_enum unless block_given?
94-
95- error = nil
96- begin
97- while ( message = read )
98- yield message
99- end
100- rescue StandardError => e
101- error = e
102- raise
103- ensure
104- close ( error )
105- end
106- end
107-
10871 private
10972
11073 # Read exactly n bytes from the underlying body.
@@ -113,28 +76,18 @@ def each
11376 def read_exactly ( n )
11477 # Fill buffer until we have enough data:
11578 while @buffer . bytesize < n
116- return nil if closed ?
79+ return nil if @body . nil? || @body . empty ?
11780
11881 # Read chunk from underlying body:
11982 chunk = @body . read
12083
12184 if chunk . nil?
12285 # End of stream:
123- if @body && !@closed
124- @body . close
125- @closed = true
126- end
12786 return nil
12887 end
12988
13089 # Append to buffer:
13190 @buffer << chunk . force_encoding ( Encoding ::BINARY )
132-
133- # Check if body is empty and close if needed:
134- if @body . empty?
135- @body . close
136- @closed = true
137- end
13891 end
13992
14093 # Extract the required data:
0 commit comments