Skip to content

Commit 7c65e51

Browse files
authored
Add Parquet Events Processor (#451)
* temp * remove logs * add more metrics * Add Parquet Events Processor * add validator txn handling logic * use context * change variable name * rebase * add event_version for events v2 future proofing * add block_timestamp to events * fix conflicts * handle validator txn event to have default size of 0 * rebase * lint * add logs when event size and event size info size don't match
1 parent b11e05a commit 7c65e51

File tree

10 files changed

+319
-27
lines changed

10 files changed

+319
-27
lines changed

rust/processor/src/db/common/models/events_models/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,6 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
pub mod events;
5+
6+
// parquet model
7+
pub mod parquet_events;
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright © Aptos Foundation
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#![allow(clippy::extra_unused_lifetimes)]
5+
6+
use crate::{
7+
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
8+
utils::util::{standardize_address, truncate_str},
9+
};
10+
use allocative_derive::Allocative;
11+
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
12+
use itertools::Itertools;
13+
use parquet_derive::ParquetRecordWriter;
14+
use serde::{Deserialize, Serialize};
15+
16+
// p99 currently is 303 so using 300 as a safe max length
17+
const EVENT_TYPE_MAX_LENGTH: usize = 300;
18+
19+
/// A single on-chain event flattened into one row for Parquet ingestion.
/// Field order defines the Parquet schema column order (ParquetRecordWriter) — do not reorder.
#[derive(Allocative, Clone, Debug, Default, Deserialize, ParquetRecordWriter, Serialize)]
pub struct Event {
    /// Transaction version this event was emitted in.
    pub txn_version: i64,
    /// Standardized address taken from the event key's account.
    pub account_address: String,
    pub sequence_number: i64,
    /// Creation number from the event key.
    pub creation_number: i64,
    pub block_height: i64,
    /// Full Move type string of the event (untruncated).
    pub event_type: String,
    /// Raw event payload as a string, copied verbatim from the protobuf.
    pub data: String,
    /// Position of this event within its transaction's event list.
    pub event_index: i64,
    /// `event_type` truncated to EVENT_TYPE_MAX_LENGTH for indexed querying.
    pub indexed_type: String,
    /// Serialized size of the type tag, from EventSizeInfo.
    pub type_tag_bytes: i64,
    /// Total serialized size of the event, from EventSizeInfo.
    pub total_bytes: i64,
    /// Event schema version; currently always 1, reserved for events v2.
    pub event_version: i8,
    /// Timestamp of the containing block; skipped for Allocative accounting.
    #[allocative(skip)]
    pub block_timestamp: chrono::NaiveDateTime,
}
36+
37+
// Destination table name for this model in the analytics sink.
impl NamedTable for Event {
    const TABLE_NAME: &'static str = "events";
}
40+
41+
impl HasVersion for Event {
    /// Transaction version used by the generic parquet processor for ordering.
    fn version(&self) -> i64 {
        self.txn_version
    }
}
46+
47+
impl GetTimeStamp for Event {
    /// Block timestamp associated with this row.
    fn get_timestamp(&self) -> chrono::NaiveDateTime {
        self.block_timestamp
    }
}
52+
53+
impl Event {
54+
pub fn from_event(
55+
event: &EventPB,
56+
txn_version: i64,
57+
block_height: i64,
58+
event_index: i64,
59+
size_info: &EventSizeInfo,
60+
block_timestamp: chrono::NaiveDateTime,
61+
) -> Self {
62+
let event_type: &str = event.type_str.as_ref();
63+
Event {
64+
account_address: standardize_address(
65+
event.key.as_ref().unwrap().account_address.as_str(),
66+
),
67+
creation_number: event.key.as_ref().unwrap().creation_number as i64,
68+
sequence_number: event.sequence_number as i64,
69+
txn_version,
70+
block_height,
71+
event_type: event_type.to_string(),
72+
data: event.data.clone(),
73+
event_index,
74+
indexed_type: truncate_str(event_type, EVENT_TYPE_MAX_LENGTH),
75+
type_tag_bytes: size_info.type_tag_bytes as i64,
76+
total_bytes: size_info.total_bytes as i64,
77+
event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes
78+
block_timestamp,
79+
}
80+
}
81+
82+
pub fn from_events(
83+
events: &[EventPB],
84+
txn_version: i64,
85+
block_height: i64,
86+
event_size_info: &[EventSizeInfo],
87+
block_timestamp: chrono::NaiveDateTime,
88+
) -> Vec<Self> {
89+
// Ensure that lengths match, otherwise log and panic to investigate
90+
if events.len() != event_size_info.len() {
91+
tracing::error!(
92+
events_len = events.len(),
93+
event_size_info_len = event_size_info.len(),
94+
txn_version,
95+
"Length mismatch: events size does not match event_size_info size.",
96+
);
97+
panic!("Length mismatch: events len does not match event_size_info len");
98+
}
99+
100+
events
101+
.iter()
102+
.zip_eq(event_size_info.iter())
103+
.enumerate()
104+
.map(|(index, (event, size_info))| {
105+
Self::from_event(
106+
event,
107+
txn_version,
108+
block_height,
109+
index as i64,
110+
size_info,
111+
block_timestamp,
112+
)
113+
})
114+
.collect::<Vec<ParquetEventModel>>()
115+
}
116+
}
117+
118+
/// Alias distinguishing this parquet model from the non-parquet `Event` model.
pub type ParquetEventModel = Event;

rust/processor/src/processors/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use crate::{
3838
processors::parquet_processors::{
3939
parquet_ans_processor::{ParquetAnsProcessor, ParquetAnsProcessorConfig},
4040
parquet_default_processor::{ParquetDefaultProcessor, ParquetDefaultProcessorConfig},
41+
parquet_events_processor::{ParquetEventsProcessor, ParquetEventsProcessorConfig},
4142
parquet_fungible_asset_processor::{
4243
ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig,
4344
},
@@ -200,6 +201,7 @@ pub enum ProcessorConfig {
200201
ParquetFungibleAssetProcessor(ParquetFungibleAssetProcessorConfig),
201202
ParquetTransactionMetadataProcessor(ParquetTransactionMetadataProcessorConfig),
202203
ParquetAnsProcessor(ParquetAnsProcessorConfig),
204+
ParquetEventsProcessor(ParquetEventsProcessorConfig),
203205
}
204206

205207
impl ProcessorConfig {
@@ -216,6 +218,7 @@ impl ProcessorConfig {
216218
| ProcessorConfig::ParquetFungibleAssetProcessor(_)
217219
| ProcessorConfig::ParquetTransactionMetadataProcessor(_)
218220
| ProcessorConfig::ParquetAnsProcessor(_)
221+
| ProcessorConfig::ParquetEventsProcessor(_)
219222
)
220223
}
221224
}
@@ -250,10 +253,12 @@ pub enum Processor {
250253
TokenV2Processor,
251254
TransactionMetadataProcessor,
252255
UserTransactionProcessor,
256+
// Parquet processors
253257
ParquetDefaultProcessor,
254258
ParquetFungibleAssetProcessor,
255259
ParquetTransactionMetadataProcessor,
256260
ParquetAnsProcessor,
261+
ParquetEventsProcessor,
257262
}
258263

259264
#[cfg(test)]

rust/processor/src/processors/parquet_processors/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,18 @@ use std::time::Duration;
22

33
pub mod parquet_ans_processor;
44
pub mod parquet_default_processor;
5+
pub mod parquet_events_processor;
56
pub mod parquet_fungible_asset_processor;
67
pub mod parquet_transaction_metadata_processor;
78

89
pub const GOOGLE_APPLICATION_CREDENTIALS: &str = "GOOGLE_APPLICATION_CREDENTIALS";
910

10-
pub trait UploadIntervalConfig {
11+
/// Shared behavior for parquet processor configuration types.
pub trait ParquetProcessorTrait {
    /// How often buffered parquet data is flushed/uploaded.
    fn parquet_upload_interval_in_secs(&self) -> Duration;

    /// Exports the given service-account credentials path via the
    /// GOOGLE_APPLICATION_CREDENTIALS environment variable; no-op when `None`.
    fn set_google_credentials(&self, credentials: Option<String>) {
        if let Some(credentials) = credentials {
            std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
        }
    }
}

rust/processor/src/processors/parquet_processors/parquet_ans_processor.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright © Aptos Foundation
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use super::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS};
4+
use super::ParquetProcessorTrait;
55
use crate::{
66
bq_analytics::{
77
create_parquet_handler_loop, generic_parquet_processor::ParquetDataGeneric,
@@ -40,7 +40,7 @@ pub struct ParquetAnsProcessorConfig {
4040
pub parquet_upload_interval: u64,
4141
}
4242

43-
impl UploadIntervalConfig for ParquetAnsProcessorConfig {
43+
impl ParquetProcessorTrait for ParquetAnsProcessorConfig {
4444
fn parquet_upload_interval_in_secs(&self) -> Duration {
4545
Duration::from_secs(self.parquet_upload_interval)
4646
}
@@ -58,9 +58,7 @@ impl ParquetAnsProcessor {
5858
config: ParquetAnsProcessorConfig,
5959
new_gap_detector_sender: AsyncSender<ProcessingResult>,
6060
) -> Self {
61-
if let Some(credentials) = config.google_application_credentials.clone() {
62-
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
63-
}
61+
config.set_google_credentials(config.google_application_credentials.clone());
6462

6563
let ans_primary_name_v2_sender = create_parquet_handler_loop::<AnsPrimaryNameV2>(
6664
new_gap_detector_sender.clone(),

rust/processor/src/processors/parquet_processors/parquet_default_processor.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@ use crate::{
1414
parquet_write_set_changes::{WriteSetChangeDetail, WriteSetChangeModel},
1515
},
1616
gap_detectors::ProcessingResult,
17-
processors::{
18-
parquet_processors::{UploadIntervalConfig, GOOGLE_APPLICATION_CREDENTIALS},
19-
ProcessorName, ProcessorTrait,
20-
},
17+
processors::{parquet_processors::ParquetProcessorTrait, ProcessorName, ProcessorTrait},
2118
utils::database::ArcDbPool,
2219
};
2320
use ahash::AHashMap;
@@ -42,7 +39,7 @@ pub struct ParquetDefaultProcessorConfig {
4239
pub parquet_upload_interval: u64,
4340
}
4441

45-
impl UploadIntervalConfig for ParquetDefaultProcessorConfig {
42+
impl ParquetProcessorTrait for ParquetDefaultProcessorConfig {
4643
fn parquet_upload_interval_in_secs(&self) -> Duration {
4744
Duration::from_secs(self.parquet_upload_interval)
4845
}
@@ -66,9 +63,7 @@ impl ParquetDefaultProcessor {
6663
config: ParquetDefaultProcessorConfig,
6764
new_gap_detector_sender: AsyncSender<ProcessingResult>,
6865
) -> Self {
69-
if let Some(credentials) = config.google_application_credentials.clone() {
70-
std::env::set_var(GOOGLE_APPLICATION_CREDENTIALS, credentials);
71-
}
66+
config.set_google_credentials(config.google_application_credentials.clone());
7267

7368
let transaction_sender = create_parquet_handler_loop::<ParquetTransaction>(
7469
new_gap_detector_sender.clone(),

0 commit comments

Comments
 (0)