Skip to content

Commit 10a069b

Browse files
Create bufferedValueReader and streamingValueReader
1 parent b21c124 commit 10a069b

File tree

4 files changed

+377
-143
lines changed

4 files changed

+377
-143
lines changed

bson/buffered_value_reader.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright (C) MongoDB, Inc. 2025-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package bson
8+
9+
import (
10+
"bytes"
11+
"io"
12+
)
13+
14+
// bufferedValueReader implements the low-level byteSrc interface by reading
15+
// directly from an in-memory byte slice. It provides efficient, zero-copy
16+
// access for parsing BSON when the entire document is buffered in memory.
17+
type bufferedValueReader struct {
18+
buf []byte // entire BSON document
19+
offset int64 // Current read index into buf
20+
}
21+
22+
var _ valueReaderByteSrc = (*bufferedValueReader)(nil)
23+
24+
// Read reads up to len(p) bytes from buf[offset:] into p, advancing offset.
25+
//
26+
// TODO: Depending on streaming requirements, we might be able to remove this.
27+
func (b *bufferedValueReader) Read([]byte) (n int, err error) {
28+
panic("Read not implemented for bufferedValueReader")
29+
}
30+
31+
// ReadByte returns the single byte at buf[offset] and advances offset by 1.
32+
func (b *bufferedValueReader) ReadByte() (byte, error) {
33+
if b.offset >= int64(len(b.buf)) {
34+
return 0, io.EOF
35+
}
36+
b.offset++
37+
return b.buf[b.offset-1], nil
38+
}
39+
40+
// peek returns buf[offset:offset+n] without advancing offset.
41+
func (b *bufferedValueReader) peek(n int) ([]byte, error) {
42+
// Ensure we don't read past the end of the buffer.
43+
if int64(n)+b.offset > int64(len(b.buf)) {
44+
return nil, io.EOF
45+
}
46+
47+
// Return the next n bytes without advancing the offset
48+
return b.buf[b.offset : b.offset+int64(n)], nil
49+
}
50+
51+
// discard advances offset by n bytes, returning the number of bytes discarded.
52+
func (b *bufferedValueReader) discard(n int) (int, error) {
53+
// Ensure we don't read past the end of the buffer.
54+
if int64(n)+b.offset > int64(len(b.buf)) {
55+
return 0, io.EOF
56+
}
57+
58+
// Advance the read position
59+
b.offset += int64(n)
60+
return n, nil
61+
}
62+
63+
// readSlice scans buf[offset:] for the first occurrence of delim, returns
64+
// buf[offset:idx+1], and advances offset past it; errors if delim not found.
65+
func (b *bufferedValueReader) readSlice(delim byte) ([]byte, error) {
66+
// Ensure we don't read past the end of the buffer.
67+
if b.offset >= int64(len(b.buf)) {
68+
return nil, io.EOF
69+
}
70+
71+
// Look for the delimiter in the remaining bytes
72+
rem := b.buf[b.offset:]
73+
idx := bytes.IndexByte(rem, delim)
74+
if idx < 0 {
75+
return nil, io.EOF
76+
}
77+
78+
// Build the result slice up through the delimiter.
79+
result := rem[:idx+1]
80+
81+
// Advance the offset past the delimiter.
82+
b.offset += int64(idx + 1)
83+
84+
return result, nil
85+
}
86+
87+
// pos returns the current read position in the buffer.
88+
func (b *bufferedValueReader) pos() int64 {
89+
return b.offset
90+
}
91+
92+
// regexLength will return the total byte length of a BSON regex value.
93+
func (b *bufferedValueReader) regexLength() (int32, error) {
94+
rem := b.buf[b.offset:]
95+
96+
// Find end of the first C-string (pattern).
97+
i := bytes.IndexByte(rem, 0x00)
98+
if i < 0 {
99+
return 0, io.EOF
100+
}
101+
102+
// Find end of second C-string (options).
103+
j := bytes.IndexByte(rem[i+1:], 0x00)
104+
if j < 0 {
105+
return 0, io.EOF
106+
}
107+
108+
// Total length = first C-string length (pattern) + second C-string length
109+
// (options) + 2 null terminators
110+
return int32(i + j + 2), nil
111+
}

bson/streaming_value_reader.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// Copyright (C) MongoDB, Inc. 2025-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package bson
8+
9+
import "bufio"
10+
11+
// streamingValueReader reads from an ioReader wrapped in a bufio.Reader. It
12+
// first reads the BSON length header, then ensures it only ever reads exactly
13+
// that many bytes.
14+
//
15+
// Note: this approach trades memory usage for extra buffering and reader calls,
16+
// so it is less performanted than the in-memory bufferedValueReader.
17+
type streamingValueReader struct {
18+
br *bufio.Reader
19+
}
20+
21+
var _ valueReaderByteSrc = (*streamingValueReader)(nil)
22+
23+
// Read reads up to len(p) bytes from buf[offset:] into p, advancing offset.
24+
func (b *streamingValueReader) Read([]byte) (n int, err error) {
25+
panic("Read not implemented for streamingValueReader")
26+
}
27+
28+
// ReadByte returns the single byte at buf[offset] and advances offset by 1.
29+
func (b *streamingValueReader) ReadByte() (byte, error) {
30+
panic("ReadByte not implemented for streamingValueReader")
31+
}
32+
33+
// peek returns buf[offset:offset+n] without advancing offset.
34+
func (b *streamingValueReader) peek(int) ([]byte, error) {
35+
panic("peek not implemented for streamingValueReader")
36+
}
37+
38+
// discard advances offset by n bytes, returning the number of bytes discarded.
39+
func (b *streamingValueReader) discard(int) (int, error) {
40+
panic("discard not implemented for streamingValueReader")
41+
}
42+
43+
// readSlice scans buf[offset:] for the first occurrence of delim, returns
44+
// buf[offset:idx+1], and advances offset past it; errors if delim not found.
45+
func (b *streamingValueReader) readSlice(byte) ([]byte, error) {
46+
panic("readSlice not implemented for streamingValueReader")
47+
}
48+
49+
// pos returns the current read position in the buffer.
50+
func (b *streamingValueReader) pos() int64 {
51+
panic("pos not implemented for streamingValueReader")
52+
}
53+
54+
// regexLength will return the total byte length of a BSON regex value.
55+
func (b *streamingValueReader) regexLength() (int32, error) {
56+
panic("regexLength not implemented for streamingValueReader")
57+
}

0 commit comments

Comments
 (0)