Skip to content

Commit 6f360de

Browse files
committed
Random access for lance
Signed-off-by: Adam Gutglick <[email protected]>
1 parent b6edd04 commit 6f360de

File tree

12 files changed

+108
-39
lines changed

12 files changed

+108
-39
lines changed

.github/workflows/bench.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ jobs:
8585
env:
8686
RUST_BACKTRACE: full
8787
run: |
88-
target/release_debug/${{ matrix.benchmark.id }} -d gh-json --formats parquet,lance,vortex
88+
target/release_debug/${{ matrix.benchmark.id }} --formats parquet,lance,vortex -o results.json
8989
9090
- name: Setup AWS CLI
9191
uses: aws-actions/configure-aws-credentials@v5
@@ -96,7 +96,7 @@ jobs:
9696
- name: Upload Benchmark Results
9797
shell: bash
9898
run: |
99-
bash scripts/cat-s3.sh vortex-benchmark-results-database data.json.gz target/vortex-bench/${{ matrix.benchmark.id }}/results.json
99+
bash scripts/cat-s3.sh vortex-benchmark-results-database data.json.gz results.json
100100
101101
sql:
102102
uses: ./.github/workflows/sql-benchmarks.yml

.github/workflows/nightly-bench.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ jobs:
3939
"subcommand": "tpch",
4040
"name": "TPC-H on NVME",
4141
"targets": "datafusion:parquet,datafusion:vortex,datafusion:lance,duckdb:parquet,duckdb:vortex,duckdb:duckdb",
42-
"scale_factor": "--scale-factor 10.0",
42+
"scale_factor": "10.0",
4343
"build_args": "--features lance"
4444
},
4545
{
@@ -49,15 +49,15 @@ jobs:
4949
"local_dir": "vortex-bench/data/tpch/10.0",
5050
"remote_storage": "s3://vortex-bench-dev-eu/${{github.ref_name}}/${{github.run_id}}/tpch/10.0/",
5151
"targets": "datafusion:parquet,datafusion:vortex,datafusion:lance,duckdb:parquet,duckdb:vortex",
52-
"scale_factor": "--scale-factor 10.0",
52+
"scale_factor": "10.0",
5353
"build_args": "--features lance"
5454
},
5555
{
5656
"id": "tpch-nvme",
5757
"subcommand": "tpch",
5858
"name": "TPC-H on NVME",
5959
"targets": "datafusion:parquet,duckdb:parquet,duckdb:vortex",
60-
"scale_factor": "--scale-factor 100"
60+
"scale_factor": "100"
6161
},
6262
{
6363
"id": "tpch-s3",
@@ -66,7 +66,7 @@ jobs:
6666
"local_dir": "vortex-bench/data/tpch/100.0",
6767
"remote_storage": "s3://vortex-bench-dev-eu/${{github.ref_name}}/${{github.run_id}}/tpch/100.0/",
6868
"targets": "datafusion:parquet,duckdb:parquet,duckdb:vortex",
69-
"scale_factor": "--scale-factor 100.0"
69+
"scale_factor": "100.0"
7070
},
7171
]
7272
strategy:

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/compress-bench/src/bench.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,6 @@ pub async fn benchmark_decompress(
169169
})
170170
}
171171

172-
// ============================================================================
173-
// Ratio calculations
174-
// ============================================================================
175-
176172
/// Calculate cross-format comparison ratios.
177173
pub fn calculate_ratios(
178174
measurements: &HashMap<(Format, CompressOp), Duration>,

benchmarks/lance-bench/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ lance-encoding = { version = "0.39.0" }
2121
anyhow = { workspace = true }
2222
arrow-cast = { workspace = true }
2323
arrow-schema = { workspace = true }
24+
async-trait = { workspace = true }
2425
clap = { workspace = true, features = ["derive"] }
2526
futures = { workspace = true }
2627
parquet = { workspace = true }

benchmarks/lance-bench/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33

44
pub mod compress;
55
pub mod convert;
6+
pub mod random_access;
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use std::fs::File;
2+
use std::path::PathBuf;
3+
4+
use anyhow::anyhow;
5+
use async_trait::async_trait;
6+
use lance::Dataset;
7+
use lance::dataset::ProjectionRequest;
8+
use lance::dataset::WriteParams;
9+
use lance_encoding::version::LanceFileVersion;
10+
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
11+
use vortex_bench::Format;
12+
use vortex_bench::datasets::taxi_data::taxi_data_parquet;
13+
use vortex_bench::idempotent_async;
14+
use vortex_bench::random_access::RandomAccessor;
15+
16+
pub async fn taxi_data_lance() -> anyhow::Result<PathBuf> {
17+
idempotent_async("taxi/taxi.lance", |output_fname| async move {
18+
let parquet_path = taxi_data_parquet().await?;
19+
20+
let file = File::open(&parquet_path)?;
21+
let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
22+
let reader = builder.build()?;
23+
24+
let write_params = WriteParams::with_storage_version(LanceFileVersion::V2_1);
25+
Dataset::write(
26+
reader,
27+
output_fname
28+
.to_str()
29+
.ok_or_else(|| anyhow!("Invalid output file path"))?,
30+
Some(write_params),
31+
)
32+
.await?;
33+
34+
Ok(output_fname.to_path_buf())
35+
})
36+
.await
37+
}
38+
39+
pub struct LanceRandomAccessor {
40+
path: PathBuf,
41+
}
42+
43+
impl LanceRandomAccessor {
44+
pub fn new(path: PathBuf) -> Self {
45+
Self { path }
46+
}
47+
}
48+
49+
#[async_trait]
50+
impl RandomAccessor for LanceRandomAccessor {
51+
fn format(&self) -> Format {
52+
Format::Lance
53+
}
54+
55+
fn name(&self) -> &str {
56+
"random-access/lance-tokio-local-disk"
57+
}
58+
59+
fn path(&self) -> &PathBuf {
60+
&self.path
61+
}
62+
63+
async fn take(&self, indices: Vec<u64>) -> anyhow::Result<usize> {
64+
let dataset = Dataset::open(
65+
self.path
66+
.to_str()
67+
.ok_or_else(|| anyhow!("Invalid dataset path"))?,
68+
)
69+
.await?;
70+
let projection = ProjectionRequest::from_schema(dataset.schema().clone()); // All columns.
71+
let result = dataset.take(indices.as_slice(), projection).await?;
72+
Ok(result.num_rows())
73+
}
74+
}

benchmarks/random-access-bench/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@ publish = false
1818
anyhow = { workspace = true }
1919
clap = { workspace = true, features = ["derive"] }
2020
indicatif = { workspace = true }
21+
lance-bench = { path = "../lance-bench", optional = true }
2122
tokio = { workspace = true, features = ["full"] }
2223
vortex = { workspace = true }
2324
vortex-bench = { workspace = true }
2425

26+
[features]
27+
lance = ["dep:lance-bench"]
28+
2529
[lints]
2630
workspace = true

benchmarks/random-access-bench/src/main.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ use std::time::Instant;
77

88
use clap::Parser;
99
use indicatif::ProgressBar;
10-
use vortex::buffer::Buffer;
11-
use vortex::buffer::buffer;
1210
use vortex_bench::BenchmarkOutput;
1311
use vortex_bench::Engine;
1412
use vortex_bench::Format;
@@ -54,7 +52,7 @@ async fn main() -> anyhow::Result<()> {
5452
setup_logging_and_tracing(args.verbose, args.tracing)?;
5553

5654
// Row count of the dataset is 3,339,715.
57-
let indices = buffer![10u64, 11, 12, 13, 100_000, 3_000_000];
55+
let indices = vec![10u64, 11, 12, 13, 100_000, 3_000_000];
5856

5957
run_random_access(
6058
args.formats,
@@ -81,6 +79,14 @@ async fn get_accessor(format: Format) -> anyhow::Result<Box<dyn RandomAccessor>>
8179
let path = taxi_data_parquet().await?;
8280
Ok(Box::new(ParquetRandomAccessor::new(path)))
8381
}
82+
#[cfg(feature = "lance")]
83+
Format::Lance => {
84+
use lance_bench::random_access::LanceRandomAccessor;
85+
use lance_bench::random_access::taxi_data_lance;
86+
87+
let path = taxi_data_lance().await?;
88+
Ok(Box::new(LanceRandomAccessor::new(path)))
89+
}
8490
_ => unimplemented!("Random access bench not implemented for {format}"),
8591
}
8692
}
@@ -91,7 +97,7 @@ async fn get_accessor(format: Format) -> anyhow::Result<Box<dyn RandomAccessor>>
9197
/// collecting timing for each run.
9298
async fn benchmark_random_access(
9399
accessor: &dyn RandomAccessor,
94-
indices: &Buffer<u64>,
100+
indices: &[u64],
95101
time_limit_secs: u64,
96102
storage: &str,
97103
) -> anyhow::Result<TimingMeasurement> {
@@ -101,9 +107,9 @@ async fn benchmark_random_access(
101107

102108
// Run at least once, then continue until time limit
103109
loop {
104-
let indices_clone = indices.clone();
110+
let indices = indices.to_vec();
105111
let start = Instant::now();
106-
let _row_count = accessor.take(indices_clone).await?;
112+
let _row_count = accessor.take(indices).await?;
107113
runs.push(start.elapsed());
108114

109115
if overall_start.elapsed() >= time_limit {
@@ -124,6 +130,8 @@ fn format_to_engine(format: Format) -> Engine {
124130
match format {
125131
Format::OnDiskVortex | Format::VortexCompact => Engine::Vortex,
126132
Format::Parquet => Engine::Arrow,
133+
#[cfg(feature = "lance")]
134+
Format::Lance => Engine::Arrow, // Is this right here?
127135
_ => Engine::default(),
128136
}
129137
}
@@ -135,7 +143,7 @@ async fn run_random_access(
135143
formats: Vec<Format>,
136144
time_limit: u64,
137145
display_format: DisplayFormat,
138-
indices: Buffer<u64>,
146+
indices: Vec<u64>,
139147
output_path: Option<PathBuf>,
140148
) -> anyhow::Result<()> {
141149
let progress = ProgressBar::new(formats.len() as u64);

vortex-bench/src/lib.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,6 @@ impl CompactionStrategy {
220220
}
221221
}
222222

223-
// ============================================================================
224-
// Benchmark CLI types
225-
// ============================================================================
226-
227223
/// CLI argument for selecting which benchmark to run.
228224
#[derive(clap::ValueEnum, Clone, Copy)]
229225
pub enum BenchmarkArg {

0 commit comments

Comments
 (0)