Skip to content

Commit fcd603a

Browse files
committed
Fix poor performance of table reading when many record batches are involved
1 parent c75d0e9 commit fcd603a

File tree

1 file changed

+36
-30
lines changed

1 file changed

+36
-30
lines changed

src/table.jl

Lines changed: 36 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -470,28 +470,14 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
470470
sch = nothing
471471
dictencodingslockable = Lockable(Dict{Int64,DictEncoding}()) # dictionary id => DictEncoding
472472
dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
473-
sync = OrderedSynchronizer()
474-
tsks = Channel{Any}(Inf)
475-
tsk = @wkspawn begin
476-
i = 1
477-
for cols in tsks
478-
if i == 1
479-
foreach(x -> push!(columns(t), x), cols)
480-
elseif i == 2
481-
foreach(1:length(cols)) do i
482-
columns(t)[i] = ChainedVector([columns(t)[i], cols[i]])
483-
end
484-
else
485-
foreach(1:length(cols)) do i
486-
append!(columns(t)[i], cols[i])
487-
end
488-
end
489-
i += 1
490-
end
491-
end
492-
anyrecordbatches = false
473+
# we'll grow/add a record batch set of columns as they're constructed
474+
# must be holding the lock while growing/adding
475+
# starts at 0-length because we don't know how many record batches there will be
476+
rb_cols = []
477+
rb_cols_lock = ReentrantLock()
493478
rbi = 1
494-
@sync for blob in blobs
479+
tasks = Task[]
480+
for blob in blobs
495481
for batch in BatchIterator(blob)
496482
# store custom_metadata of batch.msg?
497483
header = batch.msg.header
@@ -578,30 +564,38 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
578564
end # lock
579565
@debug "parsed dictionary batch message: id=$id, data=$values\n"
580566
elseif header isa Meta.RecordBatch
581-
anyrecordbatches = true
582567
@debug "parsing record batch message: compression = $(header.compression)"
583-
@wkspawn begin
584-
cols =
585-
collect(VectorIterator(sch, $batch, dictencodingslockable, convert))
586-
put!(() -> put!(tsks, cols), sync, $(rbi))
587-
end
568+
push!(tasks, collect_cols!(rbi, rb_cols_lock, rb_cols, sch, batch, dictencodingslockable, convert))
588569
rbi += 1
589570
else
590571
throw(ArgumentError("unsupported arrow message type: $(typeof(header))"))
591572
end
592573
end
593574
end
594-
close(tsks)
595-
wait(tsk)
575+
waitall(tasks)
596576
lu = lookup(t)
597577
ty = types(t)
598578
# 158; some implementations may send 0 record batches
599-
if !anyrecordbatches && !isnothing(sch)
579+
# no more multithreading, so no need to take the lock now
580+
if length(rb_cols) == 0 && !isnothing(sch)
600581
for field in sch.fields
601582
T = juliaeltype(field, buildmetadata(field), convert)
602583
push!(columns(t), T[])
603584
end
604585
end
586+
if length(rb_cols) > 0
587+
foreach(x -> push!(columns(t), x), rb_cols[1])
588+
end
589+
if length(rb_cols) > 1
590+
foreach(enumerate(rb_cols[2])) do (i, x)
591+
columns(t)[i] = ChainedVector([columns(t)[i], x])
592+
end
593+
foreach(3:length(rb_cols)) do j
594+
foreach(enumerate(rb_cols[j])) do (i, x)
595+
append!(columns(t)[i], x)
596+
end
597+
end
598+
end
605599
for (nm, col) in zip(names(t), columns(t))
606600
lu[nm] = col
607601
push!(ty, eltype(col))
@@ -610,6 +604,18 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
610604
return t
611605
end
612606

607+
# Spawn a task that materializes the columns of one record batch and stores
# them at slot `rbi` in the shared `rb_cols` vector. Returns the spawned task
# so the caller can wait on it. All mutation of `rb_cols` happens while
# holding `rb_cols_lock`; the vector is grown on demand because the total
# number of record batches isn't known up front.
function collect_cols!(rbi, rb_cols_lock, rb_cols, sch, batch, dictencodingslockable, convert)
    return @wkspawn begin
        # materialize this batch's columns off the spawning task
        batchcols = collect(VectorIterator(sch, batch, dictencodingslockable, convert))
        lock(rb_cols_lock) do
            # grow only when this batch's slot doesn't exist yet; batches may
            # complete out of order, so intermediate slots can be #undef
            length(rb_cols) >= rbi || resize!(rb_cols, rbi)
            rb_cols[rbi] = batchcols
        end
    end
end
618+
613619
function getdictionaries!(dictencoded, field)
614620
d = field.dictionary
615621
if d !== nothing

0 commit comments

Comments
 (0)