Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b"
version = "1.11.0"

[deps]
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ For controlling other processes via RPC:
For communicating between processes in the style of a channel or stream:

- `RemoteChannel` - a `Channel`-like object that can be `put!` to or `take!` from any process
- `RemoteLogger` - an `AbstractLogger` forwarding logs to a given worker

For controlling multiple processes at once:

Expand Down
3 changes: 3 additions & 0 deletions src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ using Base.Threads: Event
using Serialization, Sockets
import Serialization: serialize, deserialize
import Sockets: connect, wait_connected
import Logging

# NOTE: clusterserialize.jl imports additional symbols from Serialization for use

Expand Down Expand Up @@ -60,6 +61,7 @@ export
WorkerConfig,
RemoteException,
ProcessExitedException,
RemoteLogger,

process_messages,
remoteref_id,
Expand Down Expand Up @@ -107,6 +109,7 @@ include("messages.jl")
include("process_messages.jl") # process incoming messages
include("remotecall.jl") # the remotecall* api
include("macros.jl") # @spawn and friends
include("logger.jl")
include("workerpool.jl")
include("pmap.jl")
include("managers.jl") # LocalManager and SSHManager
Expand Down
27 changes: 27 additions & 0 deletions src/logger.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""
RemoteLogger(pid=1, min_level=Info)

Logger that forward all logging to worker `pid` via `remote_do` along with
adding the current worker `id` as a `pid` kwarg.
"""
struct RemoteLogger <: Logging.AbstractLogger
pid::Int
min_level::Logging.LogLevel
end
function RemoteLogger(pid=1)
RemoteLogger(pid, Logging.Info)
end

Logging.min_enabled_level(logger::RemoteLogger) = logger.min_level
Logging.shouldlog(logger::RemoteLogger, level, _module, group, id) = true

# TODO: probably should live in base/logging.jl?
function logmsg(level::Logging.LogLevel, message, _module, _group, _id, _file, _line; kwargs...)
Logging.@logmsg level message _module=_module _group=_group _id=_id _file=_file _line=_line kwargs...
end

function Logging.handle_message(logger::RemoteLogger, level::Logging.LogLevel, message, _module, _group, _id,
_file, _line; kwargs...)
@nospecialize
remote_do(logmsg, logger.pid, level, message, _module, _group, _id, _file, _line; pid=myid(), kwargs...)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I thought we could directly use handle_message from the targetted worker but I don't know how to fetch the current_logger there...

end
17 changes: 16 additions & 1 deletion test/distributed_exec.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Test, Distributed, Random, Serialization, Sockets
using Test, Distributed, Random, Serialization, Sockets, Logging
import Distributed: launch, manage

sharedir = normpath(joinpath(Sys.BINDIR, "..", "share"))
Expand Down Expand Up @@ -1927,6 +1927,21 @@ begin
end
end

# test logging
w = only(addprocs(1))
@everywhere using Logging
@test_logs (:info, "from pid $w") begin
prev_logger = global_logger(current_logger())
try
wait(@spawnat w with_logger(RemoteLogger(1)) do
@info("from pid $(myid())")
end)
finally
global_logger(prev_logger)
end
end
wait(rmprocs([w]))

# Run topology tests last after removing all workers, since a given
# cluster at any time only supports a single topology.
nprocs() > 1 && rmprocs(workers())
Expand Down