Skip to content

Commit 105dd1d

Browse files
authored
refactor: improve memory allocation and parsing efficiency in CSV processing (#64)
1 parent a0aa03c commit 105dd1d

File tree

4 files changed

+234
-34
lines changed

4 files changed

+234
-34
lines changed

field_parser.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,23 @@ func estimateCounts(bufLen int, sr *scanResult) (estimatedFields, estimatedRows
203203
}
204204

205205
// ensureResultCapacity ensures result slices have sufficient capacity.
206+
// Uses scan counts for accurate pre-allocation when available.
206207
func ensureResultCapacity(result *parseResult, bufLen int, sr *scanResult) {
208+
// Use exact counts from scan when available (most accurate)
209+
if sr != nil && sr.separatorCount > 0 {
210+
estimatedFields := sr.separatorCount + sr.newlineCount + 1
211+
estimatedRows := sr.newlineCount + 1
212+
213+
if cap(result.fields) < estimatedFields {
214+
result.fields = make([]fieldInfo, 0, estimatedFields)
215+
}
216+
if cap(result.rows) < estimatedRows {
217+
result.rows = make([]rowInfo, 0, estimatedRows)
218+
}
219+
return
220+
}
221+
222+
// Fallback: conservative estimate from buffer size
207223
estimatedFields, estimatedRows := estimateCounts(bufLen, sr)
208224

209225
if cap(result.fields) < estimatedFields {

parse.go

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,36 @@ func ParseBytesStreaming(data []byte, comma rune, callback func([]string) error)
5858
// ============================================================================
5959

6060
// buildRecords converts a parseResult to [][]string.
61-
// Optimizes memory by accumulating fields into a single buffer per record,
62-
// then using zero-copy slicing after a single string conversion.
61+
// Fast path: zero-copy when no transformation needed.
62+
// Slow path: accumulate into buffer when unescape/CRLF handling required.
6363
func buildRecords(buf []byte, pr *parseResult, hasCR bool) [][]string {
6464
if pr == nil || len(pr.rows) == 0 {
6565
return nil
6666
}
6767

6868
records := make([][]string, len(pr.rows))
6969

70-
// fieldEnds can be reused, but recordBuf must be unique per record for unsafe.String
71-
var fieldEnds []int
70+
// Check if any field needs transformation
71+
needsTransform := hasCR
72+
if !needsTransform {
73+
for _, f := range pr.fields {
74+
if f.needsUnescape() {
75+
needsTransform = true
76+
break
77+
}
78+
}
79+
}
80+
81+
// Fast path: zero-copy direct from buffer when no transformation needed
82+
if !needsTransform {
83+
for i, row := range pr.rows {
84+
records[i] = buildRecordZeroCopy(buf, pr, row)
85+
}
86+
return records
87+
}
7288

89+
// Slow path: accumulate with transformation
90+
var fieldEnds []int
7391
for i, row := range pr.rows {
7492
var recordBuf []byte
7593
recordBuf, fieldEnds = accumulateFields(buf, pr, row, hasCR, recordBuf, fieldEnds[:0])
@@ -78,6 +96,36 @@ func buildRecords(buf []byte, pr *parseResult, hasCR bool) [][]string {
7896
return records
7997
}
8098

99+
// buildRecordZeroCopy creates a record with zero-copy strings from buf.
100+
// Only safe when no transformation (unescape/CRLF) is needed.
101+
func buildRecordZeroCopy(buf []byte, pr *parseResult, row rowInfo) []string {
102+
if row.fieldCount == 0 {
103+
return nil
104+
}
105+
record := make([]string, row.fieldCount)
106+
bufLen := uint32(len(buf))
107+
for i := 0; i < row.fieldCount; i++ {
108+
fieldIdx := row.firstField + i
109+
if fieldIdx >= len(pr.fields) {
110+
break
111+
}
112+
field := pr.fields[fieldIdx]
113+
if field.length == 0 {
114+
continue
115+
}
116+
start := field.start
117+
end := start + field.length
118+
if start >= bufLen {
119+
continue
120+
}
121+
if end > bufLen {
122+
end = bufLen
123+
}
124+
record[i] = unsafe.String(&buf[start], int(end-start))
125+
}
126+
return record
127+
}
128+
81129
// buildRecord builds a single record from a rowInfo (for streaming API).
82130
func buildRecord(buf []byte, pr *parseResult, row rowInfo, hasCR bool) []string {
83131
recordBuf, fieldEnds := accumulateFields(buf, pr, row, hasCR, nil, nil)
@@ -101,15 +149,20 @@ func accumulateFields(buf []byte, pr *parseResult, row rowInfo, hasCR bool, reco
101149
// sliceFieldsFromBuffer converts the accumulated buffer to individual field strings.
102150
// Uses unsafe.String for zero-copy conversion. Caller must ensure recordBuf is not reused.
103151
func sliceFieldsFromBuffer(recordBuf []byte, fieldEnds []int) []string {
152+
fieldCount := len(fieldEnds)
153+
if fieldCount == 0 {
154+
return nil
155+
}
156+
record := make([]string, fieldCount)
104157
if len(recordBuf) == 0 {
105-
return make([]string, len(fieldEnds))
158+
return record
106159
}
107160
// Zero-copy string conversion - safe because recordBuf is unique per record
108-
str := unsafe.String(unsafe.SliceData(recordBuf), len(recordBuf))
109-
record := make([]string, len(fieldEnds))
110161
prevEnd := 0
111162
for i, end := range fieldEnds {
112-
record[i] = str[prevEnd:end]
163+
if prevEnd < end && prevEnd < len(recordBuf) {
164+
record[i] = unsafe.String(&recordBuf[prevEnd], end-prevEnd)
165+
}
113166
prevEnd = end
114167
}
115168
return record

record_builder.go

Lines changed: 116 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
//nolint:gosec // G115: Integer conversions are safe - buffer size bounded by DefaultMaxInputSize (2GB)
44
package simdcsv
55

6-
import "bytes"
6+
import (
7+
"bytes"
8+
"unsafe"
9+
)
710

811
// Buffer allocation constants for reducing reallocations in hot path.
912
const (
@@ -30,10 +33,27 @@ const (
3033
// This matches encoding/csv behavior and allows callers to recover partial data.
3134
func (r *Reader) buildRecordWithValidation(row rowInfo, rowIdx int) ([]string, error) {
3235
fieldCount := row.fieldCount
33-
r.prepareBuffers(row, fieldCount)
34-
3536
fields := r.getFieldsForRow(row, fieldCount)
3637

38+
// Fast path: check if any field needs transformation
39+
needsTransform := r.state.hasCR
40+
if !needsTransform {
41+
for _, field := range fields {
42+
if field.needsUnescape() {
43+
needsTransform = true
44+
break
45+
}
46+
}
47+
}
48+
49+
// Fast path: zero-copy when no transformation needed (but still validate)
50+
if !needsTransform && !r.TrimLeadingSpace {
51+
return r.buildRecordWithValidationZeroCopy(row, fields)
52+
}
53+
54+
// Standard path with transformation
55+
r.prepareBuffers(row, fieldCount)
56+
3757
for i, field := range fields {
3858
if err := r.validateFieldIfNeeded(field, row.lineNum); err != nil {
3959
return r.buildPartialRecord(i), err
@@ -45,36 +65,116 @@ func (r *Reader) buildRecordWithValidation(row rowInfo, rowIdx int) ([]string, e
4565
return r.buildFinalRecord(fieldCount), nil
4666
}
4767

48-
// buildRecordNoQuotes builds a record when the input contains no quotes.
49-
// It avoids the recordBuffer copy path and mirrors appendSimpleContent behavior.
50-
func (r *Reader) buildRecordNoQuotes(row rowInfo) []string {
68+
// buildRecordWithValidationZeroCopy builds a record with zero-copy strings while still validating.
69+
func (r *Reader) buildRecordWithValidationZeroCopy(row rowInfo, fields []fieldInfo) ([]string, error) {
5170
fieldCount := row.fieldCount
5271
record := r.allocateRecord(fieldCount)
5372
r.state.fieldPositions = r.ensureFieldPositionsCapacity(fieldCount)
5473

55-
fields := r.getFieldsForRow(row, fieldCount)
5674
buf := r.state.rawBuffer
5775
bufLen := uint32(len(buf))
5876

5977
for i, field := range fields {
78+
// Validate even in zero-copy path
79+
if err := r.validateFieldIfNeeded(field, row.lineNum); err != nil {
80+
return record[:i], err
81+
}
82+
6083
start := field.start
6184
end := start + field.length
6285
if start >= bufLen {
6386
record[i] = ""
64-
r.state.fieldPositions[i] = position{line: row.lineNum, column: int(start) + 1}
65-
continue
87+
} else {
88+
if end > bufLen {
89+
end = bufLen
90+
}
91+
// Zero-copy string from rawBuffer
92+
record[i] = unsafe.String(&buf[start], int(end-start))
6693
}
67-
if end > bufLen {
68-
end = bufLen
94+
r.state.fieldPositions[i] = position{line: row.lineNum, column: int(field.rawStart()) + 1}
95+
}
96+
return record, nil
97+
}
98+
99+
// buildRecordNoQuotes builds a record when the input contains no quotes.
// Uses a single row string to avoid per-field allocations: the whole row is
// converted (or zero-copy aliased) once, and each field becomes a substring
// of that row string.
func (r *Reader) buildRecordNoQuotes(row rowInfo) []string {
	fieldCount := row.fieldCount
	record := r.allocateRecord(fieldCount)
	r.state.fieldPositions = r.ensureFieldPositionsCapacity(fieldCount)

	fields := r.getFieldsForRow(row, fieldCount)
	buf := r.state.rawBuffer
	bufLen := uint32(len(buf))

	if len(fields) == 0 {
		return record
	}

	// The row spans from the first field's raw start to the last field's raw end.
	rowStart := fields[0].rawStart()
	rowEnd := fields[len(fields)-1].rawEnd()
	// Degenerate case: the row starts beyond the buffer — emit empty fields,
	// still recording per-field positions.
	if rowStart >= bufLen {
		for i, field := range fields {
			record[i] = ""
			r.state.fieldPositions[i] = position{line: row.lineNum, column: int(field.rawStart()) + 1}
		}
		return record
	}
	// Clamp the row span to the buffer.
	if rowEnd > bufLen {
		rowEnd = bufLen
	}
	if rowEnd < rowStart {
		rowEnd = rowStart
	}

	var rowStr string
	if r.TrimLeadingSpace {
		// NOTE(review): the trimming path makes a copy of the row while the
		// non-trimming path aliases rawBuffer; confirm whether the copy is
		// required or merely conservative.
		rowStr = string(buf[rowStart:rowEnd])
	} else {
		// Zero-copy string from rawBuffer - safe because rawBuffer outlives record.
		rowStr = unsafe.String(&buf[rowStart], int(rowEnd-rowStart))
	}
	rowStrLen := len(rowStr)

	for i, field := range fields {
		start := field.start
		end := start + field.length
		rawStart := field.rawStart()

		if start < bufLen {
			if end > bufLen {
				end = bufLen
			}
			// Advance start past leading spaces/tabs when trimming is enabled.
			if r.TrimLeadingSpace && start < end {
				for start < end && (buf[start] == ' ' || buf[start] == '\t') {
					start++
				}
			}
		}

		// Clamp the field span into [rowStart, rowEnd], then convert to
		// indices relative to rowStr so out-of-range spans yield "".
		if start < rowStart {
			start = rowStart
		}
		if end < start {
			end = start
		}
		relStart := int(start - rowStart)
		relEnd := int(end - rowStart)
		if relStart < 0 {
			relStart = 0
		}
		if relStart > rowStrLen {
			relStart = rowStrLen
		}
		if relEnd < relStart {
			relEnd = relStart
		}
		if relEnd > rowStrLen {
			relEnd = rowStrLen
		}

		// Substring of rowStr: no per-field allocation.
		record[i] = rowStr[relStart:relEnd]
		// Column is reported from the raw (pre-trim) field start, 1-based.
		r.state.fieldPositions[i] = position{line: row.lineNum, column: int(rawStart) + 1}
	}
	return record
}

simd_scanner.go

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,12 @@ type chunkMasks struct {
8080
// =============================================================================
8181

8282
// scanResultPoolCapacity is the pre-allocated capacity for pooled scanResult slices.
83-
// 512 chunks = ~32KB input, balancing small and large file performance.
84-
const scanResultPoolCapacity = 512
83+
// 4096 chunks = ~256KB input, reducing allocations for typical CSV sizes.
84+
const scanResultPoolCapacity = 4096
85+
86+
// scanResultLargeThreshold retains large scanResults to avoid repeated allocations across GCs.
87+
// 16384 chunks = ~1MB input.
88+
const scanResultLargeThreshold = 16384
8589

8690
// scanResultPool provides reusable scanResult objects to reduce allocations.
8791
var scanResultPool = sync.Pool{
@@ -96,6 +100,12 @@ var scanResultPool = sync.Pool{
96100
},
97101
}
98102

103+
// scanResultLargeCache retains a single large scanResult across GC cycles.
104+
var scanResultLargeCache struct {
105+
mu sync.Mutex
106+
sr *scanResult
107+
}
108+
99109
// reset clears the scanResult for reuse while preserving slice capacity.
100110
func (sr *scanResult) reset() {
101111
sr.quoteMasks = sr.quoteMasks[:0]
@@ -116,6 +126,15 @@ func (sr *scanResult) reset() {
116126
func releaseScanResult(sr *scanResult) {
117127
if sr != nil {
118128
sr.reset()
129+
if cap(sr.quoteMasks) >= scanResultLargeThreshold {
130+
scanResultLargeCache.mu.Lock()
131+
if scanResultLargeCache.sr == nil || cap(scanResultLargeCache.sr.quoteMasks) < cap(sr.quoteMasks) {
132+
scanResultLargeCache.sr = sr
133+
scanResultLargeCache.mu.Unlock()
134+
return
135+
}
136+
scanResultLargeCache.mu.Unlock()
137+
}
119138
scanResultPool.Put(sr)
120139
}
121140
}
@@ -125,27 +144,25 @@ func releaseScanResult(sr *scanResult) {
125144
// =============================================================================
126145

127146
// ensureUint64SliceCap returns s resized to required length, reusing the
// existing backing array when its capacity allows. Reused elements are NOT
// cleared; on reallocation an exact-size slice is made to avoid
// over-allocating for small inputs.
func ensureUint64SliceCap(s []uint64, required int) []uint64 {
	if cap(s) < required {
		// Fresh allocation; make zeroes the new elements.
		return make([]uint64, required)
	}
	return s[:required]
}
137155

138156
// ensureBoolSliceCap returns s resized to required length with every element
// cleared to false, reusing the existing backing array when capacity allows.
// On reallocation an exact-size slice is made to avoid over-allocating for
// small inputs.
func ensureBoolSliceCap(s []bool, required int) []bool {
	if cap(s) < required {
		// A fresh make is already zeroed; no explicit clear needed.
		return make([]bool, required)
	}
	s = s[:required]
	clear(s)
	return s
}
150167

151168
// =============================================================================
@@ -481,6 +498,20 @@ func scanBufferWithGenerator(buf []byte, gen maskGenerator) *scanResult {
481498

482499
// acquireScanResult gets a pooled scanResult and initializes it for the given chunk count.
483500
func acquireScanResult(chunkCount int) *scanResult {
501+
if chunkCount >= scanResultLargeThreshold {
502+
scanResultLargeCache.mu.Lock()
503+
result := scanResultLargeCache.sr
504+
if result != nil && cap(result.quoteMasks) >= chunkCount {
505+
scanResultLargeCache.sr = nil
506+
scanResultLargeCache.mu.Unlock()
507+
result.reset()
508+
result.chunkCount = chunkCount
509+
initScanResultSlices(result, chunkCount)
510+
return result
511+
}
512+
scanResultLargeCache.mu.Unlock()
513+
}
514+
484515
result := scanResultPool.Get().(*scanResult)
485516
result.reset()
486517
result.chunkCount = chunkCount

0 commit comments

Comments
 (0)