Skip to content

Commit d9b4bc9

Browse files
guangtaoguangtao
authored andcommitted
Add experimental Arrow Flight support
1 parent 10a8308 commit d9b4bc9

File tree

95 files changed

+5223
-23
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

95 files changed

+5223
-23
lines changed

.github/workflows/ci.yml

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,50 @@ jobs:
168168
continue-on-error: false
169169
run: >
170170
julia --color=yes --project=monorepo -e 'using Pkg; Pkg.test("Arrow")'
171+
flight_interop:
172+
name: Arrow Flight interop - Julia 1 - ubuntu-latest
173+
runs-on: ubuntu-latest
174+
timeout-minutes: 30
175+
steps:
176+
- uses: actions/checkout@v6
177+
- uses: actions/setup-python@v6
178+
with:
179+
python-version: '3.11'
180+
- name: Install Flight Python dependencies
181+
run: |
182+
python -m pip install --upgrade pip
183+
python -m pip install pyarrow grpcio grpcio-tools
184+
- uses: julia-actions/setup-julia@v2
185+
with:
186+
version: '1'
187+
- uses: actions/cache@v5
188+
env:
189+
cache-name: cache-artifacts
190+
with:
191+
path: ~/.julia/artifacts
192+
key: ${{ runner.os }}-flight-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
193+
restore-keys: |
194+
${{ runner.os }}-flight-${{ env.cache-name }}-
195+
${{ runner.os }}-flight-
196+
${{ runner.os }}-
197+
- uses: julia-actions/julia-buildpkg@v1.6
198+
with:
199+
project: .
200+
- name: Dev local ArrowTypes for Arrow.jl tests
201+
shell: julia --project=. {0}
202+
run: |
203+
using Pkg
204+
Pkg.develop(PackageSpec(path="src/ArrowTypes"))
205+
- name: Run Arrow Flight interop tests
206+
env:
207+
ARROW_FLIGHT_PYTHON: ${{ env.pythonLocation }}/bin/python
208+
run: >
209+
julia --color=yes --project=test -e 'using Pkg;
210+
Pkg.develop(PackageSpec(path="."));
211+
Pkg.develop(PackageSpec(path="src/ArrowTypes"));
212+
Pkg.instantiate();
213+
using Test, Arrow;
214+
include("test/flight.jl")'
171215
docs:
172216
name: Documentation
173217
runs-on: ubuntu-latest

.github/workflows/ci_nightly.yml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,48 @@ jobs:
106106
continue-on-error: false
107107
run: >
108108
julia --color=yes --project=monorepo -e 'using Pkg; Pkg.test("Arrow")'
109+
flight_interop:
110+
name: Arrow Flight interop - Julia nightly - ubuntu-latest
111+
runs-on: ubuntu-latest
112+
timeout-minutes: 30
113+
steps:
114+
- uses: actions/checkout@v6
115+
- uses: actions/setup-python@v6
116+
with:
117+
python-version: '3.11'
118+
- name: Install Flight Python dependencies
119+
run: |
120+
python -m pip install --upgrade pip
121+
python -m pip install pyarrow grpcio grpcio-tools
122+
- uses: julia-actions/setup-julia@v2
123+
with:
124+
version: 'nightly'
125+
arch: x64
126+
- uses: actions/cache@v5
127+
env:
128+
cache-name: cache-artifacts
129+
with:
130+
path: ~/.julia/artifacts
131+
key: ${{ runner.os }}-flight-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}
132+
restore-keys: |
133+
${{ runner.os }}-flight-${{ env.cache-name }}-
134+
${{ runner.os }}-flight-
135+
${{ runner.os }}-
136+
- uses: julia-actions/julia-buildpkg@v1.6
137+
with:
138+
project: .
139+
- name: Dev local ArrowTypes for Arrow.jl tests
140+
shell: julia --project=. {0}
141+
run: |
142+
using Pkg
143+
Pkg.develop(PackageSpec(path="src/ArrowTypes"))
144+
- name: Run Arrow Flight interop tests
145+
env:
146+
ARROW_FLIGHT_PYTHON: ${{ env.pythonLocation }}/bin/python
147+
run: >
148+
julia --color=yes --project=test -e 'using Pkg;
149+
Pkg.develop(PackageSpec(path="."));
150+
Pkg.develop(PackageSpec(path="src/ArrowTypes"));
151+
Pkg.instantiate();
152+
using Test, Arrow;
153+
include("test/flight.jl")'

Project.toml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@ version = "2.8.1"
2121

2222
[deps]
2323
ArrowTypes = "31f734f8-188a-4ce0-8406-c8a06bd891cd"
24+
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
2425
BitIntegers = "c3b6d118-76ef-56ca-8cc7-ebb389d030a1"
2526
CodecLz4 = "5ba52731-8f18-5e0d-9241-30f10d1ec561"
2627
CodecZstd = "6b39b394-51ab-5f42-8807-6242bab2b4c2"
2728
ConcurrentUtilities = "f0e56b4a-5159-44fe-b623-3e5288b988bb"
2829
DataAPI = "9a962f9c-6df0-11e9-0e5d-c546b8b5ee8a"
2930
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
3031
EnumX = "4e289a0a-7415-4d19-859d-a7e5c4648b56"
32+
ProtoBuf = "3349acd9-ac6a-5e09-bcdb-63829b23a429"
33+
gRPCClient = "aaca4a50-36af-4a1d-b878-4c443f2061ad"
3134
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
3235
PooledArrays = "2dfb63ee-cc39-5dd5-95bd-886bf059d720"
3336
SentinelArrays = "91c51154-3ec4-41a3-a24f-3f23e20d615c"
@@ -37,6 +40,12 @@ TimeZones = "f269a46b-ccf7-5d73-abea-4c690281aa53"
3740
TranscodingStreams = "3bb67fe8-82b1-5028-8e26-92a6c54297fa"
3841
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
3942

43+
[weakdeps]
44+
gRPCServer = "608c6337-0d7d-447f-bb69-0f5674ee3959"
45+
46+
[extensions]
47+
ArrowgRPCServerExt = "gRPCServer"
48+
4049
[compat]
4150
ArrowTypes = "1.1,2"
4251
BitIntegers = "0.2, 0.3"
@@ -45,10 +54,13 @@ CodecZstd = "0.7, 0.8"
4554
ConcurrentUtilities = "2"
4655
DataAPI = "1"
4756
EnumX = "1"
57+
ProtoBuf = "~1.2.1"
58+
gRPCClient = "1"
59+
gRPCServer = "0.1"
4860
PooledArrays = "0.5, 1.0"
4961
SentinelArrays = "1"
5062
StringViews = "1"
5163
Tables = "1.1"
5264
TimeZones = "1"
5365
TranscodingStreams = "0.9.12, 0.10, 0.11"
54-
julia = "1.9"
66+
julia = "1.12"

README.md

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,27 @@ This implementation supports the 1.0 version of the specification, including sup
6060

6161
It currently doesn't include support for:
6262
* Tensors or sparse tensors
63-
* Flight RPC
6463
* C data interface
6564

65+
Flight RPC status:
66+
* Experimental `Arrow.Flight` support is available in-tree
67+
* Requires Julia `1.12+`
68+
* Includes generated protocol bindings and complete client constructors for the `FlightService` RPC surface
69+
* Keeps the top-level Flight module shell thin, with exports and generated-protocol setup split out of `src/flight/Flight.jl`
70+
* Includes high-level `FlightData <-> Arrow IPC` helpers for `Arrow.Table`, `Arrow.Stream`, and DoPut payload generation
71+
* Keeps the Flight IPC conversion layer modular under `src/flight/convert/`, with `src/flight/convert.jl` retained as a thin entrypoint
72+
* Includes client helpers for request headers, binary metadata, handshake token reuse, and TLS configuration via `withheaders`, `withtoken`, and `authenticate`
73+
* Keeps the Flight client implementation modular under `src/flight/client/`, with thin entrypoints at `src/flight/client.jl` and `src/flight/client/rpc_methods.jl`
74+
* Includes a transport-agnostic server core (`Service`, `ServerCallContext`, `ServiceDescriptor`, `MethodDescriptor`) for local Flight method dispatch, path lookup, and handler testing
75+
* Keeps the transport-agnostic server core modular under `src/flight/server/`, with `src/flight/server.jl` retained as a thin entrypoint
76+
* Includes an optional `gRPCServer.jl` package extension that maps `Arrow.Flight.Service` into `gRPCServer.ServiceDescriptor` and registers Flight proto types with the external server package when it is present
77+
* Keeps the optional `gRPCServer.jl` bridge modular under `ext/arrowgrpcserverext/`, with `ext/ArrowgRPCServerExt.jl` retained as a thin entrypoint
78+
* Includes optional live interoperability coverage for `Handshake`, authenticated token propagation, `PollFlightInfo`, and TLS via dedicated Python reference servers
79+
* Includes optional live `pyarrow.flight` interoperability coverage for `ListFlights`, `GetFlightInfo`, `GetSchema`, `DoGet`, `DoPut`, `DoExchange`, `ListActions`, and `DoAction`
80+
* Keeps targeted Flight verification modular under `test/flight/`, with `test/flight.jl` retained as a thin entrypoint for local and CI invocation stability, the client-constructor/protocol-wrapper checks decomposed under `test/flight/client_surface/`, the optional `gRPCServer` extension scenarios decomposed under `test/flight/grpcserver_extension/`, the `pyarrow.flight` interop scenarios decomposed under `test/flight/pyarrow_interop/`, and the transport-agnostic server-core checks decomposed under `test/flight/server_core/`
81+
* Includes `test/flight_grpcserver.jl` as a temporary-environment runner for optional native `gRPCServer` coverage without mutating `test/Project.toml`
82+
* Dedicated CI jobs now exercise the Flight interop suite on stable and nightly Linux; native Julia server transport remains optional/experimental and is not part of the default Flight suite
83+
6684
Third-party data formats:
6785
* CSV, parquet and avro support via the existing [CSV.jl](https://github.com/JuliaData/CSV.jl), [Parquet.jl](https://github.com/JuliaIO/Parquet.jl) and [Avro.jl](https://github.com/JuliaData/Avro.jl) packages
6886
* Other Tables.jl-compatible packages automatically supported ([DataFrames.jl](https://github.com/JuliaData/DataFrames.jl), [JSONTables.jl](https://github.com/JuliaData/JSONTables.jl), [JuliaDB.jl](https://github.com/JuliaData/JuliaDB.jl), [SQLite.jl](https://github.com/JuliaDatabases/SQLite.jl), [MySQL.jl](https://github.com/JuliaDatabases/MySQL.jl), [JDBC.jl](https://github.com/JuliaDatabases/JDBC.jl), [ODBC.jl](https://github.com/JuliaDatabases/ODBC.jl), [XLSX.jl](https://github.com/felipenoris/XLSX.jl), etc.)

ext/ArrowgRPCServerExt.jl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
module ArrowgRPCServerExt
2+
3+
using Arrow
4+
using gRPCServer
5+
6+
include("arrowgrpcserverext/constants.jl")
7+
include("arrowgrpcserverext/context.jl")
8+
include("arrowgrpcserverext/streams.jl")
9+
include("arrowgrpcserverext/handlers.jl")
10+
include("arrowgrpcserverext/descriptor.jl")
11+
12+
end # module ArrowgRPCServerExt
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
const Flight = Arrow.Flight
2+
const STREAM_BUFFER_SIZE = 16
3+
const GENERATED_TYPE_PREFIX = "Arrow.Flight.Generated."

ext/arrowgrpcserverext/context.jl

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
function _method_type(method::Flight.MethodDescriptor)
2+
if method.request_streaming
3+
return method.response_streaming ? gRPCServer.MethodType.BIDI_STREAMING :
4+
gRPCServer.MethodType.CLIENT_STREAMING
5+
end
6+
return method.response_streaming ? gRPCServer.MethodType.SERVER_STREAMING :
7+
gRPCServer.MethodType.UNARY
8+
end
9+
10+
function _call_context(context::gRPCServer.ServerContext)
11+
headers = Flight.HeaderPair[
12+
String(name) => (value isa String ? value : Vector{UInt8}(value)) for
13+
(name, value) in pairs(context.metadata)
14+
]
15+
peer = string(context.peer.address, ":", context.peer.port)
16+
return Flight.ServerCallContext(
17+
headers=headers,
18+
peer=peer,
19+
secure=context.peer.certificate !== nothing,
20+
)
21+
end
22+
23+
function _proto_type_name(T::Type)
24+
type_name = string(T)
25+
if startswith(type_name, GENERATED_TYPE_PREFIX)
26+
return type_name[(ncodeunits(GENERATED_TYPE_PREFIX) + 1):end]
27+
end
28+
return type_name
29+
end
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
function _register_proto_types!(method::Flight.MethodDescriptor)
2+
registry = gRPCServer.get_type_registry()
3+
registry[_proto_type_name(method.request_type)] = method.request_type
4+
registry[_proto_type_name(method.response_type)] = method.response_type
5+
return nothing
6+
end
7+
8+
function gRPCServer.service_descriptor(service::Flight.Service)
9+
descriptor = Flight.servicedescriptor(service)
10+
methods = Dict{String,gRPCServer.MethodDescriptor}()
11+
for method in descriptor.methods
12+
_register_proto_types!(method)
13+
methods[method.name] = gRPCServer.MethodDescriptor(
14+
method.name,
15+
_method_type(method),
16+
_proto_type_name(method.request_type),
17+
_proto_type_name(method.response_type),
18+
_handler(service, method),
19+
)
20+
end
21+
return gRPCServer.ServiceDescriptor(descriptor.name, methods, nothing)
22+
end

ext/arrowgrpcserverext/handlers.jl

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
function _unary_handler(service::Flight.Service, method::Flight.MethodDescriptor)
2+
return (context, request) -> Flight.dispatch(service, _call_context(context), method, request)
3+
end
4+
5+
function _server_streaming_handler(service::Flight.Service, method::Flight.MethodDescriptor)
6+
return (context, request, stream) -> begin
7+
response = Channel{method.response_type}(STREAM_BUFFER_SIZE)
8+
task = @async begin
9+
try
10+
if method.handler_field === :listactions
11+
Flight.listactions(service, _call_context(context), response)
12+
else
13+
Flight.dispatch(service, _call_context(context), method, request, response)
14+
end
15+
finally
16+
close(response)
17+
end
18+
end
19+
try
20+
_drain_response!(stream, response)
21+
finally
22+
_streaming_handler_result(task)
23+
end
24+
end
25+
end
26+
27+
function _client_streaming_handler(service::Flight.Service, method::Flight.MethodDescriptor)
28+
return (context, stream) -> begin
29+
request = Channel{method.request_type}(STREAM_BUFFER_SIZE)
30+
producer = @async begin
31+
try
32+
for message in stream
33+
put!(request, message)
34+
end
35+
finally
36+
close(request)
37+
end
38+
end
39+
task = @async Flight.dispatch(service, _call_context(context), method, request)
40+
try
41+
return fetch(task)
42+
finally
43+
_streaming_handler_result(task, producer)
44+
end
45+
end
46+
end
47+
48+
function _bidi_streaming_handler(service::Flight.Service, method::Flight.MethodDescriptor)
49+
return (context, stream) -> begin
50+
request = Channel{method.request_type}(STREAM_BUFFER_SIZE)
51+
response = Channel{method.response_type}(STREAM_BUFFER_SIZE)
52+
producer = @async begin
53+
try
54+
for message in stream
55+
put!(request, message)
56+
end
57+
finally
58+
close(request)
59+
end
60+
end
61+
task = @async begin
62+
try
63+
Flight.dispatch(service, _call_context(context), method, request, response)
64+
finally
65+
close(response)
66+
end
67+
end
68+
try
69+
for message in response
70+
gRPCServer.send!(stream, message)
71+
end
72+
gRPCServer.close!(stream)
73+
finally
74+
_streaming_handler_result(task, producer)
75+
end
76+
return nothing
77+
end
78+
end
79+
80+
function _handler(service::Flight.Service, method::Flight.MethodDescriptor)
81+
if !method.request_streaming && !method.response_streaming
82+
return _unary_handler(service, method)
83+
elseif !method.request_streaming && method.response_streaming
84+
return _server_streaming_handler(service, method)
85+
elseif method.request_streaming && !method.response_streaming
86+
return _client_streaming_handler(service, method)
87+
end
88+
return _bidi_streaming_handler(service, method)
89+
end

ext/arrowgrpcserverext/streams.jl

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
function _drain_response!(
2+
stream::gRPCServer.ServerStream{T},
3+
response::Channel{T},
4+
) where {T}
5+
for message in response
6+
gRPCServer.send!(stream, message)
7+
end
8+
gRPCServer.close!(stream)
9+
return nothing
10+
end
11+
12+
function _streaming_handler_result(task::Task, producer::Union{Nothing,Task}=nothing)
13+
if !isnothing(producer)
14+
wait(producer)
15+
end
16+
wait(task)
17+
return nothing
18+
end

0 commit comments

Comments
 (0)