Skip to content

Commit 2f23fbb

Browse files
authored
Distinguish between missing and errored values in TaskResultBuffer (#19)
* Distinguish between missing and errored values in TaskResultBuffer
* v0.2.0
1 parent fa00066 commit 2f23fbb

File tree

6 files changed

+349
-291
lines changed

6 files changed

+349
-291
lines changed

Project.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
name = "ChunkedCSV"
22
uuid = "c0d0730e-6432-44b2-a51e-6ec55e1c8b99"
33
authors = ["Tomáš Drvoštěp <[email protected]>"]
4-
version = "0.1.3"
4+
version = "0.2.0"
55

66
[deps]
77
ChunkedBase = "a380dd43-0ebf-4429-88d6-6f06ea920732"

src/consume_contexts.jl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ function ChunkedBase.consume!(consume_ctx::DebugContext, payload::ParsedPayload)
7171
write(io, debug(col, j, parsing_ctx, consume_ctx, chunking_ctx))
7272
n != 1 && print(io, ", ")
7373
n -= 1
74-
elseif task_buf.row_statuses[j] == RowStatus.HasColumnIndicators
75-
write(io, k in task_buf.column_indicators[c] ? "?" : debug(col, j, parsing_ctx, consume_ctx, chunking_ctx))
74+
elseif task_buf.row_statuses[j] == RowStatus.MissingValues
75+
write(io, k in task_buf.missing_values[c] ? "?" : debug(col, j, parsing_ctx, consume_ctx, chunking_ctx))
7676
n != 1 && print(io, ", ")
7777
c += 1
7878
n -= 1
@@ -97,7 +97,7 @@ function ChunkedBase.consume!(consume_ctx::DebugContext, payload::ParsedPayload)
9797
consume_ctx.show_values && print(io, "\t$(name): [")
9898
for j in 1:length(task_buf.row_statuses)
9999
if (task_buf.row_statuses[j] & S) > 0
100-
has_missing = task_buf.row_statuses[j] > RowStatus.Ok && task_buf.column_indicators[c, k]
100+
has_missing = task_buf.row_statuses[j] & RowStatus.MissingValues > 0 && task_buf.missing_values[c, k]
101101
consume_ctx.show_values && write(io, has_missing ? "?" : debug(col, j, parsing_ctx, consume_ctx, chunking_ctx))
102102
consume_ctx.show_values && n != 1 && print(io, ", ")
103103
has_missing && (c += 1)
@@ -154,9 +154,9 @@ function ChunkedBase.consume!(ctx::TestContext, payload::ParsedPayload)
154154
str_col = String[]
155155
push!(strings, str_col)
156156
if T === Parsers.PosLen31
157-
col_iter = ColumnIterator(cols[i]::BufferedVector{Parsers.PosLen31}, i, task_buf.row_statuses, task_buf.column_indicators)
158-
for (value, isinvalidrow, ismissingvalue) in col_iter
159-
if ismissingvalue
157+
col_iter = ColumnIterator(cols[i]::BufferedVector{Parsers.PosLen31}, i, task_buf.row_statuses, task_buf.missing_values, task_buf.errored_values)
158+
for (value, isinvalidrow, iserroredvalue, ismissingvalue) in col_iter
159+
if ismissingvalue || iserroredvalue
160160
push!(str_col, "")
161161
else
162162
push!(str_col, Parsers.getstring(chunking_ctx.bytes, value, parsing_ctx.escapechar))

src/populate_result_buffer.jl

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ function skip_row!(result_buf::AbstractResultBuffer, row_bytes, comment::Vector{
2121
end
2222
function skip_row!(result_buf::AbstractResultBuffer)
2323
foreach(skip_element!, result_buf.cols)
24-
unsafe_push!(result_buf.row_statuses, RowStatus.HasColumnIndicators | RowStatus.SkippedRow)
25-
addrows!(result_buf.column_indicators, 1, true)
24+
unsafe_push!(result_buf.row_statuses, RowStatus.MissingValues | RowStatus.SkippedRow)
25+
addrows!(result_buf.missing_values, 1, true)
2626
return true
2727
end
2828

@@ -70,28 +70,31 @@ function ChunkedBase.populate_result_buffer!(
7070
empty!(result_buf)
7171
enum_schema = parsing_ctx.enum_schema
7272
schema = parsing_ctx.schema
73-
colinds_row_idx = 1
73+
missing_idx = 1
74+
errored_idx = 1
7475
options = parsing_ctx.options
7576

7677
Base.ensureroom(result_buf, ceil(Int, length(newlines_segment) * 1.01))
7778

7879
ignorerepeated = options.ignorerepeated::Bool
7980
ignoreemptyrows = options.ignoreemptylines::Bool
80-
colinds = result_buf.column_indicators
81+
missing_values = result_buf.missing_values
82+
errored_values = result_buf.errored_values
8183
cols = result_buf.cols
8284

8385
N = length(schema)
8486
for row_idx in 2:length(newlines_segment)
8587
# We only grow the missing/errored value indicators when we need to; these flags track
8688
# whether we already added a row to each of them for this input row
87-
added_collind_row = false
89+
added_missing_row = false
90+
added_errored_row = false
8891
@inbounds prev_newline = newlines_segment[row_idx - 1]
8992
@inbounds curr_newline = newlines_segment[row_idx]
9093
isemptyrow = ChunkedBase._isemptyrow(prev_newline, curr_newline, buf)
91-
(ignoreemptyrows && isemptyrow) && skip_row!(result_buf) && (colinds_row_idx += 1; continue)
94+
(ignoreemptyrows && isemptyrow) && skip_row!(result_buf) && (missing_idx += 1; continue)
9295
# +1 -1 to exclude newline chars
9396
@inbounds row_bytes = view(buf, prev_newline+Int32(1):curr_newline-Int32(1))
94-
skip_row!(result_buf, row_bytes, comment) && (colinds_row_idx += 1; continue)
97+
skip_row!(result_buf, row_bytes, comment) && (missing_idx += 1; continue)
9598

9699
len = length(row_bytes)
97100
pos = 1
@@ -120,11 +123,10 @@ function ChunkedBase.populate_result_buffer!(
120123

121124
if Parsers.eof(code) && !(col_idx == N && Parsers.delimited(code))
122125
row_status |= RowStatus.TooFewColumns
123-
row_status |= RowStatus.HasColumnIndicators
124-
added_collind_row || (added_collind_row = true; addrows!(colinds))
126+
added_errored_row || (added_errored_row = true; addrows!(errored_values))
125127
for _col_idx in col_idx:N
126128
skip_element!(cols[_col_idx])
127-
colinds[colinds_row_idx, _col_idx] = true
129+
errored_values[errored_idx, _col_idx] = true
128130
end
129131
break
130132
end
@@ -164,22 +166,22 @@ function ChunkedBase.populate_result_buffer!(
164166
(val, tlen, code) = parsecustom!(CT, row_bytes, pos, len, col_idx, cols, options, schema[col_idx])
165167
end
166168
if Parsers.sentinel(code)
167-
row_status |= RowStatus.HasColumnIndicators
168-
added_collind_row || (added_collind_row = true; addrows!(colinds))
169-
@inbounds colinds[colinds_row_idx, col_idx] = true
169+
row_status |= RowStatus.MissingValues
170+
added_missing_row || (added_missing_row = true; addrows!(missing_values))
171+
@inbounds missing_values[missing_idx, col_idx] = true
170172
elseif !Parsers.ok(code)
171173
row_status |= RowStatus.ValueParsingError
172-
row_status |= RowStatus.HasColumnIndicators
173-
added_collind_row || (added_collind_row = true; addrows!(colinds))
174-
@inbounds colinds[colinds_row_idx, col_idx] = true
174+
added_errored_row || (added_errored_row = true; addrows!(errored_values))
175+
@inbounds errored_values[errored_idx, col_idx] = true
175176
end
176177
pos += tlen
177178
end # for col_idx
178179
if !Parsers.eof(code)
179180
row_status |= RowStatus.TooManyColumns
180181
end
181182
unsafe_push!(result_buf.row_statuses, row_status)
182-
colinds_row_idx += (row_status & RowStatus.HasColumnIndicators) > 0
183+
missing_idx += added_missing_row
184+
errored_idx += added_errored_row
183185
end # for row_idx
184186
return nothing
185187
end

0 commit comments

Comments
 (0)