Skip to content

Commit f1b7358

Browse files
authored
Merge pull request #2457 from subspace/farmer-events-and-metrics
Farmer events and metrics
2 parents 7f95420 + 65867e6 commit f1b7358

File tree

7 files changed

+476
-129
lines changed

7 files changed

+476
-129
lines changed

crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs

Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
mod dsn;
2+
mod metrics;
23

34
use crate::commands::farm::dsn::configure_dsn;
4-
use crate::utils::{shutdown_signal, FarmerMetrics};
5+
use crate::commands::farm::metrics::FarmerMetrics;
6+
use crate::utils::shutdown_signal;
57
use anyhow::anyhow;
68
use bytesize::ByteSize;
79
use clap::{Parser, ValueHint};
@@ -22,6 +24,7 @@ use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
2224
use subspace_core_primitives::{PublicKey, Record, SectorIndex};
2325
use subspace_erasure_coding::ErasureCoding;
2426
use subspace_farmer::piece_cache::PieceCache;
27+
use subspace_farmer::single_disk_farm::farming::FarmingNotification;
2528
use subspace_farmer::single_disk_farm::{
2629
SectorPlottingDetails, SectorUpdate, SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions,
2730
};
@@ -694,27 +697,72 @@ where
694697
}
695698
};
696699

697-
// Register audit plot events
698-
let farmer_metrics = farmer_metrics.clone();
699-
let on_plot_audited_callback = move |audit_event: &_| {
700-
farmer_metrics.observe_audit_event(audit_event);
701-
};
702-
703700
single_disk_farm
704-
.on_sector_update(Arc::new(move |(_sector_index, sector_state)| {
705-
if let SectorUpdate::Plotting(SectorPlottingDetails::Finished {
706-
plotted_sector,
707-
old_plotted_sector,
708-
..
709-
}) = sector_state
710-
{
711-
on_plotted_sector_callback(plotted_sector, old_plotted_sector);
701+
.on_sector_update(Arc::new({
702+
let single_disk_farm_id = *single_disk_farm.id();
703+
let farmer_metrics = farmer_metrics.clone();
704+
705+
move |(_sector_index, sector_state)| match sector_state {
706+
SectorUpdate::Plotting(SectorPlottingDetails::Starting { .. }) => {
707+
farmer_metrics.sector_plotting.inc();
708+
}
709+
SectorUpdate::Plotting(SectorPlottingDetails::Downloading) => {
710+
farmer_metrics.sector_downloading.inc();
711+
}
712+
SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)) => {
713+
farmer_metrics
714+
.observe_sector_downloading_time(&single_disk_farm_id, time);
715+
farmer_metrics.sector_downloaded.inc();
716+
}
717+
SectorUpdate::Plotting(SectorPlottingDetails::Encoding) => {
718+
farmer_metrics.sector_encoding.inc();
719+
}
720+
SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)) => {
721+
farmer_metrics.observe_sector_encoding_time(&single_disk_farm_id, time);
722+
farmer_metrics.sector_encoded.inc();
723+
}
724+
SectorUpdate::Plotting(SectorPlottingDetails::Writing) => {
725+
farmer_metrics.sector_writing.inc();
726+
}
727+
SectorUpdate::Plotting(SectorPlottingDetails::Written(time)) => {
728+
farmer_metrics.observe_sector_writing_time(&single_disk_farm_id, time);
729+
farmer_metrics.sector_written.inc();
730+
}
731+
SectorUpdate::Plotting(SectorPlottingDetails::Finished {
732+
plotted_sector,
733+
old_plotted_sector,
734+
time,
735+
}) => {
736+
on_plotted_sector_callback(plotted_sector, old_plotted_sector);
737+
farmer_metrics.observe_sector_plotting_time(&single_disk_farm_id, time);
738+
farmer_metrics.sector_plotted.inc();
739+
}
740+
_ => {}
712741
}
713742
}))
714743
.detach();
715744

716745
single_disk_farm
717-
.on_plot_audited(Arc::new(on_plot_audited_callback))
746+
.on_farming_notification(Arc::new({
747+
let single_disk_farm_id = *single_disk_farm.id();
748+
let farmer_metrics = farmer_metrics.clone();
749+
750+
move |farming_notification| match farming_notification {
751+
FarmingNotification::Auditing(auditing_details) => {
752+
farmer_metrics.observe_auditing_time(
753+
&single_disk_farm_id,
754+
&auditing_details.time,
755+
);
756+
}
757+
FarmingNotification::Proving(proving_details) => {
758+
farmer_metrics.observe_proving_time(
759+
&single_disk_farm_id,
760+
&proving_details.time,
761+
proving_details.result,
762+
);
763+
}
764+
}
765+
}))
718766
.detach();
719767

720768
single_disk_farm.run()
crates/subspace-farmer/src/bin/subspace-farmer/commands/farm/metrics.rs

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
use prometheus_client::metrics::counter::Counter;
2+
use prometheus_client::metrics::family::Family;
3+
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
4+
use prometheus_client::registry::{Registry, Unit};
5+
use std::sync::atomic::AtomicU64;
6+
use std::time::Duration;
7+
use subspace_farmer::single_disk_farm::farming::ProvingResult;
8+
use subspace_farmer::single_disk_farm::SingleDiskFarmId;
9+
10+
#[derive(Debug, Clone)]
11+
pub(super) struct FarmerMetrics {
12+
auditing_time: Family<Vec<(String, String)>, Histogram>,
13+
proving_time: Family<Vec<(String, String)>, Histogram>,
14+
sector_downloading_time: Family<Vec<(String, String)>, Histogram>,
15+
sector_encoding_time: Family<Vec<(String, String)>, Histogram>,
16+
sector_writing_time: Family<Vec<(String, String)>, Histogram>,
17+
sector_plotting_time: Family<Vec<(String, String)>, Histogram>,
18+
pub(super) sector_downloading: Counter<u64, AtomicU64>,
19+
pub(super) sector_downloaded: Counter<u64, AtomicU64>,
20+
pub(super) sector_encoding: Counter<u64, AtomicU64>,
21+
pub(super) sector_encoded: Counter<u64, AtomicU64>,
22+
pub(super) sector_writing: Counter<u64, AtomicU64>,
23+
pub(super) sector_written: Counter<u64, AtomicU64>,
24+
pub(super) sector_plotting: Counter<u64, AtomicU64>,
25+
pub(super) sector_plotted: Counter<u64, AtomicU64>,
26+
}
27+
28+
impl FarmerMetrics {
29+
pub(super) fn new(registry: &mut Registry) -> Self {
30+
let sub_registry = registry.sub_registry_with_prefix("subspace_farmer");
31+
32+
let auditing_time = Family::<_, _>::new_with_constructor(|| {
33+
Histogram::new(exponential_buckets(0.0001, 2.0, 15))
34+
});
35+
36+
sub_registry.register_with_unit(
37+
"auditing_time",
38+
"Auditing time",
39+
Unit::Seconds,
40+
auditing_time.clone(),
41+
);
42+
43+
let proving_time = Family::<_, _>::new_with_constructor(|| {
44+
Histogram::new(exponential_buckets(0.0001, 2.0, 15))
45+
});
46+
47+
sub_registry.register_with_unit(
48+
"proving_time",
49+
"Proving time",
50+
Unit::Seconds,
51+
proving_time.clone(),
52+
);
53+
54+
let sector_downloading_time = Family::<_, _>::new_with_constructor(|| {
55+
Histogram::new(exponential_buckets(0.0001, 2.0, 15))
56+
});
57+
58+
sub_registry.register_with_unit(
59+
"sector_downloading_time",
60+
"Sector downloading time",
61+
Unit::Seconds,
62+
sector_downloading_time.clone(),
63+
);
64+
65+
let sector_encoding_time = Family::<_, _>::new_with_constructor(|| {
66+
Histogram::new(exponential_buckets(0.0001, 2.0, 15))
67+
});
68+
69+
sub_registry.register_with_unit(
70+
"sector_encoding_time",
71+
"Sector encoding time",
72+
Unit::Seconds,
73+
sector_encoding_time.clone(),
74+
);
75+
76+
let sector_writing_time = Family::<_, _>::new_with_constructor(|| {
77+
Histogram::new(exponential_buckets(0.0001, 2.0, 15))
78+
});
79+
80+
sub_registry.register_with_unit(
81+
"sector_writing_time",
82+
"Sector writing time",
83+
Unit::Seconds,
84+
sector_writing_time.clone(),
85+
);
86+
87+
let sector_plotting_time = Family::<_, _>::new_with_constructor(|| {
88+
Histogram::new(exponential_buckets(0.0001, 2.0, 15))
89+
});
90+
91+
sub_registry.register_with_unit(
92+
"sector_plotting_time",
93+
"Sector plotting time",
94+
Unit::Seconds,
95+
sector_plotting_time.clone(),
96+
);
97+
98+
let sector_downloading = Counter::<_, _>::default();
99+
100+
sub_registry.register_with_unit(
101+
"sector_downloading_counter",
102+
"Number of sectors being downloaded",
103+
Unit::Other("sectors".to_string()),
104+
sector_downloading.clone(),
105+
);
106+
107+
let sector_downloaded = Counter::<_, _>::default();
108+
109+
sub_registry.register_with_unit(
110+
"sector_downloaded_counter",
111+
"Number of sectors being downloaded",
112+
Unit::Other("sectors".to_string()),
113+
sector_downloaded.clone(),
114+
);
115+
116+
let sector_encoding = Counter::<_, _>::default();
117+
118+
sub_registry.register_with_unit(
119+
"sector_encoding_counter",
120+
"Number of sectors being downloaded",
121+
Unit::Other("sectors".to_string()),
122+
sector_encoding.clone(),
123+
);
124+
125+
let sector_encoded = Counter::<_, _>::default();
126+
127+
sub_registry.register_with_unit(
128+
"sector_encoded_counter",
129+
"Number of sectors being downloaded",
130+
Unit::Other("sectors".to_string()),
131+
sector_encoded.clone(),
132+
);
133+
134+
let sector_writing = Counter::<_, _>::default();
135+
136+
sub_registry.register_with_unit(
137+
"sector_writing_counter",
138+
"Number of sectors being downloaded",
139+
Unit::Other("sectors".to_string()),
140+
sector_writing.clone(),
141+
);
142+
143+
let sector_written = Counter::<_, _>::default();
144+
145+
sub_registry.register_with_unit(
146+
"sector_written_counter",
147+
"Number of sectors being downloaded",
148+
Unit::Other("sectors".to_string()),
149+
sector_written.clone(),
150+
);
151+
152+
let sector_plotting = Counter::<_, _>::default();
153+
154+
sub_registry.register_with_unit(
155+
"sector_plotting_counter",
156+
"Number of sectors being downloaded",
157+
Unit::Other("sectors".to_string()),
158+
sector_plotting.clone(),
159+
);
160+
161+
let sector_plotted = Counter::<_, _>::default();
162+
163+
sub_registry.register_with_unit(
164+
"sector_plotted_counter",
165+
"Number of sectors being downloaded",
166+
Unit::Other("sectors".to_string()),
167+
sector_plotted.clone(),
168+
);
169+
170+
Self {
171+
auditing_time,
172+
proving_time,
173+
sector_downloading_time,
174+
sector_encoding_time,
175+
sector_writing_time,
176+
sector_plotting_time,
177+
sector_downloading,
178+
sector_downloaded,
179+
sector_encoding,
180+
sector_encoded,
181+
sector_writing,
182+
sector_written,
183+
sector_plotting,
184+
sector_plotted,
185+
}
186+
}
187+
188+
pub(super) fn observe_auditing_time(
189+
&self,
190+
single_disk_farm_id: &SingleDiskFarmId,
191+
time: &Duration,
192+
) {
193+
self.auditing_time
194+
.get_or_create(&vec![(
195+
"farm_id".to_string(),
196+
single_disk_farm_id.to_string(),
197+
)])
198+
.observe(time.as_secs_f64());
199+
}
200+
201+
pub(super) fn observe_proving_time(
202+
&self,
203+
single_disk_farm_id: &SingleDiskFarmId,
204+
time: &Duration,
205+
result: ProvingResult,
206+
) {
207+
self.proving_time
208+
.get_or_create(&vec![
209+
("farm_id".to_string(), single_disk_farm_id.to_string()),
210+
("result".to_string(), result.to_string()),
211+
])
212+
.observe(time.as_secs_f64());
213+
}
214+
215+
pub(super) fn observe_sector_downloading_time(
216+
&self,
217+
single_disk_farm_id: &SingleDiskFarmId,
218+
time: &Duration,
219+
) {
220+
self.sector_downloading_time
221+
.get_or_create(&vec![(
222+
"farm_id".to_string(),
223+
single_disk_farm_id.to_string(),
224+
)])
225+
.observe(time.as_secs_f64());
226+
}
227+
228+
pub(super) fn observe_sector_encoding_time(
229+
&self,
230+
single_disk_farm_id: &SingleDiskFarmId,
231+
time: &Duration,
232+
) {
233+
self.sector_encoding_time
234+
.get_or_create(&vec![(
235+
"farm_id".to_string(),
236+
single_disk_farm_id.to_string(),
237+
)])
238+
.observe(time.as_secs_f64());
239+
}
240+
241+
pub(super) fn observe_sector_writing_time(
242+
&self,
243+
single_disk_farm_id: &SingleDiskFarmId,
244+
time: &Duration,
245+
) {
246+
self.sector_writing_time
247+
.get_or_create(&vec![(
248+
"farm_id".to_string(),
249+
single_disk_farm_id.to_string(),
250+
)])
251+
.observe(time.as_secs_f64());
252+
}
253+
254+
pub(super) fn observe_sector_plotting_time(
255+
&self,
256+
single_disk_farm_id: &SingleDiskFarmId,
257+
time: &Duration,
258+
) {
259+
self.sector_plotting_time
260+
.get_or_create(&vec![(
261+
"farm_id".to_string(),
262+
single_disk_farm_id.to_string(),
263+
)])
264+
.observe(time.as_secs_f64());
265+
}
266+
}

0 commit comments

Comments
 (0)