@@ -32,7 +32,7 @@ CSVScanner::CSVScanner(peloton::type::AbstractPool &pool,
3232 file_path_ (file_path),
3333 file_(),
3434 buffer_(nullptr ),
35- buffer_begin_ (0 ),
35+ buffer_pos_ (0 ),
3636 buffer_end_(0 ),
3737 line_(nullptr ),
3838 line_len_(0 ),
@@ -59,12 +59,17 @@ CSVScanner::CSVScanner(peloton::type::AbstractPool &pool,
5959CSVScanner::~CSVScanner () {
6060 if (buffer_ != nullptr ) {
6161 memory_.Free (buffer_);
62+ buffer_ = nullptr ;
6263 }
64+
6365 if (line_ != nullptr ) {
6466 memory_.Free (line_);
67+ line_ = nullptr ;
6568 }
69+
6670 if (cols_ != nullptr ) {
6771 memory_.Free (cols_);
72+ cols_ = nullptr ;
6873 }
6974}
7075
@@ -90,21 +95,22 @@ void CSVScanner::Produce() {
9095 Initialize ();
9196
9297 // Loop lines
93- while (const char *line = NextLine ()) {
98+ while (char *line = NextLine ()) {
9499 ProduceCSV (line);
95100 }
96101}
97102
98103void CSVScanner::Initialize () {
99104 // Let's first perform a few validity checks
100- boost::filesystem::path path{ file_path_} ;
105+ boost::filesystem::path path ( file_path_) ;
101106
102107 if (!boost::filesystem::exists (path)) {
103- throw ExecutorException{ StringUtil::Format (" input path '%s' does not exist" ,
104- file_path_.c_str ())} ;
108+ throw ExecutorException ( StringUtil::Format (" input path '%s' does not exist" ,
109+ file_path_.c_str ())) ;
105110 } else if (!boost::filesystem::is_regular_file (file_path_)) {
106- throw ExecutorException{
107- StringUtil::Format (" unable to read file '%s'" , file_path_.c_str ())};
111+ auto msg =
112+ StringUtil::Format (" unable to read file '%s'" , file_path_.c_str ());
113+ throw ExecutorException (msg);
108114 }
109115
110116 // The path looks okay, let's try opening it
@@ -125,7 +131,7 @@ void CSVScanner::Initialize() {
125131
126132bool CSVScanner::NextBuffer () {
127133 // Do read
128- buffer_begin_ = 0 ;
134+ buffer_pos_ = 0 ;
129135 buffer_end_ = static_cast <uint32_t >(file_.Read (buffer_, kDefaultBufferSize ));
130136
131137 // Update stats
@@ -134,7 +140,9 @@ bool CSVScanner::NextBuffer() {
134140 return (buffer_end_ != 0 );
135141}
136142
137- void CSVScanner::AppendToCurrentLine (const char *data, uint32_t len) {
143+ void CSVScanner::AppendToLineBuffer (const char *data, uint32_t len) {
144+ PELOTON_ASSERT (len > 0 );
145+
138146 // Short-circuit if we're not appending any data
139147 if (len == 0 ) {
140148 return ;
@@ -146,7 +154,7 @@ void CSVScanner::AppendToCurrentLine(const char *data, uint32_t len) {
146154 const auto msg = StringUtil::Format (
147155 " Line %u in file '%s' exceeds maximum line length: %lu" ,
148156 line_number_ + 1 , file_path_.c_str (), kMaxAllocSize );
149- throw Exception{ msg} ;
157+ throw Exception ( msg) ;
150158 }
151159
152160 // The current line buffer isn't large enough to store the new bytes, so we
@@ -186,41 +194,44 @@ void CSVScanner::AppendToCurrentLine(const char *data, uint32_t len) {
186194 stats_.num_copies ++;
187195}
188196
189- // The main purpose of this function is to find the start of the next line in
190- // the CSV file.
191- const char *CSVScanner::NextLine () {
197+ // The objective of this function is to find a complete line in the CSV file.
198+ // The returned value will be a valid pointer to a null-terminated string that
199+ // is the next line in the CSV to be processed.
200+ char *CSVScanner::NextLine () {
192201 line_len_ = 0 ;
193202
203+ const char quote = quote_;
204+ const char escape = (quote_ == escape_ ? static_cast <char >(' \0 ' ) : escape_);
205+
194206 bool in_quote = false ;
195207 bool last_was_escape = false ;
196- bool copied_to_line_buf = false ;
197208
198- uint32_t line_end = buffer_begin_;
199-
200- char quote = quote_;
201- char escape = (quote_ == escape_ ? static_cast <char >(' \0 ' ) : escape_);
209+ const char *buf = buffer_;
210+ uint32_t curr_buffer_pos = buffer_pos_;
202211
203212 while (true ) {
204- if (line_end > = buffer_end_) {
213+ if (curr_buffer_pos = = buffer_end_) {
205214 // We need to read more data from the CSV file. But first, we need to copy
206215 // all the data in the read-buffer (i.e., [buffer_pos_, buffer_end_)) to
207216 // the line-buffer.
217+ if (buffer_pos_ < curr_buffer_pos) {
218+ AppendToLineBuffer (buffer_ + buffer_pos_,
219+ curr_buffer_pos - buffer_pos_);
220+ buffer_pos_ = curr_buffer_pos;
221+ }
208222
209- AppendToCurrentLine (buffer_ + buffer_begin_,
210- static_cast < uint32_t >(buffer_end_ - buffer_begin_)) ;
223+ // Reset positions
224+ curr_buffer_pos = 0 ;
211225
212226 // Now, read more data
213227 if (!NextBuffer ()) {
214- return nullptr ;
228+ // We hit an EOF
229+ break ;
215230 }
216-
217- // Reset positions
218- line_end = buffer_begin_;
219- copied_to_line_buf = true ;
220231 }
221232
222233 // Read character
223- char c = buffer_[line_end ];
234+ char c = buf[curr_buffer_pos++ ];
224235
225236 if (in_quote && c == escape) {
226237 last_was_escape = !last_was_escape;
@@ -235,47 +246,120 @@ const char *CSVScanner::NextLine() {
235246 // Process the new-line character. If we see a new-line and we're not currently
236247 // in a quoted section, we're done.
237248 if (c == ' \n ' && !in_quote) {
238- buffer_[line_end] = ' \0 ' ;
239249 break ;
240250 }
251+ }
241252
242- // Move along
243- line_end++;
253+ // Flush remaining valid bytes
254+ if (buffer_pos_ < curr_buffer_pos) {
255+ AppendToLineBuffer (buffer_ + buffer_pos_, curr_buffer_pos - buffer_pos_);
256+ buffer_pos_ = curr_buffer_pos;
244257 }
245258
246259 // Increment line number
247260 line_number_++;
248261
249- if (copied_to_line_buf) {
250- AppendToCurrentLine (buffer_, line_end);
251- buffer_begin_ = line_end + 1 ;
252- return line_;
253- } else {
254- const char *ret = buffer_ + buffer_begin_;
255- buffer_begin_ = line_end + 1 ;
256- return ret;
262+ // If we didn't transfer any bytes to the line buffer, we must have reached an
263+ // EOF. If so, return null indicating there are no more lines.
264+ if (line_len_ == 0 ) {
265+ return nullptr ;
257266 }
267+
268+ // A full line has been transferred to the line buffer, but we also copied the
269+ // newline character. Strip it off now.
270+ line_len_--;
271+ line_[line_len_] = ' \0 ' ;
272+
273+ // Done
274+ return line_;
258275}
259276
260- void CSVScanner::ProduceCSV (const char *line) {
261- // At this point, we have a well-formed line. Let's pull out pointers to the
262- // columns.
277+ void CSVScanner::ProduceCSV (char *line) {
278+ const char delimiter = delimiter_;
279+ const char quote = quote_;
280+ const char escape = escape_;
263281
264- const auto *iter = line;
265- for (uint32_t col_idx = 0 ; col_idx < num_cols_; col_idx++) {
266- // Start points to the beginning of the column's data value
267- const char *start = iter;
282+ // The iterator over characters in the line
283+ char *iter = line;
268284
269- // Eat text until the next delimiter
270- while (*iter != 0 && *iter != delimiter_) {
271- iter++;
285+ for (uint32_t col_idx = 0 ; col_idx < num_cols_; col_idx++) {
286+ char *col_begin = iter;
287+ char *col_end = nullptr ;
288+
289+ // We need to move col_end to the end of the column's data. Along the way,
290+ // we may need to shift data down due to quotes and escapes. Inspired by
291+ // Postgres.
292+ {
293+ char *out = col_begin;
294+ while (true ) {
295+ // This first loop looks for either the delimiter character or the end
296+ // of the line, indicating the end of a column's data. It breaks out of
297+ // the loop if a quote character is found. It flows into a second loop
298+ // whose only purpose is to find the end of the quoted section.
299+ while (true ) {
300+ char c = *iter++;
301+
302+ // If we see the delimiter character, or the end of the string,
303+ // finish
304+ if (c == delimiter || c == ' \0 ' ) {
305+ col_end = out;
306+ iter--;
307+ goto colend;
308+ }
309+
310+ // If we see a quote character, move to the second loop to find the
311+ // closing quote.
312+ if (c == quote) {
313+ break ;
314+ }
315+
316+ *out++ = c;
317+ }
318+
319+ while (true ) {
320+ char c = *iter++;
321+
322+ // If we see the end of the line *within* a quoted section, throw
323+ // error
324+ if (c == ' \0 ' ) {
325+ throw Exception (StringUtil::Format (
326+ " unterminated CSV quoted field at %u" , col_idx));
327+ }
328+
329+ // If we see an escape character within a quoted section, we need to
330+ // check if the following character is a quote. If so, we must
331+ // escape it
332+ if (c == escape) {
333+ char next = *iter;
334+ if (next == quote || next == escape) {
335+ *out++ = next;
336+ iter++;
337+ continue ;
338+ }
339+ }
340+
341+ // If we see the closing quote, we're done.
342+ if (c == quote) {
343+ break ;
344+ }
345+
346+ *out++ = c;
347+ }
348+ }
272349 }
273350
274- // At this point, iter points to the end of the column's data value
351+ colend:
352+ // If we've reached the end of the line, but haven't setup all the columns, then
353+ // we're missing data for the remaining columns and should throw an error.
354+ if (*iter == ' \0 ' && col_idx != (num_cols_ - 1 )) {
355+ throw Exception (
356+ StringUtil::Format (" missing data for column %u on line %u" ,
357+ (col_idx + 2 ), line_number_));
358+ }
275359
276360 // Let's setup the columns
277- cols_[col_idx].ptr = start ;
278- cols_[col_idx].len = static_cast <uint32_t >(iter - start );
361+ cols_[col_idx].ptr = col_begin ;
362+ cols_[col_idx].len = static_cast <uint32_t >(col_end - col_begin );
279363 cols_[col_idx].is_null = (cols_[col_idx].len == 0 );
280364
281365 // Eat delimiter, moving to next column
0 commit comments