Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4931f6b
feat(log-surgeon)!: Add support for a single capture group in a schem…
davidlion Aug 28, 2025
a7a0be1
Merge remote-tracking branch 'upstream/main' into capture-support
davidlion Aug 28, 2025
61d5715
Add schema equals rule capture.
davidlion Aug 28, 2025
daf18ad
Merge remote-tracking branch 'upstream/main' into capture-support
davidlion Sep 17, 2025
20b26f9
Address some review comments.
davidlion Sep 18, 2025
b8089f1
Tweak tests.
davidlion Sep 18, 2025
98d1088
Fix wrapped token logic.
davidlion Sep 19, 2025
dc591cd
Merge remote-tracking branch 'upstream/main' into capture-support
davidlion Sep 19, 2025
dec9f6e
Fix formatting.
davidlion Sep 19, 2025
03a1f02
Remove wrapping logic by manipulating the token.
davidlion Sep 19, 2025
dee547a
Merge remote-tracking branch 'upstream/main' into capture-support
davidlion Nov 19, 2025
65541fb
Update log-surgeon.
davidlion Nov 19, 2025
56bb7a6
Update GrepCore.
davidlion Nov 19, 2025
610e76f
Update log surgeon code + tests with new version.
davidlion Nov 20, 2025
d52a8c3
Merge branch 'main' into capture-support
davidlion Nov 21, 2025
ec2c8f2
Merge remote-tracking branch 'upstream/main' into capture-support
davidlion Nov 24, 2025
578601a
Refactor writing a token to dictionaries into a private helper.
davidlion Nov 24, 2025
08d5c0c
Merge remote-tracking branch 'upstream/main' into capture-support
davidlion Nov 26, 2025
834fdb7
Add unit test; refactor test-ParserWithUserSchema.cpp.
davidlion Nov 27, 2025
ea1ad3b
Add null + empty check for token type ids.
davidlion Nov 27, 2025
8ce06ba
Merge remote-tracking branch 'upstream/main' into capture-support
davidlion Nov 27, 2025
3b36328
Fix coderabbit nits.
davidlion Nov 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/core/config/schemas.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ float:\-{0,1}[0-9]+\.[0-9]+
// Dictionary variables
hex:[a-fA-F]+
hasNumber:.*\d.*
equals:.*=.*[a-zA-Z0-9].*
equals:.*=(?<var>.*[a-zA-Z0-9].*)
4 changes: 2 additions & 2 deletions components/core/src/clp/GrepCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ bool GrepCore::get_bounds_of_next_potential_var(
return false;
}
search_token = SearchToken{token.value()};
search_token.m_type_ids_set.insert(search_token.m_type_ids_ptr->at(0));
search_token.m_type_ids_set.insert(search_token.get_type_ids()->at(0));
}
auto const& type = search_token.m_type_ids_ptr->at(0);
auto const& type = search_token.get_type_ids()->at(0);
if (type != static_cast<int>(log_surgeon::SymbolId::TokenUncaughtString)
&& type != static_cast<int>(log_surgeon::SymbolId::TokenEnd))
{
Expand Down
13 changes: 13 additions & 0 deletions components/core/src/clp/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <iostream>
#include <memory>
#include <set>
#include <string>

#include <boost/algorithm/string.hpp>
#include <boost/lexical_cast.hpp>
Expand Down Expand Up @@ -187,6 +188,18 @@ load_lexer_from_file(std::string const& schema_file_path, log_surgeon::lexers::B
for (std::unique_ptr<log_surgeon::ParserAST> const& parser_ast : schema_ast->m_schema_vars) {
auto* rule = dynamic_cast<log_surgeon::SchemaVarAST*>(parser_ast.get());

// Currently, we only support at most a single capture group in each variable. If a capture
// group is present its match will be treated as the variable rather than the full match.
auto const num_captures = rule->m_regex_ptr->get_subtree_positive_captures().size();
if (1 < num_captures) {
throw std::runtime_error(
schema_file_path + ":" + std::to_string(rule->m_line_num + 1)
+ ": error: the schema rule '" + rule->m_name
+ "' has a regex pattern containing > 1 capture groups (found "
+ std::to_string(num_captures) + ").\n"
);
}

if ("timestamp" == rule->m_name) {
continue;
}
Expand Down
234 changes: 146 additions & 88 deletions components/core/src/clp/streaming_archive/writer/Archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,33 @@

#include <sys/stat.h>

#include <cstdint>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

#include <boost/asio.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <log_surgeon/Constants.hpp>
#include <log_surgeon/LogEvent.hpp>
#include <log_surgeon/LogParser.hpp>
#include <log_surgeon/Token.hpp>
#include <nlohmann/json.hpp>

#include "../../EncodedVariableInterpreter.hpp"
#include "../../ir/types.hpp"
#include "../../spdlog_with_specializations.hpp"
#include "../../Utils.hpp"
#include "../Constants.hpp"
#include "TimestampPattern.hpp"
#include "utils.hpp"

using clp::ir::eight_byte_encoded_variable_t;
using clp::ir::four_byte_encoded_variable_t;
using log_surgeon::LogEventView;
using std::list;
using std::make_unique;
using std::string;
using std::unordered_set;
using std::vector;

namespace clp::streaming_archive::writer {
Expand Down Expand Up @@ -315,30 +316,139 @@ Archive::write_msg(epochtime_t timestamp, string const& message, size_t num_unco
update_segment_indices(logtype_id, var_ids);
}

void Archive::write_msg_using_schema(LogEventView const& log_view) {
epochtime_t timestamp = 0;
TimestampPattern* timestamp_pattern = nullptr;
auto const& log_output_buffer = log_view.get_log_output_buffer();
if (log_output_buffer->has_timestamp()) {
size_t start;
size_t end;
timestamp_pattern = (TimestampPattern*)TimestampPattern::search_known_ts_patterns(
log_output_buffer->get_mutable_token(0).to_string(),
/**
 * Adds a single log-surgeon token to the in-progress logtype entry and, where applicable, to the
 * variable dictionary and the list of encoded variables.
 *
 * Handling depends on the token's first type ID:
 * - Newline / uncaught string: appended to the logtype as a constant.
 * - Int / float: encoded directly when representable; otherwise stored as a dictionary variable.
 * - Any other (schema-defined) type: stored as a dictionary variable. If the rule has a capture
 *   group, only the first capture is stored as the variable and the text surrounding it is
 *   appended to the logtype as constants.
 *
 * @param log_view The log event containing the token; used to reach the lexer's capture metadata.
 * @param token_view The token to add. Taken by value because its start/end positions are adjusted
 * locally to slice out the text before, inside and after the capture.
 * @throw std::runtime_error if the token has no type IDs, or if the rule has a capture group for
 * which the lexer reports no register IDs.
 */
auto Archive::add_token_to_dicts(
        log_surgeon::LogEventView const& log_view,
        log_surgeon::Token token_view
) -> void {
    auto const* type_ids{token_view.get_type_ids()};
    if (nullptr == type_ids || type_ids->empty()) {
        throw std::runtime_error("Token has no type IDs: " + token_view.to_string());
    }

    // Stores `value` as a dictionary variable: adds it to the variable dictionary, records its ID,
    // pushes the encoded ID, and adds a dictionary-variable placeholder to the logtype.
    auto const add_dict_var = [this](auto const& value) -> void {
        variable_dictionary_id_t id{};
        m_var_dict.add_entry(value, id);
        m_var_ids.push_back(id);
        m_encoded_vars.push_back(EncodedVariableInterpreter::encode_var_dict_id(id));
        m_logtype_dict_entry.add_dictionary_var();
    };

    auto const token_type{type_ids->at(0)};
    switch (token_type) {
        case static_cast<int>(log_surgeon::SymbolId::TokenNewline):
        case static_cast<int>(log_surgeon::SymbolId::TokenUncaughtString): {
            m_logtype_dict_entry.add_constant(token_view.to_string(), 0, token_view.get_length());
            break;
        }
        case static_cast<int>(log_surgeon::SymbolId::TokenInt): {
            auto const value{token_view.to_string()};
            encoded_variable_t encoded_var{};
            if (!EncodedVariableInterpreter::convert_string_to_representable_integer_var(
                        value,
                        encoded_var
                ))
            {
                // NOTE(review): unlike the schema-variable path below, this fallback does not
                // record `id` in m_var_ids. Behaviour is kept as-is; confirm whether the ID
                // should also be recorded there.
                variable_dictionary_id_t id{};
                m_var_dict.add_entry(value, id);
                encoded_var = EncodedVariableInterpreter::encode_var_dict_id(id);
                m_logtype_dict_entry.add_dictionary_var();
            } else {
                m_logtype_dict_entry.add_int_var();
            }
            m_encoded_vars.push_back(encoded_var);
            break;
        }
        case static_cast<int>(log_surgeon::SymbolId::TokenFloat): {
            auto const value{token_view.to_string()};
            encoded_variable_t encoded_var{};
            if (!EncodedVariableInterpreter::convert_string_to_representable_float_var(
                        value,
                        encoded_var
                ))
            {
                // NOTE(review): same as the integer fallback above; `id` is not recorded in
                // m_var_ids. Behaviour is kept as-is; confirm whether it should be.
                variable_dictionary_id_t id{};
                m_var_dict.add_entry(value, id);
                encoded_var = EncodedVariableInterpreter::encode_var_dict_id(id);
                m_logtype_dict_entry.add_dictionary_var();
            } else {
                m_logtype_dict_entry.add_float_var();
            }
            m_encoded_vars.push_back(encoded_var);
            break;
        }
        default: {
            // If there are no capture groups the entire variable token is stored as a variable.
            // If the variable token contains capture groups, we break the token up by storing
            // the capture as a variable and any substrings surrounding the capture as part of
            // the logtype. Capture repetition currently does not work so we explicitly only
            // store the first capture.
            auto const& lexer{log_view.get_log_parser().m_lexer};
            auto capture_ids{lexer.get_capture_ids_from_rule_id(token_type)};
            if (false == capture_ids.has_value()) {
                add_dict_var(token_view.to_string());
                break;
            }

            auto const register_ids{lexer.get_reg_ids_from_capture_id(capture_ids.value().at(0))};
            if (false == register_ids.has_value()) {
                throw(std::runtime_error(
                        "No register IDs found for variable's capture group. Full token: "
                        + token_view.to_string()
                ));
            }

            auto const [start_reg_id, end_reg_id]{register_ids.value()};
            auto const start_positions{token_view.get_reversed_reg_positions(start_reg_id)};
            auto const end_positions{token_view.get_reversed_reg_positions(end_reg_id)};

            bool const capture_has_positions{
                    false == start_positions.empty() && -1 < start_positions[0]
                    && false == end_positions.empty() && -1 < end_positions[0]
            };
            if (false == capture_has_positions) {
                // No usable positions were recorded for the capture. Rather than silently
                // dropping the token, store the whole token as a variable, exactly as is done
                // for rules without a capture group.
                add_dict_var(token_view.to_string());
                break;
            }

            auto const capture_start{start_positions[0]};
            auto const capture_end{end_positions[0]};
            auto const token_end{token_view.get_end_pos()};

            // Text preceding the capture becomes a logtype constant.
            token_view.set_end_pos(capture_start);
            auto const before_capture{token_view.to_string_view()};
            m_logtype_dict_entry.add_constant(before_capture, 0, before_capture.size());

            // The capture itself becomes the dictionary variable.
            token_view.set_start_pos(capture_start);
            token_view.set_end_pos(capture_end);
            add_dict_var(token_view.to_string_view());

            // Text following the capture becomes a logtype constant.
            token_view.set_start_pos(capture_end);
            token_view.set_end_pos(token_end);
            auto const after_capture{token_view.to_string_view()};
            m_logtype_dict_entry.add_constant(after_capture, 0, after_capture.size());
            break;
        }
    }
}

void Archive::write_msg_using_schema(log_surgeon::LogEventView const& log_view) {
epochtime_t timestamp{0};
TimestampPattern const* timestamp_pattern{nullptr};
auto const& log_buf = log_view.get_log_output_buffer();
if (log_buf->has_timestamp()) {
size_t start{};
size_t end{};
timestamp_pattern = TimestampPattern::search_known_ts_patterns(
log_buf->get_mutable_token(0).to_string(),
timestamp,
start,
end
);
if (nullptr == timestamp_pattern) {
throw(std::runtime_error(
"Schema contains a timestamp regex that matches "
+ log_output_buffer->get_mutable_token(0).to_string()
+ log_buf->get_mutable_token(0).to_string()
+ " which does not match any known timestamp pattern."
));
}
if (m_old_ts_pattern != timestamp_pattern) {
change_ts_pattern(timestamp_pattern);
m_old_ts_pattern = timestamp_pattern;
m_old_ts_pattern = const_cast<TimestampPattern*>(timestamp_pattern);
}
} else {
timestamp_pattern = nullptr;
}
if (get_data_size_of_dictionaries() >= m_target_data_size_of_dicts) {
split_file_and_archive(
Expand All @@ -354,89 +464,37 @@ void Archive::write_msg_using_schema(LogEventView const& log_view) {
m_encoded_vars.clear();
m_var_ids.clear();
m_logtype_dict_entry.clear();
size_t num_uncompressed_bytes = 0;

size_t num_uncompressed_bytes{0};
// Timestamp is included in the uncompressed message size
uint32_t start_pos = log_output_buffer->get_token(0).m_start_pos;
auto start_pos{log_buf->get_token(0).get_start_pos()};
if (timestamp_pattern == nullptr) {
start_pos = log_output_buffer->get_token(1).m_start_pos;
start_pos = log_buf->get_token(1).get_start_pos();
}
uint32_t end_pos = log_output_buffer->get_token(log_output_buffer->pos() - 1).m_end_pos;
auto const end_pos{log_buf->get_token(log_buf->pos() - 1).get_end_pos()};
if (start_pos <= end_pos) {
num_uncompressed_bytes = end_pos - start_pos;
} else {
num_uncompressed_bytes
= log_output_buffer->get_token(0).m_buffer_size - start_pos + end_pos;
num_uncompressed_bytes = log_buf->get_token(0).get_buffer_size() - start_pos + end_pos;
}
for (uint32_t i = 1; i < log_output_buffer->pos(); i++) {
log_surgeon::Token& token = log_output_buffer->get_mutable_token(i);
int token_type = token.m_type_ids_ptr->at(0);
if (log_output_buffer->has_delimiters() && (timestamp_pattern != nullptr || i > 1)
for (auto token_idx{1}; token_idx < log_buf->pos(); token_idx++) {
auto token_view{log_buf->get_token(token_idx)};
auto const* type_ids{token_view.get_type_ids()};
if (nullptr == type_ids || type_ids->empty()) {
throw std::runtime_error("Token has no type IDs: " + token_view.to_string());
}
auto const token_type{type_ids->at(0)};
if (log_buf->has_delimiters() && (timestamp_pattern != nullptr || token_idx > 1)
&& token_type != static_cast<int>(log_surgeon::SymbolId::TokenUncaughtString)
&& token_type != static_cast<int>(log_surgeon::SymbolId::TokenNewline))
{
m_logtype_dict_entry.add_constant(token.get_delimiter(), 0, 1);
if (token.m_start_pos == token.m_buffer_size - 1) {
token.m_start_pos = 0;
} else {
token.m_start_pos++;
}
}
switch (token_type) {
case static_cast<int>(log_surgeon::SymbolId::TokenNewline):
case static_cast<int>(log_surgeon::SymbolId::TokenUncaughtString): {
m_logtype_dict_entry.add_constant(token.to_string(), 0, token.get_length());
break;
}
case static_cast<int>(log_surgeon::SymbolId::TokenInt): {
encoded_variable_t encoded_var;
if (!EncodedVariableInterpreter::convert_string_to_representable_integer_var(
token.to_string(),
encoded_var
))
{
variable_dictionary_id_t id;
m_var_dict.add_entry(token.to_string(), id);
encoded_var = EncodedVariableInterpreter::encode_var_dict_id(id);
m_logtype_dict_entry.add_dictionary_var();
} else {
m_logtype_dict_entry.add_int_var();
}
m_encoded_vars.push_back(encoded_var);
break;
}
case static_cast<int>(log_surgeon::SymbolId::TokenFloat): {
encoded_variable_t encoded_var;
if (!EncodedVariableInterpreter::convert_string_to_representable_float_var(
token.to_string(),
encoded_var
))
{
variable_dictionary_id_t id;
m_var_dict.add_entry(token.to_string(), id);
encoded_var = EncodedVariableInterpreter::encode_var_dict_id(id);
m_logtype_dict_entry.add_dictionary_var();
} else {
m_logtype_dict_entry.add_float_var();
}
m_encoded_vars.push_back(encoded_var);
break;
}
default: {
// Variable string looks like a dictionary variable, so encode it as so
encoded_variable_t encoded_var;
variable_dictionary_id_t id;
m_var_dict.add_entry(token.to_string(), id);
encoded_var = EncodedVariableInterpreter::encode_var_dict_id(id);
m_var_ids.push_back(id);

m_logtype_dict_entry.add_dictionary_var();
m_encoded_vars.push_back(encoded_var);
break;
}
m_logtype_dict_entry.add_constant(token_view.get_delimiter(), 0, 1);
token_view.increment_start_pos();
}
add_token_to_dicts(log_view, token_view);
}
if (!m_logtype_dict_entry.get_value().empty()) {
logtype_dictionary_id_t logtype_id;
if (false == m_logtype_dict_entry.get_value().empty()) {
logtype_dictionary_id_t logtype_id{};
m_logtype_dict.add_entry(m_logtype_dict_entry, logtype_id);
m_file->write_encoded_msg(
timestamp,
Expand Down
12 changes: 10 additions & 2 deletions components/core/src/clp/streaming_archive/writer/Archive.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <log_surgeon/LogEvent.hpp>
#include <log_surgeon/ReaderParser.hpp>

#include "../../ArrayBackedPosIntSet.hpp"
#include "../../ErrorCode.hpp"
Expand Down Expand Up @@ -150,7 +149,7 @@ class Archive {
* @param log_event_view
* @throw FileWriter::OperationFailed if any write fails
*/
void write_msg_using_schema(log_surgeon::LogEventView const& log_event_view);
auto write_msg_using_schema(log_surgeon::LogEventView const& log_view) -> void;

/**
* Writes an IR log event to the current encoded file
Expand Down Expand Up @@ -290,6 +289,15 @@ class Archive {
*/
auto update_global_metadata() -> void;

/**
 * Inspects a log surgeon token and adds its information to the logtype and variable dictionaries.
 * Newline and uncaught-string tokens are added to the logtype as constants; other tokens are
 * added as variables. If the token's schema rule has a capture group, the first capture is stored
 * as the variable and the text surrounding it is added to the logtype as constants.
 * @param log_view The log event containing the token.
 * @param token_view The token to add to the dictionaries. Passed by value since the
 * implementation adjusts its start/end positions while splitting it around a capture.
 * @throw std::runtime_error if the token has no type IDs, or if no register IDs are found for the
 * rule's capture group.
 */
auto
add_token_to_dicts(log_surgeon::LogEventView const& log_view, log_surgeon::Token token_view)
-> void;

// Variables
boost::uuids::uuid m_id;
std::string m_id_as_string;
Expand Down
Loading
Loading