Conversation

@whankinsiv
Collaborator

Description

This PR introduces a new dynamic sync mode to the peer_network_interface. In this mode, the interface waits for a ChangeSyncPoint command on the cardano.sync.command topic and begins fetching from that point.

A background task forwards incoming sync points to the NetworkManager. When a new point arrives, the manager resets the ChainState, clears outstanding peer requests, and issues a new find_intersect across all upstream peers.

A minimal indexer process is included to demonstrate this behavior. It registers only the required modules and emits two ChangeSyncPoint commands 5 seconds apart to exercise dynamic re-syncing.
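For orientation, here is a rough sketch of the forwarding flow, using tokio mpsc channels purely for illustration; the real module wires this through the message bus and the NetworkManager's event queue, and all names other than SyncCommand and the topic are placeholders.

use tokio::sync::mpsc;

type Slot = u64;
type BlockHash = [u8; 32];

pub enum SyncCommand {
    ChangeSyncPoint { slot: Slot, hash: BlockHash },
}

/// Background task: pass each incoming sync point on to the manager, which
/// resets the ChainState, clears outstanding requests, and re-issues
/// find_intersect across all upstream peers.
async fn forward_sync_commands(
    mut bus_rx: mpsc::Receiver<SyncCommand>,  // subscription on cardano.sync.command
    manager_tx: mpsc::Sender<SyncCommand>,    // NetworkManager's command input
) {
    while let Some(cmd) = bus_rx.recv().await {
        if manager_tx.send(cmd).await.is_err() {
            break; // manager has shut down
        }
    }
}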

Related Issue(s)

#374

How was this tested?

By running the indexer process:

  1. It publishes a sync point at the last Byron block.
  2. The peer_network_interface begins fetching blocks normally.
  3. A second ChangeSyncPoint is published, returning to the starting point.
  4. The peer_network_interface resets and syncs again from that point.

For visibility, you can log inside handle_roll_forward to observe block fetching.
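A rough sketch of that sequence, reusing the SyncCommand shape from the sketch above; run_test_sequence, publish, and the slot/hash values are placeholders, not the indexer's actual code.

use std::time::Duration;

async fn run_test_sequence(publish: impl Fn(SyncCommand)) {
    // 1. Publish a sync point at (roughly) the last Byron block; the real
    //    indexer uses the actual slot and hash for the target network.
    publish(SyncCommand::ChangeSyncPoint { slot: 4_492_799, hash: [0u8; 32] });

    // 2-3. Let the interface fetch for a while, then publish the same point
    //      again to jump back to the start.
    tokio::time::sleep(Duration::from_secs(5)).await;
    publish(SyncCommand::ChangeSyncPoint { slot: 4_492_799, hash: [0u8; 32] });

    // 4. peer_network_interface resets its state and syncs again from here.
}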

Checklist

  • My code builds and passes local tests
  • I added/updated tests for my changes, where applicable
  • I updated documentation (if applicable)
  • CI is green for this PR

Impact / Side effects

Adds a basic indexer process intended to become the minimal working example for building custom indexers on top of Acropolis.

Reviewer notes / Areas to focus

  • Dynamic NetworkManager construction in peer_network_interface/src/peer_network_interface.rs.
  • Correct behavior of on_sync_cmd when resetting sync state in peer_network_interface/src/network.rs.

Comment on lines 4 to 6
pub enum SyncCommand {
    ChangeSyncPoint { slot: Slot, hash: BlockHash },
}
Collaborator

Suggested change
pub enum SyncCommand {
    ChangeSyncPoint { slot: Slot, hash: BlockHash },
}
pub enum ChainSyncCommand {
    FindIntersect { slot: Slot, hash: BlockHash },
}

Not a huge deal, but IMO "sync" is a bit of a generic name when we're interacting with a specific miniprotocol

Collaborator Author

Renamed to ChainSyncCommand in 76cd5fb.

Comment on lines 14 to 17
// Configuration defaults
const DEFAULT_DYNAMIC_SYNC_TOPIC: (&str, &str) =
    ("dynamic-sync-publisher-topic", "cardano.sync.command");

Collaborator

Can we follow the strongly-typed configuration pattern which PeerNetworkInterface is using? I'd like that pattern to infect as much of the system as possible.

Collaborator Author

Updated to use the same pattern as PeerNetworkInterface in 3dfdb78.
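For reference, the kind of strongly-typed configuration this converges on might look roughly like the following; the struct and field names are illustrative, not the actual PeerNetworkInterface types.

use serde::Deserialize;

#[derive(Deserialize)]
#[serde(default)]
pub struct DynamicSyncConfig {
    pub sync_command_topic: String,
}

impl Default for DynamicSyncConfig {
    fn default() -> Self {
        Self {
            sync_command_topic: "cardano.sync.command".to_string(),
        }
    }
}

The module then deserializes its config section into this struct instead of pulling individually defaulted keys out of string-tuple constants.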

Comment on lines 145 to 148
Ok(Point::Origin) => {
    warn!("Dynamic sync received Point::Origin; ignoring");
    return;
}
Collaborator

I don't think there's any reason to not support syncing to origin.

Collaborator Author

Fixed in d9110c1.
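The gist of supporting Origin is to stop special-casing it and pass whatever Point arrives straight through to the manager, along these lines (a sketch, not the actual diff in d9110c1):

match Self::wait_sync_command(&mut rx).await {
    // Point::Origin and Point::Specific are both valid sync targets.
    Ok(point) => manager.sync_to_point(point),
    Err(err) => warn!("Failed to receive initial sync command: {err:#}"),
}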

Comment on lines 129 to 158
SyncPoint::Dynamic => {
    let mut rx = match cmd_rx.take() {
        Some(rx) => rx,
        None => {
            warn!("Dynamic mode configured but cmd_rx is missing");
            return;
        }
    };

    let point = match Self::wait_sync_command(&mut rx).await {
        Ok(Point::Specific(slot, hash)) => {
            let (epoch, _) = sink.genesis_values.slot_to_epoch(slot);
            sink.last_epoch = Some(epoch);
            info!("Dynamic sync starting at slot {} (epoch {})", slot, epoch);
            Point::Specific(slot, hash)
        }
        Ok(Point::Origin) => {
            warn!("Dynamic sync received Point::Origin; ignoring");
            return;
        }
        Err(err) => {
            warn!("Failed to receive initial sync command: {err:#}");
            return;
        }
    };

    let mut manager = Self::init_manager(cfg, sink, Some(rx));
    manager.sync_to_point(point);
    manager
}
Collaborator

Suggested change
SyncPoint::Dynamic => {
    let mut rx = match cmd_rx.take() {
        Some(rx) => rx,
        None => {
            warn!("Dynamic mode configured but cmd_rx is missing");
            return;
        }
    };
    let point = match Self::wait_sync_command(&mut rx).await {
        Ok(Point::Specific(slot, hash)) => {
            let (epoch, _) = sink.genesis_values.slot_to_epoch(slot);
            sink.last_epoch = Some(epoch);
            info!("Dynamic sync starting at slot {} (epoch {})", slot, epoch);
            Point::Specific(slot, hash)
        }
        Ok(Point::Origin) => {
            warn!("Dynamic sync received Point::Origin; ignoring");
            return;
        }
        Err(err) => {
            warn!("Failed to receive initial sync command: {err:#}");
            return;
        }
    };
    let mut manager = Self::init_manager(cfg, sink, Some(rx));
    manager.sync_to_point(point);
    manager
}
SyncPoint::Dynamic => {
    let manager = Self::init_manager(cfg, sink, Some(rx));
    manager
}

If we can handle these commands while the manager is running, I don't think we need to handle them before it starts

Collaborator Author

I've implemented this feedback in d9110c1.

Comment on lines 48 to 53
// Create background task to forward sync commands to NetworkManager
let mut cmd_rx = if cfg.sync_point == SyncPoint::Dynamic {
    Some(Self::spawn_command_forwarder(context.clone(), &cfg.sync_command_topic).await?)
} else {
    None
};
Collaborator

Having a dedicated channel receiving sync commands works, but I think that these commands should be sent to NetworkManager by writing to its event queue instead.

  1. Future requirements: there are going to be more commands. The consensus module will be able to tell this module to follow a different chain/peer, and I suspect someday a "peer discovery" module will tell it to connect to new peers or disconnect from shady ones. So this module will have to handle several kinds of commands from outside, and that'll be less onerous if we publish them all to one queue.

  2. I don't think we need to restrict "sync commands" so that they only work when you're using this "dynamic" sync point. We can change which point we sync to at runtime, which means that we could start by syncing to the tip and switch to something else later. I don't have a concrete use case in mind for that, but in general I try not to block functionality if it's easy to support.

Collaborator Author

Refactored the module to follow this approach in d9110c1. Sync commands are now forwarded into the NetworkManager through the same event queue used for peer updates.
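A sketch of what the single-queue approach looks like; the variant and payload names are illustrative, not the module's actual definitions.

enum NetworkEvent {
    // existing peer-driven events (headers, bodies, connection state, ...)
    PeerUpdate { peer_id: usize },
    // externally published commands, e.g. ChangeSyncPoint on cardano.sync.command
    ChainSyncCommand { slot: u64, hash: [u8; 32] },
    // future: follow a different peer/chain, connect or disconnect peers, ...
}

// The manager drains a single queue and dispatches on the variant, so new
// command types only add match arms here rather than new channels.
fn handle_event(ev: NetworkEvent) {
    match ev {
        NetworkEvent::PeerUpdate { .. } => { /* existing chain-sync handling */ }
        NetworkEvent::ChainSyncCommand { .. } => {
            // reset ChainState, clear outstanding requests, re-issue find_intersect
        }
    }
}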
