Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
These aspects of input data frames might affect the order of rows produced in the output
([#2612](https://github.com/JuliaData/DataFrames.jl/pull/2612),
[#2622][https://github.com/JuliaData/DataFrames.jl/pull/2622])
* `DataFrame` constructor, `getindex`, `select`, `select!`, `transform`, `transform!`,
and `combine` functions now use multiple threads in selected operations
([XXXX](XXXX))

# DataFrames v0.22 Release Notes

Expand Down
88 changes: 57 additions & 31 deletions src/dataframe/dataframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,20 @@ struct DataFrame <: AbstractDataFrame

# we write into columns as we know that it is guaranteed
# that it was freshly allocated in the outer constructor
for (i, col) in enumerate(columns)
# check for vectors first as they are most common
if col isa AbstractRange
columns[i] = collect(col)
elseif col isa AbstractVector
columns[i] = copycols ? copy(col) : col
elseif col isa Union{AbstractArray{<:Any, 0}, Ref}
x = col[]
columns[i] = fill!(Tables.allocatecolumn(typeof(x), len), x)
@static if VERSION >= v"1.4"
if Threads.nthreads() > 1 && len >= 1_000_000
@sync for i in eachindex(columns)
@async Threads.@spawn columns[i] = _preprocess_column(columns[i],
len, copycols)
end
else
if col isa AbstractArray
throw(ArgumentError("adding AbstractArray other than AbstractVector " *
"as a column of a data frame is not allowed"))
for i in eachindex(columns)
columns[i] = _preprocess_column(columns[i], len, copycols)
end
columns[i] = fill!(Tables.allocatecolumn(typeof(col), len), col)
end
else
for i in eachindex(columns)
columns[i] = _preprocess_column(columns[i], len, copycols)
end
end

Expand All @@ -216,6 +215,21 @@ struct DataFrame <: AbstractDataFrame
end
end

function _preprocess_column(col, len, copycols)
# check for vectors first as they are most common
col isa AbstractRange && return collect(col)
col isa AbstractVector && return copycols ? copy(col) : col
if col isa Union{AbstractArray{<:Any, 0}, Ref}
x = col[]
return fill!(Tables.allocatecolumn(typeof(x), len), x)
end
if col isa AbstractArray
throw(ArgumentError("adding AbstractArray other than AbstractVector " *
"as a column of a data frame is not allowed"))
end
return fill!(Tables.allocatecolumn(typeof(col), len), col)
end

DataFrame(df::DataFrame; copycols::Bool=true) = copy(df, copycols=copycols)

function DataFrame(pairs::Pair{Symbol, <:Any}...; makeunique::Bool=false,
Expand Down Expand Up @@ -502,34 +516,46 @@ end
throw(BoundsError(df, (row_inds, col_inds)))
end
selected_columns = index(df)[col_inds]
# Computing integer indices once for all columns is faster
selected_rows = T === Bool ? findall(row_inds) : row_inds
new_columns = AbstractVector[dv[selected_rows] for dv in _columns(df)[selected_columns]]
old_columns = _columns(df)[selected_columns]
new_columns = Vector{AbstractVector}(undef, length(old_columns))

if length(new_columns) == 1
new_columns[1] = only(old_columns)[row_inds]
elseif length(new_columns) > 1
# Computing integer indices once for all columns is faster
selected_rows = T === Bool ? findall(row_inds) : row_inds
@static if VERSION >= v"1.4"
if length(selected_rows) > 1_000_000 && Threads.nthreads() > 1
@sync for i in eachindex(new_columns)
@async Threads.@spawn new_columns[i] = old_columns[i][selected_rows]
end
else
for i in eachindex(new_columns)
new_columns[i] = old_columns[i][selected_rows]
end
end
else
for i in eachindex(new_columns)
new_columns[i] = old_columns[i][selected_rows]
end
end
end

return DataFrame(new_columns, Index(_names(df)[selected_columns]), copycols=false)
end

@inline function Base.getindex(df::DataFrame, row_inds::AbstractVector{T}, ::Colon) where T
@boundscheck if !checkindex(Bool, axes(df, 1), row_inds)
throw(BoundsError(df, (row_inds, :)))
end
# Computing integer indices once for all columns is faster
selected_rows = T === Bool ? findall(row_inds) : row_inds
new_columns = AbstractVector[dv[selected_rows] for dv in _columns(df)]
return DataFrame(new_columns, copy(index(df)), copycols=false)
end
@inline Base.getindex(df::DataFrame, row_inds::AbstractVector{T}, ::Colon) where T =
df[row_inds, index(df)[:]]

@inline Base.getindex(df::DataFrame, row_inds::Not,
col_inds::MultiColumnIndex) =
@inline Base.getindex(df::DataFrame, row_inds::Not, col_inds::MultiColumnIndex) =
df[axes(df, 1)[row_inds], col_inds]

# df[:, MultiColumnIndex] => DataFrame
Base.getindex(df::DataFrame, row_ind::Colon,
col_inds::MultiColumnIndex) =
Base.getindex(df::DataFrame, row_ind::Colon, col_inds::MultiColumnIndex) =
select(df, col_inds, copycols=true)

# df[!, MultiColumnIndex] => DataFrame
Base.getindex(df::DataFrame, row_ind::typeof(!),
col_inds::MultiColumnIndex) =
Base.getindex(df::DataFrame, row_ind::typeof(!), col_inds::MultiColumnIndex) =
select(df, col_inds, copycols=false)

##############################################################################
Expand Down
7 changes: 7 additions & 0 deletions test/constructors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,11 @@ end
@test_throws ArgumentError DataFrame([Int, Float64], ["a", "b"], 2)
end

@testset "threading correcness tests" begin
for x in (10, 2*10^6), y in 1:4
df = DataFrame(rand(x, y), :auto)
@test df == copy(df)
end
end

end # module
11 changes: 11 additions & 0 deletions test/indexing.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2009,4 +2009,15 @@ if VERSION >= v"1.5"
include("indexing_offset.jl")
end

@testset "threading correcness tests" begin
for x in (10, 2*10^6), y in 1:4
mat = rand(x, y)
df = DataFrame(mat, :auto)
for rowrange in [:, 1:nrow(df)-5, collect(1:nrow(df)-5), axes(df, 1) .< nrow(df)-5],
colrange in [:, axes(df, 2), collect(axes(df, 2)), 1:ncol(df) - 1]
@test DataFrame(mat[rowrange, colrange], :auto) == df[rowrange, colrange]
end
end
end

end # module