From 5280a9aa99484e48fe60fa15451a7845c006d35e Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Fri, 24 Oct 2025 14:29:08 -0600 Subject: [PATCH] Fix poor performance of table reading when many record batches are involved --- src/table.jl | 85 +++++++++++++++++++++++++++++++++------------------- src/utils.jl | 6 ++++ 2 files changed, 61 insertions(+), 30 deletions(-) diff --git a/src/table.jl b/src/table.jl index fe9206b4..de8bfc37 100644 --- a/src/table.jl +++ b/src/table.jl @@ -470,28 +470,14 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) sch = nothing dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary id => DictEncoding dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field - sync = OrderedSynchronizer() - tsks = Channel{Any}(Inf) - tsk = @wkspawn begin - i = 1 - for cols in tsks - if i == 1 - foreach(x -> push!(columns(t), x), cols) - elseif i == 2 - foreach(1:length(cols)) do i - columns(t)[i] = ChainedVector([columns(t)[i], cols[i]]) - end - else - foreach(1:length(cols)) do i - append!(columns(t)[i], cols[i]) - end - end - i += 1 - end - end - anyrecordbatches = false + # we'll grow/add a record batch set of columns as they're constructed + # must be holding the lock while growing/adding + # starts at 0-length because we don't know how many record batches there will be + rb_cols = [] + rb_cols_lock = ReentrantLock() rbi = 1 - @sync for blob in blobs + tasks = Task[] + for blob in blobs for batch in BatchIterator(blob) # store custom_metadata of batch.msg? header = batch.msg.header @@ -578,30 +564,49 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) end # lock @debug "parsed dictionary batch message: id=$id, data=$values\n" elseif header isa Meta.RecordBatch - anyrecordbatches = true @debug "parsing record batch message: compression = $(header.compression)" - @wkspawn begin - cols = - collect(VectorIterator(sch, $batch, dictencodingslockable, convert)) - put!(() -> put!(tsks, cols), sync, $(rbi)) - end + push!( + tasks, + collect_cols!( + rbi, + rb_cols_lock, + rb_cols, + sch, + batch, + dictencodingslockable, + convert, + ), + ) rbi += 1 else throw(ArgumentError("unsupported arrow message type: $(typeof(header))")) end end end - close(tsks) - wait(tsk) + _waitall(tasks) lu = lookup(t) ty = types(t) # 158; some implementations may send 0 record batches - if !anyrecordbatches && !isnothing(sch) + # no more multithreading, so no need to take the lock now + if length(rb_cols) == 0 && !isnothing(sch) for field in sch.fields T = juliaeltype(field, buildmetadata(field), convert) push!(columns(t), T[]) end end + if length(rb_cols) > 0 + foreach(x -> push!(columns(t), x), rb_cols[1]) + end + if length(rb_cols) > 1 + foreach(enumerate(rb_cols[2])) do (i, x) + columns(t)[i] = ChainedVector([columns(t)[i], x]) + end + foreach(3:length(rb_cols)) do j + foreach(enumerate(rb_cols[j])) do (i, x) + append!(columns(t)[i], x) + end + end + end for (nm, col) in zip(names(t), columns(t)) lu[nm] = col push!(ty, eltype(col)) @@ -610,6 +615,26 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true) return t end +function collect_cols!( + rbi, + rb_cols_lock, + rb_cols, + sch, + batch, + dictencodingslockable, + convert, +) + @wkspawn begin + cols = collect(VectorIterator(sch, batch, dictencodingslockable, convert)) + @lock rb_cols_lock begin + if length(rb_cols) < rbi + resize!(rb_cols, rbi) + end + rb_cols[rbi] = cols + end + end +end + function getdictionaries!(dictencoded, field) d = field.dictionary if d !== nothing diff --git a/src/utils.jl b/src/utils.jl index 419bd397..8e2dfeed 100644 --- a/src/utils.jl +++ b/src/utils.jl @@ -28,6 +28,12 @@ function writezeros(io::IO, n::Integer) s end +if isdefined(Base, :waitall) + const _waitall = waitall +else + _waitall(tasks) = foreach(wait, tasks) +end + # efficient writing of arrays writearray(io, col) = writearray(io, maybemissing(eltype(col)), col)