Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ set(SOURCE_FILES_unitTest
src/clp/streaming_compression/zstd/Decompressor.hpp
src/clp/StringReader.cpp
src/clp/StringReader.hpp
src/clp/time_types.hpp
src/clp/TimestampPattern.cpp
src/clp/TimestampPattern.hpp
src/clp/TraceableException.hpp
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp/Defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define CLP_DEFS_H

#include <atomic>
#include <chrono>
#include <cstdint>
#include <limits>

Expand Down
13 changes: 11 additions & 2 deletions components/core/src/clp/MessageParser.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "MessageParser.hpp"

#include <chrono>

#include "Defs.h"
#include "time_types.hpp"
#include "TimestampPattern.hpp"

constexpr char cLineDelimiter = '\n';
Expand All @@ -13,7 +16,7 @@ bool MessageParser::parse_next_message(
size_t& buf_pos,
ParsedMessage& message
) {
message.clear_except_ts_patt();
message.clear_except_time_pattern_and_offset();

while (true) {
// Check if the buffer was exhausted
Expand Down Expand Up @@ -50,7 +53,7 @@ bool MessageParser::parse_next_message(
ReaderInterface& reader,
ParsedMessage& message
) {
message.clear_except_ts_patt();
message.clear_except_time_pattern_and_offset();

while (true) {
// Read message
Expand Down Expand Up @@ -98,20 +101,23 @@ bool MessageParser::parse_line(ParsedMessage& message) {
// Parse timestamp and content
TimestampPattern const* timestamp_pattern = message.get_ts_patt();
epochtime_t timestamp = 0;
UtcOffset utc_offset{0};
size_t timestamp_begin_pos;
size_t timestamp_end_pos;
if (nullptr == timestamp_pattern
|| false
== timestamp_pattern->parse_timestamp(
m_line,
timestamp,
utc_offset,
timestamp_begin_pos,
timestamp_end_pos
))
{
timestamp_pattern = TimestampPattern::search_known_ts_patterns(
m_line,
timestamp,
utc_offset,
timestamp_begin_pos,
timestamp_end_pos
);
Expand All @@ -124,6 +130,7 @@ bool MessageParser::parse_line(ParsedMessage& message) {
m_buffered_msg.set(
timestamp_pattern,
timestamp,
utc_offset,
m_line,
timestamp_begin_pos,
timestamp_end_pos
Expand All @@ -136,6 +143,7 @@ bool MessageParser::parse_line(ParsedMessage& message) {
m_buffered_msg.set(
timestamp_pattern,
timestamp,
utc_offset,
m_line,
timestamp_begin_pos,
timestamp_end_pos
Expand All @@ -149,6 +157,7 @@ bool MessageParser::parse_line(ParsedMessage& message) {
message.set(
timestamp_pattern,
timestamp,
utc_offset,
m_line,
timestamp_begin_pos,
timestamp_end_pos
Expand Down
17 changes: 15 additions & 2 deletions components/core/src/clp/ParsedMessage.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
#include "ParsedMessage.hpp"

#include "time_types.hpp"

using std::string;

namespace clp {
void ParsedMessage::clear() {
m_ts_patt = nullptr;
clear_except_ts_patt();
m_utc_offset = UtcOffset{0};
clear_except_time_pattern_and_offset();
}

void ParsedMessage::clear_except_ts_patt() {
void ParsedMessage::clear_except_time_pattern_and_offset() {
m_ts_patt_changed = false;
m_ts = 0;
m_utc_offset_changed = false;
m_content.clear();
m_orig_num_bytes = 0;
m_is_set = false;
Expand All @@ -19,6 +23,7 @@ void ParsedMessage::clear_except_ts_patt() {
void ParsedMessage::set(
TimestampPattern const* timestamp_pattern,
epochtime_t const timestamp,
UtcOffset utc_offset,
string const& line,
size_t timestamp_begin_pos,
size_t timestamp_end_pos
Expand All @@ -28,6 +33,10 @@ void ParsedMessage::set(
m_ts_patt_changed = true;
}
m_ts = timestamp;
if (utc_offset != m_utc_offset) {
m_utc_offset = utc_offset;
m_utc_offset_changed = true;
}
if (timestamp_begin_pos == timestamp_end_pos) {
m_content.assign(line);
} else {
Expand All @@ -49,6 +58,10 @@ void ParsedMessage::consume(ParsedMessage& message) {
m_ts_patt_changed = true;
}
m_ts = message.m_ts;
if (message.m_utc_offset != m_utc_offset) {
m_utc_offset = message.m_utc_offset;
m_utc_offset_changed = true;
}
m_content.swap(message.m_content);
m_orig_num_bytes = message.m_orig_num_bytes;
m_is_set = true;
Expand Down
15 changes: 14 additions & 1 deletion components/core/src/clp/ParsedMessage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <string>

#include "time_types.hpp"
#include "TimestampPattern.hpp"

namespace clp {
Expand All @@ -17,6 +18,8 @@ class ParsedMessage {
: m_ts_patt(nullptr),
m_ts_patt_changed(false),
m_ts(0),
m_utc_offset{},
m_utc_offset_changed(false),
m_content({}),
m_orig_num_bytes(0),
m_is_set(false) {}
Expand All @@ -30,11 +33,15 @@ class ParsedMessage {

// Methods
void clear();
void clear_except_ts_patt();
/**
* Clears the parsed message except for the timestamp pattern and UTC offset.
*/
void clear_except_time_pattern_and_offset();

void set(
TimestampPattern const* timestamp_pattern,
epochtime_t timestamp,
UtcOffset utc_offset,
std::string const& line,
size_t timestamp_begin_pos,
size_t timestamp_end_pos
Expand All @@ -58,13 +65,19 @@ class ParsedMessage {

bool has_ts_patt_changed() const { return m_ts_patt_changed; }

UtcOffset get_utc_offset() const { return m_utc_offset; }

bool has_utc_offset_changed() const { return m_utc_offset_changed; }

bool is_empty() const { return false == m_is_set; }

private:
// Variables
TimestampPattern const* m_ts_patt;
bool m_ts_patt_changed;
epochtime_t m_ts;
UtcOffset m_utc_offset;
bool m_utc_offset_changed;
std::string m_content;
size_t m_orig_num_bytes;
bool m_is_set;
Expand Down
65 changes: 65 additions & 0 deletions components/core/src/clp/SQLitePreparedStatement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,42 @@ void SQLitePreparedStatement::bind_int64(string const& parameter_name, int64_t v
bind_int64(parameter_index, value);
}

void SQLitePreparedStatement::bind_blob64(
int parameter_index,
void* value,
size_t value_size,
bool copy_parameter
) {
auto return_value = sqlite3_bind_blob64(
m_statement_handle,
parameter_index,
value,
value_size,
copy_parameter ? SQLITE_TRANSIENT : SQLITE_STATIC
);
if (SQLITE_OK != return_value) {
SPDLOG_ERROR(
"SQLitePreparedStatement: Failed to bind blob64 to statement - {}",
sqlite3_errmsg(m_db_handle)
);
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
}

void SQLitePreparedStatement::bind_blob64(
std::string const& parameter_name,
void* value,
size_t value_size,
bool copy_parameter
) {
auto parameter_index = sqlite3_bind_parameter_index(m_statement_handle, parameter_name.c_str());
if (0 == parameter_index) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}

bind_blob64(parameter_index, value, value_size, copy_parameter);
}

void SQLitePreparedStatement::bind_text(
int parameter_index,
std::string const& value,
Expand Down Expand Up @@ -201,6 +237,35 @@ int64_t SQLitePreparedStatement::column_int64(string const& parameter_name) cons
return column_int64(parameter_index);
}

void SQLitePreparedStatement::column_blob(
int parameter_index,
void const*& value,
size_t& value_size
) const {
if (false == m_row_ready) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

value = sqlite3_column_blob(m_statement_handle, parameter_index);
value_size = sqlite3_column_bytes(m_statement_handle, parameter_index);
}

void SQLitePreparedStatement::column_blob(
std::string const& parameter_name,
void const*& value,
size_t& value_size
) const {
if (false == m_row_ready) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}
auto parameter_index = sqlite3_bind_parameter_index(m_statement_handle, parameter_name.c_str());
if (0 == parameter_index) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}

column_blob(parameter_index, value, value_size);
}

void SQLitePreparedStatement::column_string(int parameter_index, std::string& value) const {
if (false == m_row_ready) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
Expand Down
10 changes: 10 additions & 0 deletions components/core/src/clp/SQLitePreparedStatement.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ class SQLitePreparedStatement {
void bind_int(std::string const& parameter_name, int value);
void bind_int64(int parameter_index, int64_t value);
void bind_int64(std::string const& parameter_name, int64_t value);
void bind_blob64(int parameter_index, void* value, size_t value_size, bool copy_parameter);
void bind_blob64(
std::string const& parameter_name,
void* value,
size_t value_size,
bool copy_parameter
);
void bind_text(int parameter_index, std::string const& value, bool copy_parameter);
void
bind_text(std::string const& parameter_name, std::string const& value, bool copy_parameter);
Expand All @@ -51,6 +58,9 @@ class SQLitePreparedStatement {
int column_int(std::string const& parameter_name) const;
int64_t column_int64(int parameter_index) const;
int64_t column_int64(std::string const& parameter_name) const;
void column_blob(int parameter_index, void const*& value, size_t& value_size) const;
void
column_blob(std::string const& parameter_name, void const*& value, size_t& value_size) const;
void column_string(int parameter_index, std::string& value) const;
void column_string(std::string const& parameter_name, std::string& value) const;

Expand Down
Loading