diff --git a/CHANGELOG.md b/CHANGELOG.md index 77421ea4..c830708e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Added + +- New socket server command `status` that returns a string describing the current server status, including information for each active worker, as well as `forceclose` which is the forced version of `close` that can shut down a worker even if it's currently running [#229](https://github.com/PumasAI/QuartoNotebookRunner.jl/pull/229). + +### Changed + +- Socket server commands `run`, `close` and `stop` will error if a file lock is currently held (a file worker is running) instead of waiting potentially a long time for that lock to open. Those locks were always intended only as mutation protection and not a queueing mechanism, they only behaved that way by accident and in case some worker hangs, it's impractical that further `quarto render` commands just add on top of the pile without a message [#229](https://github.com/PumasAI/QuartoNotebookRunner.jl/pull/229). + ## [v0.14.0] - 2025-02-26 ### Added diff --git a/src/QuartoNotebookRunner.jl b/src/QuartoNotebookRunner.jl index 1e352a4d..2f7ac955 100644 --- a/src/QuartoNotebookRunner.jl +++ b/src/QuartoNotebookRunner.jl @@ -43,6 +43,10 @@ export Server, render, run!, close! # Includes. +const QNR_VERSION = + VersionNumber(TOML.parsefile(joinpath(@__DIR__, "..", "Project.toml"))["version"]) +include_dependency(joinpath(@__DIR__, "..", "Project.toml")) + include("UserError.jl") include("Malt.jl") include("WorkerSetup.jl") diff --git a/src/server.jl b/src/server.jl index 49e26185..148cd9e6 100644 --- a/src/server.jl +++ b/src/server.jl @@ -11,6 +11,9 @@ mutable struct File lock::ReentrantLock timeout::Float64 timeout_timer::Union{Nothing,Timer} + run_started::Union{Nothing,Dates.DateTime} + run_finished::Union{Nothing,Dates.DateTime} + run_decision_channel::Channel{Symbol} function File(path::String, options::Union{String,Dict{String,Any}}) if isfile(path) @@ -24,6 +27,7 @@ mutable struct File exeflags, env = _exeflags_and_env(merged_options) timeout = _extract_timeout(merged_options) + exe, _exeflags = _julia_exe(exeflags) worker = cd(() -> Malt.Worker(; exe, exeflags = _exeflags, env), dirname(path)) @@ -38,6 +42,9 @@ mutable struct File ReentrantLock(), timeout, nothing, + nothing, + nothing, + Channel{Symbol}(32), # we don't want an unbuffered channel because we might want to `put!` to it without blocking ) init!(file, merged_options) return file @@ -1427,21 +1434,70 @@ function run!( options::Union{String,Dict{String,Any}} = Dict{String,Any}(), chunk_callback = (i, n, c) -> nothing, ) - borrow_file!(server, path; optionally_create = true) do file - if file.timeout_timer !== nothing - close(file.timeout_timer) - file.timeout_timer = nothing - end - result = evaluate!(file, output; showprogress, options, markdown, chunk_callback) - if file.timeout > 0 - file.timeout_timer = Timer(file.timeout) do _ + try + borrow_file!(server, path; optionally_create = true) do file + if file.timeout_timer !== nothing + close(file.timeout_timer) + file.timeout_timer = nothing + end + file.run_started = Dates.now() + file.run_finished = nothing + + # we want to be able to force close the worker while `evaluate!` is running, + # so we run `evaluate!` in a task and wait for the `run_decision_channel` + # further down. Depending on the value fetched from that channel, we either + # know that evaluation has finished, or that force closing was requested. + while !isempty(file.run_decision_channel) + take!(file.run_decision_channel) # empty! not defined on channels in earlier julia versions + end + + result_task = Threads.@spawn begin + try + evaluate!(file, output; showprogress, options, markdown, chunk_callback) + finally + put!(file.run_decision_channel, :evaluate_finished) + end + end + + # block until a decision is reached + decision = take!(file.run_decision_channel) + + # :forceclose might have been set from another task + if decision === :forceclose + close!(server, file.path) # this is in the same task, so reentrant lock allows access + error("File was force-closed during run") + elseif decision === :evaluate_finished + result = try + fetch(result_task) + catch err + # throw the original exception, not the wrapping TaskFailedException + rethrow(err.task.exception) + end + else + error("Invalid decision $decision") + end + + file.run_finished = Dates.now() + if file.timeout > 0 + file.timeout_timer = Timer(file.timeout) do _ + close!(server, file.path) + @debug "File at $(file.path) timed out after $(file.timeout) seconds of inactivity." + end + else close!(server, file.path) - @debug "File at $(file.path) timed out after $(file.timeout) seconds of inactivity." end + return result + end + catch err + if err isa FileBusyError + throw( + UserError( + "Tried to run file \"$path\" but the corresponding worker is busy.", + ), + ) else - close!(server, file.path) + rethrow(err) end - return result end end @@ -1449,19 +1505,25 @@ struct NoFileEntryError <: Exception path::String end +struct FileBusyError <: Exception + path::String +end + """ - borrow_file!(f, server, path; optionally_create = false, options = Dict{String,Any}()) + borrow_file!(f, server, path; wait = false, optionally_create = false, options = Dict{String,Any}()) Executes `f(file)` while the `file`'s `ReentrantLock` is locked. All actions on a `Server`'s `File` should be wrapped in this so that no two tasks can mutate the `File` at the same time. When `optionally_create` is `true`, the `File` will be created on the server if it doesn't exist, in which case it is passed `options`. +If `wait = false`, `borrow_file!` will throw a `FileBusyError` if the lock cannot be attained immediately. """ function borrow_file!( f, server, path; + wait = false, optionally_create = false, options = Dict{String,Any}(), ) @@ -1497,7 +1559,18 @@ function borrow_file!( # no file exists or it doesn't match the one we have, we recurse into `borrow_file!`. # This could in principle go on forever but is very unlikely to with a small number of # concurrent users. - lock(file.lock) do + + if wait + lock(file.lock) + lock_attained = true + else + lock_attained = trylock(file.lock) + end + + try + if !lock_attained + throw(FileBusyError(apath)) + end current_file = lock(server.lock) do get(server.workers, apath, nothing) end @@ -1506,6 +1579,8 @@ function borrow_file!( else return f(file) end + finally + lock_attained && unlock(file.lock) end end end @@ -1560,7 +1635,13 @@ function close!(server::Server, path::String) end return true catch err - if !(err isa NoFileEntryError) + if err isa FileBusyError + throw( + UserError( + "Tried to close file \"$path\" but the corresponding worker is busy.", + ), + ) + elseif !(err isa NoFileEntryError) rethrow(err) else false @@ -1568,6 +1649,41 @@ function close!(server::Server, path::String) end end +function forceclose!(server::Server, path::String) + apath = abspath(path) + file = lock(server.lock) do + if haskey(server.workers, apath) + return server.workers[apath] + else + throw(NoFileEntryError(apath)) + end + end + # if the worker is not actually running we need to fall back to normal closing, + # for that we try to get the file lock now + lock_attained = trylock(file.lock) + try + # if we've attained the lock, we can close normally + if lock_attained + close!(server, path) + # but if not, we request a forced close via the run decision channel that + # is being waited for in `run!` function + else + put!(file.run_decision_channel, :forceclose) + t = time() + while Malt.isrunning(file.worker) + timeout = 10 + (time() - t) > timeout && error( + "Force close was requested but worker was still running after $timeout seconds.", + ) + sleep(0.1) + end + end + finally + lock_attained && unlock(file.lock) + end + return +end + json_reader(str) = JSON3.read(str, Any) yaml_reader(str) = YAML.load(str) diff --git a/src/socket.jl b/src/socket.jl index 3fef60df..88b5d979 100644 --- a/src/socket.jl +++ b/src/socket.jl @@ -6,6 +6,9 @@ struct SocketServer port::Int task::Task key::Base.UUID + started_at::Dates.DateTime + timeout::Union{Nothing,Float64} + timeout_started_at::Ref{Union{Nothing,Dates.DateTime}} end Base.wait(s::SocketServer) = wait(s.task) @@ -44,7 +47,7 @@ should match the following schema: ```json { - type: "run" | "close" | "stop" | "isopen" | "isready" + type: "run" | "close" | "forceclose" | "stop" | "isopen" | "isready" | "status" content: string | { file: string, options: string | { ... } } } ``` @@ -64,7 +67,12 @@ A description of the message types: - `close` - Close a notebook. The `content` should be the absolute path to the notebook file. If no file is specified, all notebooks will be closed. When the notebook is closed, the server will return a response with a - `status` field set to `true`. + `status` field set to `true`. Will return an error if any of the notebooks to be + closed is currently running. + + - `forceclose` - Forcibly close a notebook even if it is currently running. + The `content` should be the absolute path to the notebook file. When the notebook + is closed, the server will return a response with a `status` field set to `true`. - `stop` - Stop the server. The server will return a response with a `message` field set to `Server stopped.`. @@ -76,6 +84,8 @@ A description of the message types: - `isready` - Returns `true` if the server is ready to accept commands. Should never return `false`. + +- `status` - Returns string with information about the server and workers. """ function serve(; port = nothing, @@ -96,6 +106,10 @@ function serve(; key = Base.UUID(rand(UInt128)) + # we want to be able to pass the full SocketServer to the status + # function later, but we have to reference it before it exists + socket_server_ref = Ref{Union{SocketServer,Nothing}}(nothing) + notebook_server = Server() closed_deliberately = Ref(false) @@ -106,6 +120,7 @@ function serve(; end timer = Ref{Union{Timer,Nothing}}(nothing) + timeout_started_at = Ref{Union{Nothing,Dates.DateTime}}(nothing) function set_timer!() @debug "Timer set up" @@ -124,6 +139,7 @@ function serve(; close(socket_server) end end + timeout_started_at[] = Dates.now() end # this function is called under server.lock so we don't need further synchronization @@ -143,6 +159,7 @@ function serve(; @debug "Closing active timer" close(timer[]) timer[] = nothing + timeout_started_at[] = nothing end end return @@ -162,7 +179,7 @@ function serve(; break end if !isnothing(socket) - Threads.@spawn while isopen(socket) + subtask = Threads.@spawn while isopen(socket) @debug "Waiting for request" data = readline(socket; keep = true) if isempty(data) @@ -182,6 +199,7 @@ function serve(; # close connection with clients sending wrong hmacs or invalid json # (could be other processes mistakingly targeting our port) close(socket) + break end @debug "Received request" json @@ -198,10 +216,11 @@ function serve(; elseif json.type == "isready" _write_json(socket, true) else - _handle_response(socket, notebook_server, json, showprogress) + _handle_response(socket, socket_server_ref[], json, showprogress) end end end + errormonitor(subtask) end end @debug "Server closed." @@ -209,7 +228,17 @@ function serve(; errormonitor(task) - return SocketServer(socket_server, notebook_server, port, task, key) + socket_server_ref[] = SocketServer( + socket_server, + notebook_server, + port, + task, + key, + Dates.now(), + timeout, + timeout_started_at, + ) + return socket_server_ref[] end if Preferences.@load_preference("enable_revise", false) @@ -255,16 +284,22 @@ end function _handle_response_internal( socket, - notebooks::Server, + socketserver::Union{Nothing,SocketServer}, request::@NamedTuple{type::String, content::Union{String,Dict{String,Any}}}, showprogress::Bool, ) + socketserver === nothing && error("Got request before SocketServer object was created.") + notebooks = socketserver.notebookserver @debug "debugging" request notebooks = collect(keys(notebooks.workers)) type = request.type - type in ("close", "run", "isopen") || + type in ("close", "forceclose", "run", "isopen", "status") || return _write_json(socket, _log_error("Unknown request type: $type")) + if type == "status" + return _write_json(socket, Base.@invokelatest(server_status(socketserver))) + end + file = _get_file(request.content) # Closing: @@ -291,6 +326,22 @@ function _handle_response_internal( end end + if type == "forceclose" + try + forceclose!(notebooks, file) + return _write_json(socket, (; status = true)) + catch error + return _write_json( + socket, + _log_error( + "Failed to force close notebook: $file", + error, + catch_backtrace(), + ), + ) + end + end + # Running: if type == "run" @@ -466,3 +517,144 @@ if !isdefined(Base, :errormonitor) return t end end + +function is_same_day(date1, date2)::Bool + return Dates.year(date1) == Dates.year(date2) && + Dates.month(date1) == Dates.month(date2) && + Dates.day(date1) == Dates.day(date2) +end + +function simple_date_time_string(date)::String + now = Dates.now() + if is_same_day(date, now) + return string(Dates.hour(date), ":", Dates.minute(date), ":", Dates.second(date)) + else + return string( + date, + " ", + Dates.hour(date), + ":", + Dates.minute(date), + ":", + Dates.second(date), + ) + end +end + +function format_seconds(seconds)::String + seconds = round(Int, seconds) + if seconds < 60 + return string(seconds, " second", seconds == 1 ? "" : "s") + elseif seconds < 3600 + full_minutes = div(seconds, 60) + rem_seconds = seconds % 60 + seconds_str = rem_seconds == 0 ? "" : " " * format_seconds(rem_seconds) + return string(full_minutes, " minute", full_minutes == 1 ? "" : "s", seconds_str) + else + full_hours = div(seconds, 3600) + rem_seconds = seconds % 3600 + minutes_str = rem_seconds == 0 ? "" : " " * format_seconds(rem_seconds) + return string(full_hours, " hour", full_hours == 1 ? "" : "s", minutes_str) + end +end + +function server_status(socketserver::SocketServer) + server_timeout = socketserver.timeout + timeout_started_at = socketserver.timeout_started_at[] + server = socketserver.notebookserver + lock(server.lock) do + io = IOBuffer() + current_time = Dates.now() + + running_since_seconds = Dates.value(current_time - socketserver.started_at) / 1000 + + println(io, "QuartoNotebookRunner server status:") + println( + io, + " started at: $(simple_date_time_string(socketserver.started_at)) ($(format_seconds(running_since_seconds)) ago)", + ) + println(io, " runner version: $QNR_VERSION") + println( + io, + " environment: $(replace(Base.active_project(), "Project.toml" => ""))", + ) + println(io, " pid: $(Base.getpid())") + println(io, " port: $(socketserver.port)") + println(io, " julia version: $(VERSION)") + + print( + io, + " timeout: $(server_timeout === nothing ? "disabled" : format_seconds(server_timeout))", + ) + + if isempty(server.workers) && + server_timeout !== nothing && + timeout_started_at !== nothing + seconds_until_server_timeout = + server_timeout - Dates.value(Dates.now() - timeout_started_at) / 1000 + println(io, " ($(format_seconds(seconds_until_server_timeout)) left)") + else + println(io) + end + + println(io, " workers active: $(length(server.workers))") + + for (index, file) in enumerate(values(server.workers)) + run_started = file.run_started + run_finished = file.run_finished + + if isnothing(run_started) + seconds_since_started = nothing + else + seconds_since_started = Dates.value(current_time - run_started) / 1000 + end + + if isnothing(run_started) || isnothing(run_finished) + run_duration_seconds = nothing + else + run_duration_seconds = Dates.value(run_finished - run_started) / 1000 + end + + if isnothing(run_finished) + seconds_since_finished = nothing + else + seconds_since_finished = Dates.value(current_time - run_finished) / 1000 + end + + if file.timeout > 0 && !isnothing(seconds_since_finished) + time_until_timeout = file.timeout - seconds_since_finished + else + time_until_timeout = nothing + end + + run_started_str = + isnothing(run_started) ? "-" : simple_date_time_string(run_started) + run_started_ago = + isnothing(seconds_since_started) ? "" : + " ($(format_seconds(seconds_since_started)) ago)" + + run_finished_str = + isnothing(run_finished) ? "-" : simple_date_time_string(run_finished) + run_duration_str = + isnothing(run_duration_seconds) ? "" : + " (took $(format_seconds(run_duration_seconds)))" + + timeout_str = "$(format_seconds(file.timeout))" + time_until_timeout_str = + isnothing(time_until_timeout) ? "" : + " ($(format_seconds(time_until_timeout)) left)" + + println(io, " worker $(index):") + println(io, " path: $(file.path)") + println(io, " run started: $(run_started_str)$(run_started_ago)") + println(io, " run finished: $(run_finished_str)$(run_duration_str)") + println(io, " timeout: $(timeout_str)$(time_until_timeout_str)") + println(io, " pid: $(file.worker.proc_pid)") + println(io, " exe: $(file.exe)") + println(io, " exeflags: $(file.exeflags)") + println(io, " env: $(file.env)") + end + + return String(take!(io)) + end +end diff --git a/test/Project.toml b/test/Project.toml index bfd0877b..c9c9f065 100644 --- a/test/Project.toml +++ b/test/Project.toml @@ -1,5 +1,6 @@ [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" +Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" JSONSchema = "7d188eb4-7ad8-530c-ae41-71a32a6d4692" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" diff --git a/test/examples/cell_types.qmd b/test/examples/cell_types.qmd index 32012f67..e7279f21 100644 --- a/test/examples/cell_types.qmd +++ b/test/examples/cell_types.qmd @@ -1,5 +1,7 @@ --- title: Cell types +execute: + daemon: 123 --- Values: diff --git a/test/examples/sleep_10.qmd b/test/examples/sleep_10.qmd new file mode 100644 index 00000000..20b4e3ba --- /dev/null +++ b/test/examples/sleep_10.qmd @@ -0,0 +1,3 @@ +```{julia} +sleep(10) +``` \ No newline at end of file diff --git a/test/testsets/concurrency.jl b/test/testsets/concurrency.jl deleted file mode 100644 index 1b38166d..00000000 --- a/test/testsets/concurrency.jl +++ /dev/null @@ -1,16 +0,0 @@ -@testset "concurrent run and close" begin - s = Server() - file = joinpath(@__DIR__, "..", "examples", "soft_scope.qmd") - - @test_nowarn @sync begin - for i = 1:20 - Threads.@spawn begin - run!(s, file; showprogress = false) - # files may be closed already by another task, that's ok - close!(s, file) - end - end - end - - @test isempty(s.workers) -end diff --git a/test/testsets/socket_server/client.js b/test/testsets/socket_server/client.js index af234982..3f06ad61 100644 --- a/test/testsets/socket_server/client.js +++ b/test/testsets/socket_server/client.js @@ -16,15 +16,17 @@ function handle() { } const run = (file) => toJSON({ type: 'run', content: file }); const close = (file) => toJSON({ type: 'close', content: file || '' }); + const forceclose = (file) => toJSON({ type: 'forceclose', content: file || '' }); const stop = () => toJSON({ type: 'stop', content: '' }); const isopen = (file) => toJSON({ type: 'isopen', content: file }); const isready = () => toJSON({ type: 'isready', content: '' }); + const status = () => toJSON({ type: 'status', content: '' }); const notebook = (arg) => { - if (arg) { - return path.join(process.cwd(), arg); + if (path.isAbsolute(arg)) { + return arg } - throw new Error('No notebook specified.'); + throw new Error('No notebook with absolute path specified.'); } const type = process.argv[4]; @@ -35,12 +37,16 @@ function handle() { return run(notebook(arg)); case 'close': return close(notebook(arg)); + case 'forceclose': + return forceclose(notebook(arg)); case 'stop': return stop(); case 'isopen': return isopen(notebook(arg)); case 'isready': return isready(); + case 'status': + return status(); default: throw new Error('Invalid command.'); } diff --git a/test/testsets/socket_server/socket_server.jl b/test/testsets/socket_server/socket_server.jl index 218a8e2b..5e024f84 100644 --- a/test/testsets/socket_server/socket_server.jl +++ b/test/testsets/socket_server/socket_server.jl @@ -8,7 +8,7 @@ include("../../utilities/prelude.jl") sleep(1) json(cmd) = JSON3.read(read(cmd, String), Any) - cell_types = "../../examples/cell_types.qmd" + cell_types = abspath("../../examples/cell_types.qmd") @test json(`$node $client $(server.port) $(server.key) isready`) @@ -21,17 +21,102 @@ include("../../utilities/prelude.jl") d3 = json(`$node $client $(server.port) $(server.key) isopen $(cell_types)`) @test d3 == true + t_before_run = Dates.now() d4 = json(`$node $client $(server.port) $(server.key) run $(cell_types)`) + t_after_run = Dates.now() @test d2 == d4 - d5 = json(`$node $client $(server.port) $(server.key) close $(cell_types)`) - @test d5["status"] == true + d5 = json(`$node $client $(server.port) $(server.key) status`) + @test d5 isa String + @test occursin("workers active: 1", d5) + @test occursin(abspath(cell_types), d5) - d6 = json(`$node $client $(server.port) $(server.key) isopen $(cell_types)`) - @test d6 == false + d6 = json(`$node $client $(server.port) $(server.key) close $(cell_types)`) + @test d6["status"] == true - d7 = json(`$node $client $(server.port) $(server.key) run $(cell_types)`) - @test d2 == d7 + d7 = json(`$node $client $(server.port) $(server.key) isopen $(cell_types)`) + @test d7 == false + + d8 = json(`$node $client $(server.port) $(server.key) run $(cell_types)`) + @test d2 == d8 + + # test that certain commands on notebooks fail while those notebooks are already running + + sleep_10 = abspath("../../examples/sleep_10.qmd") + sleep_task = Threads.@spawn json( + `$node $client $(server.port) $(server.key) run $(sleep_10)`, + ) + + # wait until server lock locks due to the `run` command above + while !islocked(server.notebookserver.lock) + sleep(0.001) + end + # wait just until the previous task releases the server lock, which is when it has + # attained the lock for the new file + lock(server.notebookserver.lock) do + end + + # both of these tasks should then try to access the worker that is busy and fail because + # the lock is already held + d9_task = Threads.@spawn json( + `$node $client $(server.port) $(server.key) run $(sleep_10)`, + ) + d10_task = Threads.@spawn json( + `$node $client $(server.port) $(server.key) close $(sleep_10)`, + ) + + d9 = fetch(d9_task) + @test occursin("the corresponding worker is busy", d9["juliaError"]) + + d10 = fetch(d10_task) + @test occursin("the corresponding worker is busy", d10["juliaError"]) + + d11 = fetch(sleep_task) + @test haskey(d11, "notebook") + + run(`$node $client $(server.port) $(server.key) stop`) + + wait(server) + end +end + +@testset "socket server force close" begin + cd(@__DIR__) do + node = NodeJS_18_jll.node() + client = joinpath(@__DIR__, "client.js") + server = QuartoNotebookRunner.serve(; showprogress = false) + sleep(1) + json(cmd) = JSON3.read(read(cmd, String), Any) + + sleep_10 = abspath("../../examples/sleep_10.qmd") + sleep_task = Threads.@spawn json( + `$node $client $(server.port) $(server.key) run $(sleep_10)`, + ) + + # wait until server lock locks due to the `run` command above + while !islocked(server.notebookserver.lock) + sleep(0.001) + end + # wait just until the previous task releases the server lock, which is when it has + # attained the lock for the new file + lock(server.notebookserver.lock) do + end + + # force-closing should kill the worker even if it's running + d1 = json(`$node $client $(server.port) $(server.key) forceclose $(sleep_10)`) + @test d1 == Dict{String,Any}("status" => true) + + d2 = fetch(sleep_task) + @test occursin("File was force-closed", d2["juliaError"]) + + # check that force-closing also works when a notebook is not currently running + + cell_types = abspath("../../examples/cell_types.qmd") + + d3 = json(`$node $client $(server.port) $(server.key) run $(cell_types)`) + + d4 = json(`$node $client $(server.port) $(server.key) forceclose $(cell_types)`) + @test d4 == Dict{String,Any}("status" => true) run(`$node $client $(server.port) $(server.key) stop`) diff --git a/test/utilities/prelude.jl b/test/utilities/prelude.jl index 99ac1cd4..ecf01f0f 100644 --- a/test/utilities/prelude.jl +++ b/test/utilities/prelude.jl @@ -7,6 +7,7 @@ import JSON3 import JSONSchema import NodeJS_18_jll import quarto_jll +import Dates if !@isdefined(SCHEMA) SCHEMA = JSONSchema.Schema(