Skip to content

Commit ad55690

Browse files
authored
Support for new status and forceclose server commands (#229)
* add errormonitor also to subtask * add Dates dep * add run start and finish timestamps to `File` * add a "workers" message type that returns worker metadata * reformat docstring * remove forced error again * add tests * format * switch to string with full server status via "status" message * print server timeout and time left * fix test * format * add info about when server started * formatting * add changelog * read QNR_VERSION at compile time * break after error to not continue with `nothing` as json * formatting * throw errors when run/close are called on a notebook that's running * add tests for notebook run collisions * remove obsolete concurrency test * add env to printout * add changelog about failure behavior * add `forceclose` server command and tests * format * `empty!(channel)` is not available on previous julia versions * `Base.errormonitor` is also not available on earlier julias * fix issue when `evaluate!` throws * fix message * add forceclose mention to changelog
1 parent 53ce1e1 commit ad55690

File tree

11 files changed

+449
-47
lines changed

11 files changed

+449
-47
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
### Added
11+
12+
- New socket server commands: `status`, which returns a string describing the current server status (including information for each active worker), and `forceclose`, a forced version of `close` that can shut down a worker even while it is running [#229](https://github.com/PumasAI/QuartoNotebookRunner.jl/pull/229).
13+
14+
### Changed
15+
16+
- Socket server commands `run`, `close` and `stop` now error if a file lock is currently held (i.e. a file worker is running) instead of waiting, potentially for a long time, for that lock to be released. These locks were always intended only as mutation protection, not as a queueing mechanism; they only behaved like a queue by accident. If a worker hangs, it is impractical for further `quarto render` commands to silently pile up behind it without any message [#229](https://github.com/PumasAI/QuartoNotebookRunner.jl/pull/229).
17+
1018
## [v0.14.0] - 2025-02-26
1119

1220
### Added

src/QuartoNotebookRunner.jl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ export Server, render, run!, close!
4343

4444
# Includes.
4545

46+
const QNR_VERSION =
47+
VersionNumber(TOML.parsefile(joinpath(@__DIR__, "..", "Project.toml"))["version"])
48+
include_dependency(joinpath(@__DIR__, "..", "Project.toml"))
49+
4650
include("UserError.jl")
4751
include("Malt.jl")
4852
include("WorkerSetup.jl")

src/server.jl

Lines changed: 130 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ mutable struct File
1111
lock::ReentrantLock
1212
timeout::Float64
1313
timeout_timer::Union{Nothing,Timer}
14+
run_started::Union{Nothing,Dates.DateTime}
15+
run_finished::Union{Nothing,Dates.DateTime}
16+
run_decision_channel::Channel{Symbol}
1417

1518
function File(path::String, options::Union{String,Dict{String,Any}})
1619
if isfile(path)
@@ -24,6 +27,7 @@ mutable struct File
2427
exeflags, env = _exeflags_and_env(merged_options)
2528
timeout = _extract_timeout(merged_options)
2629

30+
2731
exe, _exeflags = _julia_exe(exeflags)
2832
worker =
2933
cd(() -> Malt.Worker(; exe, exeflags = _exeflags, env), dirname(path))
@@ -38,6 +42,9 @@ mutable struct File
3842
ReentrantLock(),
3943
timeout,
4044
nothing,
45+
nothing,
46+
nothing,
47+
Channel{Symbol}(32), # we don't want an unbuffered channel because we might want to `put!` to it without blocking
4148
)
4249
init!(file, merged_options)
4350
return file
@@ -1427,41 +1434,96 @@ function run!(
14271434
options::Union{String,Dict{String,Any}} = Dict{String,Any}(),
14281435
chunk_callback = (i, n, c) -> nothing,
14291436
)
1430-
borrow_file!(server, path; optionally_create = true) do file
1431-
if file.timeout_timer !== nothing
1432-
close(file.timeout_timer)
1433-
file.timeout_timer = nothing
1434-
end
1435-
result = evaluate!(file, output; showprogress, options, markdown, chunk_callback)
1436-
if file.timeout > 0
1437-
file.timeout_timer = Timer(file.timeout) do _
1437+
try
1438+
borrow_file!(server, path; optionally_create = true) do file
1439+
if file.timeout_timer !== nothing
1440+
close(file.timeout_timer)
1441+
file.timeout_timer = nothing
1442+
end
1443+
file.run_started = Dates.now()
1444+
file.run_finished = nothing
1445+
1446+
# we want to be able to force close the worker while `evaluate!` is running,
1447+
# so we run `evaluate!` in a task and wait for the `run_decision_channel`
1448+
# further down. Depending on the value fetched from that channel, we either
1449+
# know that evaluation has finished, or that force closing was requested.
1450+
while !isempty(file.run_decision_channel)
1451+
take!(file.run_decision_channel) # empty! not defined on channels in earlier julia versions
1452+
end
1453+
1454+
result_task = Threads.@spawn begin
1455+
try
1456+
evaluate!(file, output; showprogress, options, markdown, chunk_callback)
1457+
finally
1458+
put!(file.run_decision_channel, :evaluate_finished)
1459+
end
1460+
end
1461+
1462+
# block until a decision is reached
1463+
decision = take!(file.run_decision_channel)
1464+
1465+
# :forceclose might have been set from another task
1466+
if decision === :forceclose
1467+
close!(server, file.path) # this is in the same task, so reentrant lock allows access
1468+
error("File was force-closed during run")
1469+
elseif decision === :evaluate_finished
1470+
result = try
1471+
fetch(result_task)
1472+
catch err
1473+
# throw the original exception, not the wrapping TaskFailedException
1474+
rethrow(err.task.exception)
1475+
end
1476+
else
1477+
error("Invalid decision $decision")
1478+
end
1479+
1480+
file.run_finished = Dates.now()
1481+
if file.timeout > 0
1482+
file.timeout_timer = Timer(file.timeout) do _
1483+
close!(server, file.path)
1484+
@debug "File at $(file.path) timed out after $(file.timeout) seconds of inactivity."
1485+
end
1486+
else
14381487
close!(server, file.path)
1439-
@debug "File at $(file.path) timed out after $(file.timeout) seconds of inactivity."
14401488
end
1489+
return result
1490+
end
1491+
catch err
1492+
if err isa FileBusyError
1493+
throw(
1494+
UserError(
1495+
"Tried to run file \"$path\" but the corresponding worker is busy.",
1496+
),
1497+
)
14411498
else
1442-
close!(server, file.path)
1499+
rethrow(err)
14431500
end
1444-
return result
14451501
end
14461502
end
14471503

14481504
struct NoFileEntryError <: Exception
14491505
path::String
14501506
end
14511507

1508+
struct FileBusyError <: Exception
1509+
path::String
1510+
end
1511+
14521512
"""
1453-
borrow_file!(f, server, path; optionally_create = false, options = Dict{String,Any}())
1513+
borrow_file!(f, server, path; wait = false, optionally_create = false, options = Dict{String,Any}())
14541514
14551515
Executes `f(file)` while the `file`'s `ReentrantLock` is locked.
14561516
All actions on a `Server`'s `File` should be wrapped in this
14571517
so that no two tasks can mutate the `File` at the same time.
14581518
When `optionally_create` is `true`, the `File` will be created on the server
14591519
if it doesn't exist, in which case it is passed `options`.
1520+
If `wait = false`, `borrow_file!` throws a `FileBusyError` when the lock cannot be acquired immediately; if `wait = true`, it blocks until the lock is available.
14601521
"""
14611522
function borrow_file!(
14621523
f,
14631524
server,
14641525
path;
1526+
wait = false,
14651527
optionally_create = false,
14661528
options = Dict{String,Any}(),
14671529
)
@@ -1497,7 +1559,18 @@ function borrow_file!(
14971559
# no file exists or it doesn't match the one we have, we recurse into `borrow_file!`.
14981560
# This could in principle go on forever but is very unlikely to with a small number of
14991561
# concurrent users.
1500-
lock(file.lock) do
1562+
1563+
if wait
1564+
lock(file.lock)
1565+
lock_attained = true
1566+
else
1567+
lock_attained = trylock(file.lock)
1568+
end
1569+
1570+
try
1571+
if !lock_attained
1572+
throw(FileBusyError(apath))
1573+
end
15011574
current_file = lock(server.lock) do
15021575
get(server.workers, apath, nothing)
15031576
end
@@ -1506,6 +1579,8 @@ function borrow_file!(
15061579
else
15071580
return f(file)
15081581
end
1582+
finally
1583+
lock_attained && unlock(file.lock)
15091584
end
15101585
end
15111586
end
@@ -1560,14 +1635,55 @@ function close!(server::Server, path::String)
15601635
end
15611636
return true
15621637
catch err
1563-
if !(err isa NoFileEntryError)
1638+
if err isa FileBusyError
1639+
throw(
1640+
UserError(
1641+
"Tried to close file \"$path\" but the corresponding worker is busy.",
1642+
),
1643+
)
1644+
elseif !(err isa NoFileEntryError)
15641645
rethrow(err)
15651646
else
15661647
false
15671648
end
15681649
end
15691650
end
15701651

1652+
function forceclose!(server::Server, path::String)
1653+
apath = abspath(path)
1654+
file = lock(server.lock) do
1655+
if haskey(server.workers, apath)
1656+
return server.workers[apath]
1657+
else
1658+
throw(NoFileEntryError(apath))
1659+
end
1660+
end
1661+
# if the worker is not actually running we need to fall back to normal closing,
1662+
# for that we try to get the file lock now
1663+
lock_attained = trylock(file.lock)
1664+
try
1665+
# if we've attained the lock, we can close normally
1666+
if lock_attained
1667+
close!(server, path)
1668+
# but if not, we request a forced close via the run decision channel that
1669+
# is being waited on in the `run!` function
1670+
else
1671+
put!(file.run_decision_channel, :forceclose)
1672+
t = time()
1673+
while Malt.isrunning(file.worker)
1674+
timeout = 10
1675+
(time() - t) > timeout && error(
1676+
"Force close was requested but worker was still running after $timeout seconds.",
1677+
)
1678+
sleep(0.1)
1679+
end
1680+
end
1681+
finally
1682+
lock_attained && unlock(file.lock)
1683+
end
1684+
return
1685+
end
1686+
15711687
json_reader(str) = JSON3.read(str, Any)
15721688
yaml_reader(str) = YAML.load(str)
15731689

0 commit comments

Comments
 (0)