Skip to content

Commit 3e72aee

Browse files
authored
feat(cat-gateway): Remove removed_addresses from the rbac_registration table (#3724)
* remove `removed_addresses` from the `rbac_registration` table * fix clippy * wip
1 parent 32fcb9d commit 3e72aee

17 files changed

+115
-120
lines changed

catalyst-gateway/bin/src/db/index/block/rbac509/cql/insert_rbac509.cql

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,12 @@ INSERT INTO rbac_registration (
55
txn_index,
66
txn_id,
77
prv_txn_id,
8-
removed_stake_addresses,
98
purpose
109
) VALUES (
1110
:catalyst_id,
1211
:slot_no,
1312
:txn_index,
1413
:txn_id,
1514
:prv_txn_id,
16-
:removed_stake_addresses,
1715
:purpose
1816
);

catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_public_key.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use tracing::error;
1010

1111
use crate::{
1212
db::{
13-
index::queries::{PreparedQueries, SizedBatch},
13+
index::{
14+
queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch},
15+
session::CassandraSession,
16+
},
1417
types::{DbCatalystId, DbPublicKey, DbSlot},
1518
},
1619
settings::cassandra_db::EnvVars,
@@ -57,6 +60,16 @@ impl Params {
5760
}
5861
}
5962

63+
/// Executes prepared queries as a batch.
64+
pub(crate) async fn execute_batch(
65+
session: &Arc<CassandraSession>,
66+
queries: Vec<Self>,
67+
) -> FallibleQueryResults {
68+
session
69+
.execute_batch(PreparedQuery::CatalystIdForPublicKeyInsertQuery, queries)
70+
.await
71+
}
72+
6073
/// Prepares a batch of queries.
6174
pub(crate) async fn prepare_batch(
6275
session: &Arc<Session>,

catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_stake_address.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use tracing::error;
99

1010
use crate::{
1111
db::{
12-
index::queries::{PreparedQueries, SizedBatch},
12+
index::{
13+
queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch},
14+
session::CassandraSession,
15+
},
1316
types::{DbCatalystId, DbSlot, DbStakeAddress, DbTxnIndex},
1417
},
1518
settings::cassandra_db::EnvVars,
@@ -61,6 +64,23 @@ impl Params {
6164
}
6265
}
6366

67+
/// Executes prepared queries as a batch.
68+
pub(crate) async fn execute_batch(
69+
session: &Arc<CassandraSession>,
70+
queries: Vec<Self>,
71+
) -> FallibleQueryResults {
72+
for q in &queries {
73+
session
74+
.caches()
75+
.rbac_stake_address()
76+
.insert(q.stake_address.clone().into(), q.catalyst_id.clone().into());
77+
}
78+
79+
session
80+
.execute_batch(PreparedQuery::CatalystIdForStakeAddressInsertQuery, queries)
81+
.await
82+
}
83+
6484
/// Prepare Batch of RBAC Registration Index Data Queries
6585
pub(crate) async fn prepare_batch(
6686
session: &Arc<Session>,

catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_txn_id.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use tracing::error;
99

1010
use crate::{
1111
db::{
12-
index::queries::{PreparedQueries, SizedBatch},
12+
index::{
13+
queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch},
14+
session::CassandraSession,
15+
},
1316
types::{DbCatalystId, DbSlot, DbTransactionId},
1417
},
1518
settings::cassandra_db::EnvVars,
@@ -56,6 +59,16 @@ impl Params {
5659
}
5760
}
5861

62+
/// Executes prepared queries as a batch.
63+
pub(crate) async fn execute_batch(
64+
session: &Arc<CassandraSession>,
65+
queries: Vec<Self>,
66+
) -> FallibleQueryResults {
67+
session
68+
.execute_batch(PreparedQuery::CatalystIdForTxnIdInsertQuery, queries)
69+
.await
70+
}
71+
5972
/// Prepares a Batch of RBAC Registration Index Data Queries.
6073
pub(crate) async fn prepare_batch(
6174
session: &Arc<Session>,

catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
11
//! Insert RBAC 509 Registration Query.
22
3-
use std::{collections::HashSet, fmt::Debug, sync::Arc};
3+
use std::{fmt::Debug, sync::Arc};
44

5-
use cardano_chain_follower::{Slot, StakeAddress, TxnIndex, hashes::TransactionId};
5+
use cardano_chain_follower::{Slot, TxnIndex, hashes::TransactionId};
66
use catalyst_types::{catalyst_id::CatalystId, uuid::UuidV4};
77
use scylla::{SerializeRow, client::session::Session, value::MaybeUnset};
88
use tracing::error;
99

1010
use crate::{
1111
db::{
12-
index::queries::{PreparedQueries, SizedBatch},
13-
types::{DbCatalystId, DbSlot, DbStakeAddress, DbTransactionId, DbTxnIndex, DbUuidV4},
12+
index::{
13+
queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch},
14+
session::CassandraSession,
15+
},
16+
types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4},
1417
},
1518
settings::cassandra_db::EnvVars,
1619
};
@@ -31,8 +34,6 @@ pub(crate) struct Params {
3134
txn_id: DbTransactionId,
3235
/// Hash of Previous Transaction. Is `None` for the first registration. 32 Bytes.
3336
prv_txn_id: MaybeUnset<DbTransactionId>,
34-
/// A set of removed stake addresses.
35-
removed_stake_addresses: HashSet<DbStakeAddress>,
3637
/// A registration purpose.
3738
///
3839
/// The value of purpose is `None` if the chain is modified by the registration
@@ -55,7 +56,6 @@ impl Debug for Params {
5556
.field("slot_no", &self.slot_no)
5657
.field("txn_index", &self.txn_index)
5758
.field("prv_txn_id", &prv_txn_id)
58-
.field("removed_stake_addresses", &self.removed_stake_addresses)
5959
.field("purpose", &self.purpose)
6060
.finish()
6161
}
@@ -69,14 +69,9 @@ impl Params {
6969
slot_no: Slot,
7070
txn_index: TxnIndex,
7171
prv_txn_id: Option<TransactionId>,
72-
removed_stake_addresses: HashSet<StakeAddress>,
7372
purpose: Option<UuidV4>,
7473
) -> Self {
7574
let prv_txn_id = prv_txn_id.map_or(MaybeUnset::Unset, |v| MaybeUnset::Set(v.into()));
76-
let removed_stake_addresses = removed_stake_addresses
77-
.into_iter()
78-
.map(Into::into)
79-
.collect();
8075
let purpose = purpose.map_or(MaybeUnset::Unset, |v| MaybeUnset::Set(v.into()));
8176

8277
Self {
@@ -85,11 +80,20 @@ impl Params {
8580
slot_no: slot_no.into(),
8681
txn_index: txn_index.into(),
8782
prv_txn_id,
88-
removed_stake_addresses,
8983
purpose,
9084
}
9185
}
9286

87+
/// Executes prepared queries as a batch.
88+
pub(crate) async fn execute_batch(
89+
session: &Arc<CassandraSession>,
90+
queries: Vec<Self>,
91+
) -> FallibleQueryResults {
92+
session
93+
.execute_batch(PreparedQuery::Rbac509InsertQuery, queries)
94+
.await
95+
}
96+
9397
/// Prepare Batch of RBAC Registration Index Data Queries
9498
pub(crate) async fn prepare_batch(
9599
session: &Arc<Session>,

catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509_invalid.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ use tracing::error;
1010

1111
use crate::{
1212
db::{
13-
index::queries::{PreparedQueries, SizedBatch},
13+
index::{
14+
queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch},
15+
session::CassandraSession,
16+
},
1417
types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4},
1518
},
1619
service::common::objects::generic::problem_report::ProblemReport,
@@ -65,6 +68,16 @@ impl Params {
6568
}
6669
}
6770

71+
/// Executes prepared queries as a batch.
72+
pub(crate) async fn execute_batch(
73+
session: &Arc<CassandraSession>,
74+
queries: Vec<Self>,
75+
) -> FallibleQueryResults {
76+
session
77+
.execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, queries)
78+
.await
79+
}
80+
6881
/// Prepare Batch of RBAC Registration Index Data Queries
6982
pub(crate) async fn prepare_batch(
7083
session: &Arc<Session>,

catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@ pub(crate) mod insert_catalyst_id_for_txn_id;
66
pub(crate) mod insert_rbac509;
77
pub(crate) mod insert_rbac509_invalid;
88

9-
use std::{
10-
collections::{BTreeSet, HashSet},
11-
sync::Arc,
12-
};
9+
use std::{collections::BTreeSet, sync::Arc};
1310

1411
use anyhow::{Context, Result};
1512
use cardano_chain_follower::{MultiEraBlock, Slot, TxnIndex, hashes::TransactionId};
@@ -20,7 +17,7 @@ use tracing::{debug, error};
2017

2118
use crate::{
2219
db::index::{
23-
queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
20+
queries::{FallibleQueryTasks, SizedBatch},
2421
session::CassandraSession,
2522
},
2623
metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
@@ -176,14 +173,11 @@ impl Rbac509InsertQuery {
176173
slot,
177174
index,
178175
previous_transaction,
179-
// Addresses can only be removed from other chains, so this list is always
180-
// empty for the chain that is being updated.
181-
HashSet::new(),
182176
purpose,
183177
));
184178

185179
// Update other chains that were affected by this registration.
186-
for (catalyst_id, removed_addresses) in modified_chains {
180+
for (catalyst_id, _) in modified_chains {
187181
self.registrations.push(insert_rbac509::Params::new(
188182
catalyst_id.clone(),
189183
txn_hash,
@@ -193,7 +187,6 @@ impl Rbac509InsertQuery {
193187
// make sense to include a previous transaction ID unrelated to the chain
194188
// that is being updated.
195189
None,
196-
removed_addresses,
197190
None,
198191
));
199192
}
@@ -254,54 +247,47 @@ impl Rbac509InsertQuery {
254247
if !self.registrations.is_empty() {
255248
let inner_session = session.clone();
256249
query_handles.push(tokio::spawn(async move {
257-
inner_session
258-
.execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations)
259-
.await
250+
insert_rbac509::Params::execute_batch(&inner_session, self.registrations).await
260251
}));
261252
}
262253

263254
if !self.invalid.is_empty() {
264255
let inner_session = session.clone();
265256
query_handles.push(tokio::spawn(async move {
266-
inner_session
267-
.execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid)
268-
.await
257+
insert_rbac509_invalid::Params::execute_batch(&inner_session, self.invalid).await
269258
}));
270259
}
271260

272261
if !self.catalyst_id_for_txn_id.is_empty() {
273262
let inner_session = session.clone();
274263
query_handles.push(tokio::spawn(async move {
275-
inner_session
276-
.execute_batch(
277-
PreparedQuery::CatalystIdForTxnIdInsertQuery,
278-
self.catalyst_id_for_txn_id,
279-
)
280-
.await
264+
insert_catalyst_id_for_txn_id::Params::execute_batch(
265+
&inner_session,
266+
self.catalyst_id_for_txn_id,
267+
)
268+
.await
281269
}));
282270
}
283271

284272
if !self.catalyst_id_for_stake_address.is_empty() {
285273
let inner_session = session.clone();
286274
query_handles.push(tokio::spawn(async move {
287-
inner_session
288-
.execute_batch(
289-
PreparedQuery::CatalystIdForStakeAddressInsertQuery,
290-
self.catalyst_id_for_stake_address,
291-
)
292-
.await
275+
insert_catalyst_id_for_stake_address::Params::execute_batch(
276+
&inner_session,
277+
self.catalyst_id_for_stake_address,
278+
)
279+
.await
293280
}));
294281
}
295282

296283
if !self.catalyst_id_for_public_key.is_empty() {
297284
let inner_session = session.clone();
298285
query_handles.push(tokio::spawn(async move {
299-
inner_session
300-
.execute_batch(
301-
PreparedQuery::CatalystIdForPublicKeyInsertQuery,
302-
self.catalyst_id_for_public_key,
303-
)
304-
.await
286+
insert_catalyst_id_for_public_key::Params::execute_batch(
287+
&inner_session,
288+
self.catalyst_id_for_public_key,
289+
)
290+
.await
305291
}));
306292
}
307293

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
SELECT txn_id,
22
slot_no,
33
txn_index,
4-
prv_txn_id,
5-
removed_stake_addresses
4+
prv_txn_id
65
FROM rbac_registration
76
WHERE catalyst_id = :catalyst_id

catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_stake_address.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -120,20 +120,6 @@ impl Query {
120120
}
121121
}
122122

123-
/// Adds the given value to the cache.
124-
pub fn cache_stake_address(
125-
is_persistent: bool,
126-
stake_address: StakeAddress,
127-
catalyst_id: CatalystId,
128-
) {
129-
CassandraSession::get(is_persistent).inspect(|session| {
130-
session
131-
.caches()
132-
.rbac_stake_address()
133-
.insert(stake_address, catalyst_id);
134-
});
135-
}
136-
137123
/// Removes all cached values.
138124
pub fn invalidate_stake_addresses_cache(is_persistent: bool) {
139125
CassandraSession::get(is_persistent).inspect(|session| {

catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_registrations.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Get RBAC registrations by Catalyst ID.
22
3-
use std::{collections::HashSet, sync::Arc};
3+
use std::sync::Arc;
44

55
use scylla::{
66
DeserializeRow, SerializeRow,
@@ -14,7 +14,7 @@ use crate::db::{
1414
queries::{PreparedQueries, PreparedSelectQuery},
1515
session::CassandraSession,
1616
},
17-
types::{DbCatalystId, DbSlot, DbStakeAddress, DbTransactionId, DbTxnIndex},
17+
types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex},
1818
};
1919

2020
/// Get registrations by Catalyst ID query.
@@ -40,8 +40,6 @@ pub(crate) struct Query {
4040
/// A previous transaction id.
4141
#[allow(dead_code)]
4242
pub prv_txn_id: Option<DbTransactionId>,
43-
/// A set of removed stake addresses.
44-
pub removed_stake_addresses: HashSet<DbStakeAddress>,
4543
}
4644

4745
impl Query {

0 commit comments

Comments
 (0)