Skip to content

Commit 436b686

Browse files
authored
Reduce size of multi-threading enablement to 100_000 (#3274)
1 parent 63bd85a commit 436b686

File tree

11 files changed

+112
-121
lines changed

11 files changed

+112
-121
lines changed

src/dataframe/dataframe.jl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,9 @@ mutable struct DataFrame <: AbstractDataFrame
212212

213213
# we write into columns as we know that it is guaranteed
214214
# that it was freshly allocated in the outer constructor
215-
if copycols && len >= 1_000_000 && length(columns) > 1 && Threads.nthreads() > 1
215+
if copycols && len >= 100_000 && length(columns) > 1 && Threads.nthreads() > 1
216216
@sync for i in eachindex(columns)
217-
Threads.@spawn columns[i] = _preprocess_column(columns[i], len, copycols)
217+
@spawn columns[i] = _preprocess_column(columns[i], len, copycols)
218218
end
219219
else
220220
for i in eachindex(columns)
@@ -558,10 +558,10 @@ function _threaded_getindex(selected_rows::AbstractVector,
558558
selected_columns::AbstractVector,
559559
df_columns::AbstractVector,
560560
idx::AbstractIndex)
561-
if length(selected_rows) >= 1_000_000 && Threads.nthreads() > 1
561+
if length(selected_rows) >= 100_000 && Threads.nthreads() > 1
562562
new_columns = Vector{AbstractVector}(undef, length(selected_columns))
563563
@sync for i in eachindex(new_columns)
564-
Threads.@spawn new_columns[i] = df_columns[selected_columns[i]][selected_rows]
564+
@spawn new_columns[i] = df_columns[selected_columns[i]][selected_rows]
565565
end
566566
return DataFrame(new_columns, idx, copycols=false)
567567
else

src/groupeddataframe/complextransforms.jl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,6 @@ function _combine_rows_with_first!((firstrow,)::Ref{Any},
248248
@assert colnames isa NTuple{N, Symbol} where N
249249
@assert length(colnames) == length(outcols)
250250
len = length(gd)
251-
gdidx = gd.idx
252251
starts = gd.starts
253252
ends = gd.ends
254253

src/groupeddataframe/utils.jl

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ function hashrows_col!(h::Vector{UInt},
55
v::AbstractVector{T},
66
rp::Nothing,
77
firstcol::Bool) where T
8-
@spawn_for_chunks 1_000_000 for i in eachindex(h)
8+
@spawn_for_chunks 100_000 for i in eachindex(h)
99
@inbounds begin
1010
el = v[i]
1111
h[i] = hash(el, h[i])
@@ -33,18 +33,18 @@ function hashrows_col!(h::Vector{UInt},
3333
fira = firstindex(ra)
3434

3535
hashes = Vector{UInt}(undef, length(rp))
36-
@spawn_for_chunks 1_000_000 for i in eachindex(hashes)
36+
@spawn_for_chunks 100_000 for i in eachindex(hashes)
3737
@inbounds hashes[i] = hash(rp[i+firp-1])
3838
end
3939

4040
# here we rely on the fact that `DataAPI.refpool` has a continuous
4141
# block of indices
42-
@spawn_for_chunks 1_000_000 for i in eachindex(h)
42+
@spawn_for_chunks 100_000 for i in eachindex(h)
4343
@inbounds ref = ra[i+fira-1]
4444
@inbounds h[i] = hashes[ref+1-firp]
4545
end
4646
else
47-
@spawn_for_chunks 1_000_000 for i in eachindex(h, v)
47+
@spawn_for_chunks 100_000 for i in eachindex(h, v)
4848
@inbounds h[i] = hash(v[i], h[i])
4949
end
5050
end
@@ -332,10 +332,13 @@ function row_group_slots!(cols::NTuple{N, AbstractVector},
332332

333333
lg = length(groups)
334334
nt = Threads.nthreads()
335-
# disable threading if we are processing a small data frame or number of groups is large
336-
if lg < 1_000_000 || ngroups > lg * (0.5 - 1 / (2 * nt)) / (2 * nt)
337-
nt = 1
335+
# make sure we are processing at least 100_000 rows per task if we do threading
336+
if lg < 100_000 * nt
337+
nt = max(1, lg ÷ 100_000)
338338
end
339+
# if there are few rows per group limit the number of threads used
340+
nt = clamp(round(Int, (lg / 4) / ngroups - 2), 1, nt)
341+
339342
seen = fill(false, ngroups)
340343
seen_vec = Vector{Vector{Bool}}(undef, nt)
341344
seen_vec[1] = seen

src/join/composer.jl

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -249,9 +249,9 @@ function compose_inner_table(joiner::DataFrameJoiner,
249249
right_ixs = right_ixs[csp_r]
250250
end
251251

252-
if Threads.nthreads() > 1 && length(left_ixs) >= 1_000_000
253-
dfl_task = Threads.@spawn joiner.dfl[left_ixs, :]
254-
dfr_noon_task = Threads.@spawn joiner.dfr[right_ixs, Not(joiner.right_on)]
252+
if Threads.nthreads() > 1 && length(left_ixs) >= 100_000
253+
dfl_task = @spawn joiner.dfl[left_ixs, :]
254+
dfr_noon_task = @spawn joiner.dfr[right_ixs, Not(joiner.right_on)]
255255
dfl = fetch(dfl_task)
256256
dfr_noon = fetch(dfr_noon_task)
257257
else
@@ -384,20 +384,20 @@ function _compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique
384384

385385
@assert col_idx == ncol(joiner.dfl_on) + 1
386386

387-
if Threads.nthreads() > 1 && target_nrow >= 1_000_000 && length(cols) > col_idx
387+
if Threads.nthreads() > 1 && target_nrow >= 100_000 && length(cols) > col_idx
388388
@sync begin
389389
for col in eachcol(dfl_noon)
390390
cols_i = left_idxs[col_idx]
391-
Threads.@spawn _noon_compose_helper!(cols, _similar_left, cols_i,
392-
col, target_nrow, left_ixs, lil + 1,
393-
leftonly_ixs, loil)
391+
@spawn _noon_compose_helper!(cols, _similar_left, cols_i,
392+
col, target_nrow, left_ixs, lil + 1,
393+
leftonly_ixs, loil)
394394
col_idx += 1
395395
end
396396
@assert col_idx == ncol(joiner.dfl) + 1
397397
for col in eachcol(dfr_noon)
398398
cols_i = col_idx
399-
Threads.@spawn _noon_compose_helper!(cols, _similar_right, cols_i, col, target_nrow,
400-
right_ixs, lil + loil + 1, rightonly_ixs, roil)
399+
@spawn _noon_compose_helper!(cols, _similar_right, cols_i, col, target_nrow,
400+
right_ixs, lil + loil + 1, rightonly_ixs, roil)
401401
col_idx += 1
402402
end
403403
end
@@ -422,7 +422,7 @@ function _compose_joined_table(joiner::DataFrameJoiner, kind::Symbol, makeunique
422422
left_ixs, lil + 1, leftonly_ixs, loil)
423423
new_order = _count_sortperm(left_cols_idxs)
424424
end
425-
if order == :right && !(issorted(right_ixs) && isempty(rightonly_ixs))
425+
if order == :right && !(issorted(right_ixs) && isempty(rightonly_ixs))
426426
right_cols_idxs = _sort_compose_helper(nrow(joiner.dfr) + 1,
427427
1:nrow(joiner.dfr), target_nrow,
428428
right_ixs, lil + loil + 1, rightonly_ixs, roil)

src/other/utils.jl

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,13 @@ function _spawn_for_chunks_helper(iter, lbody, basesize)
175175
nt = Threads.nthreads()
176176
len = length(x)
177177
if nt > 1 && len > basesize
178-
tasks = [Threads.@spawn begin
179-
for i in p
180-
local $(esc(lidx)) = @inbounds x[i]
181-
$(esc(lbody))
182-
end
183-
end
184-
for p in split_indices(len, basesize)]
178+
tasks = [@spawn begin
179+
for i in p
180+
local $(esc(lidx)) = @inbounds x[i]
181+
$(esc(lbody))
182+
end
183+
end
184+
for p in split_indices(len, basesize)]
185185
foreach(wait, tasks)
186186
else
187187
for i in eachindex(x)

test/grouping.jl

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3722,10 +3722,10 @@ end
37223722
end
37233723

37243724
@testset "groupby multithreading" begin
3725-
for x in (PooledArray(rand(1:10, 1_100_000)),
3726-
PooledArray(rand([1:9; missing], 1_100_000))),
3727-
y in (PooledArray(rand(["a", "b", "c", "d"], 1_100_000)),
3728-
PooledArray(rand(["a"; "b"; "c"; missing], 1_100_000)))
3725+
for x in (PooledArray(rand(1:10, 210_000)),
3726+
PooledArray(rand([1:9; missing], 210_000))),
3727+
y in (PooledArray(rand(["a", "b", "c", "d"], 210_000)),
3728+
PooledArray(rand(["a"; "b"; "c"; missing], 210_000)))
37293729
df = DataFrame(x=x, y=y)
37303730

37313731
# Checks are done by groupby_checked
@@ -3777,7 +3777,7 @@ end
37773777
end
37783778

37793779
Random.seed!(1234)
3780-
for levs in (100, 99_000), sz in (100_000, 1_100_000)
3780+
for levs in (100, 89_000), sz in (90_000, 210_000)
37813781
df = DataFrame(x_int=rand(1:levs, sz))
37823782
df.x_str = string.(df.x_int, pad=5)
37833783
df.x_pool = PooledArray(df.x_str)
@@ -4401,7 +4401,7 @@ end
44014401
@test getindex.(keys(groupby(df, [order(:x, rev=true)], sort=NamedTuple())), 1) == ["c", "b", "a"]
44024402
@test getindex.(keys(groupby(df, :x, sort=(;rev=true))), 1) == ["c", "b", "a"]
44034403
@test getindex.(keys(groupby(df, [:x], sort=(;rev=true))), 1) == ["c", "b", "a"]
4404-
4404+
44054405
# by default sorting is not applied as range of values is wide
44064406
df = DataFrame(x=[2, 100, 2, 1, 100])
44074407
@test getindex.(keys(groupby(df, :x)), 1) == [2, 100, 1]
@@ -4434,7 +4434,7 @@ end
44344434
df2 = string.(df1, pad=3)
44354435

44364436
for df in (df1, df2)
4437-
for col in (:a, "a", 1, :b, "b", 2, :c, "c", 3)
4437+
for col in (:a, "a", 1, :b, "b", 2, :c, "c", 3)
44384438
gdf = groupby(df, order(col))
44394439
@test issorted(DataFrame(gdf)[:, col])
44404440
@test all(x -> issorted(x.c), gdf)

test/indexing.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2022,7 +2022,7 @@ end
20222022
include("indexing_offset.jl")
20232023

20242024
@testset "threading correctness tests" begin
2025-
for x in (10, 1_100_000), y in 1:4
2025+
for x in (10, 110_000), y in 1:4
20262026
vecvec = [rand(Int8, x) for _ in 1:y]
20272027
df = DataFrame(vecvec, :auto, copycols=false)
20282028
for rowrange in [:, 1:nrow(df)-5, collect(1:nrow(df)-5), axes(df, 1) .< nrow(df)-5],

test/insertion.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1331,7 +1331,7 @@ end
13311331
b=[22.5, 2.0, "b", 10.0, 5.0, missing],
13321332
c=[missing, 3.0, "c", 11.0, 6.0, 16.5],
13331333
d=[missing, missing, "d", missing, missing, 15])
1334-
for i in [1, 2, 4, 8, 16, 32, 64, 100, 1000, 10000, 20_000, 100_000]
1334+
for i in [1, 2, 4, 8, 16, 32, 64, 100, 1000, 10000, 20_000, 210_000]
13351335
df = DataFrame()
13361336
mat = Any[a + 100 * b + (iseven(b) ? 0.5 : 0) for a in 1:2, b in 1:i]
13371337
tab = Tables.table(mat, header=Symbol.("x", 1:i))

test/join.jl

Lines changed: 69 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1510,79 +1510,77 @@ end
15101510
@test m1[!, :a] == m2[!, :a]
15111511
end
15121512

1513-
if Sys.WORD_SIZE == 64
1514-
@testset "threaded correctness" begin
1515-
df1 = DataFrame(id=[1:10^6; 10^7+1:10^7+2])
1516-
df1.left_row = axes(df1, 1)
1517-
df2 = DataFrame(id=[1:10^6; 10^8+1:10^8+4])
1518-
df2.right_row = axes(df2, 1)
1519-
1520-
@test innerjoin(df1, df2, on=:id)
1521-
DataFrame(id=1:10^6, left_row=1:10^6, right_row=1:10^6)
1522-
@test leftjoin(df1, df2, on=:id)
1523-
DataFrame(id=[1:10^6; 10^7+1:10^7+2], left_row=1:10^6+2,
1524-
right_row=[1:10^6; missing; missing])
1525-
@test rightjoin(df1, df2, on=:id)
1526-
DataFrame(id=[1:10^6; 10^8+1:10^8+4],
1527-
left_row=[1:10^6; fill(missing, 4)],
1528-
right_row=1:10^6+4)
1529-
@test outerjoin(df1, df2, on=:id)
1530-
DataFrame(id=[1:10^6; 10^7+1:10^7+2; 10^8+1:10^8+4],
1531-
left_row=[1:10^6+2; fill(missing, 4)],
1532-
right_row=[1:10^6; missing; missing; 10^6+1:10^6+4])
1533-
@test semijoin(df1, df2, on=:id)
1534-
DataFrame(id=1:10^6, left_row=1:10^6)
1535-
@test antijoin(df1, df2, on=:id)
1536-
DataFrame(id=10^7+1:10^7+2, left_row=10^6+1:10^6+2)
1537-
1538-
Random.seed!(1234)
1539-
for i in 1:4
1540-
df1 = df1[shuffle(axes(df1, 1)), :]
1541-
df2 = df2[shuffle(axes(df2, 1)), :]
1542-
1543-
@test sort!(innerjoin(df1, df2, on=:id))
1544-
DataFrame(id=1:10^6, left_row=1:10^6, right_row=1:10^6)
1545-
@test sort!(leftjoin(df1, df2, on=:id))
1546-
DataFrame(id=[1:10^6; 10^7+1:10^7+2], left_row=1:10^6+2,
1547-
right_row=[1:10^6; missing; missing])
1548-
@test sort!(rightjoin(df1, df2, on=:id))
1549-
DataFrame(id=[1:10^6; 10^8+1:10^8+4],
1550-
left_row=[1:10^6; fill(missing, 4)],
1551-
right_row=1:10^6+4)
1552-
@test sort!(outerjoin(df1, df2, on=:id))
1553-
DataFrame(id=[1:10^6; 10^7+1:10^7+2; 10^8+1:10^8+4],
1554-
left_row=[1:10^6+2; fill(missing, 4)],
1555-
right_row=[1:10^6; missing; missing; 10^6+1:10^6+4])
1556-
@test sort!(semijoin(df1, df2, on=:id))
1557-
DataFrame(id=1:10^6, left_row=1:10^6)
1558-
@test sort!(antijoin(df1, df2, on=:id))
1559-
DataFrame(id=10^7+1:10^7+2, left_row=10^6+1:10^6+2)
1560-
end
1513+
@testset "threaded correctness" begin
1514+
df1 = DataFrame(id=[1:10^5; 10^7+1:10^7+2])
1515+
df1.left_row = axes(df1, 1)
1516+
df2 = DataFrame(id=[1:10^5; 10^8+1:10^8+4])
1517+
df2.right_row = axes(df2, 1)
1518+
1519+
@test innerjoin(df1, df2, on=:id)
1520+
DataFrame(id=1:10^5, left_row=1:10^5, right_row=1:10^5)
1521+
@test leftjoin(df1, df2, on=:id)
1522+
DataFrame(id=[1:10^5; 10^7+1:10^7+2], left_row=1:10^5+2,
1523+
right_row=[1:10^5; missing; missing])
1524+
@test rightjoin(df1, df2, on=:id)
1525+
DataFrame(id=[1:10^5; 10^8+1:10^8+4],
1526+
left_row=[1:10^5; fill(missing, 4)],
1527+
right_row=1:10^5+4)
1528+
@test outerjoin(df1, df2, on=:id)
1529+
DataFrame(id=[1:10^5; 10^7+1:10^7+2; 10^8+1:10^8+4],
1530+
left_row=[1:10^5+2; fill(missing, 4)],
1531+
right_row=[1:10^5; missing; missing; 10^5+1:10^5+4])
1532+
@test semijoin(df1, df2, on=:id)
1533+
DataFrame(id=1:10^5, left_row=1:10^5)
1534+
@test antijoin(df1, df2, on=:id)
1535+
DataFrame(id=10^7+1:10^7+2, left_row=10^5+1:10^5+2)
15611536

1562-
# test correctness of column order
1563-
df1 = DataFrame(a=Int8(1), id2=-[1:10^6; 10^7+1:10^7+2], b=Int8(2),
1564-
id1=[1:10^6; 10^7+1:10^7+2], c=Int8(3), d=Int8(4))
1565-
df2 = DataFrame(e=Int8(5), id1=[1:10^6; 10^8+1:10^8+4], f=Int8(6), g=Int8(7),
1566-
id2=-[1:10^6; 10^8+1:10^8+4], h=Int8(8))
1567-
1568-
@test innerjoin(df1, df2, on=[:id1, :id2])
1569-
DataFrame(a=Int8(1), id2=-(1:10^6), b=Int8(2), id1=1:10^6,
1570-
c=Int8(3), d=Int8(4), e=Int8(5), f=Int8(6), g=Int8(7), h=Int8(8))
1571-
@test leftjoin(df1, df2, on=[:id1, :id2])[1:10^6, :]
1572-
DataFrame(a=Int8(1), id2=-(1:10^6), b=Int8(2), id1=1:10^6,
1573-
c=Int8(3), d=Int8(4), e=Int8(5), f=Int8(6), g=Int8(7), h=Int8(8))
1574-
@test rightjoin(df1, df2, on=[:id1, :id2])[1:10^6, :]
1575-
DataFrame(a=Int8(1), id2=-(1:10^6), b=Int8(2), id1=1:10^6,
1576-
c=Int8(3), d=Int8(4), e=Int8(5), f=Int8(6), g=Int8(7), h=Int8(8))
1577-
@test outerjoin(df1, df2, on=[:id1, :id2])[1:10^6, :]
1578-
DataFrame(a=Int8(1), id2=-(1:10^6), b=Int8(2), id1=1:10^6,
1579-
c=Int8(3), d=Int8(4), e=Int8(5), f=Int8(6), g=Int8(7), h=Int8(8))
1580-
@test semijoin(df1, df2, on=[:id1, :id2])
1581-
DataFrame(a=Int8(1), id2=-(1:10^6), b=Int8(2), id1=1:10^6, c=Int8(3), d=Int8(4))
1582-
@test antijoin(df1, df2, on=[:id1, :id2])
1583-
DataFrame(a=Int8(1), id2=-(10^7+1:10^7+2), b=Int8(2), id1=(10^7+1:10^7+2),
1584-
c=Int8(3), d=Int8(4))
1537+
Random.seed!(1234)
1538+
for i in 1:4
1539+
df1 = df1[shuffle(axes(df1, 1)), :]
1540+
df2 = df2[shuffle(axes(df2, 1)), :]
1541+
1542+
@test sort!(innerjoin(df1, df2, on=:id))
1543+
DataFrame(id=1:10^5, left_row=1:10^5, right_row=1:10^5)
1544+
@test sort!(leftjoin(df1, df2, on=:id))
1545+
DataFrame(id=[1:10^5; 10^7+1:10^7+2], left_row=1:10^5+2,
1546+
right_row=[1:10^5; missing; missing])
1547+
@test sort!(rightjoin(df1, df2, on=:id))
1548+
DataFrame(id=[1:10^5; 10^8+1:10^8+4],
1549+
left_row=[1:10^5; fill(missing, 4)],
1550+
right_row=1:10^5+4)
1551+
@test sort!(outerjoin(df1, df2, on=:id))
1552+
DataFrame(id=[1:10^5; 10^7+1:10^7+2; 10^8+1:10^8+4],
1553+
left_row=[1:10^5+2; fill(missing, 4)],
1554+
right_row=[1:10^5; missing; missing; 10^5+1:10^5+4])
1555+
@test sort!(semijoin(df1, df2, on=:id))
1556+
DataFrame(id=1:10^5, left_row=1:10^5)
1557+
@test sort!(antijoin(df1, df2, on=:id))
1558+
DataFrame(id=10^7+1:10^7+2, left_row=10^5+1:10^5+2)
15851559
end
1560+
1561+
# test correctness of column order
1562+
df1 = DataFrame(a=Int8(1), id2=-[1:10^5; 10^7+1:10^7+2], b=Int8(2),
1563+
id1=[1:10^5; 10^7+1:10^7+2], c=Int8(3), d=Int8(4))
1564+
df2 = DataFrame(e=Int8(5), id1=[1:10^5; 10^8+1:10^8+4], f=Int8(6), g=Int8(7),
1565+
id2=-[1:10^5; 10^8+1:10^8+4], h=Int8(8))
1566+
1567+
@test innerjoin(df1, df2, on=[:id1, :id2])
1568+
DataFrame(a=Int8(1), id2=-(1:10^5), b=Int8(2), id1=1:10^5,
1569+
c=Int8(3), d=Int8(4), e=Int8(5), f=Int8(6), g=Int8(7), h=Int8(8))
1570+
@test leftjoin(df1, df2, on=[:id1, :id2])[1:10^5, :]
1571+
DataFrame(a=Int8(1), id2=-(1:10^5), b=Int8(2), id1=1:10^5,
1572+
c=Int8(3), d=Int8(4), e=Int8(5), f=Int8(6), g=Int8(7), h=Int8(8))
1573+
@test rightjoin(df1, df2, on=[:id1, :id2])[1:10^5, :]
1574+
DataFrame(a=Int8(1), id2=-(1:10^5), b=Int8(2), id1=1:10^5,
1575+
c=Int8(3), d=Int8(4), e=Int8(5), f=Int8(6), g=Int8(7), h=Int8(8))
1576+
@test outerjoin(df1, df2, on=[:id1, :id2])[1:10^5, :]
1577+
DataFrame(a=Int8(1), id2=-(1:10^5), b=Int8(2), id1=1:10^5,
1578+
c=Int8(3), d=Int8(4), e=Int8(5), f=Int8(6), g=Int8(7), h=Int8(8))
1579+
@test semijoin(df1, df2, on=[:id1, :id2])
1580+
DataFrame(a=Int8(1), id2=-(1:10^5), b=Int8(2), id1=1:10^5, c=Int8(3), d=Int8(4))
1581+
@test antijoin(df1, df2, on=[:id1, :id2])
1582+
DataFrame(a=Int8(1), id2=-(10^7+1:10^7+2), b=Int8(2), id1=(10^7+1:10^7+2),
1583+
c=Int8(3), d=Int8(4))
15861584
end
15871585

15881586
@testset "matchmissing :notequal correctness" begin

test/multithreading.jl

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,6 @@ module TestMultithreading
22

33
using Test, DataFrames
44

5-
6-
@testset "pre-Julia 1.3 @spawn replacement" begin
7-
t = @sync DataFrames.@spawn begin
8-
sleep(1)
9-
true
10-
end
11-
@test fetch(t) === true
12-
end
13-
145
@testset "split_indices" begin
156
for len in 1:100, basesize in 1:10
167
x = DataFrames.split_indices(len, basesize)
@@ -268,4 +259,4 @@ end
268259

269260
end
270261

271-
end # module
262+
end # module

0 commit comments

Comments
 (0)