diff --git a/Cargo.lock b/Cargo.lock
index 9c337344b300e..2a244d99504b6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -14567,6 +14567,18 @@ dependencies = [
  "sui-verifier-latest",
 ]

+[[package]]
+name = "sui-futures"
+version = "1.62.0"
+dependencies = [
+ "anyhow",
+ "futures",
+ "tap",
+ "thiserror 1.0.69",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "sui-genesis-builder"
 version = "0.0.0"
@@ -14888,6 +14900,7 @@ dependencies = [
  "serde",
  "serde_json",
  "sui-default-config",
+ "sui-futures",
  "sui-indexer-alt-consistent-api",
  "sui-indexer-alt-framework",
  "sui-indexer-alt-metrics",
@@ -14985,6 +14998,7 @@ dependencies = [
  "scoped-futures",
  "serde",
  "sui-field-count",
+ "sui-futures",
  "sui-indexer-alt-framework-store-traits",
  "sui-indexer-alt-metrics",
  "sui-pg-db",
@@ -15216,7 +15230,7 @@ dependencies = [
  "sui-core",
  "sui-data-ingestion-core",
  "sui-field-count",
- "sui-indexer-alt-framework",
+ "sui-futures",
  "sui-indexer-alt-schema",
  "sui-pg-db",
  "sui-snapshot",
@@ -16346,7 +16360,7 @@ dependencies = [
  "reqwest 0.12.9",
  "serde",
  "serde_json",
- "sui-indexer-alt-framework",
+ "sui-futures",
  "telemetry-subscribers",
  "tokio",
  "tokio-postgres",
@@ -16563,7 +16577,7 @@ dependencies = [
  "serde_json",
  "sui-config",
  "sui-core",
- "sui-indexer-alt-framework",
+ "sui-futures",
  "sui-protocol-config",
  "sui-storage",
  "sui-types",
diff --git a/Cargo.toml b/Cargo.toml
index 5b0f0de9a223d..4ad6a879b7d09 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -122,6 +122,7 @@ members = [
     "crates/sui-framework",
     "crates/sui-framework-snapshot",
     "crates/sui-framework-tests",
+    "crates/sui-futures",
     "crates/sui-genesis-builder",
     "crates/sui-graphql-e2e-tests",
     "crates/sui-graphql-rpc",
@@ -702,6 +703,7 @@ sui-enum-compat-util = { path = "crates/sui-enum-compat-util" }
 sui-faucet = { path = "crates/sui-faucet" }
 sui-field-count = { path = "crates/sui-field-count" }
 sui-field-count-derive = { path = "crates/sui-field-count-derive" }
+sui-futures = { path = "crates/sui-futures" }
 sui-framework = { path = "crates/sui-framework" }
 sui-framework-snapshot = { path = "crates/sui-framework-snapshot" }
 sui-framework-tests = { path = "crates/sui-framework-tests" }
diff --git a/crates/sui-futures/Cargo.toml b/crates/sui-futures/Cargo.toml
new file mode 100644
index 0000000000000..5f015e42ab9ea
--- /dev/null
+++ b/crates/sui-futures/Cargo.toml
@@ -0,0 +1,18 @@
[package]
name = "sui-futures"
version.workspace = true
authors = ["Mysten Labs <build@mystenlabs.com>"]
license = "Apache-2.0"
publish = false
edition = "2024"

[dependencies]
anyhow.workspace = true
futures.workspace = true
tap.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "time", "macros"] }
tracing.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["full", "test-util"] }
diff --git a/crates/sui-futures/src/future.rs b/crates/sui-futures/src/future.rs
new file mode 100644
index 0000000000000..4162e6f1b8a0d
--- /dev/null
+++ b/crates/sui-futures/src/future.rs
@@ -0,0 +1,173 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{future::Future, time::Duration};

use tokio::time::sleep;

/// Wraps a future with slow/stuck detection using `tokio::select!`
///
/// This implementation races the future against a timer. If the timer expires first, the callback
/// is executed (exactly once) but the future continues to run. This approach can detect stuck
/// futures that never wake their waker.
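///
/// # Example
///
/// An illustrative sketch (not part of the original change): `load_data` stands in for any
/// hypothetical operation that might stall; the callback fires at most once, and the
/// operation itself keeps running to completion.
///
/// ```ignore
/// let data = with_slow_future_monitor(
///     load_data(),
///     Duration::from_secs(5),
///     || tracing::warn!("load_data has been running for over 5 seconds"),
/// )
/// .await;
/// ```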
pub async fn with_slow_future_monitor<F, C>(
    future: F,
    threshold: Duration,
    callback: C,
) -> F::Output
where
    F: Future,
    C: FnOnce(),
{
    // The select! macro needs to take a reference to the future, which requires it to be pinned
    tokio::pin!(future);

    tokio::select! {
        result = &mut future => {
            // Future completed before timeout
            return result;
        }
        _ = sleep(threshold) => {
            // Timeout elapsed - fire the warning
            callback();
        }
    }

    // If we get here, the timeout fired but the future is still running. Continue waiting for the
    // future to complete
    future.await
}

#[cfg(test)]
mod tests {
    use std::{
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
        time::Duration,
    };

    use tokio::time::{sleep, timeout};

    use super::*;

    #[derive(Clone)]
    struct Counter(Arc<AtomicUsize>);

    impl Counter {
        fn new() -> Self {
            Self(Arc::new(AtomicUsize::new(0)))
        }

        fn increment(&self) {
            self.0.fetch_add(1, Ordering::Relaxed);
        }

        fn count(&self) -> usize {
            self.0.load(Ordering::Relaxed)
        }
    }

    #[tokio::test]
    async fn slow_monitor_callback_called_once_when_threshold_exceeded() {
        let c = Counter::new();

        let result = with_slow_future_monitor(
            async {
                sleep(Duration::from_millis(200)).await;
                42 // Return a value to verify completion
            },
            Duration::from_millis(100),
            || c.increment(),
        )
        .await;

        assert_eq!(c.count(), 1);
        assert_eq!(result, 42);
    }

    #[tokio::test]
    async fn slow_monitor_callback_not_called_when_threshold_not_exceeded() {
        let c = Counter::new();

        let result = with_slow_future_monitor(
            async {
                sleep(Duration::from_millis(50)).await;
                42 // Return a value to verify completion
            },
            Duration::from_millis(200),
            || c.increment(),
        )
        .await;

        assert_eq!(c.count(), 0);
        assert_eq!(result, 42);
    }

    #[tokio::test]
    async fn slow_monitor_error_propagation() {
        let c = Counter::new();

        let result: Result<i32, &str> = with_slow_future_monitor(
            async {
                sleep(Duration::from_millis(150)).await;
                Err("Something went wrong")
            },
            Duration::from_millis(100),
            || c.increment(),
        )
        .await;

        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), "Something went wrong");
        assert_eq!(c.count(), 1);
    }

    #[tokio::test]
    async fn slow_monitor_error_propagation_without_callback() {
        let c = Counter::new();

        let result: Result<i32, &str> = with_slow_future_monitor(
            async {
                sleep(Duration::from_millis(50)).await;
                Err("Quick error")
            },
            Duration::from_millis(200),
            || c.increment(),
        )
        .await;

        assert!(result.is_err());
        assert_eq!(result.unwrap_err(), "Quick error");
        assert_eq!(c.count(), 0);
    }

    #[tokio::test]
    async fn slow_monitor_stuck_future_detection() {
        use std::future::Future;
        use std::pin::Pin;
        use std::task::{Context, Poll};

        // A future that returns Pending but never wakes the waker
        struct StuckFuture;
        impl Future for StuckFuture {
            type Output = ();
            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
                Poll::Pending
            }
        }

        let c = Counter::new();

        // Even though StuckFuture never wakes, our monitor will detect it!
        let monitored =
            with_slow_future_monitor(StuckFuture, Duration::from_millis(200), || c.increment());

        // Use a timeout to prevent the test from hanging
        timeout(Duration::from_secs(2), monitored)
            .await
            .unwrap_err();
        assert_eq!(c.count(), 1);
    }
}
diff --git a/crates/sui-futures/src/lib.rs b/crates/sui-futures/src/lib.rs
new file mode 100644
index 0000000000000..03e640f9f6240
--- /dev/null
+++ b/crates/sui-futures/src/lib.rs
@@ -0,0 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

pub mod future;
pub mod service;
pub mod stream;
pub mod task;
diff --git a/crates/sui-futures/src/service.rs b/crates/sui-futures/src/service.rs
new file mode 100644
index 0000000000000..dadc77a7efe6a
--- /dev/null
+++ b/crates/sui-futures/src/service.rs
@@ -0,0 +1,728 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::fmt;
use std::panic;
use std::time::Duration;

use futures::future;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use tap::TapFallible;
use tokio::signal;
use tokio::task::JoinSet;
use tokio::time::timeout;
use tracing::error;
use tracing::info;

/// Default grace period for shutdown.
///
/// After shutdown signals are sent, tasks have this duration to complete gracefully before being
/// forcefully aborted.
pub const GRACE: Duration = Duration::from_secs(30);

/// A collection of related tasks that succeed or fail together, consisting of:
///
/// - A set of primary tasks, which control the lifetime of the service in the happy path. When all
///   primary tasks complete successfully or have been cancelled, the service will start a graceful
///   shutdown.
///
/// - A set of secondary tasks, which run alongside the primary tasks, but do not extend the
///   service's lifetime (the service will not wait for all secondary tasks to complete or be
///   cancelled before triggering a shutdown).
///
/// - A set of exit signals, which are executed when the service wants to trigger graceful
///   shutdown.
///
/// Any task (primary or secondary) failing by returning an error, or panicking, will also trigger
/// a graceful shutdown.
///
/// If graceful shutdown takes longer than the grace period, or another task fails during shutdown,
/// all remaining tasks are aborted and dropped immediately. Tasks are expected to clean up after
/// themselves when dropped (e.g. if a task has spawned its own sub-tasks, these should also be
/// aborted when the parent task is dropped).
#[must_use = "Dropping the service aborts all its tasks immediately"]
#[derive(Default)]
pub struct Service {
    /// Futures that are run when the service is instructed to shut down gracefully.
    exits: Vec<BoxFuture<'static, ()>>,

    /// Tasks that control the lifetime of the service in the happy path.
    fsts: JoinSet<anyhow::Result<()>>,

    /// Tasks that run alongside the primary tasks, but do not extend the service's lifetime.
    snds: JoinSet<anyhow::Result<()>>,
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
    #[error("Service has been terminated gracefully")]
    Terminated,

    #[error("Service has been aborted due to ungraceful shutdown")]
    Aborted,

    #[error(transparent)]
    Task(anyhow::Error),
}

impl Service {
    /// Create a new, empty service.
    pub fn new() -> Self {
        Self::default()
    }

    /// Add a primary task.
    ///
    /// The task will start running in the background immediately, once added.
    /// It is expected to clean up after itself when it is dropped, which will happen when it is
    /// aborted (non-graceful shutdown).
    pub fn spawn(
        mut self,
        task: impl Future<Output = anyhow::Result<()>> + Send + 'static,
    ) -> Self {
        self.fsts.spawn(task);
        self
    }

    /// Add a primary task that aborts immediately on graceful shutdown.
    ///
    /// This is useful for tasks that don't need a graceful shutdown.
    pub fn spawn_aborting(
        mut self,
        task: impl Future<Output = anyhow::Result<()>> + Send + 'static,
    ) -> Self {
        let h = self.fsts.spawn(task);
        self.with_shutdown_signal(async move { h.abort() })
    }

    /// Add a shutdown signal.
    ///
    /// This future will be executed when the service is instructed to shut down gracefully, before
    /// a grace period timer starts (in which the task receiving the shutdown signal is expected to
    /// wind down and exit cleanly).
    ///
    /// Evaluation order of shutdown signals is non-deterministic.
    pub fn with_shutdown_signal(mut self, exit: impl Future<Output = ()> + Send + 'static) -> Self {
        self.exits.push(exit.boxed());
        self
    }

    /// Add the tasks and signals from `other` into `self`.
    pub fn merge(mut self, mut other: Service) -> Self {
        self.exits.extend(other.exits);

        if !other.fsts.is_empty() {
            self.fsts.spawn(async move { run(&mut other.fsts).await });
        }

        if !other.snds.is_empty() {
            self.snds.spawn(async move { run(&mut other.snds).await });
        }

        self
    }

    /// Attach `other` to `self` as a secondary service.
    ///
    /// All its tasks (primary and secondary) will be treated as secondary tasks of `self`.
    pub fn attach(mut self, mut other: Service) -> Self {
        self.exits.extend(other.exits);

        if !other.fsts.is_empty() {
            self.snds.spawn(async move { run(&mut other.fsts).await });
        }

        if !other.snds.is_empty() {
            self.snds.spawn(async move { run(&mut other.snds).await });
        }

        self
    }

    /// Runs the service, waiting for interrupt signals from the operating system to trigger
    /// graceful shutdown, within the default grace period.
    pub async fn main(self) -> Result<(), Error> {
        self.run(GRACE, terminate).await
    }

    /// Runs the service, waiting for interrupt signals from the operating system to trigger
    /// graceful shutdown, within a `grace` period.
    pub async fn main_with_grace_period(self, grace: Duration) -> Result<(), Error> {
        self.run(grace, terminate).await
    }

    /// Run the service, waiting for the provided `terminate` future to complete to trigger
    /// graceful shutdown, within a `grace` period.
    async fn run<T: Future<Output = ()>>(
        mut self,
        grace: Duration,
        mut terminate: impl FnMut() -> T,
    ) -> Result<(), Error> {
        let exec = tokio::select! {
            res = self.join() => {
                res.map_err(Error::Task)
            }

            _ = terminate() => {
                info!("Termination received");
                Err(Error::Terminated)
            }
        };

        info!("Shutting down gracefully...");
        tokio::select! {
            res = timeout(grace, self.shutdown()) => {
                match res {
                    Ok(Ok(())) => {},
                    Ok(Err(_)) => return Err(Error::Aborted),
                    Err(_) => {
                        error!("Grace period elapsed, aborting...");
                        return Err(Error::Aborted);
                    }
                }
            }

            _ = terminate() => {
                error!("Termination received during shutdown, aborting...");
                return Err(Error::Aborted);
            },
        }

        exec
    }

    /// Wait until all primary tasks in the service either complete successfully or are cancelled,
    /// or one task fails.
    ///
    /// This operation does not consume the service, so that it can be interacted with further in
    /// case of an error.
    pub async fn join(&mut self) -> anyhow::Result<()> {
        tokio::select! {
            res = run(&mut self.fsts) => {
                res.tap_err(|e| error!("Primary task failure: {e:#}"))
            },

            res = run_secondary(&mut self.snds) => {
                res.tap_err(|e| error!("Secondary task failure: {e:#}"))
            }
        }
    }

    /// Trigger a graceful shutdown of the service.
    ///
    /// Returns with an error if any of the constituent tasks produced an error during shutdown,
    /// otherwise waits for all tasks (primary and secondary) to complete successfully.
    pub async fn shutdown(mut self) -> Result<(), Error> {
        let _ = future::join_all(self.exits).await;
        if let Err(e) = future::try_join(run(&mut self.fsts), run(&mut self.snds)).await {
            error!("Task failure during shutdown: {e:#}");
            return Err(Error::Task(e));
        }

        Ok(())
    }
}

// SAFETY: `Service` is not `Sync` by default because `self.exits` is not `Sync`, but it is only
// ever accessed through exclusive references (`&mut self` or `self`), so it cannot be accessed
// through multiple threads simultaneously.
unsafe impl Sync for Service {}

impl fmt::Debug for Service {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Service")
            .field("exits", &self.exits.len())
            .field("fsts", &self.fsts)
            .field("snds", &self.snds)
            .finish()
    }
}

/// Wait until all tasks in `tasks` complete successfully or are cancelled, or any individual task
/// fails or panics.
async fn run(tasks: &mut JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
    while let Some(res) = tasks.join_next().await {
        match res {
            Ok(Ok(())) => continue,
            Ok(Err(e)) => return Err(e),

            Err(e) => {
                if e.is_panic() {
                    panic::resume_unwind(e.into_panic());
                }
            }
        }
    }

    Ok(())
}

/// Like `run` but never completes successfully (only propagates errors).
///
/// If the secondary tasks do all complete successfully, this future never resolves, to give the
/// primary tasks a chance to complete.
async fn run_secondary(tasks: &mut JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
    run(tasks).await?;
    std::future::pending().await
}

/// Waits for various termination signals from the operating system.
///
/// On unix systems, this waits for either `SIGINT` or `SIGTERM`, while on other systems it will
/// only wait for `SIGINT`.
pub async fn terminate() {
    tokio::select! {
        _ = signal::ctrl_c() => {},

        _ = async {
            #[cfg(unix)]
            let _ = signal::unix::signal(signal::unix::SignalKind::terminate()).unwrap().recv().await;

            #[cfg(not(unix))]
            future::pending::<()>().await;
        } => {}
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use anyhow::bail;
    use tokio::sync::Notify;
    use tokio::sync::oneshot;

    use super::*;

    #[tokio::test]
    async fn test_empty() {
        // The empty service should exit immediately.
        Service::new()
            .run(GRACE, std::future::pending)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_empty_attach_merge() {
        // Attaching and merging empty services should work without issue.
        Service::new()
            .attach(Service::new())
            .merge(Service::new())
            .run(GRACE, std::future::pending)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_completion() {
        let (atx, arx) = oneshot::channel::<()>();
        let (btx, brx) = oneshot::channel::<()>();

        let svc = Service::new().spawn(async move {
            let _brx = brx;
            Ok(arx.await?)
        });

        // The task has not finished yet (dropping the receiver)
        assert!(!btx.is_closed());

        // Sending the signal allows the task to complete successfully, which allows the service to
        // exit, and at that point, the second channel should be closed too.
        atx.send(()).unwrap();
        svc.run(GRACE, std::future::pending).await.unwrap();
        assert!(btx.is_closed());
    }

    #[tokio::test]
    async fn test_failure() {
        let svc = Service::new().spawn(async move { bail!("boom") });
        let res = svc.run(GRACE, std::future::pending).await;
        assert!(matches!(res, Err(Error::Task(_))));
    }

    #[tokio::test]
    #[should_panic]
    async fn test_panic() {
        let svc = Service::new().spawn(async move { panic!("boom") });
        let _ = svc.run(GRACE, std::future::pending).await;
    }

    #[tokio::test]
    async fn test_graceful_shutdown() {
        let (atx, arx) = oneshot::channel::<()>();
        let (btx, brx) = oneshot::channel::<()>();

        let srx = Arc::new(Notify::new());
        let stx = srx.clone();

        let svc = Service::new()
            .with_shutdown_signal(async move { atx.send(()).unwrap() })
            .spawn(async move {
                arx.await?;
                btx.send(()).unwrap();
                Ok(())
            });

        // The service is now running in the background.
        let handle = tokio::spawn(svc.run(GRACE, move || srx.clone().notified_owned()));

        // Send the shutdown signal, and confirm the task went through its graceful shutdown
        // process.
        stx.notify_one();
        brx.await.unwrap();

        // The service should exit gracefully now, dropping the receiver it was holding.
        let res = handle.await.unwrap();
        assert!(matches!(res, Err(Error::Terminated)));
    }

    #[tokio::test]
    async fn test_multiple_tasks() {
        let (atx, arx) = oneshot::channel::<()>();
        let (btx, brx) = oneshot::channel::<()>();
        let (ctx, crx) = oneshot::channel::<()>();

        // Three different tasks each waiting on a oneshot channel. We should be able to unblock
        // each of them before the service exits.
        let svc = Service::new()
            .spawn(async move { Ok(arx.await?) })
            .spawn(async move { Ok(brx.await?) })
            .spawn(async move { Ok(crx.await?) });

        let handle = tokio::spawn(svc.run(GRACE, std::future::pending));

        atx.send(()).unwrap();
        tokio::task::yield_now().await;

        btx.send(()).unwrap();
        tokio::task::yield_now().await;

        ctx.send(()).unwrap();
        handle.await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn test_multiple_task_failure() {
        let (atx, arx) = oneshot::channel::<()>();

        // The task waiting on the channel (that aborts on shutdown) will never get to finish
        // because the other task errors out.
        let svc = Service::new()
            .spawn_aborting(async move { Ok(arx.await?) })
            .spawn(async move { bail!("boom") });

        let handle = tokio::spawn(svc.run(GRACE, std::future::pending));
        let res = handle.await.unwrap();

        // The service exits with a task error because one of the tasks failed, and this also
        // means the other task is aborted before it can complete successfully.
        assert!(matches!(res, Err(Error::Task(_))));
        assert!(atx.is_closed());
    }

    #[tokio::test]
    async fn test_secondary_stuck() {
        let (atx, arx) = oneshot::channel::<()>();
        let (btx, brx) = oneshot::channel::<()>();

        // A secondary task and a primary task.
        let snd = Service::new().spawn_aborting(async move { Ok(brx.await?) });
        let svc = Service::new()
            .spawn(async move { Ok(arx.await?) })
            .attach(snd);

        let handle = tokio::spawn(svc.run(GRACE, std::future::pending));

        // Complete the primary task, and the service as a whole should wrap up.
+ atx.send(()).unwrap(); + handle.await.unwrap().unwrap(); + assert!(btx.is_closed()); + } + + #[tokio::test] + async fn test_secondary_complete() { + let (atx, arx) = oneshot::channel::<()>(); + let (btx, brx) = oneshot::channel::<()>(); + let (mut ctx, crx) = oneshot::channel::<()>(); + + // A secondary task and a primary task. + let snd = Service::new().spawn(async move { + let _crx = crx; + brx.await?; + Ok(()) + }); + + let svc = Service::new() + .spawn(async move { Ok(arx.await?) }) + .attach(snd); + + let handle = tokio::spawn(svc.run(GRACE, std::future::pending)); + + // Complete the secondary task, and wait for it to complete (dropping the other channel). + btx.send(()).unwrap(); + ctx.closed().await; + tokio::task::yield_now().await; + + // The primary task will not have been cleaned up, so we can send to it, completing that + // task, and allowing the service as a whole to complete. + atx.send(()).unwrap(); + handle.await.unwrap().unwrap(); + } + + #[tokio::test] + async fn test_secondary_failure() { + let (atx, arx) = oneshot::channel::<()>(); + + // A secondary task that fails, with a primary task. + let snd = Service::new().spawn(async move { bail!("boom") }); + let svc = Service::new() + .spawn_aborting(async move { Ok(arx.await?) }) + .attach(snd); + + // Run the service -- it should fail immediately because the secondary task failed, + // cleaning up the primary task. + let res = svc.run(GRACE, std::future::pending).await; + assert!(matches!(res, Err(Error::Task(_)))); + assert!(atx.is_closed()); + } + + #[tokio::test] + #[should_panic] + async fn test_secondary_panic() { + let (_atx, arx) = oneshot::channel::<()>(); + + // A secondary task that fails, with a primary task. + let snd = Service::new().spawn(async move { panic!("boom") }); + let svc = Service::new() + .spawn_aborting(async move { Ok(arx.await?) }) + .attach(snd); + + // When the service runs, the panic from the secondary task will be propagated. + let _ = svc.run(GRACE, std::future::pending).await; + } + + #[tokio::test] + async fn test_secondary_graceful_shutdown() { + let (atx, arx) = oneshot::channel::<()>(); + let (btx, brx) = oneshot::channel::<()>(); + let (ctx, crx) = oneshot::channel::<()>(); + + let srx = Arc::new(Notify::new()); + let stx = srx.clone(); + + // A secondary task with a shutdown signal. + let snd = Service::new() + .with_shutdown_signal(async move { atx.send(()).unwrap() }) + .spawn(async move { + let _crx = crx; + Ok(arx.await?) + }); + + // A primary task which aborts when signalled to shutdown. + let svc = Service::new() + .spawn_aborting(async move { Ok(brx.await?) }) + .attach(snd); + + // The service is now running in the background. + let handle = tokio::spawn(svc.run(GRACE, move || srx.clone().notified_owned())); + + // Confirm that each task is still waiting on their respective channels. + assert!(!btx.is_closed()); + assert!(!ctx.is_closed()); + + // Send the shutdown signal - both tasks should be unblocked and complete gracefully. 
+ stx.notify_one(); + let res = handle.await.unwrap(); + assert!(matches!(res, Err(Error::Terminated))); + assert!(btx.is_closed()); + assert!(ctx.is_closed()); + } + + #[tokio::test] + async fn test_merge() { + let (atx, arx) = oneshot::channel::<()>(); + let (btx, brx) = oneshot::channel::<()>(); + let (ctx, crx) = oneshot::channel::<()>(); + let (dtx, drx) = oneshot::channel::<()>(); + let (etx, erx) = oneshot::channel::<()>(); + let (ftx, frx) = oneshot::channel::<()>(); + + let srx = Arc::new(Notify::new()); + let stx = srx.clone(); + + // Set-up two services -- each with a task that can be shutdown, and a task that's paused, + // which will send a message once unpaused. + let a = Service::new() + .spawn(async move { Ok(arx.await?) }) + .with_shutdown_signal(async move { ctx.send(()).unwrap() }) + .spawn(async move { + crx.await?; + dtx.send(()).unwrap(); + Ok(()) + }); + + let b = Service::new() + .spawn(async move { Ok(brx.await?) }) + .with_shutdown_signal(async move { etx.send(()).unwrap() }) + .spawn(async move { + erx.await?; + ftx.send(()).unwrap(); + Ok(()) + }); + + // Merge them into a larger service and run it. + let svc = Service::new().merge(a).merge(b); + let handle = tokio::spawn(svc.run(GRACE, move || srx.clone().notified_owned())); + + // Unblock the paused tasks, so they terminate. + atx.send(()).unwrap(); + tokio::task::yield_now().await; + + btx.send(()).unwrap(); + tokio::task::yield_now().await; + + // Send the shutdown signal - triggering graceful shutdown on the remaining tasks -- + // confirm that those tasks actually go through the graceful shutdown process. + stx.notify_one(); + drx.await.unwrap(); + frx.await.unwrap(); + + // ...and then the service shuts down. + let res = handle.await.unwrap(); + assert!(matches!(res, Err(Error::Terminated))); + } + + #[tokio::test] + async fn test_drop_abort() { + let (mut atx, arx) = oneshot::channel::<()>(); + let (mut btx, brx) = oneshot::channel::<()>(); + + let svc = Service::new() + .spawn(async move { Ok(arx.await?) }) + .spawn_aborting(async move { Ok(brx.await?) }); + + assert!(!atx.is_closed()); + assert!(!btx.is_closed()); + + // Once the service is dropped, its tasks will also be dropped, and the receivers will be + // dropped, closing the senders. + drop(svc); + atx.closed().await; + btx.closed().await; + } + + #[tokio::test] + async fn test_shutdown() { + let (atx, arx) = oneshot::channel::<()>(); + let (btx, brx) = oneshot::channel::<()>(); + + let svc = Service::new() + .with_shutdown_signal(async move { atx.send(()).unwrap() }) + .spawn(async move { Ok(arx.await?) }) + .spawn_aborting(async move { Ok(brx.await?) }); + + // We don't need to call `Service::run` to kick off the service's tasks -- they are now + // running in the background. We can call `shutdown` to trigger a graceful shutdown, which + // should exit cleanly and clean up any unused resources. + svc.shutdown().await.unwrap(); + assert!(btx.is_closed()); + } + + #[tokio::test] + async fn test_error_cascade() { + let (atx, arx) = oneshot::channel::<()>(); + + // The first task errors immediately, and the second task errors during graceful shutdown. + let svc = Service::new() + .spawn(async move { bail!("boom") }) + .with_shutdown_signal(async move { atx.send(()).unwrap() }) + .spawn(async move { + arx.await?; + bail!("boom, again") + }); + + // The service will exit with an abort. 
+ let res = svc.run(GRACE, std::future::pending).await; + assert!(matches!(res, Err(Error::Aborted))); + } + + #[tokio::test] + async fn test_multiple_errors() { + // Both tasks produce an error, one will be picked up during normal execution, and the + // other will be picked up during shutdown, resulting in an ungraceful shutdown (abort). + let svc = Service::new() + .spawn(async move { bail!("boom") }) + .spawn(async move { bail!("boom, again") }); + + // The service will exit with an abort. + let res = svc.run(GRACE, std::future::pending).await; + assert!(matches!(res, Err(Error::Aborted))); + } + + #[tokio::test] + async fn test_termination_cascade() { + // A service with a task that ignores graceful shutdown. + let svc = Service::new().spawn(std::future::pending()); + + let srx = Arc::new(Notify::new()); + let stx = srx.clone(); + + // The service is now running in the background. + let handle = tokio::spawn(svc.run(GRACE, move || srx.clone().notified_owned())); + + // Trigger the first termination, which the task will ignore. + stx.notify_one(); + tokio::task::yield_now().await; + + // Trigger the second termination, the service takes over. + stx.notify_one(); + tokio::task::yield_now().await; + + let res = handle.await.unwrap(); + assert!(matches!(res, Err(Error::Aborted))); + } + + #[tokio::test] + #[should_panic] + async fn test_panic_during_shutdown() { + let (atx, arx) = oneshot::channel::<()>(); + + let srx = Arc::new(Notify::new()); + let stx = srx.clone(); + + let svc = Service::new() + .with_shutdown_signal(async move { atx.send(()).unwrap() }) + .spawn(async move { + arx.await?; + panic!("boom") + }); + + // The service is now running in the background. + let handle = tokio::spawn(svc.run(GRACE, move || srx.clone().notified_owned())); + + // Send the shutdown signal, the panic gets resumed when the service is awaited. + stx.notify_one(); + let _ = handle.await.unwrap(); + } + + #[tokio::test(start_paused = true)] + async fn test_graceful_shutdown_timeout() { + let srx = Arc::new(Notify::new()); + let stx = srx.clone(); + + // A service with a task that ignores graceful shutdown. + let svc = Service::new().spawn(std::future::pending()); + + let handle = tokio::spawn(svc.run(GRACE, move || srx.clone().notified_owned())); + + // Trigger cancellation and then let twice the grace period pass, which should be treated + // as an abort. + stx.notify_one(); + tokio::time::advance(GRACE * 2).await; + + let res = handle.await.unwrap(); + assert!(matches!(res, Err(Error::Aborted))); + } +} diff --git a/crates/sui-indexer-alt-framework/src/task.rs b/crates/sui-futures/src/stream.rs similarity index 67% rename from crates/sui-indexer-alt-framework/src/task.rs rename to crates/sui-futures/src/stream.rs index 37594bb30a2dd..d92ab044b7205 100644 --- a/crates/sui-indexer-alt-framework/src/task.rs +++ b/crates/sui-futures/src/stream.rs @@ -1,10 +1,10 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{future::Future, panic, pin::pin, time::Duration}; +use std::{future::Future, panic, pin::pin}; use futures::stream::{Stream, StreamExt}; -use tokio::{task::JoinSet, time::sleep}; +use tokio::task::JoinSet; /// Extension trait introducing `try_for_each_spawned` to all streams. 
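///
/// # Example
///
/// An illustrative sketch (not part of the original change): it assumes the method takes a
/// concurrency limit followed by a fallible per-item closure, and `process` is a hypothetical
/// async operation.
///
/// ```ignore
/// stream::iter(0..1000)
///     .try_for_each_spawned(16, |i| async move {
///         process(i).await?;
///         Ok(())
///     })
///     .await?;
/// ```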
pub trait TrySpawnStreamExt: Stream {
@@ -35,6 +35,13 @@ pub trait TrySpawnStreamExt: Stream {
         E: Send + 'static;
 }
 
+/// Wrapper type for errors to allow the body of a `try_for_each_spawned` call to signal that it
+/// either wants to return early (`Break`) out of the loop, or propagate an error (`Err(E)`).
+pub enum Break<E> {
+    Break,
+    Err(E),
+}
+
 impl<S: Stream + Sized + 'static> TrySpawnStreamExt for S {
     async fn try_for_each_spawned<Op, F, E>(
         self,
@@ -118,37 +125,10 @@ impl<S: Stream + Sized + 'static> TrySpawnStreamExt for S {
     }
 }
 
-/// Wraps a future with slow/stuck detection using `tokio::select!`
-///
-/// This implementation races the future against a timer. If the timer expires first, the callback
-/// is executed (exactly once) but the future continues to run. This approach can detect stuck
-/// futures that never wake their waker.
-pub async fn with_slow_future_monitor<F, C>(
-    future: F,
-    threshold: Duration,
-    callback: C,
-) -> F::Output
-where
-    F: Future,
-    C: FnOnce(),
-{
-    // The select! macro needs to take a reference to the future, which requires it to be pinned
-    tokio::pin!(future);
-
-    tokio::select! {
-        result = &mut future => {
-            // Future completed before timeout
-            return result;
-        }
-        _ = sleep(threshold) => {
-            // Timeout elapsed - fire the warning
-            callback();
-        }
+impl<E> From<E> for Break<E> {
+    fn from(e: E) -> Self {
+        Break::Err(e)
     }
-
-    // If we get here, the timeout fired but the future is still running. Continue waiting for the
-    // future to complete
-    future.await
 }
 
 #[cfg(test)]
@@ -162,27 +142,9 @@ mod tests {
     use std::{
         sync::{
             Arc, Mutex,
             atomic::{AtomicUsize, Ordering},
         },
     };
 
     use futures::stream;
-    use tokio::time::timeout;
 
     use super::*;
 
-    #[derive(Clone)]
-    struct Counter(Arc<AtomicUsize>);
-
-    impl Counter {
-        fn new() -> Self {
-            Self(Arc::new(AtomicUsize::new(0)))
-        }
-
-        fn increment(&self) {
-            self.0.fetch_add(1, Ordering::Relaxed);
-        }
-
-        fn count(&self) -> usize {
-            self.0.load(Ordering::Relaxed)
-        }
-    }
-
     #[tokio::test]
     async fn for_each_explicit_sequential_iteration() {
         let actual = Arc::new(Mutex::new(vec![]));
@@ -328,106 +290,4 @@
             })
             .await;
     }
-
-    #[tokio::test]
-    async fn slow_monitor_callback_called_once_when_threshold_exceeded() {
-        let c = Counter::new();
-
-        let result = with_slow_future_monitor(
-            async {
-                sleep(Duration::from_millis(200)).await;
-                42 // Return a value to verify completion
-            },
-            Duration::from_millis(100),
-            || c.increment(),
-        )
-        .await;
-
-        assert_eq!(c.count(), 1);
-        assert_eq!(result, 42);
-    }
-
-    #[tokio::test]
-    async fn slow_monitor_callback_not_called_when_threshold_not_exceeded() {
-        let c = Counter::new();
-
-        let result = with_slow_future_monitor(
-            async {
-                sleep(Duration::from_millis(50)).await;
-                42 // Return a value to verify completion
-            },
-            Duration::from_millis(200),
-            || c.increment(),
-        )
-        .await;
-
-        assert_eq!(c.count(), 0);
-        assert_eq!(result, 42);
-    }
-
-    #[tokio::test]
-    async fn slow_monitor_error_propagation() {
-        let c = Counter::new();
-
-        let result: Result<i32, &str> = with_slow_future_monitor(
-            async {
-                sleep(Duration::from_millis(150)).await;
-                Err("Something went wrong")
-            },
-            Duration::from_millis(100),
-            || c.increment(),
-        )
-        .await;
-
-        assert!(result.is_err());
-        assert_eq!(result.unwrap_err(), "Something went wrong");
-        assert_eq!(c.count(), 1);
-    }
-
-    #[tokio::test]
-    async fn slow_monitor_error_propagation_without_callback() {
-        let c = Counter::new();
-
-        let result: Result<i32, &str> = with_slow_future_monitor(
-            async {
-                sleep(Duration::from_millis(50)).await;
-                Err("Quick error")
-            },
-            Duration::from_millis(200),
-            || c.increment(),
-        )
-        .await;
-
-        assert!(result.is_err());
-        assert_eq!(result.unwrap_err(), "Quick error");
-        assert_eq!(c.count(), 0);
-    }
-
-    #[tokio::test]
-    async fn slow_monitor_stuck_future_detection() {
-        use std::future::Future;
-        use std::pin::Pin;
-        use std::task::{Context, Poll};
-
-        // A future that returns Pending but never wakes the waker
-        struct StuckFuture;
-        impl Future for StuckFuture {
-            type Output = ();
-            fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
-                Poll::Pending
-            }
-        }
-
-        let c = Counter::new();
-
-        // Even though StuckFuture never wakes, our monitor will detect it!
-        let monitored =
-            with_slow_future_monitor(StuckFuture, Duration::from_millis(200), || c.increment());
-
-        // Use a timeout to prevent the test from hanging
-        timeout(Duration::from_secs(2), monitored)
-            .await
-            .unwrap_err();
-        assert_eq!(c.count(), 1);
-    }
 }
diff --git a/crates/sui-futures/src/task.rs b/crates/sui-futures/src/task.rs
new file mode 100644
index 0000000000000..6489a25cea25d
--- /dev/null
+++ b/crates/sui-futures/src/task.rs
@@ -0,0 +1,67 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{
    pin::Pin,
    task::{Context, Poll},
};

use tokio::task::{JoinError, JoinHandle};

/// A wrapper around `JoinHandle` that aborts the task when dropped.
///
/// The abort on drop does not wait for the task to finish, it simply sends the abort signal.
#[must_use = "Dropping the handle aborts the task immediately"]
#[derive(Debug)]
pub struct TaskGuard<T>(JoinHandle<T>);

impl<T> TaskGuard<T> {
    pub fn new(handle: JoinHandle<T>) -> Self {
        Self(handle)
    }
}

impl<T> Future for TaskGuard<T> {
    type Output = Result<T, JoinError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Pin::new(&mut self.0).poll(cx)
    }
}

impl<T> AsRef<JoinHandle<T>> for TaskGuard<T> {
    fn as_ref(&self) -> &JoinHandle<T> {
        &self.0
    }
}

impl<T> Drop for TaskGuard<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::sync::oneshot;

    use super::*;

    #[tokio::test]
    async fn test_abort_on_drop() {
        let (mut tx, rx) = oneshot::channel::<()>();

        let guard = TaskGuard::new(tokio::spawn(async move {
            let _ = rx.await;
        }));

        // When the guard is dropped, the task should be aborted, cleaning up its future, which
        // will close the receiving side of the channel.
+ drop(guard); + tokio::time::timeout(Duration::from_millis(100), tx.closed()) + .await + .unwrap(); + } +} diff --git a/crates/sui-indexer-alt-consistent-store/Cargo.toml b/crates/sui-indexer-alt-consistent-store/Cargo.toml index cb6a4b712abbd..abab408b90607 100644 --- a/crates/sui-indexer-alt-consistent-store/Cargo.toml +++ b/crates/sui-indexer-alt-consistent-store/Cargo.toml @@ -49,6 +49,7 @@ move-core-types.workspace = true bin-version.workspace = true mysten-network.workspace = true sui-default-config.workspace = true +sui-futures.workspace = true sui-indexer-alt-consistent-api.workspace = true sui-indexer-alt-framework = { workspace = true, default-features = false } sui-indexer-alt-metrics.workspace = true diff --git a/crates/sui-indexer-alt-consistent-store/src/restore/broadcaster.rs b/crates/sui-indexer-alt-consistent-store/src/restore/broadcaster.rs index 469074c71ab04..05f379f9e10f6 100644 --- a/crates/sui-indexer-alt-consistent-store/src/restore/broadcaster.rs +++ b/crates/sui-indexer-alt-consistent-store/src/restore/broadcaster.rs @@ -13,7 +13,8 @@ use std::{ use anyhow::Context as _; use backoff::{Error as BE, ExponentialBackoff}; use futures::{future::try_join_all, stream}; -use sui_indexer_alt_framework::task::{TrySpawnStreamExt, with_slow_future_monitor}; +use sui_futures::future::with_slow_future_monitor; +use sui_futures::stream::TrySpawnStreamExt; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; diff --git a/crates/sui-indexer-alt-consistent-store/src/restore/worker.rs b/crates/sui-indexer-alt-consistent-store/src/restore/worker.rs index a5cb7efb302d1..cb79ffc963e9f 100644 --- a/crates/sui-indexer-alt-consistent-store/src/restore/worker.rs +++ b/crates/sui-indexer-alt-consistent-store/src/restore/worker.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use anyhow::Context as _; -use sui_indexer_alt_framework::task::TrySpawnStreamExt as _; +use sui_futures::stream::TrySpawnStreamExt as _; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; diff --git a/crates/sui-indexer-alt-consistent-store/src/store/synchronizer.rs b/crates/sui-indexer-alt-consistent-store/src/store/synchronizer.rs index 0a9f6e87234c6..e48ceaef7bff7 100644 --- a/crates/sui-indexer-alt-consistent-store/src/store/synchronizer.rs +++ b/crates/sui-indexer-alt-consistent-store/src/store/synchronizer.rs @@ -5,7 +5,7 @@ use std::{cmp::Ordering, collections::HashMap, sync::Arc, time::Duration}; use anyhow::{Context, bail}; use futures::future; -use sui_indexer_alt_framework::task::with_slow_future_monitor; +use sui_futures::future::with_slow_future_monitor; use tokio::{ sync::{Barrier, mpsc}, task::JoinHandle, diff --git a/crates/sui-indexer-alt-framework/Cargo.toml b/crates/sui-indexer-alt-framework/Cargo.toml index c02271f554e83..c1d4f93c36c3f 100644 --- a/crates/sui-indexer-alt-framework/Cargo.toml +++ b/crates/sui-indexer-alt-framework/Cargo.toml @@ -34,6 +34,7 @@ tracing-subscriber = { workspace = true, optional = true } url.workspace = true sui-field-count.workspace = true +sui-futures.workspace = true sui-indexer-alt-framework-store-traits.workspace = true sui-indexer-alt-metrics.workspace = true sui-rpc.workspace = true diff --git a/crates/sui-indexer-alt-framework/src/ingestion/broadcaster.rs b/crates/sui-indexer-alt-framework/src/ingestion/broadcaster.rs index cd0a71fa39f83..cd1afa2e5f037 100644 --- a/crates/sui-indexer-alt-framework/src/ingestion/broadcaster.rs +++ 
b/crates/sui-indexer-alt-framework/src/ingestion/broadcaster.rs @@ -4,6 +4,7 @@ use std::{collections::HashMap, pin::Pin, sync::Arc, time::Duration}; use futures::{Stream, StreamExt, future::try_join_all, stream}; +use sui_futures::stream::TrySpawnStreamExt; use tokio::{ sync::{mpsc, watch}, task::JoinHandle, @@ -11,14 +12,14 @@ use tokio::{ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; -use super::{IngestionConfig, ingestion_client::IngestionClient}; use crate::{ ingestion::{error::Error, streaming_client::CheckpointStreamingClient}, metrics::IngestionMetrics, - task::TrySpawnStreamExt, types::full_checkpoint_content::Checkpoint, }; +use super::{IngestionConfig, ingestion_client::IngestionClient}; + /// Broadcaster task that manages checkpoint flow and spawns broadcast tasks for ranges /// via either streaming or ingesting, or both. /// diff --git a/crates/sui-indexer-alt-framework/src/ingestion/ingestion_client.rs b/crates/sui-indexer-alt-framework/src/ingestion/ingestion_client.rs index 391dd76b14ba1..86f2a51dea7f4 100644 --- a/crates/sui-indexer-alt-framework/src/ingestion/ingestion_client.rs +++ b/crates/sui-indexer-alt-framework/src/ingestion/ingestion_client.rs @@ -5,9 +5,11 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use backoff::Error as BE; use backoff::ExponentialBackoff; use backoff::backoff::Constant; +use sui_futures::future::with_slow_future_monitor; use sui_rpc::Client; use sui_rpc::client::HeadersInterceptor; use sui_storage::blob::Blob; @@ -22,9 +24,7 @@ use crate::ingestion::local_client::LocalIngestionClient; use crate::ingestion::remote_client::RemoteIngestionClient; use crate::metrics::CheckpointLagMetricReporter; use crate::metrics::IngestionMetrics; -use crate::task::with_slow_future_monitor; use crate::types::full_checkpoint_content::{Checkpoint, CheckpointData}; -use async_trait::async_trait; /// Wait at most this long between retries for transient errors. 
const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60); diff --git a/crates/sui-indexer-alt-framework/src/lib.rs b/crates/sui-indexer-alt-framework/src/lib.rs index 2a184930f1c57..859fe5c0005c8 100644 --- a/crates/sui-indexer-alt-framework/src/lib.rs +++ b/crates/sui-indexer-alt-framework/src/lib.rs @@ -35,7 +35,6 @@ pub mod metrics; pub mod pipeline; #[cfg(feature = "postgres")] pub mod postgres; -pub mod task; #[cfg(test)] pub mod mocks; diff --git a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs index e179496c45d4a..a191526ed9dbe 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs @@ -4,6 +4,7 @@ use std::{sync::Arc, time::Duration}; use backoff::ExponentialBackoff; +use sui_futures::stream::TrySpawnStreamExt; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; @@ -13,7 +14,6 @@ use crate::{ metrics::{CheckpointLagMetricReporter, IndexerMetrics}, pipeline::{Break, CommitterConfig, WatermarkPart}, store::Store, - task::TrySpawnStreamExt, }; use super::{BatchedRows, Handler}; diff --git a/crates/sui-indexer-alt-framework/src/pipeline/processor.rs b/crates/sui-indexer-alt-framework/src/pipeline/processor.rs index 6554d4a7f3e47..edc420028e283 100644 --- a/crates/sui-indexer-alt-framework/src/pipeline/processor.rs +++ b/crates/sui-indexer-alt-framework/src/pipeline/processor.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use std::time::Duration; use backoff::ExponentialBackoff; +use sui_futures::stream::TrySpawnStreamExt; use sui_types::full_checkpoint_content::Checkpoint; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_stream::wrappers::ReceiverStream; @@ -14,7 +15,6 @@ use tracing::{debug, error, info}; use crate::{ metrics::{CheckpointLagMetricReporter, IndexerMetrics}, pipeline::Break, - task::TrySpawnStreamExt, }; use super::IndexedCheckpoint; diff --git a/crates/sui-indexer-alt-restorer/Cargo.toml b/crates/sui-indexer-alt-restorer/Cargo.toml index 927570759b2da..ca3ce4528c4bc 100644 --- a/crates/sui-indexer-alt-restorer/Cargo.toml +++ b/crates/sui-indexer-alt-restorer/Cargo.toml @@ -23,8 +23,8 @@ sui-config.workspace = true sui-core.workspace = true sui-data-ingestion-core.workspace = true sui-field-count.workspace = true +sui-futures.workspace = true sui-indexer-alt-schema.workspace = true -sui-indexer-alt-framework.workspace = true sui-pg-db.workspace = true sui-snapshot.workspace = true sui-storage.workspace = true diff --git a/crates/sui-indexer-alt-restorer/src/snapshot.rs b/crates/sui-indexer-alt-restorer/src/snapshot.rs index b39393fcaadc1..c836ce4862a3a 100644 --- a/crates/sui-indexer-alt-restorer/src/snapshot.rs +++ b/crates/sui-indexer-alt-restorer/src/snapshot.rs @@ -16,7 +16,7 @@ use tracing::{debug, info}; use sui_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType}; use sui_core::authority::authority_store_tables::LiveObject; use sui_field_count::FieldCount; -use sui_indexer_alt_framework::task::TrySpawnStreamExt; +use sui_futures::stream::TrySpawnStreamExt; use sui_indexer_alt_schema::objects::StoredObjInfo; use sui_indexer_alt_schema::schema::obj_info; use sui_pg_db::Db; diff --git a/crates/sui-rpc-benchmark/Cargo.toml b/crates/sui-rpc-benchmark/Cargo.toml index 7c70ac3901882..a4c2c483cb99b 100644 --- a/crates/sui-rpc-benchmark/Cargo.toml +++ 
b/crates/sui-rpc-benchmark/Cargo.toml @@ -18,7 +18,7 @@ rand.workspace = true reqwest.workspace = true serde.workspace = true serde_json.workspace = true -sui-indexer-alt-framework.workspace = true +sui-futures.workspace = true telemetry-subscribers.workspace = true tracing.workspace = true tokio = { workspace = true, features = ["full"] } diff --git a/crates/sui-rpc-benchmark/src/direct/query_enricher.rs b/crates/sui-rpc-benchmark/src/direct/query_enricher.rs index 1ca22d819e468..d571a6f7665e0 100644 --- a/crates/sui-rpc-benchmark/src/direct/query_enricher.rs +++ b/crates/sui-rpc-benchmark/src/direct/query_enricher.rs @@ -10,7 +10,7 @@ use bb8::Pool; use bb8_postgres::PostgresConnectionManager; use parking_lot::Mutex; use std::sync::Arc; -use sui_indexer_alt_framework::task::TrySpawnStreamExt; +use sui_futures::stream::TrySpawnStreamExt; use tokio_postgres::{NoTls, Row, types::Type}; use tracing::warn; use url::Url; diff --git a/crates/sui-rpc-benchmark/src/direct/query_executor.rs b/crates/sui-rpc-benchmark/src/direct/query_executor.rs index 1150f3549b4f8..cdb2944f5531a 100644 --- a/crates/sui-rpc-benchmark/src/direct/query_executor.rs +++ b/crates/sui-rpc-benchmark/src/direct/query_executor.rs @@ -11,7 +11,7 @@ use bb8::Pool; use bb8_postgres::PostgresConnectionManager; use rand::SeedableRng; use rand::seq::SliceRandom; -use sui_indexer_alt_framework::task::TrySpawnStreamExt; +use sui_futures::stream::TrySpawnStreamExt; use tokio_postgres::{NoTls, types::ToSql}; use tracing::info; use url::Url; diff --git a/crates/sui-rpc-benchmark/src/json_rpc/runner.rs b/crates/sui-rpc-benchmark/src/json_rpc/runner.rs index 32cadb55985b4..0c514eb72fa18 100644 --- a/crates/sui-rpc-benchmark/src/json_rpc/runner.rs +++ b/crates/sui-rpc-benchmark/src/json_rpc/runner.rs @@ -16,7 +16,7 @@ use std::{ sync::{Arc, Mutex}, time::Instant, }; -use sui_indexer_alt_framework::task::TrySpawnStreamExt; +use sui_futures::stream::TrySpawnStreamExt; use tokio::time::timeout; use tracing::{debug, info, warn}; diff --git a/crates/sui-snapshot/Cargo.toml b/crates/sui-snapshot/Cargo.toml index 31a7b078ff167..8650a718c4f1e 100644 --- a/crates/sui-snapshot/Cargo.toml +++ b/crates/sui-snapshot/Cargo.toml @@ -25,7 +25,7 @@ prometheus.workspace = true sui-types.workspace = true sui-config.workspace = true sui-core.workspace = true -sui-indexer-alt-framework.workspace = true +sui-futures.workspace = true sui-storage.workspace = true sui-protocol-config.workspace = true fastcrypto = { workspace = true, features = ["copy_key"] } diff --git a/crates/sui-snapshot/src/reader.rs b/crates/sui-snapshot/src/reader.rs index 1354269ae80ed..8540d13ebeebc 100644 --- a/crates/sui-snapshot/src/reader.rs +++ b/crates/sui-snapshot/src/reader.rs @@ -27,7 +27,7 @@ use std::time::Duration; use sui_config::object_storage_config::ObjectStoreConfig; use sui_core::authority::AuthorityStore; use sui_core::authority::authority_store_tables::{AuthorityPerpetualTables, LiveObject}; -use sui_indexer_alt_framework::task::TrySpawnStreamExt; +use sui_futures::stream::TrySpawnStreamExt; use sui_storage::blob::{Blob, BlobEncoding}; use sui_storage::object_store::http::HttpDownloaderBuilder; use sui_storage::object_store::util::{copy_files, path_to_filesystem};
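
// ---------------------------------------------------------------------------
// Illustrative usage (not part of the diff above): how a binary might compose
// the new `sui_futures::service::Service`. `run_indexer` and `serve_rpc` are
// hypothetical task functions; the `Service` API itself is as added here.

use std::time::Duration;

use sui_futures::service::{Error, Service};

async fn run_indexer() -> anyhow::Result<()> {
    // Hypothetical long-running work; completing it lets the service wind down.
    Ok(())
}

async fn serve_rpc() -> anyhow::Result<()> {
    // Hypothetical server loop with no graceful-shutdown needs of its own.
    std::future::pending().await
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let res = Service::new()
        // Primary task: the service stays up while it runs.
        .spawn(run_indexer())
        // Primary task that is simply aborted once graceful shutdown begins.
        .spawn_aborting(serve_rpc())
        // Wait for SIGINT/SIGTERM, allowing tasks 10s to wind down.
        .main_with_grace_period(Duration::from_secs(10))
        .await;

    match res {
        // Clean exit, or a shutdown triggered by an interrupt signal.
        Ok(()) | Err(Error::Terminated) => Ok(()),
        // A task failed, or the grace period elapsed.
        Err(e) => Err(e.into()),
    }
}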