Skip to content

Commit 6b0c7ce

Browse files
authored
add parquet_token_v2_processor (#462)
* temp * remove logs * add more metrics * add parquet_token_v2_processor * exclude collection v2 * add struct count map * rebase * lint * use constant * lint
1 parent da88021 commit 6b0c7ce

File tree

12 files changed

+1266
-3
lines changed

12 files changed

+1266
-3
lines changed

rust/processor/src/db/common/models/fungible_asset_models/parquet_v2_fungible_asset_balances.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,14 @@ use allocative_derive::Allocative;
2626
use aptos_protos::transaction::v1::{DeleteResource, WriteResource};
2727
use bigdecimal::{BigDecimal, Zero};
2828
use field_count::FieldCount;
29+
use lazy_static::lazy_static;
2930
use parquet_derive::ParquetRecordWriter;
3031
use serde::{Deserialize, Serialize};
3132

33+
lazy_static! {
34+
pub static ref DEFAULT_AMOUNT_VALUE: String = "0".to_string();
35+
}
36+
3237
#[derive(
3338
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
3439
)]
@@ -147,7 +152,7 @@ impl FungibleAssetBalance {
147152
asset_type: coin_type.clone(),
148153
is_primary: true,
149154
is_frozen: false,
150-
amount: "0".to_string(),
155+
amount: DEFAULT_AMOUNT_VALUE.clone(),
151156
block_timestamp: txn_timestamp,
152157
token_standard: TokenStandard::V1.to_string(),
153158
};

rust/processor/src/db/common/models/token_v2_models/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,8 @@ pub mod v2_token_datas;
88
pub mod v2_token_metadata;
99
pub mod v2_token_ownerships;
1010
pub mod v2_token_utils;
11+
12+
// parquet models
13+
// pub mod parquet_v2_collections; // revisit this
14+
pub mod parquet_v2_token_datas;
15+
pub mod parquet_v2_token_ownerships;
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
// Copyright © Aptos Foundation
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
// This is required because a diesel macro makes clippy sad
5+
#![allow(clippy::extra_unused_lifetimes)]
6+
#![allow(clippy::unused_unit)]
7+
8+
use crate::{
9+
bq_analytics::generic_parquet_processor::{GetTimeStamp, HasVersion, NamedTable},
10+
db::common::models::{
11+
object_models::v2_object_utils::ObjectAggregatedDataMapping,
12+
token_models::{
13+
collection_datas::CollectionData,
14+
token_utils::{CollectionDataIdType, TokenWriteSet},
15+
tokens::TableHandleToOwner,
16+
},
17+
token_v2_models::{
18+
v2_collections::CreatorFromCollectionTableV1,
19+
v2_token_utils::{TokenStandard, V2TokenResource},
20+
},
21+
},
22+
utils::{database::DbPoolConnection, util::standardize_address},
23+
};
24+
use allocative_derive::Allocative;
25+
use anyhow::Context;
26+
use aptos_protos::transaction::v1::{WriteResource, WriteTableItem};
27+
use bigdecimal::{BigDecimal, Zero};
28+
use diesel::{sql_query, sql_types::Text};
29+
use diesel_async::RunQueryDsl;
30+
use field_count::FieldCount;
31+
use parquet_derive::ParquetRecordWriter;
32+
use serde::{Deserialize, Serialize};
33+
34+
#[derive(
35+
Allocative, Clone, Debug, Default, Deserialize, FieldCount, ParquetRecordWriter, Serialize,
36+
)]
37+
pub struct CollectionV2 {
38+
pub txn_version: i64,
39+
pub write_set_change_index: i64,
40+
pub collection_id: String,
41+
pub creator_address: String,
42+
pub collection_name: String,
43+
pub description: String,
44+
pub uri: String,
45+
pub current_supply: String,
46+
pub max_supply: Option<String>,
47+
pub total_minted_v2: Option<String>,
48+
pub mutable_description: Option<bool>,
49+
pub mutable_uri: Option<bool>,
50+
pub table_handle_v1: Option<String>,
51+
pub token_standard: String,
52+
#[allocative(skip)]
53+
pub block_timestamp: chrono::NaiveDateTime,
54+
}
55+
56+
impl NamedTable for CollectionV2 {
57+
const TABLE_NAME: &'static str = "collection_v2";
58+
}
59+
60+
impl HasVersion for CollectionV2 {
61+
fn version(&self) -> i64 {
62+
self.txn_version
63+
}
64+
}
65+
66+
impl GetTimeStamp for CollectionV2 {
67+
fn get_timestamp(&self) -> chrono::NaiveDateTime {
68+
self.block_timestamp
69+
}
70+
}
71+
72+
impl CollectionV2 {
73+
pub fn get_v2_from_write_resource(
74+
write_resource: &WriteResource,
75+
txn_version: i64,
76+
write_set_change_index: i64,
77+
txn_timestamp: chrono::NaiveDateTime,
78+
object_metadatas: &ObjectAggregatedDataMapping,
79+
) -> anyhow::Result<Option<Self>> {
80+
let type_str = crate::db::common::models::default_models::move_resources::MoveResource::get_outer_type_from_write_resource(write_resource);
81+
if !V2TokenResource::is_resource_supported(type_str.as_str()) {
82+
return Ok(None);
83+
}
84+
let resource = crate::db::common::models::default_models::move_resources::MoveResource::from_write_resource(
85+
write_resource,
86+
0, // Placeholder, this isn't used anyway
87+
txn_version,
88+
0, // Placeholder, this isn't used anyway
89+
);
90+
91+
if let V2TokenResource::Collection(inner) = &V2TokenResource::from_resource(
92+
&type_str,
93+
resource.data.as_ref().unwrap(),
94+
txn_version,
95+
)? {
96+
let (mut current_supply, mut max_supply, mut total_minted_v2) =
97+
(BigDecimal::zero(), None, None);
98+
let (mut mutable_description, mut mutable_uri) = (None, None);
99+
if let Some(object_data) = object_metadatas.get(&resource.address) {
100+
// Getting supply data (prefer fixed supply over unlimited supply although they should never appear at the same time anyway)
101+
let fixed_supply = object_data.fixed_supply.as_ref();
102+
let unlimited_supply = object_data.unlimited_supply.as_ref();
103+
if let Some(supply) = unlimited_supply {
104+
(current_supply, max_supply, total_minted_v2) = (
105+
supply.current_supply.clone(),
106+
None,
107+
Some(supply.total_minted.clone()),
108+
);
109+
}
110+
if let Some(supply) = fixed_supply {
111+
(current_supply, max_supply, total_minted_v2) = (
112+
supply.current_supply.clone(),
113+
Some(supply.max_supply.clone()),
114+
Some(supply.total_minted.clone()),
115+
);
116+
}
117+
118+
// Aggregator V2 enables a separate struct for supply
119+
let concurrent_supply = object_data.concurrent_supply.as_ref();
120+
if let Some(supply) = concurrent_supply {
121+
(current_supply, max_supply, total_minted_v2) = (
122+
supply.current_supply.value.clone(),
123+
if supply.current_supply.max_value == u64::MAX.into() {
124+
None
125+
} else {
126+
Some(supply.current_supply.max_value.clone())
127+
},
128+
Some(supply.total_minted.value.clone()),
129+
);
130+
}
131+
132+
// Getting collection mutability config from AptosCollection
133+
let collection = object_data.aptos_collection.as_ref();
134+
if let Some(collection) = collection {
135+
mutable_description = Some(collection.mutable_description);
136+
mutable_uri = Some(collection.mutable_uri);
137+
}
138+
} else {
139+
// ObjectCore should not be missing, returning from entire function early
140+
return Ok(None);
141+
}
142+
143+
let collection_id = resource.address.clone();
144+
let creator_address = inner.get_creator_address();
145+
let collection_name = inner.get_name_trunc();
146+
let description = inner.description.clone();
147+
let uri = inner.get_uri_trunc();
148+
149+
Ok(Some(Self {
150+
txn_version,
151+
write_set_change_index,
152+
collection_id: collection_id.clone(),
153+
creator_address: creator_address.clone(),
154+
collection_name: collection_name.clone(),
155+
description: description.clone(),
156+
uri: uri.clone(),
157+
current_supply: current_supply.to_string(),
158+
max_supply: Some(max_supply.clone().unwrap().clone().to_string()),
159+
total_minted_v2: Some(total_minted_v2.clone().unwrap().clone().to_string()),
160+
mutable_description,
161+
mutable_uri,
162+
table_handle_v1: None,
163+
token_standard: TokenStandard::V2.to_string(),
164+
block_timestamp: txn_timestamp,
165+
}))
166+
} else {
167+
Ok(None)
168+
}
169+
}
170+
171+
pub async fn get_v1_from_write_table_item(
172+
table_item: &WriteTableItem,
173+
txn_version: i64,
174+
write_set_change_index: i64,
175+
txn_timestamp: chrono::NaiveDateTime,
176+
table_handle_to_owner: &TableHandleToOwner,
177+
conn: &mut DbPoolConnection<'_>,
178+
query_retries: u32,
179+
query_retry_delay_ms: u64,
180+
) -> anyhow::Result<Option<Self>> {
181+
let table_item_data = table_item.data.as_ref().unwrap();
182+
183+
let maybe_collection_data = match TokenWriteSet::from_table_item_type(
184+
table_item_data.value_type.as_str(),
185+
&table_item_data.value,
186+
txn_version,
187+
)? {
188+
Some(TokenWriteSet::CollectionData(inner)) => Some(inner),
189+
_ => None,
190+
};
191+
if let Some(collection_data) = maybe_collection_data {
192+
let table_handle = table_item.handle.to_string();
193+
let maybe_creator_address = table_handle_to_owner
194+
.get(&standardize_address(&table_handle))
195+
.map(|table_metadata| table_metadata.get_owner_address());
196+
let mut creator_address = match maybe_creator_address {
197+
Some(ca) => ca,
198+
None => {
199+
match Self::get_collection_creator_for_v1(
200+
conn,
201+
&table_handle,
202+
query_retries,
203+
query_retry_delay_ms,
204+
)
205+
.await
206+
.context(format!(
207+
"Failed to get collection creator for table handle {}, txn version {}",
208+
table_handle, txn_version
209+
)) {
210+
Ok(ca) => ca,
211+
Err(_) => {
212+
// Try our best by getting from the older collection data
213+
match CollectionData::get_collection_creator(
214+
conn,
215+
&table_handle,
216+
query_retries,
217+
query_retry_delay_ms,
218+
)
219+
.await
220+
{
221+
Ok(creator) => creator,
222+
Err(_) => {
223+
tracing::error!(
224+
transaction_version = txn_version,
225+
lookup_key = &table_handle,
226+
"Failed to get collection v2 creator for table handle. You probably should backfill db."
227+
);
228+
return Ok(None);
229+
},
230+
}
231+
},
232+
}
233+
},
234+
};
235+
creator_address = standardize_address(&creator_address);
236+
let collection_id_struct =
237+
CollectionDataIdType::new(creator_address, collection_data.get_name().to_string());
238+
let collection_id = collection_id_struct.to_id();
239+
let collection_name = collection_data.get_name_trunc();
240+
let uri = collection_data.get_uri_trunc();
241+
242+
Ok(Some(Self {
243+
txn_version,
244+
write_set_change_index,
245+
collection_id: collection_id.clone(),
246+
creator_address: collection_id_struct.creator.clone(),
247+
collection_name: collection_name.clone(),
248+
description: collection_data.description.clone(),
249+
uri: uri.clone(),
250+
current_supply: collection_data.supply.to_string(),
251+
max_supply: Some(collection_data.maximum.to_string()),
252+
total_minted_v2: None,
253+
mutable_uri: Some(collection_data.mutability_config.uri),
254+
mutable_description: Some(collection_data.mutability_config.description),
255+
table_handle_v1: Some(table_handle.clone()),
256+
token_standard: TokenStandard::V1.to_string(),
257+
block_timestamp: txn_timestamp,
258+
}))
259+
} else {
260+
Ok(None)
261+
}
262+
}
263+
264+
/// If collection data is not in resources of the same transaction, then try looking for it in the database. Since collection owner
265+
/// cannot change, we can just look in the current_collection_datas table.
266+
/// Retrying a few times since this collection could've been written in a separate thread.
267+
async fn get_collection_creator_for_v1(
268+
conn: &mut DbPoolConnection<'_>,
269+
table_handle: &str,
270+
query_retries: u32,
271+
query_retry_delay_ms: u64,
272+
) -> anyhow::Result<String> {
273+
let mut tried = 0;
274+
while tried < query_retries {
275+
tried += 1;
276+
match Self::get_by_table_handle(conn, table_handle).await {
277+
Ok(creator) => return Ok(creator),
278+
Err(_) => {
279+
if tried < query_retries {
280+
tokio::time::sleep(std::time::Duration::from_millis(query_retry_delay_ms))
281+
.await;
282+
}
283+
},
284+
}
285+
}
286+
Err(anyhow::anyhow!("Failed to get collection creator"))
287+
}
288+
289+
/// TODO: Change this to a KV store
290+
async fn get_by_table_handle(
291+
conn: &mut DbPoolConnection<'_>,
292+
table_handle: &str,
293+
) -> anyhow::Result<String> {
294+
let mut res: Vec<Option<CreatorFromCollectionTableV1>> = sql_query(
295+
"SELECT creator_address FROM current_collections_v2 WHERE table_handle_v1 = $1",
296+
)
297+
.bind::<Text, _>(table_handle)
298+
.get_results(conn)
299+
.await?;
300+
Ok(res
301+
.pop()
302+
.context("collection result empty")?
303+
.context("collection result null")?
304+
.creator_address)
305+
}
306+
}

0 commit comments

Comments
 (0)