Skip to content

added snakemake-like Rule #638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ include("thunk.jl")
include("submission.jl")
include("chunks.jl")
include("memory-spaces.jl")
include("rules.jl")

# Task scheduling
include("compute.jl")
Expand Down
76 changes: 76 additions & 0 deletions src/rules.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""
    Rule(action, inputs, outputs; forcerun=false)
    Rule(action, inputs => outputs; forcerun=false)

A file-based unit of work: an `action` together with the file paths it declares
as `inputs` and `outputs`.

`inputs` and `outputs` may each be a single path or a vector of paths; any
`AbstractString` is accepted and stored as `String`. Because `action` is the
first argument, a rule can be written with `do`-block syntax:

    Rule("in.txt" => "out.txt") do inputs, outputs
        ...
    end

# Fields
- `action::Function`: called as `action(inputs, outputs)` when the rule is run.
- `inputs::Vector{String}`: declared input paths.
- `outputs::Vector{String}`: declared output paths.
- `forcerun::Bool`: when `true`, the rule is always considered out of date.
"""
struct Rule
    action::Function
    inputs::Vector{String}
    outputs::Vector{String}
    forcerun::Bool

    function Rule(action::Function, inputs::Vector{String}, outputs::Vector{String}, forcerun::Bool)
        return new(action, inputs, outputs, forcerun)
    end
end

# Normalise a single path or a collection of paths to the `Vector{String}` that
# `Rule` stores. `convert` hands back a `Vector{String}` argument unchanged, so
# callers that pass one keep sharing it with the rule (no copy is made).
_rule_paths(path::AbstractString) = [String(path)]
_rule_paths(paths::AbstractVector{<:AbstractString}) = convert(Vector{String}, paths)

# Keyword-argument front end: every combination of "one path" / "many paths"
# for inputs and outputs goes through the same normalisation.
function Rule(
    action::Function,
    inputs::Union{AbstractString,AbstractVector{<:AbstractString}},
    outputs::Union{AbstractString,AbstractVector{<:AbstractString}};
    forcerun::Bool=false,
)
    return Rule(action, _rule_paths(inputs), _rule_paths(outputs), forcerun)
end

# `inputs => outputs` variant
Rule(action::Function, io::Pair; forcerun::Bool=false) =
    Rule(action, io.first, io.second; forcerun)

# Compact one-line representation of a `Rule`, mirroring the
# `Rule(action, inputs => outputs; forcerun=...)` constructor form.
function Base.show(io::IO, r::Rule)
    # A lone path is rendered as a quoted string, several paths as a vector.
    render(paths) = length(paths) == 1 ? "\"$(paths[1])\"" : string(paths)

    print(io, "Rule(", r.action)
    print(io, ", ", render(r.inputs), " => ", render(r.outputs))

    # `forcerun` is only mentioned when it is set
    if r.forcerun
        print(io, "; forcerun=true")
    end

    print(io, ")")
end

# Multi-line representation of a `Rule`: a header carrying the `forcerun` flag
# followed by one line each for the action, the inputs and the outputs.
function Base.show(io::IO, ::MIME"text/plain", r::Rule)
    println(io, "Rule(;forcerun=$(r.forcerun)):")
    rows = ((" Action: ", r.action), (" Inputs: ", r.inputs), (" Outputs: ", r.outputs))
    for (label, value) in rows
        println(io, label, value)
    end
end

"""
    needs_update(task::Rule) -> Bool

Decide whether `task` has to be (re)run, based on the files it declares.

Returns `true` when any of the following holds:
- `task.forcerun` is set;
- the rule declares no outputs (there is nothing it could be up to date with);
- at least one declared output file does not exist;
- the most recently modified input is newer than the oldest output.

A rule without inputs is considered up to date as soon as all of its outputs
exist.

# Throws
- `AssertionError` if any declared input file does not exist. This check runs
  first, so it applies even when `forcerun` is set.
"""
function needs_update(task::Rule)
    # All input files must be present so that their dates can be checked.
    missing_inputs = filter(!isfile, task.inputs)
    isempty(missing_inputs) || throw(AssertionError("Rule declares $(task.inputs) as input\n but $(missing_inputs) do not exist."))

    task.forcerun && return true

    # Guard the reductions below: `minimum`/`maximum` throw on empty collections.
    isempty(task.outputs) && return true
    any(!isfile, task.outputs) && return true
    isempty(task.inputs) && return false

    # Latest modification time among the inputs
    newest_input = maximum(mtime, task.inputs)
    # Earliest modification time among the outputs
    oldest_output = minimum(mtime, task.outputs)
    # Run if any input is newer than any output
    return newest_input > oldest_output
end

# Calling a `Rule` runs its action when `needs_update` says so and returns the
# rule's declared outputs either way. The positional `inputs...` are never read
# in the body; they only let upstream results be passed in so that Dagger can
# build the DAG.
function (task::Rule)(inputs...)
    if !needs_update(task)
        println("[ - ] Skipping $(task)")
        return task.outputs
    end

    println("[RUN] Running $(task) (thread $(Threads.threadid()))")
    task.action(task.inputs, task.outputs)
    return task.outputs
end
122 changes: 122 additions & 0 deletions test/rules.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@

@testset "Rules" begin

    # test basic patterns
    mktempdir() do dir

        write("$(dir)/a.txt", "a")
        write("$(dir)/b.txt", "b")

        # Copy the first input file to the first output file.
        rule_write(x, y) = Dagger.Rule(x => y; forcerun=true) do input, output
            write(output[1], read(input[1], String))
            output
        end

        # Concatenate all input files into the first output file.
        rule_merge(x, y) = Dagger.Rule(x => y; forcerun=true) do input, output
            write(output[1], join(read.(input, String)))
            output
        end

        # Linear: a -> b -> c
        r1 = rule_write("$(dir)/a.txt", "$(dir)/b1.txt")
        r2 = rule_write("$(dir)/b1.txt", "$(dir)/c.txt")
        t1 = Dagger.@spawn r1()
        t2 = Dagger.@spawn r2(t1)
        @test read(fetch(t2)[1], String) == "a"

        # Fan-in: a.txt, b.txt -> ab.txt
        r3 = rule_merge(["$(dir)/a.txt", "$(dir)/b.txt"], "$(dir)/ab.txt")
        t3 = Dagger.@spawn r3()
        @test read(fetch(t3)[1], String) == "ab"

        # Fan-out: a.txt -> a1.txt, a2.txt
        r4 = Dagger.Rule("$(dir)/a.txt" => ["$(dir)/a1.txt", "$(dir)/a2.txt"]; forcerun=true) do i, o
            for f in o
                write(f, read(i[1], String))
            end
            o
        end
        t4 = Dagger.@spawn r4()
        @test read.(fetch(t4), String) == ["a", "a"]

        # Diamond: a -> b,c -> d
        r5 = rule_write("$(dir)/a.txt", "$(dir)/b.txt")
        r6 = rule_write("$(dir)/a.txt", "$(dir)/c.txt")
        r7 = rule_merge(["$(dir)/b.txt", "$(dir)/c.txt"], "$(dir)/d.txt")
        tb = Dagger.@spawn r5()
        tc = Dagger.@spawn r6()
        td = Dagger.@spawn r7(tb, tc)
        @test read(fetch(td)[1], String) == "aa"
    end

    # more realistic use case
    mktempdir() do dir

        # avoid CSV & DataFrame dependency in tests:
        # one value per line, no trailing newline
        writefile(file, x) = open(file, "w") do io
            join(io, x, "\n")
        end

        readfile(file) = parse.(Float64, split(read(file, String), "\n"))

        x = rand(10)
        writefile("$(dir)/test.txt", x)
        @test x == readfile("$(dir)/test.txt")

        # prepare inputs

        sum_squared_input = Float64[]
        for sample_idx in 1:5
            x = rand(10)
            writefile("$(dir)/sample_$(sample_idx).csv", x)
            push!(sum_squared_input, sum(x.^2))
        end

        samples = ["$(dir)/sample_$(sample_idx).csv" for sample_idx in 1:5]

        # define and run

        get_rule_square(sample) = Dagger.Rule(sample => replace(sample, "sample_" => "sample_squared_"); forcerun=false) do input, output
            x = readfile(input[1])
            xsquared = x .^ 2
            writefile(output[1], xsquared)
            output
        end

        squared_rules = get_rule_square.(samples)
        squared_rule_outputs = [only(r.outputs) for r in squared_rules]

        make_summary = Dagger.Rule(squared_rule_outputs => "$(dir)/samples_summary.csv"; forcerun=false) do inputs, output
            xs = readfile.(inputs)
            sum_squared = [sum(x) for x in xs]
            writefile(output[1], sum_squared)
            output
        end

        squared = [Dagger.@spawn r() for r in squared_rules]
        @warn "running first summary_file"
        summary_file = Dagger.@spawn make_summary(squared...)

        out = readfile(fetch(summary_file)[1])

        @test out == sum_squared_input

        # Everything was just built, so the summary is up to date.
        @test Dagger.needs_update(make_summary) == false
        # Sleep so that the touched input gets a strictly later mtime than the
        # summary, even on filesystems with coarse timestamp resolution.
        sleep(1)
        # Base `touch`/`rm` instead of shelling out, so the test also runs on
        # platforms without those executables.
        touch(squared_rule_outputs[1])
        sleep(1)
        @test Dagger.needs_update(make_summary) == true

        # A missing input must make the rule fail when it is run.
        rm(squared_rule_outputs[1])
        @warn "running second summary_file"
        summary_file = Dagger.@spawn make_summary(squared...)
        @test_throws Dagger.DTaskFailedException fetch(summary_file)

    end

end
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ tests = [
("Datadeps", "datadeps.jl"),
("Streaming", "streaming.jl"),
("Domain Utilities", "domain.jl"),
("Rules", "rules.jl"),
("Array - Allocation", "array/allocation.jl"),
("Array - Indexing", "array/indexing.jl"),
("Array - Core", "array/core.jl"),
Expand Down