Skip to content

Commit f2efc7d

Browse files
authored
read and coalescing metrics (#2413)
This PR introduces `VortexMetrics`, a thin wrapper around `witchcraft-metrics` that supports default tags. It adds read metrics for `VortexReadAt` and coalescing metrics to the scan driver. The next step is to tie these into the DataFusion metrics exposed by `VortexExec::metrics`.
1 parent d6a1ba6 commit f2efc7d

File tree

13 files changed

+338
-17
lines changed

13 files changed

+338
-17
lines changed

Cargo.lock

Lines changed: 42 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ members = [
1818
"vortex-ipc",
1919
"vortex-layout",
2020
"vortex-mask",
21+
"vortex-metrics",
2122
"vortex-proto",
2223
"vortex-sampling-compressor",
2324
"vortex-scalar",
@@ -158,6 +159,7 @@ tracing-subscriber = "0.3.19"
158159
url = "2.5.4"
159160
uuid = "1.8.0"
160161
wasm-bindgen-futures = "0.4.39"
162+
witchcraft-metrics = "1.0.1"
161163

162164
# BEGIN crates published by this project
163165
vortex = { version = "0.24.0", path = "./vortex" }
@@ -180,6 +182,7 @@ vortex-io = { version = "0.24.0", path = "./vortex-io" }
180182
vortex-ipc = { version = "0.24.0", path = "./vortex-ipc" }
181183
vortex-layout = { version = "0.24.0", path = "./vortex-layout" }
182184
vortex-mask = { version = "0.24.0", path = "./vortex-mask" }
185+
vortex-metrics = { version = "0.24.0", path = "./vortex-metrics" }
183186
vortex-proto = { version = "0.24.0", path = "./vortex-proto" }
184187
vortex-runend = { version = "0.24.0", path = "./encodings/runend" }
185188
vortex-sampling-compressor = { version = "0.24.0", path = "./vortex-sampling-compressor" }

vortex-datafusion/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ vortex-expr = { workspace = true, features = ["datafusion"] }
4848
vortex-file = { workspace = true, features = ["object_store", "tokio"] }
4949
vortex-io = { workspace = true, features = ["object_store", "tokio"] }
5050
vortex-layout = { workspace = true, features = ["tokio"] }
51+
vortex-metrics = { workspace = true }
52+
5153
[features]
5254
tracing = ["dep:tracing", "dep:tracing-futures", "vortex-io/tracing"]
5355

vortex-datafusion/src/persistent/opener.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ use vortex_error::VortexResult;
1111
use vortex_expr::{ExprRef, VortexExpr};
1212
use vortex_file::executor::{TaskExecutor, TokioExecutor};
1313
use vortex_file::{SplitBy, VortexOpenOptions};
14-
use vortex_io::ObjectStoreReadAt;
14+
use vortex_io::{InstrumentedReadAt, ObjectStoreReadAt};
15+
use vortex_metrics::VortexMetrics;
1516

1617
use super::cache::FileLayoutCache;
1718

@@ -54,10 +55,16 @@ impl VortexFileOpener {
5455

5556
impl FileOpener for VortexFileOpener {
5657
fn open(&self, file_meta: FileMeta) -> DFResult<FileOpenFuture> {
57-
let read_at = ObjectStoreReadAt::new(
58-
self.object_store.clone(),
59-
file_meta.location().clone(),
60-
Some(self.scheme.clone()),
58+
let metrics = VortexMetrics::default_with_tags(
59+
[("filename", file_meta.location().to_string())].as_slice(),
60+
);
61+
let read_at = InstrumentedReadAt::new(
62+
ObjectStoreReadAt::new(
63+
self.object_store.clone(),
64+
file_meta.location().clone(),
65+
Some(self.scheme.clone()),
66+
),
67+
&metrics,
6168
);
6269

6370
let filter = self.filter.clone();

vortex-file/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ vortex-expr = { workspace = true }
3636
vortex-flatbuffers = { workspace = true, features = ["file"] }
3737
vortex-io = { workspace = true }
3838
vortex-layout = { workspace = true }
39+
vortex-metrics = { workspace = true }
3940
vortex-sampling-compressor = { workspace = true }
4041

4142
[dev-dependencies]

vortex-file/src/file.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use vortex_array::stats::StatsSet;
55
use vortex_array::ContextRef;
66
use vortex_dtype::DType;
77
use vortex_layout::scan::ScanBuilder;
8+
use vortex_metrics::VortexMetrics;
89

910
use crate::footer::FileLayout;
1011
use crate::open::FileType;
@@ -16,6 +17,7 @@ pub struct VortexFile<F: FileType> {
1617
pub(crate) ctx: ContextRef,
1718
pub(crate) file_layout: FileLayout,
1819
pub(crate) segment_cache: Arc<dyn SegmentCache>,
20+
pub(crate) metrics: Arc<VortexMetrics>,
1921
pub(crate) _marker: PhantomData<F>,
2022
}
2123

@@ -42,6 +44,7 @@ impl<F: FileType> VortexFile<F> {
4244
self.options.clone(),
4345
self.file_layout.clone(),
4446
self.segment_cache.clone(),
47+
self.metrics.clone(),
4548
);
4649
ScanBuilder::new(
4750
driver,

vortex-file/src/generic.rs

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use vortex_error::{vortex_err, vortex_panic, VortexExpect, VortexResult};
1010
use vortex_io::VortexReadAt;
1111
use vortex_layout::scan::ScanDriver;
1212
use vortex_layout::segments::{AsyncSegmentReader, SegmentId};
13+
use vortex_metrics::{Counter, MetricId, VortexMetrics};
1314

1415
use crate::footer::{FileLayout, Segment};
1516
use crate::segments::channel::SegmentChannel;
@@ -31,13 +32,15 @@ impl<R: VortexReadAt> FileType for GenericVortexFile<R> {
3132
options: Self::Options,
3233
file_layout: FileLayout,
3334
segment_cache: Arc<dyn SegmentCache>,
35+
metrics: Arc<VortexMetrics>,
3436
) -> Self::ScanDriver {
3537
GenericScanDriver {
3638
read,
3739
options,
3840
file_layout,
3941
segment_cache,
4042
segment_channel: SegmentChannel::new(),
43+
metrics: metrics.into(),
4144
}
4245
}
4346
}
@@ -68,6 +71,7 @@ pub struct GenericScanDriver<R> {
6871
file_layout: FileLayout,
6972
segment_cache: Arc<dyn SegmentCache>,
7073
segment_channel: SegmentChannel,
74+
metrics: CoalescingMetrics,
7175
}
7276

7377
impl<R: VortexReadAt> ScanDriver for GenericScanDriver<R> {
@@ -145,7 +149,8 @@ impl<R: VortexReadAt> ScanDriver for GenericScanDriver<R> {
145149
let perf_hint = self.read.performance_hint();
146150
let io_stream = io_stream
147151
.map(move |r| coalesce(r, perf_hint.coalescing_window(), perf_hint.max_read()))
148-
.flat_map(stream::iter);
152+
.flat_map(stream::iter)
153+
.inspect(move |coalesced| self.metrics.record(coalesced));
149154

150155
// Submit the coalesced requests to the I/O.
151156
let read = self.read.clone();
@@ -188,6 +193,10 @@ impl FileSegmentRequest {
188193
.map_err(|_| vortex_err!("send failed"))
189194
.vortex_expect("send failed");
190195
}
196+
197+
fn range(&self) -> Range<u64> {
198+
self.location.offset..self.location.offset + self.location.length as u64
199+
}
191200
}
192201

193202
#[derive(Debug)]
@@ -271,9 +280,7 @@ fn coalesce(
271280
max_read: Option<u64>,
272281
) -> Vec<CoalescedSegmentRequest> {
273282
let fetch_ranges = merge_ranges(
274-
requests
275-
.iter()
276-
.map(|r| r.location.offset..r.location.offset + r.location.length as u64),
283+
requests.iter().map(|r| r.range()),
277284
coalescing_window,
278285
max_read,
279286
);
@@ -348,6 +355,48 @@ where
348355
ret
349356
}
350357

358+
struct CoalescingMetrics {
359+
bytes_uncoalesced: Arc<Counter>,
360+
bytes_coalesced: Arc<Counter>,
361+
request_count_uncoalesced: Arc<Counter>,
362+
request_count_coalesced: Arc<Counter>,
363+
}
364+
365+
impl From<Arc<VortexMetrics>> for CoalescingMetrics {
366+
fn from(metrics: Arc<VortexMetrics>) -> Self {
367+
let byte_ranges = MetricId::new("vortex.scan.requests.bytes");
368+
let requests = MetricId::new("vortex.scan.requests.count");
369+
Self {
370+
bytes_uncoalesced: metrics.counter(byte_ranges.clone().with_tag("kind", "uncoalesced")),
371+
bytes_coalesced: metrics.counter(byte_ranges.with_tag("kind", "coalesced")),
372+
request_count_uncoalesced: metrics
373+
.counter(requests.clone().with_tag("kind", "uncoalesced")),
374+
request_count_coalesced: metrics.counter(requests.with_tag("kind", "coalesced")),
375+
}
376+
}
377+
}
378+
379+
impl CoalescingMetrics {
380+
fn record(&self, req: &CoalescedSegmentRequest) {
381+
// record request counts
382+
self.request_count_coalesced.inc();
383+
if let Ok(len) = req.requests.len().try_into() {
384+
self.request_count_uncoalesced.add(len);
385+
}
386+
387+
// record uncoalesced total byte requests vs coalesced
388+
if let Ok(bytes) = (req.byte_range.end - req.byte_range.start).try_into() {
389+
self.bytes_coalesced.add(bytes);
390+
}
391+
self.bytes_uncoalesced.add(
392+
req.requests
393+
.iter()
394+
.map(|req| req.location.length as i64)
395+
.sum(),
396+
);
397+
}
398+
}
399+
351400
#[cfg(test)]
352401
mod test {
353402
use super::*;

vortex-file/src/memory.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use vortex_buffer::ByteBuffer;
66
use vortex_error::{vortex_err, VortexResult};
77
use vortex_layout::scan::ScanDriver;
88
use vortex_layout::segments::{AsyncSegmentReader, SegmentId};
9+
use vortex_metrics::VortexMetrics;
910

1011
use crate::segments::SegmentCache;
1112
use crate::{FileLayout, FileType, Segment};
@@ -30,6 +31,7 @@ impl FileType for InMemoryVortexFile {
3031
_options: Self::Options,
3132
file_layout: FileLayout,
3233
_segment_cache: Arc<dyn SegmentCache>,
34+
_metrics: Arc<VortexMetrics>,
3335
) -> Self::ScanDriver {
3436
Self {
3537
buffer: read,

0 commit comments

Comments
 (0)