Skip to content

Commit 3d5cc09

Browse files
author
James Cor
committed
use Reader instead of Scanner to bypass buffer limits
1 parent 9aafbeb commit 3d5cc09

File tree

5 files changed

+61
-27
lines changed

5 files changed

+61
-27
lines changed

.gitattributes

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ enginetest/testdata/test5.txt binary
77
enginetest/testdata/test6.csv binary
88
enginetest/testdata/test7.txt binary
99
enginetest/testdata/test8.txt binary
10-
enginetest/testdata/test9.txt binary
10+
enginetest/testdata/test9.txt binary
11+
enginetest/testdata/test10.txt binary

enginetest/queries/load_queries.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,23 @@ var LoadDataScripts = []ScriptTest{
539539
},
540540
},
541541
},
542+
{
543+
Name: "LOAD DATA with column data larger than 64KB",
544+
SetUpScript: []string{
545+
"create table t(id int primary key, lt longtext);",
546+
"load data infile './testdata/test10.txt' into table t fields terminated by ',' lines terminated by '\n';",
547+
},
548+
Assertions: []ScriptTestAssertion{
549+
{
550+
Query: "select id, length(lt) from t order by id",
551+
Expected: []sql.Row{
552+
{1, 65536},
553+
{2, 100000},
554+
{3, 1000000},
555+
},
556+
},
557+
},
558+
},
542559
}
543560

544561
var LoadDataErrorScripts = []ScriptTest{

enginetest/testdata/test10.txt

Lines changed: 3 additions & 0 deletions
Large diffs are not rendered by default.

sql/rowexec/ddl.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,20 +93,6 @@ func (b *BaseBuilder) buildLoadData(ctx *sql.Context, n *plan.LoadData, row sql.
9393
reader = file
9494
}
9595

96-
scanner := bufio.NewScanner(reader)
97-
scanner.Split(n.SplitLines)
98-
99-
// Skip through the lines that need to be ignored.
100-
for n.IgnoreNum > 0 && scanner.Scan() {
101-
scanner.Text()
102-
n.IgnoreNum--
103-
}
104-
105-
if scanner.Err() != nil {
106-
reader.Close()
107-
return nil, scanner.Err()
108-
}
109-
11096
sch := n.Schema()
11197
source := sch[0].Source // Schema will always have at least one column
11298
colNames := n.ColNames
@@ -130,7 +116,7 @@ func (b *BaseBuilder) buildLoadData(ctx *sql.Context, n *plan.LoadData, row sql.
130116
return &loadDataIter{
131117
destSch: n.DestSch,
132118
reader: reader,
133-
scanner: scanner,
119+
scanner: bufio.NewReader(reader),
134120
colCount: len(n.ColNames), // Needs to be the original column count
135121
fieldToColMap: fieldToColMap,
136122
setExprs: n.SetExprs,

sql/rowexec/ddl_iters.go

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package rowexec
1616

1717
import (
1818
"bufio"
19+
"bytes"
1920
"fmt"
2021
"io"
2122
"strings"
@@ -38,15 +39,18 @@ import (
3839
)
3940

4041
type loadDataIter struct {
41-
scanner *bufio.Scanner
42+
scanner *bufio.Reader
4243
reader io.ReadCloser
44+
buffer bytes.Buffer
4345

4446
destSch sql.Schema
4547
colCount int
4648
fieldToColMap []int
4749
setExprs []sql.Expression
4850
userVars []sql.Expression
4951

52+
ignoreNum int64
53+
5054
fieldsTerminatedBy string
5155
fieldsEnclosedBy string
5256
fieldsEnclosedByOpt bool
@@ -56,28 +60,51 @@ type loadDataIter struct {
5660
linesTerminatedBy string
5761
}
5862

63+
// readLine reads a line from the scanner.
64+
func (l loadDataIter) readLine() ([]byte, error) {
65+
// last byte of the line terminator
66+
delim := []byte(l.linesTerminatedBy)
67+
delimDelim := delim[len(delim)-1]
68+
l.buffer.Reset()
69+
for {
70+
chunk, err := l.scanner.ReadBytes(delimDelim)
71+
if err != nil {
72+
return nil, err
73+
}
74+
l.buffer.Write(chunk)
75+
buf := l.buffer.Bytes()
76+
if bytes.HasSuffix(buf, delim) {
77+
return buf, nil
78+
}
79+
}
80+
}
81+
5982
func (l loadDataIter) Next(ctx *sql.Context) (returnRow sql.Row, returnErr error) {
60-
var exprs []sql.Expression
61-
var err error
83+
// skip first ignoreNum lines
84+
for ; l.ignoreNum > 0; l.ignoreNum-- {
85+
_, err := l.readLine()
86+
if err != nil {
87+
return nil, err
88+
}
89+
}
90+
6291
// If exprs is nil then this is a skipped line (see test cases). Keep skipping
6392
// until exprs != nil
93+
var exprs []sql.Expression
6494
for exprs == nil {
65-
if keepGoing := l.scanner.Scan(); !keepGoing {
66-
if l.scanner.Err() != nil {
67-
return nil, l.scanner.Err()
68-
}
69-
return nil, io.EOF
95+
line, err := l.readLine()
96+
if err != nil {
97+
return nil, err
7098
}
71-
72-
line := l.scanner.Text()
73-
exprs, err = l.parseFields(ctx, line)
99+
exprs, err = l.parseFields(ctx, string(line))
74100
if err != nil {
75101
return nil, err
76102
}
77103
}
78104

79105
row := make(sql.Row, len(exprs))
80106
var secondPass []int
107+
var err error
81108
for i, expr := range exprs {
82109
if expr != nil {
83110
// Non-literal default values may reference other columns, so we need to evaluate them in a second pass.

0 commit comments

Comments
 (0)