Skip to content

Commit d69782e

Browse files
committed
update logging, use constructor instead of convert
- use Julia logging - use constructors where possible instead of defining / relying on `convert` methods
1 parent 8e69a43 commit d69782e

File tree

7 files changed

+70
-86
lines changed

7 files changed

+70
-86
lines changed

src/AMQPClient.jl

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,6 @@ import Base: write, read, read!, close, convert, show, isopen
44

55
using Sockets
66

7-
const DEBUG = false
8-
9-
# 0.7: use builtin logging by enabling following statement
10-
# using Logging; Logging.global_logger(Logging.ConsoleLogger(stderr, Logging.Debug))
11-
macro debug(s)
12-
esc(:(DEBUG && Base.@debug($s)))
13-
end
14-
157
# Client property info that gets sent to the server on connection startup
168
const CLIENT_IDENTIFICATION = Dict{String,Any}(
179
"product" => "Julia AMQPClient",

src/auth.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
function auth_resp_amqplain(auth_params::Dict{String,Any})
22
params = Dict{String,Any}("LOGIN" => auth_params["LOGIN"], "PASSWORD" => auth_params["PASSWORD"])
33
iob = IOBuffer()
4-
write(iob, convert(TAMQPFieldTable, params))
4+
write(iob, TAMQPFieldTable(params))
55
bytes = take!(iob)
66
skipbytes = sizeof(fieldtype(TAMQPFieldTable, :len))
77
bytes = bytes[(skipbytes+1):end]
8-
convert(TAMQPLongStr, bytes)
8+
TAMQPLongStr(bytes)
99
end
1010

1111
const AUTH_PROVIDERS = Dict{String,Function}("AMQPLAIN" => auth_resp_amqplain)

src/convert.jl

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,8 @@
1-
convert(::Type{Any}, s::T) where {T<:Union{TAMQPShortStr,TAMQPLongStr,TAMQPByteArray}} = convert(String, s)
2-
convert(::Type{String}, s::T) where {T<:Union{TAMQPShortStr,TAMQPLongStr,TAMQPByteArray}} = String(copy(convert(Vector{UInt8}, s.data)))
3-
convert(::Type{T}, s::AbstractString) where {T<:Union{TAMQPShortStr,TAMQPLongStr,TAMQPByteArray}} = T(length(s), Vector{UInt8}(codeunits(String(s))))
4-
convert(::Type{TAMQPLongStr}, d::Vector{UInt8}) = TAMQPLongStr(length(d), d)
5-
convert(::Type{TAMQPByteArray}, d::Vector{UInt8}) = TAMQPByteArray(length(d), d)
6-
7-
convert(::Type{TAMQPFieldValue{T}}, v::T) where {T} = TAMQPFieldValue{T}(FieldIndicatorMap[T], v)
8-
9-
as_fval(v::T) where {T} = convert(TAMQPFieldValue{T}, v)
10-
as_fval(v::Dict{String,Any}) = convert(TAMQPFieldValue{TAMQPFieldTable}, convert(TAMQPFieldTable, v))
11-
as_fval(v::String) = convert(TAMQPFieldValue{TAMQPLongStr}, convert(TAMQPLongStr, v))
12-
13-
convert(::Type{Any}, t::TAMQPFieldTable) = convert(Dict{Any,Any}, t)
14-
convert(::Type{Dict{K,V}}, t::TAMQPFieldTable) where {K, V} = Dict{K,V}(f.name => f.val for f in t.data)
15-
convert(::Type{Dict{String, String}}, t::TAMQPFieldTable) = Dict{String, String}(String(copy(f.name.data)) => String(copy(f.val.fld.data)) for f in t.data)
16-
function convert(::Type{TAMQPFieldTable}, d::Dict{String,Any})
17-
data = TAMQPFieldValuePair[]
18-
for (n,v) in d
19-
push!(data, TAMQPFieldValuePair(convert(TAMQPShortStr,n), as_fval(v)))
20-
end
21-
TAMQPFieldTable(length(data), data)
22-
end
23-
24-
convert(::Type{Any}, t::TAMQPFieldArray) = convert(Vector, t)
25-
convert(::Type{Vector{T}}, t::TAMQPFieldArray) where {T} = convert(Vector{T}, t.data)
26-
1+
convert(::Type{String}, s::T) where {T<:Union{TAMQPShortStr,TAMQPLongStr,TAMQPByteArray}} = String(copy(s.data))
272
convert(::Type{Bool}, b::TAMQPBit) = Bool(b.val & 0x1)
28-
convert(::Type{TAMQPBit}, b::Bool) = TAMQPBit(convert(UInt8, b))
29-
convert(::Type{TAMQPBit}, b::T) where {T<:Integer} = convert(TAMQPBit, Bool(b))
3+
4+
simplify(val::T) where {T <: Union{TAMQPShortStr,TAMQPLongStr,TAMQPByteArray}} = String(copy(val.data))
5+
simplify(val::TAMQPFieldArray) = [simplify(elem) for elem in val.data]
6+
simplify(table::TAMQPFieldTable) = Dict{String,Any}(simplify(f.name)=>simplify(f.val) for f in table.data)
7+
simplify(val::TAMQPFieldValue) = simplify(val.fld)
8+
simplify(x) = x

src/message.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ function set_properties(msg::Message; kwargs...)
5252
if v === nothing
5353
delete!(msg.properties, k)
5454
else
55-
msg.properties[k] = convert(PROPERTIES[k].typ, v)
55+
# all possible property types have constructors that can be used to create them
56+
msg.properties[k] = (PROPERTIES[k].typ)(v)
5657
end
5758
end
5859
nothing

src/protocol.jl

Lines changed: 35 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ write(io::IO, fv::TAMQPFieldValuePair) = write(io, fv.name, fv.val)
6262

6363
function read(io::IO, ::Type{TAMQPFieldTable})
6464
len = ntoh(read(io, fieldtype(TAMQPFieldTable, :len)))
65-
@debug("read fieldtable length $(len)")
65+
@debug("read fieldtable", len)
6666
buff = read!(io, Vector{UInt8}(undef, len))
6767
data = TAMQPFieldValuePair[]
6868
iob = IOBuffer(buff)
@@ -73,14 +73,14 @@ function read(io::IO, ::Type{TAMQPFieldTable})
7373
end
7474

7575
function write(io::IO, ft::TAMQPFieldTable)
76-
@debug("write fieldtable nfields $(length(ft.data))")
76+
@debug("write fieldtable", nfields=length(ft.data))
7777
iob = IOBuffer()
7878
for fv in ft.data
7979
write(iob, fv)
8080
end
8181
buff = take!(iob)
82-
len = convert(fieldtype(TAMQPFieldTable, :len), length(buff))
83-
@debug("write fieldtable length $len type: $(typeof(len))")
82+
len = TAMQPLongUInt(length(buff))
83+
@debug("write fieldtable", len, type=typeof(len))
8484
l = write(io, hton(len))
8585
if len > 0
8686
l += write(io, buff)
@@ -101,7 +101,7 @@ function read(io::IO, ::Type{TAMQPGenericFrame})
101101
hdr = ntoh(read(io, fieldtype(TAMQPGenericFrame, :hdr)))
102102
@assert hdr in (1,2,3,8)
103103
props = read(io, fieldtype(TAMQPGenericFrame, :props))
104-
@debug("reading generic frame type:$hdr, channel:$(props.channel), payloadsize:$(props.payloadsize)")
104+
@debug("reading generic frame", type=hdr, channel=props.channel, payloadsize=props.payloadsize)
105105
payload = read!(io, TAMQPBodyPayload(Vector{TAMQPOctet}(undef, props.payloadsize)))
106106
fend = ntoh(read(io, fieldtype(TAMQPGenericFrame, :fend)))
107107
@assert fend == FrameEnd
@@ -267,8 +267,7 @@ function send(c::Connection, f, msgframes::Vector=[])
267267
nothing
268268
end
269269
function send(c::MessageChannel, payload::TAMQPMethodPayload, msg::Union{Message, Nothing}=nothing)
270-
logstrmsg = msg === nothing ? "without" : "with"
271-
@debug("sending $(method_name(payload)) $logstrmsg content")
270+
@debug("sending", methodname=method_name(payload), hascontent=(msg !== nothing))
272271
frameprop = TAMQPFrameProperties(c.id,0)
273272
if msg !== nothing
274273
msgframes = []
@@ -307,7 +306,7 @@ function wait_for_state(c, states; interval=1, timeout=typemax(Int))
307306
end
308307

309308
function connection_processor(c, name, fn)
310-
@debug("Starting $name task")
309+
@debug("Starting task", name)
311310
try
312311
while true
313312
fn(c)
@@ -331,7 +330,7 @@ function connection_processor(c, name, fn)
331330
@debug(reason)
332331
else
333332
reason = reason * " Unhandled exception: $err"
334-
#showerror(STDERR, err)
333+
#showerror(stderr, err)
335334
@debug(reason)
336335
close(c, false, true)
337336
#rethrow(err)
@@ -342,9 +341,9 @@ end
342341

343342
function connection_sender(c::Connection)
344343
msg = take!(c.sendq)
345-
@debug("==> sending on conn $(c.virtualhost)")
344+
@debug("==> sending on conn", host=c.virtualhost)
346345
nbytes = write(sock(c), msg)
347-
@debug("==> sent $nbytes bytes")
346+
@debug("==> sent", nbytes)
348347

349348
# update heartbeat time for client
350349
c.heartbeat_time_client = time()
@@ -359,9 +358,9 @@ function connection_receiver(c::Connection)
359358
c.heartbeat_time_server = time()
360359

361360
channelid = f.props.channel
362-
@debug("<== read message on conn $(c.virtualhost) for chan $channelid")
361+
@debug("<== read message on conn", host=c.virtualhost, channelid)
363362
if !(channelid in keys(c.channels))
364-
@debug("Discarding message for unknown channel $channelid")
363+
@debug("Discarding message for unknown channel", channelid)
365364
end
366365
chan = channel(c, channelid)
367366
put!(chan.recvq, f)
@@ -379,7 +378,7 @@ function connection_heartbeater(c::Connection)
379378
end
380379

381380
if (now - c.heartbeat_time_server) > (2 * c.heartbeat)
382-
@debug("server heartbeat missed for $(now - c.heartbeat_time_server) seconds")
381+
@debug("server heartbeat missed", secs=(now - c.heartbeat_time_server))
383382
close(c, false, false)
384383
end
385384
nothing
@@ -389,23 +388,23 @@ function channel_receiver(c::MessageChannel)
389388
f = take!(c.recvq)
390389
if f.hdr == FrameMethod
391390
m = TAMQPMethodFrame(f)
392-
@debug("<== channel: $(f.props.channel), class:$(m.payload.class), method:$(m.payload.method)")
391+
@debug("<== received", channel=f.props.channel, class=m.payload.class, method=m.payload.method)
393392
cbkey = (f.hdr, m.payload.class, m.payload.method)
394393
elseif f.hdr == FrameHeartbeat
395394
m = TAMQPHeartBeatFrame(f)
396-
@debug("<== channel: $(f.props.channel), heartbeat")
395+
@debug("<== received heartbeat", channel=f.props.channel)
397396
cbkey = (f.hdr,)
398397
elseif f.hdr == FrameHeader
399398
m = TAMQPContentHeaderFrame(f)
400-
@debug("<== channel: $(f.props.channel), contentheader")
399+
@debug("<== received contentheader", channel=f.props.channel)
401400
cbkey = (f.hdr,)
402401
elseif f.hdr == FrameBody
403402
m = TAMQPContentBodyFrame(f)
404-
@debug("<== channel: $(f.props.channel), contentbody")
403+
@debug("<== received contentbody", channel=f.props.channel)
405404
cbkey = (f.hdr,)
406405
else
407406
m = f
408-
@debug("<== channel: $(f.props.channel), unhandled frame type $(f.hdr)")
407+
@debug("<== received unhandled frame type", channel=f.props.channel, type=f.hdr)
409408
cbkey = (f.hdr,)
410409
end
411410
(cb,ctx) = get(c.callbacks, cbkey, (on_unexpected_message, nothing))
@@ -489,7 +488,7 @@ function channel(c::Connection, id::Integer, create::Bool; connect_timeout=DEFAU
489488
end
490489

491490
function connection(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAULT_PORT, auth_params=AMQPClient.DEFAULT_AUTH_PARAMS, channelmax=AMQPClient.DEFAULT_CHANNELMAX, framemax=0, heartbeat=0, connect_timeout=AMQPClient.DEFAULT_CONNECT_TIMEOUT)
492-
@debug "connecting to $(host):$(port)$(virtualhost)"
491+
@debug("connecting", host, port, virtualhost)
493492
conn = AMQPClient.Connection(virtualhost, host, port)
494493
chan = channel(conn, AMQPClient.DEFAULT_CHANNEL, true)
495494

@@ -860,12 +859,12 @@ end
860859
# ----------------------------------------
861860

862861
function on_unexpected_message(c::MessageChannel, m::TAMQPMethodFrame, ctx)
863-
@debug("Unexpected message on channel $(c.id): class:$(m.payload.class), method:$(m.payload.method)")
862+
@debug("Unexpected message", channel=c.id, class=m.payload.class, method=m.payload.method)
864863
nothing
865864
end
866865

867866
function on_unexpected_message(c::MessageChannel, f, ctx)
868-
@debug("Unexpected message on channel $(c.id): frame type: $(f.hdr)")
867+
@debug("Unexpected message", channel=c.id, frametype=f.hdr)
869868
nothing
870869
end
871870

@@ -887,7 +886,7 @@ function _on_close_ok(context_class::Symbol, chan::MessageChannel, m::TAMQPMetho
887886
end
888887

889888
function _send_close(context_class::Symbol, chan::MessageChannel, reply_code=ReplySuccess, reply_text="", class_id=0, method_id=0)
890-
chan.closereason = CloseReason(TAMQPReplyCode(reply_code), convert(TAMQPReplyText, reply_text), TAMQPClassId(class_id), TAMQPMethodId(method_id))
889+
chan.closereason = CloseReason(TAMQPReplyCode(reply_code), TAMQPReplyText(reply_text), TAMQPClassId(class_id), TAMQPMethodId(method_id))
891890
if context_class === :Channel && chan.id == DEFAULT_CHANNEL
892891
@debug("closing channel 0 is equivalent to closing the connection!")
893892
context_class = :Connection
@@ -898,7 +897,7 @@ function _send_close(context_class::Symbol, chan::MessageChannel, reply_code=Rep
898897
end
899898

900899
_send_close(context_class::Symbol, context_chan_id, conn::Connection, reply_code=ReplySuccess, reply_text="", class_id=0, method_id=0, chan_id=0) =
901-
send(conn, TAMQPMethodFrame(TAMQPFrameProperties(context_chan_id,0), TAMQPMethodPayload(context_class, :Close, (TAMQPReplyCode(reply_code), convert(TAMQPReplyText, reply_text), TAMQPClassId(class_id), TAMQPMethodId(method_id)))))
900+
send(conn, TAMQPMethodFrame(TAMQPFrameProperties(context_chan_id,0), TAMQPMethodPayload(context_class, :Close, (TAMQPReplyCode(reply_code), TAMQPReplyText(reply_text), TAMQPClassId(class_id), TAMQPMethodId(method_id)))))
902901

903902
send_connection_close_ok(chan::MessageChannel) = _send_close_ok(:Connection, chan)
904903
on_connection_close_ok(chan::MessageChannel, m::TAMQPMethodFrame, ctx) = _on_close_ok(:Connection, chan, m, ctx)
@@ -934,12 +933,10 @@ function on_connection_start(chan::MessageChannel, m::TAMQPMethodFrame, ctx)
934933
conn = chan.conn
935934

936935
# setup server properties and capabilities
937-
merge!(conn.properties, Dict{Symbol,Any}(m.payload.fields...))
938-
server_props = convert(Dict{String,Any}, get_property(chan, :ServerProperties, Dict{String,Any}()))
936+
merge!(conn.properties, Dict{Symbol,Any}(Symbol(n)=>simplify(v) for (n,v) in m.payload.fields))
937+
server_props = simplify(get_property(chan, :ServerProperties, TAMQPFieldTable(Dict{String,Any}())))
939938
if "capabilities" in keys(server_props)
940-
for f in server_props["capabilities"].fld.data
941-
conn.capabilities[convert(String, f.name)] = f.val.fld
942-
end
939+
merge!(conn.capabilities, server_props["capabilities"])
943940
end
944941

945942
handle(chan, :Connection, :Start)
@@ -957,31 +954,31 @@ function send_connection_start_ok(chan::MessageChannel, auth_params::Dict{String
957954
client_props = copy(CLIENT_IDENTIFICATION)
958955
client_cap = client_props["capabilities"]
959956
server_cap = conn.capabilities
960-
@debug("server capabilities: $server_cap")
957+
@debug("server capabilities", server_cap)
961958
if "consumer_cancel_notify" in keys(server_cap)
962959
client_cap["consumer_cancel_notify"] = server_cap["consumer_cancel_notify"]
963960
end
964961
if "connection.blocked" in keys(server_cap)
965962
client_cap["connection.blocked"] = server_cap["connection.blocked"]
966963
end
967-
@debug("client_props: $(client_props)")
964+
@debug("client_props", client_props)
968965

969966
# assert that auth mechanism is supported
970967
mechanism = auth_params["MECHANISM"]
971968
mechanisms = split(get_property(chan, :Mechanisms, ""), ' ')
972-
@debug("mechanism: $mechanism, supported mechanisms: $(mechanisms)")
969+
@debug("checking auth mechanism", mechanism, supported=mechanisms)
973970
@assert mechanism in mechanisms
974971

975972
# set up locale
976973
# pick up one of the server locales
977974
locales = split(get_property(chan, :Locales, ""), ' ')
978-
@debug("supported locales: $(locales)")
975+
@debug("supported locales", locales)
979976
client_locale = locales[1]
980-
@debug("client_locale: $(client_locale)")
977+
@debug("client_locale", client_locale)
981978

982979
# respond to login
983980
auth_resp = AUTH_PROVIDERS[mechanism](auth_params)
984-
@debug("auth_resp: $(auth_resp)")
981+
@debug("auth_resp", auth_resp)
985982

986983
send(chan, TAMQPMethodPayload(:Connection, :StartOk, (client_props, mechanism, auth_resp, client_locale)))
987984
nothing
@@ -1017,7 +1014,7 @@ function send_connection_tune_ok(chan::MessageChannel, channelmax=0, framemax=0,
10171014
conn.heartbeat = max(conn.heartbeat, heartbeat)
10181015
end
10191016

1020-
@debug("channelmax: $(conn.channelmax), framemax: $(conn.framemax), heartbeat: $(conn.heartbeat)")
1017+
@debug("send_connection_tune_ok", channelmax=conn.channelmax, framemax=conn.framemax, heartbeat=conn.heartbeat)
10211018
send(chan, TAMQPMethodPayload(:Connection, :TuneOk, (conn.channelmax, conn.framemax, conn.heartbeat)))
10221019

10231020
# start heartbeat timer
@@ -1065,7 +1062,7 @@ end
10651062
function on_channel_flow(chan::MessageChannel, m::TAMQPMethodFrame, ctx)
10661063
@assert is_method(m, :Channel, ctx)
10671064
chan.flow = m.payload.fields[1].second
1068-
@debug("channel $(chan.id) flow is now $(chan.flow)")
1065+
@debug("on_channel_flow", channel=chan.id, flow=chan.flow)
10691066
nothing
10701067
end
10711068

@@ -1202,7 +1199,7 @@ function on_channel_message_in(chan::MessageChannel, m::TAMQPContentBodyFrame, c
12021199
elseif msg.consumer_tag in keys(chan.consumers)
12031200
put!(chan.consumers[msg.consumer_tag].recvq, popfirst!(chan.partial_msgs))
12041201
else
1205-
@debug("discarding message, no consumer with tag $(msg.consumer_tag)")
1202+
@debug("discarding message, no consumer with tag", tag=msg.consumer_tag)
12061203
end
12071204
end
12081205

0 commit comments

Comments
 (0)