Skip to content

Commit 16bddfb

Browse files
authored
Merge pull request #13 from JuliaImageRecon/nh/lock
Add algorithm locking for thread-safety
2 parents 70910e4 + 9415231 commit 16bddfb

File tree

12 files changed

+188
-56
lines changed

12 files changed

+188
-56
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
[![Build Status](https://github.com/JuliaImageRecon/AbstractImageReconstruction.jl/actions/workflows/CI.yml/badge.svg?branch=main)](https://github.com/JuliaImageRecon/AbstractImageReconstruction.jl/actions/workflows/CI.yml?query=branch%3Amain)
44

5-
[![](https://img.shields.io/badge/docs-latest-blue.svg)](https://JuliaImageRecon.github.io/AbstractImageReconstruction.jl/latest)
5+
[![](https://img.shields.io/badge/docs-latest-blue.svg)](https://JuliaImageRecon.github.io/AbstractImageReconstruction.jl)
66

77

88
This package contains an interface and type hierarchy for image reconstruction algorithms and their parameters, together with associated utility tools.

docs/make.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ makedocs(
3939
"Serialization" => "generated/howto/serialization.md",
4040
"Caching" => "generated/howto/caching.md",
4141
"Observables" => "generated/howto/observables.md",
42+
"Multi-Threading" => "generated/howto/multi_threading.md",
4243
],
4344
"API Reference" => "API/api.md",
4445
#"Regularization Terms" => "API/regularization.md"],

docs/src/API/api.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ AbstractImageReconstruction.AbstractImageReconstructionAlgorithm
88
AbstractImageReconstruction.reconstruct
99
Base.put!(::AbstractImageReconstructionAlgorithm, ::Any)
1010
Base.take!(::AbstractImageReconstructionAlgorithm)
11+
Base.lock(::AbstractImageReconstructionAlgorithm)
12+
Base.unlock(::AbstractImageReconstructionAlgorithm)
13+
Base.isready(::AbstractImageReconstructionAlgorithm)
14+
Base.wait(::AbstractImageReconstructionAlgorithm)
1115
AbstractImageReconstruction.AbstractImageReconstructionParameters
1216
AbstractImageReconstruction.process
1317
AbstractImageReconstruction.parameter

docs/src/literate/example/2_direct.jl

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,20 @@ end
5353
# And they implement a method to retrieve the used parameters:
5454
AbstractImageReconstruction.parameter(algo::DirectRadonAlgorithm) = algo.parameter
5555

56-
# And implement the `put!` and `take!` functions, mimicking the behavior of a FIFO channel:
56+
# Algorithms are assumed to be stateful. To ensure thread safety, we need to implement the `lock` and `unlock` functions. We will use the `output` channel as a lock:
57+
Base.lock(algo::DirectRadonAlgorithm) = lock(algo.output)
58+
Base.unlock(algo::DirectRadonAlgorithm) = unlock(algo.output)
59+
60+
# And implement the `put!` and `take!` functions, mimicking the behavior of a FIFO channel for reconstructions:
5761
Base.take!(algo::DirectRadonAlgorithm) = Base.take!(algo.output)
58-
function Base.put!(algo::DirectRadonAlgorithm, data::AbstractArray{T, 4}) where {T}
59-
lock(algo.output) do
62+
function Base.put!(algo::DirectRadonAlgorithm, data::AbstractArray{T, 4}) where {T}
63+
lock(algo) do
6064
put!(algo.output, process(algo, algo.parameter, data))
6165
end
6266
end
6367

6468
# The way the behaviour is implemented here, the algorithm does not buffer any inputs and instead blocks until the currenct reconstruction is done. Outputs are stored until they are retrieved.
69+
70+
# With `wait` and `isready` we can check if the algorithm is currently processing data or if it is ready to accept new inputs:
71+
Base.wait(algo::DirectRadonAlgorithm) = wait(algo.output)
72+
Base.isready(algo::DirectRadonAlgorithm) = isready(algo.output)

docs/src/literate/example/4_iterative.jl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ function AbstractImageReconstruction.process(algo::IterativeRadonAlgorithm, para
6464
end
6565

6666
# Note that initially the operator is `nothing` and the processing step would fail as it stands. To "fix" this we define a `process` method for the algorithm instance which creates the operator and stores it in the algorithm:
67-
function AbstractImageReconstruction.process(algo::IterativeRadonAlgorithm, params::IterativeRadonReconstructionParameters, ::Nothing, data::AbstractArray{T, 4}) where {T}
67+
function AbstractImageReconstruction.process(algo::IterativeRadonAlgorithm, params::AbstractIterativeRadonReconstructionParameters, ::Nothing, data::AbstractArray{T, 4}) where {T}
6868
op = RadonOp(T; shape = params.shape, angles = params.angles)
6969
algo.op = op
7070
return process(AbstractIterativeRadonAlgorithm, params, op, data)
@@ -73,11 +73,15 @@ end
7373
# Our algorithm is not type stable. To fix this, we would need to know the element type of the sinograms during construction. Which is possible with a different parameterization of the algorithm. We will not do this here.
7474
# Often times the performance impact of this is negligible as the critical sections are in the preprocessing or the iterative solver, especially since we still dispatch on the operator.
7575

76-
# To finish up the implementation we need to implement the `put!`, `take!` and `parameters` functions:
76+
# To finish up the implementation we need to implement the remaining runtime related functions:
7777
Base.take!(algo::IterativeRadonAlgorithm) = Base.take!(algo.output)
7878
function Base.put!(algo::IterativeRadonAlgorithm, data::AbstractArray{T, 4}) where {T}
7979
lock(algo.output) do
8080
put!(algo.output, process(algo, algo.parameter, data))
8181
end
8282
end
83+
Base.lock(algo::IterativeRadonAlgorithm) = lock(algo.output)
84+
Base.unlock(algo::IterativeRadonAlgorithm) = unlock(algo.output)
85+
Base.isready(algo::IterativeRadonAlgorithm) = isready(algo.output)
86+
Base.wait(algo::IterativeRadonAlgorithm) = wait(algo.output)
8387
AbstractImageReconstruction.parameter(algo::IterativeRadonAlgorithm) = algo.parameter

docs/src/literate/example/example_include_data.jl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,5 @@ angles, shape, sinograms, images = isDataDefined ? (angles, shape, sinograms, im
3131
sinograms[:, :, :, i] = Array(RadonKA.radon(images[:, :, :, i], angles))
3232
end
3333
return angles, shape, sinograms, images
34-
end;
34+
end
35+
nothing
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
include("../../literate/example/example_include_all.jl") #hide
2+
3+
# # Multi-Threading
4+
# `AbstractImageReconstruction` assumes that algorithms are stateful. This is reflected in the FIFO behaviour and the locking interface of algorithms.
5+
# The motivation behind this choice is that the nature of computations within an algorithms heavily impact if multi-threading is beneficial or not.
6+
# For example, consider a GPU-accelerated reconstruction. There it might be faster to sequentially process all images on the GPU instead of processing them in parallel. Or consider, the preprocessing step of the Radon example where we average our data. If we were to extend our algorithm to read sinograms from a file, it might be inefficient to partially read and average frames from the file in parallel.
7+
# Instead it would be more efficient to read the required file in one go and then average the frames in parallel.
8+
# Therefore, the actual runtime behaviour is intended to be an implementation detail of an algorithm which is to be abstracted behind `reconstruct`.
9+
10+
# In the following we will explore the results of this design decision. If we consider a n algorithm such as:
11+
plan = RecoPlan(IterativeRadonAlgorithm, parameter = RecoPlan(IterativeRadonParameters,
12+
pre = RecoPlan(RadonPreprocessingParameters, frames = collect(1:5)),
13+
reco = RecoPlan(IterativeRadonReconstructionParameters, shape = size(images)[1:3], angles = angles,
14+
iterations = 20, reg = [L2Regularization(0.001), PositiveRegularization()], solver = CGNR)
15+
))
16+
algo = build(plan)
17+
18+
# which acts on one frame at a time, we could in theory do:
19+
# ```julia
20+
# Threads.@threads for i = 1:size(sinograms, 4)
21+
# res = reconstruct(algo, sinograms[:, :, :, i:i])
22+
# # Store res
23+
# end
24+
# ```
25+
# Due to the locking interface of the algorithm, this will not actually run in parallel. Instead the algorithm will be locked for each iteration and tasks will wait for the previous reconstruction to finish.
26+
27+
# If a user wants to explicitly use multi-threading, we could the plan to create a new algorithm for each task:
28+
# ```julia
29+
# Threads.@threads for i = 1:size(sinograms, 4)
30+
# algo = build(plan)
31+
# res = reconstruct(algo, sinograms[:, :, :, i:i])
32+
# # Store res
33+
# end
34+
# ```
35+
# This way each task has its own algorithm and can run in parallel. As mentioned before, this parallelization might not be the most efficient parallel reconstruction, both in terms of memory and runtime.
36+
37+
# To further improve the performance of the reconstruction, we could implement specific multi-threading `process`-ing steps for our algorithms. As an example, we will implement parallel processing for the iterative solver:
38+
Base.@kwdef struct ThreadedIterativeReconstructionParameters{S <: AbstractLinearSolver, R <: AbstractRegularization, N} <: AbstractIterativeRadonReconstructionParameters
39+
solver::Type{S}
40+
iterations::Int64
41+
reg::Vector{R}
42+
shape::NTuple{N, Int64}
43+
angles::Vector{Float64}
44+
end
45+
# Our parameters are identical to the iterative reconstruction parameters from the iterative example. We only differ in the type of the parameters. This allows us to dispatch on the type of the parameters and implement a different `process` method for the threaded version:
46+
function AbstractImageReconstruction.process(::Type{<:AbstractIterativeRadonAlgorithm}, params::ThreadedIterativeReconstructionParameters, op, data::AbstractArray{T, 4}) where {T}
47+
48+
result = similar(data, params.shape..., size(data, 4))
49+
50+
Threads.@threads for i = 1:size(data, 4)
51+
solver = createLinearSolver(params.solver, op; iterations = params.iterations, reg = params.reg)
52+
result[:, :, :, i] = solve!(solver, vec(data[:, :, :, i]))
53+
end
54+
55+
return result
56+
end
57+
58+
# While the Radon operator is thread-safe, the normal operator constructed by the solver is not. Therefore, we can reuse the Radon operator but still have to construct new solvers for each task.
59+
60+
# Our multi-threaded reconstruction can be constructed and used just like the single-threaded one::
61+
plan.parameter.pre.frames = collect(1:size(sinograms, 4))
62+
plan.parameter.reco = RecoPlan(ThreadedIterativeReconstructionParameters, shape = size(images)[1:3], angles = angles,
63+
iterations = 20, reg = [L2Regularization(0.001), PositiveRegularization()], solver = CGNR)
64+
65+
algo = build(plan)
66+
imag_iter = reconstruct(algo, sinograms)
67+
fig = Figure()
68+
for i = 1:5
69+
plot_image(fig[i,1], reverse(images[:, :, 24, i]))
70+
plot_image(fig[i,2], sinograms[:, :, 24, i])
71+
plot_image(fig[i,3], reverse(imag_iter[:, :, 24, i]))
72+
end
73+
resize_to_layout!(fig)
74+
fig

src/AbstractImageReconstruction.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ using Observables
66
using Scratch
77
using LRUCache
88

9-
import Base: put!, take!, fieldtypes, fieldtype, ismissing, propertynames, parent, hash, wait, isready
9+
import Base: put!, take!, fieldtypes, fieldtype, ismissing, propertynames, parent, hash, wait, isready, lock, unlock
1010

1111
include("AlgorithmInterface.jl")
1212
include("StructTransforms.jl")

src/AlgorithmInterface.jl

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,57 @@ put!(algo::AbstractImageReconstructionAlgorithm, inputs...) = error("$(typeof(al
2727
Remove and return a stored result from the algorithm `algo`. Blocks until a result is available.
2828
"""
2929
take!(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement take!")
30+
"""
31+
isready(algo::AbstractImageReconstructionAlgorithm)
32+
33+
Determine if the algorithm `algo` has a result available.
34+
"""
35+
isready(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement isready")
36+
"""
37+
wait(algo::AbstractImageReconstructionAlgorithm)
38+
39+
Wait for a result to be available from the specified `algo`.
40+
"""
41+
wait(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement wait")
42+
"""
43+
lock(algo::AbstractImageReconstructionAlgorithm)
44+
45+
Acquire a lock on the algorithm `algo`. If the lock is already acquired, wait until it is released.
46+
47+
Each `lock` must be matched with a `unlock`.
48+
"""
49+
lock(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement lock")
50+
"""
51+
unlock(algo::AbstractImageReconstructionAlgorithm)
52+
53+
Release a lock on the algorithm `algo`.
54+
"""
55+
unlock(algo::AbstractImageReconstructionAlgorithm) = error("$(typeof(algo)) must implement unlock")
56+
"""
57+
lock(fn, algo::AbstractImageReconstructionAlgorithm)
58+
59+
Acquire the `lock` on `algo`, execute `fn` and release the `lock` afterwards.
60+
"""
61+
function lock(fn, algo::AbstractImageReconstructionAlgorithm)
62+
lock(algo)
63+
try
64+
fn()
65+
finally
66+
unlock(algo)
67+
end
68+
end
3069

3170
export reconstruct
3271
"""
3372
reconstruct(algo::T, u) where {T<:AbstractImageReconstructionAlgorithm}
3473
35-
Reconstruct an image from input `u` using algorithm `algo`.
74+
Reconstruct an image from input `u` using algorithm `algo`. The `àlgo` will be `lock`ed until the result is available or an error occurs.
3675
"""
3776
function reconstruct(algo::T, u) where {T<:AbstractImageReconstructionAlgorithm}
38-
put!(algo, u)
39-
return take!(algo)
77+
lock(algo) do
78+
put!(algo, u)
79+
return take!(algo)
80+
end
4081
end
4182

4283
export process

src/MiscAlgorithms/MiscAlgorithms.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
include("RuntimeAlgorithms.jl")
1+
include("ThreadPinnedAlgorithm.jl")

0 commit comments

Comments
 (0)