Skip to content

Commit 2b88e17

Browse files
authored
Merge pull request #2489 from input-output-hk/ctl/2478-record-signatures-origin-in-the-signature-processor
Record signatures origin in the signature processor
2 parents 090b7dc + 1d4de25 commit 2b88e17

File tree

9 files changed

+80
-40
lines changed

9 files changed

+80
-40
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-aggregator/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-aggregator"
3-
version = "0.7.52"
3+
version = "0.7.53"
44
description = "A Mithril Aggregator server"
55
authors = { workspace = true }
66
edition = { workspace = true }

mithril-aggregator/src/dependency_injection/builder/enablers/misc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ impl DependenciesBuilder {
8787
self.get_certifier_service().await?,
8888
stop_rx,
8989
self.root_logger(),
90+
self.get_metrics_service().await?,
9091
);
9192

9293
Ok(Arc::new(signature_processor))

mithril-aggregator/src/http_server/routes/signatures_routes.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ fn register_signatures(
1414
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
1515
warp::path!("register-signatures")
1616
.and(warp::post())
17-
.and(middlewares::with_origin_tag(router_state))
1817
.and(warp::body::json())
1918
.and(middlewares::with_logger(router_state))
2019
.and(middlewares::with_certifier_service(router_state))
@@ -40,9 +39,10 @@ mod handlers {
4039
unwrap_to_internal_server_error, MetricsService, SingleSignatureAuthenticator,
4140
};
4241

42+
const METRICS_HTTP_ORIGIN: &str = "HTTP";
43+
4344
/// Register Signatures
4445
pub async fn register_signatures(
45-
origin_tag: Option<String>,
4646
message: RegisterSignatureMessage,
4747
logger: Logger,
4848
certifier_service: Arc<dyn CertifierService>,
@@ -53,7 +53,7 @@ mod handlers {
5353

5454
metrics_service
5555
.get_signature_registration_total_received_since_startup()
56-
.increment(&[origin_tag.as_deref().unwrap_or_default()]);
56+
.increment(&[METRICS_HTTP_ORIGIN]);
5757

5858
let signed_entity_type = message.signed_entity_type.clone();
5959
let signed_message = message.signed_message.clone();
@@ -120,7 +120,6 @@ mod handlers {
120120
mod tests {
121121
use anyhow::anyhow;
122122
use mithril_common::entities::ClientError;
123-
use mithril_common::MITHRIL_ORIGIN_TAG_HEADER;
124123
use std::sync::Arc;
125124
use warp::http::{Method, StatusCode};
126125
use warp::test::request;
@@ -158,16 +157,14 @@ mod tests {
158157
let initial_counter_value = dependency_manager
159158
.metrics_service
160159
.get_signature_registration_total_received_since_startup()
161-
.get(&["TEST"]);
160+
.get(&["HTTP"]);
162161

163162
request()
164163
.method(method)
165164
.path(path)
166165
.json(&RegisterSignatureMessage::dummy())
167-
.header(MITHRIL_ORIGIN_TAG_HEADER, "TEST")
168-
.reply(&setup_router(RouterState::new_with_origin_tag_white_list(
166+
.reply(&setup_router(RouterState::new_with_dummy_config(
169167
dependency_manager.clone(),
170-
&["TEST"],
171168
)))
172169
.await;
173170

@@ -176,7 +173,7 @@ mod tests {
176173
dependency_manager
177174
.metrics_service
178175
.get_signature_registration_total_received_since_startup()
179-
.get(&["TEST"])
176+
.get(&["HTTP"])
180177
);
181178
}
182179

mithril-aggregator/src/metrics/service.rs

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,85 +5,88 @@ use mithril_metric::{build_metrics_service, MetricCounterWithLabels, MetricsServ
55
use mithril_metric::metric::{MetricCollector, MetricCounter};
66
use prometheus::proto::{LabelPair, MetricFamily};
77

8-
static ORIGIN_TAG_LABEL: &str = "origin_tag";
8+
// Those are three differents dimensions, they use the same value to simplify usage in Grafana
9+
static CLIENT_ORIGIN_TAG_LABEL: &str = "origin_tag";
10+
static SIGNER_REGISTRATION_ORIGIN_TAG_LABEL: &str = "origin_tag";
11+
static SIGNER_SIGNATURE_ORIGIN_TAG_LABEL: &str = "origin_tag";
912

1013
build_metrics_service!(
1114
MetricsService,
1215

1316
certificate_detail_total_served_since_startup:MetricCounterWithLabels(
1417
"certificate_detail_total_served_since_startup",
1518
"Number of certificate details served since startup on a Mithril aggregator node",
16-
&[ORIGIN_TAG_LABEL]
19+
&[CLIENT_ORIGIN_TAG_LABEL]
1720
),
1821
artifact_detail_cardano_immutable_files_full_total_served_since_startup:MetricCounterWithLabels(
1922
"mithril_aggregator_artifact_detail_cardano_db_total_served_since_startup",
2023
"Number of Cardano immutable files full artifact details served since startup on a Mithril aggregator node",
21-
&[ORIGIN_TAG_LABEL]
24+
&[CLIENT_ORIGIN_TAG_LABEL]
2225
),
2326
cardano_immutable_files_full_total_restoration_since_startup:MetricCounterWithLabels(
2427
"mithril_aggregator_cardano_db_total_restoration_since_startup",
2528
"Number of Cardano immutable files full restorations since startup on a Mithril aggregator node",
26-
&[ORIGIN_TAG_LABEL]
29+
&[CLIENT_ORIGIN_TAG_LABEL]
2730
),
2831
cardano_database_immutable_files_restored_since_startup:MetricCounterWithLabels(
2932
"mithril_aggregator_cardano_db_immutable_files_restored_since_startup",
3033
"Number of Cardano immutable files restored since startup on a Mithril aggregator node",
31-
&[ORIGIN_TAG_LABEL]
34+
&[CLIENT_ORIGIN_TAG_LABEL]
3235
),
3336
cardano_database_ancillary_files_restored_since_startup:MetricCounterWithLabels(
3437
"mithril_aggregator_cardano_db_ancillary_files_restored_since_startup",
3538
"Number of Cardano ancillary files restored since startup on a Mithril aggregator node",
36-
&[ORIGIN_TAG_LABEL]
39+
&[CLIENT_ORIGIN_TAG_LABEL]
3740
),
3841
cardano_database_complete_restoration_since_startup:MetricCounterWithLabels(
3942
"mithril_aggregator_cardano_db_complete_restoration_since_startup",
4043
"Number of complete Cardano database restoration since startup on a Mithril aggregator node",
41-
&[ORIGIN_TAG_LABEL]
44+
&[CLIENT_ORIGIN_TAG_LABEL]
4245
),
4346
cardano_database_partial_restoration_since_startup:MetricCounterWithLabels(
4447
"mithril_aggregator_cardano_db_partial_restoration_since_startup",
4548
"Number of partial Cardano database restoration since startup on a Mithril aggregator node",
46-
&[ORIGIN_TAG_LABEL]
49+
&[CLIENT_ORIGIN_TAG_LABEL]
4750
),
4851
artifact_detail_cardano_database_total_served_since_startup:MetricCounterWithLabels(
4952
"mithril_aggregator_artifact_detail_cardano_database_total_served_since_startup",
5053
"Number of Cardano database artifact details served since startup on a Mithril aggregator node",
51-
&[ORIGIN_TAG_LABEL]
54+
&[CLIENT_ORIGIN_TAG_LABEL]
5255
),
5356
artifact_detail_mithril_stake_distribution_total_served_since_startup:MetricCounterWithLabels(
5457
"mithril_aggregator_artifact_detail_mithril_stake_distribution_total_served_since_startup",
5558
"Number of Mithril stake distribution artifact details served since startup on a Mithril aggregator node",
56-
&[ORIGIN_TAG_LABEL]
59+
&[CLIENT_ORIGIN_TAG_LABEL]
5760
),
5861
artifact_detail_cardano_stake_distribution_total_served_since_startup:MetricCounterWithLabels(
5962
"mithril_aggregator_artifact_detail_cardano_stake_distribution_total_served_since_startup",
6063
"Number of Cardano stake distribution artifact details served since startup on a Mithril aggregator node",
61-
&[ORIGIN_TAG_LABEL]
64+
&[CLIENT_ORIGIN_TAG_LABEL]
6265
),
6366
artifact_detail_cardano_transaction_total_served_since_startup:MetricCounterWithLabels(
6467
"mithril_aggregator_artifact_detail_cardano_transaction_total_served_since_startup",
6568
"Number of Cardano transaction artifact details served since startup on a Mithril aggregator node",
66-
&[ORIGIN_TAG_LABEL]
69+
&[CLIENT_ORIGIN_TAG_LABEL]
6770
),
6871
proof_cardano_transaction_total_proofs_served_since_startup:MetricCounterWithLabels(
6972
"mithril_aggregator_proof_cardano_transaction_total_proofs_served_since_startup",
7073
"Number of Cardano transaction proofs served since startup on a Mithril aggregator node",
71-
&[ORIGIN_TAG_LABEL]
74+
&[CLIENT_ORIGIN_TAG_LABEL]
7275
),
7376
proof_cardano_transaction_total_transactions_served_since_startup:MetricCounterWithLabels(
7477
"mithril_aggregator_proof_cardano_transaction_total_transactions_served_since_startup",
7578
"Number of Cardano transaction hashes requested for proof since startup on a Mithril aggregator node",
76-
&[ORIGIN_TAG_LABEL]
79+
&[CLIENT_ORIGIN_TAG_LABEL]
7780
),
7881
signer_registration_total_received_since_startup:MetricCounterWithLabels(
7982
"mithril_aggregator_signer_registration_total_received_since_startup",
8083
"Number of signer registrations received since startup on a Mithril aggregator node",
81-
&[ORIGIN_TAG_LABEL]
84+
&[SIGNER_REGISTRATION_ORIGIN_TAG_LABEL]
8285
),
8386
signature_registration_total_received_since_startup:MetricCounterWithLabels(
8487
"mithril_aggregator_signature_registration_total_received_since_startup",
8588
"Number of signature registrations received since startup on a Mithril aggregator node",
86-
&[ORIGIN_TAG_LABEL]
89+
&[SIGNER_SIGNATURE_ORIGIN_TAG_LABEL]
8790
),
8891
certificate_total_produced_since_startup:MetricCounter(
8992
"mithril_aggregator_certificate_total_produced_since_startup",

mithril-aggregator/src/services/signature_consumer/fake.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ impl SignatureConsumer for FakeSignatureConsumer {
3737
}
3838
}
3939
}
40+
fn get_origin_tag(&self) -> String {
41+
"FAKE".to_string()
42+
}
4043
}
4144

4245
#[cfg(test)]

mithril-aggregator/src/services/signature_consumer/interface.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,7 @@ use mithril_common::{
99
pub trait SignatureConsumer: Sync + Send {
1010
/// Returns signatures when available
1111
async fn get_signatures(&self) -> StdResult<Vec<(SingleSignature, SignedEntityType)>>;
12+
13+
/// Returns the origin tag of the consumer (e.g. HTTP or DMQ)
14+
fn get_origin_tag(&self) -> String;
1215
}

mithril-aggregator/src/services/signature_consumer/noop.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ impl SignatureConsumer for SignatureConsumerNoop {
1919
> {
2020
future::pending().await
2121
}
22+
23+
fn get_origin_tag(&self) -> String {
24+
"NOOP".to_string()
25+
}
2226
}
2327

2428
#[cfg(test)]

mithril-aggregator/src/services/signature_processor.rs

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use slog::{error, warn, Logger};
55
use mithril_common::{logging::LoggerExtensions, StdResult};
66
use tokio::{select, sync::watch::Receiver};
77

8+
use crate::MetricsService;
9+
810
use super::{CertifierService, SignatureConsumer};
911

1012
/// A signature processor which receives signature and processes them.
@@ -24,6 +26,7 @@ pub struct SequentialSignatureProcessor {
2426
certifier: Arc<dyn CertifierService>,
2527
stop_rx: Receiver<()>,
2628
logger: Logger,
29+
metrics_service: Arc<MetricsService>,
2730
}
2831

2932
impl SequentialSignatureProcessor {
@@ -33,12 +36,14 @@ impl SequentialSignatureProcessor {
3336
certifier: Arc<dyn CertifierService>,
3437
stop_rx: Receiver<()>,
3538
logger: Logger,
39+
metrics_service: Arc<MetricsService>,
3640
) -> Self {
3741
Self {
3842
consumer,
3943
certifier,
4044
stop_rx,
4145
logger: logger.new_with_component_name::<Self>(),
46+
metrics_service,
4247
}
4348
}
4449
}
@@ -55,6 +60,11 @@ impl SignatureProcessor for SequentialSignatureProcessor {
5560
.await
5661
{
5762
error!(self.logger, "Error dispatching single signature"; "error" => ?e);
63+
} else {
64+
let origin_network = self.consumer.get_origin_tag();
65+
self.metrics_service
66+
.get_signature_registration_total_received_since_startup()
67+
.increment(&[&origin_network]);
5868
}
5969
}
6070
}
@@ -107,24 +117,29 @@ mod tests {
107117
#[tokio::test]
108118
async fn processor_process_signatures_succeeds() {
109119
let logger = TestLogger::stdout();
120+
let single_signatures = vec![
121+
(
122+
fake_data::single_signature(vec![1, 2, 3]),
123+
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
124+
),
125+
(
126+
fake_data::single_signature(vec![4, 5, 6]),
127+
SignedEntityType::MithrilStakeDistribution(Epoch(2)),
128+
),
129+
];
130+
let single_signatures_length = single_signatures.len();
131+
let network_origin = "test_network";
110132
let mock_consumer = {
111133
let mut mock_consumer = MockSignatureConsumer::new();
112134
mock_consumer
113135
.expect_get_signatures()
114-
.returning(|| {
115-
Ok(vec![
116-
(
117-
fake_data::single_signature(vec![1, 2, 3]),
118-
SignedEntityType::MithrilStakeDistribution(Epoch(1)),
119-
),
120-
(
121-
fake_data::single_signature(vec![4, 5, 6]),
122-
SignedEntityType::MithrilStakeDistribution(Epoch(2)),
123-
),
124-
])
125-
})
136+
.returning(move || Ok(single_signatures.clone()))
126137
.times(1);
127138
mock_consumer
139+
.expect_get_origin_tag()
140+
.returning(|| network_origin.to_string())
141+
.times(single_signatures_length);
142+
mock_consumer
128143
};
129144
let mock_certifier = {
130145
let mut mock_certifier = MockCertifierService::new();
@@ -144,21 +159,33 @@ mod tests {
144159
)
145160
.returning(|_, _| Ok(SignatureRegistrationStatus::Registered))
146161
.times(1);
147-
148162
mock_certifier
149163
};
150164
let (_stop_tx, stop_rx) = channel(());
165+
let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
166+
let initial_counter_value = metrics_service
167+
.get_signature_registration_total_received_since_startup()
168+
.get(&[network_origin]);
169+
let metrics_service = Arc::new(metrics_service);
151170
let processor = SequentialSignatureProcessor::new(
152171
Arc::new(mock_consumer),
153172
Arc::new(mock_certifier),
154173
stop_rx,
155174
logger,
175+
metrics_service.clone(),
156176
);
157177

158178
processor
159179
.process_signatures()
160180
.await
161181
.expect("Failed to process signatures");
182+
183+
assert_eq!(
184+
initial_counter_value + single_signatures_length as u32,
185+
metrics_service
186+
.get_signature_registration_total_received_since_startup()
187+
.get(&[network_origin])
188+
)
162189
}
163190

164191
#[tokio::test]
@@ -185,11 +212,13 @@ mod tests {
185212
mock_certifier
186213
};
187214
let (stop_tx, stop_rx) = channel(());
215+
let metrics_service = MetricsService::new(TestLogger::stdout()).unwrap();
188216
let processor = SequentialSignatureProcessor::new(
189217
Arc::new(fake_consumer),
190218
Arc::new(mock_certifier),
191219
stop_rx,
192220
logger,
221+
Arc::new(metrics_service),
193222
);
194223

195224
tokio::select!(

0 commit comments

Comments
 (0)