Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,40 +307,38 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
auto const& node = m_schema_tree.get_node(id);
switch (node.get_type()) {
case NodeType::Integer:
writer->append_column(std::make_unique<Int64ColumnWriter>(id));
writer->append_column(std::make_unique<Int64ColumnWriter>());
break;
case NodeType::Float:
writer->append_column(std::make_unique<FloatColumnWriter>(id));
writer->append_column(std::make_unique<FloatColumnWriter>());
break;
case NodeType::FormattedFloat:
writer->append_column(std::make_unique<FormattedFloatColumnWriter>(id));
writer->append_column(std::make_unique<FormattedFloatColumnWriter>());
break;
case NodeType::DictionaryFloat:
writer->append_column(
std::make_unique<DictionaryFloatColumnWriter>(id, m_var_dict)
);
writer->append_column(std::make_unique<DictionaryFloatColumnWriter>(m_var_dict));
break;
case NodeType::ClpString:
writer->append_column(
std::make_unique<ClpStringColumnWriter>(id, m_var_dict, m_log_dict)
std::make_unique<ClpStringColumnWriter>(m_var_dict, m_log_dict)
);
break;
case NodeType::VarString:
writer->append_column(std::make_unique<VariableStringColumnWriter>(id, m_var_dict));
writer->append_column(std::make_unique<VariableStringColumnWriter>(m_var_dict));
break;
case NodeType::Boolean:
writer->append_column(std::make_unique<BooleanColumnWriter>(id));
writer->append_column(std::make_unique<BooleanColumnWriter>());
break;
case NodeType::UnstructuredArray:
writer->append_column(
std::make_unique<ClpStringColumnWriter>(id, m_var_dict, m_array_dict)
std::make_unique<ClpStringColumnWriter>(m_var_dict, m_array_dict)
);
break;
case NodeType::DeltaInteger:
writer->append_column(std::make_unique<DeltaEncodedInt64ColumnWriter>(id));
writer->append_column(std::make_unique<DeltaEncodedInt64ColumnWriter>());
break;
case NodeType::Timestamp:
writer->append_column(std::make_unique<TimestampColumnWriter>(id));
writer->append_column(std::make_unique<TimestampColumnWriter>());
break;
case NodeType::DeprecatedDateString:
case NodeType::Metadata:
Expand Down
158 changes: 64 additions & 94 deletions components/core/src/clp_s/ColumnWriter.hpp
Original file line number Diff line number Diff line change
@@ -1,40 +1,48 @@
#ifndef CLP_S_COLUMNWRITER_HPP
#define CLP_S_COLUMNWRITER_HPP

#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include <clp/Defs.h>
#include <clp_s/DictionaryEntry.hpp>

#include "DictionaryWriter.hpp"
#include "FileWriter.hpp"
#include "FloatFormatEncoding.hpp"
#include "ParsedMessage.hpp"
#include "TimestampDictionaryWriter.hpp"
#include "ZstdCompressor.hpp"
#include <clp_s/DictionaryWriter.hpp>
#include <clp_s/FloatFormatEncoding.hpp>
#include <clp_s/ParsedMessage.hpp>
#include <clp_s/ZstdCompressor.hpp>

namespace clp_s {
class BaseColumnWriter {
public:
// Constructor
explicit BaseColumnWriter(int32_t id) : m_id(id) {}
// Constructors
BaseColumnWriter() = default;

BaseColumnWriter(BaseColumnWriter const&) = default;
BaseColumnWriter(BaseColumnWriter&&) noexcept = default;

// Operators
auto operator=(BaseColumnWriter const&) -> BaseColumnWriter& = default;
auto operator=(BaseColumnWriter&&) noexcept -> BaseColumnWriter& = default;

// Destructor
virtual ~BaseColumnWriter() = default;

// Methods
/**
* Adds a value to the column
* @param value
* @return the size of the encoded data appended to this column in bytes
*/
virtual size_t add_value(ParsedMessage::variable_t& value) = 0;
virtual auto add_value(ParsedMessage::variable_t& value) -> size_t = 0;

/**
* Stores the column to a compressed file.
* @param compressor
*/
virtual void store(ZstdCompressor& compressor) = 0;
virtual auto store(ZstdCompressor& compressor) -> void = 0;

/**
* Returns the total size of the header data that will be written to the compressor. This header
Expand All @@ -43,119 +51,88 @@ class BaseColumnWriter {
*
* @return the total size of header data that will be written to the compressor in bytes
*/
virtual size_t get_total_header_size() const { return 0; }

protected:
int32_t m_id;
[[nodiscard]] virtual auto get_total_header_size() const -> size_t { return 0; }
};

class Int64ColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit Int64ColumnWriter(int32_t id) : BaseColumnWriter(id) {}

// Destructor
~Int64ColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Data members
std::vector<int64_t> m_values;
};

class DeltaEncodedInt64ColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit DeltaEncodedInt64ColumnWriter(int32_t id) : BaseColumnWriter(id) {}
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

// Destructor
~DeltaEncodedInt64ColumnWriter() override = default;
auto store(ZstdCompressor& compressor) -> void override;

// Methods
[[nodiscard]] auto add_value(int64_t value) -> size_t;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;

void store(ZstdCompressor& compressor) override;

private:
// Data members
std::vector<int64_t> m_values;
int64_t m_cur{};
};

class FloatColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit FloatColumnWriter(int32_t id) : BaseColumnWriter(id) {}

// Destructor
~FloatColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Data members
std::vector<double> m_values;
};

class FormattedFloatColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit FormattedFloatColumnWriter(int32_t id) : BaseColumnWriter(id) {}

// Destructor
~FormattedFloatColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Data members
std::vector<double> m_values;
std::vector<float_format_t> m_formats;
};

class DictionaryFloatColumnWriter : public BaseColumnWriter {
public:
// Constructor
DictionaryFloatColumnWriter(int32_t id, std::shared_ptr<VariableDictionaryWriter> var_dict)
: BaseColumnWriter(id),
m_var_dict(std::move(var_dict)) {}
// Constructors
explicit DictionaryFloatColumnWriter(std::shared_ptr<VariableDictionaryWriter> var_dict)
: m_var_dict(std::move(var_dict)) {}

// Destructor
~DictionaryFloatColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Data members
std::shared_ptr<VariableDictionaryWriter> m_var_dict;
std::vector<clp::variable_dictionary_id_t> m_var_dict_ids;
};

class BooleanColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit BooleanColumnWriter(int32_t id) : BaseColumnWriter(id) {}

// Destructor
~BooleanColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Data members
std::vector<uint8_t> m_values;
};

Expand All @@ -164,22 +141,21 @@ class ClpStringColumnWriter : public BaseColumnWriter {
// Types
using encoded_log_dict_id_t = uint64_t;

// Constructor
// Constructors
ClpStringColumnWriter(
int32_t id,
std::shared_ptr<VariableDictionaryWriter> var_dict,
std::shared_ptr<LogTypeDictionaryWriter> log_dict
)
: BaseColumnWriter(id),
m_var_dict(std::move(var_dict)),
: m_var_dict(std::move(var_dict)),
m_log_dict(std::move(log_dict)) {}

// Methods inherited from BaseColumnWriter
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

auto store(ZstdCompressor& compressor) -> void override;

auto get_total_header_size() const -> size_t override { return sizeof(size_t); }
// Methods
[[nodiscard]] auto get_total_header_size() const -> size_t override { return sizeof(size_t); }

/**
* @param encoded_id
Expand All @@ -199,6 +175,7 @@ class ClpStringColumnWriter : public BaseColumnWriter {
}

private:
// Methods
/**
* Encodes a log dict id
* @param id
Expand All @@ -210,6 +187,7 @@ class ClpStringColumnWriter : public BaseColumnWriter {
return static_cast<encoded_log_dict_id_t>(id) | (offset << cOffsetBitPosition);
}

// Data members
static constexpr int cOffsetBitPosition = 24;
static constexpr uint64_t cLogDictIdMask = (1ULL << cOffsetBitPosition) - 1;
static constexpr uint64_t cOffsetMask = ~cLogDictIdMask;
Expand All @@ -224,38 +202,30 @@ class ClpStringColumnWriter : public BaseColumnWriter {

class VariableStringColumnWriter : public BaseColumnWriter {
public:
// Constructor
VariableStringColumnWriter(int32_t id, std::shared_ptr<VariableDictionaryWriter> var_dict)
: BaseColumnWriter(id),
m_var_dict(std::move(var_dict)) {}
// Constructors
explicit VariableStringColumnWriter(std::shared_ptr<VariableDictionaryWriter> var_dict)
: m_var_dict(std::move(var_dict)) {}

// Destructor
~VariableStringColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Data members
std::shared_ptr<VariableDictionaryWriter> m_var_dict;
std::vector<clp::variable_dictionary_id_t> m_var_dict_ids;
};

class TimestampColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit TimestampColumnWriter(int32_t id) : BaseColumnWriter{id}, m_timestamps{id} {}

// Destructor
~TimestampColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Data members
DeltaEncodedInt64ColumnWriter m_timestamps;
std::vector<uint64_t> m_timestamp_encodings;
};
Expand Down
Loading
Loading