77#include < nlohmann/json.hpp>
88#include < spdlog/spdlog.h>
99
10- #include " ../clp/streaming_archive/Constants.hpp"
1110#include " archive_constants.hpp"
1211#include " Defs.hpp"
1312#include " SchemaTree.hpp"
@@ -58,7 +57,7 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) {
5857 m_array_dict->open (array_dict_path, m_compression_level, UINT64_MAX);
5958}
6059
61- void ArchiveWriter::close () {
60+ auto ArchiveWriter::close (bool is_split) -> ArchiveStats {
6261 if (m_range_open) {
6362 if (auto const rc = close_current_range (); ErrorCodeSuccess != rc) {
6463 throw OperationFailed (rc, __FILENAME__, __LINE__);
@@ -87,15 +86,16 @@ void ArchiveWriter::close() {
8786 offset += original_size;
8887 }
8988
89+ nlohmann::json archive_range_index;
9090 if (m_single_file_archive) {
91- write_single_file_archive (files);
91+ archive_range_index = write_single_file_archive (files);
9292 } else {
9393 FileWriter header_and_metadata_writer;
9494 header_and_metadata_writer.open (
9595 m_archive_path + constants::cArchiveHeaderFile,
9696 FileWriter::OpenMode::CreateForWriting
9797 );
98- write_archive_metadata (header_and_metadata_writer, files);
98+ archive_range_index = write_archive_metadata (header_and_metadata_writer, files);
9999 size_t metadata_size = header_and_metadata_writer.get_pos () - sizeof (ArchiveHeader);
100100
101101 m_compressed_size
@@ -107,8 +107,18 @@ void ArchiveWriter::close() {
107107 header_and_metadata_writer.close ();
108108 }
109109
110+ ArchiveStats archive_stats{
111+ m_id,
112+ m_timestamp_dict.get_begin_timestamp (),
113+ m_timestamp_dict.get_end_timestamp (),
114+ m_uncompressed_size,
115+ m_compressed_size,
116+ archive_range_index,
117+ is_split
118+ };
110119 if (m_print_archive_stats) {
111- print_archive_stats ();
120+ std::cout << archive_stats.as_string () << ' \n ' ;
121+ std::cout << std::flush;
112122 }
113123
114124 m_id_to_schema_writer.clear ();
@@ -123,14 +133,17 @@ void ArchiveWriter::close() {
123133 m_authoritative_timestamp_namespace.clear ();
124134 m_matched_timestamp_prefix_length = 0ULL ;
125135 m_matched_timestamp_prefix_node_id = constants::cRootNodeId;
136+ return archive_stats;
126137}
127138
128- void ArchiveWriter::write_single_file_archive (std::vector<ArchiveFileInfo> const & files) {
139+ auto ArchiveWriter::write_single_file_archive (std::vector<ArchiveFileInfo> const & files)
140+ -> nlohmann::json {
129141 std::string single_file_archive_path = (std::filesystem::path (m_archives_dir) / m_id).string ();
130142 FileWriter archive_writer;
131143 archive_writer.open (single_file_archive_path, FileWriter::OpenMode::CreateForWriting);
132144
133- write_archive_metadata (archive_writer, files);
145+ // Avoid brace initialization to avoid wrapping nlohmann::json return value in a JSON array
146+ auto archive_range_index (write_archive_metadata (archive_writer, files));
134147 size_t metadata_section_size = archive_writer.get_pos () - sizeof (ArchiveHeader);
135148 write_archive_files (archive_writer, files);
136149 m_compressed_size = archive_writer.get_pos ();
@@ -141,12 +154,13 @@ void ArchiveWriter::write_single_file_archive(std::vector<ArchiveFileInfo> const
141154 if (false == std::filesystem::remove (m_archive_path, ec)) {
142155 throw OperationFailed (ErrorCodeFileExists, __FILENAME__, __LINE__);
143156 }
157+ return archive_range_index;
144158}
145159
146- void ArchiveWriter::write_archive_metadata (
160+ auto ArchiveWriter::write_archive_metadata (
147161 FileWriter& archive_writer,
148162 std::vector<ArchiveFileInfo> const & files
149- ) {
163+ ) -> nlohmann::json {
150164 archive_writer.seek_from_begin (sizeof (ArchiveHeader));
151165
152166 ZstdCompressor compressor;
@@ -184,11 +198,15 @@ void ArchiveWriter::write_archive_metadata(
184198 compressor.write (encoded_timestamp_dict.data (), encoded_timestamp_dict.size ());
185199
186200 // Write range index
187- if (auto rc = m_range_index_writer.write (compressor); ErrorCodeSuccess != rc) {
201+ nlohmann::json archive_range_index;
202+ if (auto rc = m_range_index_writer.write (compressor, archive_range_index);
203+ ErrorCodeSuccess != rc)
204+ {
188205 throw OperationFailed (rc, __FILENAME__, __LINE__);
189206 }
190207
191208 compressor.close ();
209+ return archive_range_index;
192210}
193211
194212void ArchiveWriter::write_archive_files (
@@ -441,15 +459,4 @@ std::pair<size_t, size_t> ArchiveWriter::store_tables() {
441459
442460 return {table_metadata_compressed_size, table_compressed_size};
443461}
444-
445- auto ArchiveWriter::print_archive_stats () const -> void {
446- namespace Archive = clp::streaming_archive::cMetadataDB::Archive;
447- nlohmann::json json_msg
448- = {{Archive::Id, m_id},
449- {Archive::BeginTimestamp, m_timestamp_dict.get_begin_timestamp ()},
450- {Archive::EndTimestamp, m_timestamp_dict.get_end_timestamp ()},
451- {Archive::UncompressedSize, m_uncompressed_size},
452- {Archive::Size, m_compressed_size}};
453- std::cout << json_msg.dump (-1 , ' ' , true , nlohmann::json::error_handler_t ::ignore) << std::endl;
454- }
455462} // namespace clp_s
0 commit comments