Skip to content

Commit 46f6ca9

Browse files
author
KristofferC
committed
Add a `threaded` keyword option to disable the part of the threading in `Arrow.Table` that leads to catastrophic negative scaling
1 parent c75d0e9 commit 46f6ca9

File tree

2 files changed

+19
-5
lines changed

2 files changed

+19
-5
lines changed

src/table.jl

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ Table(inputs::Vector; kw...) =
465465
Table([ArrowBlob(tobytes(x), 1, nothing) for x in inputs]; kw...)
466466

467467
# will detect whether we're reading a Table from a file or stream
468-
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
468+
function Table(blobs::Vector{ArrowBlob}; convert::Bool=true, threaded::Bool=true)
469469
t = Table()
470470
sch = nothing
471471
dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary id => DictEncoding
@@ -580,10 +580,15 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
580580
elseif header isa Meta.RecordBatch
581581
anyrecordbatches = true
582582
@debug "parsing record batch message: compression = $(header.compression)"
583-
@wkspawn begin
584-
cols =
585-
collect(VectorIterator(sch, $batch, dictencodingslockable, convert))
586-
put!(() -> put!(tsks, cols), sync, $(rbi))
583+
if threaded
584+
@wkspawn begin
585+
cols =
586+
collect(VectorIterator(sch, $batch, dictencodingslockable, convert))
587+
put!(() -> put!(tsks, cols), sync, $(rbi))
588+
end
589+
else
590+
cols = collect(VectorIterator(sch, batch, dictencodingslockable, convert))
591+
put!(tsks, cols)
587592
end
588593
rbi += 1
589594
else

test/testtables.jl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,15 @@ function testtable(nm, t, writekw, readkw, extratests)
310310
@test all(isequal.(values(t), values(tt)))
311311
extratests !== nothing && extratests(tt)
312312
seekstart(io)
313+
314+
# Explicitly unthreaded: read the same table with `threaded=false` and repeat the checks
315+
io = Arrow.tobuffer(t; writekw...)
316+
tt = Arrow.Table(io; threaded=false, readkw...)
317+
@test length(tt) == length(t)
318+
@test all(isequal.(values(t), values(tt)))
319+
extratests !== nothing && extratests(tt)
320+
seekstart(io)
321+
313322
str = Arrow.Stream(io; readkw...)
314323
tt = first(str)
315324
@test length(tt) == length(t)

0 commit comments

Comments
 (0)