Skip to content

Commit 8779723

Browse files
guangtaoguangtao
authored andcommitted
Optimize Arrow write paths and tighten Flight interop
1 parent d03cad0 commit 8779723

File tree

20 files changed

+1016
-70
lines changed

20 files changed

+1016
-70
lines changed

Project.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ gRPCServer = "608c6337-0d7d-447f-bb69-0f5674ee3959"
4646
[extensions]
4747
ArrowgRPCServerExt = "gRPCServer"
4848

49+
[sources]
50+
ArrowTypes = { path = "src/ArrowTypes" }
51+
4952
[compat]
5053
ArrowTypes = "1.1,2"
5154
BitIntegers = "0.2, 0.3"

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ The package can be installed by typing in the following in a Julia REPL:
4040
julia> using Pkg; Pkg.add("Arrow")
4141
```
4242

43+
Arrow.jl currently requires Julia `1.12+`.
44+
4345
## Local Development
4446

4547
When developing on Arrow.jl it is recommended that you run the following to ensure that any

ext/arrowgrpcserverext/handlers.jl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@ function _server_streaming_handler(service::Flight.Service, method::Flight.Metho
3535
end
3636
try
3737
_drain_response!(stream, response)
38-
finally
3938
_streaming_handler_result(task)
39+
gRPCServer.close!(stream)
40+
finally
41+
istaskdone(task) || wait(task)
4042
end
4143
end
4244
end
@@ -86,9 +88,11 @@ function _bidi_streaming_handler(service::Flight.Service, method::Flight.MethodD
8688
for message in response
8789
gRPCServer.send!(stream, message)
8890
end
91+
_streaming_handler_result(task, producer)
8992
gRPCServer.close!(stream)
9093
finally
91-
_streaming_handler_result(task, producer)
94+
istaskdone(task) || wait(task)
95+
isnothing(producer) || (istaskdone(producer) || wait(producer))
9296
end
9397
return nothing
9498
end

ext/arrowgrpcserverext/streams.jl

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,27 @@
1616
# under the License.
1717

1818
function _drain_response!(
19-
stream::gRPCServer.ServerStream{T},
20-
response::Channel{T},
21-
) where {T}
19+
stream::gRPCServer.ServerStream,
20+
response::Channel,
21+
)
22+
# gRPCServer falls back to `ServerStream{Any}` when a descriptor only carries
23+
# protobuf type names. Drain generically and let `send!` enforce compatibility.
2224
for message in response
2325
gRPCServer.send!(stream, message)
2426
end
25-
gRPCServer.close!(stream)
2627
return nothing
2728
end
2829

2930
function _streaming_handler_result(task::Task, producer::Union{Nothing,Task}=nothing)
3031
if !isnothing(producer)
32+
if istaskfailed(producer)
33+
throw(producer.exception)
34+
end
3135
wait(producer)
3236
end
37+
if istaskfailed(task)
38+
throw(task.exception)
39+
end
3340
wait(task)
3441
return nothing
3542
end

src/ArrowTypes/Project.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,4 @@ Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
2525
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
2626

2727
[compat]
28-
julia = "1.0"
28+
julia = "1.12"

src/ArrowTypes/src/ArrowTypes.jl

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -405,26 +405,58 @@ function _toarroweltype(x)
405405
state = iterate(x)
406406
state === nothing && return Missing
407407
y, st = state
408-
T = typeof(toarrow(y))
408+
srcT = Union{}
409+
stable = false
410+
T = Missing
411+
if y !== missing
412+
srcT = typeof(y)
413+
mapped = ArrowType(srcT)
414+
stable = isconcretetype(mapped)
415+
T = stable ? mapped : typeof(toarrow(y))
416+
end
409417
while true
410418
state = iterate(x, st)
411419
state === nothing && return T
412420
y, st = state
413-
S = typeof(toarrow(y))
421+
if y === missing
422+
S = Missing
423+
elseif srcT === Union{}
424+
srcT = typeof(y)
425+
mapped = ArrowType(srcT)
426+
stable = isconcretetype(mapped)
427+
S = stable ? mapped : typeof(toarrow(y))
428+
elseif stable && typeof(y) === srcT
429+
continue
430+
else
431+
S = typeof(toarrow(y))
432+
if stable && typeof(y) !== srcT
433+
stable = false
434+
end
435+
end
414436
S === T && continue
415437
T = promoteunion(T, S)
416438
end
417439
end
418440

441+
@inline _hasoffsetaxes(data) = Base.has_offset_axes(data)
442+
@inline _offsetshift(data) = _hasoffsetaxes(data) ? firstindex(data) - 1 : 0
443+
@inline _hasonebasedaxes(data) = !_hasoffsetaxes(data)
444+
419445
# lazily call toarrow(x) on getindex for each x in data
420446
struct ToArrow{T,A} <: AbstractVector{T}
421447
data::A
422448
offset::Int
423449
needsconvert::Bool
424450
end
451+
@inline _sourcedata(x::ToArrow) = getfield(x, :data)
452+
@inline _sourceoffset(x::ToArrow) = getfield(x, :offset)
453+
@inline _needsconvert(x::ToArrow) = getfield(x, :needsconvert)
454+
@inline _sourcevalue(x::ToArrow, i::Integer) =
455+
@inbounds getindex(_sourcedata(x), i + _sourceoffset(x))
456+
425457
function ToArrow{T,A}(data::A) where {T,A}
426458
needsconvert = !(eltype(A) === T && concrete_or_concreteunion(T))
427-
return ToArrow{T,A}(data, firstindex(data) - 1, needsconvert)
459+
return ToArrow{T,A}(data, _offsetshift(data), needsconvert)
428460
end
429461

430462
concrete_or_concreteunion(T) =
@@ -434,8 +466,7 @@ concrete_or_concreteunion(T) =
434466
function ToArrow(x::A) where {A}
435467
S = eltype(A)
436468
T = ArrowType(S)
437-
fi = firstindex(x)
438-
if S === T && concrete_or_concreteunion(S) && fi == 1
469+
if S === T && concrete_or_concreteunion(S) && _hasonebasedaxes(x)
439470
return x
440471
elseif !concrete_or_concreteunion(T)
441472
# arrow needs concrete types, so try to find a concrete common type, preferring unions
@@ -472,12 +503,12 @@ function _convert(::Type{T}, x) where {T}
472503
end
473504

474505
@inline function _toarrowvalue(x::ToArrow{T}, value) where {T}
475-
x.needsconvert || return value
506+
_needsconvert(x) || return value
476507
return _convert(T, toarrow(value))
477508
end
478509

479510
Base.@propagate_inbounds function Base.getindex(x::ToArrow{T}, i::Int) where {T}
480-
value = @inbounds getindex(x.data, i + x.offset)
511+
value = _sourcevalue(x, i)
481512
return _toarrowvalue(x, value)
482513
end
483514

src/ArrowTypes/test/tests.jl

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,14 +185,21 @@ end
185185
@test !ArrowTypes.concrete_or_concreteunion(Any)
186186

187187
@testset "ToArrow" begin
188+
@test !ArrowTypes._hasoffsetaxes([1, 2, 3])
189+
@test ArrowTypes._offsetshift([1, 2, 3]) == 0
190+
188191
x = ArrowTypes.ToArrow([1, 2, 3])
189192
@test x isa Vector{Int}
190193
@test x == [1, 2, 3]
191194

195+
baseview = @view [1, 2, 3][1:3]
196+
x = ArrowTypes.ToArrow(baseview)
197+
@test x === baseview
198+
192199
x = ArrowTypes.ToArrow([:hey, :ho])
193200
@test x isa ArrowTypes.ToArrow{String,Vector{Symbol}}
194201
@test eltype(x) == String
195-
@test getfield(x, :needsconvert)
202+
@test ArrowTypes._needsconvert(x)
196203
@test x[1] == "hey"
197204
@test collect(x) == ["hey", "ho"]
198205
@test x == ["hey", "ho"]
@@ -203,32 +210,54 @@ end
203210
@test collect(x) == [1.0, 3.14]
204211
@test x == [1.0, 3.14]
205212

213+
x = ArrowTypes.ToArrow(Any[UUID(UInt128(1)), UUID(UInt128(2))])
214+
@test x isa ArrowTypes.ToArrow{NTuple{16,UInt8},Vector{Any}}
215+
@test eltype(x) == NTuple{16,UInt8}
216+
@test collect(x) == [ArrowTypes.toarrow(UUID(UInt128(1))), ArrowTypes.toarrow(UUID(UInt128(2)))]
217+
218+
x = ArrowTypes.ToArrow(Any[missing, UUID(UInt128(1))])
219+
@test x isa ArrowTypes.ToArrow{Union{Missing,NTuple{16,UInt8}},Vector{Any}}
220+
@test eltype(x) == Union{Missing,NTuple{16,UInt8}}
221+
@test isequal(collect(x), Union{Missing,NTuple{16,UInt8}}[missing, ArrowTypes.toarrow(UUID(UInt128(1)))])
222+
206223
x = ArrowTypes.ToArrow(Any[1, 3.14, "hey"])
207224
@test x isa ArrowTypes.ToArrow{Union{Float64,String},Vector{Any}}
208225
@test eltype(x) == Union{Float64,String}
209226
@test collect(x) == Union{Float64,String}[1.0, 3.14, "hey"]
210227
@test x == [1.0, 3.14, "hey"]
211228

229+
x = ArrowTypes.ToArrow(Any[UUID(UInt128(1)), "tail"])
230+
@test x isa ArrowTypes.ToArrow{Union{NTuple{16,UInt8},String},Vector{Any}}
231+
@test eltype(x) == Union{NTuple{16,UInt8},String}
232+
@test collect(x) == Union{NTuple{16,UInt8},String}[ArrowTypes.toarrow(UUID(UInt128(1))), "tail"]
233+
212234
x = ArrowTypes.ToArrow(OffsetArray([1, 2, 3], -3:-1))
213235
@test x isa ArrowTypes.ToArrow{Int,OffsetVector{Int,Vector{Int}}}
236+
@test ArrowTypes._hasoffsetaxes(getfield(x, :data))
237+
@test getfield(x, :offset) == ArrowTypes._offsetshift(getfield(x, :data))
238+
@test ArrowTypes._sourcedata(x) === getfield(x, :data)
239+
@test ArrowTypes._sourceoffset(x) == getfield(x, :offset)
240+
@test !ArrowTypes._needsconvert(x)
241+
@test ArrowTypes._sourcevalue(x, 1) == 1
214242
@test eltype(x) == Int
215-
@test !getfield(x, :needsconvert)
216243
@test x[1] == 1
217244
@test x[3] == 3
218245
@test collect(x) == [1, 2, 3]
219246
@test x == [1, 2, 3]
220247

221248
x = ArrowTypes.ToArrow(OffsetArray(Union{Missing,Int}[1, missing], -3:-2))
222249
@test x isa ArrowTypes.ToArrow{Union{Missing,Int},OffsetVector{Union{Missing,Int},Vector{Union{Missing,Int}}}}
223-
@test !getfield(x, :needsconvert)
250+
@test !ArrowTypes._needsconvert(x)
224251
@test x[1] == 1
225252
@test x[2] === missing
226253
@test isequal(collect(x), Union{Missing,Int}[1, missing])
227254

228255
x = ArrowTypes.ToArrow(OffsetArray(Any[1, 3.14], -3:-2))
229256
@test x isa ArrowTypes.ToArrow{Float64,OffsetVector{Any,Vector{Any}}}
257+
@test getfield(x, :offset) == ArrowTypes._offsetshift(getfield(x, :data))
258+
@test ArrowTypes._sourcevalue(x, 2) == 3.14
230259
@test eltype(x) == Float64
231-
@test getfield(x, :needsconvert)
260+
@test ArrowTypes._needsconvert(x)
232261
@test x[1] == 1
233262
@test x[2] == 3.14
234263
@test collect(x) == [1.0, 3.14]

src/arraytypes/arraytypes.jl

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ function arrowvector(
9999
dictencode=dictencode,
100100
kw...,
101101
)
102-
elseif !(x isa DictEncode)
102+
elseif !(x isa DictEncode) && !_keeprawmapvector(T, x)
103103
x = ToArrow(x)
104104
end
105105
S = maybemissing(eltype(x))
@@ -144,6 +144,64 @@ function _arrowtypemeta(meta, n, m)
144144
return toidict(dict)
145145
end
146146

147+
@inline function _materializeconverted(x::ArrowTypes.ToArrow)
148+
data = ArrowTypes._sourcedata(x)
149+
if ArrowTypes._needsconvert(x) && !ArrowTypes.concrete_or_concreteunion(eltype(data))
150+
return _materializeconverted(eltype(x), x)
151+
end
152+
return x
153+
end
154+
155+
function _materializeconverted(::Type{T}, x::ArrowTypes.ToArrow{T,A}) where {T,A}
156+
len = length(x)
157+
data = Vector{T}(undef, len)
158+
source = ArrowTypes._sourcedata(x)
159+
i = 1
160+
for value in source
161+
@inbounds data[i] =
162+
value isa T ? value : ArrowTypes._convert(T, ArrowTypes.toarrow(value))
163+
i += 1
164+
end
165+
return data
166+
end
167+
168+
@inline function _materializefixedbytes16(value)
169+
if value isa ArrowTypes.UUID
170+
return ArrowTypes._cast(NTuple{16,UInt8}, value.value)
171+
elseif value isa NTuple{16,UInt8}
172+
return value
173+
else
174+
return ArrowTypes._convert(NTuple{16,UInt8}, ArrowTypes.toarrow(value))
175+
end
176+
end
177+
178+
function _materializeconverted(
179+
::Type{NTuple{16,UInt8}},
180+
x::ArrowTypes.ToArrow{NTuple{16,UInt8},A},
181+
) where {A}
182+
len = length(x)
183+
data = Vector{NTuple{16,UInt8}}(undef, len)
184+
source = ArrowTypes._sourcedata(x)
185+
i = 1
186+
for value in source
187+
@inbounds data[i] = _materializefixedbytes16(value)
188+
i += 1
189+
end
190+
return data
191+
end
192+
193+
@inline _toarrowvaliditysource(x::ArrowTypes.ToArrow) =
194+
ArrowTypes._needsconvert(x) ? x : ArrowTypes._sourcedata(x)
195+
196+
@inline _toarrowvalidity(x::ArrowTypes.ToArrow, data) =
197+
data === x ? ValidityBitmap(x) : ValidityBitmap(data)
198+
199+
@inline function _keeprawmapvector(::Type{T}, x) where {T}
200+
return Base.has_offset_axes(x) &&
201+
ArrowTypes.concrete_or_concreteunion(T) &&
202+
ArrowKind(T) isa ArrowTypes.MapKind
203+
end
204+
147205
# now we check for ArrowType converions and dispatch on ArrowKind
148206
function arrowvector(::Type{S}, x, i, nl, fi, de, ded, meta; kw...) where {S}
149207
meta = _normalizemeta(meta)
@@ -240,7 +298,7 @@ function ValidityBitmap(x::ArrowTypes.ToArrow)
240298
if !(T >: Missing)
241299
return ValidityBitmap(UInt8[], 1, length(x), 0)
242300
end
243-
source = getfield(x, :needsconvert) ? x : getfield(x, :data)
301+
source = _toarrowvaliditysource(x)
244302
return _validitybitmap(source, length(x))
245303
end
246304

src/arraytypes/bool.jl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,12 @@ function arrowvector(::BoolKind, x, i, nl, fi, de, ded, meta; kw...)
8383
end
8484

8585
function arrowvector(::BoolKind, x::ArrowTypes.ToArrow, i, nl, fi, de, ded, meta; kw...)
86-
validity = ValidityBitmap(x)
87-
len = length(x)
88-
source = getfield(x, :needsconvert) ? x : getfield(x, :data)
86+
data = _materializeconverted(x)
87+
validity = _toarrowvalidity(x, data)
88+
len = length(data)
89+
source = data === x ? _toarrowvaliditysource(x) : data
8990
bytes = _packboolbytes(source, len)
90-
return BoolVector{eltype(x)}(bytes, 1, validity, len, meta)
91+
return BoolVector{eltype(data)}(bytes, 1, validity, len, meta)
9192
end
9293

9394
function compress(Z::Meta.CompressionType.T, comp, p::P) where {P<:BoolVector}

0 commit comments

Comments
 (0)