Skip to content
4 changes: 4 additions & 0 deletions src/arraytypes/dictencoding.jl
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ function arrowvector(
kw...,
)
id = x.encoding.id
# XXX This is a race condition if two workers hit this block at the same time, then they'll create
# distinct locks
if !haskey(de, id)
de[id] = Lockable(x.encoding)
else
Expand Down Expand Up @@ -215,6 +217,8 @@ function arrowvector(
x = x.data
len = length(x)
validity = ValidityBitmap(x)
# XXX This is a race condition if two workers hit this block at the same time, then they'll create
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@quinnj I think there is a race condition baked into the current architecture that can't be addressed without a very large refactoring. The current architecture creates the locks on a worker thread if they don't already exist, which means that threads are competing for the creation of the initial lock. The locks should be created before any tasks are spawned.

# distinct locks
if !haskey(de, id)
# dict encoding doesn't exist yet, so create for 1st time
if DataAPI.refarray(x) === x || DataAPI.refpool(x) === nothing
Expand Down
84 changes: 43 additions & 41 deletions src/write.jl
Original file line number Diff line number Diff line change
Expand Up @@ -295,47 +295,49 @@ function write(writer::Writer, source)
recbatchmsg = makerecordbatchmsg(writer.schema[], cols, writer.alignment)
put!(writer.msgs, recbatchmsg)
else
if writer.threaded
@wkspawn process_partition(
tblcols,
writer.dictencodings,
writer.largelists,
writer.compress,
writer.denseunions,
writer.dictencode,
writer.dictencodenested,
writer.maxdepth,
writer.sync,
writer.msgs,
writer.alignment,
$(writer.partition_count),
writer.schema,
writer.errorref,
writer.anyerror,
writer.meta,
writer.colmeta,
)
else
@async process_partition(
tblcols,
writer.dictencodings,
writer.largelists,
writer.compress,
writer.denseunions,
writer.dictencode,
writer.dictencodenested,
writer.maxdepth,
writer.sync,
writer.msgs,
writer.alignment,
$(writer.partition_count),
writer.schema,
writer.errorref,
writer.anyerror,
writer.meta,
writer.colmeta,
)
end
# XXX There is a race condition in the processing of dict encodings
# so we disable multithreaded writing until that can be addressed. See #582
# if writer.threaded
# @wkspawn process_partition(
# tblcols,
# writer.dictencodings,
# writer.largelists,
# writer.compress,
# writer.denseunions,
# writer.dictencode,
# writer.dictencodenested,
# writer.maxdepth,
# writer.sync,
# writer.msgs,
# writer.alignment,
# $(writer.partition_count),
# writer.schema,
# writer.errorref,
# writer.anyerror,
# writer.meta,
# writer.colmeta,
# )
# else
@async process_partition(
tblcols,
writer.dictencodings,
writer.largelists,
writer.compress,
writer.denseunions,
writer.dictencode,
writer.dictencodenested,
writer.maxdepth,
writer.sync,
writer.msgs,
writer.alignment,
$(writer.partition_count),
writer.schema,
writer.errorref,
writer.anyerror,
writer.meta,
writer.colmeta,
)
# end
end
writer.partition_count += 1
end
Expand Down
2 changes: 2 additions & 0 deletions test/Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ SentinelArrays = "91c51154-3ec4-41a3-a24f-3f23e20d615c"
Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c"
TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
TestSetExtensions = "98d24dd4-01ad-11ea-1b02-c9a08f80db04"
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"

[compat]
Expand All @@ -44,4 +45,5 @@ PooledArrays = "1"
StructTypes = "1"
SentinelArrays = "1"
Tables = "1"
TestSetExtensions = "3"
TimeZones = "1"
25 changes: 16 additions & 9 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ using DataAPI
using FilePathsBase
using DataFrames
import Random: randstring
using TestSetExtensions: ExtendedTestSet
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given how long the Arrow tests take, it's useful to have some indication of progress so that we can tell if tests have hung. ExtendedTestSet shows a . for each completed test.

(We also get colored diffs of arrays when tests fail, which is nice.)


# this formulation tests the loaded ArrowTypes, even if it's not the dev version
# within the mono-repo
include(joinpath(dirname(pathof(ArrowTypes)), "../test/tests.jl"))
include(joinpath(dirname(pathof(Arrow)), "../test/testtables.jl"))
include(joinpath(dirname(pathof(Arrow)), "../test/testappend.jl"))
include(joinpath(dirname(pathof(Arrow)), "../test/integrationtest.jl"))
include(joinpath(dirname(pathof(Arrow)), "../test/dates.jl"))

include(joinpath(@__DIR__, "testtables.jl"))
include(joinpath(@__DIR__, "testappend.jl"))
include(joinpath(@__DIR__, "integrationtest.jl"))
include(joinpath(@__DIR__, "dates.jl"))

struct CustomStruct
x::Int
Expand All @@ -45,7 +49,7 @@ struct CustomStruct2{sym}
x::Int
end

@testset "Arrow" begin
@testset ExtendedTestSet "Arrow" begin
@testset "table roundtrips" begin
for case in testtables
testtable(case...)
Expand Down Expand Up @@ -381,6 +385,8 @@ end
end

@testset "# 126" begin
# XXX This test also captures a race condition in multithreaded
# writes of dictionary encoded arrays
t = Tables.partitioner((
(a=Arrow.toarrowvector(PooledArray([1, 2, 3])),),
(a=Arrow.toarrowvector(PooledArray([1, 2, 3, 4])),),
Expand Down Expand Up @@ -602,14 +608,15 @@ end
end

@testset "# 181" begin
# XXX this test hangs on Julia 1.12 when using a deeper nesting
d = Dict{Int,Int}()
for i = 1:9
for i = 1:1
d = Dict(i => d)
end
tbl = (x=[d],)
msg = "reached nested serialization level (20) deeper than provided max depth argument (19); to increase allowed nesting level, pass `maxdepth=X`"
@test_throws ErrorException(msg) Arrow.tobuffer(tbl; maxdepth=19)
@test Arrow.Table(Arrow.tobuffer(tbl; maxdepth=20)).x == tbl.x
msg = "reached nested serialization level (2) deeper than provided max depth argument (1); to increase allowed nesting level, pass `maxdepth=X`"
@test_throws ErrorException(msg) Arrow.tobuffer(tbl; maxdepth=1)
@test Arrow.Table(Arrow.tobuffer(tbl; maxdepth=5)).x == tbl.x
end

@testset "# 167" begin
Expand Down
Loading