Skip to content

Commit d52c2e6

Browse files
author
KristofferC
committed
add an option to disable the threading in Arrow.Table that can lead to catastrophic negative scaling
1 parent c75d0e9 commit d52c2e6

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

src/table.jl

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -289,10 +289,10 @@ function Base.iterate(x::Stream, (pos, id)=(1, 0))
289289
end
290290

291291
"""
292-
Arrow.Table(io::IO; convert::Bool=true)
293-
Arrow.Table(file::String; convert::Bool=true)
294-
Arrow.Table(bytes::Vector{UInt8}, pos=1, len=nothing; convert::Bool=true)
295-
Arrow.Table(inputs::Vector; convert::Bool=true)
292+
Arrow.Table(io::IO; convert::Bool=true, threaded::Bool=true)
293+
Arrow.Table(file::String; convert::Bool=true, threaded::Bool=true)
294+
Arrow.Table(bytes::Vector{UInt8}, pos=1, len=nothing; convert::Bool=true, threaded::Bool=true)
295+
Arrow.Table(inputs::Vector; convert::Bool=true, threaded::Bool=true)
296296
297297
Read an arrow formatted table, from:
298298
* `io`, bytes will be read all at once via `read(io)`
@@ -311,6 +311,8 @@ sink function: e.g. `DataFrame(Arrow.Table(file))`, `SQLite.load!(db, "table", A
311311
312312
Supports the `convert` keyword argument which controls whether certain arrow primitive types will be
313313
lazily converted to more friendly Julia defaults; by default, `convert=true`.
314+
315+
Set `threaded=false` to disable parallel batch processing, which can avoid lock contention for tables with many small batches.
314316
"""
315317
struct Table <: Tables.AbstractColumns
316318
names::Vector{Symbol}
@@ -465,7 +467,7 @@ Table(inputs::Vector; kw...) =
465467
Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
466468

467469
# will detect whether we're reading a Table from a file or stream
468-
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
470+
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, threaded::Bool=true)
469471
t = Table()
470472
sch = nothing
471473
dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary id => DictEncoding
@@ -580,10 +582,17 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
580582
elseif header isa Meta.RecordBatch
581583
anyrecordbatches = true
582584
@debug "parsing record batch message: compression = $(header.compression)"
583-
@wkspawn begin
585+
if threaded
586+
@wkspawn begin
587+
cols = collect(
588+
VectorIterator(sch, $batch, dictencodingslockable, convert),
589+
)
590+
put!(() -> put!(tsks, cols), sync, $(rbi))
591+
end
592+
else
584593
cols =
585-
collect(VectorIterator(sch, $batch, dictencodingslockable, convert))
586-
put!(() -> put!(tsks, cols), sync, $(rbi))
594+
collect(VectorIterator(sch, batch, dictencodingslockable, convert))
595+
put!(tsks, cols)
587596
end
588597
rbi += 1
589598
else

test/testtables.jl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,15 @@ function testtable(nm, t, writekw, readkw, extratests)
310310
@test all(isequal.(values(t), values(tt)))
311311
extratests !== nothing && extratests(tt)
312312
seekstart(io)
313+
314+
# Explicitly unthreaded
315+
io = Arrow.tobuffer(t; writekw...)
316+
tt = Arrow.Table(io; threaded=false, readkw...)
317+
@test length(tt) == length(t)
318+
@test all(isequal.(values(t), values(tt)))
319+
extratests !== nothing && extratests(tt)
320+
seekstart(io)
321+
313322
str = Arrow.Stream(io; readkw...)
314323
tt = first(str)
315324
@test length(tt) == length(t)

0 commit comments

Comments (0)