From 60a6986313a93e90ceede16ba674511f91ca7a72 Mon Sep 17 00:00:00 2001 From: Jonathan Bieler Date: Tue, 22 Jul 2025 13:25:54 +0000 Subject: [PATCH 1/2] added snakemake-like Rule --- src/Dagger.jl | 1 + src/rules.jl | 76 +++++++++++++++++++++++++++++ test/rules.jl | 122 +++++++++++++++++++++++++++++++++++++++++++++++ test/runtests.jl | 1 + 4 files changed, 200 insertions(+) create mode 100644 src/rules.jl create mode 100644 test/rules.jl diff --git a/src/Dagger.jl b/src/Dagger.jl index 0c3761c4..584d8b8f 100644 --- a/src/Dagger.jl +++ b/src/Dagger.jl @@ -67,6 +67,7 @@ include("thunk.jl") include("submission.jl") include("chunks.jl") include("memory-spaces.jl") +include("rules.jl") # Task scheduling include("compute.jl") diff --git a/src/rules.jl b/src/rules.jl new file mode 100644 index 00000000..0d67f0ec --- /dev/null +++ b/src/rules.jl @@ -0,0 +1,76 @@ +struct Rule + action::Function + inputs::Vector{String} + outputs::Vector{String} + forcerun::Bool + + function Rule(action::Function, inputs::Vector{String}, outputs::Vector{String}, forcerun::Bool) + new(action, inputs, outputs, forcerun) + end +end + +# String inputs/outputs variants +Rule(action::Function, inputs::String, outputs::String; forcerun::Bool=false) = + Rule(action, [inputs], [outputs], forcerun) + +Rule(action::Function, inputs::Vector{String}, outputs::String; forcerun::Bool=false) = + Rule(action, inputs, [outputs], forcerun) + +Rule(action::Function, inputs::String, outputs::Vector{String}; forcerun::Bool=false) = + Rule(action, [inputs], outputs, forcerun) + +Rule(action::Function, io::Pair; forcerun::Bool=false) = + Rule(action, io.first, io.second; forcerun) + +Rule(action::Function, inputs::Vector{String}, outputs::Vector{String}; forcerun::Bool=false) = + Rule(action, inputs, outputs, forcerun) + +function Base.show(io::IO, r::Rule) + print(io, "Rule(") + print(io, r.action) + + # Show inputs and outputs + in_str = length(r.inputs) == 1 ? "\"$(r.inputs[1])\"" : string(r.inputs) + out_str = length(r.outputs) == 1 ? "\"$(r.outputs[1])\"" : string(r.outputs) + print(io, ", ", in_str, " => ", out_str) + + # Show forcerun if true + r.forcerun && print(io, "; forcerun=true") + + print(io, ")") +end + +function Base.show(io::IO, ::MIME"text/plain", r::Rule) + println(io, "Rule(;forcerun=$(r.forcerun)):") + println(io, " Action: ", r.action) + println(io, " Inputs: ", r.inputs) + println(io, " Outputs: ", r.outputs) +end + +function needs_update(task::Rule) + + # all input files should be present so I can check their dates + missingfiles = @. !isfile(task.inputs) + any(missingfiles) && throw(AssertionError("Rule declares $(task.inputs) as input\n but $(task.inputs[missingfiles]) do not exist.")) + + task.forcerun && return true + any(!isfile, task.outputs) && return true + + # Get the latest modification time of inputs + input_mtime = maximum(mtime.(task.inputs)) + # Get the earliest modification time of outputs + output_mtime = minimum(mtime.(task.outputs)) + # Run if any input is newer than any output + input_mtime > output_mtime +end + +function (task::Rule)(inputs...) # Inputs in not used, only for dagger to build the DAG + + if needs_update(task) + "[RUN] Running $(task) (thread $(Threads.threadid()))" |> println + task.action(task.inputs, task.outputs) + else + "[ - ] Skipping $(task)" |> println + end + task.outputs +end \ No newline at end of file diff --git a/test/rules.jl b/test/rules.jl new file mode 100644 index 00000000..501bd159 --- /dev/null +++ b/test/rules.jl @@ -0,0 +1,122 @@ + +@testset "Rules" begin + +# test basic patterns +mktempdir() do dir + + write("$(dir)/a.txt", "a") + write("$(dir)/b.txt", "b") + + rule_write(x, y) = Dagger.Rule(x => y; forcerun=true) do input, output + write(output[1], read(input[1], String)) + output + end + + rule_merge(x, y) = Dagger.Rule(x => y; forcerun=true) do input, output + write(output[1], join(read.(input, String))) + output + end + + # Linear: a -> b -> c + r1 = rule_write("$(dir)/a.txt", "$(dir)/b1.txt") + r2 = rule_write("$(dir)/b1.txt", "$(dir)/c.txt") + t1 = Dagger.@spawn r1() + t2 = Dagger.@spawn r2(t1) + @test read(fetch(t2)[1], String) == "a" + + # Fan-in: a.txt, b.txt -> ab.txt + r3 = rule_merge(["$(dir)/a.txt", "$(dir)/b.txt"], "$(dir)/ab.txt") + t3 = Dagger.@spawn r3() + @test read(fetch(t3)[1], String) == "ab" + + # Fan-out: a.txt -> a1.txt, a2.txt + r4 = Dagger.Rule("$(dir)/a.txt" => ["$(dir)/a1.txt", "$(dir)/a2.txt"]; forcerun=true) do i, o + for f in o; + write(f, read(i[1], String)) + end + o + end + t4 = Dagger.@spawn r4() + @test read.(fetch(t4), String) == ["a", "a"] + + # Diamond: a -> b,c -> d + r5 = rule_write("$(dir)/a.txt", "$(dir)/b.txt") + r6 = rule_write("$(dir)/a.txt", "$(dir)/c.txt") + r7 = rule_merge(["$(dir)/b.txt", "$(dir)/c.txt"], "$(dir)/d.txt") + tb = Dagger.@spawn r5() + tc = Dagger.@spawn r6() + td = Dagger.@spawn r7(tb, tc) + @test read(fetch(td)[1], String) == "aa" +end + +# more realistic use case +mktempdir() do dir + + # avoid CSV & DataFrame dependency in tests + writefile(file, x) = begin + open(file, "w") do io + for i in 1:length(x)-1 + write(io, string(x[i]) * "\n") + end + write(io, string(x[end])) + end + end + + readfile(file) = read(file, String) |> x->split(x, "\n") .|> x->parse(Float64, x) + + x = rand(10) + writefile("$(dir)/test.txt", x) + @test x == readfile("$(dir)/test.txt") + + # prepare inputs + + mean_squared_input = Float64[] + for sample_idx in 1:5 + x = rand(10) + writefile("$(dir)/sample_$(sample_idx).csv", x) + push!(mean_squared_input, mean(x.^2)) + end + + samples = ["$(dir)/sample_$(sample_idx).csv" for sample_idx in 1:5] + + # define and run + + get_rule_square(sample) = Dagger.Rule(sample => replace(sample, "sample_" => "sample_squared_"); forcerun=false) do input, output + x = readfile(input[1]) + xsquared = x .^ 2 + writefile(output[1], xsquared) + output + end + + squared_rules = get_rule_square.(samples) + squared_rule_outputs = [only(r.outputs) for r in squared_rules] + + make_summary = Dagger.Rule(squared_rule_outputs => "$(dir)/samples_summary.csv"; forcerun=false) do inputs, output + xs = readfile.(inputs) + mean_squared = [mean(x) for x in xs] + writefile(output[1], mean_squared) + output + end + + squared = [Dagger.@spawn r() for r in squared_rules] + @warn "running first summary_file" + summary_file = Dagger.@spawn make_summary(squared...) + + out = readfile(fetch(summary_file)[1]) + + @test out == mean_squared_input + + @test Dagger.needs_update(make_summary) == false + sleep(1) + run(`touch $(squared_rule_outputs[1])`) + sleep(1) + @test Dagger.needs_update(make_summary) == true + + run(`rm $(squared_rule_outputs[1])`) + @warn "running second summary_file" + summary_file = Dagger.@spawn make_summary(squared...) + @test_throws Dagger.DTaskFailedException fetch(summary_file) + +end + +end \ No newline at end of file diff --git a/test/runtests.jl b/test/runtests.jl index 39860cf2..593bae59 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -22,6 +22,7 @@ tests = [ ("Datadeps", "datadeps.jl"), ("Streaming", "streaming.jl"), ("Domain Utilities", "domain.jl"), + ("Rules", "rules.jl"), ("Array - Allocation", "array/allocation.jl"), ("Array - Indexing", "array/indexing.jl"), ("Array - Core", "array/core.jl"), From 8f44c45aab14cbc5aa8ae9c7b83dd8e7ee6d960b Mon Sep 17 00:00:00 2001 From: Jonathan Bieler Date: Wed, 23 Jul 2025 11:07:37 +0000 Subject: [PATCH 2/2] replaced mean by sum in tests --- test/rules.jl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/rules.jl b/test/rules.jl index 501bd159..611995ac 100644 --- a/test/rules.jl +++ b/test/rules.jl @@ -70,11 +70,11 @@ mktempdir() do dir # prepare inputs - mean_squared_input = Float64[] + sum_squared_input = Float64[] for sample_idx in 1:5 x = rand(10) writefile("$(dir)/sample_$(sample_idx).csv", x) - push!(mean_squared_input, mean(x.^2)) + push!(sum_squared_input, sum(x.^2)) end samples = ["$(dir)/sample_$(sample_idx).csv" for sample_idx in 1:5] @@ -93,8 +93,8 @@ mktempdir() do dir make_summary = Dagger.Rule(squared_rule_outputs => "$(dir)/samples_summary.csv"; forcerun=false) do inputs, output xs = readfile.(inputs) - mean_squared = [mean(x) for x in xs] - writefile(output[1], mean_squared) + sum_squared = [sum(x) for x in xs] + writefile(output[1], sum_squared) output end @@ -104,7 +104,7 @@ mktempdir() do dir out = readfile(fetch(summary_file)[1]) - @test out == mean_squared_input + @test out == sum_squared_input @test Dagger.needs_update(make_summary) == false sleep(1)