Merged
2 changes: 1 addition & 1 deletion Project.toml
@@ -1,7 +1,7 @@
name = "ChunkedCSV"
uuid = "c0d0730e-6432-44b2-a51e-6ec55e1c8b99"
authors = ["Tomáš Drvoštěp <[email protected]>"]
version = "0.1.3"
version = "0.2.0"

[deps]
ChunkedBase = "a380dd43-0ebf-4429-88d6-6f06ea920732"
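The 0.1.3 -> 0.2.0 bump marks this as a breaking release under SemVer (a minor bump in the 0.x series), matching the renames below. As a minimal sketch of how a downstream project could opt into the new series once it is registered (the exact version bound here is an illustration, not part of this PR):

    using Pkg
    # Request the breaking 0.2 series explicitly; code written against the
    # 0.1.x `column_indicators` API will need the updates shown below.
    Pkg.add(name = "ChunkedCSV", version = "0.2.0")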
12 changes: 6 additions & 6 deletions src/consume_contexts.jl
@@ -71,8 +71,8 @@ function ChunkedBase.consume!(consume_ctx::DebugContext, payload::ParsedPayload)
write(io, debug(col, j, parsing_ctx, consume_ctx, chunking_ctx))
n != 1 && print(io, ", ")
n -= 1
-elseif task_buf.row_statuses[j] == RowStatus.HasColumnIndicators
-write(io, k in task_buf.column_indicators[c] ? "?" : debug(col, j, parsing_ctx, consume_ctx, chunking_ctx))
+elseif task_buf.row_statuses[j] == RowStatus.MissingValues
+write(io, k in task_buf.missing_values[c] ? "?" : debug(col, j, parsing_ctx, consume_ctx, chunking_ctx))
n != 1 && print(io, ", ")
c += 1
n -= 1
@@ -97,7 +97,7 @@ function ChunkedBase.consume!(consume_ctx::DebugContext, payload::ParsedPayload)
consume_ctx.show_values && print(io, "\t$(name): [")
for j in 1:length(task_buf.row_statuses)
if (task_buf.row_statuses[j] & S) > 0
-has_missing = task_buf.row_statuses[j] > RowStatus.Ok && task_buf.column_indicators[c, k]
+has_missing = task_buf.row_statuses[j] & RowStatus.MissingValues > 0 && task_buf.missing_values[c, k]
consume_ctx.show_values && write(io, has_missing ? "?" : debug(col, j, parsing_ctx, consume_ctx, chunking_ctx))
consume_ctx.show_values && n != 1 && print(io, ", ")
has_missing && (c += 1)
@@ -154,9 +154,9 @@ function ChunkedBase.consume!(ctx::TestContext, payload::ParsedPayload)
str_col = String[]
push!(strings, str_col)
if T === Parsers.PosLen31
-col_iter = ColumnIterator(cols[i]::BufferedVector{Parsers.PosLen31}, i, task_buf.row_statuses, task_buf.column_indicators)
-for (value, isinvalidrow, ismissingvalue) in col_iter
-if ismissingvalue
+col_iter = ColumnIterator(cols[i]::BufferedVector{Parsers.PosLen31}, i, task_buf.row_statuses, task_buf.missing_values, task_buf.errored_values)
+for (value, isinvalidrow, iserroredvalue, ismissingvalue) in col_iter
+if ismissingvalue || iserroredvalue
push!(str_col, "")
else
push!(str_col, Parsers.getstring(chunking_ctx.bytes, value, parsing_ctx.escapechar))
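Beyond the renames, the TestContext hunk shows the new iteration protocol: ColumnIterator now takes both missing_values and errored_values and yields 4-tuples, so a consumer can tell a cell that parsed as missing from one that errored or was skipped. Below is a minimal sketch of a custom consumer built on that protocol; the constructor and tuple order come from the diff above, while MyCountsContext, the AbstractConsumeContext supertype, the payload.results field, and the task_buf.cols access are assumptions based on the surrounding code (ColumnIterator may need to be qualified as ChunkedCSV.ColumnIterator):

    struct MyCountsContext <: ChunkedBase.AbstractConsumeContext  # assumed supertype
        missing_per_col::Vector{Int}
        errored_per_col::Vector{Int}
    end

    function ChunkedBase.consume!(ctx::MyCountsContext, payload::ParsedPayload)
        task_buf = payload.results  # assumed field name for the result buffer
        for (i, col) in enumerate(task_buf.cols)
            col_iter = ColumnIterator(col, i, task_buf.row_statuses,
                task_buf.missing_values, task_buf.errored_values)
            # Each element now distinguishes missing values from errored/skipped ones.
            for (value, isinvalidrow, iserroredvalue, ismissingvalue) in col_iter
                ismissingvalue && (ctx.missing_per_col[i] += 1)
                iserroredvalue && (ctx.errored_per_col[i] += 1)
            end
        end
        return nothing
    end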
36 changes: 19 additions & 17 deletions src/populate_result_buffer.jl
@@ -21,8 +21,8 @@ function skip_row!(result_buf::AbstractResultBuffer, row_bytes, comment::Vector{
end
function skip_row!(result_buf::AbstractResultBuffer)
foreach(skip_element!, result_buf.cols)
-unsafe_push!(result_buf.row_statuses, RowStatus.HasColumnIndicators | RowStatus.SkippedRow)
-addrows!(result_buf.column_indicators, 1, true)
+unsafe_push!(result_buf.row_statuses, RowStatus.MissingValues | RowStatus.SkippedRow)
+addrows!(result_buf.missing_values, 1, true)
return true
end

@@ -70,28 +70,31 @@ function ChunkedBase.populate_result_buffer!(
empty!(result_buf)
enum_schema = parsing_ctx.enum_schema
schema = parsing_ctx.schema
-colinds_row_idx = 1
+missing_idx = 1
+errored_idx = 1
options = parsing_ctx.options

Base.ensureroom(result_buf, ceil(Int, length(newlines_segment) * 1.01))

ignorerepeated = options.ignorerepeated::Bool
ignoreemptyrows = options.ignoreemptylines::Bool
-colinds = result_buf.column_indicators
+missing_values = result_buf.missing_values
+errored_values = result_buf.errored_values
cols = result_buf.cols

N = length(schema)
for row_idx in 2:length(newlines_segment)
# We only grow the column indicators when we need to, this flag tracks whether we
# already added one for this row
-added_collind_row = false
+added_missing_row = false
+added_errored_row = false
@inbounds prev_newline = newlines_segment[row_idx - 1]
@inbounds curr_newline = newlines_segment[row_idx]
isemptyrow = ChunkedBase._isemptyrow(prev_newline, curr_newline, buf)
-(ignoreemptyrows && isemptyrow) && skip_row!(result_buf) && (colinds_row_idx += 1; continue)
+(ignoreemptyrows && isemptyrow) && skip_row!(result_buf) && (missing_idx += 1; continue)
# +1 -1 to exclude newline chars
@inbounds row_bytes = view(buf, prev_newline+Int32(1):curr_newline-Int32(1))
-skip_row!(result_buf, row_bytes, comment) && (colinds_row_idx += 1; continue)
+skip_row!(result_buf, row_bytes, comment) && (missing_idx += 1; continue)

len = length(row_bytes)
pos = 1
@@ -120,11 +123,10 @@

if Parsers.eof(code) && !(col_idx == N && Parsers.delimited(code))
row_status |= RowStatus.TooFewColumns
-row_status |= RowStatus.HasColumnIndicators
-added_collind_row || (added_collind_row = true; addrows!(colinds))
+added_errored_row || (added_errored_row = true; addrows!(errored_values))
for _col_idx in col_idx:N
skip_element!(cols[_col_idx])
-colinds[colinds_row_idx, _col_idx] = true
+errored_values[errored_idx, _col_idx] = true
end
break
end
@@ -164,22 +166,22 @@
(val, tlen, code) = parsecustom!(CT, row_bytes, pos, len, col_idx, cols, options, schema[col_idx])
end
if Parsers.sentinel(code)
-row_status |= RowStatus.HasColumnIndicators
-added_collind_row || (added_collind_row = true; addrows!(colinds))
-@inbounds colinds[colinds_row_idx, col_idx] = true
+row_status |= RowStatus.MissingValues
+added_missing_row || (added_missing_row = true; addrows!(missing_values))
+@inbounds missing_values[missing_idx, col_idx] = true
elseif !Parsers.ok(code)
row_status |= RowStatus.ValueParsingError
-row_status |= RowStatus.HasColumnIndicators
-added_collind_row || (added_collind_row = true; addrows!(colinds))
-@inbounds colinds[colinds_row_idx, col_idx] = true
+added_errored_row || (added_errored_row = true; addrows!(errored_values))
+@inbounds errored_values[errored_idx, col_idx] = true
end
pos += tlen
end # for col_idx
if !Parsers.eof(code)
row_status |= RowStatus.TooManyColumns
end
unsafe_push!(result_buf.row_statuses, row_status)
-colinds_row_idx += (row_status & RowStatus.HasColumnIndicators) > 0
+missing_idx += added_missing_row
+errored_idx += added_errored_row
end # for row_idx
return nothing
end
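The net effect of this hunk is that the single column_indicators matrix and its HasColumnIndicators flag are split in two: a row is allocated in missing_values only when RowStatus.MissingValues is set (sentinel hits and skipped rows), and in errored_values only when the row hit TooFewColumns or ValueParsingError, each matrix advancing its own counter rather than using the row index. A sketch of the corresponding read-back loop on the consumer side; the flag and field names come from this diff, while report, ncols, and the per-row column lists are illustrative:

    missing_idx = 1
    errored_idx = 1
    ncols = length(result_buf.cols)
    report = Tuple{Int,Symbol,Vector{Int}}[]
    for row in 1:length(result_buf.row_statuses)
        status = result_buf.row_statuses[row]
        if (status & RowStatus.MissingValues) > 0
            # Columns that parsed as a sentinel/missing value (or a skipped row).
            miss_cols = [c for c in 1:ncols if result_buf.missing_values[missing_idx, c]]
            push!(report, (row, :missing, miss_cols))
            missing_idx += 1
        end
        if (status & (RowStatus.TooFewColumns | RowStatus.ValueParsingError)) > 0
            # Columns that were short or failed to parse in this row.
            err_cols = [c for c in 1:ncols if result_buf.errored_values[errored_idx, c]]
            push!(report, (row, :error, err_cols))
            errored_idx += 1
        end
    end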