Skip to content

Commit 1ee5577

Browse files
authored
Gracefully handle forgetting grpc_init() (#94)
* Add support for custom deadline/keepalive/max lengths for unary RPC * Gracefully handle forgetting to call grpc_init (no more hang) Rename grpc.events to grpc.sem to make it more clear what its intended purpose is * Improve code-reuse for gRPCCURL first initialization
1 parent bde341f commit 1ee5577

File tree

5 files changed

+80
-61
lines changed

5 files changed

+80
-61
lines changed

src/Curl.jl

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ const NOCHANNEL = NoChannel()
1212

1313

1414
Base.isopen(req::NoChannel) = false
15-
Base.isempty(req::NoChannel) = true
15+
Base.isempty(req::NoChannel) = true
1616
Base.put!(req::NoChannel, ::IOBuffer) = false
1717
Base.take!(req::NoChannel) = nothing
1818
Base.close(req::NoChannel) = false
19-
Base.iterate(req::NoChannel) = Iterators.Stateful(Iterators.flatten(Iterators.repeated(nothing, 0)))
19+
Base.iterate(req::NoChannel) =
20+
Iterators.Stateful(Iterators.flatten(Iterators.repeated(nothing, 0)))
2021

2122

2223
function write_callback(
@@ -197,8 +198,8 @@ mutable struct gRPCRequest
197198
response::IOBuffer
198199

199200
# These are only used when the request or response is streaming
200-
request_c::Union{Channel{IOBuffer}, NoChannel}
201-
response_c::Union{Channel{IOBuffer}, NoChannel}
201+
request_c::Union{Channel{IOBuffer},NoChannel}
202+
response_c::Union{Channel{IOBuffer},NoChannel}
202203

203204
# The task making the request can block on this until the request is complete
204205
ready::Event
@@ -231,13 +232,20 @@ mutable struct gRPCRequest
231232
url::String,
232233
request::IOBuffer,
233234
response::IOBuffer,
234-
request_c::Union{Channel{IOBuffer}, NoChannel},
235-
response_c::Union{Channel{IOBuffer}, NoChannel};
235+
request_c::Union{Channel{IOBuffer},NoChannel},
236+
response_c::Union{Channel{IOBuffer},NoChannel};
236237
deadline = 10,
237238
keepalive = 60,
238239
max_send_message_length = 4 * 1024 * 1024,
239240
max_recieve_message_length = 4 * 1024 * 1024,
240241
)
242+
!isready(grpc) && throw(
243+
gRPCServiceCallException(
244+
GRPC_FAILED_PRECONDITION,
245+
"gRPCCURL backend is not running, did you forget to call grpc_init()?",
246+
),
247+
)
248+
241249
# If the grpc handle is shutdown avoid acquiring the request semaphore and immediately throw an exception
242250
if !grpc.running
243251
throw(
@@ -259,8 +267,12 @@ mutable struct gRPCRequest
259267
# curl_easy_setopt(easy_handle, CURLOPT_VERBOSE, UInt32(1))
260268

261269
curl_easy_setopt(easy_handle, CURLOPT_URL, url)
262-
curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT_MS, Clong(ceil(1000*deadline)))
263-
curl_easy_setopt(easy_handle, CURLOPT_CONNECTTIMEOUT_MS, Clong(ceil(1000*deadline)))
270+
curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT_MS, Clong(ceil(1000 * deadline)))
271+
curl_easy_setopt(
272+
easy_handle,
273+
CURLOPT_CONNECTTIMEOUT_MS,
274+
Clong(ceil(1000 * deadline)),
275+
)
264276
curl_easy_setopt(easy_handle, CURLOPT_PIPEWAIT, Clong(1))
265277
curl_easy_setopt(easy_handle, CURLOPT_POST, Clong(1))
266278
curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, "POST")
@@ -702,30 +714,27 @@ mutable struct gRPCCURL
702714
running::Bool
703715
requests::Vector{gRPCRequest}
704716
# Allows for controlling the maximum number of concurrent gRPC requests/streams
705-
events::Channel{Event}
717+
sem::Channel{Event}
706718

707-
function gRPCCURL(max_streams::Int = GRPC_MAX_STREAMS)
719+
function gRPCCURL(; max_streams::Int = GRPC_MAX_STREAMS, running = true)
708720
grpc = new(
709721
Ptr{Cvoid}(0),
710722
ReentrantLock(),
711723
nothing,
712724
Dict{curl_socket_t,CURLWatcher}(),
713725
ReentrantLock(),
714-
true,
726+
running,
715727
Vector{gRPCRequest}(),
716728
Channel{Event}(max_streams),
717729
)
718730

719-
# We use a channel as a Semaphore which also acts as a way to reuse Events to reduce allocations
720-
for _ = 1:max_streams
721-
put!(grpc.events, Event())
722-
end
723-
724-
preserve_handle(grpc)
731+
finalizer((x) -> close(x), grpc)
725732

726-
grpc_multi_init(grpc)
733+
# This is used for the global const gRPCCURL handle
734+
# The user is expected to call grpc_init() in order to use it
735+
!running && return grpc
727736

728-
finalizer((x) -> close(x), grpc)
737+
open(grpc)
729738

730739
return grpc
731740
end
@@ -774,9 +783,9 @@ function Base.open(grpc::gRPCCURL)
774783
grpc.watchers = Dict{curl_socket_t,CURLWatcher}()
775784
end
776785

777-
grpc.events = Channel{Event}(grpc.events.sz_max)
778-
for _ = 1:grpc.events.sz_max
779-
put!(grpc.events, Event())
786+
grpc.sem = Channel{Event}(grpc.sem.sz_max)
787+
for _ = 1:grpc.sem.sz_max
788+
put!(grpc.sem, Event())
780789
end
781790

782791
grpc.requests = Vector{gRPCRequest}()
@@ -789,11 +798,13 @@ function Base.open(grpc::gRPCCURL)
789798
end
790799
end
791800

792-
max_reqs_dec(grpc::gRPCCURL) = take!(grpc.events)
801+
isready(grpc::gRPCCURL) = grpc.running
802+
803+
max_reqs_dec(grpc::gRPCCURL) = take!(grpc.sem)
793804
function max_reqs_inc(grpc::gRPCCURL, req::gRPCRequest)
794805
# Reset before we recycle
795806
reset(req.curl_done_reading)
796-
put!(grpc.events, req.curl_done_reading)
807+
put!(grpc.sem, req.curl_done_reading)
797808
end
798809

799810
function cleanup_request(grpc::gRPCCURL, req::gRPCRequest)

src/ProtoBuf.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ function service_cb(io, t::CodeGenerators.ServiceType, ctx::CodeGenerators.Conte
5353
end
5454

5555
import_cb(io, ctx, definitions) =
56-
mapreduce(x->x isa CodeGenerators.ServiceType ? 1 : 0, +, values(definitions)) > 0 &&
57-
println(io, "import gRPCClient")
56+
mapreduce(x -> x isa CodeGenerators.ServiceType ? 1 : 0, +, values(definitions)) > 0 &&
57+
println(io, "import gRPCClient")
5858

5959

6060
grpc_register_service_codegen() = CodeGenerators.register_external_codegen_handler(

src/Streaming.jl

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,10 @@ test_response = grpc_async_await(client, req)
163163
function grpc_async_request(
164164
client::gRPCServiceClient{TRequest,true,TResponse,false},
165165
request::Channel{TRequest};
166-
deadline=client.deadline,
167-
keepalive=client.keepalive,
168-
max_send_message_length=client.max_send_message_length,
169-
max_recieve_message_length=client.max_recieve_message_length,
166+
deadline = client.deadline,
167+
keepalive = client.keepalive,
168+
max_send_message_length = client.max_send_message_length,
169+
max_recieve_message_length = client.max_recieve_message_length,
170170
) where {TRequest<:Any,TResponse<:Any}
171171

172172
req = gRPCRequest(
@@ -176,10 +176,10 @@ function grpc_async_request(
176176
IOBuffer(),
177177
Channel{IOBuffer}(16),
178178
NOCHANNEL;
179-
deadline=deadline,
180-
keepalive=keepalive,
181-
max_send_message_length=max_send_message_length,
182-
max_recieve_message_length=max_recieve_message_length,
179+
deadline = deadline,
180+
keepalive = keepalive,
181+
max_send_message_length = max_send_message_length,
182+
max_recieve_message_length = max_recieve_message_length,
183183
)
184184

185185
request_task = Threads.@spawn grpc_async_stream_request(req, request)
@@ -249,10 +249,10 @@ function grpc_async_request(
249249
client::gRPCServiceClient{TRequest,false,TResponse,true},
250250
request::TRequest,
251251
response::Channel{TResponse};
252-
deadline=client.deadline,
253-
keepalive=client.keepalive,
254-
max_send_message_length=client.max_send_message_length,
255-
max_recieve_message_length=client.max_recieve_message_length
252+
deadline = client.deadline,
253+
keepalive = client.keepalive,
254+
max_send_message_length = client.max_send_message_length,
255+
max_recieve_message_length = client.max_recieve_message_length,
256256
) where {TRequest<:Any,TResponse<:Any}
257257

258258
request_buf = grpc_encode_request_iobuffer(
@@ -268,10 +268,10 @@ function grpc_async_request(
268268
IOBuffer(),
269269
NOCHANNEL,
270270
Channel{IOBuffer}(16);
271-
deadline=deadline,
272-
keepalive=keepalive,
273-
max_send_message_length=max_send_message_length,
274-
max_recieve_message_length=max_recieve_message_length,
271+
deadline = deadline,
272+
keepalive = keepalive,
273+
max_send_message_length = max_send_message_length,
274+
max_recieve_message_length = max_recieve_message_length,
275275
)
276276

277277
response_task = Threads.@spawn grpc_async_stream_response(req, response)
@@ -353,10 +353,10 @@ function grpc_async_request(
353353
client::gRPCServiceClient{TRequest,true,TResponse,true},
354354
request::Channel{TRequest},
355355
response::Channel{TResponse};
356-
deadline=client.deadline,
357-
keepalive=client.keepalive,
358-
max_send_message_length=client.max_send_message_length,
359-
max_recieve_message_length=client.max_recieve_message_length,
356+
deadline = client.deadline,
357+
keepalive = client.keepalive,
358+
max_send_message_length = client.max_send_message_length,
359+
max_recieve_message_length = client.max_recieve_message_length,
360360
) where {TRequest<:Any,TResponse<:Any}
361361

362362
req = gRPCRequest(
@@ -366,10 +366,10 @@ function grpc_async_request(
366366
IOBuffer(),
367367
Channel{IOBuffer}(16),
368368
Channel{IOBuffer}(16);
369-
deadline=deadline,
370-
keepalive=keepalive,
371-
max_send_message_length=max_send_message_length,
372-
max_recieve_message_length=max_recieve_message_length,
369+
deadline = deadline,
370+
keepalive = keepalive,
371+
max_send_message_length = max_send_message_length,
372+
max_recieve_message_length = max_recieve_message_length,
373373
)
374374

375375
request_task = Threads.@spawn grpc_async_stream_request(req, request)

src/Unary.jl

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,11 @@ end
6060
"""
6161
function grpc_async_request(
6262
client::gRPCServiceClient{TRequest,false,TResponse,false},
63-
request::TRequest,
63+
request::TRequest;
64+
deadline = client.deadline,
65+
keepalive = client.keepalive,
66+
max_send_message_length = client.max_send_message_length,
67+
max_recieve_message_length = client.max_recieve_message_length,
6468
) where {TRequest<:Any,TResponse<:Any}
6569

6670
request_buf = grpc_encode_request_iobuffer(
@@ -76,10 +80,10 @@ function grpc_async_request(
7680
IOBuffer(),
7781
NOCHANNEL,
7882
NOCHANNEL;
79-
deadline = client.deadline,
80-
keepalive = client.keepalive,
81-
max_send_message_length = client.max_send_message_length,
82-
max_recieve_message_length = client.max_recieve_message_length,
83+
deadline = deadline,
84+
keepalive = keepalive,
85+
max_send_message_length = max_send_message_length,
86+
max_recieve_message_length = max_recieve_message_length,
8387
)
8488

8589
req
@@ -154,7 +158,11 @@ function grpc_async_request(
154158
client::gRPCServiceClient{TRequest,false,TResponse,false},
155159
request::TRequest,
156160
channel::Channel{gRPCAsyncChannelResponse{TResponse}},
157-
index::Int64,
161+
index::Int64;
162+
deadline = client.deadline,
163+
keepalive = client.keepalive,
164+
max_send_message_length = client.max_send_message_length,
165+
max_recieve_message_length = client.max_recieve_message_length,
158166
) where {TRequest<:Any,TResponse<:Any}
159167

160168
request_buf = grpc_encode_request_iobuffer(
@@ -170,10 +178,10 @@ function grpc_async_request(
170178
IOBuffer(),
171179
NOCHANNEL,
172180
NOCHANNEL;
173-
deadline = client.deadline,
174-
keepalive = client.keepalive,
175-
max_send_message_length = client.max_send_message_length,
176-
max_recieve_message_length = client.max_recieve_message_length,
181+
deadline = deadline,
182+
keepalive = keepalive,
183+
max_send_message_length = max_send_message_length,
184+
max_recieve_message_length = max_recieve_message_length,
177185
)
178186

179187
Threads.@spawn begin

src/gRPC.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const _grpc = gRPCCURL()
1+
const _grpc = gRPCCURL(running = false)
22

33
"""
44
grpc_global_handle()

0 commit comments

Comments
 (0)