@@ -41,6 +41,7 @@ void ArchiveReader::read_metadata() {
4141 int32_t schema_id;
4242 uint64_t num_messages;
4343 size_t table_offset;
44+ size_t uncompressed_size;
4445
4546 if (auto error = m_table_metadata_decompressor.try_read_numeric_value (schema_id);
4647 ErrorCodeSuccess != error)
@@ -60,7 +61,13 @@ void ArchiveReader::read_metadata() {
6061 throw OperationFailed (error, __FILENAME__, __LINE__);
6162 }
6263
63- m_id_to_table_metadata[schema_id] = {num_messages, table_offset};
64+ if (auto error = m_table_metadata_decompressor.try_read_numeric_value (uncompressed_size);
65+ ErrorCodeSuccess != error)
66+ {
67+ throw OperationFailed (error, __FILENAME__, __LINE__);
68+ }
69+
70+ m_id_to_table_metadata[schema_id] = {num_messages, table_offset, uncompressed_size};
6471 m_schema_ids.push_back (schema_id);
6572 }
6673 m_table_metadata_decompressor.close ();
@@ -74,7 +81,7 @@ void ArchiveReader::read_dictionaries_and_metadata() {
7481 read_metadata ();
7582}
7683
77- std::unique_ptr< SchemaReader> ArchiveReader::read_table (
84+ SchemaReader& ArchiveReader::read_table (
7885 int32_t schema_id,
7986 bool should_extract_timestamp,
8087 bool should_marshal_records
@@ -85,93 +92,156 @@ std::unique_ptr<SchemaReader> ArchiveReader::read_table(
8592 throw OperationFailed (ErrorCodeFileNotFound, __FILENAME__, __LINE__);
8693 }
8794
88- auto schema_reader
95+ auto & schema_reader
8996 = create_schema_reader (schema_id, should_extract_timestamp, should_marshal_records);
9097
9198 m_tables_file_reader.try_seek_from_begin (m_id_to_table_metadata[schema_id].offset );
9299 m_tables_decompressor.open (m_tables_file_reader, cDecompressorFileReadBufferCapacity);
93- schema_reader-> load (m_tables_decompressor);
94- m_tables_decompressor.close ();
100+ schema_reader. load (m_tables_decompressor, m_id_to_table_metadata[schema_id]. uncompressed_size );
101+ m_tables_decompressor.close_for_reuse ();
95102 return schema_reader;
96103}
97104
98- BaseColumnReader*
99- ArchiveReader::append_reader_column (std::unique_ptr<SchemaReader>& reader, int32_t column_id) {
105+ BaseColumnReader* ArchiveReader::append_reader_column (SchemaReader& reader, int32_t column_id) {
100106 BaseColumnReader* column_reader = nullptr ;
101- auto node = m_schema_tree->get_node (column_id);
102- std::string key_name = node->get_key_name ();
103- switch (node->get_type ()) {
104- case NodeType::INTEGER:
105- column_reader = new Int64ColumnReader (key_name, column_id);
106- break ;
107- case NodeType::FLOAT:
108- column_reader = new FloatColumnReader (key_name, column_id);
107+ auto const & node = m_schema_tree->get_node (column_id);
108+ switch (node.get_type ()) {
109+ case NodeType::Integer:
110+ column_reader = new Int64ColumnReader (column_id);
109111 break ;
110- case NodeType::CLPSTRING :
111- column_reader = new ClpStringColumnReader (key_name, column_id, m_var_dict, m_log_dict );
112+ case NodeType::Float :
113+ column_reader = new FloatColumnReader ( column_id);
112114 break ;
113- case NodeType::VARSTRING :
114- column_reader = new VariableStringColumnReader (key_name, column_id, m_var_dict);
115+ case NodeType::ClpString :
116+ column_reader = new ClpStringColumnReader ( column_id, m_var_dict, m_log_dict );
115117 break ;
116- case NodeType::BOOLEAN :
117- column_reader = new BooleanColumnReader (key_name, column_id );
118+ case NodeType::VarString :
119+ column_reader = new VariableStringColumnReader (column_id, m_var_dict );
118120 break ;
119- case NodeType::ARRAY:
120- column_reader = new ClpStringColumnReader (
121- key_name,
122- column_id,
123- m_var_dict,
124- m_array_dict,
125- true
126- );
121+ case NodeType::Boolean:
122+ column_reader = new BooleanColumnReader (column_id);
127123 break ;
128- case NodeType::DATESTRING :
129- column_reader = new DateStringColumnReader (key_name, column_id, m_timestamp_dict );
124+ case NodeType::UnstructuredArray :
125+ column_reader = new ClpStringColumnReader (column_id, m_var_dict, m_array_dict, true );
130126 break ;
131- case NodeType::OBJECT:
132- case NodeType::NULLVALUE:
133- reader->append_column (column_id);
127+ case NodeType::DateString:
128+ column_reader = new DateStringColumnReader (column_id, m_timestamp_dict);
134129 break ;
135- case NodeType::UNKNOWN:
130+ // No need to push columns without associated object readers into the SchemaReader.
131+ case NodeType::Object:
132+ case NodeType::NullValue:
133+ case NodeType::Unknown:
136134 break ;
137135 }
138136
139137 if (column_reader) {
140- reader-> append_column (column_reader);
138+ reader. append_column (column_reader);
141139 }
142140 return column_reader;
143141}
144142
145- std::unique_ptr<SchemaReader> ArchiveReader::create_schema_reader (
143+ void ArchiveReader::append_unordered_reader_columns (
144+ SchemaReader& reader,
145+ NodeType unordered_object_type,
146+ std::span<int32_t > schema_ids,
147+ bool should_marshal_records
148+ ) {
149+ int32_t mst_subtree_root_node_id = INT32_MAX;
150+ size_t object_begin_pos = reader.get_column_size ();
151+ for (int32_t column_id : schema_ids) {
152+ if (Schema::schema_entry_is_unordered_object (column_id)) {
153+ continue ;
154+ }
155+ BaseColumnReader* column_reader = nullptr ;
156+ auto const & node = m_schema_tree->get_node (column_id);
157+ if (INT32_MAX == mst_subtree_root_node_id) {
158+ mst_subtree_root_node_id = m_schema_tree->find_matching_subtree_root_in_subtree (
159+ -1 ,
160+ column_id,
161+ unordered_object_type
162+ );
163+ }
164+ switch (node.get_type ()) {
165+ case NodeType::Integer:
166+ column_reader = new Int64ColumnReader (column_id);
167+ break ;
168+ case NodeType::Float:
169+ column_reader = new FloatColumnReader (column_id);
170+ break ;
171+ case NodeType::ClpString:
172+ column_reader = new ClpStringColumnReader (column_id, m_var_dict, m_log_dict);
173+ break ;
174+ case NodeType::VarString:
175+ column_reader = new VariableStringColumnReader (column_id, m_var_dict);
176+ break ;
177+ case NodeType::Boolean:
178+ column_reader = new BooleanColumnReader (column_id);
179+ break ;
180+ // UnstructuredArray and DateString currently aren't supported as part of any unordered
181+ // object, so we disregard them here
182+ case NodeType::UnstructuredArray:
183+ case NodeType::DateString:
184+ // No need to push columns without associated object readers into the SchemaReader.
185+ case NodeType::Object:
186+ case NodeType::NullValue:
187+ case NodeType::Unknown:
188+ break ;
189+ }
190+
191+ if (column_reader) {
192+ reader.append_unordered_column (column_reader);
193+ }
194+ }
195+
196+ if (should_marshal_records) {
197+ reader.mark_unordered_object (object_begin_pos, mst_subtree_root_node_id, schema_ids);
198+ }
199+ }
200+
201+ SchemaReader& ArchiveReader::create_schema_reader (
146202 int32_t schema_id,
147203 bool should_extract_timestamp,
148204 bool should_marshal_records
149205) {
150- auto reader = std::make_unique<SchemaReader>(
206+ auto & schema = (*m_schema_map)[schema_id];
207+ m_schema_reader.reset (
151208 m_schema_tree,
152209 schema_id,
210+ schema.get_ordered_schema_view (),
153211 m_id_to_table_metadata[schema_id].num_messages ,
154212 should_marshal_records
155213 );
156214 auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids ();
157215
158- for (int32_t column_id : (*m_schema_map)[reader->get_schema_id ()]) {
159- BaseColumnReader* column_reader = append_reader_column (reader, column_id);
216+ for (size_t i = 0 ; i < schema.size (); ++i) {
217+ int32_t column_id = schema[i];
218+ if (Schema::schema_entry_is_unordered_object (column_id)) {
219+ size_t length = Schema::get_unordered_object_length (column_id);
220+ append_unordered_reader_columns (
221+ m_schema_reader,
222+ Schema::get_unordered_object_type (column_id),
223+ schema.get_view (i + 1 , length),
224+ should_marshal_records
225+ );
226+ i += length;
227+ continue ;
228+ }
229+ BaseColumnReader* column_reader = append_reader_column (m_schema_reader, column_id);
160230
161231 if (should_extract_timestamp && column_reader && timestamp_column_ids.count (column_id) > 0 )
162232 {
163- reader-> mark_column_as_timestamp (column_reader);
233+ m_schema_reader. mark_column_as_timestamp (column_reader);
164234 }
165235 }
166- return reader ;
236+ return m_schema_reader ;
167237}
168238
169239void ArchiveReader::store (FileWriter& writer) {
170240 std::string message;
171241
172242 for (auto & [id, table_metadata] : m_id_to_table_metadata) {
173- auto schema_reader = read_table (id, false , true );
174- while (schema_reader-> get_next_message (message)) {
243+ auto & schema_reader = read_table (id, false , true );
244+ while (schema_reader. get_next_message (message)) {
175245 writer.write (message.c_str (), message.length ());
176246 }
177247 }
0 commit comments