Skip to content

Commit f7318aa

Browse files
authored
Rename all arrow flight endpoint references to "Worker" (#274)
1 parent 84243d3 commit f7318aa

28 files changed

+199
-213
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@ https://datafusion-contrib.github.io/datafusion-distributed
3535
There are some runnable examples showcasing how to provide a localhost implementation for Distributed DataFusion in
3636
[examples/](examples):
3737

38-
- [localhost_worker.rs](examples/localhost_worker.rs): code that spawns an Arrow Flight Endpoint listening for physical
38+
- [localhost_worker.rs](examples/localhost_worker.rs): code that spawns a Worker listening for physical
3939
plans over the network.
40-
- [localhost_run.rs](examples/localhost_run.rs): code that distributes a query across the spawned Arrow Flight Endpoints
41-
and executes it.
40+
- [localhost_run.rs](examples/localhost_run.rs): code that distributes a query across the spawned Workers and executes
41+
it.
4242

4343
The integration tests also provide an idea about how to use the library and what can be achieved with it:
4444

benchmarks/cdk/bin/worker.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ use datafusion::execution::SessionStateBuilder;
99
use datafusion::physical_plan::execute_stream;
1010
use datafusion::prelude::SessionContext;
1111
use datafusion_distributed::{
12-
ArrowFlightEndpoint, DistributedExt, DistributedPhysicalOptimizerRule,
13-
DistributedSessionBuilderContext, WorkerResolver, display_plan_ascii,
12+
DistributedExt, DistributedPhysicalOptimizerRule, Worker, WorkerQueryContext, WorkerResolver,
13+
display_plan_ascii,
1414
};
1515
use futures::{StreamExt, TryFutureExt};
1616
use log::{error, info, warn};
@@ -72,12 +72,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
7272
.build();
7373
let ctx = SessionContext::from(state);
7474

75-
let arrow_flight_endpoint =
76-
ArrowFlightEndpoint::from_session_builder(move |ctx: DistributedSessionBuilderContext| {
77-
let s3 = s3.clone();
78-
let s3_url = s3_url.clone();
79-
async move { Ok(ctx.builder.with_object_store(&s3_url, s3).build()) }
80-
});
75+
let arrow_flight_endpoint = Worker::from_session_builder(move |ctx: WorkerQueryContext| {
76+
let s3 = s3.clone();
77+
let s3_url = s3_url.clone();
78+
async move { Ok(ctx.builder.with_object_store(&s3_url, s3).build()) }
79+
});
8180
let http_server = axum::serve(
8281
listener,
8382
Router::new().route(

benchmarks/src/run.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use datafusion::prelude::*;
3131
use datafusion_distributed::test_utils::localhost::LocalHostWorkerResolver;
3232
use datafusion_distributed::test_utils::{clickbench, tpcds, tpch};
3333
use datafusion_distributed::{
34-
ArrowFlightEndpoint, DistributedExt, DistributedPhysicalOptimizerRule, NetworkBoundaryExt,
34+
DistributedExt, DistributedPhysicalOptimizerRule, NetworkBoundaryExt, Worker,
3535
};
3636
use log::info;
3737
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -185,7 +185,7 @@ impl RunOpt {
185185
let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
186186
Ok::<_, Box<dyn Error + Send + Sync>>(
187187
Server::builder()
188-
.add_service(ArrowFlightEndpoint::default().into_flight_server())
188+
.add_service(Worker::default().into_flight_server())
189189
.serve_with_incoming(incoming)
190190
.await?,
191191
)

docs/source/user-guide/channel-resolver.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ points:
1313

1414
- You will need to provide your own implementation in two places:
1515
- in the `SessionContext` that handles your queries.
16-
- while instantiating the `ArrowFlightEndpoint` with the `from_session_builder()` constructor.
16+
- while instantiating the `Worker` with the `from_session_builder()` constructor.
1717
- If you decide to build it from scratch, make sure that Arrow Flight clients are reused across
1818
request rather than always building new ones.
1919
- You can use this library's `DefaultChannelResolver` as a backbone for your own implementation.
@@ -44,10 +44,10 @@ async fn main() {
4444
.with_distributed_channel_resolver(channel_resolver.clone())
4545
.build();
4646

47-
let endpoint = ArrowFlightEndpoint::from_session_builder(|ctx: DistributedSessionBuilderContext| {
47+
// ... and here for each query the Worker handles.
48+
let endpoint = Worker::from_session_builder(move |ctx: WorkerQueryContext| {
4849
let channel_resolver = channel_resolver.clone();
4950
async move {
50-
// ... and here for each query that ArrowFlightEndpoint handles.
5151
Ok(ctx.builder.with_distributed_channel_resolver(channel_resolver).build())
5252
}
5353
});

docs/source/user-guide/getting-started.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,16 @@ impl ChannelResolver for LocalhostChannelResolver {
4040

4141
> NOTE: This example is not production-ready and is meant to showcase the basic concepts of the library.
4242
43-
This `WorkerResolver` implementation should resolve URLs of Distributed DataFusion Arrow Flight servers, and it's
44-
also the user of this library's responsibility to spawn a Tonic server that exposes the Arrow Flight service.
43+
This `WorkerResolver` implementation should resolve URLs of Distributed DataFusion workers, and it's
44+
also the user of this library's responsibility to spawn a Tonic server that exposes the worker as an Arrow Flight
45+
service using Tonic.
4546

4647
A basic example of such a server is:
4748

4849
```rust
4950
#[tokio::main]
5051
async fn main() -> Result<(), Box<dyn Error>> {
51-
let endpoint = ArrowFlightEndpoint::default();
52+
let endpoint = Worker::default();
5253

5354
Server::builder()
5455
.add_service(endpoint.into_flight_server())
@@ -66,7 +67,7 @@ The next two sections of this guide will walk you through tailoring the library'
6667
- [Build your own WorkerResolver](worker-resolver.md)
6768
- [Build your own ChannelResolver](channel-resolver.md)
6869
- [Build your own TaskEstimator](task-estimator.md)
69-
- [Build your own distributed DataFusion Arrow Flight endpoint](arrow-flight-endpoint.md)
70+
- [Build your own distributed DataFusion Worker](worker.md)
7071

7172
Here are some other resources in the codebase:
7273

docs/source/user-guide/index.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ your own distributed DataFusion cluster.
1010
- [Concepts](concepts.md)
1111
- [Getting Started](getting-started.md)
1212
- [Building a WorkerResolver](worker-resolver.md)
13-
- [Building an Arrow Flight endpoint](arrow-flight-endpoint.md)
13+
- [Spawning a Worker](worker.md)
1414
- [Building a ChannelResolver](channel-resolver.md)
1515
- [Building a TaskEstimator](task-estimator.md)
1616
- [How a distributed plan is built](how-a-distributed-plan-is-built.md)

docs/source/user-guide/worker-resolver.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ async fn main() {
3030
}
3131
```
3232

33-
> NOTE: It's not necessary to pass this to the Arrow Flight endpoint session builder.
33+
> NOTE: It's not necessary to pass this to the Worker session builder.
3434
3535
## Static WorkerResolver
3636

3737
This is the simplest thing you can do, although it will not fit some common use cases. An example of this can be
38-
seen
39-
in [localhost_worker.rs](https://github.com/datafusion-contrib/datafusion-distributed/blob/fad9fa222d65b7d2ddae92fbc20082b5c434e4ff/examples/localhost_run.rs)
38+
seen in the
39+
[localhost_worker.rs](https://github.com/datafusion-contrib/datafusion-distributed/blob/fad9fa222d65b7d2ddae92fbc20082b5c434e4ff/examples/localhost_run.rs)
4040
example:
4141

4242
```rust
@@ -65,8 +65,8 @@ provider for hosting your Distributed DataFusion workers.
6565
It's up to you to decide how the URLs should be resolved. One important implementation note is:
6666

6767
- Retrieving the worker URLs needs to be synchronous (as planning is synchronous), and therefore, you'll likely
68-
need to spawn a background tasks that periodically refreshes the list of available URLs.
68+
need to spawn background tasks that periodically refresh the list of available URLs.
6969

7070
A good example can be found
7171
in https://github.com/datafusion-contrib/datafusion-distributed/blob/main/benchmarks/cdk/bin/worker.rs,
72-
where a cluster of AWS EC2 machines are discovered identified by tags with the AWS Rust SDK.
72+
where a cluster of AWS EC2 machines is discovered identified by tags with the AWS Rust SDK.

docs/source/user-guide/arrow-flight-endpoint.md renamed to docs/source/user-guide/worker.md

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
1-
# Building an Arrow Flight Endpoint
1+
# Spawn a Worker
22

3-
An `ArrowFlightEndpoint` is a gRPC server that implements the Arrow Flight protocol for distributed query execution.
3+
A `Worker` is a gRPC server that implements the Arrow Flight protocol for distributed query execution.
44
Worker nodes run these endpoints to receive execution plans, execute them, and stream results back.
55

66
## Overview
77

8-
The `ArrowFlightEndpoint` is the core worker component in Distributed DataFusion. It:
8+
The `Worker` is the core worker component in Distributed DataFusion. It:
99

1010
- Receives serialized execution plans via Arrow Flight's `do_get` method
1111
- Deserializes plans using protobuf and user-provided codecs
1212
- Executes plans using the local DataFusion runtime
1313
- Streams results back as Arrow record batches through the gRPC Arrow Flight interface
1414

15-
## Creating an ArrowFlightEndpoint
15+
## Launching the Arrow Flight server
1616

17-
The `Default` implementation of the `ArrowFlightEndpoint` should satisfy most basic use cases
17+
The `Default` implementation of the `Worker` should satisfy most basic use cases
1818

1919
```rust
20-
use datafusion_distributed::{ArrowFlightEndpoint};
20+
use datafusion_distributed::Worker;
2121

2222
async fn main() {
23-
let endpoint = ArrowFlightEndpoint::default();
23+
let endpoint = Worker::default();
2424

2525
Server::builder()
2626
.add_service(endpoint.into_flight_server())
@@ -31,21 +31,21 @@ async fn main() {
3131
}
3232
```
3333

34-
If you are using DataFusion though, it's very likely that you have your own custom UDFs, execution nodes, config options
35-
etc...
34+
If you are using DataFusion, though, it's very likely that you have your own custom UDFs, execution nodes, config
35+
options, etc...
3636

37-
You'll need to tell the `ArrowFlightEndpoint` how to build your DataFusion sessions:
37+
You'll need to tell the `Worker` how to build your DataFusion sessions:
3838

3939
```rust
40-
async fn build_state(ctx: DistributedSessionBuilderContext) -> Result<SessionState, DataFusionError> {
40+
async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
4141
Ok(ctx
4242
.builder
4343
.with_scalar_functions(vec![your_custom_udf()])
4444
.build())
4545
}
4646

4747
async fn main() {
48-
let endpoint = ArrowFlightEndpoint::from_session_builder(build_sate);
48+
let endpoint = Worker::from_session_builder(build_sate);
4949

5050
Server::builder()
5151
.add_service(endpoint.into_flight_server())
@@ -56,21 +56,21 @@ async fn main() {
5656
}
5757
```
5858

59-
### DistributedSessionBuilder
59+
### WorkerSessionBuilder
6060

61-
The `DistributedSessionBuilder` is a closure or type that implements:
61+
The `WorkerSessionBuilder` is a closure or type that implements:
6262

6363
```rust
6464
#[async_trait]
65-
pub trait DistributedSessionBuilder {
65+
pub trait WorkerSessionBuilder {
6666
async fn build_session_state(
6767
&self,
68-
ctx: DistributedSessionBuilderContext,
68+
ctx: WorkerQueryContext,
6969
) -> Result<SessionState, DataFusionError>;
7070
}
7171
```
7272

73-
It receives a `DistributedSessionBuilderContext` containing:
73+
It receives a `WorkerQueryContext` containing:
7474

7575
- `SessionStateBuilder`: A pre-populated session state builder in which you can inject your custom stuff
7676
- `headers`: HTTP headers from the incoming request (useful for passing metadata)
@@ -81,11 +81,11 @@ Convert the endpoint to a gRPC service and serve it:
8181

8282
```rust
8383
use tonic::transport::Server;
84-
use datafusion_distributed::ArrowFlightEndpoint;
84+
use datafusion_distributed::Worker;
8585
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
8686

8787
async fn main() {
88-
let endpoint = ArrowFlightEndpoint::default();
88+
let endpoint = Worker::default();
8989

9090
Server::builder()
9191
.add_service(endpoint.into_flight_server())

examples/in_memory_cluster.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ use datafusion::common::DataFusionError;
55
use datafusion::execution::SessionStateBuilder;
66
use datafusion::prelude::{ParquetReadOptions, SessionContext};
77
use datafusion_distributed::{
8-
ArrowFlightEndpoint, BoxCloneSyncChannel, ChannelResolver, DistributedExt,
9-
DistributedPhysicalOptimizerRule, DistributedSessionBuilderContext, WorkerResolver,
10-
create_flight_client, display_plan_ascii,
8+
BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, Worker,
9+
WorkerQueryContext, WorkerResolver, create_flight_client, display_plan_ascii,
1110
};
1211
use futures::TryStreamExt;
1312
use hyper_util::rt::TokioIo;
@@ -89,12 +88,10 @@ impl InMemoryChannelResolver {
8988
};
9089
let this_clone = this.clone();
9190

92-
let endpoint = ArrowFlightEndpoint::from_session_builder(
93-
move |ctx: DistributedSessionBuilderContext| {
94-
let this = this.clone();
95-
async move { Ok(ctx.builder.with_distributed_channel_resolver(this).build()) }
96-
},
97-
);
91+
let endpoint = Worker::from_session_builder(move |ctx: WorkerQueryContext| {
92+
let this = this.clone();
93+
async move { Ok(ctx.builder.with_distributed_channel_resolver(this).build()) }
94+
});
9895

9996
tokio::spawn(async move {
10097
Server::builder()

examples/localhost.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ git lfs checkout
1616

1717
### Spawning the workers
1818

19-
In two different terminals spawn two ArrowFlightEndpoints
19+
In two different terminals spawn two workers
2020

2121
```shell
2222
cargo run --example localhost_worker -- 8080
@@ -26,7 +26,7 @@ cargo run --example localhost_worker -- 8080
2626
cargo run --example localhost_worker -- 8081
2727
```
2828

29-
The positional numeric argument is the port in which each Arrow Flight endpoint will listen to.
29+
The positional numeric argument is the port in which each worker will listen to.
3030

3131
### Issuing a distributed SQL query
3232

0 commit comments

Comments
 (0)