Skip to content

Conversation

@gabotechs
Copy link
Collaborator

@gabotechs gabotechs commented Aug 7, 2025

This PR ships the ability to provide user-defined codecs for users, for that two main pieces of work are shipped:

  • Fix the propagation of the user provided codecs. This codec is now used and combined with the DistributedCodec in both the head stage of the plan that gets executed locally and any other further stages that get executed remotely in an ArrowFlightEndpoint
  • Proposes a new API for letting users provide their own codecs

About the second point:

Previously, the user defined codec was provided as an optional input to the DistributedPhysicalOptimizerRule, which ergonomically made a lot of sense. However, the DistributedPhysicalOptimizerRule has no way of propagating the user defined codec down to remote workers.

In practice, users with custom codecs need to inject it twice:

  1. one for the head of the plan that gets executed locally (currently addressed by DistributedPhysicalOptimizerRule that holds a reference to the codec and injects it into the ExecutionStage)
  2. another one while instantiating the ArrowFlightEndpoint that runs in a different machine (currently unaddressed)

The proposal in this PR is to find an API that is as consistent as possible for 1) and 2) so that providing a custom codec is a consistent and ergonomic experience no matter if it's for the head stage running locally, or for a worker running an ArrowFlightEndpoint.

For that, just some plain functions are shipped:

providing a codec for an ArrowFlightEndpoint that will run remotely

#[derive(Clone)]
struct CustomSessionBuilder;
impl SessionBuilder for CustomSessionBuilder {
     fn on_new_session(&self, builder: SessionStateBuilder) -> SessionStateBuilder {
          with_user_codec(builder, MyCustomCodec) // <- this is how you'd inject a custom codec for a remote worker
     }
}

providing a codec for the head of the plan that runs locally

let builder = SessionStateBuilder::new().with_default_features();
let builder = with_user_codec(builder, MyCustomCodec); // <- this is how you'd inject a custom codec locally
let state = builder.build()

The challenge while keeping the old API (DistributedPhysicalOptimizerRule::with_codec()), is that it would lead to two different ways necessary for the user to learn to provide their codecs, and makes the ExecutionStage.codec field just a transportation field that adds no semantic meaning to the structure, and is just there to carry the codec around.

#[tokio::test]
#[ignore]
async fn custom_extension_codec() -> Result<(), Box<dyn std::error::Error>> {
#[derive(Clone)]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test cannot yet be un-ignored, as the expected results are failing, but the custom codec propagation works.

use std::sync::Arc;

#[tokio::test]
#[ignore]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test now works! 🚀

type Error = DataFusionError;
pub fn proto_from_stage(
stage: &ExecutionStage,
codec: &dyn PhysicalExtensionCodec,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As now a codec is needed here, we cannot just implement this in terms of a simple TryFrom implementation

Comment on lines -35 to -36
/// An optional codec to assist in serializing and deserializing this stage
pub codec: Option<Arc<dyn PhysicalExtensionCodec>>,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before, this fields ends up being Some() in the head stage, but None in any other. As it's always None in the ArrowFlightEndpoint, there's no way we can use a custom user provided codec there.

Copy link
Collaborator

@robtandy robtandy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good; thank you @gabotechs !

Copy link
Collaborator

@NGA-TRAN NGA-TRAN left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another great work. Thanks Gabriel

// Nobody will never want to retriever a user codec from a SessionStateBuilder
None
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The content of this file is awesome. New UserCodecTransport trait for setting and getting codec for existing DF struct. And comments showing how to use them

.write()
.config_mut()
.set_extension(Arc::new(codec));
add_user_codec(&mut ctx, Int64ListExecCodec);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice and simple

combined_codec.push_arc(Arc::clone(user_codec));
}

let child_stage_proto = proto_from_stage(child_stage, &combined_codec).map_err(|e| {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intuively this makes sense but I think I can only fully get the role of combined_codec when I start using it

@gabotechs gabotechs merged commit 84026d6 into main Aug 7, 2025
3 checks passed
@gabotechs gabotechs deleted the gabrielmusat/user-provided-codecs branch August 7, 2025 16:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants