diff --git a/src/Curl.jl b/src/Curl.jl index 9033ed0..3dcfb01 100644 --- a/src/Curl.jl +++ b/src/Curl.jl @@ -12,11 +12,12 @@ const NOCHANNEL = NoChannel() Base.isopen(req::NoChannel) = false -Base.isempty(req::NoChannel) = true +Base.isempty(req::NoChannel) = true Base.put!(req::NoChannel, ::IOBuffer) = false Base.take!(req::NoChannel) = nothing Base.close(req::NoChannel) = false -Base.iterate(req::NoChannel) = Iterators.Stateful(Iterators.flatten(Iterators.repeated(nothing, 0))) +Base.iterate(req::NoChannel) = + Iterators.Stateful(Iterators.flatten(Iterators.repeated(nothing, 0))) function write_callback( @@ -197,8 +198,8 @@ mutable struct gRPCRequest response::IOBuffer # These are only used when the request or response is streaming - request_c::Union{Channel{IOBuffer}, NoChannel} - response_c::Union{Channel{IOBuffer}, NoChannel} + request_c::Union{Channel{IOBuffer},NoChannel} + response_c::Union{Channel{IOBuffer},NoChannel} # The task making the request can block on this until the request is complete ready::Event @@ -231,13 +232,20 @@ mutable struct gRPCRequest url::String, request::IOBuffer, response::IOBuffer, - request_c::Union{Channel{IOBuffer}, NoChannel}, - response_c::Union{Channel{IOBuffer}, NoChannel}; + request_c::Union{Channel{IOBuffer},NoChannel}, + response_c::Union{Channel{IOBuffer},NoChannel}; deadline = 10, keepalive = 60, max_send_message_length = 4 * 1024 * 1024, max_recieve_message_length = 4 * 1024 * 1024, ) + !isready(grpc) && throw( + gRPCServiceCallException( + GRPC_FAILED_PRECONDITION, + "gRPCCURL backend is not running, did you forget to call grpc_init()?", + ), + ) + # If the grpc handle is shutdown avoid acquiring the request semaphore and immediately throw an exception if !grpc.running throw( @@ -259,8 +267,12 @@ mutable struct gRPCRequest # curl_easy_setopt(easy_handle, CURLOPT_VERBOSE, UInt32(1)) curl_easy_setopt(easy_handle, CURLOPT_URL, url) - curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT_MS, Clong(ceil(1000*deadline))) - curl_easy_setopt(easy_handle, CURLOPT_CONNECTTIMEOUT_MS, Clong(ceil(1000*deadline))) + curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT_MS, Clong(ceil(1000 * deadline))) + curl_easy_setopt( + easy_handle, + CURLOPT_CONNECTTIMEOUT_MS, + Clong(ceil(1000 * deadline)), + ) curl_easy_setopt(easy_handle, CURLOPT_PIPEWAIT, Clong(1)) curl_easy_setopt(easy_handle, CURLOPT_POST, Clong(1)) curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, "POST") @@ -702,30 +714,27 @@ mutable struct gRPCCURL running::Bool requests::Vector{gRPCRequest} # Allows for controlling the maximum number of concurrent gRPC requests/streams - events::Channel{Event} + sem::Channel{Event} - function gRPCCURL(max_streams::Int = GRPC_MAX_STREAMS) + function gRPCCURL(; max_streams::Int = GRPC_MAX_STREAMS, running = true) grpc = new( Ptr{Cvoid}(0), ReentrantLock(), nothing, Dict{curl_socket_t,CURLWatcher}(), ReentrantLock(), - true, + running, Vector{gRPCRequest}(), Channel{Event}(max_streams), ) - # We use a channel as a Semaphore which also acts as a way to reuse Events to reduce allocations - for _ = 1:max_streams - put!(grpc.events, Event()) - end - - preserve_handle(grpc) + finalizer((x) -> close(x), grpc) - grpc_multi_init(grpc) + # This is used for the global const gRPCCURL handle + # The user is expected to call grpc_init() in order to use it + !running && return grpc - finalizer((x) -> close(x), grpc) + open(grpc) return grpc end @@ -774,9 +783,9 @@ function Base.open(grpc::gRPCCURL) grpc.watchers = Dict{curl_socket_t,CURLWatcher}() end - grpc.events = Channel{Event}(grpc.events.sz_max) - for _ = 1:grpc.events.sz_max - put!(grpc.events, Event()) + grpc.sem = Channel{Event}(grpc.sem.sz_max) + for _ = 1:grpc.sem.sz_max + put!(grpc.sem, Event()) end grpc.requests = Vector{gRPCRequest}() @@ -789,11 +798,13 @@ function Base.open(grpc::gRPCCURL) end end -max_reqs_dec(grpc::gRPCCURL) = take!(grpc.events) +isready(grpc::gRPCCURL) = grpc.running + +max_reqs_dec(grpc::gRPCCURL) = take!(grpc.sem) function max_reqs_inc(grpc::gRPCCURL, req::gRPCRequest) # Reset before we recycle reset(req.curl_done_reading) - put!(grpc.events, req.curl_done_reading) + put!(grpc.sem, req.curl_done_reading) end function cleanup_request(grpc::gRPCCURL, req::gRPCRequest) diff --git a/src/ProtoBuf.jl b/src/ProtoBuf.jl index ee84a07..9a1f935 100644 --- a/src/ProtoBuf.jl +++ b/src/ProtoBuf.jl @@ -53,8 +53,8 @@ function service_cb(io, t::CodeGenerators.ServiceType, ctx::CodeGenerators.Conte end import_cb(io, ctx, definitions) = - mapreduce(x->x isa CodeGenerators.ServiceType ? 1 : 0, +, values(definitions)) > 0 && - println(io, "import gRPCClient") + mapreduce(x -> x isa CodeGenerators.ServiceType ? 1 : 0, +, values(definitions)) > 0 && + println(io, "import gRPCClient") grpc_register_service_codegen() = CodeGenerators.register_external_codegen_handler( diff --git a/src/Streaming.jl b/src/Streaming.jl index 02779cf..ebe91ff 100644 --- a/src/Streaming.jl +++ b/src/Streaming.jl @@ -163,10 +163,10 @@ test_response = grpc_async_await(client, req) function grpc_async_request( client::gRPCServiceClient{TRequest,true,TResponse,false}, request::Channel{TRequest}; - deadline=client.deadline, - keepalive=client.keepalive, - max_send_message_length=client.max_send_message_length, - max_recieve_message_length=client.max_recieve_message_length, + deadline = client.deadline, + keepalive = client.keepalive, + max_send_message_length = client.max_send_message_length, + max_recieve_message_length = client.max_recieve_message_length, ) where {TRequest<:Any,TResponse<:Any} req = gRPCRequest( @@ -176,10 +176,10 @@ function grpc_async_request( IOBuffer(), Channel{IOBuffer}(16), NOCHANNEL; - deadline=deadline, - keepalive=keepalive, - max_send_message_length=max_send_message_length, - max_recieve_message_length=max_recieve_message_length, + deadline = deadline, + keepalive = keepalive, + max_send_message_length = max_send_message_length, + max_recieve_message_length = max_recieve_message_length, ) request_task = Threads.@spawn grpc_async_stream_request(req, request) @@ -249,10 +249,10 @@ function grpc_async_request( client::gRPCServiceClient{TRequest,false,TResponse,true}, request::TRequest, response::Channel{TResponse}; - deadline=client.deadline, - keepalive=client.keepalive, - max_send_message_length=client.max_send_message_length, - max_recieve_message_length=client.max_recieve_message_length + deadline = client.deadline, + keepalive = client.keepalive, + max_send_message_length = client.max_send_message_length, + max_recieve_message_length = client.max_recieve_message_length, ) where {TRequest<:Any,TResponse<:Any} request_buf = grpc_encode_request_iobuffer( @@ -268,10 +268,10 @@ function grpc_async_request( IOBuffer(), NOCHANNEL, Channel{IOBuffer}(16); - deadline=deadline, - keepalive=keepalive, - max_send_message_length=max_send_message_length, - max_recieve_message_length=max_recieve_message_length, + deadline = deadline, + keepalive = keepalive, + max_send_message_length = max_send_message_length, + max_recieve_message_length = max_recieve_message_length, ) response_task = Threads.@spawn grpc_async_stream_response(req, response) @@ -353,10 +353,10 @@ function grpc_async_request( client::gRPCServiceClient{TRequest,true,TResponse,true}, request::Channel{TRequest}, response::Channel{TResponse}; - deadline=client.deadline, - keepalive=client.keepalive, - max_send_message_length=client.max_send_message_length, - max_recieve_message_length=client.max_recieve_message_length, + deadline = client.deadline, + keepalive = client.keepalive, + max_send_message_length = client.max_send_message_length, + max_recieve_message_length = client.max_recieve_message_length, ) where {TRequest<:Any,TResponse<:Any} req = gRPCRequest( @@ -366,10 +366,10 @@ function grpc_async_request( IOBuffer(), Channel{IOBuffer}(16), Channel{IOBuffer}(16); - deadline=deadline, - keepalive=keepalive, - max_send_message_length=max_send_message_length, - max_recieve_message_length=max_recieve_message_length, + deadline = deadline, + keepalive = keepalive, + max_send_message_length = max_send_message_length, + max_recieve_message_length = max_recieve_message_length, ) request_task = Threads.@spawn grpc_async_stream_request(req, request) diff --git a/src/Unary.jl b/src/Unary.jl index 936ad9c..7499c39 100644 --- a/src/Unary.jl +++ b/src/Unary.jl @@ -60,7 +60,11 @@ end """ function grpc_async_request( client::gRPCServiceClient{TRequest,false,TResponse,false}, - request::TRequest, + request::TRequest; + deadline = client.deadline, + keepalive = client.keepalive, + max_send_message_length = client.max_send_message_length, + max_recieve_message_length = client.max_recieve_message_length, ) where {TRequest<:Any,TResponse<:Any} request_buf = grpc_encode_request_iobuffer( @@ -76,10 +80,10 @@ function grpc_async_request( IOBuffer(), NOCHANNEL, NOCHANNEL; - deadline = client.deadline, - keepalive = client.keepalive, - max_send_message_length = client.max_send_message_length, - max_recieve_message_length = client.max_recieve_message_length, + deadline = deadline, + keepalive = keepalive, + max_send_message_length = max_send_message_length, + max_recieve_message_length = max_recieve_message_length, ) req @@ -154,7 +158,11 @@ function grpc_async_request( client::gRPCServiceClient{TRequest,false,TResponse,false}, request::TRequest, channel::Channel{gRPCAsyncChannelResponse{TResponse}}, - index::Int64, + index::Int64; + deadline = client.deadline, + keepalive = client.keepalive, + max_send_message_length = client.max_send_message_length, + max_recieve_message_length = client.max_recieve_message_length, ) where {TRequest<:Any,TResponse<:Any} request_buf = grpc_encode_request_iobuffer( @@ -170,10 +178,10 @@ function grpc_async_request( IOBuffer(), NOCHANNEL, NOCHANNEL; - deadline = client.deadline, - keepalive = client.keepalive, - max_send_message_length = client.max_send_message_length, - max_recieve_message_length = client.max_recieve_message_length, + deadline = deadline, + keepalive = keepalive, + max_send_message_length = max_send_message_length, + max_recieve_message_length = max_recieve_message_length, ) Threads.@spawn begin diff --git a/src/gRPC.jl b/src/gRPC.jl index 17ae345..47ab994 100644 --- a/src/gRPC.jl +++ b/src/gRPC.jl @@ -1,4 +1,4 @@ -const _grpc = gRPCCURL() +const _grpc = gRPCCURL(running = false) """ grpc_global_handle()