Changes from all commits
3 changes: 3 additions & 0 deletions rust/processor/src/config.rs

@@ -55,6 +55,8 @@ pub struct IndexerGrpcProcessorConfig {
     // String vector for deprecated tables to skip db writes
     #[serde(default)]
     pub deprecated_tables: HashSet<String>,
+
+    pub db_row_name: Option<String>,
 }
 
 impl IndexerGrpcProcessorConfig {
@@ -103,6 +105,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
             self.transaction_filter.clone(),
             self.grpc_response_item_timeout_in_secs,
             self.deprecated_tables.clone(),
+            self.db_row_name.clone().unwrap_or("".to_string()),
         )
         .await
         .context("Failed to build worker")?;
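Two things fall out of this config change: a missing `db_row_name` key deserializes to `None` (serde treats `Option` fields as implicitly optional, so existing configs keep working), and the `unwrap_or` at the call site collapses `None` to an empty string. A minimal round-trip sketch of that behavior, assuming a YAML config and the `serde_yaml` crate; `MiniConfig` is a hypothetical stand-in for `IndexerGrpcProcessorConfig`, not part of this diff:

```rust
use serde::Deserialize;

// Hypothetical, trimmed-down stand-in for IndexerGrpcProcessorConfig.
#[derive(Debug, Deserialize)]
struct MiniConfig {
    processor_name: String,
    // Option<String> deserializes to None when the key is absent,
    // so no #[serde(default)] is needed and old configs still parse.
    db_row_name: Option<String>,
}

fn main() {
    let with_name = "processor_name: monitoring_processor\ndb_row_name: monitoring_1\n";
    let without_name = "processor_name: monitoring_processor\n";

    for yaml in [with_name, without_name] {
        let cfg: MiniConfig = serde_yaml::from_str(yaml).expect("valid yaml");
        // Mirrors the call site above: None falls back to "".
        let row_name = cfg.db_row_name.clone().unwrap_or("".to_string());
        println!("{} -> status row {:?}", cfg.processor_name, row_name);
    }
}
```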
6 changes: 5 additions & 1 deletion rust/processor/src/processors/mod.rs

@@ -79,6 +79,10 @@ pub struct DefaultProcessingResult
 pub trait ProcessorTrait: Send + Sync + Debug {
     fn name(&self) -> &'static str;
 
+    fn db_row_name(&self) -> &str {
+        ""
+    }
+
     /// Process all transactions including writing to the database
     async fn process_transactions(
         &self,
@@ -135,7 +139,7 @@ pub trait ProcessorTrait: Send + Sync + Debug {
     ) -> anyhow::Result<()> {
         let timestamp = last_transaction_timestamp.map(|t| parse_timestamp(&t, version as i64));
         let status = ProcessorStatus {
-            processor: self.name().to_string(),
+            processor: self.db_row_name().to_string(),
             last_success_version: version as i64,
             last_transaction_timestamp: timestamp,
         };
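Because `db_row_name` ships with a provided default body, this trait addition is non-breaking: every existing implementor compiles unchanged, and only processors that want a dedicated status row override it, as `MonitoringProcessor` does in the next file. A self-contained sketch of the pattern; the `Demo` type is hypothetical:

```rust
use std::fmt::Debug;

trait ProcessorTrait: Debug {
    fn name(&self) -> &'static str;

    // Provided method: adding it to the trait is non-breaking,
    // since implementors inherit this "" default.
    fn db_row_name(&self) -> &str {
        ""
    }
}

#[derive(Debug)]
struct Demo {
    db_row_name: String,
}

impl ProcessorTrait for Demo {
    fn name(&self) -> &'static str {
        "demo_processor"
    }

    // Override mirrors MonitoringProcessor: borrow the owned field.
    fn db_row_name(&self) -> &str {
        &self.db_row_name
    }
}

fn main() {
    let p = Demo { db_row_name: "demo_instance_1".to_string() };
    // update_last_processed_version() now keys the status row on this
    // value instead of the static processor name.
    println!("status row: {}", p.db_row_name());
}
```

One caveat visible in the diff as it stands: an implementor that keeps the default and has no configured name ends up writing its `processor_status` row under the empty string.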
12 changes: 10 additions & 2 deletions rust/processor/src/processors/monitoring_processor.rs

@@ -9,11 +9,15 @@ use std::fmt::Debug;
 
 pub struct MonitoringProcessor {
     connection_pool: ArcDbPool,
+    db_row_name: String,
 }
 
 impl MonitoringProcessor {
-    pub fn new(connection_pool: ArcDbPool) -> Self {
-        Self { connection_pool }
+    pub fn new(connection_pool: ArcDbPool, db_row_name: String) -> Self {
+        Self {
+            connection_pool,
+            db_row_name,
+        }
     }
 }
 
@@ -34,6 +38,10 @@ impl ProcessorTrait for MonitoringProcessor {
         ProcessorName::MonitoringProcessor.into()
     }
 
+    fn db_row_name(&self) -> &str {
+        &self.db_row_name
+    }
+
     async fn process_transactions(
         &self,
         transactions: Vec<Transaction>,
52 changes: 31 additions & 21 deletions rust/processor/src/worker.rs

@@ -132,6 +132,7 @@ pub struct Worker {
     pub transaction_filter: TransactionFilter,
     pub grpc_response_item_timeout_in_secs: u64,
     pub deprecated_tables: TableFlags,
+    pub db_row_name: String,
 }
 
 impl Worker {
@@ -155,6 +156,7 @@ impl Worker {
         transaction_filter: TransactionFilter,
         grpc_response_item_timeout_in_secs: u64,
         deprecated_tables: HashSet<String>,
+        db_row_name: String,
     ) -> Result<Self> {
         let processor_name = processor_config.name();
         info!(processor_name = processor_name, "[Parser] Kicking off");
@@ -200,6 +202,7 @@
             transaction_filter,
             grpc_response_item_timeout_in_secs,
             deprecated_tables: deprecated_tables_flags,
+            db_row_name,
         })
     }
 
@@ -210,7 +213,7 @@
     /// * Note that the batches will be sequential so we won't have problems with gaps
     /// 4. We will keep track of the last processed version and monitoring things like TPS
     pub async fn run(&mut self) {
-        let processor_name = self.processor_config.name();
+        let processor_name = self.db_row_name.clone();
         info!(
             processor_name = processor_name,
             service_type = PROCESSOR_SERVICE_TYPE,
@@ -332,6 +335,7 @@
             self.deprecated_tables,
             self.db_pool.clone(),
             maybe_gap_detector_sender,
+            self.db_row_name.clone(),
         );
 
         let gap_detector = if is_parquet_processor {
@@ -361,7 +365,7 @@
         // 5. If it's the wrong chain, panic.
 
         info!(
-            processor_name = processor_name,
+            processor_name = self.db_row_name,
             service_type = PROCESSOR_SERVICE_TYPE,
             stream_address = self.indexer_grpc_data_service_address.as_str(),
             concurrent_tasks,
@@ -376,13 +380,14 @@
                     receiver.clone(),
                     gap_detector_sender.clone(),
                     gap_detector.clone(),
+                    self.db_row_name.clone(),
                 )
                 .await;
            processor_tasks.push(join_handle);
         }
 
         info!(
-            processor_name = processor_name,
+            processor_name = self.db_row_name,
             service_type = PROCESSOR_SERVICE_TYPE,
             stream_address = self.indexer_grpc_data_service_address.as_str(),
             concurrent_tasks,
@@ -401,8 +406,9 @@
         receiver: kanal::AsyncReceiver<TransactionsPBResponse>,
         gap_detector_sender: AsyncSender<ProcessingResult>,
         mut gap_detector: GapDetector,
+        db_row_name: String,
     ) -> JoinHandle<()> {
-        let processor_name = self.processor_config.name();
+        let processor_name = db_row_name.clone();
         let stream_address = self.indexer_grpc_data_service_address.to_string();
         let receiver_clone = receiver.clone();
         let auth_token = self.auth_token.clone();
@@ -415,6 +421,7 @@
                 self.deprecated_tables,
                 self.db_pool.clone(),
                 Some(gap_detector_sender.clone()),
+                "".to_string(),
             )
         } else {
             build_processor(
@@ -423,6 +430,7 @@
                 self.deprecated_tables,
                 self.db_pool.clone(),
                 None,
+                db_row_name,
             )
         };
@@ -441,7 +449,7 @@
             loop {
                 let txn_channel_fetch_latency = std::time::Instant::now();
                 match fetch_transactions(
-                    processor_name,
+                    &processor_name,
                     &stream_address,
                     receiver_clone.clone(),
                     task_index,
@@ -522,7 +530,7 @@
                     transactions_pb,
                     &processor,
                     chain_id,
-                    processor_name,
+                    &processor_name,
                     &auth_token,
                     false, // enable_verbose_logging
                 )
@@ -531,7 +539,7 @@
                 let processing_result = match res {
                     Ok(versions) => {
                         PROCESSOR_SUCCESSES_COUNT
-                            .with_label_values(&[processor_name])
+                            .with_label_values(&[&processor_name])
                            .inc();
                        versions
                     },
@@ -544,7 +552,7 @@
                            "[Parser][T#{}] Error processing transactions", task_index
                        );
                        PROCESSOR_ERRORS_COUNT
-                           .with_label_values(&[processor_name])
+                           .with_label_values(&[&processor_name])
                           .inc();
                        panic!(
                            "[Parser][T#{}] Error processing '{:}' transactions: {:?}",
@@ -590,21 +598,21 @@
 
                // TODO: For these three, do an atomic thing, or ideally move to an async metrics collector!
                GRPC_LATENCY_BY_PROCESSOR_IN_SECS
-                   .with_label_values(&[processor_name, &task_index_str])
+                   .with_label_values(&[&processor_name, &task_index_str])
                   .observe(time_diff_since_pb_timestamp_in_secs(
                       end_txn_timestamp.as_ref().unwrap(),
                   ));
                LATEST_PROCESSED_VERSION
                   .with_label_values(&[
-                       processor_name,
+                       &processor_name,
                       step,
                       label,
                       &task_index_str,
                   ])
                   .set(last_txn_version as i64);
                TRANSACTION_UNIX_TIMESTAMP
                   .with_label_values(&[
-                       processor_name,
+                       &processor_name,
                       step,
                       label,
                       &task_index_str,
@@ -614,29 +622,29 @@
                // Single batch metrics
                PROCESSED_BYTES_COUNT
                   .with_label_values(&[
-                       processor_name,
+                       &processor_name,
                       step,
                       label,
                       &task_index_str,
                   ])
                   .inc_by(size_in_bytes as u64);
                NUM_TRANSACTIONS_PROCESSED_COUNT
                   .with_label_values(&[
-                       processor_name,
+                       &processor_name,
                       step,
                       label,
                       &task_index_str,
                   ])
                   .inc_by(num_processed);
 
                SINGLE_BATCH_PROCESSING_TIME_IN_SECS
-                   .with_label_values(&[processor_name, &task_index_str])
+                   .with_label_values(&[&processor_name, &task_index_str])
                   .observe(processing_time);
                SINGLE_BATCH_PARSING_TIME_IN_SECS
-                   .with_label_values(&[processor_name, &task_index_str])
+                   .with_label_values(&[&processor_name, &task_index_str])
                   .observe(processing_result.processing_duration_in_secs);
                SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS
-                   .with_label_values(&[processor_name, &task_index_str])
+                   .with_label_values(&[&processor_name, &task_index_str])
                   .observe(processing_result.db_insertion_duration_in_secs);
 
                gap_detector_sender
@@ -659,7 +667,7 @@
 
                NUM_TRANSACTIONS_PROCESSED_COUNT
                   .with_label_values(&[
-                       processor_name,
+                       &processor_name,
                       step,
                       label,
                       &task_index_str,
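A note on the pervasive `processor_name` to `&processor_name` churn above: the name used to be a `&'static str` returned by the processor config, but it is now an owned `String` cloned from `db_row_name`, and prometheus-style `with_label_values` takes `&[&str]`, so every call site has to borrow. A minimal sketch of the coercion involved, with a stub function standing in for the real metric (the actual counters presumably come from the `prometheus` crate):

```rust
// Stub that mimics the &[&str] signature of CounterVec::with_label_values.
fn with_label_values(labels: &[&str]) {
    println!("labels = {labels:?}");
}

fn main() {
    // Before: a &'static str could be passed straight into the slice.
    let static_name: &'static str = "monitoring_processor";
    with_label_values(&[static_name, "0"]);

    // After: the name is an owned String, so each call site borrows it;
    // &String coerces to &str because the expected element type is known.
    let owned_name: String = "monitoring_1".to_string();
    with_label_values(&[&owned_name, "0"]);
}
```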
Expand Down Expand Up @@ -740,9 +748,7 @@ impl Worker {
pub async fn get_start_version(&self) -> Result<Option<u64>> {
let mut conn = self.db_pool.get().await?;

match ProcessorStatusQuery::get_by_processor(self.processor_config.name(), &mut conn)
.await?
{
match ProcessorStatusQuery::get_by_processor(&self.db_row_name, &mut conn).await? {
Some(status) => Ok(Some(status.last_success_version as u64 + 1)),
None => Ok(None),
}
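So on restart, each worker resumes from the `processor_status` row keyed by its configured `db_row_name` rather than by the processor's static name, which is what lets two instances of the same processor type track independent progress against one database. A toy model of that resume logic, using a `HashMap` as a stand-in for the `processor_status` table (all names here are illustrative):

```rust
use std::collections::HashMap;

// Stand-in for the processor_status table: row name -> last_success_version.
fn get_start_version(status_table: &HashMap<String, u64>, db_row_name: &str) -> Option<u64> {
    // Mirrors get_start_version(): resume one past the last recorded success,
    // or None (fall back to the configured starting version) if no row exists.
    status_table.get(db_row_name).map(|last_success| last_success + 1)
}

fn main() {
    let mut table = HashMap::new();
    table.insert("monitoring_1".to_string(), 5_000_u64);
    table.insert("monitoring_2".to_string(), 9_999_u64);

    // Two instances of the same processor type, two independent cursors.
    assert_eq!(get_start_version(&table, "monitoring_1"), Some(5_001));
    assert_eq!(get_start_version(&table, "monitoring_2"), Some(10_000));
    assert_eq!(get_start_version(&table, "fresh_instance"), None);
    println!("resume logic ok");
}
```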
@@ -900,6 +906,7 @@ pub fn build_processor_for_testing(
         deprecated_tables,
         db_pool,
         None,
+        "".to_string(),
     )
 }
 
@@ -914,6 +921,7 @@ pub fn build_processor(
     deprecated_tables: TableFlags,
     db_pool: ArcDbPool,
     gap_detector_sender: Option<AsyncSender<ProcessingResult>>, // Parquet only
+    db_row_name: String,
 ) -> Processor {
     match config {
         ProcessorConfig::AccountTransactionsProcessor => Processor::from(
@@ -938,7 +946,9 @@ pub fn build_processor(
             per_table_chunk_sizes,
             deprecated_tables,
         )),
-        ProcessorConfig::MonitoringProcessor => Processor::from(MonitoringProcessor::new(db_pool)),
+        ProcessorConfig::MonitoringProcessor => {
+            Processor::from(MonitoringProcessor::new(db_pool, db_row_name))
+        },
         ProcessorConfig::NftMetadataProcessor(config) => {
             Processor::from(NftMetadataProcessor::new(db_pool, config.clone()))
         },