Skip to content

Commit b3875a3

Browse files
authored
handle filtered events and make it null events when there are events size info (#476)
1 parent ee05799 commit b3875a3

File tree

2 files changed

+84
-4
lines changed

2 files changed

+84
-4
lines changed

rust/processor/src/db/common/models/events_models/parquet_events.rs

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,19 @@ use crate::{
99
};
1010
use allocative_derive::Allocative;
1111
use aptos_protos::transaction::v1::{Event as EventPB, EventSizeInfo};
12+
use lazy_static::lazy_static;
1213
use parquet_derive::ParquetRecordWriter;
1314
use serde::{Deserialize, Serialize};
1415

1516
// p99 currently is 303 so using 300 as a safe max length
1617
const EVENT_TYPE_MAX_LENGTH: usize = 300;
18+
const DEFAULT_CREATION_NUMBER: i64 = 0;
19+
const DEFAULT_SEQUENCE_NUMBER: i64 = 0;
20+
lazy_static! {
21+
pub static ref DEFAULT_ACCOUNT_ADDRESS: String = "NULL_ACCOUNT_ADDRESS".to_string();
22+
pub static ref DEFAULT_EVENT_TYPE: String = "NULL_EVENT_TYPE".to_string();
23+
pub static ref DEFAULT_EVENT_DATA: String = "NULL_EVENT_DATA".to_string();
24+
}
1725

1826
#[derive(Allocative, Clone, Debug, Default, Deserialize, ParquetRecordWriter, Serialize)]
1927
pub struct Event {
@@ -78,6 +86,31 @@ impl Event {
7886
}
7987
}
8088

89+
// This function is added to handle the txn with events filtered, but event_size_info is not filtered.
90+
pub fn from_null_event(
91+
txn_version: i64,
92+
block_height: i64,
93+
event_index: i64,
94+
size_info: &EventSizeInfo,
95+
block_timestamp: chrono::NaiveDateTime,
96+
) -> Self {
97+
Event {
98+
account_address: DEFAULT_ACCOUNT_ADDRESS.clone(),
99+
creation_number: DEFAULT_CREATION_NUMBER,
100+
sequence_number: DEFAULT_SEQUENCE_NUMBER,
101+
txn_version,
102+
block_height,
103+
event_type: DEFAULT_EVENT_TYPE.clone(),
104+
data: DEFAULT_EVENT_DATA.clone(),
105+
event_index,
106+
indexed_type: truncate_str(&DEFAULT_EVENT_TYPE, EVENT_TYPE_MAX_LENGTH),
107+
type_tag_bytes: size_info.type_tag_bytes as i64,
108+
total_bytes: size_info.total_bytes as i64,
109+
event_version: 1i8, // this is for future proofing. TODO: change when events v2 comes
110+
block_timestamp,
111+
}
112+
}
113+
81114
pub fn from_events(
82115
events: &[EventPB],
83116
txn_version: i64,
@@ -86,22 +119,29 @@ impl Event {
86119
block_timestamp: chrono::NaiveDateTime,
87120
is_user_txn_type: bool,
88121
) -> Vec<Self> {
122+
let mut temp_events = events.to_vec();
123+
89124
// event_size_info will be used for user transactions only, no promises for other transactions.
90125
// If event_size_info is missing due to fewer or no events, it defaults to 0.
91126
// No need to backfill as event_size_info is primarily for debugging user transactions.
92-
if events.len() != event_size_info.len() {
127+
if temp_events.len() != event_size_info.len() {
93128
tracing::warn!(
94129
events_len = events.len(),
95130
event_size_info_len = event_size_info.len(),
96131
txn_version,
97132
"Length mismatch: events size does not match event_size_info size.",
98133
);
99134
if is_user_txn_type {
100-
panic!("Event size info is missing for user transaction.")
135+
return handle_user_txn_type(
136+
&mut temp_events,
137+
txn_version,
138+
event_size_info,
139+
block_timestamp,
140+
block_height,
141+
);
101142
}
102143
}
103-
104-
events
144+
temp_events
105145
.iter()
106146
.enumerate()
107147
.map(|(index, event)| {
@@ -122,4 +162,43 @@ impl Event {
122162
}
123163
}
124164

165+
fn handle_user_txn_type(
166+
temp_events: &mut Vec<EventPB>,
167+
txn_version: i64,
168+
event_size_info: &[EventSizeInfo],
169+
block_timestamp: chrono::NaiveDateTime,
170+
block_height: i64,
171+
) -> Vec<Event> {
172+
if event_size_info.is_empty() {
173+
tracing::error!(
174+
txn_version,
175+
"Event size info is missing for user transaction."
176+
);
177+
panic!("Event size info is missing for user transaction.");
178+
}
179+
// Add default events to temp_events until its length matches event_size_info length
180+
tracing::info!(
181+
txn_version,
182+
"Events are empty but event_size_info is not empty."
183+
);
184+
temp_events.resize(event_size_info.len(), EventPB::default());
185+
temp_events
186+
.iter()
187+
.enumerate()
188+
.map(|(index, _event)| {
189+
let size_info = event_size_info.get(index).unwrap_or(&EventSizeInfo {
190+
type_tag_bytes: 0,
191+
total_bytes: 0,
192+
});
193+
Event::from_null_event(
194+
txn_version,
195+
block_height,
196+
index as i64,
197+
size_info,
198+
block_timestamp,
199+
)
200+
})
201+
.collect()
202+
}
203+
125204
pub type ParquetEventModel = Event;

rust/processor/src/gap_detectors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ pub async fn create_gap_detector_status_tracker_loop(
155155
{
156156
tracing::info!(
157157
last_processed_version = res.next_version_to_process,
158+
processor_name,
158159
"Updating last processed version"
159160
);
160161
processor

0 commit comments

Comments
 (0)