Skip to content

Commit cf02a94

Browse files
authored
Merge pull request #1284 from movementlabsxyz/indexer-v2-4
[feat] Enable internal indexer db for grpc stream. (4/n)
2 parents 754e405 + 7e62a5e commit cf02a94

File tree

14 files changed

+117
-59
lines changed

14 files changed

+117
-59
lines changed

Cargo.lock

Lines changed: 9 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ aptos-crypto = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1
175175

176176
aptos-crypto-derive = { git = "https://github.com/movementlabsxyz/aptos-core.git", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" }
177177
aptos-db = { git = "https://github.com/movementlabsxyz/aptos-core.git", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" }
178+
aptos-db-indexer = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" }
178179
aptos-executor = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" }
179180
aptos-executor-test-helpers = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" }
180181
aptos-executor-types = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cdbbd7fabb80dcb95ba5e23213faa072fab67" }
@@ -206,9 +207,8 @@ movement = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "1d1cd
206207
# Indexer
207208
processor = { git = "https://github.com/movementlabsxyz/aptos-indexer-processors", rev = "20be1190105908fd4fea4e78c330997658e9428e" }
208209
server-framework = { git = "https://github.com/movementlabsxyz/aptos-indexer-processors", rev = "20be1190105908fd4fea4e78c330997658e9428e" }
209-
# TODO: use forked version.
210-
processor-v2 = { git = "https://github.com/larryl3u/aptos-indexer-processors-v2", rev = "2e5dd83639886fe9e44af97726d79895a3b4f650", package = "processor" }
211-
aptos-indexer-processor-sdk = { git = "https://github.com/larryl3u/aptos-indexer-processor-sdk", rev = "bbcc42d5a03cffe861b89c080ef7de918f37b4d0" }
210+
processor-v2 = { git = "https://github.com/movementlabsxyz/aptos-indexer-processors-v2", rev = "3fa850e7ea7770fec7967ce5b9a857dabace6aa3", package = "processor" }
211+
aptos-indexer-processor-sdk = { git = "https://github.com/movementlabsxyz/aptos-indexer-processor-sdk", rev = "a596af7bfdabeb4fc94c19bbbccbd0574d7c15a1" }
212212

213213

214214
bcs = { git = "https://github.com/movementlabsxyz/bcs.git", rev = "bc16d2d39cabafaabd76173dd1b04b2aa170cf0c" }

docker/compose/movement-full-node/docker-compose.fullnode.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,20 @@ services:
77
- DOT_MOVEMENT_PATH=/.movement
88
- MOVEMENT_TIMING=info
99
- RUST_BACKTRACE=1
10+
- MOVEMENT_METRICS_ADDR=0.0.0.0:9464
11+
- APTOS_FORCE_ENABLE_TELEMETRY=1
12+
- APTOS_METRICS_PORT=9464
13+
- APTOS_METRICS_HOST=0.0.0.0
14+
- APTOS_DISABLE_TELEMETRY_PUSH_METRICS=1
15+
- PROMETHEUS_METRICS_ENABLED=1
16+
- APTOS_ENABLE_CONSENSUS_METRICS=1
17+
- APTOS_ENABLE_DB_METRICS=1
18+
- APTOS_ENABLE_MEMPOOL_METRICS=1
19+
- APTOS_ENABLE_NETWORK_METRICS=1
20+
- APTOS_ENABLE_STORAGE_METRICS=1
21+
- APTOS_ENABLE_VM_METRICS=1
22+
# OTEL Configuration (optional - uncomment and set your endpoint)
23+
# - MOVEMENT_OTEL_ENDPOINT=http://otel-collector:4317
1024
volumes:
1125
- ${DOT_MOVEMENT_PATH}:/.movement
1226
ports:

docker/compose/movement-indexer/docker-compose.local-development.indexer-v2.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,4 @@ services:
8989

9090
volumes:
9191
dot-movement:
92-
db_data:
92+
db_data:

networks/movement/indexer-v2/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ processor-v2 = { workspace = true }
3434
aptos-indexer-processor-sdk = { workspace = true }
3535
movement-tracing = { workspace = true }
3636
movement-health = { workspace = true }
37-
37+
godfig = { workspace = true }
3838
serde_yaml = "0.9.34"
3939

4040
[lints]

networks/movement/indexer-v2/src/main.rs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use aptos_indexer_processor_sdk::server_framework::RunnableConfig;
2+
use godfig::{backend::config_file::ConfigFile, Godfig};
23
use maptos_execution_util::config::Config;
34
use movement_health::run_service;
45
use movement_tracing::simple_metrics::start_metrics_server;
@@ -10,27 +11,37 @@ fn main() -> Result<(), anyhow::Error> {
1011
init_logger();
1112

1213
let runtime = get_maptos_runtime();
13-
let maptos_config = load_maptos_config()?;
14-
let runnable_processor_config: IndexerProcessorConfig =
15-
maptos_config.indexer_processor_v2.clone().into();
1614

1715
runtime.block_on(async move {
16+
let maptos_config = load_maptos_config().await.expect("Failed to load maptos config");
17+
let runnable_processor_config: IndexerProcessorConfig =
18+
maptos_config.indexer_processor_v2.clone().into();
19+
1820
let metrics_handle = tokio::spawn(async move {
19-
start_metrics_server(
21+
let res = start_metrics_server(
2022
maptos_config.indexer_processor_v2.metrics_config.listen_hostname,
2123
maptos_config.indexer_processor_v2.metrics_config.listen_port,
2224
)
23-
.await
25+
.await;
26+
tracing::info!("Metrics server started: {:?}", res);
27+
res
2428
});
2529
let health_handle = tokio::spawn(async move {
26-
run_service(
30+
let res = run_service(
2731
maptos_config.indexer_processor_v2.health_config.hostname,
2832
maptos_config.indexer_processor_v2.health_config.port,
2933
)
30-
.await
34+
.await;
35+
tracing::info!("Health server started: {:?}", res);
36+
res
3137
});
3238

33-
let processor_handle = tokio::spawn(async move { runnable_processor_config.run().await });
39+
let processor_handle = tokio::spawn(async move {
40+
let res = runnable_processor_config.run().await;
41+
tracing::info!("Indexer processor started: {:?}", res);
42+
res
43+
});
44+
tracing::info!("Indexer v2 stack started.");
3445

3546
tokio::select! {
3647
metrics_handle = metrics_handle => {
@@ -43,6 +54,7 @@ fn main() -> Result<(), anyhow::Error> {
4354
tracing::error!("Indexer processor exited abnormally: {:?}", processor_handle);
4455
}
4556
}
57+
tracing::info!("Indexer v2 stack exited normally.");
4658
});
4759
panic!("Indexer v2 stack exited abnormally.");
4860
}
@@ -78,8 +90,17 @@ fn get_maptos_runtime() -> tokio::runtime::Runtime {
7890
runtime
7991
}
8092

81-
fn load_maptos_config() -> anyhow::Result<Config> {
93+
async fn load_maptos_config() -> anyhow::Result<Config> {
94+
// get the config file
8295
let dot_movement = dot_movement::DotMovement::try_from_env()?;
83-
let maptos_config = dot_movement.try_get_config_from_json::<Config>()?;
96+
97+
// Load Maptos config
98+
let maptos_config = {
99+
let config_file = dot_movement.try_get_or_create_config_file().await?;
100+
let godfig: Godfig<maptos_execution_util::config::Config, ConfigFile> =
101+
Godfig::new(ConfigFile::new(config_file), vec!["maptos_config".to_string()]);
102+
godfig.try_wait_for_ready().await
103+
}?;
104+
84105
Ok(maptos_config)
85106
}

networks/movement/movement-full-node/src/main.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
use clap::*;
33
use movement_full_node::MovementFullNode;
44
use tracing_subscriber::EnvFilter;
5+
56
#[tokio::main]
67
async fn main() -> Result<(), anyhow::Error> {
78
// Initialize default tracing
@@ -10,6 +11,12 @@ async fn main() -> Result<(), anyhow::Error> {
1011
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
1112
)
1213
.init();
14+
15+
// Initialize telemetry if MOVEMENT_METRICS_ADDR is set
16+
if std::env::var("MOVEMENT_METRICS_ADDR").is_ok() {
17+
movement_tracing::ensure_telemetry_initialized();
18+
}
19+
1320
let suzuka_util = MovementFullNode::parse();
1421
let result = suzuka_util.execute().await;
1522
result

protocol-units/execution/maptos/opt-executor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ aptos-db = { workspace = true }
4444
aptos-api = { workspace = true }
4545
aptos-api-types = { workspace = true }
4646
aptos-types = { workspace = true }
47+
aptos-db-indexer = { workspace = true }
4748
aptos-storage-interface = { workspace = true }
4849
aptos-block-executor = { workspace = true }
4950
aptos-vm-types = { workspace = true }

protocol-units/execution/maptos/opt-executor/src/bootstrap.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,14 @@ pub fn maybe_bootstrap_empty_db(
8686
chain_id: ChainId,
8787
public_key: &Ed25519PublicKey,
8888
release: &impl Release,
89+
enable_indexer_db: bool,
8990
) -> Result<(DbReaderWriter, ValidatorSigner), anyhow::Error> {
9091
let aptos_db = AptosDB::open(
9192
StorageDirPaths::from_path(db_dir.clone()),
9293
false,
9394
config.storage.storage_pruner_config.clone(),
9495
config.storage.rocksdb_configs.clone(),
95-
false,
96+
enable_indexer_db,
9697
config.storage.buffered_state_target_items,
9798
config.storage.max_num_nodes_per_lru_cache_shard,
9899
)?;

protocol-units/execution/maptos/opt-executor/src/executor/initialization.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -82,30 +82,25 @@ impl Executor {
8282
node_config.indexer.batch_size = Some(8);
8383
node_config.indexer.gap_lookback_versions = Some(4);
8484

85-
node_config.indexer_grpc.enabled = true;
86-
8785
// indexer_grpc config
88-
if maptos_config.chain.enable_table_info_service {
86+
if maptos_config.chain.enable_indexer_grpc {
8987
node_config.indexer_table_info.enabled = true;
90-
node_config.storage.enable_indexer = true;
88+
node_config.indexer_grpc.enabled = true;
89+
node_config.indexer_grpc.processor_batch_size = 4;
90+
node_config.indexer_grpc.processor_task_count = 4;
91+
node_config.indexer_grpc.output_batch_size = 4;
92+
node_config.indexer_grpc.address = (
93+
maptos_config.indexer.maptos_indexer_grpc_listen_hostname.as_str(),
94+
maptos_config.indexer.maptos_indexer_grpc_listen_port,
95+
)
96+
.to_socket_addrs()?
97+
.next()
98+
.context("failed to resolve the value of maptos_indexer_grpc_listen_hostname")?;
99+
node_config.indexer_grpc.use_data_service_interface = true;
91100
}
92-
node_config.indexer_grpc.processor_batch_size = 4;
93-
node_config.indexer_grpc.processor_task_count = 4;
94-
node_config.indexer_grpc.output_batch_size = 4;
95-
node_config.indexer_grpc.address = (
96-
maptos_config.indexer.maptos_indexer_grpc_listen_hostname.as_str(),
97-
maptos_config.indexer.maptos_indexer_grpc_listen_port,
98-
)
99-
.to_socket_addrs()?
100-
.next()
101-
.context("failed to resolve the value of maptos_indexer_grpc_listen_hostname")?;
102-
node_config.indexer_grpc.use_data_service_interface = true;
103101

104-
// indexer table info config
105-
node_config.indexer_table_info.enabled = true;
106102
node_config.storage.dir = dot_movement.get_path().join("maptos-storage");
107103
node_config.storage.set_data_dir(node_config.storage.dir.clone());
108-
109104
let known_release = aptos_framework_known_release::KnownRelease::try_new(
110105
maptos_config.chain.known_framework_release_str.as_str(),
111106
)?;
@@ -115,6 +110,7 @@ impl Executor {
115110
maptos_config.chain.maptos_chain_id.clone(),
116111
&public_key,
117112
&known_release,
113+
maptos_config.chain.enable_indexer_grpc,
118114
)?;
119115

120116
Ok(Self {

0 commit comments

Comments
 (0)