diff --git a/internal/tools/clickhouse/0010_clickhouse_create_token_transfers_mv.sql b/internal/tools/clickhouse/0010_clickhouse_create_token_transfers_mv.sql index b87e35b..3c60293 100644 --- a/internal/tools/clickhouse/0010_clickhouse_create_token_transfers_mv.sql +++ b/internal/tools/clickhouse/0010_clickhouse_create_token_transfers_mv.sql @@ -14,8 +14,8 @@ CREATE TABLE IF NOT EXISTS token_transfers `sign` Int8 DEFAULT 1, `insert_timestamp` DateTime DEFAULT now(), - INDEX minmax_block_number block_number TYPE minmax GRANULARITY 16, - INDEX minmax_block_timestamp block_timestamp TYPE minmax GRANULARITY 16, + INDEX minmax_block_number block_number TYPE minmax GRANULARITY 4, + INDEX minmax_block_timestamp block_timestamp TYPE minmax GRANULARITY 4, PROJECTION from_address_projection ( diff --git a/internal/tools/clickhouse/0011_clickhouse_create_logs_transfers_erc20_mv.sql b/internal/tools/clickhouse/0011_clickhouse_create_logs_transfers_erc20_mv.sql new file mode 100644 index 0000000..a44d473 --- /dev/null +++ b/internal/tools/clickhouse/0011_clickhouse_create_logs_transfers_erc20_mv.sql @@ -0,0 +1,48 @@ +---- Setup the tables for logs transfers +CREATE TABLE IF NOT EXISTS logs_transfers_erc20 +( + `chain_id` UInt256, + `token_address` FixedString(42), + `from_address` FixedString(42), + `to_address` FixedString(42), + `block_number` UInt256, + `block_timestamp` DateTime CODEC(Delta(4), ZSTD(1)), + `transaction_hash` FixedString(66), + `amount` UInt256, + `log_index` UInt64, + `sign` Int8 DEFAULT 1, + `insert_timestamp` DateTime DEFAULT now(), + + INDEX minmax_block_number block_number TYPE minmax GRANULARITY 4, + INDEX minmax_block_timestamp block_timestamp TYPE minmax GRANULARITY 4, + INDEX bloomfilter_token_address token_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_from_address from_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_to_address to_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 4, +) +ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp) +PARTITION BY (chain_id, toStartOfYear(block_timestamp)) +ORDER BY (chain_id, token_address, block_number, transaction_hash, log_index) +SETTINGS index_granularity = 8192, lightweight_mutation_projection_mode = 'rebuild', deduplicate_merge_projection_mode = 'rebuild'; + + + +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_logs_to_erc20 +TO logs_transfers_erc20 +AS +SELECT + chain_id, + address AS token_address, + concat('0x', substring(topic_1, 27, 40)) AS from_address, + concat('0x', substring(topic_2, 27, 40)) AS to_address, + block_number, + block_timestamp, + transaction_hash, + reinterpretAsUInt256(reverse(unhex(substring(data, 3, 64)))) AS amount, + log_index, + sign, + insert_timestamp +FROM logs +WHERE topic_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef' -- Transfer + AND topic_3 = '' + AND length(data) = 66; diff --git a/internal/tools/clickhouse/0012_clickhouse_create_logs_transfers_erc721_mv.sql b/internal/tools/clickhouse/0012_clickhouse_create_logs_transfers_erc721_mv.sql new file mode 100644 index 0000000..aee2368 --- /dev/null +++ b/internal/tools/clickhouse/0012_clickhouse_create_logs_transfers_erc721_mv.sql @@ -0,0 +1,50 @@ +CREATE TABLE IF NOT EXISTS logs_transfers_erc721 +( + `chain_id` UInt256, + `token_address` FixedString(42), + `token_id` UInt256, + `from_address` FixedString(42), + `to_address` FixedString(42), + `block_number` UInt256, + `block_timestamp` DateTime CODEC(Delta(4), ZSTD(1)), + `transaction_hash` FixedString(66), + `amount` UInt8 DEFAULT 1, + `log_index` UInt64, + `sign` Int8 DEFAULT 1, + `insert_timestamp` DateTime DEFAULT now(), + + INDEX minmax_block_number block_number TYPE minmax GRANULARITY 4, + INDEX minmax_block_timestamp block_timestamp TYPE minmax GRANULARITY 4, + INDEX bloomfilter_token_address token_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_token_id token_id TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_from_address from_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_to_address to_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 4, +) +ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp) +PARTITION BY (chain_id, toStartOfYear(block_timestamp)) +ORDER BY (chain_id, token_address, block_number, transaction_hash, log_index) +SETTINGS index_granularity = 8192, lightweight_mutation_projection_mode = 'rebuild', deduplicate_merge_projection_mode = 'rebuild'; + + +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_logs_to_erc721 +TO logs_transfers_erc721 +AS +SELECT + chain_id, + address AS token_address, + reinterpretAsUInt256(reverse(unhex(substring(topic_3, 3, 64)))) AS token_id, + concat('0x', substring(topic_1, 27, 40)) AS from_address, + concat('0x', substring(topic_2, 27, 40)) AS to_address, + block_number, + block_timestamp, + transaction_hash, + toUInt8(1) AS amount, + log_index, + sign, + insert_timestamp +FROM logs +WHERE topic_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef' + AND topic_3 != '' + AND length(topic_3) = 66 + AND length(data) = 2; \ No newline at end of file diff --git a/internal/tools/clickhouse/0013_clickhouse_create_logs_transfers_erc1155_mv.sql b/internal/tools/clickhouse/0013_clickhouse_create_logs_transfers_erc1155_mv.sql new file mode 100644 index 0000000..68e0162 --- /dev/null +++ b/internal/tools/clickhouse/0013_clickhouse_create_logs_transfers_erc1155_mv.sql @@ -0,0 +1,100 @@ +CREATE TABLE IF NOT EXISTS logs_transfers_erc1155 +( + `chain_id` UInt256, + `token_address` FixedString(42), + `token_id` UInt256, + `from_address` FixedString(42), + `to_address` FixedString(42), + `block_number` UInt256, + `block_timestamp` DateTime CODEC(Delta(4), ZSTD(1)), + `transaction_hash` FixedString(66), + `amount` UInt256, + `log_index` UInt64, + `batch_index` UInt16 DEFAULT 0, + `sign` Int8 DEFAULT 1, + `insert_timestamp` DateTime DEFAULT now(), + + INDEX minmax_block_number block_number TYPE minmax GRANULARITY 4, + INDEX minmax_block_timestamp block_timestamp TYPE minmax GRANULARITY 4, + INDEX bloomfilter_token_address token_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_token_id token_id TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_from_address from_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_to_address to_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 4, +) +ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp) +PARTITION BY (chain_id, toStartOfYear(block_timestamp)) +ORDER BY (chain_id, token_address, block_number, transaction_hash, log_index, batch_index) +SETTINGS index_granularity = 8192, lightweight_mutation_projection_mode = 'rebuild', deduplicate_merge_projection_mode = 'rebuild'; + + +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_logs_to_erc1155_all +TO logs_transfers_erc1155 +AS + +SELECT + chain_id, + address AS token_address, + reinterpretAsUInt256(reverse(unhex(substring(data, 3, 64)))) AS token_id, + concat('0x', substring(topic_2, 27, 40)) AS from_address, + concat('0x', substring(topic_3, 27, 40)) AS to_address, + block_number, + block_timestamp, + transaction_hash AS transaction_hash, + reinterpretAsUInt256(reverse(unhex(substring(data, 67, 64)))) AS amount, + log_index, + toUInt16(0) AS batch_index, + sign, + insert_timestamp +FROM logs +WHERE topic_0 = '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62' -- TransferSingle + AND length(topic_2) = 66 AND length(topic_3) = 66 + AND length(data) = (2 + 2*64) -- 0x + 2 words = 130 + +UNION ALL + +WITH meta AS +( + SELECT + chain_id, address, topic_2, topic_3, data, + block_number, block_timestamp, transaction_hash, log_index, sign, insert_timestamp, + toUInt32(3 + 2*64) AS ids_len_idx, -- "0x" + 2*32B heads + reinterpretAsUInt64(reverse(unhex(substring(data, ids_len_idx, 64)))) AS ids_len, + (ids_len_idx + 64) AS ids_vals_idx, + (ids_len_idx + 64) + ids_len * 64 AS amts_len_idx, + reinterpretAsUInt64(reverse(unhex(substring(data, amts_len_idx, 64)))) AS amts_len, + (amts_len_idx + 64) AS amts_vals_idx, + (2 + (4 + ids_len + amts_len) * 64) AS expected_len + FROM logs + WHERE topic_0 = '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb' -- TransferBatch + AND length(topic_2) = 66 AND length(topic_3) = 66 +), +expanded AS +( + SELECT + *, + arrayMap(i -> substring(data, ids_vals_idx + (i-1)*64, 64), range(1, toInt32(ids_len) + 1)) AS ids_hex, + arrayMap(i -> substring(data, amts_vals_idx + (i-1)*64, 64), range(1, toInt32(amts_len) + 1)) AS amts_hex + FROM meta + WHERE amts_len = ids_len + AND length(data) = expected_len +) +SELECT + chain_id, + address AS token_address, + reinterpretAsUInt256(reverse(unhex(id_hex))) AS token_id, + concat('0x', substring(topic_2, 27, 40)) AS from_address, + concat('0x', substring(topic_3, 27, 40)) AS to_address, + block_number, + block_timestamp, + transaction_hash AS transaction_hash, + reinterpretAsUInt256(reverse(unhex(amt_hex))) AS amount, + log_index, + toUInt16(idx - 1) AS batch_index, -- make it 0-based + sign, + insert_timestamp +FROM expanded +ARRAY JOIN + ids_hex AS id_hex, + amts_hex AS amt_hex, + arrayEnumerate(ids_hex) AS idx; \ No newline at end of file diff --git a/internal/tools/clickhouse/0014_clickhouse_create_logs_transfers_erc6909_mv.sql b/internal/tools/clickhouse/0014_clickhouse_create_logs_transfers_erc6909_mv.sql new file mode 100644 index 0000000..ed1e3c3 --- /dev/null +++ b/internal/tools/clickhouse/0014_clickhouse_create_logs_transfers_erc6909_mv.sql @@ -0,0 +1,50 @@ +CREATE TABLE IF NOT EXISTS logs_transfers_erc6909 +( + `chain_id` UInt256, + `token_address` FixedString(42), + `token_id` UInt256, + `from_address` FixedString(42), + `to_address` FixedString(42), + `block_number` UInt256, + `block_timestamp` DateTime CODEC(Delta(4), ZSTD(1)), + `transaction_hash` FixedString(66), + `amount` UInt256, + `log_index` UInt64, + `sign` Int8 DEFAULT 1, + `insert_timestamp` DateTime DEFAULT now(), + + INDEX minmax_block_number block_number TYPE minmax GRANULARITY 4, + INDEX minmax_block_timestamp block_timestamp TYPE minmax GRANULARITY 4, + INDEX bloomfilter_token_address token_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_token_id token_id TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_from_address from_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_to_address to_address TYPE bloom_filter GRANULARITY 4, + INDEX bloomfilter_transaction_hash transaction_hash TYPE bloom_filter GRANULARITY 4, +) +ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp) +PARTITION BY (chain_id, toStartOfYear(block_timestamp)) +ORDER BY (chain_id, token_address, block_number, transaction_hash, log_index) +SETTINGS index_granularity = 8192, lightweight_mutation_projection_mode = 'rebuild', deduplicate_merge_projection_mode = 'rebuild'; + + +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_logs_to_erc6909 +TO logs_transfers_erc6909 +AS +SELECT + chain_id, + lower(address) AS token_address, + reinterpretAsUInt256(reverse(unhex(substring(topic_3, 3, 64)))) AS token_id, + lower(concat('0x', substring(topic_1, 27, 40))) AS from_address, + lower(concat('0x', substring(topic_2, 27, 40))) AS to_address, + block_number, + block_timestamp, + transaction_hash, + reinterpretAsUInt256(reverse(unhex(substring(data, 67, 64)))) AS amount, + log_index, + sign, + insert_timestamp +FROM logs +WHERE topic_0 = '0x1b3d7edb2e9c0b0e7c525b20aaaef0f5940d2ed71663c7d39266ecafac728859' + AND length(topic_1) = 66 + AND length(topic_2) = 66 + AND length(data) == 2 + 128; diff --git a/internal/tools/clickhouse/0015_clickhouse_create_token_states_mv.sql b/internal/tools/clickhouse/0015_clickhouse_create_token_states_mv.sql new file mode 100644 index 0000000..ad1c6aa --- /dev/null +++ b/internal/tools/clickhouse/0015_clickhouse_create_token_states_mv.sql @@ -0,0 +1,198 @@ +CREATE TABLE IF NOT EXISTS token_states +( + `chain_id` UInt256, + `token_address` FixedString(42), + `owner_address` FixedString(42), + + `token_type` LowCardinality(String), + `token_id` Nullable(UInt256), -- Nullable for fungible tokens + + `balance_state` AggregateFunction(sum, Int256), + `last_block_number_state` AggregateFunction(max, UInt256), + `last_block_timestamp_state` AggregateFunction(max, DateTime), + + INDEX bf_owner owner_address TYPE bloom_filter GRANULARITY 4, + INDEX bf_token token_address TYPE bloom_filter GRANULARITY 4, + + PROJECTION owner_balances_projection + ( + SELECT + chain_id, + owner_address, + token_address, + token_id, + any(token_type) AS token_type, + sumMerge(balance_state) AS balance, + maxMerge(last_block_number_state) AS last_block_number, + maxMerge(last_block_timestamp_state) AS last_block_timestamp + GROUP BY chain_id, owner_address, token_address, token_id + ), + + PROJECTION token_projection + ( + SELECT + chain_id, + token_address, + token_id, + owner_address, + token_type, + balance_state, + last_block_number_state, + last_block_timestamp_state + ORDER BY chain_id, token_address, token_id, owner_address + ) +) +ENGINE = AggregatingMergeTree +PARTITION BY chain_id +ORDER BY (chain_id, owner_address, token_address, ifNull(token_id, 0)) +SETTINGS index_granularity = 8192; + +------ + +-- ERC20 +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_erc20_to_balances +TO token_states +AS +SELECT + chain_id, + token_address, + owner_address, + 'erc20' AS token_type, + CAST(NULL AS Nullable(UInt256)) AS token_id, + sumState(delta) AS balance_state, + maxState(block_number) AS last_block_number_state, + maxState(block_timestamp) AS last_block_timestamp_state +FROM +( + -- FROM side (negative) + SELECT + chain_id, + token_address, + from_address AS owner_address, + toInt256(amount) * (-1) * sign AS delta, + block_number, + block_timestamp + FROM logs_transfers_erc20 + UNION ALL + -- TO side (positive) + SELECT + chain_id, + token_address, + to_address AS owner_address, + toInt256(amount) * (+1) * sign AS delta, + block_number, + block_timestamp + FROM logs_transfers_erc20 +) +GROUP BY chain_id, token_address, owner_address, token_type, token_id; + +-- ERC721 +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_erc721_to_balances +TO token_states +AS +SELECT + chain_id, + token_address, + owner_address, + 'erc721' AS token_type, + token_id, + sumState(delta) AS balance_state, + maxState(block_number) AS last_block_number_state, + maxState(block_timestamp) AS last_block_timestamp_state +FROM +( + SELECT + chain_id, + token_address, + from_address AS owner_address, + token_id, + toInt256(1) * (-1) * sign AS delta, + block_number, + block_timestamp + FROM logs_transfers_erc721 + UNION ALL + SELECT + chain_id, + token_address, + to_address AS owner_address, + token_id, + toInt256(1) * (+1) * sign AS delta, + block_number, + block_timestamp + FROM logs_transfers_erc721 +) +GROUP BY chain_id, token_address, owner_address, token_type, token_id; + +-- ERC1155 +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_erc1155_to_balances +TO token_states +AS +SELECT + chain_id, + token_address, + owner_address, + 'erc1155' AS token_type, + token_id, + sumState(delta) AS balance_state, + maxState(block_number) AS last_block_number_state, + maxState(block_timestamp) AS last_block_timestamp_state +FROM +( + SELECT + chain_id, + token_address, + from_address AS owner_address, + token_id, + toInt256(amount) * (-1) * sign AS delta, + block_number, + block_timestamp + FROM logs_transfers_erc1155 + UNION ALL + SELECT + chain_id, + token_address, + to_address AS owner_address, + token_id, + toInt256(amount) * (+1) * sign AS delta, + block_number, + block_timestamp + FROM logs_transfers_erc1155 +) +GROUP BY chain_id, token_address, owner_address, token_type, token_id; + +-- ERC6909 +CREATE MATERIALIZED VIEW IF NOT EXISTS mv_erc6909_to_balances +TO token_states +AS +SELECT + chain_id, + token_address, + owner_address, + 'erc6909' AS token_type, + token_id, + sumState(delta) AS balance_state, + maxState(block_number) AS last_block_number_state, + maxState(block_timestamp) AS last_block_timestamp_state +FROM +( + SELECT + chain_id, + token_address, + from_address AS owner_address, + token_id, + toInt256(amount) * (-1) * sign AS delta, + block_number, + block_timestamp + FROM logs_transfers_erc6909 + UNION ALL + SELECT + chain_id, + token_address, + to_address AS owner_address, + token_id, + toInt256(amount) * (+1) * sign AS delta, + block_number, + block_timestamp + FROM logs_transfers_erc6909 +) +GROUP BY chain_id, token_address, owner_address, token_type, token_id; \ No newline at end of file