11#include " ArchiveReader.hpp"
22
3+ #include < cstddef>
4+ #include < cstdint>
35#include < memory>
46#include < system_error>
57#include < utility>
@@ -67,100 +69,127 @@ auto ArchiveReader::initialize_archive_reader() -> void {
6769 m_array_dict = ReaderUtils::get_array_dictionary_reader (*m_archive_reader_adaptor);
6870}
6971
70- void ArchiveReader::read_metadata () {
71- constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024 ; // 64 KiB
72+ auto ArchiveReader::read_single_schema_metadata ()
73+ -> ystdlib::error_handling::Result<std::pair<int32_t, SchemaReader::SchemaMetadata>> {
74+ uint64_t stream_id_u64{0 };
75+ uint64_t stream_offset_u64{0 };
76+ int32_t schema_id{0 };
77+ uint64_t num_messages{0 };
78+
79+ if (auto const error{m_table_metadata_decompressor.try_read_numeric_value (stream_id_u64)};
80+ ErrorCodeSuccess != error)
81+ {
82+ return std::errc::io_error;
83+ }
84+ auto const stream_id{
85+ YSTDLIB_ERROR_HANDLING_TRYX (ReaderUtils::try_uint64_to_size_t (stream_id_u64))
86+ };
87+
88+ if (auto const error{m_table_metadata_decompressor.try_read_numeric_value (stream_offset_u64)};
89+ ErrorCodeSuccess != error)
90+ {
91+ return std::errc::io_error;
92+ }
93+ auto const stream_offset{
94+ YSTDLIB_ERROR_HANDLING_TRYX (ReaderUtils::try_uint64_to_size_t (stream_offset_u64))
95+ };
96+
97+ if (stream_offset > m_stream_reader.get_uncompressed_stream_size (stream_id)) {
98+ return std::errc::illegal_byte_sequence;
99+ }
100+
101+ if (auto const error{m_table_metadata_decompressor.try_read_numeric_value (schema_id)};
102+ ErrorCodeSuccess != error)
103+ {
104+ return std::errc::io_error;
105+ }
106+
107+ if (auto const error{m_table_metadata_decompressor.try_read_numeric_value (num_messages)};
108+ ErrorCodeSuccess != error)
109+ {
110+ return std::errc::io_error;
111+ }
112+
113+ return std::make_pair (
114+ schema_id,
115+ SchemaReader::SchemaMetadata{stream_id, stream_offset, num_messages}
116+ );
117+ }
118+
119+ auto ArchiveReader::read_metadata () -> ystdlib::error_handling::Result<void> {
120+ constexpr size_t cDecompressorFileReadBufferCapacity{64 * 1024 }; // 64 KiB
72121 auto table_metadata_reader = m_archive_reader_adaptor->checkout_reader_for_section (
73122 constants::cArchiveTableMetadataFile
74123 );
75124 m_table_metadata_decompressor.open (*table_metadata_reader, cDecompressorFileReadBufferCapacity);
76125
77- m_stream_reader.read_metadata (m_table_metadata_decompressor);
126+ YSTDLIB_ERROR_HANDLING_TRYV ( m_stream_reader.read_metadata (m_table_metadata_decompressor) );
78127
79- size_t num_separate_column_schemas;
80- if (auto error
81- = m_table_metadata_decompressor.try_read_numeric_value (num_separate_column_schemas);
128+ uint64_t num_separate_column_schemas{0 };
129+ if (auto const error{
130+ m_table_metadata_decompressor.try_read_numeric_value (num_separate_column_schemas)
131+ };
82132 ErrorCodeSuccess != error)
83133 {
84134 throw OperationFailed (error, __FILENAME__, __LINE__);
85135 }
86136
87137 if (0 != num_separate_column_schemas) {
88- throw OperationFailed (ErrorCode:: ErrorCodeUnsupported, __FILENAME__, __LINE__);
138+ throw OperationFailed (ErrorCodeUnsupported, __FILENAME__, __LINE__);
89139 }
90140
91- size_t num_schemas;
92- if (auto error = m_table_metadata_decompressor.try_read_numeric_value (num_schemas);
141+ uint64_t num_schemas{ 0 } ;
142+ if (auto const error{ m_table_metadata_decompressor.try_read_numeric_value (num_schemas)} ;
93143 ErrorCodeSuccess != error)
94144 {
95145 throw OperationFailed (error, __FILENAME__, __LINE__);
96146 }
147+ if (0 == num_schemas) {
148+ throw OperationFailed (ErrorCodeUnsupported, __FILENAME__, __LINE__);
149+ }
97150
98- bool prev_metadata_initialized{false };
99- SchemaReader::SchemaMetadata prev_metadata{};
100- int32_t prev_schema_id{};
101- for (size_t i = 0 ; i < num_schemas; ++i) {
102- uint64_t stream_id;
103- uint64_t stream_offset;
104- int32_t schema_id;
105- uint64_t num_messages;
106-
107- if (auto error = m_table_metadata_decompressor.try_read_numeric_value (stream_id);
108- ErrorCodeSuccess != error)
109- {
110- throw OperationFailed (error, __FILENAME__, __LINE__);
111- }
112-
113- if (auto error = m_table_metadata_decompressor.try_read_numeric_value (stream_offset);
114- ErrorCodeSuccess != error)
115- {
116- throw OperationFailed (error, __FILENAME__, __LINE__);
117- }
151+ auto [prev_schema_id,
152+ prev_metadata]{YSTDLIB_ERROR_HANDLING_TRYX (read_single_schema_metadata ())};
153+ m_schema_ids.push_back (prev_schema_id);
154+ for (uint64_t i{1 }; i < num_schemas; ++i) {
155+ auto const [schema_id, metadata]{
156+ YSTDLIB_ERROR_HANDLING_TRYX (read_single_schema_metadata ())
157+ };
158+ m_schema_ids.push_back (schema_id);
118159
119- if (stream_offset > m_stream_reader.get_uncompressed_stream_size (stream_id)) {
160+ if (metadata.stream_id () != prev_metadata.stream_id ()) {
161+ prev_metadata.set_uncompressed_size (
162+ m_stream_reader.get_uncompressed_stream_size (prev_metadata.stream_id ())
163+ - prev_metadata.stream_offset ()
164+ );
165+ } else if (metadata.stream_offset () < prev_metadata.stream_offset ()) {
120166 throw OperationFailed (ErrorCodeCorrupt, __FILENAME__, __LINE__);
121- }
122-
123- if (auto error = m_table_metadata_decompressor.try_read_numeric_value (schema_id);
124- ErrorCodeSuccess != error)
125- {
126- throw OperationFailed (error, __FILENAME__, __LINE__);
127- }
128-
129- if (auto error = m_table_metadata_decompressor.try_read_numeric_value (num_messages);
130- ErrorCodeSuccess != error)
131- {
132- throw OperationFailed (error, __FILENAME__, __LINE__);
133- }
134-
135- if (prev_metadata_initialized) {
136- uint64_t uncompressed_size{0 };
137- if (stream_id != prev_metadata.stream_id ) {
138- uncompressed_size
139- = m_stream_reader.get_uncompressed_stream_size (prev_metadata.stream_id )
140- - prev_metadata.stream_offset ;
141- } else {
142- uncompressed_size = stream_offset - prev_metadata.stream_offset ;
143- }
144- prev_metadata.uncompressed_size = uncompressed_size;
145- m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
146167 } else {
147- prev_metadata_initialized = true ;
168+ prev_metadata.set_uncompressed_size (
169+ metadata.stream_offset () - prev_metadata.stream_offset ()
170+ );
148171 }
149- prev_metadata = {stream_id, stream_offset, num_messages, 0 };
172+ m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
173+
150174 prev_schema_id = schema_id;
151- m_schema_ids. push_back (schema_id) ;
175+ prev_metadata = metadata ;
152176 }
153- prev_metadata.uncompressed_size
154- = m_stream_reader.get_uncompressed_stream_size (prev_metadata.stream_id )
155- - prev_metadata.stream_offset ;
177+ prev_metadata.set_uncompressed_size (
178+ m_stream_reader.get_uncompressed_stream_size (prev_metadata.stream_id ())
179+ - prev_metadata.stream_offset ()
180+ );
156181 m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
157182 m_table_metadata_decompressor.close ();
158183
159184 m_archive_reader_adaptor->checkin_reader_for_section (constants::cArchiveTableMetadataFile);
185+
186+ return ystdlib::error_handling::success ();
160187}
161188
162189void ArchiveReader::read_dictionaries_and_metadata () {
163- read_metadata ();
190+ if (auto const result{read_metadata ()}; result.has_error ()) {
191+ throw OperationFailed (ErrorCodeFailure, __FILENAME__, __LINE__);
192+ }
164193 m_var_dict->read_entries ();
165194 m_log_dict->read_entries ();
166195 m_array_dict->read_entries ();
@@ -186,10 +215,13 @@ SchemaReader& ArchiveReader::read_schema_table(
186215 should_marshal_records
187216 );
188217
189- auto & schema_metadata = m_id_to_schema_metadata[schema_id];
190- auto stream_buffer = read_stream (schema_metadata.stream_id , true );
191- m_schema_reader
192- .load (stream_buffer, schema_metadata.stream_offset , schema_metadata.uncompressed_size );
218+ auto const & schema_metadata = m_id_to_schema_metadata[schema_id];
219+ auto stream_buffer = read_stream (schema_metadata.stream_id (), true );
220+ m_schema_reader.load (
221+ stream_buffer,
222+ schema_metadata.stream_offset (),
223+ schema_metadata.uncompressed_size ()
224+ );
193225 return m_schema_reader;
194226}
195227
@@ -199,12 +231,12 @@ std::vector<std::shared_ptr<SchemaReader>> ArchiveReader::read_all_tables() {
199231 for (auto schema_id : m_schema_ids) {
200232 auto schema_reader = std::make_shared<SchemaReader>();
201233 initialize_schema_reader (*schema_reader, schema_id, true , true );
202- auto & schema_metadata = m_id_to_schema_metadata[schema_id];
203- auto stream_buffer = read_stream (schema_metadata.stream_id , false );
234+ auto const & schema_metadata = m_id_to_schema_metadata[schema_id];
235+ auto stream_buffer = read_stream (schema_metadata.stream_id () , false );
204236 schema_reader->load (
205237 stream_buffer,
206- schema_metadata.stream_offset ,
207- schema_metadata.uncompressed_size
238+ schema_metadata.stream_offset () ,
239+ schema_metadata.uncompressed_size ()
208240 );
209241 readers.push_back (std::move (schema_reader));
210242 }
@@ -338,7 +370,7 @@ void ArchiveReader::initialize_schema_reader(
338370 m_projection,
339371 schema_id,
340372 schema.get_ordered_schema_view (),
341- m_id_to_schema_metadata[schema_id].num_messages ,
373+ m_id_to_schema_metadata[schema_id].num_messages () ,
342374 should_marshal_records
343375 );
344376 auto timestamp_column_ids
0 commit comments