
Conversation


@sebbegg sebbegg commented Dec 18, 2025

Which issue does this PR close?

Closes #1349

Rationale for this change

See #1349. Having a proxy on the scheduler makes it easier to e.g. expose this in docker-compose or kubernetes.

Making this a draft first, let me know what you think.

What changes are included in this PR?

This adds a "flight proxy" service to the scheduler that's optionally started when advertise_flight_sql_endpoint is set.
It implements only do_get and simply proxies the requests to the actual executor.
I reasoned that having this as a separate service (instead of another method in the scheduler gRPC) makes this more flexible, since the code on the client side remains almost unchanged, except for the logic to pick the scheduler or executor host as the endpoint.

Are there any user-facing changes?

Kind of: using advertise_flight_sql_endpoint now actually has an effect.

@milenkovicm
Contributor

Thanks @sebbegg, I'm a bit stuck time-wise; I'll try to have a look tomorrow, and if not I'll follow up over the holiday period.

@milenkovicm
Contributor

Perhaps @martin-g may be able to help with a quick review, I'd be very thankful.

Member

@martin-g martin-g left a comment


There are no new tests

))
})?;
let flight_client = FlightServiceClient::new(connection)
.max_decoding_message_size(16 * 1024 * 1024)
Member


Can/should we use config.grpc_server_max_encoding_message_size or a new setting?
Same for min below.

Author


Certainly.

No idea on whether this should be a new setting 🤔
I'd guess starting with the current one might be fine and if there should be need for a dedicated setting one could revisit?

info!("Built-in arrow flight server proxy listening on: {address:?} max_encoding_size: {max_encoding_message_size} max_decoding_size: {max_decoding_message_size}");

let grpc_server_config = GrpcServerConfig::default();
let server_future = create_grpc_server(&grpc_server_config)
Member


There is no authentication layer.
But there is no authentication for the main service either, so this is not required at the moment.

Contributor

@milenkovicm milenkovicm left a comment


It's a good start, I just think we could remove the additional request that checks for the proxy endpoint.

let duration = Duration::from_millis(duration);

info!("Job {job_id} finished executing in {duration:?} ");
let FlightEndpointInfo {
Contributor


I don't think we should do a round trip to fetch endpoint info. Could we add an optional response parameter in message SuccessfulJob, and if it is present, do a proxy request?

Author


Changing SuccessfulJob would require a few changes in more modules… not sure it's the right place?
An alternative would be to add it as a second field to GetJobStatusResult?
That would limit the impact to the scheduler gRPC server only.

Contributor


It gives you back partition locations, which indicate where the data is. So by adding an optional proxy parameter you could ask it "give me the data from partition_location"; otherwise you just fetch it as you do at the moment (if no proxy is provided).

Author

@sebbegg sebbegg Dec 19, 2025


I get that, my worry was about where to fill in the information.
The SuccessfulJob and PartitionLocation objects are all created in execution_graph.rs, it seems. It appears weird to forward the proxy information all the way into the execution graph just to be able to fill a new field like SuccessfulJob.flight_endpoint.

The alternative could be to clone & update the SuccessfulJob in the grpc endpoint:

Ok(status) => Ok(Response::new(GetJobStatusResult { status })),

pseudo:

fn get_job_status(job_id) {
    let mut job_status = task_manager.get_job_status(job_id);
    if job_status.status is SuccessfulJob {
        job_status.status.flight_endpoint = self.state.config.advertise_flight_sql_endpoint;
    }
    job_status
}

@milenkovicm milenkovicm changed the title Add arrow flight proxy feat: Add arrow flight proxy to scheduler Jan 3, 2026
@milenkovicm
Contributor

Hey @sebbegg, is there anything I can do to help you with this PR?

@sebbegg
Author

sebbegg commented Jan 6, 2026 via email

# Conflicts:
#	ballista/core/src/execution_plans/distributed_query.rs
#	ballista/scheduler/src/lib.rs
#	ballista/scheduler/src/scheduler_process.rs
#	ballista/scheduler/src/scheduler_server/grpc.rs
@sebbegg
Author

sebbegg commented Jan 8, 2026

@milenkovicm Feel free to have another look - made some updates:

  • The GetJobStatusResult now has the flight_endpoint - so there's no extra request involved to fetch this information.
  • As @martin-g suggested, the proxy now checks that the requested host:port belongs to an active executor
  • Used tokio::select! to exit in case the flight proxy panics

Unfortunately this is still missing tests.
A proper test would probably be some sort of integration test with scheduler and at least one executor.
I looked through the test-utils, but I'm not sure there's something that could be used for that...

@milenkovicm
Contributor

thanks @sebbegg will have a look today/tomorrow

@milenkovicm milenkovicm marked this pull request as ready for review January 10, 2026 17:20
Contributor

@milenkovicm milenkovicm left a comment


This looks good to me.

The main problem I have with this approach is that the scheduler may be overloaded with data transport, which could affect scheduling.

But I also find this approach valuable, as Ballista can open a single port towards the clients. It does make sense to me that the "proxy" can be on a different address/port.

  • if the proxy is not configured, it should not listen for connections.
  • If the proxy is configured without a specific ip/port, I'd suggest binding it to the same port as the scheduler, as I believe it would be a sensible default and would simplify deployment.
  • If the proxy is configured with a specific ip/port, we could treat it as an external process.

what do you think @sebbegg ?

Also, it would be great if we could add a test or two.


message GetJobStatusResult {
JobStatus status = 1;
optional string flight_endpoint = 2;
Contributor


Would it make sense to name this "flight proxy" or similar?

let GetJobStatusResult { status } = scheduler
let GetJobStatusResult {
status,
flight_endpoint,
Contributor


Would it make sense to support Some("") in which case the client should use the scheduler address and port? This way the scheduler would not really care about its public port.

Contributor


Not sure whether we should use Some("") or a proto enum to represent the proxy cases.

match config.advertise_flight_sql_endpoint {
Some(_) => {
info!("Starting flight proxy");
let flight_proxy = start_flight_proxy_server(config, scheduler.state.clone());
Contributor


Would it make sense to run the proxy as a service on the same port as the scheduler service? It would simplify configuration.

Contributor

@milenkovicm milenkovicm left a comment


thanks @sebbegg,

just to clarify, we can have three configuration options:

  • proxy not configured: the client needs to fetch data from the executors
  • proxy configured, no ip address or port provided: the scheduler needs to start the proxy on the same port (within the process)
  • proxy configured, ip/port provided: the scheduler considers this an external process running the proxy; it just needs to put that value in the response and will not start the proxy itself. The client needs to use that ip/port combination to connect.
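The three options above could be sketched on the scheduler side roughly like this std-only snippet. The `ProxyMode` enum and `proxy_mode` helper are illustrative names, not actual Ballista code; the sketch only assumes the convention discussed here (unset = no proxy, empty string = in-process, non-empty = external address):

```rust
/// Hypothetical interpretation of the advertise_flight_sql_endpoint setting;
/// names are illustrative, not actual Ballista code.
#[derive(Debug, PartialEq)]
enum ProxyMode {
    /// Proxy not configured: clients fetch data directly from executors.
    None,
    /// Configured without an address: the scheduler starts the proxy in-process.
    InProcess,
    /// Configured with an address: an external process runs the proxy; the
    /// scheduler only advertises the address and does not start anything.
    External(String),
}

fn proxy_mode(advertise_flight_sql_endpoint: Option<&str>) -> ProxyMode {
    match advertise_flight_sql_endpoint {
        None => ProxyMode::None,
        Some("") => ProxyMode::InProcess,
        Some(addr) => ProxyMode::External(addr.to_string()),
    }
}

fn main() {
    assert_eq!(proxy_mode(None), ProxyMode::None);
    assert_eq!(proxy_mode(Some("")), ProxyMode::InProcess);
    assert_eq!(
        proxy_mode(Some("localhost:50040")),
        ProxyMode::External("localhost:50040".to_string())
    );
}
```

Modeling this as an enum early keeps the "is the string empty?" check out of the rest of the scheduler and client code.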

config.advertise_flight_sql_endpoint
);
match config.advertise_flight_sql_endpoint.clone() {
Some(s) if s != "" => {
Contributor


Sorry, I might have been unclear. If we specify a different port (or ip and port), that would mean there is an external process running the proxy, not the scheduler process.

So we have three configuration options:

  • no proxy
  • in-process (no need to specify ip/port; the client should use the scheduler's ip and port)
  • external process (ip/port specified); the client should use the given ip/port

Author


That’s what happens though?
This just puts the logic to use the scheduler host:port in the scheduler rather than the client.

Contributor


Yes, but you need to specify the advertising address and configure it correctly, which may be tricky in docker containers.

The suggestion would eliminate that, as the client already knows the scheduler address.


message GetJobStatusResult {
JobStatus status = 1;
optional string flight_proxy = 2;
Contributor


Can we make this a oneof to represent the proxy statuses:

  • no proxy
  • in-process (no need to specify ip/port; the client should use the scheduler's ip and port)
  • external process (ip/port specified); the client should use the given ip/port

That would remove the check for an empty string on the client side.

Author


E.g. like

oneof flight_proxy {
    bool no_proxy = 2;
    bool in_scheduler = 3;
    string external_address = 4;
}

?

Contributor


Yes, something like that. Perhaps:

oneof flight_proxy {
    bool local = 1; 
    string external = 4;
}

something like that
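For illustration, a client interpreting such a oneof might pick the flight endpoint like the sketch below. The `FlightProxy` enum stands in for what prost would generate from the suggested message; all names here are hypothetical, not the actual generated Ballista code:

```rust
/// Hypothetical mirror of the prost-generated oneof from the suggestion above.
#[derive(Debug)]
enum FlightProxy {
    /// Proxy runs inside the scheduler: reuse the scheduler address.
    Local(bool),
    /// External proxy process: use the advertised address.
    External(String),
}

/// Decide which address the client should dial for flight results.
fn flight_endpoint(
    proxy: Option<&FlightProxy>,
    scheduler_addr: &str,
    executor_addr: &str,
) -> String {
    match proxy {
        // No proxy advertised: fetch directly from the executor, as today.
        None => executor_addr.to_string(),
        // In-process proxy: the client already knows the scheduler address.
        Some(FlightProxy::Local(_)) => scheduler_addr.to_string(),
        // External proxy: use the address the scheduler put in the response.
        Some(FlightProxy::External(addr)) => addr.clone(),
    }
}

fn main() {
    let ext = FlightProxy::External("proxy:50040".to_string());
    assert_eq!(flight_endpoint(None, "sched:50050", "exec:50051"), "exec:50051");
    assert_eq!(
        flight_endpoint(Some(&FlightProxy::Local(true)), "sched:50050", "exec:50051"),
        "sched:50050"
    );
    assert_eq!(flight_endpoint(Some(&ext), "sched:50050", "exec:50051"), "proxy:50040");
}
```

With the oneof, the empty-string sentinel disappears and the three cases are exhaustively matched.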

.advertise_flight_sql_endpoint
.clone()
.map(|s| match s {
s if s.is_empty() => format!(
Contributor


I guess the same thing here: if the configuration is empty, the client should fall back to the scheduler address/port.

Author


That's what happens; it just felt like the "switch" was easier to implement on the scheduler side.
This way there's a bit less logic on the client side. Can move this, though.

#[arg(
long,
help = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT"
help = "Route for proxying flight results via scheduler. Should be of the form 'IP:PORT'"
Contributor


Same comment regarding the empty address: if the address/port is not specified, the client needs to fall back to the scheduler address.

use std::sync::Arc;
use tonic::{Request, Response, Status, Streaming};

/// Service implementing a proxy from scheduler to executor Apache Arrow Flight Protocol
Contributor


It would be great if we could add more comments here, describing how the proxy can be configured.

@sebbegg
Author

sebbegg commented Jan 13, 2026

thanks @sebbegg,

just to clarify, we can have three configuration options:

  • proxy not configured: the client needs to fetch data from the executors
  • proxy configured, no ip address or port provided: the scheduler needs to start the proxy on the same port (within the process)
  • proxy configured, ip/port provided: the scheduler considers this an external process running the proxy; it just needs to put that value in the response and will not start the proxy itself. The client needs to use that ip/port combination to connect.

If I get this right, the last variant would mean we don't need this block, right?

https://github.com/sebbegg/datafusion-ballista/blob/5022263904c37d660bc77e3f5c065206b6720d20/ballista/scheduler/src/scheduler_process.rs#L202-L212

How would you then start this external process?
I guess we could add another crate/binary at ballista/flight-proxy?

Starting a cluster could then look like:

  • ./ballista-flight-proxy --bind-host localhost --bind-port 50040
  • ./ballista-scheduler --advertise-flight-sql-endpoint localhost:50040
  • ./ballista-executor --scheduler-host localhost --scheduler-port 50050

I guess it's smart because this way all services can be run independently.

As far as I can tell, all the scheduler state is in-memory, right?
So in this setup we could not, e.g., perform the check that the requested data / executor host is actually alive and belongs to the cluster.
On the other hand, it would make the proxy stateless, which is probably a good thing.

I wonder though, whether it's worthwhile to add the possibility (and hence the complexity in the cli & protobuf) of running the flight-proxy "embedded" in the scheduler?

@milenkovicm
Contributor

thanks @sebbegg,
just to clarify, we can have three configuration options:

  • proxy not configured: the client needs to fetch data from the executors
  • proxy configured, no ip address or port provided: the scheduler needs to start the proxy on the same port (within the process)
  • proxy configured, ip/port provided: the scheduler considers this an external process running the proxy; it just needs to put that value in the response and will not start the proxy itself. The client needs to use that ip/port combination to connect.

If I get this right, the last variant would mean we don't need this block, right?

https://github.com/sebbegg/datafusion-ballista/blob/5022263904c37d660bc77e3f5c065206b6720d20/ballista/scheduler/src/scheduler_process.rs#L202-L212

Yes, we don't start an in-process proxy on a different port.

How would you then start this external process? I guess we could add another crate/binary at ballista/flight-proxy?

We can provide a new library, or users can create their own based on the proxy you have created.

Starting a cluster could then look like:

  • ./ballista-flight-proxy --bind-host localhost --bind-port 50040
  • ./ballista-scheduler --advertise-flight-sql-endpoint localhost:50040
  • ./ballista-executor --scheduler-host localhost --scheduler-port 50050

I guess it's smart because like this all services can be run independently.

Yes, we offload the scheduler process from proxying data and leave it in charge of orchestration only.

As far as I can tell all the scheduler-state is in-memory right? So in this setup we could e.g. not perform the check whether the requested data / executor-host is actually alive and belongs to the cluster. On the other hand, it would make the proxy stateless, which is probably a good thing.

Maybe we could relax this requirement; perhaps I should have spoken up earlier. Why do we need to check if the executor is there? There are no corrective actions we can take.

I wonder though, whether it's worthwhile to add the possibility (and hence the complexity in the cli & protobuf) of running the flight-proxy "embedded" in the scheduler?

I'm not sure I understand, we still have the option to run it "embedded":

* `./ballista-scheduler --advertise-flight-sql-endpoint`

should listen "embedded".
Please let me know if I got you wrong.

@sebbegg
Author

sebbegg commented Jan 14, 2026

Maybe we could relax this requirement; perhaps I should have spoken up earlier. Why do we need to check if the executor is there? There are no corrective actions we can take.

That was a comment on the PR - but sure, we can drop this.

we can provide new library, or users create their own based on proxy you have created

Ok, so for the scope of this PR, should we add the extra proxy as an additional executable?
A minimalistic approach could be to only implement the embedded variant and leave an external flight-proxy executable up to users.

@milenkovicm
Contributor

Ok, so for the scope of this PR, should we add the extra proxy as an additional executable? A minimalistic approach could be to only implement the embedded variant and leave an external flight-proxy executable up to users.

I agree

@milenkovicm
Contributor

I'll try to review changes tomorrow

@milenkovicm
Contributor

I apologise @sebbegg, I'm catching up with reviews

Contributor

@milenkovicm milenkovicm left a comment


Thanks @sebbegg,
I think this can be merged; I just have a few minor comments and one case to be fixed.

running scheduler with:

cargo run --bin ballista-scheduler -- --advertise-flight-sql-endpoint 

will return error

error: a value is required for '--advertise-flight-sql-endpoint <ADVERTISE_FLIGHT_SQL_ENDPOINT>' but none was supplied

Not sure how to configure the local proxy to test this.

#[command(version, about, long_about = None)]
pub struct Config {
/// Route for proxying flight results via scheduler (IP:PORT format).
#[arg(
Contributor


#[arg(
        long,
        num_args = 0..=1,
        default_missing_value = "",
        help = "Route for proxying flight results via scheduler. Use 'HOST:PORT' to let clients fetch results from the specified address. If empty a flight proxy will be started on the scheduler host and port."
    )]

max_decoding_message_size: usize,
max_encoding_message_size: usize,
) -> Result<FlightServiceClient<tonic::transport::channel::Channel>, BallistaError> {
let addr = format!("http://{host}:{port}");
Contributor


we should not assume http here

Author


Hm, other usages of create_grpc_client_connection follow the same pattern:

let addr = format!("http://{host}:{port}");
let grpc_config = GrpcClientConfig::default();
debug!("BallistaClient connecting to {addr}");
let connection = create_grpc_client_connection(addr.clone(), &grpc_config)

let scheduler_url = format!("http://{scheduler_host}:{scheduler_port}");

It's somewhat inconsistent that some parts of the code use host+port while other places require URLs or URL-like strings.


Development

Successfully merging this pull request may close these issues.

Clarify usage of advertise_flight_sql_endpoint
