Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions field_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,14 @@ func (pr *parseResult) reset() {
pr.rows = pr.rows[:0]
}

// releaseParseResult returns a parseResult to the pool for reuse.
func releaseParseResult(pr *parseResult) {
if pr != nil {
pr.reset()
parseResultPool.Put(pr)
// release returns the parseResult to the pool for reuse.
func (pr *parseResult) release() {
if pr == nil {
return
}

pr.reset()
parseResultPool.Put(pr)
}

// =============================================================================
Expand Down
8 changes: 4 additions & 4 deletions parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func ParseBytes(data []byte, comma rune) ([][]string, error) {
pr := parseBuffer(data, sr)
records := buildRecords(data, pr, sr.hasCR)

releaseParseResult(pr)
releaseScanResult(sr)
pr.release()
sr.release()

return records, nil
}
Expand All @@ -37,8 +37,8 @@ func ParseBytesStreaming(data []byte, comma rune, callback func([]string) error)
separator := byte(comma)
sr := scanBuffer(data, separator)
pr := parseBuffer(data, sr)
defer releaseParseResult(pr)
defer releaseScanResult(sr)
defer pr.release()
defer sr.release()

if pr == nil || len(pr.rows) == 0 {
return nil
Expand Down
2 changes: 1 addition & 1 deletion reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func (r *Reader) initialize() error {
r.state.parseResult = parseBuffer(r.state.rawBuffer, r.state.scanResult)

// Release scanResult (no longer needed after parsing)
releaseScanResult(r.state.scanResult)
r.state.scanResult.release()
r.state.scanResult = nil

r.state.offset = int64(len(r.state.rawBuffer))
Expand Down
97 changes: 62 additions & 35 deletions simd_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ func bytesToInt8Slice(b []byte) []int8 {
// useAVX512 indicates whether AVX-512 instructions are available at runtime.
var useAVX512 bool

// Cached broadcast values for fixed characters (initialized in init()).
var (
// AVX-512 (64-byte) cached values
cachedQuoteCmp archsimd.Int8x64
cachedCrCmp archsimd.Int8x64
cachedNlCmp archsimd.Int8x64
)

// SIMD processing constants.
const (
simdChunkSize = 64 // bytes per AVX-512 iteration
Expand All @@ -32,6 +40,12 @@ const (

func init() {
useAVX512 = archsimd.X86.AVX512()
if useAVX512 {
// Pre-broadcast fixed characters to avoid repeated BroadcastInt8x64 calls
cachedQuoteCmp = archsimd.BroadcastInt8x64('"')
cachedCrCmp = archsimd.BroadcastInt8x64('\r')
cachedNlCmp = archsimd.BroadcastInt8x64('\n')
}
}

// =============================================================================
Expand Down Expand Up @@ -124,9 +138,9 @@ func (sr *scanResult) reset() {
sr.newlineCount = 0
}

// releaseScanResult returns a scanResult to the pool for reuse.
// release returns the scanResult to the pool for reuse.
// Large results (>= scanResultLargeThreshold) are cached separately to survive GC.
func releaseScanResult(sr *scanResult) {
func (sr *scanResult) release() {
if sr == nil {
return
}
Expand Down Expand Up @@ -203,12 +217,11 @@ func generateMasksScalar(data []byte, separator byte) (quote, sep, cr, nl uint64

// generateMasksAVX512 generates masks using AVX-512 SIMD instructions.
// Requires AVX-512BW for ToBits() which uses VPMOVB2M instruction.
// Uses cached broadcast values for fixed characters (quote, CR, NL) to avoid
// repeated BroadcastInt8x64 calls.
func generateMasksAVX512(data []byte, separator byte) (quote, sep, cr, nl uint64) {
quoteCmp := archsimd.BroadcastInt8x64('"')
sepCmp := archsimd.BroadcastInt8x64(int8(separator))
crCmp := archsimd.BroadcastInt8x64('\r')
nlCmp := archsimd.BroadcastInt8x64('\n')
return generateMasksAVX512WithCmp(data, quoteCmp, sepCmp, crCmp, nlCmp)
return generateMasksAVX512WithCmp(data, cachedQuoteCmp, sepCmp, cachedCrCmp, cachedNlCmp)
}

// generateMasksAVX512WithCmp generates masks reusing pre-broadcasted comparators.
Expand Down Expand Up @@ -446,10 +459,10 @@ type avx512MaskGenerator struct {

func newAVX512MaskGenerator(separator byte) *avx512MaskGenerator {
return &avx512MaskGenerator{
quoteCmp: archsimd.BroadcastInt8x64('"'),
quoteCmp: cachedQuoteCmp,
sepCmp: archsimd.BroadcastInt8x64(int8(separator)),
crCmp: archsimd.BroadcastInt8x64('\r'),
nlCmp: archsimd.BroadcastInt8x64('\n'),
crCmp: cachedCrCmp,
nlCmp: cachedNlCmp,
}
}

Expand Down Expand Up @@ -492,20 +505,34 @@ func scanBufferWithGenerator(buf []byte, gen maskGenerator) *scanResult {
result := acquireScanResult(chunkCount)
state := scanState{}

curMasks, curValidBits := generateFirstChunkMasks(buf, gen, result)
nextMasks := generateSecondChunkMasks(buf, chunkCount, gen, result)
sc := bufferScanContext{
buf: buf,
gen: gen,
result: result,
chunkCount: chunkCount,
}

curMasks, curValidBits := sc.generateFirstChunkMasks()
nextMasks := sc.generateSecondChunkMasks()

for chunkIdx := 0; chunkIdx < chunkCount; chunkIdx++ {
processChunk(chunkIdx, curMasks, nextMasks, curValidBits, &state, result)

curMasks = nextMasks
nextMasks, curValidBits = generateNextLookahead(buf, chunkIdx, chunkCount, gen, result)
nextMasks, curValidBits = sc.generateNextLookahead(chunkIdx)
}

result.finalQuoted = state.quoted
return result
}

type bufferScanContext struct {
buf []byte
gen maskGenerator
result *scanResult
chunkCount int
}

// acquireScanResult gets a pooled scanResult and initializes it for the given chunk count.
func acquireScanResult(chunkCount int) *scanResult {
if chunkCount >= scanResultLargeThreshold {
Expand All @@ -531,61 +558,61 @@ func acquireScanResult(chunkCount int) *scanResult {

// generateFirstChunkMasks generates masks for the first chunk of the buffer.
// Handles both full chunks and partial (padded) chunks.
func generateFirstChunkMasks(buf []byte, gen maskGenerator, result *scanResult) (chunkMasks, int) {
if len(buf) >= simdChunkSize {
return gen.generateFull(buf[0:simdChunkSize]), simdChunkSize
func (sc *bufferScanContext) generateFirstChunkMasks() (chunkMasks, int) {
if len(sc.buf) >= simdChunkSize {
return sc.gen.generateFull(sc.buf[0:simdChunkSize]), simdChunkSize
}

masks, validBits := gen.generatePadded(buf)
result.lastChunkBits = validBits
masks, validBits := sc.gen.generatePadded(sc.buf)
sc.result.lastChunkBits = validBits
return masks, validBits
}

// generateSecondChunkMasks generates lookahead masks for the second chunk if it exists.
// Returns empty masks if there is no second chunk.
func generateSecondChunkMasks(buf []byte, chunkCount int, gen maskGenerator, result *scanResult) chunkMasks {
if chunkCount <= 1 || len(buf) <= simdChunkSize {
func (sc *bufferScanContext) generateSecondChunkMasks() chunkMasks {
if sc.chunkCount <= 1 || len(sc.buf) <= simdChunkSize {
return chunkMasks{}
}

if len(buf) >= 2*simdChunkSize {
return gen.generateFull(buf[simdChunkSize : 2*simdChunkSize])
if len(sc.buf) >= 2*simdChunkSize {
return sc.gen.generateFull(sc.buf[simdChunkSize : 2*simdChunkSize])
}

masks, validBits := gen.generatePadded(buf[simdChunkSize:])
if chunkCount == 2 {
result.lastChunkBits = validBits
masks, validBits := sc.gen.generatePadded(sc.buf[simdChunkSize:])
if sc.chunkCount == 2 {
sc.result.lastChunkBits = validBits
}
return masks
}

// generateNextLookahead generates masks for the chunk two positions ahead (lookahead).
// This enables processing current chunk while knowing what comes next.
func generateNextLookahead(buf []byte, chunkIdx, chunkCount int, gen maskGenerator, result *scanResult) (chunkMasks, int) {
func (sc *bufferScanContext) generateNextLookahead(chunkIdx int) (chunkMasks, int) {
lookaheadIdx := chunkIdx + 2
if lookaheadIdx >= chunkCount {
return handleFinalChunkValidBits(buf, chunkIdx, chunkCount, result)
if lookaheadIdx >= sc.chunkCount {
return sc.handleFinalChunkValidBits(chunkIdx)
}

offset := lookaheadIdx * simdChunkSize
remaining := len(buf) - offset
remaining := len(sc.buf) - offset

if remaining >= simdChunkSize {
return gen.generateFull(buf[offset : offset+simdChunkSize]), simdChunkSize
return sc.gen.generateFull(sc.buf[offset : offset+simdChunkSize]), simdChunkSize
}

masks, validBits := gen.generatePadded(buf[offset:])
result.lastChunkBits = validBits
masks, validBits := sc.gen.generatePadded(sc.buf[offset:])
sc.result.lastChunkBits = validBits
return masks, validBits
}

// handleFinalChunkValidBits computes valid bits when no more lookahead chunks exist.
func handleFinalChunkValidBits(buf []byte, chunkIdx, chunkCount int, result *scanResult) (chunkMasks, int) {
func (sc *bufferScanContext) handleFinalChunkValidBits(chunkIdx int) (chunkMasks, int) {
validBits := simdChunkSize

if chunkIdx+1 == chunkCount-1 && len(buf)%simdChunkSize != 0 {
validBits = len(buf) % simdChunkSize
result.lastChunkBits = validBits
if chunkIdx+1 == sc.chunkCount-1 && len(sc.buf)%simdChunkSize != 0 {
validBits = len(sc.buf) % simdChunkSize
sc.result.lastChunkBits = validBits
}

return chunkMasks{}, validBits
Expand Down
28 changes: 24 additions & 4 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,23 @@ import (
"simd/archsimd"
)

// Cached broadcast values for fixed characters used in Writer (initialized in init()).
var (
cachedQuoteCmp32 archsimd.Int8x32
cachedCrCmp32 archsimd.Int8x32
cachedNlCmp32 archsimd.Int8x32
)

func init() {
if useAVX512 {
// Pre-broadcast fixed characters for AVX2 (32-byte) operations to avoid
// repeated BroadcastInt8x32 calls in Writer methods.
cachedQuoteCmp32 = archsimd.BroadcastInt8x32('"')
cachedCrCmp32 = archsimd.BroadcastInt8x32('\r')
cachedNlCmp32 = archsimd.BroadcastInt8x32('\n')
}
}

// Writer writes records using CSV encoding.
//
// Records are terminated by a newline and use ',' as the field delimiter by default.
Expand Down Expand Up @@ -123,14 +140,16 @@ func (w *Writer) fieldNeedsQuotesScalar(field string) bool {
}

// fieldNeedsQuotesSIMD uses SIMD to detect special characters requiring quoting.
// Uses cached broadcast values for fixed characters (quote, CR, NL) to avoid
// repeated BroadcastInt8x32 calls.
func (w *Writer) fieldNeedsQuotesSIMD(field string) bool {
data := unsafe.Slice(unsafe.StringData(field), len(field))
int8Data := bytesToInt8Slice(data)

commaCmp := archsimd.BroadcastInt8x32(int8(w.Comma))
newlineCmp := archsimd.BroadcastInt8x32('\n')
carriageReturnCmp := archsimd.BroadcastInt8x32('\r')
quoteCmp := archsimd.BroadcastInt8x32('"')
newlineCmp := cachedNlCmp32
carriageReturnCmp := cachedCrCmp32
quoteCmp := cachedQuoteCmp32

// Process 32-byte chunks
offset := 0
Expand Down Expand Up @@ -186,10 +205,11 @@ func (w *Writer) writeQuotedFieldScalar(field string) error {
}

// writeQuotedFieldSIMD escapes quotes using SIMD to find quote positions.
// Uses cached broadcast value for quote character to avoid repeated BroadcastInt8x32 calls.
func (w *Writer) writeQuotedFieldSIMD(field string) error {
data := unsafe.Slice(unsafe.StringData(field), len(field))
int8Data := bytesToInt8Slice(data)
quoteCmp := archsimd.BroadcastInt8x32('"')
quoteCmp := cachedQuoteCmp32

offset := 0
lastWritten := 0
Expand Down