Skip to content

Commit 90934ec

Browse files
author
KristofferC
committed
Add a `threaded` keyword to `Arrow.Table` that disables the per-record-batch task spawning which leads to catastrophic negative scaling
1 parent c75d0e9 commit 90934ec

File tree

2 files changed

+20
-4
lines changed

2 files changed

+20
-4
lines changed

src/table.jl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ Table(inputs::Vector; kw...) =
465465
Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
466466

467467
# will detect whether we're reading a Table from a file or stream
468-
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
468+
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, threaded::Bool=true)
469469
t = Table()
470470
sch = nothing
471471
dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary id => DictEncoding
@@ -580,10 +580,17 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
580580
elseif header isa Meta.RecordBatch
581581
anyrecordbatches = true
582582
@debug "parsing record batch message: compression = $(header.compression)"
583-
@wkspawn begin
583+
if threaded
584+
@wkspawn begin
585+
cols = collect(
586+
VectorIterator(sch, $batch, dictencodingslockable, convert),
587+
)
588+
put!(() -> put!(tsks, cols), sync, $(rbi))
589+
end
590+
else
584591
cols =
585-
collect(VectorIterator(sch, $batch, dictencodingslockable, convert))
586-
put!(() -> put!(tsks, cols), sync, $(rbi))
592+
collect(VectorIterator(sch, batch, dictencodingslockable, convert))
593+
put!(tsks, cols)
587594
end
588595
rbi += 1
589596
else

test/testtables.jl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,15 @@ function testtable(nm, t, writekw, readkw, extratests)
310310
@test all(isequal.(values(t), values(tt)))
311311
extratests !== nothing && extratests(tt)
312312
seekstart(io)
313+
314+
# Explicitly unthreaded
315+
io = Arrow.tobuffer(t; writekw...)
316+
tt = Arrow.Table(io; threaded=false, readkw...)
317+
@test length(tt) == length(t)
318+
@test all(isequal.(values(t), values(tt)))
319+
extratests !== nothing && extratests(tt)
320+
seekstart(io)
321+
313322
str = Arrow.Stream(io; readkw...)
314323
tt = first(str)
315324
@test length(tt) == length(t)

0 commit comments

Comments
 (0)