From d16f837ae54ff2180a4094be4fbfbc1c2595a69c Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Tue, 24 Sep 2024 23:16:36 +0000 Subject: [PATCH] [do not merge] grpc test --- rust/processor/src/config.rs | 3 ++ rust/processor/src/processors/mod.rs | 6 ++- .../src/processors/monitoring_processor.rs | 12 ++++- rust/processor/src/worker.rs | 52 +++++++++++-------- 4 files changed, 49 insertions(+), 24 deletions(-) diff --git a/rust/processor/src/config.rs b/rust/processor/src/config.rs index 785130334..d95f3a98e 100644 --- a/rust/processor/src/config.rs +++ b/rust/processor/src/config.rs @@ -55,6 +55,8 @@ pub struct IndexerGrpcProcessorConfig { // String vector for deprecated tables to skip db writes #[serde(default)] pub deprecated_tables: HashSet, + + pub db_row_name: Option, } impl IndexerGrpcProcessorConfig { @@ -103,6 +105,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig { self.transaction_filter.clone(), self.grpc_response_item_timeout_in_secs, self.deprecated_tables.clone(), + self.db_row_name.clone().unwrap_or("".to_string()), ) .await .context("Failed to build worker")?; diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index a04b0904b..175ac0745 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -79,6 +79,10 @@ pub struct DefaultProcessingResult { pub trait ProcessorTrait: Send + Sync + Debug { fn name(&self) -> &'static str; + fn db_row_name(&self) -> &str { + "" + } + /// Process all transactions including writing to the database async fn process_transactions( &self, @@ -135,7 +139,7 @@ pub trait ProcessorTrait: Send + Sync + Debug { ) -> anyhow::Result<()> { let timestamp = last_transaction_timestamp.map(|t| parse_timestamp(&t, version as i64)); let status = ProcessorStatus { - processor: self.name().to_string(), + processor: self.db_row_name().to_string(), last_success_version: version as i64, last_transaction_timestamp: timestamp, }; diff --git a/rust/processor/src/processors/monitoring_processor.rs b/rust/processor/src/processors/monitoring_processor.rs index c7e750f82..d45015f7f 100644 --- a/rust/processor/src/processors/monitoring_processor.rs +++ b/rust/processor/src/processors/monitoring_processor.rs @@ -9,11 +9,15 @@ use std::fmt::Debug; pub struct MonitoringProcessor { connection_pool: ArcDbPool, + db_row_name: String, } impl MonitoringProcessor { - pub fn new(connection_pool: ArcDbPool) -> Self { - Self { connection_pool } + pub fn new(connection_pool: ArcDbPool, db_row_name: String) -> Self { + Self { + connection_pool, + db_row_name, + } } } @@ -34,6 +38,10 @@ impl ProcessorTrait for MonitoringProcessor { ProcessorName::MonitoringProcessor.into() } + fn db_row_name(&self) -> &str { + &self.db_row_name + } + async fn process_transactions( &self, transactions: Vec, diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index cbbbd15c1..5603752b4 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -132,6 +132,7 @@ pub struct Worker { pub transaction_filter: TransactionFilter, pub grpc_response_item_timeout_in_secs: u64, pub deprecated_tables: TableFlags, + pub db_row_name: String, } impl Worker { @@ -155,6 +156,7 @@ impl Worker { transaction_filter: TransactionFilter, grpc_response_item_timeout_in_secs: u64, deprecated_tables: HashSet, + db_row_name: String, ) -> Result { let processor_name = processor_config.name(); info!(processor_name = processor_name, "[Parser] Kicking off"); @@ -200,6 +202,7 @@ impl Worker { transaction_filter, grpc_response_item_timeout_in_secs, deprecated_tables: deprecated_tables_flags, + db_row_name, }) } @@ -210,7 +213,7 @@ impl Worker { /// * Note that the batches will be sequential so we won't have problems with gaps /// 4. We will keep track of the last processed version and monitoring things like TPS pub async fn run(&mut self) { - let processor_name = self.processor_config.name(); + let processor_name = self.db_row_name.clone(); info!( processor_name = processor_name, service_type = PROCESSOR_SERVICE_TYPE, @@ -332,6 +335,7 @@ impl Worker { self.deprecated_tables, self.db_pool.clone(), maybe_gap_detector_sender, + self.db_row_name.clone(), ); let gap_detector = if is_parquet_processor { @@ -361,7 +365,7 @@ impl Worker { // 5. If it's the wrong chain, panic. info!( - processor_name = processor_name, + processor_name = self.db_row_name, service_type = PROCESSOR_SERVICE_TYPE, stream_address = self.indexer_grpc_data_service_address.as_str(), concurrent_tasks, @@ -376,13 +380,14 @@ impl Worker { receiver.clone(), gap_detector_sender.clone(), gap_detector.clone(), + self.db_row_name.clone(), ) .await; processor_tasks.push(join_handle); } info!( - processor_name = processor_name, + processor_name = self.db_row_name, service_type = PROCESSOR_SERVICE_TYPE, stream_address = self.indexer_grpc_data_service_address.as_str(), concurrent_tasks, @@ -401,8 +406,9 @@ impl Worker { receiver: kanal::AsyncReceiver, gap_detector_sender: AsyncSender, mut gap_detector: GapDetector, + db_row_name: String, ) -> JoinHandle<()> { - let processor_name = self.processor_config.name(); + let processor_name = db_row_name.clone(); let stream_address = self.indexer_grpc_data_service_address.to_string(); let receiver_clone = receiver.clone(); let auth_token = self.auth_token.clone(); @@ -415,6 +421,7 @@ impl Worker { self.deprecated_tables, self.db_pool.clone(), Some(gap_detector_sender.clone()), + "".to_string(), ) } else { build_processor( @@ -423,6 +430,7 @@ impl Worker { self.deprecated_tables, self.db_pool.clone(), None, + db_row_name, ) }; @@ -441,7 +449,7 @@ impl Worker { loop { let txn_channel_fetch_latency = std::time::Instant::now(); match fetch_transactions( - processor_name, + &processor_name, &stream_address, receiver_clone.clone(), task_index, @@ -522,7 +530,7 @@ impl Worker { transactions_pb, &processor, chain_id, - processor_name, + &processor_name, &auth_token, false, // enable_verbose_logging ) @@ -531,7 +539,7 @@ impl Worker { let processing_result = match res { Ok(versions) => { PROCESSOR_SUCCESSES_COUNT - .with_label_values(&[processor_name]) + .with_label_values(&[&processor_name]) .inc(); versions }, @@ -544,7 +552,7 @@ impl Worker { "[Parser][T#{}] Error processing transactions", task_index ); PROCESSOR_ERRORS_COUNT - .with_label_values(&[processor_name]) + .with_label_values(&[&processor_name]) .inc(); panic!( "[Parser][T#{}] Error processing '{:}' transactions: {:?}", @@ -590,13 +598,13 @@ impl Worker { // TODO: For these three, do an atomic thing, or ideally move to an async metrics collector! GRPC_LATENCY_BY_PROCESSOR_IN_SECS - .with_label_values(&[processor_name, &task_index_str]) + .with_label_values(&[&processor_name, &task_index_str]) .observe(time_diff_since_pb_timestamp_in_secs( end_txn_timestamp.as_ref().unwrap(), )); LATEST_PROCESSED_VERSION .with_label_values(&[ - processor_name, + &processor_name, step, label, &task_index_str, @@ -604,7 +612,7 @@ impl Worker { .set(last_txn_version as i64); TRANSACTION_UNIX_TIMESTAMP .with_label_values(&[ - processor_name, + &processor_name, step, label, &task_index_str, @@ -614,7 +622,7 @@ impl Worker { // Single batch metrics PROCESSED_BYTES_COUNT .with_label_values(&[ - processor_name, + &processor_name, step, label, &task_index_str, @@ -622,7 +630,7 @@ impl Worker { .inc_by(size_in_bytes as u64); NUM_TRANSACTIONS_PROCESSED_COUNT .with_label_values(&[ - processor_name, + &processor_name, step, label, &task_index_str, @@ -630,13 +638,13 @@ impl Worker { .inc_by(num_processed); SINGLE_BATCH_PROCESSING_TIME_IN_SECS - .with_label_values(&[processor_name, &task_index_str]) + .with_label_values(&[&processor_name, &task_index_str]) .observe(processing_time); SINGLE_BATCH_PARSING_TIME_IN_SECS - .with_label_values(&[processor_name, &task_index_str]) + .with_label_values(&[&processor_name, &task_index_str]) .observe(processing_result.processing_duration_in_secs); SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS - .with_label_values(&[processor_name, &task_index_str]) + .with_label_values(&[&processor_name, &task_index_str]) .observe(processing_result.db_insertion_duration_in_secs); gap_detector_sender @@ -659,7 +667,7 @@ impl Worker { NUM_TRANSACTIONS_PROCESSED_COUNT .with_label_values(&[ - processor_name, + &processor_name, step, label, &task_index_str, @@ -740,9 +748,7 @@ impl Worker { pub async fn get_start_version(&self) -> Result> { let mut conn = self.db_pool.get().await?; - match ProcessorStatusQuery::get_by_processor(self.processor_config.name(), &mut conn) - .await? - { + match ProcessorStatusQuery::get_by_processor(&self.db_row_name, &mut conn).await? { Some(status) => Ok(Some(status.last_success_version as u64 + 1)), None => Ok(None), } @@ -900,6 +906,7 @@ pub fn build_processor_for_testing( deprecated_tables, db_pool, None, + "".to_string(), ) } @@ -914,6 +921,7 @@ pub fn build_processor( deprecated_tables: TableFlags, db_pool: ArcDbPool, gap_detector_sender: Option>, // Parquet only + db_row_name: String, ) -> Processor { match config { ProcessorConfig::AccountTransactionsProcessor => Processor::from( @@ -938,7 +946,9 @@ pub fn build_processor( per_table_chunk_sizes, deprecated_tables, )), - ProcessorConfig::MonitoringProcessor => Processor::from(MonitoringProcessor::new(db_pool)), + ProcessorConfig::MonitoringProcessor => { + Processor::from(MonitoringProcessor::new(db_pool, db_row_name)) + }, ProcessorConfig::NftMetadataProcessor(config) => { Processor::from(NftMetadataProcessor::new(db_pool, config.clone())) },