Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/lib/internals.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ gennames
getmaxwidths
ourshow
ourstrwidth
tforeach
```
71 changes: 39 additions & 32 deletions src/dataframerow/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -338,46 +338,53 @@ function row_group_slots(cols::NTuple{N, AbstractVector},
end
refmap
end
@inbounds for i in eachindex(groups)
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(c -> c[i], refarrays)
end
vals = map((m, r, s, fi) -> m[r-fi+1] * s, refmaps, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1 in refmap, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
tforeach(eachindex(groups), basesize=1_000_000) do i
@inbounds begin
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(c -> c[i], refarrays)
end
vals = map((m, r, s, fi) -> m[r-fi+1] * s, refmaps, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1 in refmap, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
end
groups[i] = j
end
groups[i] = j
end
else
@inbounds for i in eachindex(groups)
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(refarrays, missinginds) do ref, missingind
r = Int(ref[i])
if skipmissing
return r == missingind ? -1 : (r > missingind ? r-1 : r)
else
return r
tforeach(eachindex(groups), basesize=1_000_000) do i
@inbounds begin
local refs_i
let i=i # Workaround for julia#15276
refs_i = map(refarrays, missinginds) do ref, missingind
r = Int(ref[i])
if skipmissing
return r == missingind ? -1 : (r > missingind ? r-1 : r)
else
return r
end
end
end
vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
end
groups[i] = j
end
vals = map((r, s, fi) -> (r-fi) * s, refs_i, strides, firstinds)
j = sum(vals) + 1
# x < 0 happens with -1, which corresponds to missing
if skipmissing && any(x -> x < 0, vals)
j = 0
else
seen[j] = true
end
groups[i] = j
end
end
if !all(seen) # Compress group indices to remove unused ones
# If some groups are unused, compress group indices to drop them
# sum(seen) is faster than all(seen) when not short-circuiting,
# and short-circuit would only happen in the slower case anyway
if sum(seen) < length(seen)
oldngroups = ngroups
remap = zeros(Int, ngroups)
ngroups = 0
Expand Down
38 changes: 38 additions & 0 deletions src/other/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,41 @@ else
end

funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner))

"""
tforeach(f, x::AbstractArray; basesize::Integer)

Apply function `f` to each entry in `x` in parallel, spawning
one separate task for each block of at least `basesize` entries.

A number of task higher than `Threads.nthreads()` may be spawned,
since that can allow for a more efficient load balancing in case
some threads are busy (nested parallelism).
"""
function tforeach(f, x::AbstractArray; basesize::Integer)
@static if VERSION >= v"1.4"
nt = Threads.nthreads()
len = length(x)
if nt > 1 && len > basesize
# Round size up to ensure all chunks have at least `basesize` entries
# This ensures balanced sizes by avoiding a small last chunk
basesize′ = cld(len, fld(len, basesize))
@sync for p in Iterators.partition(x, basesize′)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice to have a good sense for "common column types" when doing operations like this. For example, any <:AbstractArray ends up using view for each partition, but for generic iterators, it actually accumulates the partitions manually, which is less efficient.

All that to say, I think we're fine here, it's just something that I pause on because I don't always have a good mental model of what the most common kinds of column types are and whether there might be hidden bottlenecks in things like ths.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But columns are always AbstractArray?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's why I think this is really a non-issue; but just gave me a pause to consider if we're really efficient here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. BTW, we don't use views are all, we just iterate over relevant indices.

Threads.@spawn begin
for i in p
f(@inbounds x[i])
end
end
end
else
for i in eachindex(x)
f(@inbounds x[i])
end
end
else
for i in eachindex(x)
f(@inbounds x[i])
end
end
return
end
18 changes: 18 additions & 0 deletions test/grouping.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3831,4 +3831,22 @@ end
((x, y, z) -> x[1] <= 5 ? unwrap(y[1]) : unwrap(z[1])) => :res)
end

@testset "groupby multithreading" begin
for x in (PooledArray(rand(1:10, 1_100_000)),
PooledArray(rand([1:9; missing], 1_100_000))),
y in (PooledArray(rand(["a", "b", "c", "d"], 1_100_000)),
PooledArray(rand(["a"; "b"; "c"; missing], 1_100_000)))
df = DataFrame(x=x, y=y)

# Checks are done by groupby_checked
@test length(groupby_checked(df, :x)) == 10
@test length(groupby_checked(df, :x, skipmissing=true)) ==
length(unique(skipmissing(x)))

@test length(groupby_checked(df, [:x, :y])) == 40
@test length(groupby_checked(df, [:x, :y], skipmissing=true)) ==
length(unique(skipmissing(x))) * length(unique(skipmissing(y)))
end
end

end # module