
Conversation


@OussamaSaoudi OussamaSaoudi commented Nov 19, 2025

🥞 Stacked PR

Use this link to review incremental changes.


What changes are proposed in this pull request?

How was this change tested?

@OussamaSaoudi changed the title from "implement driver" to "feat: Distributed Log Replay Driver Phase" on Nov 19, 2025
use crate::log_segment::LogSegment;
use crate::{DeltaResult, Engine, Error, FileMeta};

/// Driver-side log replay (Phase 1) for distributed execution.

"driver" and "executor" are spark jargon, do we want to intentionally adopt that in kernel?
Or should we use e.g. PreparePhase and DistributedPhase?

/// ```
pub(crate) struct DriverPhase<P> {
processor: P,
state: Option<DriverState>,

We have a small nomenclature clash here -- here we use "state" as in "state of a state machine" but other parts of kernel use "state" as in "the information tracked by a stateful operation". Maybe we call this one a step instead (as in, a phase contains multiple steps)?

}

/// Result of driver phase processing.
pub(crate) enum DriverPhaseResult<P> {

another small clash: "result" usually means something related to std::result::Result (success/failure), but here it just means "[successful] outcome" which must then be wrapped as Result<DriverPhaseResult>?
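One way to resolve the clash is a rename; here is a hypothetical sketch (the "Outcome" name and the variants are illustrative, not from the PR) showing how the two layers then read without stuttering:

```rust
// Hypothetical sketch: renaming the enum to "Outcome" (illustrative name,
// not from the PR) avoids colliding with std::result::Result, so fallible
// callers read as Result<DriverPhaseOutcome<P>, Error>.
#[derive(Debug, PartialEq)]
pub enum DriverPhaseOutcome<P> {
    /// More batches remain; keep driving this phase.
    Continue(P),
    /// Driver phase finished; hand its processor to the executor side.
    Done(P),
}

/// A fallible step whose success case is itself a phase outcome.
fn step() -> Result<DriverPhaseOutcome<u32>, String> {
    Ok(DriverPhaseOutcome::Done(42))
}

fn main() {
    // The double layer is explicit: outer Result for errors,
    // inner enum for the phase-machine outcome.
    match step() {
        Ok(DriverPhaseOutcome::Done(p)) => println!("done with {p}"),
        Ok(DriverPhaseOutcome::Continue(_)) => println!("keep going"),
        Err(e) => eprintln!("failed: {e}"),
    }
}
```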

/// ```
pub(crate) struct DriverPhase<P> {
processor: P,
state: Option<DriverState>,

A couple questions:

  1. Do we want DriverPhase to impl Send (assuming P does), but purposefully not impl Sync (maybe via a PhantomData<Cell<()>> member)?
    • Send + !Sync + !Clone could be an API contract that tells both humans and compiler how that phase should be used. And then ExecutorPhase should be Send + Sync + Clone (assuming P is), again because that tells the compiler what we intend.
    • However, I notice that types like Vec<T> are Send + Sync, presumably because the borrow checker enforces the necessary mutual exclusion (only one mutable reference can exist at a time even within a single thread, let alone across multiple threads). So maybe we don't need to do anything special here?
  2. Should types that impl Processor have type states to transition them from (mutable) driver phase to (immutable or at least different) executor phase? Or should we leave it up to the implementations to sort out things like that?
    • The driver state is ~always mutable (building "seen" set, collecting P&M that fly past, etc)
    • The executor state for map-type operations like scan tends to be immutable (probe the "seen" set)
    • The executor state for fold-type operations like P&M query tends to be mutable (still looking for P&M that the driver phase never saw)
    • The executor state for checkpoint operations is both map (actions to be written) and fold (collecting stats for the checkpoint manifest and/or _last_checkpoint file).
    • So actually... driver phase state/processor only needs to be left-associative: f(x, y) != f(y, x) and f(x, f(y, z)) != f(f(x, y), z) (order matters, can't parallelize); meanwhile, executor phase state/processor must be fully associative and commutative: f(x, y) = f(y, x) and f(x, f(y, z)) = f(f(x, y), z). That guarantees the results (whatever they are) can be partially aggregated by executors with final aggregation of those partial results on the driver.
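The PhantomData trick from question 1 can be sketched as follows. This is a minimal, self-contained illustration (the struct body is a stand-in, not the PR's actual DriverPhase): Cell<()> is Send but !Sync, so a zero-sized PhantomData<Cell<()>> field strips Sync while leaving Send intact.

```rust
use std::cell::Cell;
use std::marker::PhantomData;

// Illustrative sketch: PhantomData<Cell<()>> is zero-sized, but because
// Cell<()> is Send + !Sync, the containing struct inherits !Sync while
// remaining Send whenever P is Send.
struct DriverPhase<P> {
    processor: P,
    _not_sync: PhantomData<Cell<()>>,
}

impl<P> DriverPhase<P> {
    fn new(processor: P) -> Self {
        DriverPhase { processor, _not_sync: PhantomData }
    }
}

// Compile-time probe: only accepts Send types.
fn require_send<T: Send>(_: &T) -> bool { true }

fn main() {
    let phase = DriverPhase::new(0u32);
    // Compiles because u32: Send, hence DriverPhase<u32>: Send.
    assert!(require_send(&phase));
    // A matching require_sync::<DriverPhase<u32>> probe would fail to
    // compile: PhantomData<Cell<()>> makes the struct !Sync.
    let _ = phase.processor;
}
```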

engine: Arc<dyn Engine>,
) -> DeltaResult<Self> {
let commit_schema = get_commit_schema();
let commit = CommitReader::try_new(engine.as_ref(), &log_segment, commit_schema.clone())?;

There's no guarantee the log segment contains any commits. It might just have a checkpoint. And it might be a (v1) multi-part checkpoint as well.

So this phase could have any of four possible sets of steps: [commits], [commits, manifest], [manifest], or []. Should we just track a (Option<CommitReader>, Option<ManifestReader>) pair and be done with it? The choreography is the same either way: commits first (if present), followed by manifest (if present).

Ok(DriverState::Manifest(manifest))
} else {
// Multi-part checkpoint: all parts are leaf files
let files: Vec<_> = log_segment

Suggested change
- let files: Vec<_> = log_segment
+ let files = log_segment

Comment on lines +149 to +151
loop {
// Try to get item from current phase
let batch_result = match self.state.as_mut()? {

If we just tracked a pair of states, I think this code simplifies a lot:

if let Some(ref mut commits) = self.commits {
    match commits.next() {
        None => self.commits = None, // exhausted!
        next => return next,
    }
}

if let Some(ref mut manifest) = self.manifest {
    match manifest.next() {
        None => self.manifest = None, // exhausted!
        next => return next,
    }
}

None

We would no longer need self.is_finished because

self.commits.is_none() && self.manifest.is_none()

should suffice
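For illustration, the two-Option sketch can be made runnable with Vec iterators standing in for CommitReader and ManifestReader (stand-in types only; the real readers yield batches, not strings). It covers all four step sets, and is_finished falls out of the two Options:

```rust
// Runnable illustration of the two-Option pattern, with Vec iterators as
// stand-ins for CommitReader / ManifestReader. Handles all four step
// sets: [commits], [commits, manifest], [manifest], and [].
struct DriverSteps {
    commits: Option<std::vec::IntoIter<&'static str>>,
    manifest: Option<std::vec::IntoIter<&'static str>>,
}

impl Iterator for DriverSteps {
    type Item = &'static str;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(ref mut commits) = self.commits {
            match commits.next() {
                None => self.commits = None, // exhausted!
                next => return next,
            }
        }
        if let Some(ref mut manifest) = self.manifest {
            match manifest.next() {
                None => self.manifest = None, // exhausted!
                next => return next,
            }
        }
        None
    }
}

impl DriverSteps {
    // No separate is_finished flag needed: both Options being None
    // is exactly the "finished" condition.
    fn is_finished(&self) -> bool {
        self.commits.is_none() && self.manifest.is_none()
    }
}

fn main() {
    let steps = DriverSteps {
        commits: Some(vec!["c1", "c2"].into_iter()),
        manifest: Some(vec!["m1"].into_iter()),
    };
    // Commits drain first, then the manifest.
    assert_eq!(steps.collect::<Vec<_>>(), vec!["c1", "c2", "m1"]);
}
```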
