From f4d13a5ab23978916eed1270e2d39887879d5180 Mon Sep 17 00:00:00 2001 From: iuwqyir Date: Mon, 21 Jul 2025 23:59:10 +0300 Subject: [PATCH] update clickhouse schemas to latest state --- .../tools/clickhouse_create_logs_table.sql | 14 +- .../clickhouse_create_token_balances_mv.sql | 14 +- .../clickhouse_create_token_transfers_mv.sql | 196 ++++++++++++++++++ .../clickhouse_create_transactions_table.sql | 20 +- 4 files changed, 240 insertions(+), 4 deletions(-) create mode 100644 internal/tools/clickhouse_create_token_transfers_mv.sql diff --git a/internal/tools/clickhouse_create_logs_table.sql b/internal/tools/clickhouse_create_logs_table.sql index 026dcf6..b1d3db3 100644 --- a/internal/tools/clickhouse_create_logs_table.sql +++ b/internal/tools/clickhouse_create_logs_table.sql @@ -22,6 +22,18 @@ CREATE TABLE IF NOT EXISTS logs ( INDEX idx_topic1 topic_1 TYPE bloom_filter GRANULARITY 1, INDEX idx_topic2 topic_2 TYPE bloom_filter GRANULARITY 1, INDEX idx_topic3 topic_3 TYPE bloom_filter GRANULARITY 1, + PROJECTION logs_chainid_topic0_address + ( + SELECT * + ORDER BY + chain_id, + topic_0, + address, + block_number, + transaction_index, + log_index + ) ) ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp) ORDER BY (chain_id, block_number, transaction_hash, log_index) -PARTITION BY chain_id; \ No newline at end of file +PARTITION BY chain_id +SETTINGS deduplicate_merge_projection_mode = 'drop', lightweight_mutation_projection_mode = 'rebuild'; diff --git a/internal/tools/clickhouse_create_token_balances_mv.sql b/internal/tools/clickhouse_create_token_balances_mv.sql index da94d5f..9f0cadd 100644 --- a/internal/tools/clickhouse_create_token_balances_mv.sql +++ b/internal/tools/clickhouse_create_token_balances_mv.sql @@ -5,10 +5,20 @@ CREATE TABLE IF NOT EXISTS token_balances `owner` FixedString(42), `address` FixedString(42), `token_id` UInt256, - `balance` Int256 + `balance` Int256, + PROJECTION address_projection + ( + SELECT * + ORDER BY + token_type, + chain_id, + address, + token_id + ) ) ENGINE = SummingMergeTree -ORDER BY (token_type, chain_id, owner, address, token_id); +ORDER BY (token_type, chain_id, owner, address, token_id) +SETTINGS index_granularity = 8192, lightweight_mutation_projection_mode = 'rebuild'; CREATE MATERIALIZED VIEW IF NOT EXISTS single_token_transfers_mv TO token_balances AS SELECT chain_id, owner, address, token_type, token_id, sum(amount) as balance diff --git a/internal/tools/clickhouse_create_token_transfers_mv.sql b/internal/tools/clickhouse_create_token_transfers_mv.sql new file mode 100644 index 0000000..64a4e44 --- /dev/null +++ b/internal/tools/clickhouse_create_token_transfers_mv.sql @@ -0,0 +1,196 @@ +CREATE TABLE IF NOT EXISTS token_transfers +( + `token_type` LowCardinality(String), + `chain_id` UInt256, + `token_address` FixedString(42), + `from_address` FixedString(42), + `to_address` FixedString(42), + `block_number` UInt256, + `block_timestamp` DateTime CODEC(Delta(4), ZSTD(1)), + `transaction_hash` FixedString(66), + `token_id` UInt256, + `amount` UInt256, + `log_index` UInt64, + `sign` Int8 DEFAULT 1, + `insert_timestamp` DateTime DEFAULT now(), + PROJECTION from_address_projection + ( + SELECT * + ORDER BY + chain_id, + token_type, + from_address, + block_number, + log_index + ), + PROJECTION to_address_projection + ( + SELECT * + ORDER BY + chain_id, + token_type, + to_address, + block_number, + log_index + ), + PROJECTION transaction_hash_projection + ( + SELECT * + ORDER BY + chain_id, + token_type, + transaction_hash, + block_number, + log_index + ) +) +ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp) +PARTITION BY chain_id +ORDER BY (chain_id, token_type, token_address, block_number, log_index) +SETTINGS index_granularity = 8192, lightweight_mutation_projection_mode = 'rebuild'; + +CREATE MATERIALIZED VIEW IF NOT EXISTS logs_to_token_transfers TO token_transfers +( + `chain_id` UInt256, + `token_address` FixedString(42), + `from_address` String, + `to_address` String, + `token_type` String, + `block_number` UInt256, + `block_timestamp` DateTime, + `transaction_hash` FixedString(66), + `log_index` UInt64, + `sign` Int8, + `insert_timestamp` DateTime, + `token_id` UInt256, + `amount` UInt256 +) +AS WITH + transfer_logs AS + ( + SELECT + chain_id, + address AS token_address, + topic_0, + topic_1, + topic_2, + topic_3, + (topic_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef') AND (topic_3 = '') AS is_erc20, + (topic_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef') AND (topic_3 != '') AS is_erc721, + topic_0 IN ('0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62', '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb') AS is_erc1155, + multiIf(is_erc20, 'erc20', is_erc721, 'erc721', 'erc1155') AS token_type, + if(is_erc1155, concat('0x', substring(topic_2, 27, 40)), concat('0x', substring(topic_1, 27, 40))) AS from_address, + if(is_erc1155, concat('0x', substring(topic_3, 27, 40)), concat('0x', substring(topic_2, 27, 40))) AS to_address, + data, + block_number, + block_timestamp, + transaction_hash, + log_index, + sign, + insert_timestamp + FROM default.logs + WHERE topic_0 IN ('0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62', '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb') + ), + batch_transfer_metadata AS + ( + SELECT + *, + 3 + (2 * 64) AS ids_length_idx, + ids_length_idx + 64 AS ids_values_idx, + reinterpretAsUInt64(reverse(unhex(substring(data, ids_length_idx, 64)))) AS ids_length, + (ids_length_idx + 64) + (ids_length * 64) AS amounts_length_idx, + reinterpretAsUInt64(reverse(unhex(substring(data, amounts_length_idx, 64)))) AS amounts_length, + amounts_length_idx + 64 AS amounts_values_idx + FROM transfer_logs + WHERE (topic_0 = '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb') AND (length(topic_1) = 66) AND (length(topic_2) = 66) AND (length(topic_3) = 66) AND (length(data) != (258 + ((ids_length + amounts_length) * 64))) AND (ids_length = amounts_length) + ), + batch_transfer_logs AS + ( + SELECT + *, + arrayMap(x -> substring(data, ids_values_idx + ((x - 1) * 64), 64), range(1, toInt32(ids_length) + 1)) AS ids_hex, + arrayMap(x -> substring(data, amounts_values_idx + ((x - 1) * 64), 64), range(1, toInt32(amounts_length) + 1)) AS amounts_hex + FROM batch_transfer_metadata + ) +SELECT + chain_id, + token_address, + from_address, + to_address, + token_type, + block_number, + block_timestamp, + transaction_hash, + log_index, + sign, + insert_timestamp, + multiIf(is_erc1155, reinterpretAsUInt256(reverse(unhex(substring(data, 3, 64)))), is_erc721, reinterpretAsUInt256(reverse(unhex(substring(topic_3, 3, 64)))), toUInt256(0)) AS token_id, + multiIf(is_erc20 AND (length(data) = 66), reinterpretAsUInt256(reverse(unhex(substring(data, 3)))), is_erc721, toUInt256(1), is_erc1155, if(length(data) = 130, reinterpretAsUInt256(reverse(unhex(substring(data, 67, 64)))), toUInt256(1)), toUInt256(0)) AS amount +FROM transfer_logs +WHERE topic_0 IN ('0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62') +UNION ALL +WITH + transfer_logs AS + ( + SELECT + chain_id, + address AS token_address, + topic_0, + topic_1, + topic_2, + topic_3, + (topic_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef') AND (topic_3 = '') AS is_erc20, + (topic_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef') AND (topic_3 != '') AS is_erc721, + topic_0 IN ('0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62', '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb') AS is_erc1155, + multiIf(is_erc20, 'erc20', is_erc721, 'erc721', 'erc1155') AS token_type, + if(is_erc1155, concat('0x', substring(topic_2, 27, 40)), concat('0x', substring(topic_1, 27, 40))) AS from_address, + if(is_erc1155, concat('0x', substring(topic_3, 27, 40)), concat('0x', substring(topic_2, 27, 40))) AS to_address, + data, + block_number, + block_timestamp, + transaction_hash, + log_index, + sign, + insert_timestamp + FROM default.logs + WHERE topic_0 IN ('0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef', '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62', '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb') + ), + batch_transfer_metadata AS + ( + SELECT + *, + 3 + (2 * 64) AS ids_length_idx, + ids_length_idx + 64 AS ids_values_idx, + reinterpretAsUInt64(reverse(unhex(substring(data, ids_length_idx, 64)))) AS ids_length, + (ids_length_idx + 64) + (ids_length * 64) AS amounts_length_idx, + reinterpretAsUInt64(reverse(unhex(substring(data, amounts_length_idx, 64)))) AS amounts_length, + amounts_length_idx + 64 AS amounts_values_idx + FROM transfer_logs + WHERE (topic_0 = '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb') AND (length(topic_1) = 66) AND (length(topic_2) = 66) AND (length(topic_3) = 66) AND (length(data) != (258 + ((ids_length + amounts_length) * 64))) AND (ids_length = amounts_length) + ), + batch_transfer_logs AS + ( + SELECT + *, + arrayMap(x -> substring(data, ids_values_idx + ((x - 1) * 64), 64), range(1, toInt32(ids_length) + 1)) AS ids_hex, + arrayMap(x -> substring(data, amounts_values_idx + ((x - 1) * 64), 64), range(1, toInt32(amounts_length) + 1)) AS amounts_hex + FROM batch_transfer_metadata + ) +SELECT + chain_id, + token_address, + from_address, + to_address, + token_type, + block_number, + block_timestamp, + transaction_hash, + log_index, + sign, + insert_timestamp, + reinterpretAsUInt256(reverse(unhex(substring(hex_id, 1, 64)))) AS token_id, + reinterpretAsUInt256(reverse(unhex(substring(hex_amount, 1, 64)))) AS amount +FROM batch_transfer_logs +ARRAY JOIN + ids_hex AS hex_id, + amounts_hex AS hex_amount \ No newline at end of file diff --git a/internal/tools/clickhouse_create_transactions_table.sql b/internal/tools/clickhouse_create_transactions_table.sql index b5ffc56..48f1e0d 100644 --- a/internal/tools/clickhouse_create_transactions_table.sql +++ b/internal/tools/clickhouse_create_transactions_table.sql @@ -39,6 +39,24 @@ CREATE TABLE IF NOT EXISTS transactions ( INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 1, INDEX idx_to_address to_address TYPE bloom_filter GRANULARITY 1, INDEX idx_function_selector function_selector TYPE bloom_filter GRANULARITY 1, + PROJECTION txs_chainid_from_address + ( + SELECT * + ORDER BY + chain_id, + from_address, + block_number + ), + PROJECTION txs_chainid_to_address + ( + SELECT * + ORDER BY + chain_id, + to_address, + block_number, + hash + ) ) ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp) ORDER BY (chain_id, block_number, hash) -PARTITION BY chain_id; \ No newline at end of file +PARTITION BY chain_id +SETTINGS deduplicate_merge_projection_mode = 'drop', lightweight_mutation_projection_mode = 'rebuild'; \ No newline at end of file