diff --git a/stdlib/FileWatching/docs/src/index.md b/stdlib/FileWatching/docs/src/index.md index 3944f5d3ed1c9..c062f69d5bdb2 100644 --- a/stdlib/FileWatching/docs/src/index.md +++ b/stdlib/FileWatching/docs/src/index.md @@ -1,5 +1,6 @@ # [File Events](@id lib-filewatching) +## Basic File Watching ```@docs FileWatching.poll_fd FileWatching.poll_file @@ -7,3 +8,19 @@ FileWatching.watch_file FileWatching.watch_folder FileWatching.unwatch_folder ``` + +## Pidfile-based File Locking + +```@docs +FileWatching.mkpidlock +``` + +### Helper Functions +```@docs +FileWatching.open_exclusive +FileWatching.tryopen_exclusive +FileWatching.write_pidfile +FileWatching.parse_pidfile +FileWatching.stale_pidfile +FileWatching.isvalidpid +``` diff --git a/stdlib/FileWatching/src/FileWatching.jl b/stdlib/FileWatching/src/FileWatching.jl index d929d1ebfb98d..33ae4ce36494a 100644 --- a/stdlib/FileWatching/src/FileWatching.jl +++ b/stdlib/FileWatching/src/FileWatching.jl @@ -12,6 +12,10 @@ export unwatch_folder, poll_file, # very inefficient alternative to above poll_fd, # very efficient, unrelated to above + # Pidfile-based File Locking + mkpidlock, + tryopen_exclusive, + open_exclusive, # continuous API (returns objects): FileMonitor, FolderMonitor, @@ -21,8 +25,9 @@ export import Base: @handle_as, wait, close, eventloop, notify_error, IOError, _sizeof_uv_poll, _sizeof_uv_fs_poll, _sizeof_uv_fs_event, _uv_hook_close, uv_error, _UVError, iolock_begin, iolock_end, associate_julia_struct, disassociate_julia_struct, - preserve_handle, unpreserve_handle, isreadable, iswritable, | -import Base.Filesystem.StatStruct + preserve_handle, unpreserve_handle, isreadable, iswritable, |, + UV_EEXIST, UV_ESRCH, Process +import Base.Filesystem: StatStruct, File, open, JL_O_CREAT, JL_O_RDWR, JL_O_RDONLY, JL_O_EXCL, samefile if Sys.iswindows() import Base.WindowsRawSocket end @@ -824,4 +829,223 @@ function poll_file(s::AbstractString, interval_seconds::Real=5.007, timeout_s::R end end + +## Pidfile: This code originated in https://github.com/vtjnash/Pidfile.jl +""" + mkpidlock([f::Function], at::String, [pid::Cint, proc::Process]; kwopts...) + +Create a pidfile lock for the path "at" for the current process +or the process identified by pid or proc. Can take a function to execute once locked, +for usage in `do` blocks, after which the lock will be automatically closed. +Optional keyword arguments: + - `mode`: file access mode (modified by the process umask). Defaults to world-readable. + - `poll_interval`: Specify the maximum time to between attempts (if `watch_file` doesn't work) + - `stale_age`: Delete an existing pidfile (ignoring the lock) if its mtime is older than this. + The file won't be deleted until 25x longer than this if the pid in the file appears that it may be valid. + By default this is disabled (`stale_age` = 0), but a typical recommended value would be about 3-5x an + estimated normal completion time. + +For instance multiple julia processes could use the following to simultaneously append to the same file safely: + +```julia +file = "/path/to/file" +mkpidlock(file) do + open(file, write = true, append = true) do io + write(io, "new text") + end +end +``` +""" +function mkpidlock end + +mutable struct LockMonitor + path::String + fd::File + + global function mkpidlock(at::String, pid::Cint; kwopts...) + local lock + at = abspath(at) + fd = open_exclusive(at; kwopts...) + try + write_pidfile(fd, pid) + lock = new(at, fd) + finalizer(close, lock) + catch ex + close(fd) + rm(at) + rethrow(ex) + end + return lock + end +end + +mkpidlock(at::String; kwopts...) = mkpidlock(at, getpid(); kwopts...) +mkpidlock(f::Function, at::String; kwopts...) = mkpidlock(f, at, getpid(); kwopts...) + +function mkpidlock(f::Function, at::String, pid::Cint; kwopts...) + lock = mkpidlock(at, pid; kwopts...) + try + return f() + finally + close(lock) + end +end + +# TODO: enable this when we update libuv +#Base.getpid(proc::Process) = ccall(:uv_process_get_pid, Cint, (Ptr{Void},), proc.handle) +#function mkpidlock(at::String, proc::Process; kwopts...) +# lock = mkpidlock(at, getpid(proc)) +# @schedule begin +# wait(proc) +# close(lock) +# end +# return lock +#end + +""" + write_pidfile(io, pid) + +Write our pidfile format to an open IO descriptor. +""" +function write_pidfile(io::IO, pid::Cint) + print(io, "$pid $(gethostname())") +end + +""" + parse_pidfile(file::Union{IO, String}) => (pid, hostname, age) + +Attempt to parse our pidfile format, +replaced an element with (0, "", 0.0), respectively, for any read that failed. +""" +function parse_pidfile(io::IO) + fields = split(read(io, String), ' ', limit = 2) + pid = tryparse(Cuint, fields[1]) + pid === nothing && (pid = Cuint(0)) + hostname = (length(fields) == 2) ? fields[2] : "" + when = mtime(io) + age = time() - when + return (pid, hostname, age) +end + +function parse_pidfile(path::String) + try + existing = open(path, JL_O_RDONLY) + try + return parse_pidfile(existing) + finally + close(existing) + end + catch ex + isa(ex, EOFError) || isa(ex, IOError) || rethrow(ex) + return (Cuint(0), "", 0.0) + end +end + +""" + isvalidpid(hostname::String, pid::Cuint) :: Bool + +Attempt to conservatively estimate whether pid is a valid process id. +""" +function isvalidpid(hostname::AbstractString, pid::Cuint) + # can't inspect remote hosts + (hostname == "" || hostname == gethostname()) || return true + # pid < 0 is never valid (must be a parser error or different OS), + # and would have a completely different meaning when passed to kill + !Sys.iswindows() && pid > typemax(Cint) && return false + # (similarly for pid 0) + pid == 0 && return false + # see if the process id exists by querying kill without sending a signal + # and checking if it returned ESRCH (no such process) + return ccall(:uv_kill, Cint, (Cuint, Cint), pid, 0) != UV_ESRCH +end + +""" + stale_pidfile(path::String, stale_age::Real) :: Bool + +Helper function for `open_exclusive` for deciding if a pidfile is stale. +""" +function stale_pidfile(path::String, stale_age::Real) + pid, hostname, age = parse_pidfile(path) + if age < -stale_age + @warn "filesystem time skew detected" path=path + elseif age > stale_age + if (age > stale_age * 25) || !isvalidpid(hostname, pid) + return true + end + end + return false +end + +""" + tryopen_exclusive(path::String, mode::Integer = 0o444) :: Union{Void, File} + +Try to create a new file for read-write advisory-exclusive access, +return nothing if it already exists. +""" +function tryopen_exclusive(path::String, mode::Integer = 0o444) + try + return open(path, JL_O_RDWR | JL_O_CREAT | JL_O_EXCL, mode) + catch ex + (isa(ex, IOError) && ex.code == UV_EEXIST) || rethrow(ex) + end + return nothing +end + +""" + open_exclusive(path::String; mode, poll_interval, stale_age) :: File + +Create a new a file for read-write advisory-exclusive access, +blocking until it can succeed. +For a description of the keyword arguments, see [`mkpidlock`](@ref). +""" +function open_exclusive(path::String; + mode::Integer = 0o444 #= read-only =#, + poll_interval::Real = 10 #= seconds =#, + stale_age::Real = 0 #= disabled =#) + # fast-path: just try to open it + file = tryopen_exclusive(path, mode) + file === nothing || return file + @info "waiting for lock on pidfile" path=path + # fall-back: wait for the lock + while true + # start the file-watcher prior to checking for the pidfile existence + t = @async try + watch_file(path, poll_interval) + catch ex + isa(ex, IOError) || rethrow(ex) + sleep(poll_interval) # if the watch failed, convert to just doing a sleep + end + # now try again to create it + file = tryopen_exclusive(path, mode) + file === nothing || return file + wait(t) # sleep for a bit before trying again + if stale_age > 0 && stale_pidfile(path, stale_age) + # if the file seems stale, try to remove it before attempting again + # set stale_age to zero so we won't attempt again, even if the attempt fails + stale_age -= stale_age + @warn "attempting to remove probably stale pidfile" path=path + try + rm(path) + catch ex + isa(ex, IOError) || rethrow(ex) + end + end + end +end + +""" + close(lock::LockMonitor) + +Release a pidfile lock. +""" +function Base.close(lock::LockMonitor) + isopen(lock.fd) || return false + havelock = samefile(stat(lock.fd), stat(lock.path)) + close(lock.fd) + if havelock # try not to delete someone else's lock + rm(lock.path) + end + return havelock +end + end diff --git a/stdlib/FileWatching/test/runtests.jl b/stdlib/FileWatching/test/runtests.jl index 345ffce07482f..9f13868afd79e 100644 --- a/stdlib/FileWatching/test/runtests.jl +++ b/stdlib/FileWatching/test/runtests.jl @@ -2,6 +2,8 @@ using Test, FileWatching using Base: uv_error, Experimental +using Base.Filesystem: File +using FileWatching: write_pidfile, parse_pidfile, isvalidpid, stale_pidfile, tryopen_exclusive, open_exclusive # This script does the following # Sets up N unix pipes (or WSA sockets) @@ -431,3 +433,241 @@ unwatch_folder(dir) @test isempty(FileWatching.watched_folders) rm(file) rm(dir) + + +## pidfile-based utilities +# helper utilities +struct MemoryFile <: Base.AbstractPipe + io::IOBuffer + mtime::Float64 +end +Base.pipe_reader(io::MemoryFile) = io.io +Base.Filesystem.mtime(io::MemoryFile) = io.mtime + +# set the process umask so we can test the behavior of +# open mask without interference from parent's state +# and create a test environment temp directory +umask(new_mask) = ccall((@static Sys.iswindows() ? :_umask : :umask), Cint, (Cint,), new_mask) +@testset "pidfile-based utilities" begin +old_umask = umask(0o002) +try + mktempdir() do dir + cd(dir) do +# now start tests definitions: + +@testset "validpid" begin + mypid = getpid() % Cuint + @test isvalidpid(gethostname(), mypid) + @test isvalidpid("", mypid) + @test !isvalidpid("", 0 % Cuint) + @test isvalidpid("NOT" * gethostname(), mypid) + @test isvalidpid("NOT" * gethostname(), 0 % Cuint) + @test isvalidpid("NOT" * gethostname(), -1 % Cuint) + if !Sys.iswindows() + @test isvalidpid("", 1 % Cuint) + @test !isvalidpid("", -1 % Cuint) + @test !isvalidpid("", -mypid) + end +end + +@testset "write_pidfile" begin + buf = IOBuffer() + pid, host, age = 0, "", 123 + pid2, host2, age2 = parse_pidfile(MemoryFile(seekstart(buf), time() - age)) + @test pid == pid2 + @test host == host2 + @test age ≈ age2 atol=5 + + host = " host\r\n" + write(buf, "-1 $host") + pid2, host2, age2 = parse_pidfile(MemoryFile(seekstart(buf), time() - age)) + @test pid == pid2 + @test host == host2 + @test age ≈ age2 atol=5 + truncate(seekstart(buf), 0) + + pid, host = getpid(), gethostname() + write_pidfile(buf, pid) + @test read(seekstart(buf), String) == "$pid $host" + pid2, host2, age2 = parse_pidfile(MemoryFile(seekstart(buf), time() - age)) + @test pid == pid2 + @test host == host2 + @test age ≈ age2 atol=5 + truncate(seekstart(buf), 0) + + @testset "parse_pidfile" begin + age = 0 + @test parse_pidfile("nonexist") === (Cuint(0), "", 0.0) + open(io -> write_pidfile(io, pid), "pidfile", "w") + pid2, host2, age2 = parse_pidfile("pidfile") + @test pid == pid2 + @test host == host2 + @test age ≈ age2 atol=10 + rm("pidfile") + end +end + +@testset "open_exclusive" begin + f = open_exclusive("pidfile")::File + try + # check that f is open and read-writable + @test isfile("pidfile") + @test filemode("pidfile") & 0o777 == 0o444 + @test filemode(f) & 0o777 == 0o444 + @test filesize(f) == 0 + @test write(f, "a") == 1 + @test filesize(f) == 1 + @test read(seekstart(f), String) == "a" + chmod("pidfile", 0o600) + @test filemode(f) & 0o777 == (Sys.iswindows() ? 0o666 : 0o600) + finally + close(f) + end + + # release the pidfile after a short delay + deleted = false + rmtask = @async begin + sleep(3) + rm("pidfile") + deleted = true + end + @test isfile("pidfile") + @test !deleted + + # open the pidfile again (should wait for it to disappear first) + t = @elapsed f2 = open_exclusive("pidfile")::File + try + @test deleted + @test isfile("pidfile") + @test t > 2 + if t > 6 + println("INFO: watch_file optimization appears to have NOT succeeded") + end + @test filemode(f2) & 0o777 == 0o444 + @test filesize(f2) == 0 + @test write(f2, "bc") == 2 + @test read(seekstart(f2), String) == "bc" + @test filesize(f2) == 2 + finally + close(f2) + end + rm("pidfile") + wait(rmtask) + + # now test with a long delay and other non-default options + f = open_exclusive("pidfile", mode = 0o000)::File + try + @test filemode(f) & 0o777 == (Sys.iswindows() ? 0o444 : 0o000) + finally + close(f) + end + deleted = false + rmtask = @async begin + sleep(8) + rm("pidfile") + deleted = true + end + @test isfile("pidfile") + @test !deleted + # open the pidfile again (should wait for it to disappear first) + t = @elapsed f2 = open_exclusive("pidfile", mode = 0o777, poll_interval = 1.0)::File + try + @test deleted + @test isfile("pidfile") + @test filemode(f2) & 0o777 == (Sys.iswindows() ? 0o666 : 0o775) + @test write(f2, "def") == 3 + @test read(seekstart(f2), String) == "def" + @test t > 7 + finally + close(f2) + end + rm("pidfile") + wait(rmtask) +end + +@testset "open_exclusive: break lock" begin + # test for stale_age + t = @elapsed f = open_exclusive("pidfile", poll_interval=3, stale_age=10)::File + try + write_pidfile(f, getpid()) + finally + close(f) + end + @test t < 2 + t = @elapsed f = open_exclusive("pidfile", poll_interval=3, stale_age=1)::File + close(f) + @test 20 < t < 50 + rm("pidfile") + + t = @elapsed f = open_exclusive("pidfile", poll_interval=3, stale_age=10)::File + close(f) + @test t < 2 + t = @elapsed f = open_exclusive("pidfile", poll_interval=3, stale_age=10)::File + close(f) + @test 8 < t < 20 + rm("pidfile") +end + +@testset "open_exclusive: other errors" begin + error = @test_throws(Base.IOError, open_exclusive("nonexist/folder")) + @test error.value.code == Base.UV_ENOENT +end + +@testset "mkpidlock" begin + lockf = mkpidlock("pidfile") + waittask = @async begin + sleep(3) + cd(homedir()) do + return close(lockf) + end + end + t = @elapsed lockf1 = mkpidlock("pidfile") + @test t > 2 + @test istaskdone(waittask) && fetch(waittask) + @test !close(lockf) + finalize(lockf1) + t = @elapsed lockf2 = mkpidlock("pidfile") + @test t < 2 + @test !close(lockf1) + + # test manual breakage of the lock + # is correctly handled + if Sys.iswindows() + mv("pidfile", "xpidfile") + else + rm("pidfile") + end + t = @elapsed lockf3 = mkpidlock("pidfile") + @test t < 2 + @test isopen(lockf2.fd) + @test !close(lockf2) + @test !isopen(lockf2.fd) + @test isfile("pidfile") + @test close(lockf3) + @test !isfile("pidfile") + if Sys.iswindows() + rm("xpidfile") + end + + # Just for coverage's sake, run a test with do-block syntax + lock_times = Float64[] + t_loop = @async begin + for idx in 1:100 + t = @elapsed mkpidlock("do_block_pidfile") do + end + sleep(0.01) + push!(lock_times, t) + end + end + mkpidlock("do_block_pidfile") do + sleep(3) + end + wait(t_loop) + @test maximum(lock_times) > 2 + @test minimum(lock_times) < 1 +end + +end; end # cd(tempdir) +finally + umask(old_umask) +end; end #testset