Skip to content

Commit a0da90b

Browse files
authored
segment metrics (#2713)
1 parent 4dea140 commit a0da90b

File tree

6 files changed

+103
-20
lines changed

6 files changed

+103
-20
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-file/src/file.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,5 +49,6 @@ impl<F: FileType> VortexFile<F> {
4949
self.footer.layout().clone(),
5050
self.footer().ctx().clone(),
5151
)
52+
.with_metrics(self.metrics.clone())
5253
}
5354
}

vortex-file/src/generic.rs

Lines changed: 52 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -231,10 +231,18 @@ impl<R: VortexReadAt> ScanDriver for GenericScanDriver<R> {
231231
prefetch: prefetch_stream,
232232
requests_ready_chunks: 1024,
233233
prefetch_ready_chunks: self.options.io_concurrency,
234+
requested_segments: self.metrics.requested_segments.clone(),
235+
prefetched_segments: self.metrics.prefetched_segments.clone(),
234236
}
235-
.map(move |r| coalesce(r, perf_hint.coalescing_window(), perf_hint.max_read()))
236-
.flat_map(stream::iter)
237-
.inspect(move |(coalesced, _)| self.metrics.record(coalesced));
237+
.map(move |r| {
238+
coalesce(
239+
r,
240+
perf_hint.coalescing_window(),
241+
perf_hint.max_read(),
242+
self.metrics.clone(),
243+
)
244+
})
245+
.flat_map(stream::iter);
238246

239247
// Submit the coalesced requests to the I/O.
240248
let read = self.read.clone();
@@ -276,6 +284,8 @@ pin_project! {
276284
pub prefetch: Prefetch,
277285
requests_ready_chunks: usize,
278286
prefetch_ready_chunks: usize,
287+
requested_segments: Arc<Counter>,
288+
prefetched_segments: Arc<Counter>,
279289
}
280290
}
281291

@@ -293,6 +303,7 @@ where
293303
match this.requests.as_mut().poll_next(cx) {
294304
Poll::Ready(Some(item)) => {
295305
items.push(item);
306+
this.requested_segments.inc();
296307
if items.len() >= *this.requests_ready_chunks {
297308
return Poll::Ready(Some(items));
298309
}
@@ -309,6 +320,7 @@ where
309320
match this.prefetch.as_mut().poll_next(cx) {
310321
Poll::Ready(Some(item)) => {
311322
items.push(item);
323+
this.prefetched_segments.inc();
312324
if items.len() >= prefetch_limit {
313325
return Poll::Ready(Some(items));
314326
}
@@ -349,17 +361,6 @@ impl SegmentRequest {
349361
}
350362
}
351363

352-
impl From<(SegmentId, Segment, oneshot::Receiver<()>)> for SegmentRequest {
353-
fn from(value: (SegmentId, Segment, oneshot::Receiver<()>)) -> Self {
354-
let (id, location, cancel_handle) = value;
355-
SegmentRequest {
356-
id,
357-
location,
358-
cancel_handle,
359-
}
360-
}
361-
}
362-
363364
async fn filter_with_cache(
364365
request: SegmentRequest,
365366
cache: Arc<dyn SegmentCache>,
@@ -392,19 +393,37 @@ struct CoalescedSegmentRequest {
392393
}
393394

394395
#[derive(Default)]
395-
struct CoalescedCancellationHandle(Vec<oneshot::Receiver<()>>);
396+
struct CoalescedCancellationHandle {
397+
handles: Vec<oneshot::Receiver<()>>,
398+
cancel_received: Arc<Counter>,
399+
cancelled: Arc<Counter>,
400+
}
396401

397402
impl CoalescedCancellationHandle {
403+
fn new(
404+
handles: Vec<oneshot::Receiver<()>>,
405+
cancel_received: Arc<Counter>,
406+
cancelled: Arc<Counter>,
407+
) -> Self {
408+
Self {
409+
handles,
410+
cancel_received,
411+
cancelled,
412+
}
413+
}
414+
398415
fn push(&mut self, handle: oneshot::Receiver<()>) {
399-
self.0.push(handle);
416+
self.handles.push(handle);
400417
}
401418

402419
async fn cancelled(self) {
403-
for rx in self.0 {
420+
for rx in self.handles {
404421
// if this segment is completed before cancellation,
405422
// tx of this will be dropped, so ignore errors.
406423
let _ = rx.await;
424+
self.cancel_received.inc();
407425
}
426+
self.cancelled.inc();
408427
}
409428
}
410429

@@ -480,6 +499,7 @@ fn coalesce(
480499
requests: Vec<SegmentRequest>,
481500
coalescing_window: u64,
482501
max_read: Option<u64>,
502+
metrics: CoalescingMetrics,
483503
) -> Vec<(CoalescedSegmentRequest, CoalescedCancellationHandle)> {
484504
let fetch_ranges = merge_ranges(
485505
requests.iter().map(|r| r.range()),
@@ -501,7 +521,11 @@ fn coalesce(
501521
byte_range: range.clone(),
502522
requests: vec![],
503523
},
504-
CoalescedCancellationHandle::default(),
524+
CoalescedCancellationHandle::new(
525+
Vec::new(),
526+
metrics.cancel_received.clone(),
527+
metrics.cancelled.clone(),
528+
),
505529
)
506530
})
507531
.collect::<Vec<_>>();
@@ -516,6 +540,7 @@ fn coalesce(
516540
// Ensure we sort the requests by segment ID within the coalesced request.
517541
for (req, _) in coalesced.iter_mut() {
518542
req.requests.sort_unstable_by_key(|(id, _)| *id);
543+
metrics.record(req);
519544
}
520545
coalesced
521546
}
@@ -563,11 +588,16 @@ where
563588
ret
564589
}
565590

591+
#[derive(Clone)]
566592
struct CoalescingMetrics {
567593
bytes_uncoalesced: Arc<Counter>,
568594
bytes_coalesced: Arc<Counter>,
569595
request_count_uncoalesced: Arc<Counter>,
570596
request_count_coalesced: Arc<Counter>,
597+
prefetched_segments: Arc<Counter>,
598+
requested_segments: Arc<Counter>,
599+
cancel_received: Arc<Counter>,
600+
cancelled: Arc<Counter>,
571601
}
572602

573603
impl From<VortexMetrics> for CoalescingMetrics {
@@ -579,6 +609,10 @@ impl From<VortexMetrics> for CoalescingMetrics {
579609
bytes_coalesced: metrics.counter(format!("{BYTES}.coalesced")),
580610
request_count_uncoalesced: metrics.counter(format!("{COUNT}.uncoalesced")),
581611
request_count_coalesced: metrics.counter(format!("{COUNT}.coalesced")),
612+
prefetched_segments: metrics.counter("vortex.scan.segments.prefetch_count"),
613+
requested_segments: metrics.counter("vortex.scan.segments.request_count"),
614+
cancel_received: metrics.counter("vortex.scan.segments.cancel_received"),
615+
cancelled: metrics.counter("vortex.scan.segments.cancelled"),
582616
}
583617
}
584618
}

vortex-layout/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ vortex-error = { workspace = true }
4040
vortex-expr = { workspace = true }
4141
vortex-flatbuffers = { workspace = true, features = ["layout"] }
4242
vortex-mask = { workspace = true }
43+
vortex-metrics = { workspace = true }
4344

4445
[dev-dependencies]
4546
futures = { workspace = true, features = ["executor"] }

vortex-layout/src/scan/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@ use vortex_expr::transform::immediate_access::immediate_scope_access;
1414
use vortex_expr::transform::simplify_typed::simplify_typed;
1515
use vortex_expr::{ExprRef, Identity};
1616
use vortex_mask::Mask;
17+
use vortex_metrics::VortexMetrics;
1718

1819
use crate::scan::filter::FilterExpr;
1920
use crate::scan::unified::UnifiedDriverStream;
@@ -56,6 +57,7 @@ pub struct ScanBuilder<D: ScanDriver> {
5657
// The number of splits to make progress on concurrently.
5758
concurrency: usize,
5859
prefetch_conjuncts: bool,
60+
metrics: VortexMetrics,
5961
}
6062

6163
impl<D: ScanDriver> ScanBuilder<D> {
@@ -72,6 +74,7 @@ impl<D: ScanDriver> ScanBuilder<D> {
7274
canonicalize: false,
7375
prefetch_conjuncts: false,
7476
concurrency: 1024,
77+
metrics: Default::default(),
7578
}
7679
}
7780

@@ -129,6 +132,11 @@ impl<D: ScanDriver> ScanBuilder<D> {
129132
self
130133
}
131134

135+
pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
136+
self.metrics = metrics;
137+
self
138+
}
139+
132140
pub fn build(self) -> VortexResult<Scan<D>> {
133141
let projection = simplify_typed(self.projection.clone(), self.layout.dtype())?;
134142
let filter = self
@@ -146,7 +154,7 @@ impl<D: ScanDriver> ScanBuilder<D> {
146154
.collect();
147155

148156
let splits = self.split_by.splits(&self.layout, &field_mask)?;
149-
let mut collector = SegmentCollector::default();
157+
let mut collector = SegmentCollector::new(self.metrics.clone());
150158
self.layout
151159
.required_segments(0, &filter_mask, &projection_mask, &mut collector)?;
152160
let (mut row_range_pruner, segments) = collector.finish();

vortex-layout/src/segments.rs

Lines changed: 39 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ use futures::{SinkExt, Stream, StreamExt};
1010
use range_union_find::RangeUnionFind;
1111
use vortex_buffer::{Buffer, ByteBuffer};
1212
use vortex_error::{VortexExpect, VortexResult, vortex_err};
13+
use vortex_metrics::VortexMetrics;
1314

1415
use crate::range_intersection;
1516

@@ -93,14 +94,23 @@ type SegmentStore = BTreeMap<SegmentPriority, Vec<SegmentId>>;
9394
pub struct SegmentCollector {
9495
store: Arc<RwLock<SegmentStore>>,
9596
pub kind: RequiredSegmentKind,
97+
metrics: VortexMetrics,
9698
}
9799

98100
impl SegmentCollector {
101+
pub fn new(metrics: VortexMetrics) -> Self {
102+
Self {
103+
metrics,
104+
..Default::default()
105+
}
106+
}
107+
99108
pub fn with_priority_hint(&self, kind: RequiredSegmentKind) -> Self {
100109
SegmentCollector {
101110
store: self.store.clone(),
102111
// highest priority wins
103112
kind: kind.min(self.kind),
113+
metrics: self.metrics.clone(),
104114
}
105115
}
106116

@@ -110,6 +120,7 @@ impl SegmentCollector {
110120
RequiredSegmentKind::PRUNING => (0, 0),
111121
_ => (row_start, row_end),
112122
};
123+
self.increment_metrics();
113124
let priority = SegmentPriority::new(start, end, self.kind);
114125
self.store
115126
.write()
@@ -126,6 +137,7 @@ impl SegmentCollector {
126137
store: self.store.clone(),
127138
cancellations_tx,
128139
excluded_ranges: Default::default(),
140+
metrics: self.metrics.clone(),
129141
},
130142
SegmentStream {
131143
store: self.store,
@@ -135,13 +147,23 @@ impl SegmentCollector {
135147
},
136148
)
137149
}
150+
151+
fn increment_metrics(&self) {
152+
self.metrics
153+
.counter("vortex.scan.segments.count.total")
154+
.inc();
155+
self.metrics
156+
.counter(format!("vortex.scan.segments.count.{:?}", self.kind))
157+
.inc();
158+
}
138159
}
139160

140161
#[derive(Debug, Clone)]
141162
pub struct RowRangePruner {
142163
store: Arc<RwLock<SegmentStore>>,
143164
cancellations_tx: mpsc::UnboundedSender<SegmentId>,
144165
excluded_ranges: Arc<RwLock<RangeUnionFind<u64>>>,
166+
metrics: VortexMetrics,
145167
}
146168

147169
impl RowRangePruner {
@@ -184,6 +206,9 @@ impl RowRangePruner {
184206
.flat_map(|key| store.remove(key).unwrap_or_default())
185207
.collect()
186208
};
209+
self.metrics
210+
.counter("vortex.scan.segments.cancel_sent")
211+
.add(cancelled_segments.len() as i64);
187212
for id in cancelled_segments {
188213
self.cancellations_tx
189214
.send(id)
@@ -207,7 +232,14 @@ impl RowRangePruner {
207232
if key.kind == RequiredSegmentKind::PRUNING {
208233
return true; // keep segments required for pruning
209234
}
210-
range_intersection(&(key.row_start..key.row_end), &row_indices).is_some()
235+
let keep =
236+
range_intersection(&(key.row_start..key.row_end), &row_indices).is_some();
237+
if !keep {
238+
self.metrics
239+
.counter("vortex.scan.segment.pruned_by_row_indices")
240+
.inc();
241+
}
242+
keep
211243
});
212244
}
213245
}
@@ -347,6 +379,7 @@ pub mod test {
347379
store: store.clone(),
348380
cancellations_tx: tx,
349381
excluded_ranges: Default::default(),
382+
metrics: Default::default(),
350383
};
351384

352385
// Test removing segments in range 0..200
@@ -405,6 +438,7 @@ pub mod test {
405438
store: store.clone(),
406439
cancellations_tx: tx,
407440
excluded_ranges: Default::default(),
441+
metrics: Default::default(),
408442
};
409443

410444
// First removal (0..100)
@@ -448,6 +482,7 @@ pub mod test {
448482
store: store.clone(),
449483
cancellations_tx: tx,
450484
excluded_ranges: Default::default(),
485+
metrics: Default::default(),
451486
};
452487

453488
// First removal (0..75)
@@ -492,6 +527,7 @@ pub mod test {
492527
store: store.clone(),
493528
cancellations_tx: tx,
494529
excluded_ranges: Default::default(),
530+
metrics: Default::default(),
495531
};
496532

497533
// Create a buffer with specific row indices
@@ -543,6 +579,7 @@ pub mod test {
543579
store: store.clone(),
544580
cancellations_tx: tx,
545581
excluded_ranges: Default::default(),
582+
metrics: Default::default(),
546583
};
547584

548585
// Drop the receiver to close the channel
@@ -569,6 +606,7 @@ pub mod test {
569606
store: store.clone(),
570607
cancellations_tx: tx,
571608
excluded_ranges: Default::default(),
609+
metrics: Default::default(),
572610
};
573611

574612
// Test removing segments that cover the entire range

0 commit comments

Comments
 (0)