Skip to content

Commit 3f6db02

Browse files
committed
Optimize benchmark CSV reading with memory mapping
Replace the buffered, scanner-based file reading in the benchmark CSV source with a reader from golang.org/x/exp/mmap. The new implementation copies the mapped file into a byte slice with a single ReadAt call, walks that slice for newline-delimited integers, trims a trailing carriage return from each line, and completes immediately when the file is empty.
1 parent 9af8b11 commit 3f6db02

File tree

1 file changed

+34
-14
lines changed

1 file changed

+34
-14
lines changed

examples/billion-rows-benchmark/benchmark_test.go

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package benchmark
22

33
import (
4-
"bufio"
4+
"bytes"
55
"context"
66
"os"
77
"path/filepath"
88
"strconv"
99
"testing"
1010

1111
"github.com/samber/ro"
12+
"golang.org/x/exp/mmap"
1213
)
1314

1415
// csvSource creates an Observable that reads int64 values (one per line)
@@ -17,33 +18,52 @@ import (
1718
// on subscribe and emits values to the destination observer.
1819
func csvSource(path string) ro.Observable[int64] {
1920
return ro.NewObservableWithContext(func(ctx context.Context, dest ro.Observer[int64]) ro.Teardown {
20-
f, err := os.Open(path)
21+
reader, err := mmap.Open(path)
2122
if err != nil {
2223
dest.Error(err)
2324
return nil
2425
}
26+
defer func() { _ = reader.Close() }()
2527

26-
scanner := bufio.NewScanner(f)
27-
for scanner.Scan() {
28-
line := scanner.Text()
29-
v, err := strconv.ParseInt(line, 10, 64)
28+
size := reader.Len()
29+
if size == 0 {
30+
dest.CompleteWithContext(ctx)
31+
return nil
32+
}
33+
34+
data := make([]byte, size)
35+
if _, err := reader.ReadAt(data, 0); err != nil {
36+
dest.Error(err)
37+
return nil
38+
}
39+
40+
offset := 0
41+
for offset < len(data) {
42+
next := bytes.IndexByte(data[offset:], '\n')
43+
var line []byte
44+
if next == -1 {
45+
line = data[offset:]
46+
offset = len(data)
47+
} else {
48+
line = data[offset : offset+next]
49+
offset += next + 1
50+
}
51+
52+
if len(line) > 0 && line[len(line)-1] == '\r' {
53+
line = line[:len(line)-1]
54+
}
55+
56+
v, err := strconv.ParseInt(string(line), 10, 64)
3057
if err != nil {
3158
dest.Error(err)
32-
_ = f.Close()
3359
return nil
3460
}
3561

3662
// propagate context-aware notifications
3763
dest.NextWithContext(ctx, v)
3864
}
3965

40-
if err := scanner.Err(); err != nil {
41-
dest.Error(err)
42-
} else {
43-
dest.CompleteWithContext(ctx)
44-
}
45-
46-
_ = f.Close()
66+
dest.CompleteWithContext(ctx)
4767
return nil
4868
})
4969
}

0 commit comments

Comments
 (0)