Commit fe207f5

add parquet txn metadata processor that handles write set size info (#461)
* temp
* remove logs
* add more metrics
* add parquet txn metadata processor that handles write set size info
* rebase
* add block_timestamp to write_set_size_info table
1 parent a3a16af commit fe207f5

8 files changed: +237 −0 lines changed

rust/processor/src/db/common/models/transaction_metadata_model/mod.rs
Lines changed: 3 additions & 0 deletions

@@ -4,3 +4,6 @@
 pub mod event_size_info;
 pub mod transaction_size_info;
 pub mod write_set_size_info;
+
+// parquet models
+pub mod parquet_write_set_size_info;
rust/processor/src/db/common/models/transaction_metadata_model/parquet_write_set_size_info.rs
Lines changed: 58 additions & 0 deletions

@@ -0,0 +1,58 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+#![allow(clippy::extra_unused_lifetimes)]
+
+use crate::bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable};
+use allocative_derive::Allocative;
+use aptos_protos::transaction::v1::WriteOpSizeInfo;
+use field_count::FieldCount;
+use parquet_derive::ParquetRecordWriter;
+use serde::{Deserialize, Serialize};
+
+#[derive(
+    Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
+)]
+pub struct WriteSetSize {
+    pub txn_version: i64,
+    pub change_index: i64,
+    pub key_bytes: i64,
+    pub value_bytes: i64,
+    pub total_bytes: i64,
+    #[allocative(skip)]
+    pub block_timestamp: chrono::NaiveDateTime,
+}
+
+impl NamedTable for WriteSetSize {
+    const TABLE_NAME: &'static str = "write_set_size";
+}
+
+impl HasVersion for WriteSetSize {
+    fn version(&self) -> i64 {
+        self.txn_version
+    }
+}
+
+impl GetTimeStamp for WriteSetSize {
+    fn get_timestamp(&self) -> chrono::NaiveDateTime {
+        self.block_timestamp
+    }
+}
+
+impl WriteSetSize {
+    pub fn from_transaction_info(
+        info: &WriteOpSizeInfo,
+        txn_version: i64,
+        change_index: i64,
+        block_timestamp: chrono::NaiveDateTime,
+    ) -> Self {
+        WriteSetSize {
+            txn_version,
+            change_index,
+            key_bytes: info.key_bytes as i64,
+            value_bytes: info.value_bytes as i64,
+            total_bytes: info.key_bytes as i64 + info.value_bytes as i64,
+            block_timestamp,
+        }
+    }
+}
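
Note: total_bytes is just the sum of the two proto byte counts. A minimal usage sketch of the constructor (all values and the epoch timestamp are illustrative; the functional-update spread hedges against WriteOpSizeInfo carrying fields beyond the two used here):

    use aptos_protos::transaction::v1::WriteOpSizeInfo;

    // Illustrative write op: 40-byte key, 200-byte value.
    let size_info = WriteOpSizeInfo {
        key_bytes: 40,
        value_bytes: 200,
        ..Default::default() // in case the proto struct has other fields
    };

    let row = WriteSetSize::from_transaction_info(
        &size_info,
        1_000_000,                        // txn_version (illustrative)
        0,                                // change_index within the transaction
        chrono::NaiveDateTime::default(), // block_timestamp placeholder (Unix epoch)
    );
    assert_eq!(row.total_bytes, 240); // key_bytes + value_bytes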

rust/processor/src/processors/mod.rs
Lines changed: 6 additions & 0 deletions

@@ -40,6 +40,9 @@ use crate::{
         parquet_fungible_asset_processor::{
             ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig,
         },
+        parquet_transaction_metadata_processor::{
+            ParquetTransactionMetadataProcessor, ParquetTransactionMetadataProcessorConfig,
+        },
     },
     schema::processor_status,
     utils::{
@@ -194,6 +197,7 @@ pub enum ProcessorConfig {
     UserTransactionProcessor,
     ParquetDefaultProcessor(ParquetDefaultProcessorConfig),
     ParquetFungibleAssetProcessor(ParquetFungibleAssetProcessorConfig),
+    ParquetTransactionMetadataProcessor(ParquetTransactionMetadataProcessorConfig),
 }

 impl ProcessorConfig {
@@ -208,6 +212,7 @@ impl ProcessorConfig {
             self,
             ProcessorConfig::ParquetDefaultProcessor(_)
                 | ProcessorConfig::ParquetFungibleAssetProcessor(_)
+                | ProcessorConfig::ParquetTransactionMetadataProcessor(_)
         )
     }
 }
@@ -244,6 +249,7 @@ pub enum Processor {
     UserTransactionProcessor,
     ParquetDefaultProcessor,
     ParquetFungibleAssetProcessor,
+    ParquetTransactionMetadataProcessor,
 }

 #[cfg(test)]
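
Note: with the new ProcessorConfig variant, the processor becomes selectable from the indexer's config file. A hypothetical round-trip, assuming the enum is tagged with a snake_case `type` key like its sibling parquet variants (the field names come from ParquetTransactionMetadataProcessorConfig below; serde_yaml and all values are purely illustrative):

    fn parse_config() -> anyhow::Result<ProcessorConfig> {
        let yaml = r#"
    type: parquet_transaction_metadata_processor
    bucket_name: my-bucket
    bucket_root: txn_metadata
    parquet_handler_response_channel_size: 100
    max_buffer_size: 104857600    # 100 MiB
    parquet_upload_interval: 1800 # seconds
    "#;
        // deny_unknown_fields on the config struct means mistyped keys fail fast.
        Ok(serde_yaml::from_str(yaml)?)
    }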

rust/processor/src/processors/parquet_processors/mod.rs
Lines changed: 1 addition & 0 deletions

@@ -3,6 +3,7 @@ use std::time::Duration;
 pub mod parquet_default_processor;

 pub mod parquet_fungible_asset_processor;
+pub mod parquet_transaction_metadata_processor;

 pub const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";

rust/processor/src/processors/parquet_processors/parquet_default_processor.rs
Lines changed: 1 addition & 0 deletions

@@ -41,6 +41,7 @@ pub struct ParquetDefaultProcessorConfig {
     pub max_buffer_size: usize,
     pub parquet_upload_interval: u64,
 }
+
 impl UploadIntervalConfig for ParquetDefaultProcessorConfig {
     fn parquet_upload_interval_in_secs(&self) -> Duration {
         Duration::from_secs(self.parquet_upload_interval)
rust/processor/src/processors/parquet_processors/parquet_transaction_metadata_processor.rs
Lines changed: 150 additions & 0 deletions

@@ -0,0 +1,150 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::{
+    bq_analytics::{
+        create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric,
+        ParquetProcessingResult,
+    },
+    db::common::models::transaction_metadata_model::parquet_write_set_size_info::WriteSetSize,
+    gap_detectors::ProcessingResult,
+    processors::{
+        parquet_processors::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS},
+        ProcessorName, ProcessorTrait,
+    },
+    utils::{database::ArcDbPool, util::parse_timestamp},
+};
+use ahash::AHashMap;
+use anyhow::Context;
+use aptos_protos::transaction::v1::Transaction;
+use async_trait::async_trait;
+use kanal::AsyncSender;
+use serde::{Deserialize, Serialize};
+use std::{fmt::Debug, time::Duration};
+use tracing::warn;
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct ParquetTransactionMetadataProcessorConfig {
+    pub google_application_credentials: Option<String>,
+    pub bucket_name: String,
+    pub bucket_root: String,
+    pub parquet_handler_response_channel_size: usize,
+    pub max_buffer_size: usize,
+    pub parquet_upload_interval: u64,
+}
+
+impl UploadIntervalConfig for ParquetTransactionMetadataProcessorConfig {
+    fn parquet_upload_interval_in_secs(&self) -> Duration {
+        Duration::from_secs(self.parquet_upload_interval)
+    }
+}
+
+pub struct ParquetTransactionMetadataProcessor {
+    connection_pool: ArcDbPool,
+    write_set_size_info_sender: AsyncSender<ParquetDataGeneric<WriteSetSize>>,
+}
+
+impl ParquetTransactionMetadataProcessor {
+    pub fn new(
+        connection_pool: ArcDbPool,
+        config: ParquetTransactionMetadataProcessorConfig,
+        new_gap_detector_sender: AsyncSender<ProcessingResult>,
+    ) -> Self {
+        if let Some(credentials) = config.google_application_credentials.clone() {
+            std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
+        }
+
+        let write_set_size_info_sender = create_parquet_handler_loop::<WriteSetSize>(
+            new_gap_detector_sender.clone(),
+            ProcessorName::ParquetTransactionMetadataProcessor.into(),
+            config.bucket_name.clone(),
+            config.bucket_root.clone(),
+            config.parquet_handler_response_channel_size,
+            config.max_buffer_size,
+            config.parquet_upload_interval_in_secs(),
+        );
+        Self {
+            connection_pool,
+            write_set_size_info_sender,
+        }
+    }
+}
+
+impl Debug for ParquetTransactionMetadataProcessor {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "ParquetTransactionMetadataProcessor {{ capacity of write set size info channel: {:?} }}",
+            self.write_set_size_info_sender.capacity(),
+        )
+    }
+}
+
+#[async_trait]
+impl ProcessorTrait for ParquetTransactionMetadataProcessor {
+    fn name(&self) -> &'static str {
+        ProcessorName::ParquetTransactionMetadataProcessor.into()
+    }
+
+    async fn process_transactions(
+        &self,
+        transactions: Vec<Transaction>,
+        start_version: u64,
+        end_version: u64,
+        _: Option<u64>,
+    ) -> anyhow::Result<ProcessingResult> {
+        let last_transaction_timestamp = transactions.last().unwrap().timestamp.clone();
+        let mut transaction_version_to_struct_count: AHashMap<i64, i64> = AHashMap::new();
+
+        let mut write_set_sizes = vec![];
+
+        for txn in &transactions {
+            let txn_version = txn.version as i64;
+            let block_timestamp = parse_timestamp(txn.timestamp.as_ref().unwrap(), txn_version);
+            let size_info = match txn.size_info.as_ref() {
+                Some(size_info) => size_info,
+                None => {
+                    warn!(version = txn.version, "Transaction size info not found");
+                    continue;
+                },
+            };
+            for (index, write_set_size_info) in size_info.write_op_size_info.iter().enumerate() {
+                write_set_sizes.push(WriteSetSize::from_transaction_info(
+                    write_set_size_info,
+                    txn_version,
+                    index as i64,
+                    block_timestamp,
+                ));
+                transaction_version_to_struct_count
+                    .entry(txn_version)
+                    .and_modify(|e| *e += 1)
+                    .or_insert(1);
+            }
+        }
+
+        let write_set_size_info_parquet_data = ParquetDataGeneric {
+            data: write_set_sizes,
+        };
+
+        self.write_set_size_info_sender
+            .send(write_set_size_info_parquet_data)
+            .await
+            .context("Error sending write set size info to parquet handler")?;
+
+        Ok(ProcessingResult::ParquetProcessingResult(
+            ParquetProcessingResult {
+                start_version: start_version as i64,
+                end_version: end_version as i64,
+                last_transaction_timestamp: last_transaction_timestamp.clone(),
+                txn_version_to_struct_count: Some(transaction_version_to_struct_count),
+                parquet_processed_structs: None,
+                table_name: "".to_string(),
+            },
+        ))
+    }
+
+    fn connection_pool(&self) -> &ArcDbPool {
+        &self.connection_pool
+    }
+}
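
Note: the processor emits one WriteSetSize row per write op and records per-version row counts for the gap detector; transactions with no size_info are skipped with a warning rather than failing the batch. The counting pattern in isolation (versions and op counts are illustrative):

    use ahash::AHashMap;

    // One count per write op, keyed by transaction version, as in the loop above.
    let mut counts: AHashMap<i64, i64> = AHashMap::new();
    for (version, write_ops_in_txn) in [(100_i64, 2), (101, 0), (102, 3)] {
        for _ in 0..write_ops_in_txn {
            counts.entry(version).and_modify(|e| *e += 1).or_insert(1);
        }
    }
    assert_eq!(counts.get(&100), Some(&2));
    assert_eq!(counts.get(&101), None); // no write ops -> no rows, no count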

rust/processor/src/utils/counters.rs
Lines changed: 10 additions & 0 deletions

@@ -293,3 +293,13 @@ pub static PARQUET_BUFFER_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
     )
     .unwrap()
 });
+
+/// Size of parquet buffer after upload
+pub static PARQUET_BUFFER_SIZE_AFTER_UPLOAD: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "indexer_parquet_size_after_upload",
+        "Size of Parquet buffer after upload",
+        &["parquet_type"]
+    )
+    .unwrap()
+});
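
Note: this commit only registers the gauge; no call site sets it here. A sketch of how an IntGaugeVec of this shape is typically updated (the label value and byte count are stand-ins, not from this commit):

    use crate::utils::counters::PARQUET_BUFFER_SIZE_AFTER_UPLOAD;

    // Hypothetical call site: record the bytes still buffered for a given
    // parquet table once an upload completes.
    let remaining_bytes: usize = 0; // e.g. the buffer was fully flushed
    PARQUET_BUFFER_SIZE_AFTER_UPLOAD
        .with_label_values(&["write_set_size"])
        .set(remaining_bytes as i64);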

rust/processor/src/worker.rs
Lines changed: 8 additions & 0 deletions

@@ -21,6 +21,7 @@ use crate::{
         parquet_processors::{
             parquet_default_processor::ParquetDefaultProcessor,
             parquet_fungible_asset_processor::ParquetFungibleAssetProcessor,
+            parquet_transaction_metadata_processor::ParquetTransactionMetadataProcessor,
         },
         stake_processor::StakeProcessor,
         token_v2_processor::TokenV2Processor,
@@ -959,5 +960,12 @@ pub fn build_processor(
                 gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
             ))
         },
+        ProcessorConfig::ParquetTransactionMetadataProcessor(config) => {
+            Processor::from(ParquetTransactionMetadataProcessor::new(
+                db_pool,
+                config.clone(),
+                gap_detector_sender.expect("Parquet processor requires a gap detector sender"),
+            ))
+        },
     }
 }
