Skip to content

Commit 74a0b88

Browse files
authored
Fix incorrect response streaming leftover bytes calculation (#63)
1 parent f41b241 commit 74a0b88

File tree

2 files changed

+26
-22
lines changed

2 files changed

+26
-22
lines changed

src/Curl.jl

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,35 @@ function write_callback(
1717
try
1818
req = unsafe_pointer_to_objref(req_p)::gRPCRequest
1919

20+
!isnothing(req.ex) && return typemax(Csize_t)
21+
2022
n = size * count
2123
buf = unsafe_wrap(Array, convert(Ptr{UInt8}, data), (n,))
2224

2325
handled_n_bytes_total = 0
2426
try
2527
while !isnothing(buf) && handled_n_bytes_total < n
2628
handled_n_bytes, buf = handle_write(req, buf)
27-
handled_n_bytes_total += handled_n_bytes
29+
handled_n_bytes_total += handled_n_bytes
2830
handled_n_bytes == 0 && break
2931
end
30-
catch ex
32+
catch ex
3133
# Eat InvalidStateException raised on put! to closed channel
3234
!isa(ex, InvalidStateException) && rethrow(ex)
3335
end
3436

35-
# If there was an exception handle it
3637
!isnothing(req.ex) && return typemax(Csize_t)
3738

3839
# Check that we handled the correct number of bytes
3940
# If there was no exception in handle_write this should always match
4041
if handled_n_bytes_total != n
41-
req.ex = gRPCServiceCallException(
42-
GRPC_INTERNAL,
43-
"Recieved $(n) bytes from curl but only handled $(handled_n_bytes_total)",
44-
)
42+
if isnothing(req.ex)
43+
req.ex = gRPCServiceCallException(
44+
GRPC_INTERNAL,
45+
"Recieved $(n) bytes from curl but only handled $(handled_n_bytes_total)",
46+
)
47+
end
48+
4549
# If we are response streaming unblock the task waiting on response_c
4650
!isnothing(req.response_c) && close(req.response_c)
4751
return typemax(Csize_t)
@@ -337,7 +341,6 @@ function handle_write(req::gRPCRequest, buf::Vector{UInt8})::Tuple{Int64, Union{
337341
# Not enough data yet to read the entire header
338342
return write(req.response, buf), nothing
339343
else
340-
341344
buf_header = buf[1:header_bytes_left]
342345
n = write(req.response, buf_header)
343346

@@ -405,11 +408,11 @@ function handle_write(req::gRPCRequest, buf::Vector{UInt8})::Tuple{Int64, Union{
405408
req.response_length = 0
406409

407410
# Handle the remaining data
408-
leftover_bytes = message_bytes_left - n
411+
leftover_bytes = length(buf) - n
409412

410413
buf_leftover = nothing
411414
if leftover_bytes > 0
412-
buf_leftover = unsafe_wrap(Array, pointer(buf) + n, (length(leftover_bytes),))
415+
buf_leftover = unsafe_wrap(Array, pointer(buf) + n, (leftover_bytes,))
413416
end
414417

415418
return n, buf_leftover

test/runtests.jl

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -119,16 +119,6 @@ include("gen/test/test_pb.jl")
119119
end
120120
# end
121121

122-
# @testset "Max Message Size" begin
123-
# Create a client with much more restictive max message lengths
124-
client = TestService_TestRPC_Client(_TEST_HOST, _TEST_PORT; max_send_message_length=1024, max_recieve_message_length=1024)
125-
126-
# Send too much
127-
@test_throws gRPCServiceCallException grpc_sync_request(client, TestRequest(1, zeros(UInt64, 1024)))
128-
# Receive too much
129-
@test_throws gRPCServiceCallException grpc_sync_request(client, TestRequest(1024, zeros(UInt64, 1)))
130-
# end
131-
132122
# @testset "Async Channels" begin
133123
client = TestService_TestRPC_Client(_TEST_HOST, _TEST_PORT)
134124

@@ -144,7 +134,7 @@ include("gen/test/test_pb.jl")
144134
end
145135
# end
146136

147-
@static if VERSION >= v"1.12"
137+
@static if VERSION >= v"1.12"
148138
# @testset "Response Streaming" begin
149139
N = 1000
150140

@@ -203,6 +193,7 @@ include("gen/test/test_pb.jl")
203193
@test last(response.data) == i
204194
end
205195

196+
206197
close(request_c)
207198
grpc_async_await(req)
208199
# end
@@ -346,7 +337,7 @@ include("gen/test/test_pb.jl")
346337
@test isa(ex, gRPCServiceCallException)
347338
end
348339
# end
349-
end
340+
end
350341

351342
# @testset "Timeout Header Value Formatting" begin
352343
# Test integer seconds
@@ -379,5 +370,15 @@ include("gen/test/test_pb.jl")
379370
end
380371
# end
381372

373+
# @testset "Max Message Size" begin
374+
# Create a client with much more restictive max message lengths
375+
client = TestService_TestRPC_Client(_TEST_HOST, _TEST_PORT; max_send_message_length=1024, max_recieve_message_length=1024)
376+
377+
# Send too much
378+
@test_throws gRPCServiceCallException grpc_sync_request(client, TestRequest(1, zeros(UInt64, 1024)))
379+
# Receive too much
380+
@test_throws gRPCServiceCallException grpc_sync_request(client, TestRequest(1024, zeros(UInt64, 1)))
381+
# end
382+
382383
grpc_shutdown()
383384
end

0 commit comments

Comments
 (0)