Skip to content

Commit a553ebd

Browse files
[parquet] Parse fa activities with parquet (#506)
* parse fa activities with parquet * add new processor for fa activities * Yuunlimm/parquet fa migrate (#509) * fix lint --------- Co-authored-by: Yuun Lim <[email protected]>
1 parent e53f4da commit a553ebd

File tree

6 files changed

+664
-0
lines changed

6 files changed

+664
-0
lines changed

rust/processor/src/db/common/models/fungible_asset_models/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ pub mod v2_fungible_metadata;
88

99
// parquet models
1010
pub mod parquet_coin_supply;
11+
pub mod parquet_v2_fungible_asset_activities;
1112
pub mod parquet_v2_fungible_asset_balances;
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
// Copyright © Aptos Foundation
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// This is required because a diesel macro makes clippy sad
5+
#![allow(clippy::extra_unused_lifetimes)]
6+
#![allow(clippy::unused_unit)]
7+
8+
use super::v2_fungible_asset_utils::{FeeStatement, FungibleAssetEvent};
9+
use crate::{
10+
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
11+
db::common::models::{
12+
coin_models::{
13+
coin_activities::CoinActivity,
14+
coin_utils::{CoinEvent, CoinInfoType, EventGuidResource},
15+
},
16+
object_models::v2_object_utils::ObjectAggregatedDataMapping,
17+
token_v2_models::v2_token_utils::TokenStandard,
18+
},
19+
utils::util::{bigdecimal_to_u64, standardize_address},
20+
};
21+
use ahash::AHashMap;
22+
use allocative::Allocative;
23+
use anyhow::Context;
24+
use aptos_protos::transaction::v1::{Event, TransactionInfo, UserTransactionRequest};
25+
use field_count::FieldCount;
26+
use parquet_derive::ParquetRecordWriter;
27+
use serde::{Deserialize, Serialize};
28+
29+
pub const GAS_FEE_EVENT: &str = "0x1::aptos_coin::GasFeeEvent";
30+
// We will never have a negative number on chain so this will avoid collision in postgres
31+
pub const BURN_GAS_EVENT_CREATION_NUM: i64 = -1;
32+
pub const BURN_GAS_EVENT_INDEX: i64 = -1;
33+
34+
pub type OwnerAddress = String;
35+
pub type CoinType = String;
36+
// Primary key of the current_coin_balances table, i.e. (owner_address, coin_type)
37+
pub type CurrentCoinBalancePK = (OwnerAddress, CoinType);
38+
pub type EventToCoinType = AHashMap<EventGuidResource, CoinType>;
39+
40+
/// TODO: This is just a copy of v2_fungible_asset_activities.rs. We should unify the 2 implementations
41+
/// and have parquet as an output.
42+
#[derive(
43+
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
44+
)]
45+
pub struct FungibleAssetActivity {
46+
pub txn_version: i64,
47+
pub event_index: i64,
48+
pub owner_address: Option<String>,
49+
pub storage_id: String,
50+
pub asset_type: Option<String>,
51+
pub is_frozen: Option<bool>,
52+
pub amount: Option<String>, // it is a string representation of the u128
53+
pub event_type: String,
54+
pub is_gas_fee: bool,
55+
pub gas_fee_payer_address: Option<String>,
56+
pub is_transaction_success: bool,
57+
pub entry_function_id_str: Option<String>,
58+
pub block_height: i64,
59+
pub token_standard: String,
60+
#[allocative(skip)]
61+
pub block_timestamp: chrono::NaiveDateTime,
62+
pub storage_refund_octa: u64,
63+
}
64+
65+
impl NamedTable for FungibleAssetActivity {
66+
const TABLE_NAME: &'static str = "fungible_asset_activities";
67+
}
68+
69+
impl HasVersion for FungibleAssetActivity {
70+
fn version(&self) -> i64 {
71+
self.txn_version
72+
}
73+
}
74+
75+
impl GetTimeStamp for FungibleAssetActivity {
76+
fn get_timestamp(&self) -> chrono::NaiveDateTime {
77+
self.block_timestamp
78+
}
79+
}
80+
81+
impl FungibleAssetActivity {
82+
pub fn get_v2_from_event(
83+
event: &Event,
84+
txn_version: i64,
85+
block_height: i64,
86+
txn_timestamp: chrono::NaiveDateTime,
87+
event_index: i64,
88+
entry_function_id_str: &Option<String>,
89+
object_aggregated_data_mapping: &ObjectAggregatedDataMapping,
90+
) -> anyhow::Result<Option<Self>> {
91+
let event_type = event.type_str.clone();
92+
if let Some(fa_event) =
93+
&FungibleAssetEvent::from_event(event_type.as_str(), &event.data, txn_version)?
94+
{
95+
let (storage_id, is_frozen, amount) = match fa_event {
96+
FungibleAssetEvent::WithdrawEvent(inner) => (
97+
standardize_address(&event.key.as_ref().unwrap().account_address),
98+
None,
99+
Some(inner.amount.to_string()),
100+
),
101+
FungibleAssetEvent::DepositEvent(inner) => (
102+
standardize_address(&event.key.as_ref().unwrap().account_address),
103+
None,
104+
Some(inner.amount.to_string()),
105+
),
106+
FungibleAssetEvent::FrozenEvent(inner) => (
107+
standardize_address(&event.key.as_ref().unwrap().account_address),
108+
Some(inner.frozen),
109+
None,
110+
),
111+
FungibleAssetEvent::WithdrawEventV2(inner) => (
112+
standardize_address(&inner.store),
113+
None,
114+
Some(inner.amount.to_string()),
115+
),
116+
FungibleAssetEvent::DepositEventV2(inner) => (
117+
standardize_address(&inner.store),
118+
None,
119+
Some(inner.amount.to_string()),
120+
),
121+
FungibleAssetEvent::FrozenEventV2(inner) => {
122+
(standardize_address(&inner.store), Some(inner.frozen), None)
123+
},
124+
};
125+
126+
// The event account address will also help us find fungible store which tells us where to find
127+
// the metadata
128+
let maybe_object_metadata = object_aggregated_data_mapping.get(&storage_id);
129+
// The ObjectCore might not exist in the transaction if the object got deleted
130+
let maybe_owner_address = maybe_object_metadata
131+
.map(|metadata| &metadata.object.object_core)
132+
.map(|object_core| object_core.get_owner_address());
133+
// The FungibleStore might not exist in the transaction if it's a secondary store that got burnt
134+
let maybe_asset_type = maybe_object_metadata
135+
.and_then(|metadata| metadata.fungible_asset_store.as_ref())
136+
.map(|fa| fa.metadata.get_reference_address());
137+
138+
return Ok(Some(Self {
139+
txn_version,
140+
event_index,
141+
owner_address: maybe_owner_address,
142+
storage_id: storage_id.clone(),
143+
asset_type: maybe_asset_type,
144+
is_frozen,
145+
amount,
146+
event_type: event_type.clone(),
147+
is_gas_fee: false,
148+
gas_fee_payer_address: None,
149+
is_transaction_success: true,
150+
entry_function_id_str: entry_function_id_str.clone(),
151+
block_height,
152+
token_standard: TokenStandard::V2.to_string(),
153+
block_timestamp: txn_timestamp,
154+
storage_refund_octa: 0,
155+
}));
156+
}
157+
Ok(None)
158+
}
159+
160+
pub fn get_v1_from_event(
161+
event: &Event,
162+
txn_version: i64,
163+
block_height: i64,
164+
block_timestamp: chrono::NaiveDateTime,
165+
entry_function_id_str: &Option<String>,
166+
event_to_coin_type: &EventToCoinType,
167+
event_index: i64,
168+
) -> anyhow::Result<Option<Self>> {
169+
if let Some(inner) =
170+
CoinEvent::from_event(event.type_str.as_str(), &event.data, txn_version)?
171+
{
172+
let (owner_address, amount, coin_type_option) = match inner {
173+
CoinEvent::WithdrawCoinEvent(inner) => (
174+
standardize_address(&event.key.as_ref().unwrap().account_address),
175+
inner.amount.to_string(),
176+
None,
177+
),
178+
CoinEvent::DepositCoinEvent(inner) => (
179+
standardize_address(&event.key.as_ref().unwrap().account_address),
180+
inner.amount.to_string(),
181+
None,
182+
),
183+
};
184+
let coin_type = if let Some(coin_type) = coin_type_option {
185+
coin_type
186+
} else {
187+
let event_key = event.key.as_ref().context("event must have a key")?;
188+
let event_move_guid = EventGuidResource {
189+
addr: standardize_address(event_key.account_address.as_str()),
190+
creation_num: event_key.creation_number as i64,
191+
};
192+
// Given this mapping only contains coin type < 1000 length, we should not assume that the mapping exists.
193+
// If it doesn't exist, skip.
194+
match event_to_coin_type.get(&event_move_guid) {
195+
Some(coin_type) => coin_type.clone(),
196+
None => {
197+
tracing::warn!(
198+
"Could not find event in resources (CoinStore), version: {}, event guid: {:?}, mapping: {:?}",
199+
txn_version, event_move_guid, event_to_coin_type
200+
);
201+
return Ok(None);
202+
},
203+
}
204+
};
205+
206+
let storage_id =
207+
CoinInfoType::get_storage_id(coin_type.as_str(), owner_address.as_str());
208+
209+
Ok(Some(Self {
210+
txn_version,
211+
event_index,
212+
owner_address: Some(owner_address),
213+
storage_id,
214+
asset_type: Some(coin_type),
215+
is_frozen: None,
216+
amount: Some(amount),
217+
event_type: event.type_str.clone(),
218+
is_gas_fee: false,
219+
gas_fee_payer_address: None,
220+
is_transaction_success: true,
221+
entry_function_id_str: entry_function_id_str.clone(),
222+
block_height,
223+
token_standard: TokenStandard::V1.to_string(),
224+
block_timestamp,
225+
storage_refund_octa: 0,
226+
}))
227+
} else {
228+
Ok(None)
229+
}
230+
}
231+
232+
/// Artificially creates a gas event. If it's a fee payer, still show gas event to the sender
233+
/// but with an extra field to indicate the fee payer.
234+
pub fn get_gas_event(
235+
txn_info: &TransactionInfo,
236+
user_transaction_request: &UserTransactionRequest,
237+
entry_function_id_str: &Option<String>,
238+
txn_version: i64,
239+
block_timestamp: chrono::NaiveDateTime,
240+
block_height: i64,
241+
fee_statement: Option<FeeStatement>,
242+
) -> Self {
243+
let v1_activity = CoinActivity::get_gas_event(
244+
txn_info,
245+
user_transaction_request,
246+
entry_function_id_str,
247+
txn_version,
248+
block_timestamp,
249+
block_height,
250+
fee_statement,
251+
);
252+
let storage_id = CoinInfoType::get_storage_id(
253+
v1_activity.coin_type.as_str(),
254+
v1_activity.owner_address.as_str(),
255+
);
256+
Self {
257+
txn_version,
258+
event_index: v1_activity.event_index.unwrap(),
259+
owner_address: Some(v1_activity.owner_address),
260+
storage_id,
261+
asset_type: Some(v1_activity.coin_type),
262+
is_frozen: None,
263+
amount: Some(v1_activity.amount.to_string()),
264+
event_type: v1_activity.activity_type,
265+
is_gas_fee: v1_activity.is_gas_fee,
266+
gas_fee_payer_address: v1_activity.gas_fee_payer_address,
267+
is_transaction_success: v1_activity.is_transaction_success,
268+
entry_function_id_str: v1_activity.entry_function_id_str,
269+
block_height,
270+
token_standard: TokenStandard::V1.to_string(),
271+
block_timestamp,
272+
storage_refund_octa: bigdecimal_to_u64(&v1_activity.storage_refund_amount),
273+
}
274+
}
275+
}

rust/processor/src/processors/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ use crate::{
3939
parquet_ans_processor::{ParquetAnsProcessor, ParquetAnsProcessorConfig},
4040
parquet_default_processor::{ParquetDefaultProcessor, ParquetDefaultProcessorConfig},
4141
parquet_events_processor::{ParquetEventsProcessor, ParquetEventsProcessorConfig},
42+
parquet_fungible_asset_activities_processor::{
43+
ParquetFungibleAssetActivitiesProcessor, ParquetFungibleAssetActivitiesProcessorConfig,
44+
},
4245
parquet_fungible_asset_processor::{
4346
ParquetFungibleAssetProcessor, ParquetFungibleAssetProcessorConfig,
4447
},
@@ -99,6 +102,7 @@ pub trait ProcessorTrait: Send + Sync + Debug {
99102

100103
/// Gets the connection.
101104
/// If it was unable to do so (default timeout: 30s), it will keep retrying until it can.
105+
#[allow(unknown_lints)]
102106
#[allow(elided_named_lifetimes)]
103107
async fn get_conn(&self) -> DbPoolConnection {
104108
let pool = self.connection_pool();
@@ -201,6 +205,7 @@ pub enum ProcessorConfig {
201205
TransactionMetadataProcessor,
202206
UserTransactionProcessor,
203207
ParquetDefaultProcessor(ParquetDefaultProcessorConfig),
208+
ParquetFungibleAssetActivitiesProcessor(ParquetFungibleAssetActivitiesProcessorConfig),
204209
ParquetFungibleAssetProcessor(ParquetFungibleAssetProcessorConfig),
205210
ParquetTransactionMetadataProcessor(ParquetTransactionMetadataProcessorConfig),
206211
ParquetAnsProcessor(ParquetAnsProcessorConfig),
@@ -224,6 +229,7 @@ impl ProcessorConfig {
224229
| ProcessorConfig::ParquetAnsProcessor(_)
225230
| ProcessorConfig::ParquetEventsProcessor(_)
226231
| ProcessorConfig::ParquetTokenV2Processor(_)
232+
| ProcessorConfig::ParquetFungibleAssetActivitiesProcessor(_)
227233
)
228234
}
229235
}
@@ -262,6 +268,7 @@ pub enum Processor {
262268
UserTransactionProcessor,
263269
// Parquet processors
264270
ParquetDefaultProcessor,
271+
ParquetFungibleAssetActivitiesProcessor,
265272
ParquetFungibleAssetProcessor,
266273
ParquetTransactionMetadataProcessor,
267274
ParquetAnsProcessor,

rust/processor/src/processors/parquet_processors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::Duration;
33
pub mod parquet_ans_processor;
44
pub mod parquet_default_processor;
55
pub mod parquet_events_processor;
6+
pub mod parquet_fungible_asset_activities_processor;
67
pub mod parquet_fungible_asset_processor;
78
pub mod parquet_token_v2_processor;
89
pub mod parquet_transaction_metadata_processor;

0 commit comments

Comments
 (0)