Skip to content

Commit f45ba7c

Browse files
author
James Cor
committed
just increase buffer size instead
1 parent 6cac6cd commit f45ba7c

File tree

3 files changed

+38
-42
lines changed

3 files changed

+38
-42
lines changed

sql/plan/load_data.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
package plan
1616

1717
import (
18-
"github.com/dolthub/vitess/go/vt/sqlparser"
18+
"bytes"
1919

2020
"github.com/dolthub/go-mysql-server/sql"
21+
"github.com/dolthub/vitess/go/vt/sqlparser"
2122
)
2223

2324
type LoadData struct {
@@ -67,6 +68,25 @@ func (l *LoadData) IsReadOnly() bool {
6768
return false
6869
}
6970

71+
func (l *LoadData) SplitLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
72+
// Return Nothing if at end of file and no data passed.
73+
if atEOF && len(data) == 0 {
74+
return 0, nil, nil
75+
}
76+
77+
// Find the index of the LINES TERMINATED BY delim.
78+
if i := bytes.Index(data, []byte(l.LinesTerminatedBy)); i >= 0 {
79+
return i + len(l.LinesTerminatedBy), data[0:i], nil
80+
}
81+
82+
// If at end of file with data return the data.
83+
if atEOF {
84+
return len(data), data, nil
85+
}
86+
87+
return
88+
}
89+
7090
func (l *LoadData) WithChildren(children ...sql.Node) (sql.Node, error) {
7191
if len(children) != 0 {
7292
return nil, sql.ErrInvalidChildrenNumber.New(l, len(children), 0)

sql/rowexec/ddl.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ func (b *BaseBuilder) buildLoadData(ctx *sql.Context, n *plan.LoadData, row sql.
9393
reader = file
9494
}
9595

96+
scanner := bufio.NewScanner(reader)
97+
scanner.Buffer(nil, int(types.LongTextBlobMax))
98+
scanner.Split(n.SplitLines)
99+
96100
sch := n.Schema()
97101
source := sch[0].Source // Schema will always have at least one column
98102
colNames := n.ColNames
@@ -116,7 +120,7 @@ func (b *BaseBuilder) buildLoadData(ctx *sql.Context, n *plan.LoadData, row sql.
116120
return &loadDataIter{
117121
destSch: n.DestSch,
118122
reader: reader,
119-
scanner: bufio.NewReader(reader),
123+
scanner: scanner,
120124
colCount: len(n.ColNames), // Needs to be the original column count
121125
fieldToColMap: fieldToColMap,
122126
setExprs: n.SetExprs,

sql/rowexec/ddl_iters.go

Lines changed: 12 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ package rowexec
1616

1717
import (
1818
"bufio"
19-
"bytes"
20-
"fmt"
19+
"fmt"
2120
"io"
2221
"strings"
2322
"sync"
@@ -39,10 +38,8 @@ import (
3938
)
4039

4140
type loadDataIter struct {
42-
scanner *bufio.Reader
41+
scanner *bufio.Scanner
4342
reader io.ReadCloser
44-
buffer bytes.Buffer
45-
hitEOF bool
4643

4744
destSch sql.Schema
4845
colCount int
@@ -64,38 +61,12 @@ type loadDataIter struct {
6461
var _ sql.RowIter = (*loadDataIter)(nil)
6562
var _ sql.Closer = (*loadDataIter)(nil)
6663

67-
// readLine reads a line from the scanner. It does not include the delimiter.
68-
func (l *loadDataIter) readLine() ([]byte, error) {
69-
// return EOF on the call after EOF
70-
if l.hitEOF {
71-
return nil, io.EOF
72-
}
73-
delim := []byte(l.linesTerminatedBy)
74-
delimDelim := delim[len(delim)-1]
75-
l.buffer.Reset()
76-
var buf []byte
77-
for {
78-
chunk, err := l.scanner.ReadBytes(delimDelim)
79-
if err != nil && err != io.EOF {
80-
return nil, err
81-
}
82-
l.buffer.Write(chunk)
83-
buf = l.buffer.Bytes()
84-
if bytes.HasSuffix(buf, delim) {
85-
return buf[:len(buf)-len(delim)], nil
86-
}
87-
if err == io.EOF {
88-
l.hitEOF = true
89-
return buf, nil
90-
}
91-
}
92-
}
93-
9464
func (l *loadDataIter) Next(ctx *sql.Context) (returnRow sql.Row, returnErr error) {
9565
// skip first ignoreNum lines
96-
for ; l.ignoreNum > 0; l.ignoreNum-- {
97-
_, err := l.readLine()
98-
if err != nil {
66+
var err error
67+
for ; l.ignoreNum > 0 && l.scanner.Scan(); l.ignoreNum-- {
68+
if err = l.scanner.Err(); err != nil {
69+
l.reader.Close()
9970
return nil, err
10071
}
10172
}
@@ -104,19 +75,20 @@ func (l *loadDataIter) Next(ctx *sql.Context) (returnRow sql.Row, returnErr erro
10475
// until exprs != nil
10576
var exprs []sql.Expression
10677
for exprs == nil {
107-
line, err := l.readLine()
108-
if err != nil {
109-
return nil, err
78+
if keepGoing := l.scanner.Scan(); !keepGoing {
79+
if err = l.scanner.Err(); err != nil {
80+
return nil, err
81+
}
82+
return nil, io.EOF
11083
}
111-
exprs, err = l.parseFields(ctx, string(line))
84+
exprs, err = l.parseFields(ctx, l.scanner.Text())
11285
if err != nil {
11386
return nil, err
11487
}
11588
}
11689

11790
row := make(sql.Row, len(exprs))
11891
var secondPass []int
119-
var err error
12092
for i, expr := range exprs {
12193
if expr != nil {
12294
// Non-literal default values may reference other columns, so we need to evaluate them in a second pass.

0 commit comments

Comments
 (0)