Commit d4432c0

Add ipc reader benchmark (#7091)
1 parent 6c1d8c3 · commit d4432c0

2 files changed: +236 -0 lines

arrow-ipc/Cargo.toml

Lines changed: 7 additions & 0 deletions
@@ -50,7 +50,14 @@ lz4 = ["lz4_flex"]
 criterion = "0.5.1"
 tempfile = "3.3"
 tokio = "1.43.0"
+# used in benches
+memmap2 = "0.9.3"
+bytes = "1.9"

 [[bench]]
 name = "ipc_writer"
 harness = false
+
+[[bench]]
+name = "ipc_reader"
+harness = false

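The second [[bench]] entry registers the new benchmark alongside ipc_writer; harness = false turns off the default libtest harness so that the criterion_main! macro in the benchmark file provides the entry point. Assuming a standard cargo setup (the exact invocation is not part of this commit), the new benchmark can be run on its own with:

cargo bench -p arrow-ipc --bench ipc_reader
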
arrow-ipc/benches/ipc_reader.rs

Lines changed: 229 additions & 0 deletions
@@ -0,0 +1,229 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
use arrow_array::{builder::StringBuilder, RecordBatch};
use arrow_buffer::Buffer;
use arrow_ipc::convert::fb_to_schema;
use arrow_ipc::reader::{read_footer_length, FileDecoder, FileReader, StreamReader};
use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
use arrow_ipc::{root_as_footer, Block, CompressionType};
use arrow_schema::{DataType, Field, Schema};
use criterion::{criterion_group, criterion_main, Criterion};
use std::io::Cursor;
use std::sync::Arc;
use tempfile::tempdir;

fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("arrow_ipc_reader");

    group.bench_function("StreamReader/read_10", |b| {
        let batch = create_batch(8192, true);
        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
        let mut writer = StreamWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
        for _ in 0..10 {
            writer.write(&batch).unwrap();
        }
        writer.finish().unwrap();

        b.iter(move || {
            let projection = None;
            let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
            for _ in 0..10 {
                reader.next().unwrap().unwrap();
            }
            assert!(reader.next().is_none());
        })
    });

    group.bench_function("StreamReader/read_10/zstd", |b| {
        let batch = create_batch(8192, true);
        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
        let options = IpcWriteOptions::default()
            .try_with_compression(Some(CompressionType::ZSTD))
            .unwrap();
        let mut writer =
            StreamWriter::try_new_with_options(&mut buffer, batch.schema().as_ref(), options)
                .unwrap();
        for _ in 0..10 {
            writer.write(&batch).unwrap();
        }
        writer.finish().unwrap();

        b.iter(move || {
            let projection = None;
            let mut reader = StreamReader::try_new(buffer.as_slice(), projection).unwrap();
            for _ in 0..10 {
                reader.next().unwrap().unwrap();
            }
            assert!(reader.next().is_none());
        })
    });

    group.bench_function("FileReader/read_10", |b| {
        let batch = create_batch(8192, true);
        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
        let mut writer = FileWriter::try_new(&mut buffer, batch.schema().as_ref()).unwrap();
        for _ in 0..10 {
            writer.write(&batch).unwrap();
        }
        writer.finish().unwrap();

        b.iter(move || {
            let projection = None;
            let cursor = Cursor::new(buffer.as_slice());
            let mut reader = FileReader::try_new(cursor, projection).unwrap();
            for _ in 0..10 {
                reader.next().unwrap().unwrap();
            }
            assert!(reader.next().is_none());
        })
    });

    group.bench_function("FileReader/read_10/mmap", |b| {
        let batch = create_batch(8192, true);
        // write to an actual file
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.arrow");
        let file = std::fs::File::create(&path).unwrap();
        let mut writer = FileWriter::try_new(file, batch.schema().as_ref()).unwrap();
        for _ in 0..10 {
            writer.write(&batch).unwrap();
        }
        writer.finish().unwrap();

        b.iter(move || {
            let ipc_file = std::fs::File::open(&path).expect("failed to open file");
            let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap file") };

            // Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
            let bytes = bytes::Bytes::from_owner(mmap);
            let buffer = Buffer::from(bytes);
            let decoder = IPCBufferDecoder::new(buffer);
            assert_eq!(decoder.num_batches(), 10);

            for i in 0..decoder.num_batches() {
                decoder.get_batch(i);
            }
        })
    });
}

// copied from the zero_copy_ipc example.
// should we move this to an actual API?
/// Wrapper around the example in the `FileDecoder` which handles the
/// low level interaction with the Arrow IPC format.
struct IPCBufferDecoder {
    /// Memory (or memory mapped) Buffer with the data
    buffer: Buffer,
    /// Decoder that reads Arrays that refers to the underlying buffers
    decoder: FileDecoder,
    /// Location of the batches within the buffer
    batches: Vec<Block>,
}

impl IPCBufferDecoder {
    fn new(buffer: Buffer) -> Self {
        let trailer_start = buffer.len() - 10;
        let footer_len = read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
        let footer = root_as_footer(&buffer[trailer_start - footer_len..trailer_start]).unwrap();

        let schema = fb_to_schema(footer.schema().unwrap());

        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());

        // Read dictionaries
        for block in footer.dictionaries().iter().flatten() {
            let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
            let data = buffer.slice_with_length(block.offset() as _, block_len);
            decoder.read_dictionary(block, &data).unwrap();
        }

        // convert to Vec from the flatbuffers Vector to avoid having a direct dependency on flatbuffers
        let batches = footer
            .recordBatches()
            .map(|b| b.iter().copied().collect())
            .unwrap_or_default();

        Self {
            buffer,
            decoder,
            batches,
        }
    }

    fn num_batches(&self) -> usize {
        self.batches.len()
    }

    fn get_batch(&self, i: usize) -> RecordBatch {
        let block = &self.batches[i];
        let block_len = block.bodyLength() as usize + block.metaDataLength() as usize;
        let data = self
            .buffer
            .slice_with_length(block.offset() as _, block_len);
        self.decoder
            .read_record_batch(block, &data)
            .unwrap()
            .unwrap()
    }
}

fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("c0", DataType::Int32, true),
        Field::new("c1", DataType::Utf8, true),
        Field::new("c2", DataType::Date32, true),
        Field::new("c3", DataType::Decimal128(11, 2), true),
    ]));
    let mut a = Int32Builder::new();
    let mut b = StringBuilder::new();
    let mut c = Date32Builder::new();
    let mut d = Decimal128Builder::new()
        .with_precision_and_scale(11, 2)
        .unwrap();
    for i in 0..num_rows {
        a.append_value(i as i32);
        c.append_value(i as i32);
        d.append_value((i * 1000000) as i128);
        if allow_nulls && i % 10 == 0 {
            b.append_null();
        } else {
            b.append_value(format!("this is string number {i}"));
        }
    }
    let a = a.finish();
    let b = b.finish();
    let c = c.finish();
    let d = d.finish();
    RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(a), Arc::new(b), Arc::new(c), Arc::new(d)],
    )
    .unwrap()
}

fn config() -> Criterion {
    Criterion::default()
}

criterion_group! {
    name = benches;
    config = config();
    targets = criterion_benchmark
}
criterion_main!(benches);
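
The config() helper above simply returns Criterion::default(). As an illustrative sketch only (not part of this commit), criterion's builder methods could be used here to trade run time against measurement precision, for example via sample_size and measurement_time:

fn config() -> Criterion {
    // Hypothetical tuning, not from the commit: fewer samples but a
    // longer measurement window for each benchmark.
    Criterion::default()
        .sample_size(20)
        .measurement_time(std::time::Duration::from_secs(10))
}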
