Skip to content
Open
22 changes: 10 additions & 12 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,40 +307,38 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
auto const& node = m_schema_tree.get_node(id);
switch (node.get_type()) {
case NodeType::Integer:
writer->append_column(std::make_unique<Int64ColumnWriter>(id));
writer->append_column(std::make_unique<Int64ColumnWriter>());
break;
case NodeType::Float:
writer->append_column(std::make_unique<FloatColumnWriter>(id));
writer->append_column(std::make_unique<FloatColumnWriter>());
break;
case NodeType::FormattedFloat:
writer->append_column(std::make_unique<FormattedFloatColumnWriter>(id));
writer->append_column(std::make_unique<FormattedFloatColumnWriter>());
break;
case NodeType::DictionaryFloat:
writer->append_column(
std::make_unique<DictionaryFloatColumnWriter>(id, m_var_dict)
);
writer->append_column(std::make_unique<DictionaryFloatColumnWriter>(m_var_dict));
break;
case NodeType::ClpString:
writer->append_column(
std::make_unique<ClpStringColumnWriter>(id, m_var_dict, m_log_dict)
std::make_unique<ClpStringColumnWriter>(m_var_dict, m_log_dict)
);
break;
case NodeType::VarString:
writer->append_column(std::make_unique<VariableStringColumnWriter>(id, m_var_dict));
writer->append_column(std::make_unique<VariableStringColumnWriter>(m_var_dict));
break;
case NodeType::Boolean:
writer->append_column(std::make_unique<BooleanColumnWriter>(id));
writer->append_column(std::make_unique<BooleanColumnWriter>());
break;
case NodeType::UnstructuredArray:
writer->append_column(
std::make_unique<ClpStringColumnWriter>(id, m_var_dict, m_array_dict)
std::make_unique<ClpStringColumnWriter>(m_var_dict, m_array_dict)
);
break;
case NodeType::DeltaInteger:
writer->append_column(std::make_unique<DeltaEncodedInt64ColumnWriter>(id));
writer->append_column(std::make_unique<DeltaEncodedInt64ColumnWriter>());
break;
case NodeType::Timestamp:
writer->append_column(std::make_unique<TimestampColumnWriter>(id));
writer->append_column(std::make_unique<TimestampColumnWriter>());
break;
case NodeType::DeprecatedDateString:
case NodeType::Metadata:
Expand Down
158 changes: 63 additions & 95 deletions components/core/src/clp_s/ColumnWriter.hpp
Original file line number Diff line number Diff line change
@@ -1,40 +1,48 @@
#ifndef CLP_S_COLUMNWRITER_HPP
#define CLP_S_COLUMNWRITER_HPP

#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include <clp/Defs.h>
#include <clp_s/DictionaryEntry.hpp>

#include "DictionaryWriter.hpp"
#include "FileWriter.hpp"
#include "FloatFormatEncoding.hpp"
#include "ParsedMessage.hpp"
#include "TimestampDictionaryWriter.hpp"
#include "ZstdCompressor.hpp"
#include <clp_s/DictionaryWriter.hpp>
#include <clp_s/FloatFormatEncoding.hpp>
#include <clp_s/ParsedMessage.hpp>
#include <clp_s/ZstdCompressor.hpp>

namespace clp_s {
class BaseColumnWriter {
public:
// Constructor
explicit BaseColumnWriter(int32_t id) : m_id(id) {}
// Constructors
BaseColumnWriter() = default;

BaseColumnWriter(BaseColumnWriter const&) = default;
BaseColumnWriter(BaseColumnWriter&&) noexcept = default;

// Destructor
// Destructors
virtual ~BaseColumnWriter() = default;

// Operators
auto operator=(BaseColumnWriter const&) -> BaseColumnWriter& = default;
auto operator=(BaseColumnWriter&&) noexcept -> BaseColumnWriter& = default;

// Methods
/**
* Adds a value to the column
* @param value
* @return the size of the encoded data appended to this column in bytes
*/
virtual size_t add_value(ParsedMessage::variable_t& value) = 0;
virtual auto add_value(ParsedMessage::variable_t& value) -> size_t = 0;

/**
* Stores the column to a compressed file.
* @param compressor
*/
virtual void store(ZstdCompressor& compressor) = 0;
virtual auto store(ZstdCompressor& compressor) -> void = 0;

/**
* Returns the total size of the header data that will be written to the compressor. This header
Expand All @@ -43,119 +51,88 @@ class BaseColumnWriter {
*
* @return the total size of header data that will be written to the compressor in bytes
*/
virtual size_t get_total_header_size() const { return 0; }

protected:
int32_t m_id;
[[nodiscard]] virtual auto get_total_header_size() const -> size_t { return 0; }
};

class Int64ColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit Int64ColumnWriter(int32_t id) : BaseColumnWriter(id) {}

// Destructor
~Int64ColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Variables
std::vector<int64_t> m_values;
};

class DeltaEncodedInt64ColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit DeltaEncodedInt64ColumnWriter(int32_t id) : BaseColumnWriter(id) {}

// Destructor
~DeltaEncodedInt64ColumnWriter() override = default;

// Methods
[[nodiscard]] auto add_value(int64_t value) -> size_t;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Variables
std::vector<int64_t> m_values;
int64_t m_cur{};
};

class FloatColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit FloatColumnWriter(int32_t id) : BaseColumnWriter(id) {}

// Destructor
~FloatColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Variables
std::vector<double> m_values;
};

class FormattedFloatColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit FormattedFloatColumnWriter(int32_t id) : BaseColumnWriter(id) {}

// Destructor
~FormattedFloatColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Variables
std::vector<double> m_values;
std::vector<float_format_t> m_formats;
};

class DictionaryFloatColumnWriter : public BaseColumnWriter {
public:
// Constructor
DictionaryFloatColumnWriter(int32_t id, std::shared_ptr<VariableDictionaryWriter> var_dict)
: BaseColumnWriter(id),
m_var_dict(std::move(var_dict)) {}
// Constructors
DictionaryFloatColumnWriter(std::shared_ptr<VariableDictionaryWriter> var_dict)
: m_var_dict(std::move(var_dict)) {}

// Destructor
~DictionaryFloatColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Variables
std::shared_ptr<VariableDictionaryWriter> m_var_dict;
std::vector<clp::variable_dictionary_id_t> m_var_dict_ids;
};

class BooleanColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit BooleanColumnWriter(int32_t id) : BaseColumnWriter(id) {}

// Destructor
~BooleanColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
// Variables
std::vector<uint8_t> m_values;
};

Expand All @@ -164,23 +141,22 @@ class ClpStringColumnWriter : public BaseColumnWriter {
// Types
using encoded_log_dict_id_t = uint64_t;

// Constructor
// Constructors
ClpStringColumnWriter(
int32_t id,
std::shared_ptr<VariableDictionaryWriter> var_dict,
std::shared_ptr<LogTypeDictionaryWriter> log_dict
)
: BaseColumnWriter(id),
m_var_dict(std::move(var_dict)),
: m_var_dict(std::move(var_dict)),
m_log_dict(std::move(log_dict)) {}

// Methods inherited from BaseColumnWriter
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

auto store(ZstdCompressor& compressor) -> void override;

auto get_total_header_size() const -> size_t override { return sizeof(size_t); }
[[nodiscard]] auto get_total_header_size() const -> size_t override { return sizeof(size_t); }

// Methods
/**
* @param encoded_id
* @return the encoded log dict id
Expand All @@ -199,6 +175,7 @@ class ClpStringColumnWriter : public BaseColumnWriter {
}

private:
// Methods
/**
* Encodes a log dict id
* @param id
Expand All @@ -210,6 +187,7 @@ class ClpStringColumnWriter : public BaseColumnWriter {
return static_cast<encoded_log_dict_id_t>(id) | (offset << cOffsetBitPosition);
}

// Variables
static constexpr int cOffsetBitPosition = 24;
static constexpr uint64_t cLogDictIdMask = (1ULL << cOffsetBitPosition) - 1;
static constexpr uint64_t cOffsetMask = ~cLogDictIdMask;
Expand All @@ -224,18 +202,14 @@ class ClpStringColumnWriter : public BaseColumnWriter {

class VariableStringColumnWriter : public BaseColumnWriter {
public:
// Constructor
VariableStringColumnWriter(int32_t id, std::shared_ptr<VariableDictionaryWriter> var_dict)
: BaseColumnWriter(id),
m_var_dict(std::move(var_dict)) {}
// Constructors
VariableStringColumnWriter(std::shared_ptr<VariableDictionaryWriter> var_dict)
: m_var_dict(std::move(var_dict)) {}

// Destructor
~VariableStringColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
size_t add_value(ParsedMessage::variable_t& value) override;
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
std::shared_ptr<VariableDictionaryWriter> m_var_dict;
Expand All @@ -244,16 +218,10 @@ class VariableStringColumnWriter : public BaseColumnWriter {

class TimestampColumnWriter : public BaseColumnWriter {
public:
// Constructor
explicit TimestampColumnWriter(int32_t id) : BaseColumnWriter{id}, m_timestamps{id} {}

// Destructor
~TimestampColumnWriter() override = default;

// Methods inherited from BaseColumnWriter
// Methods implementing BaseColumnWriter
auto add_value(ParsedMessage::variable_t& value) -> size_t override;

void store(ZstdCompressor& compressor) override;
auto store(ZstdCompressor& compressor) -> void override;

private:
DeltaEncodedInt64ColumnWriter m_timestamps;
Expand Down
Loading
Loading