Skip to content

Conversation

@amnn
Copy link
Contributor

@amnn amnn commented Dec 3, 2025

Description

Factor out futures/streaming/tasks extensions from the indexer-alt codebase into their own crate. This was done to avoid a circular dependency between the metrics service and the indexing framework.

This PR also introduces the Services abstraction which allows us to combine tokio tasks into a single bundle, taking into account lifecycles, graceful shutdown, error and interrupt handling.

Test plan

CI

Stack


Release notes

Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required.

For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates.

  • Protocol:
  • Nodes (Validators and Full nodes):
  • gRPC:
  • JSON-RPC:
  • GraphQL:
  • CLI:
  • Rust SDK:
  • Indexing Framework:

@amnn amnn self-assigned this Dec 3, 2025
@amnn amnn requested a review from a team as a code owner December 3, 2025 12:03
@amnn amnn temporarily deployed to sui-typescript-aws-kms-test-env December 3, 2025 12:04 — with GitHub Actions Inactive
@vercel
Copy link

vercel bot commented Dec 3, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
sui-docs Ready Ready Preview Comment Dec 3, 2025 1:19pm
2 Skipped Deployments
Project Deployment Preview Comments Updated (UTC)
multisig-toolkit Ignored Ignored Preview Dec 3, 2025 1:19pm
sui-kiosk Ignored Ignored Preview Dec 3, 2025 1:19pm

amnn added 3 commits December 3, 2025 13:14
## Description

Pull the utilities related to futures/tasks/streams from the indexing
framework and into their own crate.

## Test plan

CI
## Description

An abstraction for coordinating the lifecycle of tokio tasks, looking
after:

- Building up services by adding tasks or through composition.
- Waiting for multiple tasks to complete.
- Running secondary tasks that need to be wound down when the service
  winds down, but otherwise don't affect the lifecycle of the service
  (the service doesn't wait for these tasks to complete to shutdown).
- Graceful shutdown, with a grace period, because the service has
  wrapped up, or because of a termination request, or an error.
- Panic unwinding.
- Cascading termination (errors from multiple components, multiple
  termination requests, timeout during shutdown).
- Signal handling from the operating system.

## Test plan

New unit tests:

```
sui-futures$ cargo nextest run -- service
```
## Description

An abstraction over `JoinHandle<T>` that aborts the task when it is
dropped.

## Test plan

New unit tests:

```
sui-futures$ cargo nextest run
```
exits: Vec<BoxFuture<'static, ()>>,

/// Tasks that control the lifetime of the service in the happy path.
fsts: JoinSet<anyhow::Result<()>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do fsts and snds stand for? I don't see these mentioned anywhere else.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"firsts" and "seconds", comes from a past functional programming life.

///
/// After shutdown signals are sent, tasks have this duration to complete gracefully before being
/// forcefully aborted.
pub const GRACE: Duration = Duration::from_secs(30);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do the tasks know to try to abort without a CancellationToken? Or do we just assume that all tasks should take less than GRACE to complete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tasks that are running inside the tokio runtime as a task, can be aborted at any await point, so we are using that aborting behaviour instead of an explicit cancellation token.

Copy link
Contributor

@wlmyng wlmyng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

///
/// After shutdown signals are sent, tasks have this duration to complete gracefully before being
/// forcefully aborted.
pub const GRACE: Duration = Duration::from_secs(30);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not to be confused with Eve Frontier Grace Points

let (btx, brx) = oneshot::channel::<()>();

// A secondary task and a primary task.
let snd = Service::new().spawn_aborting(async move { Ok(brx.await?) });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, now that i think about it, we have methods on Service for adding primary tasks, but not secondaries, except through fn attach. Works for me


/// Runs the service, waiting for interrupt signals from the operating system to trigger
/// graceful shutdown, within the defualt grace period.
pub async fn main(self) -> Result<(), Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose when I see main and run, and the doc comment, I'm led to think that the tasks start running when the Service starts, which is not the case given fn spawn starts the task in the background immediately, and Service coordinates shutdown. One thing I was a bit stuck on is, say I add a task that fails immediately when .spawn is called.

From the perspective of Service, that failure is not observed until run()/main() or shutdown(). Kind of like a JoinHandle... I think the naming feels a bit off for me because run implies "start running" rather than "observe the running tasks until they complete, intercept around shutdown"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main is supposed to hint that you should call this in your own main function, but I'm not attached to these names -- I'll happily accept suggestions.

let _ = handle.await.unwrap();
}

#[tokio::test(start_paused = true)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the power of time travel..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants