Skip to content

Commit aef4c07

Browse files
rafaqzasinghvi17
andauthored
Thread-safety tools with ThreadFunctors, simplified apply via Applicators (#288)
Changelog: - Refactor the `apply` pipeline to get rid of closures in favour of `Applicator`s, which are able to run `apply` on arrays, geoms and featurecollections. Some examples are `ApplyToPoint`, `ApplyToGeom`, etc. These are mainly used by internal methods in the `apply` pipeline. - Add a `TaskFunctors` struct that allows one to use individual functors per task. This allows thread safety for reentrant C APIs. - Refactor `reproject`, in the Proj extension, to use this functionality * add ThreadFunctors for threaded stateful functors in apply * include threading.jl * import ThreadFunctors * add Applicators * comments * Move applicators out of apply.jl * Replace the apply on pointtrait to polygon guts with a custom applicator. Maybe we need a formalized interface for applicators. * include applicators in core * add a few more docs in threading.jl * define applicator interface + remove overwrite * fix reproject * add back crs info to rebuilt geoms * remove error * add GFT * add and use applicators * more applicators * stop segfault by assigning context post cloning / construction * fix maptasks on thread functors * Fix tests ProjString needs +type=crs but regular String does not. This is likely because I stopped converting everything to String and started relying on the Proj constructors. * fixc * union proj crs * if statement for stability * passing tests * to -> with --------- Co-authored-by: Anshul Singhvi <[email protected]>
1 parent 723874e commit aef4c07

File tree

10 files changed

+236
-49
lines changed

10 files changed

+236
-49
lines changed

GeometryOpsCore/src/GeometryOpsCore.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ include("types/operation.jl")
2727
include("types/exceptions.jl")
2828
include("types/booltypes.jl")
2929
include("types/traittarget.jl")
30+
include("types/applicators.jl")
31+
include("types/tasks.jl")
3032

3133
include("apply.jl")
3234
include("applyreduce.jl")

GeometryOpsCore/src/apply.jl

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -145,19 +145,16 @@ end
145145
_apply(f, target, GI.trait(geom), geom; kw...)
146146
# There is no trait and this is an AbstractArray - so just iterate over it calling _apply on the contents
147147
@inline function _apply(f::F, target, ::Nothing, A::AbstractArray; threaded, kw...) where F
148-
# For an Array there is nothing else to do but map `_apply` over all values
149-
# _maptasks may run this level threaded if `threaded==true`,
150-
# but deeper `_apply` called in the closure will not be threaded
151-
apply_to_array(i) = _apply(f, target, A[i]; threaded=False(), kw...)
152-
_maptasks(apply_to_array, eachindex(A), threaded)
148+
applicator = ApplyToArray(f, target, A; kw...)
149+
_maptasks(applicator, eachindex(A), threaded)
153150
end
154151
# There is no trait and this is not an AbstractArray.
155152
# Try to call _apply over it. We can't use threading
156153
# as we don't know if we can can index into it. So just `map`.
157154
@inline function _apply(f::F, target, ::Nothing, iterable::IterableType; threaded, kw...) where {F, IterableType}
158155
# Try the Tables.jl interface first
159156
if Tables.istable(iterable)
160-
_apply_table(f, target, iterable; threaded, kw...)
157+
_apply_table(f, target, iterable; threaded, kw...)
161158
else # this is probably some form of iterable...
162159
if threaded isa True
163160
# `collect` first so we can use threads
@@ -239,9 +236,8 @@ end
239236
) where F
240237

241238
# Run _apply on all `features` in the feature collection, possibly threaded
242-
apply_to_feature(i) =
243-
_apply(f, target, GI.getfeature(fc, i); crs, calc_extent, threaded=False())::GI.Feature
244-
features = _maptasks(apply_to_feature, 1:GI.nfeature(fc), threaded)
239+
applicator = ApplyToFeatures(f, target, fc; crs, calc_extent)
240+
features = _maptasks(applicator, 1:GI.nfeature(fc), threaded)
245241
if calc_extent isa True
246242
# Calculate the extent of the features
247243
extent = mapreduce(GI.extent, Extents.union, features)
@@ -278,21 +274,16 @@ end
278274
# Map `_apply` over all sub geometries of `geom`
279275
# to create a new vector of geometries
280276
# TODO handle zero length
281-
apply_to_geom(i) = _apply(f, target, GI.getgeom(geom, i); crs, calc_extent, threaded=False())
282-
geoms = _maptasks(apply_to_geom, 1:GI.ngeom(geom), threaded)
277+
applicator = ApplyToGeom(f, target, geom; crs, calc_extent)
278+
geoms = _maptasks(applicator, 1:GI.ngeom(geom), threaded)
283279
return _apply_inner(geom, geoms, crs, calc_extent)
284280
end
285281
@inline function _apply(f::F, target::TraitTarget{<:PointTrait}, trait::GI.PolygonTrait, geom;
286282
crs=GI.crs(geom), calc_extent=False(), threaded
287283
)::(GI.geointerface_geomtype(trait)) where F
288284
# We need to force rebuilding a LinearRing not a LineString
289-
geoms = _maptasks(1:GI.ngeom(geom), threaded) do i
290-
lr = GI.getgeom(geom, i)
291-
points = map(GI.getgeom(lr)) do p
292-
_apply(f, target, p; crs, calc_extent, threaded=False())
293-
end
294-
_linearring(_apply_inner(lr, points, crs, calc_extent))
295-
end
285+
applicator = ApplyPointsToPolygon(f, target, geom; crs, calc_extent)
286+
geoms = _maptasks(applicator, 1:GI.ngeom(geom), threaded)
296287
return _apply_inner(geom, geoms, crs, calc_extent)
297288
end
298289
function _apply_inner(geom, geoms, crs, calc_extent::True)
@@ -360,6 +351,24 @@ end
360351
end
361352
end
362353

354+
# Finally we join the results into a new vector
355+
return mapreduce(fetch, vcat, tasks)
356+
end
357+
@inline function _maptasks(a::Applicator{<:TaskFunctors}, taskrange, threaded::True)::Vector
358+
ntasks = length(taskrange)
359+
chunk_size = max(1, cld(ntasks, (length(a.f.functors))))
360+
# partition the range into chunks
361+
task_chunks = Iterators.partition(taskrange, chunk_size)
362+
# Map over the chunks
363+
tasks = map(task_chunks, view(a.f.functors, 1:length(task_chunks))) do chunk, ft
364+
f = rebuild(a, ft)
365+
# Spawn a task to process this chunk
366+
StableTasks.@spawn begin
367+
# Where we map `f` over the chunk indices
368+
map(f, chunk)
369+
end
370+
end
371+
363372
# Finally we join the results into a new vector
364373
return mapreduce(fetch, vcat, tasks)
365374
end
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
#=
2+
3+
```@meta
4+
CollapsedDocStrings = true
5+
```
6+
# Applicators
7+
8+
Applicators are functor structs that wrap a function and a target object. They are used to
9+
implement the `apply` interface, and are also used to dispatch to the correct method of `_maptasks`.
10+
11+
The basic functor struct is `ApplyToGeom`, which just applies a function to a geometry.
12+
13+
```@docs; canonical=false
14+
Applicator
15+
ApplyToGeom
16+
ApplyToArray
17+
ApplyToFeatures
18+
```
19+
=#
20+
21+
"""
22+
abstract type Applicator{F,T}
23+
24+
An abstract type for applicators that apply a function to a target object.
25+
26+
The type parameter `F` is the type of the function to apply, and `T` is the type of the target object.
27+
28+
A common dispatch pattern is to dispatch on `F` which may also be e.g. a ThreadFunctor.
29+
30+
## Interface
31+
All applicators must be callable by an index integer, and define the following methods:
32+
33+
- `rebuild(a::Applicator, f)` - swap out the function and return a new applicator.
34+
35+
The calling convention is `my_applicator(i::Int)`, so applicators must define this method.
36+
"""
37+
abstract type Applicator{F,T} end
38+
39+
struct ApplyToPoint{Z,M,F} <: Applicator{F,Nothing}
40+
f::F
41+
end
42+
ApplyToPoint{Z,M}(f::F) where {Z,M,F} = ApplyToPoint{Z,M,F}(f)
43+
ApplyToPoint{Z}(f::F) where {Z,F} = ApplyToPoint{Z,false,F}(f)
44+
# Default function is just `tuple`
45+
(a::Type{<:ApplyToPoint})() = a(tuple)
46+
rebuild(a::ApplyToPoint{Z, M}, f::F) where {Z, M, F} = ApplyToPoint{Z, M, F}(f)
47+
48+
# Currently we ignore M by default
49+
const WithXY = ApplyToPoint{false}
50+
const WithXYZ = ApplyToPoint{true}
51+
# But these could be used to require M
52+
const WithXYM = ApplyToPoint{false,true}
53+
const WithXYZM = ApplyToPoint{true,true}
54+
55+
(t::WithXY)(p) = t.f(GI.x(p), GI.y(p))
56+
(t::WithXYZ)(p) = t.f(GI.x(p), GI.y(p), GI.z(p))
57+
(t::WithXYZM)(p) = t.f(GI.x(p), GI.y(p), GI.m(p))
58+
(t::WithXYM)(p) = t.f(GI.x(p), GI.y(p), GI.z(p), GI.m(p))
59+
60+
# ***
61+
62+
for T in (:ApplyToGeom, :ApplyToArray, :ApplyToFeatures, :ApplyPointsToPolygon)
63+
@eval begin
64+
struct $T{F,T,O,K} <: Applicator{F,T}
65+
f::F
66+
target::T
67+
obj::O
68+
kw::K
69+
end
70+
$T(f, target, obj; kw...) = $T(f, target, obj, kw)
71+
# rebuild lets us swap out the function, such as with TaskFunctors
72+
rebuild(a::$T, f) = $T(f, a.target, a.obj, a.kw)
73+
end
74+
end
75+
76+
# Functor definitions
77+
# _maptasks may run this level threaded if `threaded==true`
78+
# but deeper `_apply` calls will not be threaded
79+
# For an Array there is nothing to do but map `_apply` over all values
80+
(a::ApplyToArray)(i::Int) = _apply(a.f, a.target, a.obj[i]; a.kw..., threaded=False())
81+
# For a FeatureCollection or Geometry we need getfeature or getgeom calls
82+
(a::ApplyToFeatures)(i::Int) = _apply(a.f, a.target, GI.getfeature(a.obj, i); a.kw..., threaded=False())
83+
(a::ApplyToGeom)(i::Int) = _apply(a.f, a.target, GI.getgeom(a.obj, i); a.kw..., threaded=False())
84+
function (a::ApplyPointsToPolygon)(i::Int)
85+
lr = GI.getgeom(a.obj, i)
86+
points = map(GI.getgeom(lr)) do p
87+
_apply(a.f, a.target, p; a.kw..., threaded=False())
88+
end
89+
_linearring(_apply_inner(lr, points, a.kw[:crs], a.kw[:calc_extent]))
90+
end
91+
92+
@doc """
93+
ApplyToArray(f, target, arr, kw)
94+
95+
Create an [`Applicator`](@ref) that applies a function to all elements of `arr`.
96+
""" ApplyToArray
97+
98+
@doc """
99+
ApplyToGeom(f, target, geom, kw)
100+
101+
Create an [`Applicator`](@ref) that applies a function to all sub-geometries of `geom`.
102+
""" ApplyToGeom
103+
104+
@doc """
105+
ApplyToFeatures(f, target, fc, kw)
106+
107+
Create an [`Applicator`](@ref) that applies a function to all features of `fc`.
108+
""" ApplyToFeatures

GeometryOpsCore/src/types/booltypes.jl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,3 +51,7 @@ function booltype end
5151

5252
@inline booltype(x::Bool)::BoolsAsTypes = x ? True() : False()
5353
@inline booltype(x::BoolsAsTypes)::BoolsAsTypes = x
54+
55+
@inline istrue(x::True) = true
56+
@inline istrue(x::False) = false
57+
@inline istrue(x::Bool) = x

GeometryOpsCore/src/types/tasks.jl

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#=
2+
# TaskFunctors
3+
4+
Many functions a user might run through `apply` call into C. But what do you do
5+
when the C function you are calling is not threadsafe, or uses a reentrant/context
6+
based API?
7+
8+
The answer is to wrap the function in TaskFunctors, which will use an approximation of
9+
task local storage to ensure that each task calls its own copy of the function or C object
10+
you are invoking.
11+
12+
The primary application of this is Proj `reproject`, and Proj has a reentrant API based on context
13+
objects. So, to make this work, we need to create a new context object per task. We do this and
14+
pass the vector of Proj transformation objects to `TaskFunctors`, which `apply` and `_maptasks`
15+
dispatch on to behave correctly in this circumstance.
16+
17+
```@docs; canonical=false
18+
TaskFunctors
19+
```
20+
=#
21+
"""
22+
TaskFunctors(functors, tasks_per_thread)
23+
24+
A struct to hold the functors and tasks_per_thread,
25+
for internal use with `_maptasks` where functions have state
26+
that cannot be shared accross threads, such as `Proj.Transformation`.
27+
28+
`functors` must be an array or tuple of functors, one per thread,
29+
and `tasks_per_thread` must be an integer. This also allows you to control
30+
the number of tasks per thread that `_maptasks` launches, useful for tuning
31+
performance if you like.
32+
"""
33+
struct TaskFunctors{F}
34+
functors::F
35+
end

Project.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ CoordinateTransformations = "150eb455-5306-5404-9cee-2592286d6298"
88
DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
99
DelaunayTriangulation = "927a84f5-c5f4-47a5-9785-b46e178433df"
1010
ExactPredicates = "429591f6-91af-11e9-00e2-59fbe8cec110"
11+
GeoFormatTypes = "68eda718-8dee-11e9-39e7-89f7f65f511f"
1112
GeoInterface = "cf35fbd7-0cd7-5166-be24-54bfbe79505f"
1213
GeometryBasics = "5c1252a2-5f33-56bf-86c9-59e7332b4326"
1314
GeometryOpsCore = "05efe853-fabf-41c8-927e-7063c8b9f013"
@@ -32,6 +33,7 @@ DataAPI = "1"
3233
DelaunayTriangulation = "1.0.4"
3334
ExactPredicates = "2.2.8"
3435
FlexiJoins = "0.1.30"
36+
GeoFormatTypes = "0.4"
3537
GeoInterface = "1.2"
3638
GeometryBasics = "0.4.7, 0.5"
3739
GeometryOpsCore = "=0.1.3"

ext/GeometryOpsProjExt/reproject.jl

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1-
import GeometryOps: GI, GeoInterface, reproject, apply, transform, _is3d, True, False
1+
import GeometryOps: GI, GeoInterface, reproject, apply, transform, _is3d, istrue,
2+
True, False, TaskFunctors, WithXY, WithXYZ
3+
import GeoFormatTypes
24
import Proj
35

6+
# TODO:
7+
# - respect `time`
8+
# - respect measured values
9+
410
function reproject(geom;
511
source_crs=nothing, target_crs=nothing, transform=nothing, kw...
612
)
@@ -22,29 +28,53 @@ function reproject(geom;
2228
reproject(geom, transform; kw...)
2329
end
2430
end
25-
function reproject(geom, source_crs, target_crs;
26-
time=Inf,
27-
always_xy=true,
28-
transform=nothing,
31+
function reproject(geom, source_crs, target_crs; always_xy=true, kw...)
32+
transform = Proj.Transformation(convert(String, source_crs), convert(String, target_crs); always_xy)
33+
return reproject(geom, transform; target_crs, kw...)
34+
end
35+
function reproject(
36+
geom, source_crs::CRSType, target_crs::CRSType; always_xy=true, kw...
37+
) where CRSType <: Union{GeoFormatTypes.GeoFormat, Proj.CRS}
38+
transform = Proj.Transformation(source_crs, target_crs; always_xy)
39+
return reproject(geom, transform; target_crs, kw...)
40+
end
41+
function reproject(geom, transform::Proj.Transformation;
42+
context=C_NULL,
43+
target_crs=nothing,
44+
time=Inf,
45+
threaded=False(),
2946
kw...
3047
)
31-
transform = if isnothing(transform)
32-
s = source_crs isa Proj.CRS ? source_crs : convert(String, source_crs)
33-
t = target_crs isa Proj.CRS ? target_crs : convert(String, target_crs)
34-
Proj.Transformation(s, t; always_xy)
35-
else
36-
transform
48+
if isnothing(target_crs)
49+
target_crs = GeoFormatTypes.ESRIWellKnownText(Proj.CRS(Proj.proj_get_target_crs(transform.pj)))
3750
end
38-
reproject(geom, transform; time, target_crs, kw...)
39-
end
40-
function reproject(geom, transform::Proj.Transformation; time=Inf, target_crs=nothing, kw...)
41-
if _is3d(geom)
42-
return apply(GI.PointTrait(), geom; crs=target_crs, kw...) do p
43-
transform(GI.x(p), GI.y(p), GI.z(p))
51+
kw1 = (; crs=target_crs, threaded, kw...)
52+
if istrue(threaded)
53+
tasks_per_thread = 2
54+
ntasks = Threads.nthreads() * tasks_per_thread
55+
# Construct one context per planned task
56+
contexts = [Proj.proj_context_clone(context) for _ in 1:ntasks]
57+
# Clone the transformation for each context
58+
proj_transforms = [Proj.Transformation(Proj.proj_clone(transform.pj)) for context in contexts]
59+
# Assign the context to the transformation
60+
Proj.proj_assign_context.(getproperty.(proj_transforms, :pj), contexts)
61+
62+
results = if _is3d(geom)
63+
functors = TaskFunctors(WithXYZ.(proj_transforms))
64+
apply(functors, GI.PointTrait(), geom; kw1...)
65+
else
66+
functors = TaskFunctors(WithXY.(proj_transforms))
67+
apply(functors, GI.PointTrait(), geom; kw1...)
4468
end
69+
# Destroy the temporary threading contexts that we created
70+
Proj.proj_destroy.(contexts)
71+
# Return the results
72+
return results
4573
else
46-
return apply(GI.PointTrait(), geom; crs=target_crs, kw...) do p
47-
transform(GI.x(p), GI.y(p))
74+
if _is3d(geom)
75+
return apply(WithXYZ(transform), GI.PointTrait(), geom; kw1...)
76+
else
77+
return apply(WithXY(transform), GI.PointTrait(), geom; kw1...)
4878
end
49-
end
50-
end
79+
end
80+
end

src/GeometryOps.jl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ import GeometryOpsCore:
77
TraitTarget,
88
Manifold, Planar, Spherical, Geodesic, AutoManifold, WrongManifoldException,
99
Algorithm, AutoAlgorithm, ManifoldIndependentAlgorithm, SingleManifoldAlgorithm, NoAlgorithm,
10-
BoolsAsTypes, True, False, booltype,
10+
BoolsAsTypes, True, False, booltype, istrue,
11+
TaskFunctors,
12+
WithXY, WithXYZ, WithXYM, WithXYZM,
1113
apply, applyreduce,
1214
flatten, reconstruct, rebuild, unwrap, _linearring,
1315
APPLY_KEYWORDS, THREADED_KEYWORD, CRS_KEYWORD, CALC_EXTENT_KEYWORD

src/transformations/forcedims.jl

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,7 @@ export forcexy, forcexyz
1313
1414
Force the geometry to be 2D. Works on any geometry, vector of geometries, feature collection, or table!
1515
"""
16-
function forcexy(geom)
17-
return apply(GI.PointTrait(), geom) do point
18-
(GI.x(point), GI.y(point))
19-
end
20-
end
16+
forcexy(geom) = apply(WithXY(), GI.PointTrait(), geom)
2117

2218
"""
2319
forcexyz(geom, z = 0)

0 commit comments

Comments
 (0)