Skip to content

Commit 4fd141d

Browse files
committed
Update schema to use replacing merge tree
1 parent cd434a2 commit 4fd141d

17 files changed

+337
-259
lines changed

internal/storage/kafka_publisher.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ type PublishableMessagePayload struct {
3737

3838
type PublishableMessageBlockData struct {
3939
common.BlockData
40-
Sign int8 `json:"sign"`
40+
IsDeleted int8 `json:"is_deleted"`
4141
InsertTimestamp time.Time `json:"insert_timestamp"`
4242
}
4343

4444
type PublishableMessageRevert struct {
4545
ChainId uint64 `json:"chain_id"`
4646
BlockNumber uint64 `json:"block_number"`
47-
Sign int8 `json:"sign"`
47+
IsDeleted int8 `json:"is_deleted"`
4848
InsertTimestamp time.Time `json:"insert_timestamp"`
4949
}
5050

@@ -233,11 +233,11 @@ func (p *KafkaPublisher) createBlockDataMessage(block common.BlockData, isDelete
233233

234234
data := PublishableMessageBlockData{
235235
BlockData: block,
236-
Sign: 1,
236+
IsDeleted: 0,
237237
InsertTimestamp: timestamp,
238238
}
239239
if isDeleted {
240-
data.Sign = -1
240+
data.IsDeleted = 1
241241
}
242242

243243
msg := PublishableMessagePayload{
@@ -260,7 +260,7 @@ func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber ui
260260
data := PublishableMessageRevert{
261261
ChainId: chainId,
262262
BlockNumber: blockNumber,
263-
Sign: 1,
263+
IsDeleted: 0,
264264
InsertTimestamp: timestamp,
265265
}
266266

internal/tools/clickhouse/0000_clickhouse_create_blocks_table.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ CREATE TABLE IF NOT EXISTS blocks (
2323
`base_fee_per_gas` Nullable(UInt64),
2424

2525
`insert_timestamp` DateTime DEFAULT now(),
26-
`sign` Int8 DEFAULT 1,
26+
`is_deleted` Int8 DEFAULT 0,
2727

2828
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
2929
INDEX idx_hash hash TYPE bloom_filter GRANULARITY 2,
30-
) ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp)
30+
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
3131
ORDER BY (chain_id, block_number)
3232
PARTITION BY (chain_id, toStartOfQuarter(block_timestamp))
3333
SETTINGS deduplicate_merge_projection_mode = 'rebuild', lightweight_mutation_projection_mode = 'rebuild';

internal/tools/clickhouse/0001_clickhouse_create_transactions_table.sql

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ CREATE TABLE IF NOT EXISTS transactions (
3232
`logs_bloom` Nullable(String),
3333
`status` Nullable(UInt64),
3434

35-
`sign` Int8 DEFAULT 1,
3635
`insert_timestamp` DateTime DEFAULT now(),
36+
`is_deleted` Int8 DEFAULT 0,
3737

3838
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
3939
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 3,
@@ -45,14 +45,7 @@ CREATE TABLE IF NOT EXISTS transactions (
4545
PROJECTION from_address_projection
4646
(
4747
SELECT
48-
chain_id,
49-
block_number,
50-
block_timestamp,
51-
hash,
52-
from_address,
53-
to_address,
54-
value,
55-
data
48+
*
5649
ORDER BY
5750
chain_id,
5851
from_address,
@@ -62,21 +55,42 @@ CREATE TABLE IF NOT EXISTS transactions (
6255
PROJECTION to_address_projection
6356
(
6457
SELECT
65-
chain_id,
66-
block_number,
67-
block_timestamp,
68-
hash,
69-
from_address,
70-
to_address,
71-
value,
72-
data
58+
*
7359
ORDER BY
7460
chain_id,
7561
to_address,
7662
block_number,
7763
hash
64+
),
65+
PROJECTION from_address_state_projection
66+
(
67+
SELECT
68+
chain_id,
69+
from_address,
70+
countState() AS tx_count_state,
71+
minState(block_number) AS min_block_number_state,
72+
minState(block_timestamp) AS min_block_timestamp_state,
73+
maxState(block_number) AS max_block_number_state,
74+
maxState(block_timestamp) AS max_block_timestamp_state
75+
GROUP BY
76+
chain_id,
77+
from_address
78+
),
79+
PROJECTION to_address_state_projection
80+
(
81+
SELECT
82+
chain_id,
83+
to_address,
84+
countState() AS tx_count_state,
85+
minState(block_number) AS min_block_number_state,
86+
minState(block_timestamp) AS min_block_timestamp_state,
87+
maxState(block_number) AS max_block_number_state,
88+
maxState(block_timestamp) AS max_block_timestamp_state
89+
GROUP BY
90+
chain_id,
91+
to_address
7892
)
79-
) ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp)
93+
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
8094
ORDER BY (chain_id, block_number, hash)
8195
PARTITION BY (chain_id, toStartOfQuarter(block_timestamp))
8296
SETTINGS deduplicate_merge_projection_mode = 'rebuild', lightweight_mutation_projection_mode = 'rebuild';

internal/tools/clickhouse/0002_clickhouse_create_logs_table.sql

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ CREATE TABLE IF NOT EXISTS logs (
1313
`topic_2` String,
1414
`topic_3` String,
1515

16-
`sign` Int8 DEFAULT 1,
1716
`insert_timestamp` DateTime DEFAULT now(),
17+
`is_deleted` Int8 DEFAULT 0,
1818

1919
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
2020
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 3,
@@ -48,8 +48,24 @@ CREATE TABLE IF NOT EXISTS logs (
4848
transaction_index,
4949
log_index,
5050
address
51+
),
52+
PROJECTION address_topic0_state_projection
53+
(
54+
SELECT
55+
chain_id,
56+
address,
57+
topic_0,
58+
countState() AS log_count_state,
59+
minState(block_number) AS min_block_number_state,
60+
minState(block_timestamp) AS min_block_timestamp_state,
61+
maxState(block_number) AS max_block_number_state,
62+
maxState(block_timestamp) AS max_block_timestamp_state
63+
GROUP BY
64+
chain_id,
65+
address,
66+
topic_0
5167
)
52-
) ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp)
68+
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
5369
ORDER BY (chain_id, block_number, transaction_hash, log_index)
5470
PARTITION BY (chain_id, toStartOfQuarter(block_timestamp))
5571
SETTINGS deduplicate_merge_projection_mode = 'rebuild', lightweight_mutation_projection_mode = 'rebuild';

internal/tools/clickhouse/0003_clickhouse_create_traces_table.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ CREATE TABLE IF NOT EXISTS traces (
2121
`reward_type` LowCardinality(Nullable(String)),
2222
`refund_address` Nullable(FixedString(42)),
2323

24-
`sign` Int8 DEFAULT 1,
2524
`insert_timestamp` DateTime DEFAULT now(),
25+
`is_deleted` Int8 DEFAULT 0,
2626

2727
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
2828
INDEX idx_block_hash block_hash TYPE bloom_filter GRANULARITY 2,
@@ -52,7 +52,7 @@ CREATE TABLE IF NOT EXISTS traces (
5252
trace_address
5353
)
5454

55-
) ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp)
55+
) ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
5656
ORDER BY (chain_id, transaction_hash, trace_address)
5757
PARTITION BY (chain_id, toStartOfQuarter(block_timestamp))
5858
SETTINGS deduplicate_merge_projection_mode = 'rebuild', lightweight_mutation_projection_mode = 'rebuild';

internal/tools/clickhouse/0004_clickhouse_create_insert_null_table.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,6 @@ CREATE TABLE IF NOT EXISTS insert_null_block_data (
9494
refund_address Nullable(FixedString(42))
9595
)),
9696

97-
sign Int8 DEFAULT 1,
98-
insert_timestamp DateTime DEFAULT now()
97+
insert_timestamp DateTime DEFAULT now(),
98+
is_deleted Int8 DEFAULT 0
9999
) ENGINE = Null;

internal/tools/clickhouse/0005_clickhouse_create_insert_data_mv.sql

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ SELECT
6565
t.30 AS logs_bloom,
6666
t.31 AS status,
6767
insert_timestamp,
68-
sign
68+
is_deleted
6969
FROM insert_null_block_data
7070
ARRAY JOIN transactions AS t;
7171

@@ -87,7 +87,7 @@ SELECT
8787
l.11 AS topic_2,
8888
l.12 AS topic_3,
8989
insert_timestamp,
90-
sign
90+
is_deleted
9191
FROM insert_null_block_data
9292
ARRAY JOIN logs AS l;
9393

@@ -117,6 +117,6 @@ SELECT
117117
tr.19 AS reward_type,
118118
tr.20 AS refund_address,
119119
insert_timestamp,
120-
sign
120+
is_deleted
121121
FROM insert_null_block_data
122122
ARRAY JOIN traces AS tr;

internal/tools/clickhouse/0006_clickhouse_create_token_transfers.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ CREATE TABLE IF NOT EXISTS token_transfers
1414
`log_index` UInt64,
1515
`batch_index` Nullable(UInt16) DEFAULT NULL,
1616

17-
`sign` Int8 DEFAULT 1,
1817
`insert_timestamp` DateTime DEFAULT now(),
18+
`is_deleted` Int8 DEFAULT 0,
1919

2020
INDEX idx_block_timestamp block_timestamp TYPE minmax GRANULARITY 1,
2121
INDEX idx_from_address from_address TYPE bloom_filter GRANULARITY 3,
@@ -54,7 +54,7 @@ CREATE TABLE IF NOT EXISTS token_transfers
5454
log_index
5555
)
5656
)
57-
ENGINE = VersionedCollapsingMergeTree(sign, insert_timestamp)
57+
ENGINE = ReplacingMergeTree(insert_timestamp, is_deleted)
5858
PARTITION BY (chain_id, toStartOfQuarter(block_timestamp))
5959
ORDER BY (chain_id, token_address, block_number, transaction_index, log_index)
6060
SETTINGS index_granularity = 8192, lightweight_mutation_projection_mode = 'rebuild', deduplicate_merge_projection_mode = 'rebuild';

internal/tools/clickhouse/0007_clickhouse_create_token_transfers_mv.sql

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ SELECT
1616
reinterpretAsUInt256(reverse(unhex(substring(data, 3, 64)))) AS amount,
1717
log_index,
1818
CAST(NULL AS Nullable(UInt16)) AS batch_index,
19-
sign,
20-
insert_timestamp
19+
insert_timestamp,
20+
is_deleted
2121
FROM logs
2222
WHERE topic_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef' -- Transfer
2323
AND length(topic_1) = 66 AND startsWith(topic_1, '0x')
@@ -43,8 +43,8 @@ SELECT
4343
toUInt8(1) AS amount,
4444
log_index,
4545
CAST(NULL AS Nullable(UInt16)) AS batch_index,
46-
sign,
47-
insert_timestamp
46+
insert_timestamp,
47+
is_deleted
4848
FROM logs
4949
WHERE topic_0 = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
5050
AND length(topic_1) = 66 AND startsWith(topic_1, '0x')
@@ -70,8 +70,8 @@ SELECT
7070
reinterpretAsUInt256(reverse(unhex(substring(data, 67, 64)))) AS amount,
7171
log_index,
7272
toNullable(toUInt16(0)) AS batch_index,
73-
sign,
74-
insert_timestamp
73+
insert_timestamp,
74+
is_deleted
7575
FROM logs
7676
WHERE topic_0 = '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62' -- TransferSingle
7777
AND length(topic_2) = 66 AND length(topic_3) = 66
@@ -95,12 +95,21 @@ SELECT
9595
reinterpretAsUInt256(reverse(unhex(amount_hex))) AS amount,
9696
log_index,
9797
toNullable(toUInt16(array_index - 1)) AS batch_index,
98-
sign,
99-
insert_timestamp
98+
insert_timestamp,
99+
is_deleted
100100
FROM (
101101
SELECT
102-
chain_id, address, topic_2, topic_3,
103-
block_number, block_timestamp, transaction_hash, transaction_index, log_index, sign, insert_timestamp,
102+
chain_id,
103+
address,
104+
topic_2,
105+
topic_3,
106+
block_number,
107+
block_timestamp,
108+
transaction_hash,
109+
transaction_index,
110+
log_index,
111+
is_deleted,
112+
insert_timestamp,
104113
toUInt32(reinterpretAsUInt256(reverse(unhex(substring(data, 3, 64))))) AS ids_offset,
105114
toUInt32(reinterpretAsUInt256(reverse(unhex(substring(data, 67, 64))))) AS amounts_offset,
106115
toUInt32(reinterpretAsUInt256(reverse(unhex(substring(data, 3 + ids_offset * 2, 64))))) AS ids_length,
@@ -136,8 +145,8 @@ SELECT
136145
reinterpretAsUInt256(reverse(unhex(substring(data, 67, 64)))) AS amount,
137146
log_index,
138147
CAST(NULL AS Nullable(UInt16)) AS batch_index,
139-
sign,
140-
insert_timestamp
148+
insert_timestamp,
149+
is_deleted
141150
FROM logs
142151
WHERE topic_0 = '0x1b3d7edb2e9c0b0e7c525b20aaaef0f5940d2ed71663c7d39266ecafac728859'
143152
AND length(topic_1) = 66

internal/tools/clickhouse/0008_clickhouse_create_token_balance.sql

Lines changed: 0 additions & 44 deletions
This file was deleted.

0 commit comments

Comments
 (0)