Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 21 additions & 19 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,14 @@ void ArchiveWriter::write_archive_header(FileWriter& archive_writer, size_t meta

void
ArchiveWriter::append_message(int32_t schema_id, Schema const& schema, ParsedMessage& message) {
SchemaWriter* schema_writer;
auto it = m_id_to_schema_writer.find(schema_id);
if (it != m_id_to_schema_writer.end()) {
schema_writer = it->second;
} else {
schema_writer = new SchemaWriter();
initialize_schema_writer(schema_writer, schema);
m_id_to_schema_writer[schema_id] = schema_writer;
if (it == m_id_to_schema_writer.end()) {
auto schema_writer = std::make_unique<SchemaWriter>();
initialize_schema_writer(schema_writer.get(), schema);
it = m_id_to_schema_writer.emplace(schema_id, std::move(schema_writer)).first;
}

m_encoded_message_size += schema_writer->append_message(message);
m_encoded_message_size += it->second->append_message(message);
++m_next_log_event_id;
}

Expand Down Expand Up @@ -307,34 +304,40 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
auto const& node = m_schema_tree.get_node(id);
switch (node.get_type()) {
case NodeType::Integer:
writer->append_column(new Int64ColumnWriter(id));
writer->append_column(std::make_unique<Int64ColumnWriter>(id));
break;
case NodeType::Float:
writer->append_column(new FloatColumnWriter(id));
writer->append_column(std::make_unique<FloatColumnWriter>(id));
break;
case NodeType::FormattedFloat:
writer->append_column(new FormattedFloatColumnWriter(id));
writer->append_column(std::make_unique<FormattedFloatColumnWriter>(id));
break;
case NodeType::DictionaryFloat:
writer->append_column(new DictionaryFloatColumnWriter(id, m_var_dict));
writer->append_column(
std::make_unique<DictionaryFloatColumnWriter>(id, m_var_dict)
);
break;
case NodeType::ClpString:
writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_log_dict));
writer->append_column(
std::make_unique<ClpStringColumnWriter>(id, m_var_dict, m_log_dict)
);
break;
case NodeType::VarString:
writer->append_column(new VariableStringColumnWriter(id, m_var_dict));
writer->append_column(std::make_unique<VariableStringColumnWriter>(id, m_var_dict));
break;
case NodeType::Boolean:
writer->append_column(new BooleanColumnWriter(id));
writer->append_column(std::make_unique<BooleanColumnWriter>(id));
break;
case NodeType::UnstructuredArray:
writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_array_dict));
writer->append_column(
std::make_unique<ClpStringColumnWriter>(id, m_var_dict, m_array_dict)
);
break;
case NodeType::DateString:
writer->append_column(new DateStringColumnWriter(id));
writer->append_column(std::make_unique<DateStringColumnWriter>(id));
break;
case NodeType::DeltaInteger:
writer->append_column(new DeltaEncodedInt64ColumnWriter(id));
writer->append_column(std::make_unique<DeltaEncodedInt64ColumnWriter>(id));
break;
case NodeType::Metadata:
case NodeType::NullValue:
Expand Down Expand Up @@ -421,7 +424,6 @@ std::pair<size_t, size_t> ArchiveWriter::store_tables() {
it->second->get_num_messages()
);
current_stream_offset += it->second->get_total_uncompressed_size();
delete it->second;

if (current_stream_offset > m_min_table_size || schemas.size() == schema_metadata.size()) {
stream_metadata.emplace_back(current_table_file_offset, current_stream_offset);
Expand Down
3 changes: 2 additions & 1 deletion components/core/src/clp_s/ArchiveWriter.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_S_ARCHIVEWRITER_HPP
#define CLP_S_ARCHIVEWRITER_HPP

#include <memory>
#include <optional>
#include <string>
#include <string_view>
Expand Down Expand Up @@ -347,7 +348,7 @@ class ArchiveWriter {
SchemaMap m_schema_map;
SchemaTree m_schema_tree;

std::map<int32_t, SchemaWriter*> m_id_to_schema_writer;
std::map<int32_t, std::unique_ptr<SchemaWriter>> m_id_to_schema_writer;

FileWriter m_tables_file_writer;
FileWriter m_table_metadata_file_writer;
Expand Down
10 changes: 2 additions & 8 deletions components/core/src/clp_s/SchemaWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#include <utility>

namespace clp_s {
void SchemaWriter::append_column(BaseColumnWriter* column_writer) {
void SchemaWriter::append_column(std::unique_ptr<BaseColumnWriter> column_writer) {
m_total_uncompressed_size += column_writer->get_total_header_size();
m_columns.push_back(column_writer);
m_columns.push_back(std::move(column_writer));
}

size_t SchemaWriter::append_message(ParsedMessage& message) {
Expand All @@ -31,10 +31,4 @@ void SchemaWriter::store(ZstdCompressor& compressor) {
writer->store(compressor);
}
}

SchemaWriter::~SchemaWriter() {
for (auto i : m_columns) {
delete i;
}
}
} // namespace clp_s
8 changes: 4 additions & 4 deletions components/core/src/clp_s/SchemaWriter.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_S_SCHEMAWRITER_HPP
#define CLP_S_SCHEMAWRITER_HPP

#include <memory>
#include <vector>

#include "ColumnWriter.hpp"
Expand All @@ -15,7 +16,7 @@ class SchemaWriter {
SchemaWriter() : m_num_messages(0) {}

// Destructor
~SchemaWriter();
~SchemaWriter() = default;

/**
* Opens the schema writer.
Expand All @@ -28,7 +29,7 @@ class SchemaWriter {
* Appends a column to the schema writer.
* @param column_writer
*/
void append_column(BaseColumnWriter* column_writer);
void append_column(std::unique_ptr<BaseColumnWriter> column_writer);

/**
* Appends a message to the schema writer.
Expand All @@ -54,8 +55,7 @@ class SchemaWriter {
uint64_t m_num_messages;
size_t m_total_uncompressed_size{};

std::vector<BaseColumnWriter*> m_columns;
std::vector<BaseColumnWriter*> m_unordered_columns;
std::vector<std::unique_ptr<BaseColumnWriter>> m_columns;
};
} // namespace clp_s

Expand Down
Loading