Skip to content

Commit 72d29b3

Browse files
committed
Merge remote-tracking branch 'origin/develop' into ji/min-max-rt
# Conflicts:
#   vortex-array/src/arrays/decimal/compute/min_max.rs
2 parents aa45cfc + 14841c0 commit 72d29b3

File tree

93 files changed

+1273
-873
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

93 files changed

+1273
-873
lines changed

benchmarks-website/code.js

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -679,38 +679,73 @@ window.initAndRender = (function () {
679679
const initializer = {
680680
async loadData() {
681681
const [dataResponse, commitsResponse] = await Promise.all([
682-
this.fetchGzippedData(
682+
this.fetchAndParseGzippedJsonl(
683683
"https://vortex-benchmark-results-database.s3.amazonaws.com/data.json.gz"
684684
),
685685
fetch(
686686
"https://vortex-benchmark-results-database.s3.amazonaws.com/commits.json"
687687
).then((r) => r.text()),
688688
]);
689689

690-
// Return raw text data for worker processing
690+
// Return parsed data for worker processing
691+
// dataResponse is now an array of parsed objects instead of a string
691692
return {
692693
benchmarkData: dataResponse,
693694
commitsData: commitsResponse
694695
};
695696
},
696697

697-
async fetchGzippedData(url) {
698+
async fetchAndParseGzippedJsonl(url) {
698699
const response = await fetch(url);
699700
const decompressedStream = response.body.pipeThrough(
700701
new DecompressionStream("gzip")
701702
);
702703
const reader = decompressedStream.getReader();
703704
const decoder = new TextDecoder();
704-
let result = "";
705+
706+
let buffer = '';
707+
const lines = [];
705708

706709
while (true) {
707710
const { done, value } = await reader.read();
708-
if (done) break;
709-
result += decoder.decode(value, { stream: true });
711+
712+
if (value) {
713+
buffer += decoder.decode(value, { stream: true });
714+
715+
// Process complete lines
716+
const parts = buffer.split('\n');
717+
// Keep the last part (incomplete line) in the buffer
718+
buffer = parts.pop();
719+
720+
// Parse and collect complete lines
721+
for (const line of parts) {
722+
const trimmed = line.trim();
723+
if (trimmed.length > 0) {
724+
try {
725+
lines.push(JSON.parse(trimmed));
726+
} catch (e) {
727+
console.warn('Failed to parse JSONL line:', trimmed, e);
728+
}
729+
}
730+
}
731+
}
732+
733+
if (done) {
734+
// Process any remaining buffer
735+
buffer += decoder.decode();
736+
const trimmed = buffer.trim();
737+
if (trimmed.length > 0) {
738+
try {
739+
lines.push(JSON.parse(trimmed));
740+
} catch (e) {
741+
console.warn('Failed to parse final JSONL line:', trimmed, e);
742+
}
743+
}
744+
break;
745+
}
710746
}
711747

712-
result += decoder.decode();
713-
return result;
748+
return lines;
714749
},
715750

716751
parseJsonl(jsonl) {
@@ -754,7 +789,7 @@ window.initAndRender = (function () {
754789
];
755790

756791
elementIds.forEach((id) => {
757-
const camelCaseId = id.replace(/-(.)/g, (match, char) =>
792+
const camelCaseId = id.replace(/-(.)/g, (_match, char) =>
758793
char.toUpperCase()
759794
);
760795
domElements[camelCaseId] = document.getElementById(id);

benchmarks-website/data-worker.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,10 @@ self.addEventListener('message', async function(e) {
323323
message: `${getRandomMessage(PARSING_MESSAGES)} Benchmark data...`
324324
});
325325

326-
const benchmarkData = parseJsonl(data.benchmarkData);
326+
// benchmarkData might already be parsed as an array
327+
const benchmarkData = Array.isArray(data.benchmarkData)
328+
? data.benchmarkData
329+
: parseJsonl(data.benchmarkData);
327330

328331
self.postMessage({
329332
type: 'progress',

benchmarks-website/worker-manager.js

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,16 +89,18 @@ export const workerManager = {
8989
// Fallback data processing on main thread
9090
async processDataFallback(benchmarkData, commitsData, keptGroups, onProgress) {
9191
if (onProgress) onProgress(10, 'Parsing benchmark data...');
92-
93-
// Parse JSONL data
94-
const parsedBenchmarkData = this.parseJsonl(benchmarkData);
95-
92+
93+
// benchmarkData is now already parsed as an array
94+
const parsedBenchmarkData = Array.isArray(benchmarkData)
95+
? benchmarkData
96+
: this.parseJsonl(benchmarkData);
97+
9698
if (onProgress) onProgress(30, 'Parsing commit data...');
97-
99+
98100
const parsedCommitsData = this.parseJsonl(commitsData);
99-
101+
100102
if (onProgress) onProgress(50, 'Processing and grouping data...');
101-
103+
102104
// Convert commits array to object
103105
const commits = {};
104106
parsedCommitsData.forEach((commit) => {
@@ -113,7 +115,7 @@ export const workerManager = {
113115
);
114116

115117
if (onProgress) onProgress(100, 'Data processing complete!');
116-
118+
117119
return result;
118120
},
119121

encodings/alp/benches/alp_compress.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66
use divan::Bencher;
77
use rand::rngs::StdRng;
88
use rand::{Rng, SeedableRng as _};
9-
use vortex_alp::{ALPFloat, ALPRDFloat, RDEncoder, alp_encode};
9+
use vortex_alp::{ALPFloat, ALPRDFloat, RDEncoder, alp_encode, decompress};
1010
use vortex_array::arrays::PrimitiveArray;
1111
use vortex_array::compute::warm_up_vtables;
1212
use vortex_array::validity::Validity;
13-
use vortex_buffer::buffer;
13+
use vortex_buffer::{Buffer, buffer};
1414
use vortex_dtype::NativePType;
1515

1616
fn main() {
@@ -84,10 +84,15 @@ fn decompress_alp<T: ALPFloat + NativePType>(bencher: Bencher, args: (usize, f64
8484
Validity::NonNullable
8585
};
8686
let values = values.freeze();
87-
let array = alp_encode(&PrimitiveArray::new(values, validity), None).unwrap();
8887
bencher
89-
.with_inputs(|| array.clone())
90-
.bench_values(|array| array.to_canonical());
88+
.with_inputs(|| {
89+
alp_encode(
90+
&PrimitiveArray::new(Buffer::copy_from(&values), validity.clone()),
91+
None,
92+
)
93+
.unwrap()
94+
})
95+
.bench_values(decompress);
9196
}
9297

9398
#[divan::bench(types = [f32, f64], args = [10_000, 100_000])]

encodings/alp/src/alp/array.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,6 @@ impl ArrayVTable<ALPVTable> for ALPVTable {
282282

283283
impl CanonicalVTable<ALPVTable> for ALPVTable {
284284
fn canonicalize(array: &ALPArray) -> Canonical {
285-
Canonical::Primitive(decompress(array))
285+
Canonical::Primitive(decompress(array.clone()))
286286
}
287287
}

0 commit comments

Comments (0)