Skip to content

Commit fc65c59

Browse files
authored
[1/2][FA migration] Drop generated fields and add KS for unified (#493)
* add migration files * add killswitch to unified balance * lint * lint more * lint
1 parent c16b31b commit fc65c59

File tree

9 files changed

+62
-32
lines changed

9 files changed

+62
-32
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- Your SQL goes here
2+
-- removing generated fields because we're redoing them
3+
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed
4+
ADD COLUMN IF NOT EXISTS asset_type VARCHAR(1000) GENERATED ALWAYS AS (COALESCE(asset_type_v2, asset_type_v1)) STORED;
5+
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed
6+
ADD COLUMN IF NOT EXISTS token_standard VARCHAR(10) GENERATED ALWAYS AS (
7+
CASE
8+
WHEN asset_type_v2 IS NOT NULL THEN 'v2'
9+
ELSE 'v1'
10+
END
11+
) STORED;
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Your SQL goes here
2+
-- removing generated fields because we're redoing them
3+
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed DROP COLUMN IF EXISTS asset_type;
4+
ALTER TABLE current_unified_fungible_asset_balances_to_be_renamed DROP COLUMN IF EXISTS token_standard;

rust/processor/src/db/postgres/schema.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -673,10 +673,6 @@ diesel::table! {
673673
last_transaction_timestamp_v2 -> Nullable<Timestamp>,
674674
last_transaction_timestamp -> Nullable<Timestamp>,
675675
inserted_at -> Timestamp,
676-
#[max_length = 1000]
677-
asset_type -> Nullable<Varchar>,
678-
#[max_length = 10]
679-
token_standard -> Nullable<Varchar>,
680676
}
681677
}
682678

rust/processor/src/processors/fungible_asset_processor.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,17 @@ impl ProcessorTrait for FungibleAssetProcessor {
372372
let processing_duration_in_secs = processing_start.elapsed().as_secs_f64();
373373
let db_insertion_start = std::time::Instant::now();
374374

375-
let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = current_unified_fungible_asset_balances
376-
.into_iter()
377-
.partition(|x| x.is_primary.is_none());
375+
// if flag turned on we need to not include any value in the table
376+
let (coin_balance, fa_balance): (Vec<_>, Vec<_>) = if self
377+
.deprecated_tables
378+
.contains(TableFlags::CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES)
379+
{
380+
(vec![], vec![])
381+
} else {
382+
current_unified_fungible_asset_balances
383+
.into_iter()
384+
.partition(|x| x.is_primary.is_none())
385+
};
378386

379387
if self
380388
.deprecated_tables

rust/processor/src/processors/mod.rs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -155,21 +155,22 @@ pub trait ProcessorTrait: Send + Sync + Debug {
155155
}
156156

157157
/// This enum captures the configs for all the different processors that are defined.
158+
///
158159
/// The configs for each processor should only contain configuration specific to that
159160
/// processor. For configuration that is common to all processors, put it in
160161
/// IndexerGrpcProcessorConfig.
161162
#[derive(Clone, Debug, Deserialize, Serialize, strum::IntoStaticStr, strum::EnumDiscriminants)]
162163
#[serde(tag = "type", rename_all = "snake_case")]
163-
// What is all this strum stuff? Let me explain.
164-
//
165-
// Previously we had consts called NAME in each module and a function called `name` on
166-
// the ProcessorTrait. As such it was possible for this name to not match the snake case
167-
// representation of the struct name. By using strum we can have a single source for
168-
// processor names derived from the enum variants themselves.
169-
//
170-
// That's what this strum_discriminants stuff is, it uses macro magic to generate the
171-
// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this
172-
// generation logic, e.g. to make sure we use snake_case.
164+
/// What is all this strum stuff? Let me explain.
165+
///
166+
/// Previously we had consts called NAME in each module and a function called `name` on
167+
/// the ProcessorTrait. As such it was possible for this name to not match the snake case
168+
/// representation of the struct name. By using strum we can have a single source for
169+
/// processor names derived from the enum variants themselves.
170+
///
171+
/// That's what this strum_discriminants stuff is, it uses macro magic to generate the
172+
/// ProcessorName enum based on ProcessorConfig. The rest of the derives configure this
173+
/// generation logic, e.g. to make sure we use snake_case.
173174
#[strum(serialize_all = "snake_case")]
174175
#[strum_discriminants(
175176
derive(
@@ -226,14 +227,16 @@ impl ProcessorConfig {
226227
}
227228
}
228229

229-
/// This enum contains all the processors defined in this crate. We use enum_dispatch
230-
/// as it is more efficient than using dynamic dispatch (Box<dyn ProcessorTrait>) and
230+
/// This enum contains all the processors defined in this crate.
231+
///
232+
/// We use enum_dispatch as it is more efficient than using dynamic dispatch (Box<dyn ProcessorTrait>) and
231233
/// it enables nice safety checks like in we do in `test_processor_names_complete`.
232-
#[enum_dispatch(ProcessorTrait)]
233-
#[derive(Debug)]
234-
// To ensure that the variants of ProcessorConfig and Processor line up, in the testing
234+
///
235+
/// // To ensure that the variants of ProcessorConfig and Processor line up, in the testing
235236
// build path we derive EnumDiscriminants on this enum as well and make sure the two
236237
// sets of variants match up in `test_processor_names_complete`.
238+
#[enum_dispatch(ProcessorTrait)]
239+
#[derive(Debug)]
237240
#[cfg_attr(
238241
test,
239242
derive(strum::EnumDiscriminants),
@@ -271,8 +274,10 @@ mod test {
271274
use strum::VariantNames;
272275

273276
/// This test exists to make sure that when a new processor is added, it is added
274-
/// to both Processor and ProcessorConfig. To make sure this passes, make sure the
275-
/// variants are in the same order (lexicographical) and the names match.
277+
/// to both Processor and ProcessorConfig.
278+
///
279+
/// To make sure this passes, make sure the variants are in the same order
280+
/// (lexicographical) and the names match.
276281
#[test]
277282
fn test_processor_names_complete() {
278283
assert_eq!(ProcessorName::VARIANTS, ProcessorDiscriminants::VARIANTS);

rust/processor/src/utils/database.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
// Copyright © Aptos Foundation
22
// SPDX-License-Identifier: Apache-2.0
33

4-
//! Database-related functions
5-
#![allow(clippy::extra_unused_lifetimes)]
6-
74
use crate::utils::util::remove_null_bytes;
85
use ahash::AHashMap;
96
use diesel::{
@@ -33,15 +30,18 @@ pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("src/db/postgres/mi
3330
pub const DEFAULT_MAX_POOL_SIZE: u32 = 150;
3431

3532
#[derive(QueryId)]
36-
/// Using this will append a where clause at the end of the string upsert function, e.g.
33+
/// Using this will append a where clause at the end of the string upsert function
34+
///
35+
/// e.g.
3736
/// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version"
3837
/// This is needed when we want to maintain a table with only the latest state
3938
pub struct UpsertFilterLatestTransactionQuery<T> {
4039
query: T,
4140
where_clause: Option<&'static str>,
4241
}
4342

44-
// the max is actually u16::MAX but we see that when the size is too big we get an overflow error so reducing it a bit
43+
/// the max is actually u16::MAX but we see that when the size is too big we get
44+
/// an overflow error so reducing it a bit
4545
pub const MAX_DIESEL_PARAM_SIZE: usize = (u16::MAX / 2) as usize;
4646

4747
/// This function will clean the data for postgres. Currently it has support for removing
@@ -190,7 +190,8 @@ where
190190
res
191191
}
192192

193-
/// Returns the entry for the config hashmap, or the default field count for the insert
193+
/// Returns the entry for the config hashmap, or the default field count for the insert.
194+
///
194195
/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX),
195196
/// we default to chunk an array of items based on how many columns are in the table.
196197
pub fn get_config_table_chunk_size<T: field_count::FieldCount>(

rust/processor/src/worker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ bitflags! {
8383
const FUNGIBLE_ASSET_BALANCES = 1 << 6;
8484
const CURRENT_FUNGIBLE_ASSET_BALANCES = 1 << 7;
8585
const COIN_SUPPLY = 1 << 8;
86+
const CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES = 1 << 24;
8687

8788
// Objects
8889
const OBJECTS = 1 << 9;

rust/sdk-processor/src/config/processor_config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::processors::events_processor::EventsProcessorConfig;
22
use serde::{Deserialize, Serialize};
33

44
/// This enum captures the configs for all the different processors that are defined.
5+
///
56
/// The configs for each processor should only contain configuration specific to that
67
/// processor. For configuration that is common to all processors, put it in
78
/// IndexerGrpcProcessorConfig.

rust/sdk-processor/src/utils/database.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ pub const MIGRATIONS: EmbeddedMigrations =
3434
pub const DEFAULT_MAX_POOL_SIZE: u32 = 150;
3535

3636
#[derive(QueryId)]
37-
/// Using this will append a where clause at the end of the string upsert function, e.g.
37+
/// Using this will append a where clause at the end of the string upsert function
38+
///
39+
/// e.g.
3840
/// INSERT INTO ... ON CONFLICT DO UPDATE SET ... WHERE "transaction_version" = excluded."transaction_version"
3941
/// This is needed when we want to maintain a table with only the latest state
4042
pub struct UpsertFilterLatestTransactionQuery<T> {
@@ -191,7 +193,8 @@ where
191193
res
192194
}
193195

194-
/// Returns the entry for the config hashmap, or the default field count for the insert
196+
/// Returns the entry for the config hashmap, or the default field count for the insert.
197+
///
195198
/// Given diesel has a limit of how many parameters can be inserted in a single operation (u16::MAX),
196199
/// we default to chunk an array of items based on how many columns are in the table.
197200
pub fn get_config_table_chunk_size<T: field_count::FieldCount>(

0 commit comments

Comments
 (0)