11mutable struct Multi
22 lock :: ReentrantLock
33 handle :: Ptr{Cvoid}
4- timer :: Ptr{Cvoid}
4+ timer :: Timer
55 easies :: Vector{Easy}
66 grace :: UInt64
77
88 function Multi (grace:: Integer = typemax (UInt64))
9- timer = jl_malloc (Base. _sizeof_uv_timer)
10- uv_timer_init (timer)
11- multi = new (ReentrantLock (), C_NULL , timer, Easy[], grace)
9+ multi = new (ReentrantLock (), C_NULL , Timer (0 ), Easy[], grace)
1210 finalizer (multi) do multi
13- uv_timer_stop (multi. timer)
14- uv_close (multi. timer, cglobal (:jl_free ))
11+ close (multi. timer)
1512 done! (multi)
1613 end
1714 end
3229
3330# adding & removing easy handles
3431
35- function cleanup_callback (uv_timer_p:: Ptr{Cvoid} ):: Cvoid
36- # # TODO : use a member access API
37- multi_p = unsafe_load (convert (Ptr{Ptr{Cvoid}}, uv_timer_p))
38- multi = unsafe_pointer_to_objref (multi_p):: Multi
39- done! (multi)
40- return
41- end
42-
4332function add_handle (multi:: Multi , easy:: Easy )
4433 lock (multi. lock) do
4534 if isempty (multi. easies)
4635 preserve_handle (multi)
47- uv_timer_stop (multi. timer) # stop grace timer
36+ close (multi. timer) # stop grace timer
4837 end
4938 push! (multi. easies, easy)
5039 init! (multi)
@@ -57,11 +46,14 @@ function remove_handle(multi::Multi, easy::Easy)
5746 @check curl_multi_remove_handle (multi. handle, easy. handle)
5847 deleteat! (multi. easies, findlast (== (easy), multi. easies):: Int )
5948 ! isempty (multi. easies) && return
60- cleanup_cb = @cfunction (cleanup_callback, Cvoid, (Ptr{Cvoid},))
6149 if multi. grace <= 0
6250 done! (multi)
6351 elseif 0 < multi. grace < typemax (multi. grace)
64- uv_timer_start (multi. timer, cleanup_cb, multi. grace, 0 )
52+ multi. timer = Timer (multi. grace/ 1000 )
53+ @async begin
54+ wait (multi. timer)
55+ isopen (multi. timer) && done! (multi)
56+ end
6557 end
6658 unpreserve_handle (multi)
6759 end
@@ -73,15 +65,14 @@ function set_defaults(multi::Multi)
7365 # currently no defaults
7466end
7567
76- # libuv callbacks
68+ # multi-socket handle state updates
7769
7870struct CURLMsg
7971 msg :: CURLMSG
8072 easy :: Ptr{Cvoid}
8173 code :: CURLcode
8274end
8375
84- # should already be locked
8576function check_multi_info (multi:: Multi )
8677 while true
8778 p = curl_multi_info_read (multi. handle, Ref {Cint} ())
@@ -104,37 +95,15 @@ function check_multi_info(multi::Multi)
10495 end
10596end
10697
107- function event_callback (
108- uv_poll_p :: Ptr{Cvoid} ,
109- status :: Cint ,
110- events :: Cint ,
111- ):: Cvoid
112- # # TODO : use a member access API
113- multi_p = unsafe_load (convert (Ptr{Ptr{Cvoid}}, uv_poll_p))
114- multi = unsafe_pointer_to_objref (multi_p):: Multi
115- sock_p = uv_poll_p + Base. _sizeof_uv_poll
116- sock = unsafe_load (convert (Ptr{curl_socket_t}, sock_p))
117- flags = 0
118- events & UV_READABLE != 0 && (flags |= CURL_CSELECT_IN)
119- events & UV_WRITABLE != 0 && (flags |= CURL_CSELECT_OUT)
120- lock (multi. lock) do
121- @check curl_multi_socket_action (multi. handle, sock, flags)
122- check_multi_info (multi)
123- end
124- end
98+ # curl callbacks
12599
126- function timeout_callback (uv_timer_p:: Ptr{Cvoid} ):: Cvoid
127- # # TODO : use a member access API
128- multi_p = unsafe_load (convert (Ptr{Ptr{Cvoid}}, uv_timer_p))
129- multi = unsafe_pointer_to_objref (multi_p):: Multi
100+ function do_multi (multi:: Multi )
130101 lock (multi. lock) do
131102 @check curl_multi_socket_action (multi. handle, CURL_SOCKET_TIMEOUT, 0 )
132103 check_multi_info (multi)
133104 end
134105end
135106
136- # curl callbacks
137-
138107function timer_callback (
139108 multi_h :: Ptr{Cvoid} ,
140109 timeout_ms :: Clong ,
@@ -143,15 +112,13 @@ function timer_callback(
143112 multi = unsafe_pointer_to_objref (multi_p):: Multi
144113 @assert multi_h == multi. handle
145114 if timeout_ms == 0
146- lock (multi. lock) do
147- @check curl_multi_socket_action (multi. handle, CURL_SOCKET_TIMEOUT, 0 )
148- check_multi_info (multi)
149- end
115+ do_multi (multi)
150116 elseif timeout_ms >= 0
151- timeout_cb = @cfunction (timeout_callback, Cvoid, (Ptr{Cvoid},))
152- uv_timer_start (multi. timer, timeout_cb, max (1 , timeout_ms), 0 )
117+ multi. timer = Timer (timeout_ms/ 1000 ) do timer
118+ do_multi (multi)
119+ end
153120 elseif timeout_ms == - 1
154- uv_timer_stop (multi. timer)
121+ close (multi. timer)
155122 else
156123 @async @error (" timer_callback: invalid timeout value" , timeout_ms)
157124 return - 1
@@ -164,46 +131,47 @@ function socket_callback(
164131 sock :: curl_socket_t ,
165132 action :: Cint ,
166133 multi_p :: Ptr{Cvoid} ,
167- uv_poll_p :: Ptr{Cvoid} ,
134+ watcher_p :: Ptr{Cvoid} ,
168135):: Cint
136+ if action ∉ (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT, CURL_POLL_REMOVE)
137+ @async @error (" socket_callback: unexpected action" , action)
138+ return - 1
139+ end
169140 multi = unsafe_pointer_to_objref (multi_p):: Multi
141+ if watcher_p != C_NULL
142+ old_watcher = unsafe_pointer_to_objref (watcher_p):: FDWatcher
143+ @check curl_multi_assign (multi. handle, sock, C_NULL )
144+ unpreserve_handle (old_watcher)
145+ end
170146 if action in (CURL_POLL_IN, CURL_POLL_OUT, CURL_POLL_INOUT)
171- if uv_poll_p == C_NULL
172- uv_poll_p = uv_poll_alloc ()
173- uv_poll_init (uv_poll_p, sock)
174- # # TODO : use a member access API
175- unsafe_store! (convert (Ptr{Ptr{Cvoid}}, uv_poll_p), multi_p)
176- sock_p = uv_poll_p + Base. _sizeof_uv_poll
177- unsafe_store! (convert (Ptr{curl_socket_t}, sock_p), sock)
178- lock (multi. lock) do
179- @check curl_multi_assign (multi. handle, sock, uv_poll_p)
147+ readable = action in (CURL_POLL_IN, CURL_POLL_INOUT)
148+ writable = action in (CURL_POLL_OUT, CURL_POLL_INOUT)
149+ watcher = FDWatcher (OS_HANDLE (sock), readable, writable)
150+ preserve_handle (watcher)
151+ watcher_p = pointer_from_objref (watcher)
152+ @check curl_multi_assign (multi. handle, sock, watcher_p)
153+ task = @async while true
154+ events = try wait (watcher)
155+ catch err
156+ err isa EOFError && break
157+ rethrow ()
180158 end
181- end
182- events = 0
183- action != CURL_POLL_IN && (events |= UV_WRITABLE)
184- action != CURL_POLL_OUT && (events |= UV_READABLE)
185- event_cb = @cfunction (event_callback, Cvoid, (Ptr{Cvoid}, Cint, Cint))
186- uv_poll_start (uv_poll_p, events, event_cb)
187- elseif action == CURL_POLL_REMOVE
188- if uv_poll_p != C_NULL
189- uv_poll_stop (uv_poll_p)
190- uv_close (uv_poll_p, cglobal (:jl_free ))
159+ flags = CURL_CSELECT_IN * isreadable (events) +
160+ CURL_CSELECT_OUT * iswritable (events) +
161+ CURL_CSELECT_ERR * events. disconnect
191162 lock (multi. lock) do
192- @check curl_multi_assign (multi. handle, sock, C_NULL )
163+ @check curl_multi_socket_action (multi. handle, sock, flags)
164+ check_multi_info (multi)
193165 end
194166 end
195- else
196- @async @error (" socket_callback: unexpected action" , action)
197- return - 1
167+ @isdefined (errormonitor) && errormonitor (task)
198168 end
169+ @isdefined (old_watcher) && close (old_watcher)
199170 return 0
200171end
201172
202173function add_callbacks (multi:: Multi )
203- # stash multi handle pointer in timer
204174 multi_p = pointer_from_objref (multi)
205- # # TODO : use a member access API
206- unsafe_store! (convert (Ptr{Ptr{Cvoid}}, multi. timer), multi_p)
207175
208176 # set timer callback
209177 timer_cb = @cfunction (timer_callback, Cint, (Ptr{Cvoid}, Clong, Ptr{Cvoid}))
0 commit comments