Skip to content

Commit 446a377

Browse files
committed
wip
Signed-off-by: Joe Isaacs <[email protected]>
1 parent 367a53e commit 446a377

File tree

12 files changed

+226
-28
lines changed

12 files changed

+226
-28
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-file/Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ vortex-array = { path = "../vortex-array", features = ["test-harness"] }
6565
vortex-btrblocks = { path = "../vortex-btrblocks" }
6666
vortex-io = { path = "../vortex-io", features = ["tokio"] }
6767
vortex-scan = { path = "../vortex-scan", features = ["tokio"] }
68+
criterion = { version = "0.7.0", features = ["html_reports", "async", "async_tokio"] }
69+
vortex-gpu = { workspace = true, features = ["cuda"] }
6870

6971
[lints]
7072
workspace = true
@@ -86,3 +88,8 @@ gpu = [
8688
"vortex-layout/gpu",
8789
"vortex-scan/gpu",
8890
]
91+
92+
[[bench]]
93+
name = "bench_read"
94+
harness = false
95+
test = false

vortex-file/benches/bench_read.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
#![allow(clippy::unwrap_used)]
5+
6+
use std::sync::Arc;
7+
8+
use criterion::{Criterion, Throughput, criterion_group, criterion_main};
9+
use cudarc::driver::CudaContext;
10+
use futures::TryStreamExt;
11+
use tokio::runtime::Runtime;
12+
use vortex_array::arrays::{ChunkedArray, StructArray};
13+
use vortex_array::{ArrayRef, IntoArray};
14+
use vortex_buffer::Buffer;
15+
use vortex_error::VortexUnwrap;
16+
use vortex_file::{FileGpuSegmentSource, VortexOpenOptions, VortexWriteOptions};
17+
18+
// Data sizes to benchmark, as (element count per column, label). Only the 1GB case is enabled.
19+
// The first field is an element count, not a byte count; the label is the approximate unpacked size of one 4-byte column.
20+
const DATA_SIZES: &[(usize, &str)] = &[
21+
(268_435_456, "1GB"), // ~1GB when unpacked (268M * 4 bytes)
22+
];
23+
24+
#[allow(clippy::cast_possible_truncation)]
25+
fn make_test_array(len: usize) -> ArrayRef {
26+
let numbers = ChunkedArray::from_iter([
27+
(0..len / 2)
28+
.map(|i| (i as u32) % 64)
29+
.collect::<Buffer<u32>>()
30+
.into_array(),
31+
(0..len / 2)
32+
.map(|i| (i as u32) % 64)
33+
.collect::<Buffer<u32>>()
34+
.into_array(),
35+
])
36+
.into_array();
37+
let floats = ChunkedArray::from_iter([
38+
(0..len / 2)
39+
.map(|i| (i % 2) as f32 + 0.1)
40+
.collect::<Buffer<f32>>()
41+
.into_array(),
42+
(0..len / 2)
43+
.map(|i| (i % 2) as f32 + 4.1)
44+
.collect::<Buffer<f32>>()
45+
.into_array(),
46+
])
47+
.into_array();
48+
49+
StructArray::from_fields(&[("numbers", numbers), ("floats", floats)])
50+
.vortex_unwrap()
51+
.into_array()
52+
}
53+
54+
fn benchmark_gpu_scan(c: &mut Criterion) {
55+
let runtime = Runtime::new().unwrap();
56+
let mut group = c.benchmark_group("gpu_scan");
57+
58+
group.sample_size(10);
59+
let bench_file_name = "/tmp/test-vx/bench_out.vortex";
60+
61+
for (len, label) in DATA_SIZES {
62+
let len = len.next_multiple_of(1024);
63+
let array = make_test_array(len);
64+
65+
runtime.block_on(async {
66+
VortexWriteOptions::default()
67+
.write(
68+
tokio::fs::File::create(bench_file_name).await.unwrap(),
69+
array.to_array_stream(),
70+
)
71+
.await
72+
.unwrap();
73+
});
74+
75+
let cuda_ctx = CudaContext::new(0).unwrap();
76+
cuda_ctx.set_blocking_synchronize().unwrap();
77+
group.throughput(Throughput::Bytes((len * size_of::<u32>() * 2) as u64));
78+
group.bench_function(*label, |b| {
79+
b.to_async(&runtime).iter_with_large_drop(async || {
80+
let file = std::fs::File::open(bench_file_name).unwrap();
81+
let vx_file = VortexOpenOptions::new()
82+
.open(bench_file_name)
83+
.await
84+
.vortex_unwrap();
85+
let stream = vx_file
86+
.gpu_scan(
87+
cuda_ctx.clone(),
88+
Arc::new(FileGpuSegmentSource::new(
89+
vx_file.footer.segment_map().clone(),
90+
cuda_ctx.default_stream(),
91+
file,
92+
)),
93+
)
94+
.vortex_unwrap()
95+
.into_array_stream()
96+
.vortex_unwrap()
97+
.try_collect::<Vec<_>>()
98+
.await
99+
.vortex_unwrap();
100+
stream
101+
102+
// VortexOpenOptions::new()
103+
// .open(bench_file_name)
104+
// .await
105+
// .vortex_unwrap()
106+
// .gpu_scan(ctx.clone())
107+
// .vortex_unwrap()
108+
// .into_array_stream()
109+
// .vortex_unwrap()
110+
// .try_collect::<Vec<_>>()
111+
// .await
112+
// .vortex_unwrap()
113+
});
114+
});
115+
}
116+
117+
group.finish();
118+
}
119+
120+
criterion_group!(benches, benchmark_gpu_scan);
121+
122+
criterion_main!(benches);

vortex-file/src/file.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::pruning::extract_relevant_file_stats_as_struct_row;
3333
#[derive(Clone)]
3434
pub struct VortexFile {
3535
/// The footer of the Vortex file, containing metadata and layout information.
36-
pub(crate) footer: Footer,
36+
pub footer: Footer,
3737
/// The segment source used to read segments from this file.
3838
pub(crate) segment_source: Arc<dyn SegmentSource>,
3939
/// Metrics tied to the file.

vortex-file/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ pub use file::*;
107107
pub use footer::*;
108108
pub use forever_constant::*;
109109
pub use open::*;
110+
#[cfg(feature = "gpu")]
111+
pub use segments::FileGpuSegmentSource;
110112
pub use strategy::*;
111113
use vortex_alp::{ALPEncoding, ALPRDEncoding};
112114
use vortex_array::{ArrayRegistry, EncodingRef};

vortex-file/src/segments/gpu_source.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,12 @@ impl GpuSegmentSource for FileGpuSegmentSource {
4848
.vortex_expect("missing segment id")
4949
.clone();
5050

51-
let mut cu_slice = self
52-
.stream
53-
.alloc_zeros::<u8>(spec.length as usize)
51+
let mut cu_slice = unsafe { self.stream.alloc::<u8>(spec.length as usize) }
5452
.map_err(|e| vortex_err!("cu slice {e}"))
5553
.vortex_expect("Failed to allocate cu slice");
5654

5755
// this is optional? and has strange perf characteristics.
58-
// cu_file
56+
// self.cu_file
5957
// .buf_register(&cu_slice)
6058
// .map_err(|e| vortex_err!("cu file {e}"))
6159
// .vortex_unwrap();
@@ -64,14 +62,18 @@ impl GpuSegmentSource for FileGpuSegmentSource {
6462
let file_handle = self.file_handle.clone();
6563
let stream = self.stream.clone();
6664
async move {
65+
// println!("try read");
66+
file_handle.sync_read(offset, &mut cu_slice);
6767
let read = stream
6868
.memcpy_ftod(&file_handle, offset, &mut cu_slice)
6969
.ok()
7070
.vortex_expect("memcpy_ftod");
71+
// println!("did read");
7172

72-
read.synchronize()
73-
.map_err(|e| vortex_err!("sync write {e}"))
74-
.vortex_unwrap();
73+
// read.synchronize()
74+
// .map_err(|e| vortex_err!("sync write {e}"))
75+
// .vortex_unwrap();
76+
// println!("did sync");
7577
Ok(cu_slice)
7678
}
7779
.boxed()

vortex-file/src/tests.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1475,6 +1475,7 @@ async fn test_writer_with_statistics() -> VortexResult<()> {
14751475
#[cfg_attr(miri, ignore)]
14761476
#[tokio::test]
14771477
async fn test_gpu_read_simple() -> VortexResult<()> {
1478+
use vortex_array::compute::take;
14781479
use vortex_btrblocks::BtrBlocksCompressor;
14791480

14801481
use crate::segments::FileGpuSegmentSource;
@@ -1550,7 +1551,59 @@ async fn test_gpu_read_simple() -> VortexResult<()> {
15501551
gpu_chunks.push(array.into_array());
15511552
}
15521553

1553-
assert_arrays_eq!(ChunkedArray::from_iter(gpu_chunks), cpu_read);
1554+
// assert_arrays_eq!(
1555+
// ChunkedArray::from_iter(gpu_chunks).into_array().as_ref(),
1556+
// cpu_read
1557+
// );
1558+
1559+
let left = ChunkedArray::from_iter(gpu_chunks).into_array();
1560+
let right = cpu_read;
1561+
if left.dtype() != right.dtype() {
1562+
panic!(
1563+
"assertion left == right failed: arrays differ in type: {} != {}.\n left: {}\n right: {}",
1564+
left.dtype(),
1565+
right.dtype(),
1566+
"x",
1567+
"x" // left.display_values(),
1568+
// right.display_values()
1569+
)
1570+
}
1571+
1572+
if left.len() != right.len() {
1573+
panic!(
1574+
"assertion left == right failed: arrays differ in length: {} != {}.\n left: {}\n right: {}",
1575+
left.len(),
1576+
right.len(),
1577+
"x",
1578+
"x" // left.display_values(),
1579+
// right.display_values()
1580+
)
1581+
}
1582+
let n = left.len();
1583+
let mismatched_indices = (0..n)
1584+
.filter(|i| left.scalar_at(*i) != right.scalar_at(*i))
1585+
.collect::<Vec<_>>();
1586+
if mismatched_indices.len() != 0 {
1587+
let idx = PrimitiveArray::from_iter(
1588+
mismatched_indices
1589+
.clone()
1590+
.into_iter()
1591+
.map(|x| x as u64)
1592+
.take(20),
1593+
);
1594+
panic!(
1595+
"assertion left == right failed: arrays do not match at indices: {}.\n left: {}\n right: {}",
1596+
Itertools::format(mismatched_indices.into_iter(), ", "),
1597+
take(&left, idx.as_ref())
1598+
.unwrap()
1599+
.slice(0..20)
1600+
.display_values(),
1601+
take(&right, idx.as_ref())
1602+
.unwrap()
1603+
.slice(0..20)
1604+
.display_values(),
1605+
)
1606+
}
15541607

15551608
assert_eq!(row_count, 32768);
15561609
Ok(())

vortex-gpu/src/array_parts.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@ use vortex_flatbuffers::array::Array;
1616
use vortex_scalar::Scalar;
1717

1818
use crate::CudaByteBuffer;
19-
use crate::jit::{
20-
AlpEncodingTree, BitPackedEncodingTree, EncodingTree, EncodingTreeRef, FoREncodingTree,
21-
};
19+
use crate::jit::{AlpEncodingTree, BitPackedEncodingTree, EncodingTreeRef, FoREncodingTree};
2220

2321
pub struct GpuArrayParts<'a> {
2422
buffers: Vec<Option<CudaByteBuffer>>,
@@ -100,7 +98,7 @@ impl<'a> GpuArrayParts<'a> {
10098
len,
10199
);
102100
let reference = Scalar::new(dtype.clone(), deser);
103-
return Arc::new(FoREncodingTree { reference, child }) as Arc<dyn EncodingTree>;
101+
return Arc::new(FoREncodingTree { reference, child }) as EncodingTreeRef;
104102
} else if enc.id() == BitPackedEncoding.id() {
105103
assert!(array_node.children().unwrap_or_default().is_empty());
106104
let deser =

vortex-gpu/src/jit/encoding_tree.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use std::any::Any;
55
use std::sync::Arc;
66

7-
pub type EncodingTreeRef = Arc<dyn EncodingTree + 'static>;
7+
pub type EncodingTreeRef = Arc<dyn EncodingTree + Send + Sync + 'static>;
88

99
pub trait EncodingTree {
1010
fn as_any(&self) -> &dyn Any;

vortex-gpu/src/jit/kernel_fmt.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

44
use std::fmt::Write;
5-
use std::sync::Arc;
5+
use std::hash::BuildHasher;
6+
use std::sync::{Arc, LazyLock};
67

78
use cudarc::driver::{CudaContext, CudaFunction};
89
use vortex_error::{VortexExpect, VortexResult, vortex_err};
10+
use vortex_utils::aliases::dash_map::{DashMap, Entry};
911

1012
use crate::indent::{IndentedWrite, IndentedWriter};
1113
use crate::jit::{GPUKernelParameter, GPUPipelineJIT, GPUVisitor};
1214

15+
static JIT_CACHE: LazyLock<DashMap<u64, CudaFunction>> = LazyLock::new(DashMap::default);
16+
1317
struct DeclPrinter<'a, 'b: 'a> {
1418
w: &'a mut IndentedWrite<'b>,
1519
}
@@ -115,15 +119,22 @@ pub fn create_kernel(
115119
create_kernel_str(w, array, kernel_out_array)
116120
.map_err(|e| vortex_err!("jit str cannot fail {e}"))?;
117121

118-
let module =
119-
cudarc::nvrtc::compile_ptx(s.clone()).map_err(|e| vortex_err!("compile ptx {e}"))?;
120-
121-
// Dynamically load it into the device
122-
let module = ctx
123-
.load_module(module)
124-
.map_err(|e| vortex_err!("load module {e}"))?;
125-
126-
module
127-
.load_function("kernel")
128-
.map_err(|e| vortex_err!("load_function {e}"))
122+
match JIT_CACHE.entry(JIT_CACHE.hasher().hash_one(&s)) {
123+
Entry::Occupied(oc) => Ok(oc.get().clone()),
124+
Entry::Vacant(vac) => {
125+
let module =
126+
cudarc::nvrtc::compile_ptx(s).map_err(|e| vortex_err!("compile ptx {e}"))?;
127+
128+
// Dynamically load it into the device
129+
let module = ctx
130+
.load_module(module)
131+
.map_err(|e| vortex_err!("load module {e}"))?;
132+
133+
let cuf = module
134+
.load_function("kernel")
135+
.map_err(|e| vortex_err!("load_function {e}"))?;
136+
vac.insert(cuf.clone());
137+
Ok(cuf)
138+
}
139+
}
129140
}

0 commit comments

Comments
 (0)