Skip to content

Conversation

@CuteChuanChuan
Copy link
Contributor

Which issue does this PR close?

Closes #117

Rationale for this change

PR #115 introduced gRPC timeout configurations, but these values were hard-coded. This PR extends #115 by making all gRPC timeout values user-configurable through DataFusion's extensible configuration mechanism.

What changes are included in this PR?

  • Added 9 new configuration options to BallistaConfig (5 client-side, 4 server-side)
  • Modified create_grpc_client_connection() and create_grpc_server() to accept config parameter
  • Updated all call sites across executor and scheduler

Are there any user-facing changes?

Yes. Users can now configure gRPC timeouts.

All existing tests pass.

- Extends PR apache#115 by making gRPC timeout values configurable instead of hard-coded.
tokio::spawn(async move {
let shutdown_signal = grpc_shutdown.recv();
let grpc_server_future = create_grpc_server()
let grpc_server_future = create_grpc_server(&ballista_config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this configuration should be provided as part of executor binary configuration, not ballista condif

- Add GrpcClientConfig and GrpcServerConfig structs
- Update create_grpc_client_connection/create_grpc_server signatures
- Move server configs to ExecutorProcessConfig
- Add standalone prefix to keepalive_timeout config
@CuteChuanChuan
Copy link
Contributor Author

Hi @milenkovicm ,
Thanks for your comments and pointing out better approach. PTAL when you have a chance.
Thanks.

@milenkovicm
Copy link
Contributor

apologies @CuteChuanChuan will try to review this change in next day or so

@CuteChuanChuan
Copy link
Contributor Author

No worries at all! Take your time - I appreciate you taking a look at this. 🙏

Copy link
Contributor

@milenkovicm milenkovicm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Firs of all sorry for late review, and thank you for your contribution !
I think changes make sense, I have just few minor comments, please have a look when you get chance

"+---------------------------------------+----------+",
"+-------------------------------------------------------+-------+",
"| name | value |",
"+-------------------------------------------------------+-------+",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please update query to return ballista.job.name only. so we don't change the test every time we add new option.

"ballista.grpc.client.tcp_keepalive_seconds";
pub const BALLISTA_GRPC_CLIENT_HTTP2_KEEPALIVE_INTERVAL_SECONDS: &str =
"ballista.grpc.client.http2_keepalive_interval_seconds";
pub const BALLISTA_STANDALONE_GRPC_CLIENT_KEEPALIVE_TIMEOUT_SECONDS: &str =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need standalone config? Standalone is used just for local testing, maybe if we can live without it. It would make configuration options simpler. WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. You're right. We can make configuration options simpler.

}

impl GrpcClientConfig {
pub fn from_ballista_config(config: &BallistaConfig) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to implement this as From or Into not sure which one would be better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering that GrpcClientConfig is derived by extracting specific configs from BallistaConfig, I think From is more appropriate than Into here. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From make sense, it will implement into as well, if i'm not mistaken

}

#[derive(Debug, Clone)]
pub struct GrpcServerConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please put some docs please, we will try to catch up with documentation at some point so it would help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely!

use tonic::transport::{Channel, Error, Server};

#[derive(Debug, Clone)]
pub struct GrpcClientConfig {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please put some docs please, we will try to catch up with documentation at some point so it would help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely!

.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size);

let grpc_server_config = GrpcServerConfig::default();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you avoid variable here and create config at method call

create_grpc_server(&GrpcServerConfig::default();)

executor_metadata.host, executor_metadata.grpc_port
);
let connection = create_grpc_client_connection(executor_url).await?;
let grpc_config = GrpcClientConfig::default();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be method parameter, so we can use BallistaConfig value instead of default values here?

let grpc_server_config = GrpcServerConfig::default();
tokio::spawn(
create_grpc_server()
create_grpc_server(&grpc_server_config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest

create_grpc_server(&GrpcServerConfig::default();)

if possible

- Simplify config test to query only ballista.job.name
- Remove standalone-specific gRPC configurations
- Implement From trait for GrpcClientConfig conversion
- Add documentation to gRPC config structs
- Inline GrpcServerConfig creation in executor and scheduler
- Pass BallistaConfig to ExecutorManager for proper config propagation
@CuteChuanChuan
Copy link
Contributor Author

Hi @milenkovicm ,
Thanks for your comments and I really appreciate it. PTAL when you have a chance.
Thanks! 🙏

Copy link
Contributor

@milenkovicm milenkovicm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i have just two small coments, if you agree

let grpc_client_config = GrpcClientConfig::from(&ballista_config);
let connection = if connect_timeout == 0 {
create_grpc_client_connection(scheduler_url)
create_grpc_client_connection(scheduler_url, &grpc_client_config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we have

create_grpc_client_connection(scheduler_url, &ballista_config.into())

it would remove

let grpc_client_config = GrpcClientConfig::from(&ballista_config);

making it a bit easier to read?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @milenkovicm ,

Good catch! However, since the From trait is implemented for &BallistaConfig (not BallistaConfig), I need &(&ballista_config).into() instead of &ballista_config.into().

Would you prefer I change the implementation to impl From<BallistaConfig> so we can use the simpler &ballista_config.into() syntax?

let ballista_config = session_config.ballista_config();
let grpc_client_config = GrpcClientConfig::from(&ballista_config);
let connection =
create_grpc_client_connection(scheduler_url, &grpc_client_config).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we have

create_grpc_client_connection(scheduler_url, &ballista_config.into())

it would remove

let grpc_client_config = GrpcClientConfig::from(&ballista_config);

making it a bit easier to read?

@CuteChuanChuan
Copy link
Contributor Author

Hi @milenkovicm ,

I've addressed the feedback about simplifying the config usage and error found in Clippy. However, I noticed the CI is failing with rustdoc errors in display.rs and execution_graph.rs, which are files I haven't modified in this PR.

Would you like me to:

  1. Fix them in this PR as well, or
  2. Create a separate issue/PR for these rustdoc fixes?

- Use Into trait conversion for GrpcClientConfig creation
- Fix clippy needless_borrow warning in executor_manager
@milenkovicm
Copy link
Contributor

should be fixed on main, rebase your branch

@CuteChuanChuan
Copy link
Contributor Author

Fixed the unused imports. Thanks!

@milenkovicm
Copy link
Contributor

thanks on your contribution @CuteChuanChuan

@milenkovicm milenkovicm merged commit 917741a into apache:main Nov 1, 2025
27 checks passed
@CuteChuanChuan CuteChuanChuan deleted the cutechuanchuan/grpc-user-configurable branch November 2, 2025 02:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Make gRPC timeouts user-configurable

2 participants