Skip to content

Commit eb2595c

Browse files
Improve request/response body handling.
1 parent 9cfab94 commit eb2595c

File tree

14 files changed

+121
-115
lines changed

14 files changed

+121
-115
lines changed

async-grpc.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,5 @@ Gem::Specification.new do |spec|
2222
spec.required_ruby_version = ">= 3.2"
2323

2424
spec.add_dependency "async-http"
25-
spec.add_dependency "protocol-grpc", "~> 0.2"
25+
spec.add_dependency "protocol-grpc", "~> 0.4"
2626
end

context/getting-started.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,3 @@ Async do
169169
server.run
170170
end
171171
```
172-
173-
174-

gems.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,5 +31,3 @@
3131
gem "bake-test"
3232
gem "bake-test-external"
3333
end
34-
35-

guides/getting-started/readme.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,3 @@ Async do
169169
server.run
170170
end
171171
```
172-
173-
174-

guides/links.yaml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,2 @@
11
getting-started:
22
order: 1
3-
4-
5-

lib/async/grpc/client.rb

Lines changed: 14 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ def stub(interface_class, service_name)
100100
def call(request)
101101
request.headers = @headers.merge(request.headers)
102102

103-
super
103+
super.tap do |response|
104+
response.headers.policy = Protocol::GRPC::HEADER_POLICY
105+
end
104106
end
105107

106108
# Make a gRPC call.
@@ -166,19 +168,17 @@ def unary_call(path, headers, request_message, request_class, response_class, en
166168
begin
167169
# Read body first - trailers are only available after body is consumed
168170
response_encoding = response.headers["grpc-encoding"]
169-
readable_body = Protocol::GRPC::Body::ReadableBody.new(
170-
response.body,
171-
message_class: response_class,
172-
encoding: response_encoding
173-
)
171+
response_body = Protocol::GRPC::Body::ReadableBody.wrap(response, message_class: response_class, encoding: response_encoding)
174172

175-
message = readable_body.read
176-
readable_body.close
173+
if response_body
174+
response_value = response_body.read
175+
response_body.close
176+
end
177177

178178
# Check status after reading body (trailers are now available)
179179
check_status!(response)
180180

181-
message
181+
return response_value
182182
ensure
183183
response.close
184184
end
@@ -203,32 +203,19 @@ def server_streaming_call(path, headers, request_message, request_class, respons
203203
response = call(http_request)
204204

205205
begin
206-
# Set gRPC policy BEFORE reading body so trailers are processed correctly:
207-
unless response.headers.policy == Protocol::GRPC::HEADER_POLICY
208-
response.headers.policy = Protocol::GRPC::HEADER_POLICY
209-
end
210-
211206
# Read body first - trailers are only available after body is consumed:
212207
response_encoding = response.headers["grpc-encoding"]
213-
readable_body = Protocol::GRPC::Body::ReadableBody.new(
214-
response.body,
215-
message_class: response_class,
216-
encoding: response_encoding
217-
)
208+
response_body = Protocol::GRPC::Body::ReadableBody.wrap(response, message_class: response_class, encoding: response_encoding)
218209

219-
return readable_body unless block_given?
220-
221-
begin
222-
readable_body.each(&block)
223-
ensure
224-
readable_body.close
210+
if block_given? and response_body
211+
response_body.each(&block)
225212
end
226213

227214
# Check status after reading all body chunks (trailers are now available):
228215
check_status!(response)
229216

230-
readable_body
231-
rescue StandardError
217+
return response_body
218+
rescue
232219
response.close
233220
raise
234221
end
@@ -327,12 +314,6 @@ def bidirectional_call(path, headers, request_class, response_class, encoding, &
327314
# @parameter response [Protocol::HTTP::Response]
328315
# @raises [Protocol::GRPC::Error] If status is not OK
329316
def check_status!(response)
330-
# Policy should already be set before calling this method:
331-
# But ensure it's set just in case
332-
unless response.headers.policy == Protocol::GRPC::HEADER_POLICY
333-
response.headers.policy = Protocol::GRPC::HEADER_POLICY
334-
end
335-
336317
status = Protocol::GRPC::Metadata.extract_status(response.headers)
337318

338319
# If status is UNKNOWN (not found), default to OK:

lib/async/grpc/dispatcher_middleware.rb

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
# Released under the MIT License.
44
# Copyright, 2025, by Samuel Williams.
55

6+
require "async"
7+
require "async/deadline"
8+
69
require "protocol/grpc/middleware"
710
require "protocol/grpc/methods"
811
require "protocol/grpc/call"
@@ -41,6 +44,38 @@ def register(service_name, service)
4144

4245
protected
4346

47+
def invoke_service(service, handler_method, input, output, call)
48+
begin
49+
service.send(handler_method, input, output, call)
50+
ensure
51+
# Close input stream:
52+
input.close
53+
54+
# Close output stream:
55+
output.close_write unless output.closed?
56+
end
57+
58+
# Mark trailers and add status (if not already set by handler):
59+
if call.response&.headers
60+
call.response.headers.trailer!
61+
62+
# Only add OK status if grpc-status hasn't been set by the handler:
63+
unless call.response.headers["grpc-status"]
64+
Protocol::GRPC::Metadata.add_status_trailer!(call.response.headers, status: Protocol::GRPC::Status::OK)
65+
end
66+
end
67+
end
68+
69+
def dispatch_to_service(service, handler_method, input, output, call, deadline, parent: Async::Task.current)
70+
if deadline
71+
parent.with_timeout(deadline) do
72+
invoke_service(service, handler_method, input, output, call)
73+
end
74+
else
75+
invoke_service(service, handler_method, input, output, call)
76+
end
77+
end
78+
4479
# Dispatch the request to the appropriate service.
4580
# @parameter request [Protocol::HTTP::Request] The HTTP request
4681
# @returns [Protocol::HTTP::Response] The HTTP response
@@ -66,9 +101,9 @@ def dispatch(request)
66101
raise Protocol::GRPC::Error.new(Protocol::GRPC::Status::UNIMPLEMENTED, "Method not found: #{method_name}")
67102
end
68103

69-
handler_method = rpc_descriptor[:method]
70-
request_class = rpc_descriptor[:request_class]
71-
response_class = rpc_descriptor[:response_class]
104+
handler_method = rpc_descriptor.method
105+
request_class = rpc_descriptor.request_class
106+
response_class = rpc_descriptor.response_class
72107

73108
# Verify handler method exists:
74109
unless service.respond_to?(handler_method, true)
@@ -80,34 +115,34 @@ def dispatch(request)
80115
input = Protocol::GRPC::Body::ReadableBody.new(request.body, message_class: request_class, encoding: encoding)
81116
output = Protocol::GRPC::Body::WritableBody.new(message_class: response_class, encoding: encoding)
82117

83-
# Create call context:
118+
# Create response headers:
84119
response_headers = Protocol::HTTP::Headers.new([], nil, policy: Protocol::GRPC::HEADER_POLICY)
85120
response_headers["content-type"] = "application/grpc+proto"
86121
response_headers["grpc-encoding"] = encoding if encoding
87122

123+
# Create response object:
124+
response = Protocol::HTTP::Response[200, response_headers, output]
125+
88126
# Parse deadline from timeout header:
89-
timeout_value = request.headers["grpc-timeout"]
90-
deadline = if timeout_value
91-
timeout_seconds = Protocol::GRPC::Methods.parse_timeout(timeout_value)
92-
require "async/deadline"
93-
Async::Deadline.start(timeout_seconds) if timeout_seconds
127+
timeout = Protocol::GRPC::Methods.parse_timeout(request.headers["grpc-timeout"])
128+
deadline = if timeout
129+
Async::Deadline.start(timeout)
94130
end
95131

96-
call = Protocol::GRPC::Call.new(request, deadline: deadline)
97-
98-
# Call the handler method on the service:
99-
service.send(handler_method, input, output, call)
132+
# Create call context with request and response:
133+
call = Protocol::GRPC::Call.new(request, response, deadline: deadline)
100134

101-
# Close output stream:
102-
output.close_write unless output.closed?
103-
104-
# Mark trailers and add status:
105-
response_headers.trailer!
106-
Protocol::GRPC::Metadata.add_status_trailer!(response_headers, status: Protocol::GRPC::Status::OK)
135+
if rpc_descriptor.streaming?
136+
Async do |task|
137+
dispatch_to_service(service, handler_method, input, output, call, deadline, parent: task)
138+
end
139+
else
140+
# Unary call:
141+
dispatch_to_service(service, handler_method, input, output, call, deadline)
142+
end
107143

108-
Protocol::HTTP::Response[200, response_headers, output]
144+
response
109145
end
110146
end
111147
end
112148
end
113-

lib/async/grpc/service.rb

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -58,36 +58,12 @@ def rpc_descriptions
5858
descriptions = {}
5959

6060
@interface_class.rpcs.each do |pascal_case_name, rpc|
61-
# Use explicit method name if provided, otherwise convert PascalCase to snake_case:
62-
ruby_method_name = if rpc.method
63-
rpc.method
64-
else
65-
snake_case_name = pascal_case_to_snake_case(pascal_case_name.to_s)
66-
snake_case_name.to_sym
67-
end
68-
69-
descriptions[pascal_case_name.to_s] = {
70-
method: ruby_method_name,
71-
request_class: rpc.request_class,
72-
response_class: rpc.response_class,
73-
streaming: rpc.streaming
74-
}
61+
# rpc.method is always set (either explicitly or auto-converted in Interface.rpc)
62+
descriptions[pascal_case_name.to_s] = rpc
7563
end
7664

7765
descriptions
7866
end
79-
80-
private
81-
82-
# Convert PascalCase to snake_case.
83-
# @parameter pascal_case [String] PascalCase string (e.g., "SayHello")
84-
# @returns [String] snake_case string (e.g., "say_hello")
85-
def pascal_case_to_snake_case(pascal_case)
86-
pascal_case
87-
.gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2') # Insert underscore before capital letters followed by lowercase
88-
.gsub(/([a-z\d])([A-Z])/, '\1_\2') # Insert underscore between lowercase/digit and uppercase
89-
.downcase
90-
end
9167
end
9268
end
9369
end

lib/async/grpc/stub.rb

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ def initialize(client, interface)
2424
# rpc.method is always set (either explicit or auto-converted from PascalCase)
2525
snake_case_method = rpc.method
2626

27-
# Index by snake_case method name, storing RPC and PascalCase name for path building
28-
@rpcs_by_method[snake_case_method] = [rpc, pascal_case_name]
27+
# Index by snake_case method name, storing RPC (which includes name field)
28+
@rpcs_by_method[snake_case_method] = rpc
2929
end
3030
end
3131

@@ -53,8 +53,7 @@ def method_missing(method_name, *args, **options, &block)
5353
encoding = options.delete(:encoding)
5454

5555
# Delegate to client.invoke with PascalCase method name (for interface lookup):
56-
@client.invoke(@interface, interface_method_name, request, metadata: metadata, timeout: timeout, encoding: encoding,
57-
&block)
56+
@client.invoke(@interface, interface_method_name, request, metadata: metadata, timeout: timeout, encoding: encoding, &block)
5857
else
5958
super
6059
end
@@ -78,8 +77,8 @@ def respond_to_missing?(method_name, include_private = false)
7877
# @returns [Array(Protocol::GRPC::RPC, Symbol) | Array(Nil, Nil)] RPC definition and PascalCase method name, or nil if not found
7978
def lookup_rpc(method_name)
8079
if @rpcs_by_method.key?(method_name)
81-
rpc, pascal_case_name = @rpcs_by_method[method_name]
82-
return [rpc, pascal_case_name]
80+
rpc = @rpcs_by_method[method_name]
81+
return [rpc, rpc.name]
8382
end
8483

8584
[nil, nil]

readme.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,14 @@ Please see the [project documentation](https://socketry.github.io/async-grpc/) f
2525

2626
Please see the [project releases](https://socketry.github.io/async-grpc/releases/index) for all releases.
2727

28+
### Unreleased
29+
30+
- Tidy up request and response body handling.
31+
2832
### v0.1.0
2933

34+
- Initial hack.
35+
3036
## See Also
3137

3238
- [protocol-grpc](https://github.com/socketry/protocol-grpc) — Protocol abstractions for gRPC that this gem builds upon.

0 commit comments

Comments
 (0)