Skip to content

Commit d16f837

Browse files
committed
[do not merge] grpc test
1 parent 38d78fd commit d16f837

File tree

4 files changed

+49
-24
lines changed

4 files changed

+49
-24
lines changed

rust/processor/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ pub struct IndexerGrpcProcessorConfig {
5555
// String vector for deprecated tables to skip db writes
5656
#[serde(default)]
5757
pub deprecated_tables: HashSet<String>,
58+
59+
pub db_row_name: Option<String>,
5860
}
5961

6062
impl IndexerGrpcProcessorConfig {
@@ -103,6 +105,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
103105
self.transaction_filter.clone(),
104106
self.grpc_response_item_timeout_in_secs,
105107
self.deprecated_tables.clone(),
108+
self.db_row_name.clone().unwrap_or("".to_string()),
106109
)
107110
.await
108111
.context("Failed to build worker")?;

rust/processor/src/processors/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ pub struct DefaultProcessingResult {
7979
pub trait ProcessorTrait: Send + Sync + Debug {
8080
fn name(&self) -> &'static str;
8181

82+
fn db_row_name(&self) -> &str {
83+
""
84+
}
85+
8286
/// Process all transactions including writing to the database
8387
async fn process_transactions(
8488
&self,
@@ -135,7 +139,7 @@ pub trait ProcessorTrait: Send + Sync + Debug {
135139
) -> anyhow::Result<()> {
136140
let timestamp = last_transaction_timestamp.map(|t| parse_timestamp(&t, version as i64));
137141
let status = ProcessorStatus {
138-
processor: self.name().to_string(),
142+
processor: self.db_row_name().to_string(),
139143
last_success_version: version as i64,
140144
last_transaction_timestamp: timestamp,
141145
};

rust/processor/src/processors/monitoring_processor.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@ use std::fmt::Debug;
99

1010
pub struct MonitoringProcessor {
1111
connection_pool: ArcDbPool,
12+
db_row_name: String,
1213
}
1314

1415
impl MonitoringProcessor {
15-
pub fn new(connection_pool: ArcDbPool) -> Self {
16-
Self { connection_pool }
16+
pub fn new(connection_pool: ArcDbPool, db_row_name: String) -> Self {
17+
Self {
18+
connection_pool,
19+
db_row_name,
20+
}
1721
}
1822
}
1923

@@ -34,6 +38,10 @@ impl ProcessorTrait for MonitoringProcessor {
3438
ProcessorName::MonitoringProcessor.into()
3539
}
3640

41+
fn db_row_name(&self) -> &str {
42+
&self.db_row_name
43+
}
44+
3745
async fn process_transactions(
3846
&self,
3947
transactions: Vec<Transaction>,

rust/processor/src/worker.rs

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ pub struct Worker {
132132
pub transaction_filter: TransactionFilter,
133133
pub grpc_response_item_timeout_in_secs: u64,
134134
pub deprecated_tables: TableFlags,
135+
pub db_row_name: String,
135136
}
136137

137138
impl Worker {
@@ -155,6 +156,7 @@ impl Worker {
155156
transaction_filter: TransactionFilter,
156157
grpc_response_item_timeout_in_secs: u64,
157158
deprecated_tables: HashSet<String>,
159+
db_row_name: String,
158160
) -> Result<Self> {
159161
let processor_name = processor_config.name();
160162
info!(processor_name = processor_name, "[Parser] Kicking off");
@@ -200,6 +202,7 @@ impl Worker {
200202
transaction_filter,
201203
grpc_response_item_timeout_in_secs,
202204
deprecated_tables: deprecated_tables_flags,
205+
db_row_name,
203206
})
204207
}
205208

@@ -210,7 +213,7 @@ impl Worker {
210213
/// * Note that the batches will be sequential so we won't have problems with gaps
211214
/// 4. We will keep track of the last processed version and monitoring things like TPS
212215
pub async fn run(&mut self) {
213-
let processor_name = self.processor_config.name();
216+
let processor_name = self.db_row_name.clone();
214217
info!(
215218
processor_name = processor_name,
216219
service_type = PROCESSOR_SERVICE_TYPE,
@@ -332,6 +335,7 @@ impl Worker {
332335
self.deprecated_tables,
333336
self.db_pool.clone(),
334337
maybe_gap_detector_sender,
338+
self.db_row_name.clone(),
335339
);
336340

337341
let gap_detector = if is_parquet_processor {
@@ -361,7 +365,7 @@ impl Worker {
361365
// 5. If it's the wrong chain, panic.
362366

363367
info!(
364-
processor_name = processor_name,
368+
processor_name = self.db_row_name,
365369
service_type = PROCESSOR_SERVICE_TYPE,
366370
stream_address = self.indexer_grpc_data_service_address.as_str(),
367371
concurrent_tasks,
@@ -376,13 +380,14 @@ impl Worker {
376380
receiver.clone(),
377381
gap_detector_sender.clone(),
378382
gap_detector.clone(),
383+
self.db_row_name.clone(),
379384
)
380385
.await;
381386
processor_tasks.push(join_handle);
382387
}
383388

384389
info!(
385-
processor_name = processor_name,
390+
processor_name = self.db_row_name,
386391
service_type = PROCESSOR_SERVICE_TYPE,
387392
stream_address = self.indexer_grpc_data_service_address.as_str(),
388393
concurrent_tasks,
@@ -401,8 +406,9 @@ impl Worker {
401406
receiver: kanal::AsyncReceiver<TransactionsPBResponse>,
402407
gap_detector_sender: AsyncSender<ProcessingResult>,
403408
mut gap_detector: GapDetector,
409+
db_row_name: String,
404410
) -> JoinHandle<()> {
405-
let processor_name = self.processor_config.name();
411+
let processor_name = db_row_name.clone();
406412
let stream_address = self.indexer_grpc_data_service_address.to_string();
407413
let receiver_clone = receiver.clone();
408414
let auth_token = self.auth_token.clone();
@@ -415,6 +421,7 @@ impl Worker {
415421
self.deprecated_tables,
416422
self.db_pool.clone(),
417423
Some(gap_detector_sender.clone()),
424+
"".to_string(),
418425
)
419426
} else {
420427
build_processor(
@@ -423,6 +430,7 @@ impl Worker {
423430
self.deprecated_tables,
424431
self.db_pool.clone(),
425432
None,
433+
db_row_name,
426434
)
427435
};
428436

@@ -441,7 +449,7 @@ impl Worker {
441449
loop {
442450
let txn_channel_fetch_latency = std::time::Instant::now();
443451
match fetch_transactions(
444-
processor_name,
452+
&processor_name,
445453
&stream_address,
446454
receiver_clone.clone(),
447455
task_index,
@@ -522,7 +530,7 @@ impl Worker {
522530
transactions_pb,
523531
&processor,
524532
chain_id,
525-
processor_name,
533+
&processor_name,
526534
&auth_token,
527535
false, // enable_verbose_logging
528536
)
@@ -531,7 +539,7 @@ impl Worker {
531539
let processing_result = match res {
532540
Ok(versions) => {
533541
PROCESSOR_SUCCESSES_COUNT
534-
.with_label_values(&[processor_name])
542+
.with_label_values(&[&processor_name])
535543
.inc();
536544
versions
537545
},
@@ -544,7 +552,7 @@ impl Worker {
544552
"[Parser][T#{}] Error processing transactions", task_index
545553
);
546554
PROCESSOR_ERRORS_COUNT
547-
.with_label_values(&[processor_name])
555+
.with_label_values(&[&processor_name])
548556
.inc();
549557
panic!(
550558
"[Parser][T#{}] Error processing '{:}' transactions: {:?}",
@@ -590,21 +598,21 @@ impl Worker {
590598

591599
// TODO: For these three, do an atomic thing, or ideally move to an async metrics collector!
592600
GRPC_LATENCY_BY_PROCESSOR_IN_SECS
593-
.with_label_values(&[processor_name, &task_index_str])
601+
.with_label_values(&[&processor_name, &task_index_str])
594602
.observe(time_diff_since_pb_timestamp_in_secs(
595603
end_txn_timestamp.as_ref().unwrap(),
596604
));
597605
LATEST_PROCESSED_VERSION
598606
.with_label_values(&[
599-
processor_name,
607+
&processor_name,
600608
step,
601609
label,
602610
&task_index_str,
603611
])
604612
.set(last_txn_version as i64);
605613
TRANSACTION_UNIX_TIMESTAMP
606614
.with_label_values(&[
607-
processor_name,
615+
&processor_name,
608616
step,
609617
label,
610618
&task_index_str,
@@ -614,29 +622,29 @@ impl Worker {
614622
// Single batch metrics
615623
PROCESSED_BYTES_COUNT
616624
.with_label_values(&[
617-
processor_name,
625+
&processor_name,
618626
step,
619627
label,
620628
&task_index_str,
621629
])
622630
.inc_by(size_in_bytes as u64);
623631
NUM_TRANSACTIONS_PROCESSED_COUNT
624632
.with_label_values(&[
625-
processor_name,
633+
&processor_name,
626634
step,
627635
label,
628636
&task_index_str,
629637
])
630638
.inc_by(num_processed);
631639

632640
SINGLE_BATCH_PROCESSING_TIME_IN_SECS
633-
.with_label_values(&[processor_name, &task_index_str])
641+
.with_label_values(&[&processor_name, &task_index_str])
634642
.observe(processing_time);
635643
SINGLE_BATCH_PARSING_TIME_IN_SECS
636-
.with_label_values(&[processor_name, &task_index_str])
644+
.with_label_values(&[&processor_name, &task_index_str])
637645
.observe(processing_result.processing_duration_in_secs);
638646
SINGLE_BATCH_DB_INSERTION_TIME_IN_SECS
639-
.with_label_values(&[processor_name, &task_index_str])
647+
.with_label_values(&[&processor_name, &task_index_str])
640648
.observe(processing_result.db_insertion_duration_in_secs);
641649

642650
gap_detector_sender
@@ -659,7 +667,7 @@ impl Worker {
659667

660668
NUM_TRANSACTIONS_PROCESSED_COUNT
661669
.with_label_values(&[
662-
processor_name,
670+
&processor_name,
663671
step,
664672
label,
665673
&task_index_str,
@@ -740,9 +748,7 @@ impl Worker {
740748
pub async fn get_start_version(&self) -> Result<Option<u64>> {
741749
let mut conn = self.db_pool.get().await?;
742750

743-
match ProcessorStatusQuery::get_by_processor(self.processor_config.name(), &mut conn)
744-
.await?
745-
{
751+
match ProcessorStatusQuery::get_by_processor(&self.db_row_name, &mut conn).await? {
746752
Some(status) => Ok(Some(status.last_success_version as u64 + 1)),
747753
None => Ok(None),
748754
}
@@ -900,6 +906,7 @@ pub fn build_processor_for_testing(
900906
deprecated_tables,
901907
db_pool,
902908
None,
909+
"".to_string(),
903910
)
904911
}
905912

@@ -914,6 +921,7 @@ pub fn build_processor(
914921
deprecated_tables: TableFlags,
915922
db_pool: ArcDbPool,
916923
gap_detector_sender: Option<AsyncSender<ProcessingResult>>, // Parquet only
924+
db_row_name: String,
917925
) -> Processor {
918926
match config {
919927
ProcessorConfig::AccountTransactionsProcessor => Processor::from(
@@ -938,7 +946,9 @@ pub fn build_processor(
938946
per_table_chunk_sizes,
939947
deprecated_tables,
940948
)),
941-
ProcessorConfig::MonitoringProcessor => Processor::from(MonitoringProcessor::new(db_pool)),
949+
ProcessorConfig::MonitoringProcessor => {
950+
Processor::from(MonitoringProcessor::new(db_pool, db_row_name))
951+
},
942952
ProcessorConfig::NftMetadataProcessor(config) => {
943953
Processor::from(NftMetadataProcessor::new(db_pool, config.clone()))
944954
},

0 commit comments

Comments
 (0)