Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 16 additions & 18 deletions src/QuartoNotebookWorker/src/Malt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -119,31 +119,29 @@ interrupt(::Nothing) = nothing
function handle(::Val{MsgType.from_host_call_with_response}, socket, msg, msg_id::MsgID)
f, args, kwargs, respond_with_nothing = msg

@async begin
result, success = try
result = f(args...; kwargs...)

# @debug("WORKER: Evaluated result", result)
(respond_with_nothing ? nothing : result, true)
catch err
# @debug("WORKER: Got exception!", e)
(format_error(err, catch_backtrace()), false)
end

_serialize_msg(
socket,
success ? MsgType.from_worker_call_result : MsgType.from_worker_call_failure,
msg_id,
result,
)
result, success = try
result = f(args...; kwargs...)

# @debug("WORKER: Evaluated result", result)
(respond_with_nothing ? nothing : result, true)
catch err
# @debug("WORKER: Got exception!", e)
(format_error(err, catch_backtrace()), false)
end

_serialize_msg(
socket,
success ? MsgType.from_worker_call_result : MsgType.from_worker_call_failure,
msg_id,
result,
)
end


function handle(::Val{MsgType.from_host_call_without_response}, socket, msg, msg_id::MsgID)
f, args, kwargs, _ignored = msg

@async try
try
f(args...; kwargs...)
catch e
@warn(
Expand Down
16 changes: 4 additions & 12 deletions src/server.jl
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,6 @@ end

# Implementation.

function remote_eval_fetch_channeled(worker, expr)
code = quote
put!(stable_execution_task_channel_in, $(QuoteNode(expr)))
take!(stable_execution_task_channel_out)
end
return Malt.remote_eval_fetch(worker, code)
end

function init!(file::File, options::Dict)
worker = file.worker
Malt.remote_eval_fetch(worker, worker_init(file, options))
Expand All @@ -218,7 +210,7 @@ function refresh!(file::File, options::Dict)
file.output_chunks = []
init!(file, options)
end
remote_eval_fetch_channeled(file.worker, :(refresh!($(options)); revise_hook()))
Malt.remote_eval_fetch(file.worker, :(refresh!($(options)); revise_hook()))
end

function _cache_file(f::File, source_code_hash)
Expand Down Expand Up @@ -930,7 +922,7 @@ function evaluate_raw_cells!(
$(chunk.cell_options),
))

worker_results, expand_cell = remote_eval_fetch_channeled(f.worker, expr)
worker_results, expand_cell = Malt.remote_eval_fetch(f.worker, expr)

# When the result of the cell evaluation is a cell expansion
# then we insert the original cell contents before the expanded
Expand Down Expand Up @@ -1141,7 +1133,7 @@ function evaluate_raw_cells!(
# inline evaluation since you can't pass cell
# options and so `expand` will always be `false`.
worker_results, expand_cell =
remote_eval_fetch_channeled(f.worker, expr)
Malt.remote_eval_fetch(f.worker, expr)
expand_cell && error("inline code cells cannot be expanded")
remote = only(worker_results)
if !isnothing(remote.error)
Expand Down Expand Up @@ -1202,7 +1194,7 @@ function evaluate_params!(f, params::Dict{String})
:(@eval getfield(Main, :Notebook) const $(Symbol(key::String)) = $value)
end
expr = Expr(:block, exprs...)
remote_eval_fetch_channeled(f.worker, expr)
Malt.remote_eval_fetch(f.worker, expr)
return
end

Expand Down
15 changes: 0 additions & 15 deletions src/startup.jl
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,6 @@ end
render(args...; kwargs...) = QuartoNotebookWorker.render(args...; kwargs...)
revise_hook() = @static QUARTO_ENABLE_REVISE ? QuartoNotebookWorker.revise_hook() : nothing

# Issue #192
#
# Malt itself uses a new task for each `remote_eval` and because of
# this, random number streams are not consistent across runs even if
# seeded, as each task introduces a new state for its task-local RNG.
# As a workaround, we feed all `remote_eval` requests through these
# channels, such that the task executing code is always the same.
const stable_execution_task_channel_out = Channel()
const stable_execution_task_channel_in = Channel() do chan
for expr in chan
result = Core.eval(Main, expr)
put!(stable_execution_task_channel_out, result)
end
end

# Step 6:
#
# Ensures that the LOAD_PATH is returned to it's previous state without the
Expand Down
Loading