@@ -42,6 +42,7 @@ type loadDataIter struct {
4242 scanner * bufio.Reader
4343 reader io.ReadCloser
4444 buffer bytes.Buffer
45+ hitEOF bool
4546
4647 destSch sql.Schema
4748 colCount int
@@ -60,26 +61,37 @@ type loadDataIter struct {
6061 linesTerminatedBy string
6162}
6263
63- // readLine reads a line from the scanner.
64- func (l loadDataIter ) readLine () ([]byte , error ) {
65- // last byte of the line terminator
64+ var _ sql.RowIter = (* loadDataIter )(nil )
65+ var _ sql.Closer = (* loadDataIter )(nil )
66+
67+ // readLine reads a line from the scanner. It does not include the delimiter.
68+ func (l * loadDataIter ) readLine () ([]byte , error ) {
69+ // return EOF on the call after EOF
70+ if l .hitEOF {
71+ return nil , io .EOF
72+ }
6673 delim := []byte (l .linesTerminatedBy )
6774 delimDelim := delim [len (delim )- 1 ]
6875 l .buffer .Reset ()
76+ var buf []byte
6977 for {
7078 chunk , err := l .scanner .ReadBytes (delimDelim )
71- if err != nil {
79+ if err != nil && err != io . EOF {
7280 return nil , err
7381 }
7482 l .buffer .Write (chunk )
75- buf : = l .buffer .Bytes ()
83+ buf = l .buffer .Bytes ()
7684 if bytes .HasSuffix (buf , delim ) {
85+ return buf [:len (buf )- len (delim )], nil
86+ }
87+ if err == io .EOF {
88+ l .hitEOF = true
7789 return buf , nil
7890 }
7991 }
8092}
8193
82- func (l loadDataIter ) Next (ctx * sql.Context ) (returnRow sql.Row , returnErr error ) {
94+ func (l * loadDataIter ) Next (ctx * sql.Context ) (returnRow sql.Row , returnErr error ) {
8395 // skip first ignoreNum lines
8496 for ; l .ignoreNum > 0 ; l .ignoreNum -- {
8597 _ , err := l .readLine ()
@@ -128,12 +140,12 @@ func (l loadDataIter) Next(ctx *sql.Context) (returnRow sql.Row, returnErr error
128140 return sql .NewRow (row ... ), nil
129141}
130142
131- func (l loadDataIter ) Close (ctx * sql.Context ) error {
143+ func (l * loadDataIter ) Close (ctx * sql.Context ) error {
132144 return l .reader .Close ()
133145}
134146
135147// parseLinePrefix searches for the delim defined by linesStartingByDelim.
136- func (l loadDataIter ) parseLinePrefix (line string ) string {
148+ func (l * loadDataIter ) parseLinePrefix (line string ) string {
137149 if l .linesStartingBy == "" {
138150 return line
139151 }
@@ -148,7 +160,7 @@ func (l loadDataIter) parseLinePrefix(line string) string {
148160 }
149161}
150162
151- func (l loadDataIter ) parseFields (ctx * sql.Context , line string ) ([]sql.Expression , error ) {
163+ func (l * loadDataIter ) parseFields (ctx * sql.Context , line string ) ([]sql.Expression , error ) {
152164 // Step 1. Start by Searching for prefix if there is one
153165 line = l .parseLinePrefix (line )
154166 if line == "" {
0 commit comments