Skip to content

Commit fb2e8a6

Browse files
committed
chore: fix conflict.
2 parents e51fa83 + 14ff488 commit fb2e8a6

File tree

7 files changed

+234
-39
lines changed

7 files changed

+234
-39
lines changed

migration/util/migrator-types/src/migrator/movement_aptos_migrator.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use anyhow::Context;
22
use aptos_config::config::NodeConfig;
33
use aptos_rest_client::Client as MovementAptosRestClient;
44
use kestrel::WaitCondition;
5-
use movement_aptos_core::MovementAptos;
5+
use movement_aptos_core::{runtime, MovementAptos};
66
use mtma_node_types::executor::MovementAptosNode;
77

88
/// An enum supporting different types of runners.
@@ -12,7 +12,7 @@ use mtma_node_types::executor::MovementAptosNode;
1212
#[derive(Clone)]
1313
pub enum Runner {
1414
/// [MovementAptos] runner.
15-
MovementAptos(MovementAptos),
15+
MovementAptos(MovementAptos<runtime::TokioTest>),
1616
}
1717

1818
/// The [MovementAptos] migration struct as would be presented in the criterion.

util/movement-aptos/movement-aptos-core/src/config.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
66
use std::path::PathBuf;
77
use std::str::FromStr;
88

9-
use crate::movement_aptos::MovementAptos;
9+
use crate::movement_aptos::{runtime, MovementAptos};
1010
use aptos_node::create_single_node_test_config;
1111

1212
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -66,11 +66,6 @@ pub struct Config {
6666
#[orfile(config)]
6767
#[clap(long)]
6868
pub log_file: Option<PathBuf>,
69-
70-
/// Whether to create a global rayon pool.
71-
#[orfile(config)]
72-
#[clap(long)]
73-
pub create_global_rayon_pool: bool,
7469
}
7570

7671
impl Config {
@@ -90,19 +85,15 @@ impl Config {
9085
)
9186
.map_err(|e| ConfigError::Internal(e.into()))?;
9287

93-
Ok(Config {
94-
node_config: NodeConfigWrapper(node_config),
95-
log_file: None,
96-
create_global_rayon_pool: false,
97-
})
88+
Ok(Config { node_config: NodeConfigWrapper(node_config), log_file: None })
9889
}
9990

10091
/// Builds the config into a [MovementAptos] runner.
101-
pub fn build(&self) -> Result<MovementAptos, ConfigError> {
102-
Ok(MovementAptos::new(
92+
pub fn build(&self) -> Result<MovementAptos<runtime::TokioTest>, ConfigError> {
93+
Ok(MovementAptos::<runtime::TokioTest>::try_new(
10394
self.node_config.0.clone(),
10495
self.log_file.clone(),
105-
self.create_global_rayon_pool,
106-
))
96+
)
97+
.map_err(|e| ConfigError::Internal(e.into()))?)
10798
}
10899
}

util/movement-aptos/movement-aptos-core/src/movement_aptos.rs

Lines changed: 73 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ use aptos_config::config::NodeConfig;
22
use kestrel::State;
33
use std::path::PathBuf;
44
pub mod rest_api;
5+
pub mod runtime;
56
pub use rest_api::RestApi;
67

8+
use runtime::Runtime;
9+
710
/// Errors thrown when running [MovementAptos].
811
#[derive(Debug, thiserror::Error)]
912
pub enum MovementAptosError {
@@ -12,28 +15,41 @@ pub enum MovementAptosError {
1215
}
1316

1417
#[derive(Clone)]
15-
pub struct MovementAptos {
18+
pub struct MovementAptos<R>
19+
where
20+
R: Runtime,
21+
{
1622
/// The [NodeConfig] for the Aptos node.
1723
pub node_config: NodeConfig,
1824
/// The path to the log file.
1925
pub log_file: Option<PathBuf>,
20-
/// Whether to create a global rayon pool.
21-
pub create_global_rayon_pool: bool,
26+
/// The runtime for the Aptos node.
27+
pub runtime: std::marker::PhantomData<R>,
2228
/// The [MovementAptosRestApi] for the Aptos node.
2329
pub rest_api: State<RestApi>,
2430
}
2531

26-
impl MovementAptos {
27-
pub fn new(
32+
impl<R> MovementAptos<R>
33+
where
34+
R: Runtime,
35+
{
36+
/// If you have something that marks your ability to get a runtime, you can use this.
37+
pub fn new(node_config: NodeConfig, log_file: Option<PathBuf>, _runtime: R) -> Self {
38+
Self { node_config, log_file, runtime: std::marker::PhantomData, rest_api: State::new() }
39+
}
40+
41+
/// Checks runtime availability and creates a new [MovementAptos].
42+
pub fn try_new(
2843
node_config: NodeConfig,
2944
log_file: Option<PathBuf>,
30-
create_global_rayon_pool: bool,
31-
) -> Self {
32-
Self { node_config, log_file, create_global_rayon_pool, rest_api: State::new() }
45+
) -> Result<Self, anyhow::Error> {
46+
let runtime = R::try_new()?;
47+
let movement_aptos = MovementAptos::new(node_config, log_file, runtime);
48+
Ok(movement_aptos)
3349
}
3450

3551
pub fn from_config(config: NodeConfig) -> Result<Self, anyhow::Error> {
36-
let movement_aptos = MovementAptos::new(config, None, false);
52+
let movement_aptos = MovementAptos::new(config, None, R::try_new()?);
3753
Ok(movement_aptos)
3854
}
3955

@@ -43,15 +59,37 @@ impl MovementAptos {
4359
}
4460

4561
/// Runs the internal node logic
46-
pub(crate) fn run_node(&self) -> Result<(), MovementAptosError> {
47-
aptos_node::start(
48-
self.node_config.clone(),
49-
self.log_file.clone(),
50-
self.create_global_rayon_pool,
51-
)
52-
.map_err(|e| MovementAptosError::Internal(e.into()))?;
53-
54-
Ok(())
62+
pub(crate) async fn run_node(&self) -> Result<(), MovementAptosError> {
63+
// Clone necessary data for the closure
64+
let node_config = self.node_config.clone();
65+
let log_file = self.log_file.clone();
66+
67+
// Spawn the blocking task
68+
let blocking_task_result = tokio::task::spawn_blocking(move || {
69+
// This closure runs on a blocking thread
70+
aptos_node::start(
71+
node_config,
72+
log_file,
73+
R::create_global_rayon_pool(), // Assuming R is in scope and its result is Send
74+
)
75+
// The closure should return the direct result from aptos_node::start.
76+
// The error type from aptos_node::start (let's call it AptosNodeError)
77+
// needs to be Send + 'static for the closure.
78+
})
79+
.await;
80+
81+
match blocking_task_result {
82+
Ok(Ok(())) => Ok(()), // aptos_node::start succeeded
83+
Ok(Err(aptos_node_err)) => {
84+
// aptos_node::start failed. We need aptos_node_err to be convertible
85+
// into the Box<dyn Error> for MovementAptosError::Internal.
86+
Err(MovementAptosError::Internal(aptos_node_err.into()))
87+
}
88+
Err(join_err) => {
89+
// spawn_blocking task failed (e.g., panicked or was cancelled by Tokio)
90+
Err(MovementAptosError::Internal(Box::new(join_err)))
91+
}
92+
}
5593
}
5694

5795
/// Runs the node and fills state.
@@ -66,18 +104,22 @@ impl MovementAptos {
66104

67105
let runner = self.clone();
68106
let runner_task = kestrel::task(async move {
69-
runner.run_node()?;
107+
runner.run_node().await?;
70108
Ok::<_, MovementAptosError>(())
71109
});
72110

73111
// rest api state
74112
let rest_api_state = self.rest_api.clone();
75113
let rest_api_polling = kestrel::task(async move {
76114
loop {
115+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
116+
println!("POLLING REST API: {:?}", rest_api);
77117
// wait for the rest api to be ready
78118
let response = reqwest::get(rest_api.rest_api_url.clone())
79119
.await
80120
.map_err(|e| MovementAptosError::Internal(e.into()))?;
121+
122+
println!("REST API RESPONSE: {:?}", response);
81123
if response.status().is_success() {
82124
rest_api_state.write().set(rest_api).await;
83125
break;
@@ -102,7 +144,7 @@ mod tests {
102144
use rand::thread_rng;
103145
use std::path::Path;
104146

105-
#[tokio::test]
147+
#[tokio::test(flavor = "multi_thread")]
106148
async fn test_movement_aptos() -> Result<(), anyhow::Error> {
107149
// open in a new db
108150
let unique_id = uuid::Uuid::new_v4();
@@ -128,11 +170,20 @@ mod tests {
128170
rng,
129171
)?;
130172

131-
let movement_aptos = MovementAptos::new(node_config, None, false);
173+
let movement_aptos = MovementAptos::<runtime::TokioTest>::try_new(node_config, None)?;
132174
let rest_api_state = movement_aptos.rest_api().read().clone();
133-
movement_aptos.run().await?;
175+
176+
let movement_aptos_task = kestrel::task(async move {
177+
movement_aptos.run().await?;
178+
Ok::<_, MovementAptosError>(())
179+
});
180+
134181
rest_api_state.wait_for(tokio::time::Duration::from_secs(30)).await?;
135182

183+
println!("ENDING MOVEMENT APTOS");
184+
185+
kestrel::end!(movement_aptos_task)?;
186+
136187
Ok(())
137188
}
138189
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
pub mod delegated;
2+
pub mod native;
3+
pub mod tokio_test;
4+
5+
pub use delegated::*;
6+
pub use native::*;
7+
pub use tokio_test::*;
8+
9+
use std::fmt::Debug;
10+
11+
/// Errors thrown when attempting to use a runtime.
12+
#[derive(Debug, thiserror::Error)]
13+
pub enum RuntimeError {
14+
#[error("encountered internal error while using Movement Aptos Runtime: {0}")]
15+
Internal(#[source] Box<dyn std::error::Error + Send + Sync>),
16+
#[error("requested Movement Aptos Runtime is unavailable: {0}")]
17+
Unavailable(#[source] Box<dyn std::error::Error + Send + Sync>),
18+
}
19+
20+
/// Trait for a runtime that can be used to run [MovementAptos].
21+
///
22+
/// A runtime knows statically whether it is multithreaded or not.
23+
pub trait Runtime: Sized + Clone + Debug + Send + Sync + 'static {
24+
/// Try to create a new runtime.
25+
fn try_new() -> Result<Self, RuntimeError>;
26+
27+
/// Returns whether to create a global rayon pool.
28+
fn create_global_rayon_pool() -> bool;
29+
}
30+
31+
/// Returns true if the current runtime is multithreaded.
32+
fn is_multithreaded_runtime() -> bool {
33+
std::panic::catch_unwind(|| {
34+
tokio::task::block_in_place(|| {});
35+
})
36+
.is_ok()
37+
}
38+
39+
/// Returns true if the current runtime is a tokio runtime.
40+
fn is_in_tokio_runtime() -> bool {
41+
tokio::runtime::Handle::try_current().is_ok()
42+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use super::{Runtime, RuntimeError};
2+
3+
/// Delegated runtime refers to a runtime where the global rayon pool is created outside of the runner.
4+
///
5+
/// This is useful when we may be calling from other tasks, i.e., not at the main thread.
6+
#[derive(Debug, Clone)]
7+
pub struct Delegated;
8+
9+
impl Runtime for Delegated {
10+
/// Try to create a new runtime.
11+
///
12+
/// There are no restrictions on the surrounding environment here.
13+
fn try_new() -> Result<Self, RuntimeError> {
14+
Ok(Self)
15+
}
16+
17+
/// Whether to create a global rayon pool.
18+
fn create_global_rayon_pool() -> bool {
19+
false
20+
}
21+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use super::{is_in_tokio_runtime, Runtime, RuntimeError};
2+
3+
/// Native runtime refers to a runtime where the global rayon pool is created within the runner.
4+
#[derive(Debug, Clone)]
5+
pub struct Native;
6+
7+
impl Runtime for Native {
8+
/// Try to create a new runtime.
9+
///
10+
/// There are no restrictions on the surrounding environment here.
11+
fn try_new() -> Result<Self, RuntimeError> {
12+
if is_in_tokio_runtime() {
13+
return Err(RuntimeError::Unavailable(Box::new(std::io::Error::new(
14+
std::io::ErrorKind::Other,
15+
"Native runtime is not available in a tokio runtime",
16+
))));
17+
}
18+
19+
Ok(Self)
20+
}
21+
22+
/// Whether to create a global rayon pool.
23+
fn create_global_rayon_pool() -> bool {
24+
true
25+
}
26+
}
27+
28+
#[cfg(test)]
29+
mod tests {
30+
use super::*;
31+
32+
#[tokio::test]
33+
async fn test_native_runtime_should_fail_in_tokio_runtime() -> Result<(), anyhow::Error> {
34+
assert!(Native::try_new().is_err());
35+
Ok(())
36+
}
37+
38+
#[test]
39+
fn test_native_runtime_should_succeed_outside_of_tokio_runtime() -> Result<(), anyhow::Error> {
40+
assert!(Native::try_new().is_ok());
41+
Ok(())
42+
}
43+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use super::{is_in_tokio_runtime, is_multithreaded_runtime, Runtime, RuntimeError};
2+
3+
/// Tokio test runtime.
4+
#[derive(Debug, Clone)]
5+
pub struct TokioTest;
6+
7+
impl Runtime for TokioTest {
8+
/// Try to create a new runtime.
9+
fn try_new() -> Result<Self, RuntimeError> {
10+
if !is_in_tokio_runtime() || !is_multithreaded_runtime() {
11+
return Err(RuntimeError::Unavailable(Box::new(std::io::Error::new(
12+
std::io::ErrorKind::Other,
13+
"Tokio test runtime is not multithreaded use #[tokio::test(flavor = \"multi_thread\")] instead",
14+
))));
15+
}
16+
17+
Ok(Self)
18+
}
19+
20+
/// Whether to create a global rayon pool.
21+
fn create_global_rayon_pool() -> bool {
22+
false
23+
}
24+
}
25+
26+
#[cfg(test)]
27+
mod tests {
28+
use super::*;
29+
30+
#[tokio::test(flavor = "multi_thread")]
31+
async fn test_tokio_test_should_succeed_in_multi_thread() -> Result<(), anyhow::Error> {
32+
TokioTest::try_new()?;
33+
Ok(())
34+
}
35+
36+
#[tokio::test]
37+
async fn test_tokio_test_should_fail_in_single_thread() -> Result<(), anyhow::Error> {
38+
assert!(TokioTest::try_new().is_err());
39+
Ok(())
40+
}
41+
42+
#[test]
43+
fn test_tokio_test_should_fail_outside_of_tokio_test() -> Result<(), anyhow::Error> {
44+
assert!(TokioTest::try_new().is_err());
45+
Ok(())
46+
}
47+
}

0 commit comments

Comments
 (0)