-
Notifications
You must be signed in to change notification settings - Fork 41
Expand file tree
/
Copy pathmod.rs
More file actions
155 lines (110 loc) · 4.87 KB
/
mod.rs
File metadata and controls
155 lines (110 loc) · 4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use gasket::runtime::spawn_stage;
use serde::Deserialize;
use crate::{bootstrap, crosscut, model, storage};
/// Alias for the gasket input port parameterized over `model::ChainSyncCommandEx`.
type InputPort = gasket::messaging::InputPort<model::ChainSyncCommandEx>;
/// Alias for the gasket output port parameterized over `model::CRDTCommand`.
type OutputPort = gasket::messaging::OutputPort<model::CRDTCommand>;
// Submodules of this module. Every public one is referenced by a variant of
// both the `Config` and `Reducer` enums below; the private `worker` module
// provides the `Worker` type used by `Bootstrapper::spawn_stages`.
pub mod point_by_tx;
pub mod pool_by_stake;
pub mod utxo_by_address;
mod worker;
pub mod address_by_txo;
pub mod plutus_script_by_hash;
pub mod total_transactions_count;
pub mod total_transactions_count_by_contract_addresses;
pub mod transactions_count_by_contract_address;
pub mod transactions_count_by_contract_address_by_epoch;
pub mod transactions_count_by_epoch;
/// Configuration for a single reducer, deserialized with serde.
///
/// The enum is internally tagged (`#[serde(tag = "type")]`): the `type` field
/// of the input selects the variant, and the variant wraps the `Config` type
/// of the corresponding submodule.
#[derive(Deserialize)]
#[serde(tag = "type")]
pub enum Config {
UtxoByAddress(utxo_by_address::Config),
PointByTx(point_by_tx::Config),
PoolByStake(pool_by_stake::Config),
AddressByTxo(address_by_txo::Config),
PlutusScriptByHash(plutus_script_by_hash::Config),
TotalTransactionsCount(total_transactions_count::Config),
TransactionsCountByEpoch(transactions_count_by_epoch::Config),
TransactionsCountByContractAddress(transactions_count_by_contract_address::Config),
TransactionsCountByContractAddressByEpoch(
transactions_count_by_contract_address_by_epoch::Config,
),
TotalTransactionsCountByContractAddresses(
total_transactions_count_by_contract_addresses::Config,
),
}
impl Config {
    /// Consumes this configuration and returns the [`Reducer`] produced by the
    /// wrapped configuration's own `plugin` method.
    ///
    /// `chain` is forwarded only to the variants whose `plugin` accepts it;
    /// the remaining variants are built from their configuration alone.
    fn plugin(self, chain: &crosscut::ChainWellKnownInfo) -> Reducer {
        match self {
            // Variants whose `plugin` takes the chain info.
            Self::UtxoByAddress(cfg) => cfg.plugin(chain),
            Self::AddressByTxo(cfg) => cfg.plugin(chain),
            Self::PlutusScriptByHash(cfg) => cfg.plugin(chain),
            Self::TransactionsCountByEpoch(cfg) => cfg.plugin(chain),
            Self::TransactionsCountByContractAddress(cfg) => cfg.plugin(chain),
            Self::TransactionsCountByContractAddressByEpoch(cfg) => cfg.plugin(chain),
            // Variants whose `plugin` takes no arguments.
            Self::PointByTx(cfg) => cfg.plugin(),
            Self::PoolByStake(cfg) => cfg.plugin(),
            Self::TotalTransactionsCount(cfg) => cfg.plugin(),
            Self::TotalTransactionsCountByContractAddresses(cfg) => cfg.plugin(),
        }
    }
}
/// Holds the input port, the output port and the list of reducers that
/// `spawn_stages` hands over to the worker.
pub struct Bootstrapper {
// Exposed mutably through `borrow_input_port`.
input: InputPort,
// Exposed mutably through `borrow_output_port`.
output: OutputPort,
// One entry per `Config` passed to `new`, in the same order.
reducers: Vec<Reducer>,
}
impl Bootstrapper {
    /// Creates a bootstrapper with one reducer per entry in `configs`.
    ///
    /// Each configuration is turned into a [`Reducer`] via `Config::plugin`,
    /// preserving the order of `configs`. Both ports start out as their
    /// `Default` values.
    pub fn new(configs: Vec<Config>, chain: &crosscut::ChainWellKnownInfo) -> Self {
        Self {
            // `chain` is already a reference; pass it through as-is instead of
            // re-borrowing it.
            reducers: configs.into_iter().map(|cfg| cfg.plugin(chain)).collect(),
            input: Default::default(),
            output: Default::default(),
        }
    }

    /// Returns a mutable reference to the input port.
    pub fn borrow_input_port(&mut self) -> &mut InputPort {
        &mut self.input
    }

    /// Returns a mutable reference to the output port.
    pub fn borrow_output_port(&mut self) -> &mut OutputPort {
        &mut self.output
    }

    /// Consumes the bootstrapper and registers its worker on `pipeline`.
    ///
    /// A `worker::Worker` is built from the reducers, `state` and both ports,
    /// spawned with `spawn_stage` using a default-constructed second argument,
    /// and registered under the stage name `"reducers"`.
    pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline, state: storage::ReadPlugin) {
        let worker = worker::Worker::new(self.reducers, state, self.input, self.output);
        pipeline.register_stage("reducers", spawn_stage(worker, Default::default()));
    }
}
/// A configured reducer instance.
///
/// Each variant wraps the `Reducer` type of the corresponding submodule;
/// `reduce_block` dispatches to the wrapped value.
pub enum Reducer {
UtxoByAddress(utxo_by_address::Reducer),
PointByTx(point_by_tx::Reducer),
PoolByStake(pool_by_stake::Reducer),
AddressByTxo(address_by_txo::Reducer),
PlutusScriptByHash(plutus_script_by_hash::Reducer),
TotalTransactionsCount(total_transactions_count::Reducer),
TransactionsCountByEpoch(transactions_count_by_epoch::Reducer),
TransactionsCountByContractAddress(transactions_count_by_contract_address::Reducer),
TransactionsCountByContractAddressByEpoch(
transactions_count_by_contract_address_by_epoch::Reducer,
),
TotalTransactionsCountByContractAddresses(
total_transactions_count_by_contract_addresses::Reducer,
),
}
impl Reducer {
    /// Forwards `block` and `output` to the wrapped reducer's `reduce_block`
    /// and returns its result unchanged.
    ///
    /// # Errors
    ///
    /// Returns whatever `gasket::error::Error` the wrapped reducer reports.
    pub fn reduce_block(
        &mut self,
        block: &model::MultiEraBlock,
        output: &mut OutputPort,
    ) -> Result<(), gasket::error::Error> {
        match self {
            Self::UtxoByAddress(r) => r.reduce_block(block, output),
            Self::PointByTx(r) => r.reduce_block(block, output),
            Self::PoolByStake(r) => r.reduce_block(block, output),
            Self::AddressByTxo(r) => r.reduce_block(block, output),
            Self::PlutusScriptByHash(r) => r.reduce_block(block, output),
            Self::TotalTransactionsCount(r) => r.reduce_block(block, output),
            Self::TransactionsCountByEpoch(r) => r.reduce_block(block, output),
            Self::TransactionsCountByContractAddress(r) => r.reduce_block(block, output),
            Self::TransactionsCountByContractAddressByEpoch(r) => r.reduce_block(block, output),
            Self::TotalTransactionsCountByContractAddresses(r) => r.reduce_block(block, output),
        }
    }
}