Skip to content

Commit 3e225ad

Browse files
committed
Add global default
1 parent ab76ff8 commit 3e225ad

File tree

6 files changed

+101
-36
lines changed

6 files changed

+101
-36
lines changed

docs/src/lib/functions.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,9 @@ pairs
129129
```@docs
130130
isapprox
131131
```
132+
133+
## Multithreading
134+
```@docs
135+
DataFrames.nthreads
136+
DataFrames.nthreads!
137+
```

src/abstractdataframe/selection.jl

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -576,8 +576,12 @@ end
576576
"""
577577
select!(df::DataFrame, args...; renamecols::Bool=true)
578578
select!(args::Base.Callable, df::DataFrame; renamecols::Bool=true)
579-
select!(gd::GroupedDataFrame{DataFrame}, args...; ungroup::Bool=true, renamecols::Bool=true)
580-
select!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true)
579+
select!(gd::GroupedDataFrame{DataFrame}, args...;
580+
ungroup::Bool=true, renamecols::Bool=true,
581+
nthreads::Int=DataFrames.nthreads())
582+
select!(f::Base.Callable, gd::GroupedDataFrame;
583+
ungroup::Bool=true, renamecols::Bool=true,
584+
nthreads::Int=DataFrames.nthreads())
581585
582586
Mutate `df` or `gd` in place to retain only columns or transformations specified by `args...` and
583587
return it. The result is guaranteed to have the same number of rows as `df` or
@@ -595,6 +599,11 @@ $TRANSFORMATION_COMMON_RULES
595599
column names should include the name of transformation functions or not.
596600
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
597601
frame or a `GroupedDataFrame`.
602+
- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use.
603+
Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with
604+
a different value. Passing a value higher than 1 currently has an effect only
605+
for some optimized grouped reductions. Values higher than `Threads.nthreads()`
606+
will be replaced with that value.
598607
599608
See [`select`](@ref) for examples.
600609
```
@@ -613,8 +622,12 @@ end
613622
"""
614623
transform!(df::DataFrame, args...; renamecols::Bool=true)
615624
transform!(args::Callable, df::DataFrame; renamecols::Bool=true)
616-
transform!(gd::GroupedDataFrame{DataFrame}, args...; ungroup::Bool=true, renamecols::Bool=true)
617-
transform!(f::Base.Callable, gd::GroupedDataFrame; ungroup::Bool=true, renamecols::Bool=true)
625+
transform!(gd::GroupedDataFrame{DataFrame}, args...;
626+
ungroup::Bool=true, renamecols::Bool=true,
627+
nthreads::Int=DataFrames.nthreads())
628+
transform!(f::Base.Callable, gd::GroupedDataFrame;
629+
ungroup::Bool=true, renamecols::Bool=true,
630+
nthreads::Int=DataFrames.nthreads())
618631
619632
Mutate `df` or `gd` in place to add columns specified by `args...` and return it.
620633
The result is guaranteed to have the same number of rows as `df`.
@@ -627,6 +640,11 @@ $TRANSFORMATION_COMMON_RULES
627640
column names should include the name of transformation functions or not.
628641
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
629642
frame or a `GroupedDataFrame`.
643+
- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use.
644+
Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with
645+
a different value. Passing a value higher than 1 currently has an effect only
646+
for some optimized grouped reductions. Values higher than `Threads.nthreads()`
647+
will be replaced with that value.
630648
631649
See [`select`](@ref) for examples.
632650
"""
@@ -644,10 +662,10 @@ end
644662
select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true)
645663
select(args::Callable, df::DataFrame; renamecols::Bool=true)
646664
select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true,
647-
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1)
665+
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())
648666
select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
649667
keepkeys::Bool=true, ungroup::Bool=true,
650-
renamecols::Bool=true, nthreads::Int=1)
668+
renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())
651669
652670
Create a new data frame that contains columns from `df` or `gd` specified by
653671
`args` and return it. The result is guaranteed to have the same number of rows
@@ -665,9 +683,11 @@ $TRANSFORMATION_COMMON_RULES
665683
data frame.
666684
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
667685
frame or a `GroupedDataFrame`.
668-
- `nthreads::Int=1` : the number of CPU threads to use. Passing a value higher than 1
669-
currently has an effect only for some optimized grouped reductions. Values higher than
670-
`Threads.nthreads()` will be replaced with that value.
686+
- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use.
687+
Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with
688+
a different value. Passing a value higher than 1 currently has an effect only
689+
for some optimized grouped reductions. Values higher than `Threads.nthreads()`
690+
will be replaced with that value.
671691
672692
# Examples
673693
```jldoctest
@@ -863,10 +883,10 @@ end
863883
transform(f::Callable, df::DataFrame; renamecols::Bool=true)
864884
transform(gd::GroupedDataFrame, args...; copycols::Bool=true,
865885
keepkeys::Bool=true, ungroup::Bool=true,
866-
renamecols::Bool=true, nthreads::Int=1)
886+
renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())
867887
transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
868888
keepkeys::Bool=true, ungroup::Bool=true,
869-
renamecols::Bool=true, nthreads::Int=1)
889+
renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())
870890
871891
Create a new data frame that contains columns from `df` or `gd` plus columns
872892
specified by `args` and return it. The result is guaranteed to have the same
@@ -883,9 +903,11 @@ $TRANSFORMATION_COMMON_RULES
883903
data frame.
884904
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
885905
frame or a `GroupedDataFrame`.
886-
- `nthreads::Int=1` : the number of CPU threads to use. Passing a value higher than 1
887-
currently has an effect only for some optimized grouped reductions. Values higher than
888-
`Threads.nthreads()` will be replaced with that value.
906+
- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use.
907+
Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with
908+
a different value. Passing a value higher than 1 currently has an effect only
909+
for some optimized grouped reductions. Values higher than `Threads.nthreads()`
910+
will be replaced with that value.
889911
890912
Note that when the first argument is a `GroupedDataFrame`, `keepkeys=false`
891913
is needed to be able to return a different value for the grouping column:
@@ -934,10 +956,10 @@ end
934956
combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true)
935957
combine(gd::GroupedDataFrame, args...;
936958
keepkeys::Bool=true, ungroup::Bool=true,
937-
renamecols::Bool=true, nthreads::Int=1)
959+
renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())
938960
combine(f::Base.Callable, gd::GroupedDataFrame;
939961
keepkeys::Bool=true, ungroup::Bool=true,
940-
renamecols::Bool=true, nthreads::Int=1)
962+
renamecols::Bool=true, nthreads::Int=DataFrames.nthreads())
941963
942964
Create a new data frame that contains columns from `df` or `gd` specified by
943965
`args` and return it. The result can have any number of rows that is determined
@@ -952,9 +974,11 @@ $TRANSFORMATION_COMMON_RULES
952974
data frame.
953975
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
954976
frame or a `GroupedDataFrame`.
955-
- `nthreads::Int=1` : the number of CPU threads to use. Passing a value higher than 1
956-
currently has an effect only for some optimized grouped reductions. Values higher than
957-
`Threads.nthreads()` will be replaced with that value.
977+
- `nthreads::Int=DataFrames.nthreads()` : the number of CPU threads to use.
978+
Defaults to `1` unless [`DataFrames.nthreads!`](@ref) has been called with
979+
a different value. Passing a value higher than 1 currently has an effect only
980+
for some optimized grouped reductions. Values higher than `Threads.nthreads()`
981+
will be replaced with that value.
958982
959983
# Examples
960984
```jldoctest

src/groupeddataframe/fastaggregates.jl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool,
290290
f, op, condf, adjust, checkempty, incol, gd, nthreads)
291291

292292
(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame;
293-
nthreads::Int=1) =
293+
nthreads::Int=nthreads()) =
294294
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, nthreads)
295295

296296
# this definition is missing in Julia 1.0 LTS and is required by aggregation for var
@@ -300,7 +300,7 @@ if VERSION < v"1.1"
300300
end
301301

302302
function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame;
303-
nthreads::Int=1)
303+
nthreads::Int=nthreads())
304304
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false,
305305
incol, gd, nthreads)
306306
# !ismissing check is purely an optimization to avoid a copy later
@@ -316,7 +316,7 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra
316316
end
317317

318318
function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame;
319-
nthreads::Int=1)
319+
nthreads::Int=nthreads())
320320
outcol = Aggregate(var, agg.condf)(incol, gd; nthreads=nthreads)
321321
if eltype(outcol) <: Union{Missing, Rational}
322322
return sqrt.(outcol)
@@ -329,7 +329,7 @@ for f in (:first, :last)
329329
# Without using @eval the presence of a keyword argument triggers a Julia bug
330330
@eval begin
331331
function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame;
332-
nthreads::Int=1)
332+
nthreads::Int=nthreads())
333333
n = length(gd)
334334
outcol = similar(incol, n)
335335
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
@@ -343,7 +343,7 @@ for f in (:first, :last)
343343
end
344344

345345
function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame;
346-
nthreads::Int=1)
346+
nthreads::Int=nthreads())
347347
if getfield(gd, :idx) === nothing
348348
lens = zeros(Int, length(gd))
349349
@inbounds for gix in gd.groups

src/groupeddataframe/splitapplycombine.jl

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -627,7 +627,7 @@ end
627627

628628
function combine(f::Base.Callable, gd::GroupedDataFrame;
629629
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
630-
nthreads::Int=1)
630+
nthreads::Int=nthreads())
631631
if f isa Colon
632632
throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame"))
633633
end
@@ -637,22 +637,22 @@ end
637637

638638
combine(f::Pair, gd::GroupedDataFrame;
639639
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
640-
nthreads::Int=1) =
640+
nthreads::Int=nthreads()) =
641641
throw(ArgumentError("First argument must be a transformation if the second argument is a GroupedDataFrame. " *
642642
"You can pass a `Pair` as the second argument of the transformation. If you want the return " *
643643
"value to be processed as having multiple columns add `=> AsTable` suffix to the pair."))
644644

645645
combine(gd::GroupedDataFrame,
646646
cs::Union{Pair, Base.Callable, ColumnIndex, MultiColumnIndex}...;
647647
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
648-
nthreads::Int=1) =
648+
nthreads::Int=nthreads()) =
649649
_combine_prepare(gd, cs..., keepkeys=keepkeys, ungroup=ungroup,
650650
copycols=true, keeprows=false, renamecols=renamecols,
651651
nthreads=nthreads)
652652

653653
function select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
654654
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
655-
nthreads::Int=1)
655+
nthreads::Int=nthreads())
656656
if f isa Colon
657657
throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame"))
658658
end
@@ -662,14 +662,14 @@ end
662662

663663

664664
select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true,
665-
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1) =
665+
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads()) =
666666
_combine_prepare(gd, args..., copycols=copycols, keepkeys=keepkeys,
667667
ungroup=ungroup, keeprows=true, renamecols=renamecols,
668668
nthreads=nthreads)
669669

670670
function transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
671671
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
672-
nthreads::Int=1)
672+
nthreads::Int=nthreads())
673673
if f isa Colon
674674
throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame"))
675675
end
@@ -679,7 +679,7 @@ end
679679

680680
function transform(gd::GroupedDataFrame, args...; copycols::Bool=true,
681681
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
682-
nthreads::Int=1)
682+
nthreads::Int=nthreads())
683683
res = select(gd, :, args..., copycols=copycols, keepkeys=keepkeys,
684684
ungroup=ungroup, renamecols=renamecols, nthreads=nthreads)
685685
# res can be a GroupedDataFrame based on DataFrame or a DataFrame,
@@ -689,31 +689,31 @@ function transform(gd::GroupedDataFrame, args...; copycols::Bool=true,
689689
end
690690

691691
function select!(f::Base.Callable, gd::GroupedDataFrame;
692-
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1)
692+
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads())
693693
if f isa Colon
694694
throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame"))
695695
end
696696
return select!(gd, f, ungroup=ungroup, nthreads=nthreads)
697697
end
698698

699699
function select!(gd::GroupedDataFrame{DataFrame}, args...;
700-
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1)
700+
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads())
701701
newdf = select(gd, args..., copycols=false, renamecols=renamecols, nthreads=nthreads)
702702
df = parent(gd)
703703
_replace_columns!(df, newdf)
704704
return ungroup ? df : gd
705705
end
706706

707707
function transform!(f::Base.Callable, gd::GroupedDataFrame;
708-
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1)
708+
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads())
709709
if f isa Colon
710710
throw(ArgumentError("First argument must be a transformation if the second argument is a grouped data frame"))
711711
end
712712
return transform!(gd, f, ungroup=ungroup, nthreads=nthreads)
713713
end
714714

715715
function transform!(gd::GroupedDataFrame{DataFrame}, args...;
716-
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=1)
716+
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=nthreads())
717717
newdf = select(gd, :, args..., copycols=false, renamecols=renamecols, nthreads=nthreads)
718718
df = parent(gd)
719719
select!(newdf, propertynames(df), :)

src/other/utils.jl

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,29 @@ else
8282
using Compat: ComposedFunction
8383
end
8484

85-
funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner))
85+
funname(c::ComposedFunction) = Symbol(funname(c.outer), :_, funname(c.inner))
86+
87+
const NTHREADS = Ref(1)
88+
89+
"""
90+
DataFrames.nthreads()
91+
92+
Return the default value for the `nthreads` argument, which determines the number
93+
of CPU threads used by functions that support it when not specified explicitly.
94+
95+
Defaults to 1. Call [`DataFrames.nthreads!`](@ref) to adjust the value.
96+
"""
97+
nthreads() = DataFrames.NTHREADS[]
98+
99+
"""
100+
DataFrames.nthreads!(n::Int)
101+
102+
Set to `n` the default value for the `nthreads` argument, which determines the number
103+
of CPU threads used by functions that support it when not specified explicitly.
104+
105+
Use [`DataFrames.nthreads`](@ref) to access the value.
106+
"""
107+
function nthreads!(n::Int)
108+
n > 0 || throw(ArgumentError("n must be equal to or greater than 1 (got $n)"))
109+
DataFrames.NTHREADS[] = n
110+
end

test/utils.jl

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,14 @@ end
9494
:sum_skipmissing_div12
9595
end
9696

97+
@test "nthreads and nthreads!" begin
98+
@test DataFrames.nthreads() == 1
99+
@test DataFrames.nthreads!(2) == 2
100+
@test DataFrames.nthreads() == 2
101+
@test DataFrames.nthreads!(1) == 1 # reset to default
102+
@test_throws ArgumentError DataFrames.nthreads!(0)
103+
@test DataFrames.nthreads() == 1
104+
end
105+
end
106+
97107
end # module

0 commit comments

Comments
 (0)