Skip to content

Commit aea22b1

Browse files
authored
refactor: use cached values (#68)
* refactor: use cached values
* refactor: use cached values in writer
* refactor: move cached values to writer
1 parent 732fb64 commit aea22b1

File tree

5 files changed

+98
-49
lines changed

5 files changed

+98
-49
lines changed

field_parser.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,14 @@ func (pr *parseResult) reset() {
103103
pr.rows = pr.rows[:0]
104104
}
105105

106-
// releaseParseResult returns a parseResult to the pool for reuse.
107-
func releaseParseResult(pr *parseResult) {
108-
if pr != nil {
109-
pr.reset()
110-
parseResultPool.Put(pr)
106+
// release returns the parseResult to the pool for reuse.
107+
func (pr *parseResult) release() {
108+
if pr == nil {
109+
return
111110
}
111+
112+
pr.reset()
113+
parseResultPool.Put(pr)
112114
}
113115

114116
// =============================================================================

parse.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ func ParseBytes(data []byte, comma rune) ([][]string, error) {
2121
pr := parseBuffer(data, sr)
2222
records := buildRecords(data, pr, sr.hasCR)
2323

24-
releaseParseResult(pr)
25-
releaseScanResult(sr)
24+
pr.release()
25+
sr.release()
2626

2727
return records, nil
2828
}
@@ -37,8 +37,8 @@ func ParseBytesStreaming(data []byte, comma rune, callback func([]string) error)
3737
separator := byte(comma)
3838
sr := scanBuffer(data, separator)
3939
pr := parseBuffer(data, sr)
40-
defer releaseParseResult(pr)
41-
defer releaseScanResult(sr)
40+
defer pr.release()
41+
defer sr.release()
4242

4343
if pr == nil || len(pr.rows) == 0 {
4444
return nil

reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ func (r *Reader) initialize() error {
362362
r.state.parseResult = parseBuffer(r.state.rawBuffer, r.state.scanResult)
363363

364364
// Release scanResult (no longer needed after parsing)
365-
releaseScanResult(r.state.scanResult)
365+
r.state.scanResult.release()
366366
r.state.scanResult = nil
367367

368368
r.state.offset = int64(len(r.state.rawBuffer))

simd_scanner.go

Lines changed: 62 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ func bytesToInt8Slice(b []byte) []int8 {
2121
// useAVX512 indicates whether AVX-512 instructions are available at runtime.
2222
var useAVX512 bool
2323

24+
// Cached broadcast values for fixed characters (initialized in init()).
25+
var (
26+
// AVX-512 (64-byte) cached values
27+
cachedQuoteCmp archsimd.Int8x64
28+
cachedCrCmp archsimd.Int8x64
29+
cachedNlCmp archsimd.Int8x64
30+
)
31+
2432
// SIMD processing constants.
2533
const (
2634
simdChunkSize = 64 // bytes per AVX-512 iteration
@@ -32,6 +40,12 @@ const (
3240

3341
func init() {
3442
useAVX512 = archsimd.X86.AVX512()
43+
if useAVX512 {
44+
// Pre-broadcast fixed characters to avoid repeated BroadcastInt8x64 calls
45+
cachedQuoteCmp = archsimd.BroadcastInt8x64('"')
46+
cachedCrCmp = archsimd.BroadcastInt8x64('\r')
47+
cachedNlCmp = archsimd.BroadcastInt8x64('\n')
48+
}
3549
}
3650

3751
// =============================================================================
@@ -124,9 +138,9 @@ func (sr *scanResult) reset() {
124138
sr.newlineCount = 0
125139
}
126140

127-
// releaseScanResult returns a scanResult to the pool for reuse.
141+
// release returns the scanResult to the pool for reuse.
128142
// Large results (>= scanResultLargeThreshold) are cached separately to survive GC.
129-
func releaseScanResult(sr *scanResult) {
143+
func (sr *scanResult) release() {
130144
if sr == nil {
131145
return
132146
}
@@ -203,12 +217,11 @@ func generateMasksScalar(data []byte, separator byte) (quote, sep, cr, nl uint64
203217

204218
// generateMasksAVX512 generates masks using AVX-512 SIMD instructions.
205219
// Requires AVX-512BW for ToBits() which uses VPMOVB2M instruction.
220+
// Uses cached broadcast values for fixed characters (quote, CR, NL) to avoid
221+
// repeated BroadcastInt8x64 calls.
206222
func generateMasksAVX512(data []byte, separator byte) (quote, sep, cr, nl uint64) {
207-
quoteCmp := archsimd.BroadcastInt8x64('"')
208223
sepCmp := archsimd.BroadcastInt8x64(int8(separator))
209-
crCmp := archsimd.BroadcastInt8x64('\r')
210-
nlCmp := archsimd.BroadcastInt8x64('\n')
211-
return generateMasksAVX512WithCmp(data, quoteCmp, sepCmp, crCmp, nlCmp)
224+
return generateMasksAVX512WithCmp(data, cachedQuoteCmp, sepCmp, cachedCrCmp, cachedNlCmp)
212225
}
213226

214227
// generateMasksAVX512WithCmp generates masks reusing pre-broadcasted comparators.
@@ -446,10 +459,10 @@ type avx512MaskGenerator struct {
446459

447460
func newAVX512MaskGenerator(separator byte) *avx512MaskGenerator {
448461
return &avx512MaskGenerator{
449-
quoteCmp: archsimd.BroadcastInt8x64('"'),
462+
quoteCmp: cachedQuoteCmp,
450463
sepCmp: archsimd.BroadcastInt8x64(int8(separator)),
451-
crCmp: archsimd.BroadcastInt8x64('\r'),
452-
nlCmp: archsimd.BroadcastInt8x64('\n'),
464+
crCmp: cachedCrCmp,
465+
nlCmp: cachedNlCmp,
453466
}
454467
}
455468

@@ -492,20 +505,34 @@ func scanBufferWithGenerator(buf []byte, gen maskGenerator) *scanResult {
492505
result := acquireScanResult(chunkCount)
493506
state := scanState{}
494507

495-
curMasks, curValidBits := generateFirstChunkMasks(buf, gen, result)
496-
nextMasks := generateSecondChunkMasks(buf, chunkCount, gen, result)
508+
sc := bufferScanContext{
509+
buf: buf,
510+
gen: gen,
511+
result: result,
512+
chunkCount: chunkCount,
513+
}
514+
515+
curMasks, curValidBits := sc.generateFirstChunkMasks()
516+
nextMasks := sc.generateSecondChunkMasks()
497517

498518
for chunkIdx := 0; chunkIdx < chunkCount; chunkIdx++ {
499519
processChunk(chunkIdx, curMasks, nextMasks, curValidBits, &state, result)
500520

501521
curMasks = nextMasks
502-
nextMasks, curValidBits = generateNextLookahead(buf, chunkIdx, chunkCount, gen, result)
522+
nextMasks, curValidBits = sc.generateNextLookahead(chunkIdx)
503523
}
504524

505525
result.finalQuoted = state.quoted
506526
return result
507527
}
508528

529+
type bufferScanContext struct {
530+
buf []byte
531+
gen maskGenerator
532+
result *scanResult
533+
chunkCount int
534+
}
535+
509536
// acquireScanResult gets a pooled scanResult and initializes it for the given chunk count.
510537
func acquireScanResult(chunkCount int) *scanResult {
511538
if chunkCount >= scanResultLargeThreshold {
@@ -531,61 +558,61 @@ func acquireScanResult(chunkCount int) *scanResult {
531558

532559
// generateFirstChunkMasks generates masks for the first chunk of the buffer.
533560
// Handles both full chunks and partial (padded) chunks.
534-
func generateFirstChunkMasks(buf []byte, gen maskGenerator, result *scanResult) (chunkMasks, int) {
535-
if len(buf) >= simdChunkSize {
536-
return gen.generateFull(buf[0:simdChunkSize]), simdChunkSize
561+
func (sc *bufferScanContext) generateFirstChunkMasks() (chunkMasks, int) {
562+
if len(sc.buf) >= simdChunkSize {
563+
return sc.gen.generateFull(sc.buf[0:simdChunkSize]), simdChunkSize
537564
}
538565

539-
masks, validBits := gen.generatePadded(buf)
540-
result.lastChunkBits = validBits
566+
masks, validBits := sc.gen.generatePadded(sc.buf)
567+
sc.result.lastChunkBits = validBits
541568
return masks, validBits
542569
}
543570

544571
// generateSecondChunkMasks generates lookahead masks for the second chunk if it exists.
545572
// Returns empty masks if there is no second chunk.
546-
func generateSecondChunkMasks(buf []byte, chunkCount int, gen maskGenerator, result *scanResult) chunkMasks {
547-
if chunkCount <= 1 || len(buf) <= simdChunkSize {
573+
func (sc *bufferScanContext) generateSecondChunkMasks() chunkMasks {
574+
if sc.chunkCount <= 1 || len(sc.buf) <= simdChunkSize {
548575
return chunkMasks{}
549576
}
550577

551-
if len(buf) >= 2*simdChunkSize {
552-
return gen.generateFull(buf[simdChunkSize : 2*simdChunkSize])
578+
if len(sc.buf) >= 2*simdChunkSize {
579+
return sc.gen.generateFull(sc.buf[simdChunkSize : 2*simdChunkSize])
553580
}
554581

555-
masks, validBits := gen.generatePadded(buf[simdChunkSize:])
556-
if chunkCount == 2 {
557-
result.lastChunkBits = validBits
582+
masks, validBits := sc.gen.generatePadded(sc.buf[simdChunkSize:])
583+
if sc.chunkCount == 2 {
584+
sc.result.lastChunkBits = validBits
558585
}
559586
return masks
560587
}
561588

562589
// generateNextLookahead generates masks for the chunk two positions ahead (lookahead).
563590
// This enables processing current chunk while knowing what comes next.
564-
func generateNextLookahead(buf []byte, chunkIdx, chunkCount int, gen maskGenerator, result *scanResult) (chunkMasks, int) {
591+
func (sc *bufferScanContext) generateNextLookahead(chunkIdx int) (chunkMasks, int) {
565592
lookaheadIdx := chunkIdx + 2
566-
if lookaheadIdx >= chunkCount {
567-
return handleFinalChunkValidBits(buf, chunkIdx, chunkCount, result)
593+
if lookaheadIdx >= sc.chunkCount {
594+
return sc.handleFinalChunkValidBits(chunkIdx)
568595
}
569596

570597
offset := lookaheadIdx * simdChunkSize
571-
remaining := len(buf) - offset
598+
remaining := len(sc.buf) - offset
572599

573600
if remaining >= simdChunkSize {
574-
return gen.generateFull(buf[offset : offset+simdChunkSize]), simdChunkSize
601+
return sc.gen.generateFull(sc.buf[offset : offset+simdChunkSize]), simdChunkSize
575602
}
576603

577-
masks, validBits := gen.generatePadded(buf[offset:])
578-
result.lastChunkBits = validBits
604+
masks, validBits := sc.gen.generatePadded(sc.buf[offset:])
605+
sc.result.lastChunkBits = validBits
579606
return masks, validBits
580607
}
581608

582609
// handleFinalChunkValidBits computes valid bits when no more lookahead chunks exist.
583-
func handleFinalChunkValidBits(buf []byte, chunkIdx, chunkCount int, result *scanResult) (chunkMasks, int) {
610+
func (sc *bufferScanContext) handleFinalChunkValidBits(chunkIdx int) (chunkMasks, int) {
584611
validBits := simdChunkSize
585612

586-
if chunkIdx+1 == chunkCount-1 && len(buf)%simdChunkSize != 0 {
587-
validBits = len(buf) % simdChunkSize
588-
result.lastChunkBits = validBits
613+
if chunkIdx+1 == sc.chunkCount-1 && len(sc.buf)%simdChunkSize != 0 {
614+
validBits = len(sc.buf) % simdChunkSize
615+
sc.result.lastChunkBits = validBits
589616
}
590617

591618
return chunkMasks{}, validBits

writer.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,23 @@ import (
1111
"simd/archsimd"
1212
)
1313

14+
// Cached broadcast values for fixed characters used in Writer (initialized in init()).
15+
var (
16+
cachedQuoteCmp32 archsimd.Int8x32
17+
cachedCrCmp32 archsimd.Int8x32
18+
cachedNlCmp32 archsimd.Int8x32
19+
)
20+
21+
func init() {
22+
if useAVX512 {
23+
// Pre-broadcast fixed characters for AVX2 (32-byte) operations to avoid
24+
// repeated BroadcastInt8x32 calls in Writer methods.
25+
cachedQuoteCmp32 = archsimd.BroadcastInt8x32('"')
26+
cachedCrCmp32 = archsimd.BroadcastInt8x32('\r')
27+
cachedNlCmp32 = archsimd.BroadcastInt8x32('\n')
28+
}
29+
}
30+
1431
// Writer writes records using CSV encoding.
1532
//
1633
// Records are terminated by a newline and use ',' as the field delimiter by default.
@@ -123,14 +140,16 @@ func (w *Writer) fieldNeedsQuotesScalar(field string) bool {
123140
}
124141

125142
// fieldNeedsQuotesSIMD uses SIMD to detect special characters requiring quoting.
143+
// Uses cached broadcast values for fixed characters (quote, CR, NL) to avoid
144+
// repeated BroadcastInt8x32 calls.
126145
func (w *Writer) fieldNeedsQuotesSIMD(field string) bool {
127146
data := unsafe.Slice(unsafe.StringData(field), len(field))
128147
int8Data := bytesToInt8Slice(data)
129148

130149
commaCmp := archsimd.BroadcastInt8x32(int8(w.Comma))
131-
newlineCmp := archsimd.BroadcastInt8x32('\n')
132-
carriageReturnCmp := archsimd.BroadcastInt8x32('\r')
133-
quoteCmp := archsimd.BroadcastInt8x32('"')
150+
newlineCmp := cachedNlCmp32
151+
carriageReturnCmp := cachedCrCmp32
152+
quoteCmp := cachedQuoteCmp32
134153

135154
// Process 32-byte chunks
136155
offset := 0
@@ -186,10 +205,11 @@ func (w *Writer) writeQuotedFieldScalar(field string) error {
186205
}
187206

188207
// writeQuotedFieldSIMD escapes quotes using SIMD to find quote positions.
208+
// Uses cached broadcast value for quote character to avoid repeated BroadcastInt8x32 calls.
189209
func (w *Writer) writeQuotedFieldSIMD(field string) error {
190210
data := unsafe.Slice(unsafe.StringData(field), len(field))
191211
int8Data := bytesToInt8Slice(data)
192-
quoteCmp := archsimd.BroadcastInt8x32('"')
212+
quoteCmp := cachedQuoteCmp32
193213

194214
offset := 0
195215
lastWritten := 0

0 commit comments

Comments
 (0)