Skip to content

Commit da88021

Browse files
authored
fix parquet event processor to handle miss-matching events and event … (#474)
* fix parquet event processor to handle miss-matching events and event size info * lint
1 parent 7c65e51 commit da88021

File tree

2 files changed

+19
-7
lines changed

2 files changed

+19
-7
lines changed

rust/processor/src/db/common/models/events_models/parquet_events.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use crate::{
99
};
1010
use allocative_derive::Allocative;
1111
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
12-
use itertools::Itertools;
1312
use parquet_derive::ParquetRecordWriter;
1413
use serde::{Deserialize, Serialize};
1514

@@ -85,23 +84,31 @@ impl Event {
8584
block_height: i64,
8685
event_size_info: &[EventSizeInfo],
8786
block_timestamp: chrono::NaiveDateTime,
87+
is_user_txn_type: bool,
8888
) -> Vec<Self> {
89-
// Ensure that lengths match, otherwise log and panic to investigate
89+
// event_size_info will be used for user transactions only, no promises for other transactions.
90+
// If event_size_info is missing due to fewer or no events, it defaults to 0.
91+
// No need to backfill as event_size_info is primarily for debugging user transactions.
9092
if events.len() != event_size_info.len() {
91-
tracing::error!(
93+
tracing::warn!(
9294
events_len = events.len(),
9395
event_size_info_len = event_size_info.len(),
9496
txn_version,
9597
"Length mismatch: events size does not match event_size_info size.",
9698
);
97-
panic!("Length mismatch: events len does not match event_size_info len");
99+
if is_user_txn_type {
100+
panic!("Event size info is missing for user transaction.")
101+
}
98102
}
99103

100104
events
101105
.iter()
102-
.zip_eq(event_size_info.iter())
103106
.enumerate()
104-
.map(|(index, (event, size_info))| {
107+
.map(|(index, event)| {
108+
let size_info = event_size_info.get(index).unwrap_or(&EventSizeInfo {
109+
type_tag_bytes: 0,
110+
total_bytes: 0,
111+
});
105112
Self::from_event(
106113
event,
107114
txn_version,

rust/processor/src/processors/parquet_processors/parquet_events_processor.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,14 @@ impl ProcessorTrait for ParquetEventsProcessor {
119119
},
120120
};
121121
let default = vec![];
122+
let mut is_user_txn_type = false;
122123
let raw_events = match txn_data {
123124
TxnData::BlockMetadata(tx_inner) => &tx_inner.events,
124125
TxnData::Genesis(tx_inner) => &tx_inner.events,
125-
TxnData::User(tx_inner) => &tx_inner.events,
126+
TxnData::User(tx_inner) => {
127+
is_user_txn_type = true;
128+
&tx_inner.events
129+
},
126130
TxnData::Validator(txn) => &txn.events,
127131
_ => &default,
128132
};
@@ -133,6 +137,7 @@ impl ProcessorTrait for ParquetEventsProcessor {
133137
block_height,
134138
size_info.event_size_info.as_slice(),
135139
block_timestamp,
140+
is_user_txn_type,
136141
);
137142
transaction_version_to_struct_count
138143
.entry(txn_version)

0 commit comments

Comments
 (0)