Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ set(SOURCE_FILES_unitTest
tests/test-clp_s-end_to_end.cpp
tests/test-clp_s-range_index.cpp
tests/test-clp_s-search.cpp
tests/test-clp_s-StringUtils.cpp
tests/test-EncodedVariableInterpreter.cpp
tests/test-encoding_methods.cpp
tests/test-ffi_IrUnitHandlerReq.cpp
Expand Down
6 changes: 6 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
"structurize-arrays",
po::bool_switch(&m_structurize_arrays),
"Structurize arrays instead of compressing them as clp strings."
)(
"sanitize-invalid-json",
po::bool_switch(&m_sanitize_invalid_json),
"Sanitize invalid JSON by escaping unescaped control characters (0x00-0x1F),"
" replacing invalid UTF-8 sequences with U+FFFD, and handling invalid"
" surrogate escapes. When disabled (default), parsing fails on invalid JSON."
)(
"disable-log-order",
po::bool_switch(&m_disable_log_order),
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class CommandLineArguments {

bool get_structurize_arrays() const { return m_structurize_arrays; }

bool get_sanitize_invalid_json() const { return m_sanitize_invalid_json; }

bool get_ordered_decompression() const { return m_ordered_decompression; }

size_t get_target_ordered_chunk_size() const { return m_target_ordered_chunk_size; }
Expand Down Expand Up @@ -202,6 +204,7 @@ class CommandLineArguments {
bool m_no_retain_float_format{false};
bool m_single_file_archive{false};
bool m_structurize_arrays{false};
bool m_sanitize_invalid_json{false};
bool m_ordered_decompression{false};
size_t m_target_ordered_chunk_size{};
bool m_print_ordered_chunk_stats{false};
Expand Down
212 changes: 206 additions & 6 deletions components/core/src/clp_s/JsonFileIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,25 @@
#include <cctype>
#include <cstring>

#include <fmt/format.h>
#include <spdlog/spdlog.h>

#include "Utils.hpp"

namespace clp_s {
JsonFileIterator::JsonFileIterator(
clp::ReaderInterface& reader,
size_t max_document_size,
bool sanitize_invalid_json,
std::string path,
size_t buf_size
)
: m_buf_size(buf_size),
m_max_document_size(max_document_size),
m_buf(new char[buf_size + simdjson::SIMDJSON_PADDING]),
m_reader(reader) {
m_reader(reader),
m_path(std::move(path)),
m_sanitize_invalid_json(sanitize_invalid_json) {
read_new_json();
}

Expand Down Expand Up @@ -64,6 +71,93 @@ bool JsonFileIterator::read_new_json() {
)
.get(m_stream);

// If sanitization is enabled and we encounter errors that can be fixed by sanitization,
// sanitize the buffer and retry parsing
if (m_sanitize_invalid_json) {
// Handle invalid UTF-8 sequences by replacing with U+FFFD
if (simdjson::error_code::UTF8_ERROR == error) {
size_t const old_buf_occupied = m_buf_occupied;
auto const result = StringUtils::sanitize_utf8_buffer(
m_buf,
m_buf_size,
m_buf_occupied,
simdjson::SIMDJSON_PADDING
);
m_buf_occupied = result.new_buf_occupied;
m_sanitization_bytes_added += m_buf_occupied - old_buf_occupied;

if (!result.sanitized_char_counts.empty()) {
size_t total_replaced = 0;
for (auto const& [ch, count] : result.sanitized_char_counts) {
total_replaced += count;
}
SPDLOG_WARN(
"Replaced {} invalid UTF-8 sequence(s) with U+FFFD{}. Buffer size "
"changed by {} bytes ({} -> {}).",
total_replaced,
m_path.empty() ? "" : fmt::format(" in file '{}'", m_path),
static_cast<int64_t>(m_buf_occupied)
- static_cast<int64_t>(old_buf_occupied),
old_buf_occupied,
m_buf_occupied
);
}

error = m_parser.iterate_many(
m_buf,
/* length of data */ m_buf_occupied,
/* batch size of data to parse*/ m_buf_occupied
)
.get(m_stream);
}

// Handle unescaped control characters by escaping them to \u00XX format
if (simdjson::error_code::UNESCAPED_CHARS == error) {
size_t const old_buf_occupied = m_buf_occupied;
// Note: sanitize_json_buffer may reallocate m_buf and will update m_buf_size by
// reference if reallocation is needed. This keeps m_buf_size in sync with the
// actual allocated buffer size.
auto const result = StringUtils::sanitize_json_buffer(
m_buf,
m_buf_size,
m_buf_occupied,
simdjson::SIMDJSON_PADDING
);
m_buf_occupied = result.new_buf_occupied;
m_sanitization_bytes_added += m_buf_occupied - old_buf_occupied;

// Build log message with details of sanitized characters
size_t total_sanitized = 0;
std::string char_details;
for (auto const& [ch, count] : result.sanitized_char_counts) {
if (!char_details.empty()) {
char_details += ", ";
}
char_details
+= fmt::format("0x{:02x} ({})", static_cast<unsigned char>(ch), count);
total_sanitized += count;
}
size_t bytes_added = m_buf_occupied - old_buf_occupied;
SPDLOG_WARN(
"Escaped {} control character(s) in JSON{}: [{}]. Buffer expanded by {} "
"bytes ({} -> {}).",
total_sanitized,
m_path.empty() ? "" : fmt::format(" in file '{}'", m_path),
char_details,
bytes_added,
old_buf_occupied,
m_buf_occupied
);

error = m_parser.iterate_many(
m_buf,
/* length of data */ m_buf_occupied,
/* batch size of data to parse*/ m_buf_occupied
)
.get(m_stream);
}
}

if (error) {
m_error_code = error;
return false;
Expand Down Expand Up @@ -118,6 +212,64 @@ bool JsonFileIterator::get_json(simdjson::ondemand::document_stream::iterator& i
return true;
} else if (m_doc_it.error() == simdjson::error_code::UTF8_ERROR) {
maybe_utf8_edge_case = true;
} else if (m_sanitize_invalid_json
&& m_doc_it.error() == simdjson::error_code::UNESCAPED_CHARS)
{
// Unescaped control characters detected during document iteration. Sanitize the
// buffer and re-setup the document stream to restart from the beginning.
size_t const old_buf_occupied = m_buf_occupied;
auto const result = StringUtils::sanitize_json_buffer(
m_buf,
m_buf_size,
m_buf_occupied,
simdjson::SIMDJSON_PADDING
);
m_buf_occupied = result.new_buf_occupied;
m_sanitization_bytes_added += m_buf_occupied - old_buf_occupied;

// Log sanitization details
if (!result.sanitized_char_counts.empty()) {
size_t total_sanitized = 0;
std::string char_details;
for (auto const& [ch, count] : result.sanitized_char_counts) {
if (!char_details.empty()) {
char_details += ", ";
}
char_details += fmt::format(
"0x{:02x} ({})",
static_cast<unsigned char>(ch),
count
);
total_sanitized += count;
}
SPDLOG_WARN(
"Escaped {} control character(s) in JSON{}: [{}]. Buffer expanded by "
"{} bytes ({} -> {}).",
total_sanitized,
m_path.empty() ? "" : fmt::format(" in file '{}'", m_path),
char_details,
m_buf_occupied - old_buf_occupied,
old_buf_occupied,
m_buf_occupied
);
} else {
// Sanitization made no changes - report the original error to avoid infinite
// loop
m_error_code = m_doc_it.error();
return false;
}

// Re-setup the document stream and restart iteration
auto error = m_parser.iterate_many(m_buf, m_buf_occupied, m_buf_occupied)
.get(m_stream);
if (error) {
m_error_code = error;
return false;
}
m_doc_it = m_stream.begin();
m_first_doc_in_buffer = true;
m_next_document_position = 0;
continue;
} else {
m_error_code = m_doc_it.error();
return false;
Expand All @@ -137,8 +289,54 @@ bool JsonFileIterator::get_json(simdjson::ondemand::document_stream::iterator& i
// If we hit a UTF-8 error and either we have reached eof or the buffer occupancy is
// greater than the maximum document size we assume that the UTF-8 error must have been
// in the middle of the stream. Note: it is possible that the UTF-8 error is at the end
// of the stream and that this is actualy a truncation error. Unfortunately the only way
// to check is to parse it ourselves, so we rely on this heuristic for now.
// of the stream and that this is actually a truncation error. Unfortunately the only
// way to check is to parse it ourselves, so we rely on this heuristic for now.
if (m_sanitize_invalid_json) {
// Sanitize invalid UTF-8 sequences and retry
size_t const old_buf_occupied = m_buf_occupied;
auto const result = StringUtils::sanitize_utf8_buffer(
m_buf,
m_buf_size,
m_buf_occupied,
simdjson::SIMDJSON_PADDING
);
m_buf_occupied = result.new_buf_occupied;
m_sanitization_bytes_added += m_buf_occupied - old_buf_occupied;

if (!result.sanitized_char_counts.empty()) {
size_t total_replaced = 0;
for (auto const& [ch, count] : result.sanitized_char_counts) {
total_replaced += count;
}
SPDLOG_WARN(
"Replaced {} invalid UTF-8 sequence(s) with U+FFFD{}. Buffer size "
"changed by {} bytes ({} -> {}).",
total_replaced,
m_path.empty() ? "" : fmt::format(" in file '{}'", m_path),
static_cast<int64_t>(m_buf_occupied)
- static_cast<int64_t>(old_buf_occupied),
old_buf_occupied,
m_buf_occupied
);
} else {
// Sanitization made no changes - report the original error to avoid infinite
// loop
m_error_code = simdjson::error_code::UTF8_ERROR;
return false;
}

// Re-setup the document stream and restart iteration
auto error = m_parser.iterate_many(m_buf, m_buf_occupied, m_buf_occupied)
.get(m_stream);
if (error) {
m_error_code = error;
return false;
}
m_doc_it = m_stream.begin();
m_first_doc_in_buffer = true;
m_next_document_position = 0;
continue;
}
m_error_code = simdjson::error_code::UTF8_ERROR;
return false;
} else if (maybe_utf8_edge_case) {
Expand All @@ -151,10 +349,12 @@ bool JsonFileIterator::get_json(simdjson::ondemand::document_stream::iterator& i
size_t JsonFileIterator::get_num_bytes_consumed() {
// If there are more documents left in the current buffer account for how much of the
// buffer has been consumed, otherwise report the total number of bytes read so that we
// capture trailing whitespace.
// capture trailing whitespace. Include bytes added by sanitization since the sanitized
// content is what gets compressed.
if (m_doc_it != m_stream.end()) {
return m_bytes_read - (m_buf_occupied - m_next_document_position);
return m_bytes_read + m_sanitization_bytes_added
- (m_buf_occupied - m_next_document_position);
}
return m_bytes_read;
return m_bytes_read + m_sanitization_bytes_added;
}
} // namespace clp_s
15 changes: 13 additions & 2 deletions components/core/src/clp_s/JsonFileIterator.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef CLP_S_JSONFILEITERATOR_HPP
#define CLP_S_JSONFILEITERATOR_HPP

#include <string>

#include <simdjson.h>

#include "../clp/ReaderInterface.hpp"
Expand All @@ -19,11 +21,15 @@ class JsonFileIterator {

* @param reader the input stream containing JSON
* @param max_document_size the maximum allowed size of a single document
* @param sanitize_invalid_json whether to sanitize invalid JSON (control chars, invalid UTF-8)
* @param path optional path to the file being read (used for logging)
* @param buf_size the initial buffer size
*/
explicit JsonFileIterator(
clp::ReaderInterface& reader,
size_t max_document_size,
bool sanitize_invalid_json,
std::string path = {},
size_t buf_size = 1024 * 1024 /* 1 MiB default */
);
~JsonFileIterator();
Expand All @@ -41,9 +47,11 @@ class JsonFileIterator {
[[nodiscard]] size_t truncated_bytes() const { return m_truncated_bytes; }

/**
* @return total number of bytes read from the file
* @return total number of bytes read from the file, plus any bytes added by sanitization
*/
[[nodiscard]] size_t get_num_bytes_read() const { return m_bytes_read; }
[[nodiscard]] size_t get_num_bytes_read() const {
return m_bytes_read + m_sanitization_bytes_added;
}

/**
* Note: this method can not be const because checking if a simdjson iterator is at the end
Expand Down Expand Up @@ -76,11 +84,14 @@ class JsonFileIterator {
size_t m_truncated_bytes{0};
size_t m_next_document_position{0};
size_t m_bytes_read{0};
size_t m_sanitization_bytes_added{0};
size_t m_buf_size{0};
size_t m_buf_occupied{0};
size_t m_max_document_size{0};
char* m_buf{nullptr};
clp::ReaderInterface& m_reader;
std::string m_path;
bool m_sanitize_invalid_json{false};
simdjson::ondemand::parser m_parser;
simdjson::ondemand::document_stream m_stream;
bool m_eof{false};
Expand Down
Loading
Loading