Commit f271e0c

chore: Optimized DB queries (#580)
2 parents 1255d4d + 9277f69 commit f271e0c

File tree

3 files changed: +174 -52 lines changed


docker-compose.yaml

Lines changed: 2 additions & 1 deletion
@@ -2,6 +2,7 @@ version: '3.1'
 services:
   db:
     build:
+      context: .
       dockerfile: postgres.Dockerfile
     restart: "always"
     environment:
@@ -35,6 +36,6 @@ services:
       - ./extractors.yaml:/opt/tycho-indexer/extractors.yaml
       - ./substreams/:/opt/tycho-indexer/substreams/
     entrypoint: [ "/usr/wait-for-postgres.sh", "db" ]
-    command: [ "/opt/tycho-indexer/tycho-indexer", "--endpoint", "https://mainnet.eth.streamingfast.io:443", "index"]
+    command: [ "/opt/tycho-indexer/tycho-indexer", "--endpoint", "https://mainnet.eth.streamingfast.io:443", "index" ]
 volumes:
   postgres_data:

tycho-storage/README.md

Lines changed: 37 additions & 13 deletions
@@ -1,35 +1,47 @@
 # Tycho Storage
 
-Tycho is an indexer designed to process and store data, necessitating the saving of state. 
+Tycho is an indexer designed to process and store data, necessitating the saving of state.
 
 This tycho-storage crate handles all data storage and communication with the database.
 
 ## Architecture
 
 ### Database
 
-Tycho currently uses PostgresSQL as its storage backend. The full schema can be found in [schema.rs](./src/postgres/schema.rs).
+Tycho currently uses PostgreSQL as its storage backend. The full schema can be found
+in [schema.rs](./src/postgres/schema.rs).
 
 Below is the Entity Relationship (ER) diagram illustrating the tables used for this project:
 
 [![Entity Relation Diagram](../assets/tycho_db_er.png)](https://drive.google.com/file/d/1IQvdsfwRtg-AqtLuJjyGM2s6bqJGuciK/view?usp=sharing)
 
 ### Gateways
 
-Database interactions are managed through multiple gateways, including [cache](./src/postgres/cache.rs), [chain](./src/postgres/chain.rs), [contract](./src/postgres/contract.rs), [extraction_state](./src/postgres/extraction_state.rs) and [protocol](./src/postgres/protocol.rs).
+Database interactions are managed through multiple gateways,
+including [cache](./src/postgres/cache.rs), [chain](./src/postgres/chain.rs), [contract](./src/postgres/contract.rs), [extraction_state](./src/postgres/extraction_state.rs)
+and [protocol](./src/postgres/protocol.rs).
 
-The CachedGateway serves as the main entry point for all database communications. It is designed to efficiently manage and execute database operations by utilizing an in-memory cache and ensuring data consistency through transactional writes. Writes are batched and deduplicated to improve performance and reduce load on the database.
+The CachedGateway serves as the main entry point for all database communications. It is designed to efficiently manage
+and execute database operations by utilizing an in-memory cache and ensuring data consistency through transactional
+writes. Writes are batched and deduplicated to improve performance and reduce load on the database.
 
 ### Versioning
 
-Tycho employs a robust versioning system to track historical data within the database. The [versioning](./src/postgres/versioning.rs) module provides tools to handle historical data, ensuring that each version of an entity is tracked and stored appropriately.
+Tycho employs a robust versioning system to track historical data within the database.
+The [versioning](./src/postgres/versioning.rs) module provides tools to handle historical data, ensuring that each
+version of an entity is tracked and stored appropriately.
 
 #### Key Concepts
-- VersionedRow: A trait for structs that can be inserted into a versioned table. It automates the valid_to attribute management, facilitating batch insertions.
 
-- DeltaVersionedRow: Similar to VersionedRow, but also handles setting previous_value attributes, allowing for more complex versioning scenarios.
+- VersionedRow: A trait for structs that can be inserted into a versioned table. It automates the valid_to attribute
+  management, facilitating batch insertions.
 
-- StoredVersionedRow: A trait that enables setting the end version on currently active rows in the database based on new incoming entries. It's essential for ensuring that historical data is correctly marked as outdated when new versions are inserted.
+- DeltaVersionedRow: Similar to VersionedRow, but also handles setting previous_value attributes, allowing for more
+  complex versioning scenarios.
+
+- StoredVersionedRow: A trait that enables setting the end version on currently active rows in the database based on new
+  incoming entries. It's essential for ensuring that historical data is correctly marked as outdated when new versions
+  are inserted.
 
 # Development
 
@@ -58,8 +70,9 @@ docker-compose up -d db
 ```
 
 4. Set Environment Variables:
+
 ```
-export DATABASE_URL=postgres://postgres:mypassword@localhost:5432/tycho_indexer_0
+export DATABASE_URL=postgres://postgres:mypassword@localhost:5431/tycho_indexer_0
 export ETH_RPC_URL="url-here"
 
 ```
@@ -77,19 +90,25 @@ We use [pgFormatter](https://github.com/darold/pgFormatter) to keep SQL files co
 
 ### Setup pgFormatter with RustRover
 
 1. Ensure you have pgFormatter installed:
+
 ```bash
 brew install pgformatter
 ```
+
 2. In RustRover, search for "External Tools" and add a new tool using the "+" button.
 3. Get the path of pgFormatter installation:
+
 ```bash
 which pg_format
 ```
+
 4. Set the "Program" field to this path.
-5. Set the "Arguments" field to:
+5. Set the "Arguments" field to:
+
 ```bash
 --no-space-function -i $FilePath$
 ```
+
 6. Leave working directory empty.
 7. Save the tool under "pgFormat" and add a shortcut if desired.
 
@@ -108,7 +127,8 @@ If you have to change the database schema, please make sure the down migration i
 diesel migration redo --migration-dir ./tycho-storage/migrations
 ```
 
-If the schema.rs file does not automatically update after you've run a migration with table changes, you can trigger the update manually by executing:
+If the schema.rs file does not automatically update after you've run a migration with table changes, you can trigger the
+update manually by executing:
 
 ```bash
 diesel print-schema --config-file ./tycho-storage/diesel.toml > ./tycho-storage/src/postgres/schema.rs
@@ -118,6 +138,10 @@ diesel print-schema --config-file ./tycho-storage/diesel.toml > ./tycho-storage/
 
 Currently Tycho exposes a single special [test-group](https://nexte.st/book/test-groups.html) via nextest:
 
-1. `test(serial-db)`: These are tests against the database that need to commit data. To not intefere with other test that require a empty db but do not commit, we run these tests separately. Most of these tests use the `run_against_db` test harness. Test within that group are run sequentially, the remaining tests run in parallel. To add a test to this group simply ensure its name or its test package name includes the string `serial_db`.
+1. `test(serial-db)`: These are tests against the database that need to commit data. To not interfere with other tests
+   that require an empty db but do not commit, we run these tests separately. Most of these tests use
+   the `run_against_db` test harness. Tests within that group are run sequentially; the remaining tests run in parallel.
+   To add a test to this group simply ensure its name or its test package name includes the string `serial_db`.
 
-If your test does not require committing to the database and has no special resource requirements, create the test as usual.
+If your test does not require committing to the database and has no special resource requirements, create the test as
+usual.
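The versioning traits listed under Key Concepts can be illustrated with a minimal, self-contained sketch. This is not the actual tycho-storage API: `AttributeRow` and `apply_versioning` are hypothetical stand-ins showing the `VersionedRow` idea of automated `valid_to` management, where each row in a batch is closed at the `valid_from` of its successor and the newest row stays open-ended (here modeled with `i64::MAX` as a stand-in for `MAX_TS`):

```rust
// Sketch only: illustrative stand-in for MAX_TS ("still valid").
const MAX_TS: i64 = i64::MAX;

#[derive(Debug)]
struct AttributeRow {
    value: String,    // the versioned payload
    valid_from: i64,  // version start
    valid_to: i64,    // version end, filled in by apply_versioning
}

/// Close each row at the start of its successor; the last row remains open.
fn apply_versioning(rows: &mut [AttributeRow]) {
    let n = rows.len();
    for i in 0..n {
        let next_from = if i + 1 < n { rows[i + 1].valid_from } else { MAX_TS };
        rows[i].valid_to = next_from;
    }
}

fn main() {
    let mut rows = vec![
        AttributeRow { value: "a".into(), valid_from: 10, valid_to: 0 },
        AttributeRow { value: "b".into(), valid_from: 20, valid_to: 0 },
    ];
    apply_versioning(&mut rows);
    assert_eq!(rows[0].valid_to, 20);
    assert_eq!(rows[1].valid_to, MAX_TS);
    println!("{:?}", rows);
}
```

A `StoredVersionedRow`-style implementation would additionally close out rows already persisted in the database; this sketch only covers the in-batch case.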

tycho-storage/src/postgres/orm.rs

Lines changed: 135 additions & 38 deletions
@@ -460,29 +460,59 @@ impl PartitionedVersionedRow for NewComponentBalance {
             .zip(token_ids.iter())
             .collect::<HashSet<_>>();
 
-        // PERF: The removal of the filter 'valid_to = MAX_TS' means we now search in archived
-        // tables as well. A possible optimisation would be to add the valid_to filter back
-        // and then use a second query for balances still missing that will access the
-        // archived tables. Therefore, performance is not impacted in the common case
-        // (balances are rarely deleted).
-        Ok(component_balance::table
+        let mut results: Vec<ComponentBalance> = component_balance::table
             .select(ComponentBalance::as_select())
+            .into_boxed()
             .filter(
                 component_balance::protocol_component_id
                     .eq_any(&component_ids)
-                    .and(component_balance::token_id.eq_any(&token_ids)),
+                    .and(component_balance::token_id.eq_any(&token_ids))
+                    .and(component_balance::valid_to.eq(MAX_TS)),
             )
-            .distinct_on((component_balance::protocol_component_id, component_balance::token_id))
-            .order_by((
-                component_balance::protocol_component_id,
-                component_balance::token_id,
-                component_balance::valid_to.desc(),
-            ))
             .get_results(conn)
             .await
-            .map_err(PostgresError::from)?
+            .map_err(PostgresError::from)?;
+
+        let found_ids: HashSet<_> = results
+            .iter()
+            .map(|cb| (&cb.protocol_component_id, &cb.token_id))
+            .collect();
+
+        let missing_ids: Vec<_> = tuple_ids
+            .clone()
             .into_iter()
-            .filter(|cs| tuple_ids.contains(&(&cs.protocol_component_id, &cs.token_id)))
+            .filter(|id| !found_ids.contains(id))
+            .collect();
+
+        // If we have missing ids, we need to query the archived tables as well. This is necessary
+        // when entries are deleted.
+        if !missing_ids.is_empty() {
+            let (missing_component_ids, missing_token_ids): (Vec<&i64>, Vec<&i64>) =
+                missing_ids.into_iter().unzip();
+            let deleted_results = component_balance::table
+                .select(ComponentBalance::as_select())
+                .filter(
+                    component_balance::protocol_component_id
+                        .eq_any(&missing_component_ids)
+                        .and(component_balance::token_id.eq_any(&missing_token_ids)),
+                )
+                .distinct_on((
+                    component_balance::protocol_component_id,
+                    component_balance::token_id,
+                ))
+                .order_by((
+                    component_balance::protocol_component_id,
+                    component_balance::token_id,
+                    component_balance::valid_to.desc(),
+                ))
+                .get_results(conn)
+                .await
+                .map_err(PostgresError::from)?;
+            results.extend(deleted_results);
+        }
+        Ok(results
+            .into_iter()
+            .filter(|cb| tuple_ids.contains(&(&cb.protocol_component_id, &cb.token_id)))
             .map(NewComponentBalance::from)
             .collect())
     }
@@ -1115,28 +1145,62 @@ impl PartitionedVersionedRow for NewProtocolState {
             .zip(attr_name.iter())
             .collect::<HashSet<_>>();
 
-        // PERF: The removal of the filter 'valid_to = MAX_TS' means we now search in archived
-        // tables as well. A possible optimisation would be to add the valid_to filter back
-        // and then use a second query for states still missing that will access the
-        // archived tables. Therefore, performance is not impacted in the common case.
-        Ok(protocol_state::table
+        let mut results: Vec<ProtocolState> = protocol_state::table
             .select(ProtocolState::as_select())
+            .into_boxed()
             .filter(
                 protocol_state::protocol_component_id
                     .eq_any(&pc_id)
-                    .and(protocol_state::attribute_name.eq_any(&attr_name)),
+                    .and(protocol_state::attribute_name.eq_any(&attr_name))
+                    .and(protocol_state::valid_to.eq(MAX_TS)),
             )
-            .distinct_on((protocol_state::protocol_component_id, protocol_state::attribute_name))
-            .order_by((
-                protocol_state::protocol_component_id,
-                protocol_state::attribute_name,
-                protocol_state::valid_to.desc(),
-            ))
             .get_results(conn)
             .await
-            .map_err(PostgresError::from)?
+            .map_err(PostgresError::from)?;
+
+        let found_ids: HashSet<_> = results
+            .iter()
+            .map(|ps| (&ps.protocol_component_id, &ps.attribute_name))
+            .collect();
+
+        let missing_ids: Vec<_> = tuple_ids
+            .clone()
             .into_iter()
-            .filter(|cs| tuple_ids.contains(&(&cs.protocol_component_id, &cs.attribute_name)))
+            .filter(|id| !found_ids.contains(id))
+            .collect();
+
+        // If we have missing ids, we need to query the archived tables as well. This is necessary
+        // when entries are deleted.
+        if !missing_ids.is_empty() {
+            let (missing_protocol_component_ids, missing_attribute_names): (
+                Vec<&i64>,
+                Vec<&String>,
+            ) = missing_ids.into_iter().unzip();
+            let deleted_results: Vec<ProtocolState> = protocol_state::table
+                .select(ProtocolState::as_select())
+                .filter(
+                    protocol_state::protocol_component_id
+                        .eq_any(&missing_protocol_component_ids)
+                        .and(protocol_state::attribute_name.eq_any(&missing_attribute_names)),
+                )
+                .distinct_on((
+                    protocol_state::protocol_component_id,
+                    protocol_state::attribute_name,
+                ))
+                .order_by((
+                    protocol_state::protocol_component_id,
+                    protocol_state::attribute_name,
+                    protocol_state::valid_to.desc(),
+                ))
+                .get_results(conn)
+                .await
+                .map_err(PostgresError::from)?;
+            results.extend(deleted_results);
+        }
+
+        Ok(results
+            .into_iter()
+            .filter(|ps| tuple_ids.contains(&(&ps.protocol_component_id, &ps.attribute_name)))
             .map(NewProtocolState::from)
             .collect())
     }
@@ -1644,22 +1708,55 @@ impl PartitionedVersionedRow for NewSlot {
         // tables as well. A possible optimisation would be to add the valid_to filter back
         // and then use a second query for storage still missing that will access the
         // archived tables. Therefore, performance is not impacted in the common case.
-        Ok(contract_storage::table
+        let mut results: Vec<ContractStorage> = contract_storage::table
             .select(ContractStorage::as_select())
+            .into_boxed()
             .filter(
                 contract_storage::account_id
                     .eq_any(&accounts)
-                    .and(contract_storage::slot.eq_any(&slots)),
+                    .and(contract_storage::slot.eq_any(&slots))
+                    .and(contract_storage::valid_to.eq(MAX_TS)),
             )
-            .distinct_on((contract_storage::account_id, contract_storage::slot))
-            .order_by((
-                contract_storage::account_id,
-                contract_storage::slot,
-                contract_storage::valid_to.desc(),
-            ))
             .get_results(conn)
             .await
-            .map_err(PostgresError::from)?
+            .map_err(PostgresError::from)?;
+
+        let found_ids: HashSet<_> = results
+            .iter()
+            .map(|cs| (&cs.account_id, &cs.slot))
+            .collect();
+
+        let missing_ids: Vec<_> = tuple_ids
+            .clone()
+            .into_iter()
+            .filter(|id| !found_ids.contains(id))
+            .collect();
+
+        // If we have missing ids, we need to query the archived tables as well. This is necessary
+        // when entries are deleted.
+        if !missing_ids.is_empty() {
+            let (missing_accounts, missing_slots): (Vec<&i64>, Vec<&Bytes>) =
+                missing_ids.into_iter().unzip();
+            let deleted_results: Vec<ContractStorage> = contract_storage::table
+                .select(ContractStorage::as_select())
+                .filter(
+                    contract_storage::account_id
+                        .eq_any(&missing_accounts)
+                        .and(contract_storage::slot.eq_any(&missing_slots)),
+                )
+                .distinct_on((contract_storage::account_id, contract_storage::slot))
+                .order_by((
+                    contract_storage::account_id,
+                    contract_storage::slot,
+                    contract_storage::valid_to.desc(),
+                ))
+                .get_results(conn)
+                .await
+                .map_err(PostgresError::from)?;
+            results.extend(deleted_results);
+        }
+
+        Ok(results
             .into_iter()
             .filter(|cs| tuple_ids.contains(&(&cs.account_id, &cs.slot)))
             .map(NewSlot::from)
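All three hunks apply the same two-phase lookup: a cheap first query restricted to currently valid rows (`valid_to = MAX_TS`), then a fallback query over the archived partitions only for keys the first query did not return (i.e. deleted entries). Stripped of diesel, the control flow can be sketched as below; `active` and `archived` are hypothetical in-memory stand-ins for the two database queries:

```rust
use std::collections::{HashMap, HashSet};

// Two-phase lookup sketch: `active` plays the hot partition (valid_to = MAX_TS),
// `archived` the partitioned history tables consulted only as a fallback.
fn fetch_latest(
    keys: &[(i64, i64)],
    active: &HashMap<(i64, i64), String>,
    archived: &HashMap<(i64, i64), String>,
) -> Vec<((i64, i64), String)> {
    // Phase 1: cheap query against currently valid rows only.
    let mut results: Vec<_> = keys
        .iter()
        .filter_map(|k| active.get(k).map(|v| (*k, v.clone())))
        .collect();

    // Phase 2: only keys with no active row (deleted entries) pay for the
    // expensive scan over archived partitions.
    let found: HashSet<(i64, i64)> = results.iter().map(|(k, _)| *k).collect();
    let missing: Vec<_> = keys
        .iter()
        .filter(|k| !found.contains(*k))
        .collect();
    if !missing.is_empty() {
        results.extend(
            missing
                .into_iter()
                .filter_map(|k| archived.get(k).map(|v| (*k, v.clone()))),
        );
    }
    results
}

fn main() {
    let active = HashMap::from([((1, 1), "live".to_string())]);
    let archived = HashMap::from([((2, 2), "deleted".to_string())]);
    let out = fetch_latest(&[(1, 1), (2, 2)], &active, &archived);
    assert_eq!(out.len(), 2);
    println!("{:?}", out);
}
```

This is why the common case (nothing deleted) never touches the archive: `missing` is empty and phase 2 is skipped entirely, which is the point of restoring the `valid_to = MAX_TS` filter in the first query.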
