Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
981 changes: 926 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ aptos-indexer = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "
aptos-indexer-grpc-fullnode = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "b2f58eaeda1d3929a7b381afca78408731b71d77" }
aptos-indexer-grpc-table-info = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "b2f58eaeda1d3929a7b381afca78408731b71d77" }
aptos-protos = { git = "https://github.com/movementlabsxyz/aptos-core", rev = "b2f58eaeda1d3929a7b381afca78408731b71d77" }

# indexer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should update this to the latest rather than the commit on my branch.

processor = { git = "https://github.com/movementlabsxyz/aptos-indexer-processors", rev = "da0ccef0f0e0e1be4b39073edb3277dfd6cb7dc1", subdir = "rust" }
server-framework = { git = "https://github.com/movementlabsxyz/aptos-indexer-processors", rev = "da0ccef0f0e0e1be4b39073edb3277dfd6cb7dc1", subdir = "rust" }

bcs = { git = "https://github.com/aptos-labs/bcs.git", rev = "d31fab9d81748e2594be5cd5cdf845786a30562d" }
ethereum-types = "0.14.1"
ethers = "=2.0.10"
Expand Down Expand Up @@ -226,6 +231,8 @@ quote = "1.0"
rand = "0.7.3"
rand_core = "0.5.1"
rayon = "1.10.0"


reqwest = "0.12.4"
risc0-build = "0.20"
risc0-zkvm = { version = "0.21", features = ["std", "getrandom"] }
Expand Down Expand Up @@ -264,8 +271,8 @@ rustix = "0.38.34"
paste = "1.0.15"

# trying to pin diesel
# diesel = "=2.1.1"
# migrations_internals = "=2.1.1"
diesel = "=2.1.1"
migrations_internals = "=2.1.1"
num_cpus = "=1.16.0"
ahash = "=0.8.11"

Expand Down
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
solc
grpcurl
grpcui
redis
];

# Specific version of toolchain
Expand Down
31 changes: 31 additions & 0 deletions networks/suzuka/aptos-indexer-grpc-file-store/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "aptos-indexer-grpc-file-store"
description = "Indexer gRPC file store saves transactions to persistent storage."
version = "1.0.0"

# Workspace inherited keys
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
publish = { workspace = true }
repository = { workspace = true }
rust-version = { workspace = true }

[dependencies]
anyhow = { workspace = true }
aptos-indexer-grpc-server-framework = { workspace = true }
aptos-indexer-grpc-utils = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-moving-average = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true }
futures = { workspace = true }
once_cell = { workspace = true }
redis = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

[target.'cfg(unix)'.dependencies]
jemallocator = { workspace = true }
15 changes: 15 additions & 0 deletions networks/suzuka/indexer-grpc-cache-worker/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "indexer-grpc-cache-worker"
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
homepage.workspace = true
publish.workspace = true
rust-version.workspace = true

[dependencies]

[lints]
workspace = true
59 changes: 59 additions & 0 deletions networks/suzuka/indexer-grpc-cache-worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::{Context, Result};
use aptos_indexer_grpc_utils::{config::IndexerGrpcCacheWorkerConfig, IndexerGrpcFileStoreConfig, types::RedisUrl};
use serde::{Deserialize, Serialize};
use url::Url;

const RUNTIME_WORKER_MULTIPLIER: usize = 2;

fn main() -> Result<()> {

use tracing_subscriber::EnvFilter;

tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
)
.init();

let dot_movement = dot_movement::DotMovement::try_from_env()?;
let config = dot_movement.try_get_config_from_json::<suzuka_config::Config>()?;


let fullnode_grpc_address = format!(
"http://{}:{}",
config.execution_config.maptos_config.client.maptos_indexer_grpc_connection_hostname,
config.execution_config.maptos_config.client.maptos_indexer_grpc_connection_port
);
println!("Connecting to indexer gRPC server at: {}", fullnode_grpc_address.clone());

let config = IndexerGrpcCacheWorkerConfig {
fullnode_grpc_address: fullnode_grpc_address.clone(),
file_store_config: IndexerGrpcFileStoreConfig {
// variable provideded by gcs file store and local file store
// https://github.com/aptos-labs/aptos-core/blob/main/ecosystem/indexer-grpc/indexer-grpc-utils/src/config.rs#L45
},
redis_main_instance_address: RedisUrl::new("redis://localhost:6379")?,
enable_cache_compression: true,
};

let num_cpus = num_cpus::get();
let worker_threads = (num_cpus * RUNTIME_WORKER_MULTIPLIER).max(16);
println!(
"[Indexer cache worker] Starting cache worker tokio runtime: num_cpus={}, worker_threads={}",
num_cpus, worker_threads
);

let mut builder = tokio::runtime::Builder::new_multi_thread();
builder
.disable_lifo_slot()
.enable_all()
.worker_threads(worker_threads)
.build()
.unwrap()
.block_on(async {
config.run().await
})
}
15 changes: 15 additions & 0 deletions networks/suzuka/indexer-grpc-data-service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "indexer-grpc-data-service"
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
homepage.workspace = true
publish.workspace = true
rust-version.workspace = true

[dependencies]

[lints]
workspace = true
25 changes: 25 additions & 0 deletions networks/suzuka/indexer-processor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "suzuka-indexer-processor"
description = "Indexer GRPC processor in Rust."
version = "1.0.0"

# Workspace inherited keys
authors = ["Aptos Labs <[email protected]>"]
edition = "2021"
homepage = "https://aptoslabs.com"
license = "Apache-2.0"
publish = false
repository = "https://github.com/aptos-labs/aptos-core"
rust-version = { workspace = true }

[dependencies]
processor = { workspace = true }
server-framework = { workspace = true }
tokio = { workspace = true }
anyhow = { workspace = true }
num_cpus = { workspace = true }
dot-movement = { workspace = true }
suzuka-config = { workspace = true }
ahash = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
74 changes: 74 additions & 0 deletions networks/suzuka/indexer-processor/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use processor::IndexerGrpcProcessorConfig;
use processor::processors::ProcessorConfig;
use server_framework::RunnableConfig;
use ahash::AHashMap;

const RUNTIME_WORKER_MULTIPLIER: usize = 2;

fn main() -> Result<()> {

use tracing_subscriber::EnvFilter;

tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
)
.init();

let dot_movement = dot_movement::DotMovement::try_from_env()?;
let config = dot_movement.try_get_config_from_json::<suzuka_config::Config>()?;


let url = format!(
"http://{}:{}",
config.execution_config.maptos_config.client.maptos_indexer_grpc_connection_hostname,
config.execution_config.maptos_config.client.maptos_indexer_grpc_connection_port
);
println!("Connecting to indexer gRPC server at: {}", url);

let config = IndexerGrpcProcessorConfig {
processor_config: ProcessorConfig::DefaultProcessor,
postgres_connection_string: config.execution_config.maptos_config.indexer_processor.postgres_connection_string.clone(),
indexer_grpc_data_service_address: format!(
"http://{}:{}",
config.execution_config.maptos_config.client.maptos_indexer_grpc_connection_hostname,
config.execution_config.maptos_config.client.maptos_indexer_grpc_connection_port
).parse()?,
grpc_http2_config: Default::default(),
auth_token: config.execution_config.maptos_config.indexer_processor.indexer_processor_auth_token.clone(),
starting_version: None,
ending_version: None,
number_concurrent_processing_tasks: None,
db_pool_size: None,
gap_detection_batch_size: IndexerGrpcProcessorConfig::default_gap_detection_batch_size(),
parquet_gap_detection_batch_size: IndexerGrpcProcessorConfig::default_gap_detection_batch_size(),
pb_channel_txn_chunk_size: IndexerGrpcProcessorConfig::default_pb_channel_txn_chunk_size(),
per_table_chunk_sizes: AHashMap::new(),
enable_verbose_logging: None,
grpc_response_item_timeout_in_secs: IndexerGrpcProcessorConfig::default_grpc_response_item_timeout_in_secs(),
transaction_filter: Default::default(),
deprecated_tables: Default::default(),
};

let num_cpus = num_cpus::get();
let worker_threads = (num_cpus * RUNTIME_WORKER_MULTIPLIER).max(16);
println!(
"[Processor] Starting processor tokio runtime: num_cpus={}, worker_threads={}",
num_cpus, worker_threads
);

let mut builder = tokio::runtime::Builder::new_multi_thread();
builder
.disable_lifo_slot()
.enable_all()
.worker_threads(worker_threads)
.build()
.unwrap()
.block_on(async {
config.run().await
})
}
Empty file.
5 changes: 5 additions & 0 deletions networks/suzuka/redis-server/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// seems like we could only run an instance of redis-server in a separate process, and then connect to it from the other services.
// we might want to spin up a redis server then.
// https://discourse.nixos.org/t/how-can-i-spawn-a-redis-instance-within-a-nix-build/5155
// For testing purposes we could use:
// https://medium.com/@suyashkant.srivastava/this-post-aims-to-help-setting-up-a-minimal-redis-cluster-on-nix-environment-d607e3628e08
11 changes: 7 additions & 4 deletions networks/suzuka/suzuka-client/src/tests/indexer_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use aptos_protos::indexer::v1::{
raw_data_client::RawDataClient,
};
use futures::StreamExt;
use std::io::{self, Write};


static SUZUKA_CONFIG: Lazy<suzuka_config::Config> = Lazy::new(|| {
let dot_movement = dot_movement::DotMovement::try_from_env().unwrap();
Expand Down Expand Up @@ -39,11 +41,11 @@ static INDEXER_URL: Lazy<String> = Lazy::new(|| {
#[tokio::test]
async fn test_example_indexer_stream() -> Result<(), anyhow::Error> {

/*let channel = tonic::transport::Channel::from_shared(
let channel = tonic::transport::Channel::from_shared(
INDEXER_URL.to_string(),
).expect(
"[Parser] Failed to build GRPC channel, perhaps because the data service URL is invalid",
);*/
);

let mut client = RawDataClient::connect(
INDEXER_URL.as_str(),
Expand All @@ -54,15 +56,16 @@ async fn test_example_indexer_stream() -> Result<(), anyhow::Error> {
transactions_count : Some(10),
batch_size : Some(100),
};

println!("{:?}", request);
let mut stream = client.get_transactions(request).await?.into_inner();

for _ in 1..10 {
let response = stream
.next()
.await;
println!("{:?}", response);
}
io::stdout().flush().unwrap();
}

Ok(())
}
25 changes: 25 additions & 0 deletions process-compose/suzuka-full-node/process-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,28 @@ processes:
exec:
command: curl http://0.0.0.0:30732

suzuka-indexer-processor:

command : |
#/bin/bash
# todo: this fails with gRPC Unimplemented error on the RawData.GetTransactions call
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to test the indexer-processor prior to proceeding. But, I think that's going to be difficult.

RUST_LOG=debug suzuka-indexer-processor
# for now, we will just prove the service is running with a grpcurl call
# we will check the output
RESPONSE=$(grpcurl -plaintext 0.0.0.0:30734 list aptos.indexer.v1.RawData)
echho $RESPONSE
EXPECTED="aptos.indexer.v1.RawData.GetTransactions"
if [[ "$RESPONSE" == "$EXPECTED" ]]; then
exit 0
else
exit 1
fi
depends_on:
suzuka-full-node:
condition: process_healthy
readiness_probe:
initial_delay_seconds: 30
exec:
command: echo "true"


4 changes: 1 addition & 3 deletions scripts/cargo/choose
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,4 @@
set -e

# change symbolic link to be Cargo.$1.toml
ln -sf Cargo.$1.toml Cargo.toml


ln -sf Cargo.$1.toml Cargo.toml
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be deleted.

6 changes: 5 additions & 1 deletion scripts/services/suzuka-full-node/build
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ echo "Built suzuka-full-node-setup!"

echo "Building wait-for-celestia-light-node..."
cargo build $CARGO_PROFILE_FLAGS --bin wait-for-celestia-light-node
echo "Built wait-for-celestia-light-node!"
echo "Built wait-for-celestia-light-node!"

echo "Building suzuka-indexer-processor..."
cargo build $CARGO_PROFILE_FLAGS --bin suzuka-indexer-processor
echo "Built suzuka-indexer-processor!"