1- using Mux, MemPool, Distributed, Sockets
1+ using Mux, Sockets
2+ import HTTP
3+ import HTTP: WebSockets
4+ using MemPool, Distributed
25
36struct LinePlot
47 core_key:: Symbol
@@ -172,22 +175,22 @@ function client_handler(sock, id, port, port_range, config_updated, config, seek
172175 if id != myid ()
173176 fsock_ready = Base. Event ()
174177 worker_host, worker_port = worker_host_port (id, port, port_range)
175- Mux . HTTP . WebSockets. open (" ws://$worker_host :$worker_port /data_feed" ) do _fsock
178+ WebSockets. open (" ws://$worker_host :$worker_port /data_feed" ) do _fsock
176179 fsock = _fsock
177180 @debug " D3R forwarder for $id ready"
178181 notify (fsock_ready)
179- while ! eof (fsock) && isopen (fsock)
182+ while true
180183 try
181- bytes = readavailable (fsock)
184+ bytes = WebSockets . receive (fsock)
182185 if length (bytes) == 0
183186 sleep (0.1 )
184187 continue
185188 end
186189 data = String (bytes)
187190 # @info "D3R forwarder for $id received data"
188- write (sock, data)
191+ WebSockets . send (sock, data)
189192 catch err
190- if err isa Mux . WebSockets. WebSocketClosedError || err isa Base . IOError
193+ if err isa WebSockets. WebSocketError && err. message isa WebSockets . CloseFrameBody
191194 # Force-close client and forwarder
192195 @async close (sock)
193196 @async close (fsock)
@@ -203,25 +206,25 @@ function client_handler(sock, id, port, port_range, config_updated, config, seek
203206 end
204207 if id == myid ()
205208 @debug " D3R client for $id sending initial config"
206- write (sock, JSON3. write ((;cmd= " data" , payload= sanitize (D3R_LOGS[port]))))
209+ WebSockets . send (sock, JSON3. write ((;cmd= " data" , payload= sanitize (D3R_LOGS[port]))))
207210 _workers = workers ()
208211 if ! (myid () in _workers)
209212 # FIXME : Get this from the Context
210213 _workers = vcat (myid (), _workers)
211214 end
212- write (sock, JSON3. write ((;cmd= " config" , payload= sanitize ((;myid= myid (),workers= _workers,ctxs= config)))))
215+ WebSockets . send (sock, JSON3. write ((;cmd= " config" , payload= sanitize ((;myid= myid (),workers= _workers,ctxs= config)))))
213216 end
214217 push! (get! (()-> [], D3R_CLIENT_SOCKETS[port], id), sock)
215218 @debug " D3R client for $id ready"
216- while ! eof (sock) && isopen (sock)
219+ while true
217220 try
218- data = String (read (sock))
221+ data = String (WebSockets . receive (sock))
219222 @debug " D3R client for $id received: $data "
220223 if id == myid ()
221224 #= FIXME
222225 if config_updated[]
223226 config_updated[] = false
224- write (sock, JSON3.write((;cmd="config", payload=sanitize(config))))
227+ WebSockets.send (sock, JSON3.write((;cmd="config", payload=sanitize(config))))
225228 end
226229 =#
227230 if seek_store != = nothing
@@ -231,7 +234,7 @@ function client_handler(sock, id, port, port_range, config_updated, config, seek
231234 for (idx,key) in enumerate (Tables. columnnames (raw_logs))
232235 logs[key] = Tables. columns (raw_logs)[idx]
233236 end
234- write (sock, JSON3. write ((;cmd= " data" , payload= sanitize (logs))))
237+ WebSockets . send (sock, JSON3. write ((;cmd= " data" , payload= sanitize (logs))))
235238 continue
236239 end
237240 m = match (r" seek\( ([0-9]*),([0-9]*)\) " , data)
@@ -242,19 +245,19 @@ function client_handler(sock, id, port, port_range, config_updated, config, seek
242245 for (idx,key) in enumerate (Tables. columnnames (raw_logs))
243246 logs[key] = Tables. columns (raw_logs)[idx]
244247 end
245- write (sock, JSON3. write ((;cmd= " data" , payload= sanitize (logs))))
248+ WebSockets . send (sock, JSON3. write ((;cmd= " data" , payload= sanitize (logs))))
246249 continue
247250 end
248251 end
249252 if data == " data"
250- write (sock, JSON3. write ((;cmd= " data" , payload= sanitize (D3R_LOGS[port]))))
253+ WebSockets . send (sock, JSON3. write ((;cmd= " data" , payload= sanitize (D3R_LOGS[port]))))
251254 end
252255 else
253256 @debug " D3R client sending to forwarder: $data "
254- write (fsock, data)
257+ WebSockets . send (fsock, data)
255258 end
256259 catch err
257- if err isa Mux . WebSockets. WebSocketClosedError || err isa Base . IOError
260+ if err isa WebSockets. WebSocketError && err. message isa WebSockets . CloseFrameBody
258261 idx = findfirst (x-> x== sock, D3R_CLIENT_SOCKETS[port][id])
259262 if idx != = nothing
260263 deleteat! (D3R_CLIENT_SOCKETS[port][id], idx)
273276function TimespanLogging. Events. creation_hook (d3r:: D3Renderer , log)
274277 for sock in get! (()-> [], get! (()-> Dict {Int,Vector{Any}} (), D3R_CLIENT_SOCKETS, d3r. port), myid ())
275278 try
276- if isopen (sock)
277- write (sock, JSON3. write ((;cmd= " add" , payload= sanitize (log))))
278- end
279+ WebSockets. send (sock, JSON3. write ((;cmd= " add" , payload= sanitize (log))))
279280 catch err
280- if err isa Mux . WebSockets. WebSocketClosedError
281+ if err isa WebSockets . WebSocketError && err . message isa WebSockets. CloseFrameBody
281282 continue
282283 end
283284 rethrow (err)
287288function TimespanLogging. Events. deletion_hook (d3r:: D3Renderer , idx)
288289 for sock in get! (()-> [], get! (()-> Dict {Int,Vector{Any}} (), D3R_CLIENT_SOCKETS, d3r. port), myid ())
289290 try
290- if isopen (sock)
291- write (sock, JSON3. write ((;cmd= " delete" , payload= idx)))
292- end
291+ WebSockets. send (sock, JSON3. write ((;cmd= " delete" , payload= idx)))
293292 catch err
294- if err isa Mux . WebSockets. WebSocketClosedError
293+ if err isa WebSockets . WebSocketError && err . message isa WebSockets. CloseFrameBody
295294 continue
296295 end
297296 rethrow (err)
0 commit comments