diff --git a/src/Dagger.jl b/src/Dagger.jl
index 0c3761c4..584d8b8f 100644
--- a/src/Dagger.jl
+++ b/src/Dagger.jl
@@ -67,6 +67,7 @@ include("thunk.jl")
 include("submission.jl")
 include("chunks.jl")
 include("memory-spaces.jl")
+include("rules.jl")
 
 # Task scheduling
 include("compute.jl")
diff --git a/src/rules.jl b/src/rules.jl
new file mode 100644
index 00000000..0d67f0ec
--- /dev/null
+++ b/src/rules.jl
@@ -0,0 +1,105 @@
+"""
+    Rule(action, inputs, outputs; forcerun=false)
+    Rule(action, inputs => outputs; forcerun=false)
+
+A make-like, file-based task. `action(inputs, outputs)` is expected to create the
+files listed in `outputs` from the files listed in `inputs`.
+
+`inputs` and `outputs` may each be a single path or a vector of paths; both are
+stored as `Vector{String}`. With `forcerun=true` the action runs on every call,
+regardless of file modification times.
+
+Calling a rule runs `action` only when `needs_update` returns `true`, and always
+returns `outputs`.
+"""
+struct Rule
+    action::Function
+    inputs::Vector{String}
+    outputs::Vector{String}
+    forcerun::Bool
+
+    function Rule(action::Function, inputs::Vector{String},
+                  outputs::Vector{String}, forcerun::Bool)
+        return new(action, inputs, outputs, forcerun)
+    end
+end
+
+# Anything the keyword constructors accept as a list of paths.
+const RulePaths = Union{AbstractString,AbstractVector{<:AbstractString}}
+
+# Normalize a single path or a collection of paths to a `Vector{String}`.
+_rule_paths(path::AbstractString) = String[path]
+_rule_paths(paths::Vector{String}) = paths
+_rule_paths(paths::AbstractVector{<:AbstractString}) = collect(String, paths)
+
+function Rule(action::Function, inputs::RulePaths, outputs::RulePaths;
+              forcerun::Bool=false)
+    return Rule(action, _rule_paths(inputs), _rule_paths(outputs), forcerun)
+end
+
+Rule(action::Function, io::Pair; forcerun::Bool=false) =
+    Rule(action, io.first, io.second; forcerun)
+
+# Print a lone path as a string literal and several paths as a vector literal.
+_show_rule_paths(io::IO, paths) = show(io, length(paths) == 1 ? only(paths) : paths)
+
+function Base.show(io::IO, r::Rule)
+    print(io, "Rule(", r.action, ", ")
+    _show_rule_paths(io, r.inputs)
+    print(io, " => ")
+    _show_rule_paths(io, r.outputs)
+    # `forcerun` is only shown when it differs from the default
+    r.forcerun && print(io, "; forcerun=true")
+    print(io, ")")
+    return nothing
+end
+
+function Base.show(io::IO, ::MIME"text/plain", r::Rule)
+    println(io, "Rule(;forcerun=$(r.forcerun)):")
+    println(io, " Action: ", r.action)
+    println(io, " Inputs: ", r.inputs)
+    # no trailing newline: the caller (e.g. the REPL) adds its own
+    print(io, " Outputs: ", r.outputs)
+    return nothing
+end
+
+"""
+    needs_update(task::Rule) -> Bool
+
+Return `true` when `task` has to run: `forcerun` is set, it declares no outputs, an
+output file is missing, or the newest input is more recent than the oldest output.
+A rule without inputs is up to date as soon as all of its outputs exist.
+
+Throws an `AssertionError` if any declared input file does not exist.
+"""
+function needs_update(task::Rule)
+    # all input files must be present so their dates can be checked
+    missing_inputs = filter(!isfile, task.inputs)
+    isempty(missing_inputs) || throw(AssertionError(
+        "Rule declares $(task.inputs) as input\n but $(missing_inputs) do not exist."))
+
+    task.forcerun && return true
+    # with no declared outputs there is nothing that could be up to date
+    isempty(task.outputs) && return true
+    all(isfile, task.outputs) || return true
+    # without inputs, existing outputs cannot be stale
+    isempty(task.inputs) && return false
+
+    newest_input = maximum(mtime, task.inputs)
+    oldest_output = minimum(mtime, task.outputs)
+    # run if any input is newer than any output
+    return newest_input > oldest_output
+end
+
+# Calling a rule runs `action(inputs, outputs)` when `needs_update` says so and
+# returns `task.outputs`. The positional `inputs` are ignored; they only exist so
+# that Dagger can build the DAG from the results of upstream rules.
+function (task::Rule)(inputs...)
+    if needs_update(task)
+        @info "[RUN] Running $(task) (thread $(Threads.threadid()))"
+        task.action(task.inputs, task.outputs)
+    else
+        @info "[ - ] Skipping $(task)"
+    end
+    return task.outputs
+end
diff --git a/test/rules.jl b/test/rules.jl
new file mode 100644
index 00000000..611995ac
--- /dev/null
+++ b/test/rules.jl
@@ -0,0 +1,135 @@
+@testset "Rules" begin
+
+# constructors normalize every accepted path form to Vector{String}
+let r = Dagger.Rule(identity, SubString("in.txt") => [SubString("out.txt")])
+    @test r.inputs isa Vector{String} && r.inputs == ["in.txt"]
+    @test r.outputs isa Vector{String} && r.outputs == ["out.txt"]
+    @test !r.forcerun
+end
+
+# display
+@test repr(Dagger.Rule(identity, "a" => "b")) == "Rule(identity, \"a\" => \"b\")"
+@test repr(Dagger.Rule(identity, "a" => ["b", "c"]; forcerun=true)) ==
+    "Rule(identity, \"a\" => [\"b\", \"c\"]; forcerun=true)"
+
+# test basic patterns
+mktempdir() do dir
+
+    write("$(dir)/a.txt", "a")
+    write("$(dir)/b.txt", "b")
+
+    rule_write(x, y) = Dagger.Rule(x => y; forcerun=true) do input, output
+        write(output[1], read(input[1], String))
+        output
+    end
+
+    rule_merge(x, y) = Dagger.Rule(x => y; forcerun=true) do input, output
+        write(output[1], join(read.(input, String)))
+        output
+    end
+
+    # Linear: a -> b1 -> c
+    r1 = rule_write("$(dir)/a.txt", "$(dir)/b1.txt")
+    r2 = rule_write("$(dir)/b1.txt", "$(dir)/c.txt")
+    t1 = Dagger.@spawn r1()
+    t2 = Dagger.@spawn r2(t1)
+    @test read(fetch(t2)[1], String) == "a"
+
+    # Fan-in: a.txt, b.txt -> ab.txt
+    r3 = rule_merge(["$(dir)/a.txt", "$(dir)/b.txt"], "$(dir)/ab.txt")
+    t3 = Dagger.@spawn r3()
+    @test read(fetch(t3)[1], String) == "ab"
+
+    # Fan-out: a.txt -> a1.txt, a2.txt
+    r4 = Dagger.Rule("$(dir)/a.txt" => ["$(dir)/a1.txt", "$(dir)/a2.txt"]; forcerun=true) do i, o
+        for f in o
+            write(f, read(i[1], String))
+        end
+        o
+    end
+    t4 = Dagger.@spawn r4()
+    @test read.(fetch(t4), String) == ["a", "a"]
+
+    # Diamond: a -> b,c -> d
+    r5 = rule_write("$(dir)/a.txt", "$(dir)/b.txt")
+    r6 = rule_write("$(dir)/a.txt", "$(dir)/c.txt")
+    r7 = rule_merge(["$(dir)/b.txt", "$(dir)/c.txt"], "$(dir)/d.txt")
+    tb = Dagger.@spawn r5()
+    tc = Dagger.@spawn r6()
+    td = Dagger.@spawn r7(tb, tc)
+    @test read(fetch(td)[1], String) == "aa"
+end
+
+# a rule without inputs is stale only while its outputs are missing
+mktempdir() do dir
+    generate = Dagger.Rule(String[] => "$(dir)/generated.txt") do input, output
+        write(output[1], "x")
+        output
+    end
+    @test Dagger.needs_update(generate)
+    @test generate() == ["$(dir)/generated.txt"]
+    @test !Dagger.needs_update(generate)
+end
+
+# more realistic use case
+mktempdir() do dir
+
+    # avoid CSV & DataFrame dependency in tests
+    writefile(file, x) = open(io -> join(io, x, "\n"), file, "w")
+    readfile(file) = parse.(Float64, split(read(file, String), "\n"))
+
+    x = rand(10)
+    writefile("$(dir)/test.txt", x)
+    @test x == readfile("$(dir)/test.txt")
+
+    # prepare inputs
+
+    sum_squared_input = Float64[]
+    for sample_idx in 1:5
+        x = rand(10)
+        writefile("$(dir)/sample_$(sample_idx).csv", x)
+        push!(sum_squared_input, sum(x.^2))
+    end
+
+    samples = ["$(dir)/sample_$(sample_idx).csv" for sample_idx in 1:5]
+
+    # define and run
+
+    get_rule_square(sample) = Dagger.Rule(sample => replace(sample, "sample_" => "sample_squared_"); forcerun=false) do input, output
+        # no assignment to `x` here: it would rebind the enclosing `x` shared by all tasks
+        writefile(output[1], readfile(input[1]) .^ 2)
+        output
+    end
+
+    squared_rules = get_rule_square.(samples)
+    squared_rule_outputs = [only(r.outputs) for r in squared_rules]
+
+    make_summary = Dagger.Rule(squared_rule_outputs => "$(dir)/samples_summary.csv"; forcerun=false) do inputs, output
+        xs = readfile.(inputs)
+        sum_squared = [sum(x) for x in xs]
+        writefile(output[1], sum_squared)
+        output
+    end
+
+    squared = [Dagger.@spawn r() for r in squared_rules]
+    @info "running first summary_file"
+    summary_file = Dagger.@spawn make_summary(squared...)
+
+    out = readfile(fetch(summary_file)[1])
+
+    @test out ≈ sum_squared_input
+
+    @test !Dagger.needs_update(make_summary)
+    # wait so that the new mtime is distinguishable on coarse-grained filesystems
+    sleep(1)
+    touch(squared_rule_outputs[1])
+    @test Dagger.needs_update(make_summary)
+
+    rm(squared_rule_outputs[1])
+    @info "running second summary_file"
+    summary_file = Dagger.@spawn make_summary(squared...)
+    @test_throws Dagger.DTaskFailedException fetch(summary_file)
+
+end
+
+end
diff --git a/test/runtests.jl b/test/runtests.jl
index 39860cf2..593bae59 100644
--- a/test/runtests.jl
+++ b/test/runtests.jl
@@ -22,6 +22,7 @@ tests = [
     ("Datadeps", "datadeps.jl"),
     ("Streaming", "streaming.jl"),
     ("Domain Utilities", "domain.jl"),
+    ("Rules", "rules.jl"),
     ("Array - Allocation", "array/allocation.jl"),
     ("Array - Indexing", "array/indexing.jl"),
     ("Array - Core", "array/core.jl"),