Skip to content

Commit 5eca57b

Browse files
committed
[do not merge] grpc test
1 parent 38d78fd commit 5eca57b

File tree

4 files changed

+31
-4
lines changed

4 files changed

+31
-4
lines changed

rust/processor/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ pub struct IndexerGrpcProcessorConfig {
5555
// String vector for deprecated tables to skip db writes
5656
#[serde(default)]
5757
pub deprecated_tables: HashSet<String>,
58+
59+
pub db_row_name: String,
5860
}
5961

6062
impl IndexerGrpcProcessorConfig {
@@ -103,6 +105,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
103105
self.transaction_filter.clone(),
104106
self.grpc_response_item_timeout_in_secs,
105107
self.deprecated_tables.clone(),
108+
self.db_row_name.clone(),
106109
)
107110
.await
108111
.context("Failed to build worker")?;

rust/processor/src/processors/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ pub struct DefaultProcessingResult {
7979
pub trait ProcessorTrait: Send + Sync + Debug {
8080
fn name(&self) -> &'static str;
8181

82+
fn db_row_name(&self) -> &str {
83+
""
84+
}
85+
8286
/// Process all transactions including writing to the database
8387
async fn process_transactions(
8488
&self,
@@ -135,7 +139,7 @@ pub trait ProcessorTrait: Send + Sync + Debug {
135139
) -> anyhow::Result<()> {
136140
let timestamp = last_transaction_timestamp.map(|t| parse_timestamp(&t, version as i64));
137141
let status = ProcessorStatus {
138-
processor: self.name().to_string(),
142+
processor: self.db_row_name().to_string(),
139143
last_success_version: version as i64,
140144
last_transaction_timestamp: timestamp,
141145
};

rust/processor/src/processors/monitoring_processor.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@ use std::fmt::Debug;
99

1010
pub struct MonitoringProcessor {
1111
connection_pool: ArcDbPool,
12+
db_row_name: String,
1213
}
1314

1415
impl MonitoringProcessor {
15-
pub fn new(connection_pool: ArcDbPool) -> Self {
16-
Self { connection_pool }
16+
pub fn new(connection_pool: ArcDbPool, db_row_name: String) -> Self {
17+
Self {
18+
connection_pool,
19+
db_row_name,
20+
}
1721
}
1822
}
1923

@@ -34,6 +38,10 @@ impl ProcessorTrait for MonitoringProcessor {
3438
ProcessorName::MonitoringProcessor.into()
3539
}
3640

41+
fn db_row_name(&self) -> &str {
42+
&self.db_row_name
43+
}
44+
3745
async fn process_transactions(
3846
&self,
3947
transactions: Vec<Transaction>,

rust/processor/src/worker.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ pub struct Worker {
132132
pub transaction_filter: TransactionFilter,
133133
pub grpc_response_item_timeout_in_secs: u64,
134134
pub deprecated_tables: TableFlags,
135+
pub db_row_name: String,
135136
}
136137

137138
impl Worker {
@@ -155,6 +156,7 @@ impl Worker {
155156
transaction_filter: TransactionFilter,
156157
grpc_response_item_timeout_in_secs: u64,
157158
deprecated_tables: HashSet<String>,
159+
db_row_name: String,
158160
) -> Result<Self> {
159161
let processor_name = processor_config.name();
160162
info!(processor_name = processor_name, "[Parser] Kicking off");
@@ -200,6 +202,7 @@ impl Worker {
200202
transaction_filter,
201203
grpc_response_item_timeout_in_secs,
202204
deprecated_tables: deprecated_tables_flags,
205+
db_row_name,
203206
})
204207
}
205208

@@ -332,6 +335,7 @@ impl Worker {
332335
self.deprecated_tables,
333336
self.db_pool.clone(),
334337
maybe_gap_detector_sender,
338+
self.db_row_name.clone(),
335339
);
336340

337341
let gap_detector = if is_parquet_processor {
@@ -376,6 +380,7 @@ impl Worker {
376380
receiver.clone(),
377381
gap_detector_sender.clone(),
378382
gap_detector.clone(),
383+
self.db_row_name.clone(),
379384
)
380385
.await;
381386
processor_tasks.push(join_handle);
@@ -401,6 +406,7 @@ impl Worker {
401406
receiver: kanal::AsyncReceiver<TransactionsPBResponse>,
402407
gap_detector_sender: AsyncSender<ProcessingResult>,
403408
mut gap_detector: GapDetector,
409+
db_row_name: String,
404410
) -> JoinHandle<()> {
405411
let processor_name = self.processor_config.name();
406412
let stream_address = self.indexer_grpc_data_service_address.to_string();
@@ -415,6 +421,7 @@ impl Worker {
415421
self.deprecated_tables,
416422
self.db_pool.clone(),
417423
Some(gap_detector_sender.clone()),
424+
"".to_string(),
418425
)
419426
} else {
420427
build_processor(
@@ -423,6 +430,7 @@ impl Worker {
423430
self.deprecated_tables,
424431
self.db_pool.clone(),
425432
None,
433+
db_row_name,
426434
)
427435
};
428436

@@ -900,6 +908,7 @@ pub fn build_processor_for_testing(
900908
deprecated_tables,
901909
db_pool,
902910
None,
911+
"".to_string(),
903912
)
904913
}
905914

@@ -914,6 +923,7 @@ pub fn build_processor(
914923
deprecated_tables: TableFlags,
915924
db_pool: ArcDbPool,
916925
gap_detector_sender: Option<AsyncSender<ProcessingResult>>, // Parquet only
926+
db_row_name: String,
917927
) -> Processor {
918928
match config {
919929
ProcessorConfig::AccountTransactionsProcessor => Processor::from(
@@ -938,7 +948,9 @@ pub fn build_processor(
938948
per_table_chunk_sizes,
939949
deprecated_tables,
940950
)),
941-
ProcessorConfig::MonitoringProcessor => Processor::from(MonitoringProcessor::new(db_pool)),
951+
ProcessorConfig::MonitoringProcessor => {
952+
Processor::from(MonitoringProcessor::new(db_pool, db_row_name))
953+
},
942954
ProcessorConfig::NftMetadataProcessor(config) => {
943955
Processor::from(NftMetadataProcessor::new(db_pool, config.clone()))
944956
},

0 commit comments

Comments
 (0)