Skip to content
Merged
40 changes: 21 additions & 19 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,14 @@ void ArchiveWriter::write_archive_header(FileWriter& archive_writer, size_t meta

void
ArchiveWriter::append_message(int32_t schema_id, Schema const& schema, ParsedMessage& message) {
SchemaWriter* schema_writer;
auto it = m_id_to_schema_writer.find(schema_id);
if (it != m_id_to_schema_writer.end()) {
schema_writer = it->second;
} else {
schema_writer = new SchemaWriter();
initialize_schema_writer(schema_writer, schema);
m_id_to_schema_writer[schema_id] = schema_writer;
if (it == m_id_to_schema_writer.end()) {
auto schema_writer = std::make_unique<SchemaWriter>();
initialize_schema_writer(schema_writer.get(), schema);
it = m_id_to_schema_writer.emplace(schema_id, std::move(schema_writer)).first;
}

m_encoded_message_size += schema_writer->append_message(message);
m_encoded_message_size += it->second->append_message(message);
++m_next_log_event_id;
}

Expand Down Expand Up @@ -307,34 +304,40 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
auto const& node = m_schema_tree.get_node(id);
switch (node.get_type()) {
case NodeType::Integer:
writer->append_column(new Int64ColumnWriter(id));
writer->append_column(std::make_unique<Int64ColumnWriter>(id));
break;
case NodeType::Float:
writer->append_column(new FloatColumnWriter(id));
writer->append_column(std::make_unique<FloatColumnWriter>(id));
break;
case NodeType::FormattedFloat:
writer->append_column(new FormattedFloatColumnWriter(id));
writer->append_column(std::make_unique<FormattedFloatColumnWriter>(id));
break;
case NodeType::DictionaryFloat:
writer->append_column(new DictionaryFloatColumnWriter(id, m_var_dict));
writer->append_column(
std::make_unique<DictionaryFloatColumnWriter>(id, m_var_dict)
);
break;
case NodeType::ClpString:
writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_log_dict));
writer->append_column(
std::make_unique<ClpStringColumnWriter>(id, m_var_dict, m_log_dict)
);
break;
case NodeType::VarString:
writer->append_column(new VariableStringColumnWriter(id, m_var_dict));
writer->append_column(std::make_unique<VariableStringColumnWriter>(id, m_var_dict));
break;
case NodeType::Boolean:
writer->append_column(new BooleanColumnWriter(id));
writer->append_column(std::make_unique<BooleanColumnWriter>(id));
break;
case NodeType::UnstructuredArray:
writer->append_column(new ClpStringColumnWriter(id, m_var_dict, m_array_dict));
writer->append_column(
std::make_unique<ClpStringColumnWriter>(id, m_var_dict, m_array_dict)
);
break;
case NodeType::DateString:
writer->append_column(new DateStringColumnWriter(id));
writer->append_column(std::make_unique<DateStringColumnWriter>(id));
break;
case NodeType::DeltaInteger:
writer->append_column(new DeltaEncodedInt64ColumnWriter(id));
writer->append_column(std::make_unique<DeltaEncodedInt64ColumnWriter>(id));
break;
case NodeType::Metadata:
case NodeType::NullValue:
Expand Down Expand Up @@ -421,7 +424,6 @@ std::pair<size_t, size_t> ArchiveWriter::store_tables() {
it->second->get_num_messages()
);
current_stream_offset += it->second->get_total_uncompressed_size();
delete it->second;

if (current_stream_offset > m_min_table_size || schemas.size() == schema_metadata.size()) {
stream_metadata.emplace_back(current_table_file_offset, current_stream_offset);
Expand Down
3 changes: 2 additions & 1 deletion components/core/src/clp_s/ArchiveWriter.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_S_ARCHIVEWRITER_HPP
#define CLP_S_ARCHIVEWRITER_HPP

#include <memory>
#include <optional>
#include <string>
#include <string_view>
Expand Down Expand Up @@ -347,7 +348,7 @@ class ArchiveWriter {
SchemaMap m_schema_map;
SchemaTree m_schema_tree;

std::map<int32_t, SchemaWriter*> m_id_to_schema_writer;
std::map<int32_t, std::unique_ptr<SchemaWriter>> m_id_to_schema_writer;

FileWriter m_tables_file_writer;
FileWriter m_table_metadata_file_writer;
Expand Down
10 changes: 2 additions & 8 deletions components/core/src/clp_s/SchemaWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#include <utility>

namespace clp_s {
void SchemaWriter::append_column(BaseColumnWriter* column_writer) {
void SchemaWriter::append_column(std::unique_ptr<BaseColumnWriter> column_writer) {
m_total_uncompressed_size += column_writer->get_total_header_size();
m_columns.push_back(column_writer);
m_columns.push_back(std::move(column_writer));
}

size_t SchemaWriter::append_message(ParsedMessage& message) {
Expand All @@ -31,10 +31,4 @@ void SchemaWriter::store(ZstdCompressor& compressor) {
writer->store(compressor);
}
}

SchemaWriter::~SchemaWriter() {
for (auto i : m_columns) {
delete i;
}
}
} // namespace clp_s
8 changes: 4 additions & 4 deletions components/core/src/clp_s/SchemaWriter.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef CLP_S_SCHEMAWRITER_HPP
#define CLP_S_SCHEMAWRITER_HPP

#include <memory>
#include <vector>

#include "ColumnWriter.hpp"
Expand All @@ -15,7 +16,7 @@ class SchemaWriter {
SchemaWriter() : m_num_messages(0) {}

// Destructor
~SchemaWriter();
~SchemaWriter() = default;

/**
* Opens the schema writer.
Expand All @@ -28,7 +29,7 @@ class SchemaWriter {
* Appends a column to the schema writer.
* @param column_writer
*/
void append_column(BaseColumnWriter* column_writer);
void append_column(std::unique_ptr<BaseColumnWriter> column_writer);

/**
* Appends a message to the schema writer.
Expand All @@ -54,8 +55,7 @@ class SchemaWriter {
uint64_t m_num_messages;
size_t m_total_uncompressed_size{};

std::vector<BaseColumnWriter*> m_columns;
std::vector<BaseColumnWriter*> m_unordered_columns;
std::vector<std::unique_ptr<BaseColumnWriter>> m_columns;
};
} // namespace clp_s

Expand Down
Loading