From 0855f96ea9baa974c7d7fe379b87cb695c4fdba3 Mon Sep 17 00:00:00 2001 From: Stefan Karpinski Date: Tue, 26 Oct 2021 15:55:58 -0400 Subject: [PATCH 1/4] rectify control of progress callback --- src/Curl/Easy.jl | 8 ++++---- src/Curl/Multi.jl | 1 - src/Downloads.jl | 7 +------ 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/Curl/Easy.jl b/src/Curl/Easy.jl index 0dd1108..4c744a1 100644 --- a/src/Curl/Easy.jl +++ b/src/Curl/Easy.jl @@ -4,7 +4,7 @@ mutable struct Easy ready :: Threads.Event seeker :: Union{Function,Nothing} output :: Channel{Vector{UInt8}} - progress :: Channel{NTuple{4,Int}} + progress :: Function req_hdrs :: Ptr{curl_slist_t} res_hdrs :: Vector{String} code :: CURLcode @@ -13,14 +13,14 @@ end const EMPTY_BYTE_VECTOR = UInt8[] -function Easy() +function Easy(progress::Union{Function,Nothing}) easy = Easy( curl_easy_init(), EMPTY_BYTE_VECTOR, Threads.Event(), nothing, Channel{Vector{UInt8}}(Inf), - Channel{NTuple{4,Int}}(Inf), + something(progress, (_, _, _, _) -> nothing), C_NULL, String[], typemax(CURLcode), @@ -372,7 +372,7 @@ function progress_callback( ul_now :: curl_off_t, )::Cint easy = unsafe_pointer_to_objref(easy_p)::Easy - put!(easy.progress, (dl_total, dl_now, ul_total, ul_now)) + easy.progress(dl_total, dl_now, ul_total, ul_now) return 0 end diff --git a/src/Curl/Multi.jl b/src/Curl/Multi.jl index c56b94a..c6540c4 100644 --- a/src/Curl/Multi.jl +++ b/src/Curl/Multi.jl @@ -85,7 +85,6 @@ function check_multi_info(multi::Multi) easy = unsafe_pointer_to_objref(easy_p_ref[])::Easy @assert easy_handle == easy.handle easy.code = message.code - close(easy.progress) close(easy.output) easy.input = nothing notify(easy.ready) diff --git a/src/Downloads.jl b/src/Downloads.jl index b56561d..a5b2ea9 100644 --- a/src/Downloads.jl +++ b/src/Downloads.jl @@ -308,7 +308,7 @@ function request( progress = p_func(progress, input, output) arg_read(input) do input arg_write(output) do output - with_handle(Easy()) do easy + with_handle(Easy(progress)) do easy # setup the request set_url(easy, url) set_timeout(easy, timeout) @@ -348,11 +348,6 @@ function request( @async for buf in easy.output write(output, buf) end - if progress !== nothing - @async for prog in easy.progress - progress(prog...) - end - end if have_input @async upload_data(easy, input) end From 92d1f7826c31ee01b735b293d155bc7fa3a9ae2d Mon Sep 17 00:00:00 2001 From: Stefan Karpinski Date: Tue, 26 Oct 2021 16:24:07 -0400 Subject: [PATCH 2/4] rectify control of write callback --- src/Curl/Easy.jl | 20 ++++++++++---------- src/Curl/Multi.jl | 1 - src/Downloads.jl | 6 ++---- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/Curl/Easy.jl b/src/Curl/Easy.jl index 4c744a1..bab801b 100644 --- a/src/Curl/Easy.jl +++ b/src/Curl/Easy.jl @@ -3,7 +3,7 @@ mutable struct Easy input :: Union{Vector{UInt8},Nothing} ready :: Threads.Event seeker :: Union{Function,Nothing} - output :: Channel{Vector{UInt8}} + output :: IO progress :: Function req_hdrs :: Ptr{curl_slist_t} res_hdrs :: Vector{String} @@ -13,13 +13,16 @@ end const EMPTY_BYTE_VECTOR = UInt8[] -function Easy(progress::Union{Function,Nothing}) +function Easy( + output::IO, + progress::Union{Function,Nothing}, +) easy = Easy( curl_easy_init(), EMPTY_BYTE_VECTOR, Threads.Event(), nothing, - Channel{Vector{UInt8}}(Inf), + output, something(progress, (_, _, _, _) -> nothing), C_NULL, String[], @@ -351,17 +354,14 @@ function seek_callback( end function write_callback( - data :: Ptr{Cchar}, + data :: Ptr{UInt8}, size :: Csize_t, count :: Csize_t, easy_p :: Ptr{Cvoid}, )::Csize_t easy = unsafe_pointer_to_objref(easy_p)::Easy - n = size * count - buf = Array{UInt8}(undef, n) - ccall(:memcpy, Ptr{Cvoid}, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), buf, data, n) - put!(easy.output, buf) - return n + unsafe_write(easy.output, data, size*count) + return size*count end function progress_callback( @@ -393,7 +393,7 @@ function add_callbacks(easy::Easy) # set write callback write_cb = @cfunction(write_callback, - Csize_t, (Ptr{Cchar}, Csize_t, Csize_t, Ptr{Cvoid})) + Csize_t, (Ptr{UInt8}, Csize_t, Csize_t, Ptr{Cvoid})) setopt(easy, CURLOPT_WRITEFUNCTION, write_cb) setopt(easy, CURLOPT_WRITEDATA, easy_p) diff --git a/src/Curl/Multi.jl b/src/Curl/Multi.jl index c6540c4..c076ffe 100644 --- a/src/Curl/Multi.jl +++ b/src/Curl/Multi.jl @@ -85,7 +85,6 @@ function check_multi_info(multi::Multi) easy = unsafe_pointer_to_objref(easy_p_ref[])::Easy @assert easy_handle == easy.handle easy.code = message.code - close(easy.output) easy.input = nothing notify(easy.ready) else diff --git a/src/Downloads.jl b/src/Downloads.jl index a5b2ea9..4d427f3 100644 --- a/src/Downloads.jl +++ b/src/Downloads.jl @@ -308,7 +308,7 @@ function request( progress = p_func(progress, input, output) arg_read(input) do input arg_write(output) do output - with_handle(Easy(progress)) do easy + with_handle(Easy(output, progress)) do easy # setup the request set_url(easy, url) set_timeout(easy, timeout) @@ -345,12 +345,10 @@ function request( add_handle(downloader.multi, easy) try # ensure handle is removed @sync begin - @async for buf in easy.output - write(output, buf) - end if have_input @async upload_data(easy, input) end + @async wait(easy.ready) end finally remove_handle(downloader.multi, easy) From 2958934b23a387517cfb8423b291b2651ca4c4e7 Mon Sep 17 00:00:00 2001 From: Stefan Karpinski Date: Tue, 26 Oct 2021 17:08:33 -0400 Subject: [PATCH 3/4] rectify control of read callback --- src/Curl/Easy.jl | 51 +++++++++++++---------------------------------- src/Curl/Multi.jl | 1 - src/Downloads.jl | 10 ++-------- 3 files changed, 16 insertions(+), 46 deletions(-) diff --git a/src/Curl/Easy.jl b/src/Curl/Easy.jl index bab801b..8128c24 100644 --- a/src/Curl/Easy.jl +++ b/src/Curl/Easy.jl @@ -1,6 +1,6 @@ mutable struct Easy handle :: Ptr{Cvoid} - input :: Union{Vector{UInt8},Nothing} + input :: IO ready :: Threads.Event seeker :: Union{Function,Nothing} output :: IO @@ -11,15 +11,14 @@ mutable struct Easy errbuf :: Vector{UInt8} end -const EMPTY_BYTE_VECTOR = UInt8[] - function Easy( - output::IO, - progress::Union{Function,Nothing}, + input :: IO, + output :: IO, + progress :: Union{Function,Nothing}, ) easy = Easy( curl_easy_init(), - EMPTY_BYTE_VECTOR, + input, Threads.Event(), nothing, output, @@ -287,50 +286,28 @@ end # callbacks function header_callback( - data :: Ptr{Cchar}, + data :: Ptr{UInt8}, size :: Csize_t, count :: Csize_t, easy_p :: Ptr{Cvoid}, )::Csize_t easy = unsafe_pointer_to_objref(easy_p)::Easy - n = size * count + n = size*count hdr = unsafe_string(data, n) push!(easy.res_hdrs, hdr) return n end -# feed data to read_callback -function upload_data(easy::Easy, input::IO) - while true - data = eof(input) ? nothing : readavailable(input) - easy.input === nothing && break - easy.input = data - curl_easy_pause(easy.handle, Curl.CURLPAUSE_CONT) - wait(easy.ready) - easy.input === nothing && break - easy.ready = Threads.Event() - end -end - function read_callback( - data :: Ptr{Cchar}, + data :: Ptr{UInt8}, size :: Csize_t, count :: Csize_t, easy_p :: Ptr{Cvoid}, )::Csize_t easy = unsafe_pointer_to_objref(easy_p)::Easy - buf = easy.input - if buf === nothing - notify(easy.ready) - return 0 # done uploading - end - if isempty(buf) - notify(easy.ready) - return CURL_READFUNC_PAUSE # wait for more data - end - n = min(size * count, length(buf)) - ccall(:memcpy, Ptr{Cvoid}, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), data, buf, n) - deleteat!(buf, 1:n) + eof(easy.input) && return 0 + buf = unsafe_wrap(Vector{UInt8}, data, size*count) + n = readbytes!(easy.input, buf, size*count) return n end @@ -347,7 +324,7 @@ function seek_callback( easy.seeker === nothing && return CURL_SEEKFUNC_CANTSEEK try easy.seeker(offset) catch err - @async @error("seek_callback: seeker failed", err) + @async @error("seek_callback: seek failed", err) return CURL_SEEKFUNC_FAIL end return CURL_SEEKFUNC_OK @@ -387,7 +364,7 @@ function add_callbacks(easy::Easy) # set header callback header_cb = @cfunction(header_callback, - Csize_t, (Ptr{Cchar}, Csize_t, Csize_t, Ptr{Cvoid})) + Csize_t, (Ptr{UInt8}, Csize_t, Csize_t, Ptr{Cvoid})) setopt(easy, CURLOPT_HEADERFUNCTION, header_cb) setopt(easy, CURLOPT_HEADERDATA, easy_p) @@ -410,7 +387,7 @@ function add_upload_callbacks(easy::Easy) # set read callback read_cb = @cfunction(read_callback, - Csize_t, (Ptr{Cchar}, Csize_t, Csize_t, Ptr{Cvoid})) + Csize_t, (Ptr{UInt8}, Csize_t, Csize_t, Ptr{Cvoid})) setopt(easy, CURLOPT_READFUNCTION, read_cb) setopt(easy, CURLOPT_READDATA, easy_p) end diff --git a/src/Curl/Multi.jl b/src/Curl/Multi.jl index c076ffe..6b9d2aa 100644 --- a/src/Curl/Multi.jl +++ b/src/Curl/Multi.jl @@ -85,7 +85,6 @@ function check_multi_info(multi::Multi) easy = unsafe_pointer_to_objref(easy_p_ref[])::Easy @assert easy_handle == easy.handle easy.code = message.code - easy.input = nothing notify(easy.ready) else @async @error("curl_multi_info_read: unknown message", message) diff --git a/src/Downloads.jl b/src/Downloads.jl index 4d427f3..d71e550 100644 --- a/src/Downloads.jl +++ b/src/Downloads.jl @@ -308,7 +308,7 @@ function request( progress = p_func(progress, input, output) arg_read(input) do input arg_write(output) do output - with_handle(Easy(output, progress)) do easy + with_handle(Easy(input, output, progress)) do easy # setup the request set_url(easy, url) set_timeout(easy, timeout) @@ -343,13 +343,7 @@ function request( # do the request add_handle(downloader.multi, easy) - try # ensure handle is removed - @sync begin - if have_input - @async upload_data(easy, input) - end - @async wait(easy.ready) - end + try wait(easy.ready) # can this throw? finally remove_handle(downloader.multi, easy) end From 2af290f1dd4294869e54973de30c5b6b3d431eeb Mon Sep 17 00:00:00 2001 From: Stefan Karpinski Date: Tue, 26 Oct 2021 17:14:09 -0400 Subject: [PATCH 4/4] rename easy.{ready => done} Also eliminate try/catch around `wait(easy.done)` since it can't throw. --- src/Curl/Easy.jl | 2 +- src/Curl/Multi.jl | 2 +- src/Downloads.jl | 6 ++---- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/Curl/Easy.jl b/src/Curl/Easy.jl index 8128c24..8ad57fa 100644 --- a/src/Curl/Easy.jl +++ b/src/Curl/Easy.jl @@ -1,7 +1,7 @@ mutable struct Easy handle :: Ptr{Cvoid} input :: IO - ready :: Threads.Event + done :: Threads.Event seeker :: Union{Function,Nothing} output :: IO progress :: Function diff --git a/src/Curl/Multi.jl b/src/Curl/Multi.jl index 6b9d2aa..bc74955 100644 --- a/src/Curl/Multi.jl +++ b/src/Curl/Multi.jl @@ -85,7 +85,7 @@ function check_multi_info(multi::Multi) easy = unsafe_pointer_to_objref(easy_p_ref[])::Easy @assert easy_handle == easy.handle easy.code = message.code - notify(easy.ready) + notify(easy.done) else @async @error("curl_multi_info_read: unknown message", message) end diff --git a/src/Downloads.jl b/src/Downloads.jl index d71e550..f6ec990 100644 --- a/src/Downloads.jl +++ b/src/Downloads.jl @@ -343,10 +343,8 @@ function request( # do the request add_handle(downloader.multi, easy) - try wait(easy.ready) # can this throw? - finally - remove_handle(downloader.multi, easy) - end + wait(easy.done) + remove_handle(downloader.multi, easy) # return the response or throw an error response = Response(get_response_info(easy)...)