Skip to content

Commit f82e611

Browse files
gibber9809, jackluo923, junhaoliao, LinZhihao-723
authored
feat(clp-s)!: Add Timestamp column type to replace DateString column type; Bump the archive version to 0.5.0. (#1788)
Co-authored-by: Jack Luo <jack.luo@yscope.com>
Co-authored-by: Junhao Liao <junhao.liao@yscope.com>
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
1 parent f64b4e7 commit f82e611

38 files changed

+691
-614
lines changed

components/core/cmake/Options/options.cmake

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ function(validate_clp_binaries_dependencies)
132132
CLP_BUILD_CLP_S_SEARCH
133133
CLP_BUILD_CLP_S_SEARCH_AST
134134
CLP_BUILD_CLP_S_SEARCH_KQL
135+
CLP_BUILD_CLP_S_TIMESTAMP_PARSER
135136
)
136137
endfunction()
137138

@@ -207,6 +208,7 @@ function(validate_clp_s_archivereader_dependencies)
207208
CLP_BUILD_CLP_STRING_UTILS
208209
CLP_BUILD_CLP_S_CLP_DEPENDENCIES
209210
CLP_BUILD_CLP_S_IO
211+
CLP_BUILD_CLP_S_TIMESTAMP_PARSER
210212
CLP_BUILD_CLP_S_TIMESTAMPPATTERN
211213
)
212214
endfunction()
@@ -228,6 +230,7 @@ function(validate_clp_s_archivewriter_dependencies)
228230
validate_clp_dependencies_for_target(CLP_BUILD_CLP_S_ARCHIVEWRITER
229231
CLP_BUILD_CLP_S_CLP_DEPENDENCIES
230232
CLP_BUILD_CLP_S_IO
233+
CLP_BUILD_CLP_S_TIMESTAMP_PARSER
231234
CLP_BUILD_CLP_S_TIMESTAMPPATTERN
232235
)
233236
endfunction()

components/core/src/clp_s/ArchiveReader.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,12 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3
212212
case NodeType::UnstructuredArray:
213213
column_reader = new ClpStringColumnReader(column_id, m_var_dict, m_array_dict, true);
214214
break;
215-
case NodeType::DateString:
216-
column_reader = new DateStringColumnReader(column_id, get_timestamp_dictionary());
215+
case NodeType::DeprecatedDateString:
216+
column_reader
217+
= new DeprecatedDateStringColumnReader(column_id, get_timestamp_dictionary());
218+
break;
219+
case NodeType::Timestamp:
220+
column_reader = new TimestampColumnReader(column_id, get_timestamp_dictionary());
217221
break;
218222
// No need to push columns without associated object readers into the SchemaReader.
219223
case NodeType::Metadata:
@@ -268,10 +272,11 @@ void ArchiveReader::append_unordered_reader_columns(
268272
case NodeType::Boolean:
269273
column_reader = new BooleanColumnReader(column_id);
270274
break;
271-
// UnstructuredArray and DateString currently aren't supported as part of any unordered
272-
// object, so we disregard them here
275+
// UnstructuredArray, DeprecatedDateString, and Timestamp currently aren't supported as
276+
// part of any unordered object, so we disregard them here
273277
case NodeType::UnstructuredArray:
274-
case NodeType::DateString:
278+
case NodeType::DeprecatedDateString:
279+
case NodeType::Timestamp:
275280
// No need to push columns without associated object readers into the SchemaReader.
276281
case NodeType::StructuredArray:
277282
case NodeType::Object:

components/core/src/clp_s/ArchiveReader.hpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,14 @@ class ArchiveReader {
151151
*/
152152
bool has_log_order() { return m_log_event_idx_column_id >= 0; }
153153

154+
/**
155+
* @return Whether this archive can contain columns with the deprecated DateString timestamp
156+
* format.
157+
*/
158+
[[nodiscard]] auto has_deprecated_timestamp_format() const -> bool {
159+
return get_header().has_deprecated_timestamp_format();
160+
}
161+
154162
private:
155163
/**
156164
* Initializes a schema reader passed by reference to become a reader for a given schema.

components/core/src/clp_s/ArchiveReaderAdaptor.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,10 @@ ArchiveReaderAdaptor::try_read_archive_file_info(ZstdDecompressor& decompressor,
7575

7676
ErrorCode
7777
ArchiveReaderAdaptor::try_read_timestamp_dictionary(ZstdDecompressor& decompressor, size_t size) {
78-
return m_timestamp_dictionary->read(decompressor);
78+
return m_timestamp_dictionary->read(
79+
decompressor,
80+
m_archive_header.has_deprecated_timestamp_format()
81+
);
7982
}
8083

8184
ErrorCode ArchiveReaderAdaptor::try_read_archive_info(ZstdDecompressor& decompressor, size_t size) {

components/core/src/clp_s/ArchiveWriter.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,12 +333,13 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
333333
std::make_unique<ClpStringColumnWriter>(id, m_var_dict, m_array_dict)
334334
);
335335
break;
336-
case NodeType::DateString:
337-
writer->append_column(std::make_unique<DateStringColumnWriter>(id));
338-
break;
339336
case NodeType::DeltaInteger:
340337
writer->append_column(std::make_unique<DeltaEncodedInt64ColumnWriter>(id));
341338
break;
339+
case NodeType::Timestamp:
340+
writer->append_column(std::make_unique<TimestampColumnWriter>(id));
341+
break;
342+
case NodeType::DeprecatedDateString:
342343
case NodeType::Metadata:
343344
case NodeType::NullValue:
344345
case NodeType::Object:

components/core/src/clp_s/ArchiveWriter.hpp

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -193,34 +193,49 @@ class ArchiveWriter {
193193
int32_t add_schema(Schema const& schema) { return m_schema_map.add_schema(schema); }
194194

195195
/**
196-
* Ingests a timestamp entry from a string
196+
* Ingests a timestamp entry from a string.
197197
* @param key
198198
* @param node_id
199199
* @param timestamp
200-
* @param pattern_id
201-
* @return the epoch time corresponding to the string timestamp
200+
* @param is_json_literal
201+
* @return Forwards `TimestampDictionaryWriter::ingest_string_timestamp`'s return values.
202202
*/
203-
epochtime_t ingest_timestamp_entry(
203+
[[nodiscard]] auto ingest_string_timestamp(
204204
std::string_view key,
205205
int32_t node_id,
206206
std::string_view timestamp,
207-
uint64_t& pattern_id
208-
) {
209-
return m_timestamp_dict.ingest_entry(key, node_id, timestamp, pattern_id);
207+
bool is_json_literal
208+
) -> std::pair<epochtime_t, uint64_t> {
209+
return m_timestamp_dict.ingest_string_timestamp(key, node_id, timestamp, is_json_literal);
210210
}
211211

212212
/**
213-
* Ingests a timestamp entry from a number
214-
* @param column_key
213+
* Ingests a numeric JSON entry.
214+
* @param key
215215
* @param node_id
216216
* @param timestamp
217+
* @return Forwards `TimestampDictionaryWriter::ingest_numeric_json_timestamp`'s return values.
217218
*/
218-
void ingest_timestamp_entry(std::string_view key, int32_t node_id, double timestamp) {
219-
m_timestamp_dict.ingest_entry(key, node_id, timestamp);
219+
[[nodiscard]] auto
220+
ingest_numeric_json_timestamp(std::string_view key, int32_t node_id, std::string_view timestamp)
221+
-> std::pair<epochtime_t, uint64_t> {
222+
return m_timestamp_dict.ingest_numeric_json_timestamp(key, node_id, timestamp);
220223
}
221224

222-
void ingest_timestamp_entry(std::string_view key, int32_t node_id, int64_t timestamp) {
223-
m_timestamp_dict.ingest_entry(key, node_id, timestamp);
225+
/**
226+
* Ingests an unknown precision epoch timestamp.
227+
* @param key
228+
* @param node_id
229+
* @param timestamp
230+
* @return Forwards `TimestampDictionaryWriter::ingest_unknown_precision_epoch_timestamp`'s
231+
* return values.
232+
*/
233+
[[nodiscard]] auto ingest_unknown_precision_epoch_timestamp(
234+
std::string_view key,
235+
int32_t node_id,
236+
int64_t timestamp
237+
) -> std::pair<epochtime_t, uint64_t> {
238+
return m_timestamp_dict.ingest_unknown_precision_epoch_timestamp(key, node_id, timestamp);
224239
}
225240

226241
/**

components/core/src/clp_s/CMakeLists.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,13 +281,14 @@ if(CLP_BUILD_CLP_S_ARCHIVEWRITER)
281281
absl::flat_hash_map
282282
clp_s::clp_dependencies
283283
clp_s::io
284+
clp_s::timestamp_parser
285+
clp_s::timestamp_pattern
284286
msgpack-cxx
285287
nlohmann_json::nlohmann_json
286288
simdjson::simdjson
287289
ystdlib::error_handling
288290
PRIVATE
289291
Boost::url
290-
clp_s::timestamp_pattern
291292
${CURL_LIBRARIES}
292293
fmt::fmt
293294
spdlog::spdlog
@@ -345,13 +346,14 @@ if(CLP_BUILD_CLP_S_ARCHIVEREADER)
345346
absl::flat_hash_map
346347
clp::string_utils
347348
clp_s::io
349+
clp_s::timestamp_parser
350+
clp_s::timestamp_pattern
348351
msgpack-cxx
349352
nlohmann_json::nlohmann_json
350353
ystdlib::error_handling
351354
PRIVATE
352355
Boost::url
353356
clp_s::clp_dependencies
354-
clp_s::timestamp_pattern
355357
${CURL_LIBRARIES}
356358
fmt::fmt
357359
spdlog::spdlog

components/core/src/clp_s/ColumnReader.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -226,31 +226,31 @@ int64_t VariableStringColumnReader::get_variable_id(uint64_t cur_message) {
226226
return m_variables[cur_message];
227227
}
228228

229-
void DateStringColumnReader::load(BufferViewReader& reader, uint64_t num_messages) {
229+
void DeprecatedDateStringColumnReader::load(BufferViewReader& reader, uint64_t num_messages) {
230230
m_timestamps = reader.read_unaligned_span<int64_t>(num_messages);
231231
m_timestamp_encodings = reader.read_unaligned_span<int64_t>(num_messages);
232232
}
233233

234-
std::variant<int64_t, double, std::string, uint8_t> DateStringColumnReader::extract_value(
234+
std::variant<int64_t, double, std::string, uint8_t> DeprecatedDateStringColumnReader::extract_value(
235235
uint64_t cur_message
236236
) {
237-
return m_timestamp_dict->get_string_encoding(
237+
return m_timestamp_dict->get_deprecated_timestamp_string_encoding(
238238
m_timestamps[cur_message],
239239
m_timestamp_encodings[cur_message]
240240
);
241241
}
242242

243-
void DateStringColumnReader::extract_string_value_into_buffer(
243+
void DeprecatedDateStringColumnReader::extract_string_value_into_buffer(
244244
uint64_t cur_message,
245245
std::string& buffer
246246
) {
247-
buffer.append(m_timestamp_dict->get_string_encoding(
247+
buffer.append(m_timestamp_dict->get_deprecated_timestamp_string_encoding(
248248
m_timestamps[cur_message],
249249
m_timestamp_encodings[cur_message]
250250
));
251251
}
252252

253-
epochtime_t DateStringColumnReader::get_encoded_time(uint64_t cur_message) {
253+
epochtime_t DeprecatedDateStringColumnReader::get_encoded_time(uint64_t cur_message) {
254254
return m_timestamps[cur_message];
255255
}
256256

components/core/src/clp_s/ColumnReader.hpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -327,20 +327,23 @@ class VariableStringColumnReader : public BaseColumnReader {
327327
UnalignedMemSpan<uint64_t> m_variables;
328328
};
329329

330-
class DateStringColumnReader : public BaseColumnReader {
330+
class DeprecatedDateStringColumnReader : public BaseColumnReader {
331331
public:
332332
// Constructor
333-
DateStringColumnReader(int32_t id, std::shared_ptr<TimestampDictionaryReader> timestamp_dict)
333+
DeprecatedDateStringColumnReader(
334+
int32_t id,
335+
std::shared_ptr<TimestampDictionaryReader> timestamp_dict
336+
)
334337
: BaseColumnReader(id),
335338
m_timestamp_dict(std::move(timestamp_dict)) {}
336339

337340
// Destructor
338-
~DateStringColumnReader() override = default;
341+
~DeprecatedDateStringColumnReader() override = default;
339342

340343
// Methods inherited from BaseColumnReader
341344
void load(BufferViewReader& reader, uint64_t num_messages) override;
342345

343-
NodeType get_type() override { return NodeType::DateString; }
346+
NodeType get_type() override { return NodeType::DeprecatedDateString; }
344347

345348
std::variant<int64_t, double, std::string, uint8_t> extract_value(
346349
uint64_t cur_message

components/core/src/clp_s/ColumnWriter.cpp

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -165,21 +165,6 @@ void VariableStringColumnWriter::store(ZstdCompressor& compressor) {
165165
compressor.write(reinterpret_cast<char const*>(m_var_dict_ids.data()), size);
166166
}
167167

168-
size_t DateStringColumnWriter::add_value(ParsedMessage::variable_t& value) {
169-
auto encoded_timestamp = std::get<std::pair<uint64_t, epochtime_t>>(value);
170-
m_timestamps.push_back(encoded_timestamp.second);
171-
m_timestamp_encodings.push_back(encoded_timestamp.first);
172-
return 2 * sizeof(int64_t);
173-
;
174-
}
175-
176-
void DateStringColumnWriter::store(ZstdCompressor& compressor) {
177-
size_t timestamps_size = m_timestamps.size() * sizeof(int64_t);
178-
compressor.write(reinterpret_cast<char const*>(m_timestamps.data()), timestamps_size);
179-
size_t encodings_size = m_timestamp_encodings.size() * sizeof(int64_t);
180-
compressor.write(reinterpret_cast<char const*>(m_timestamp_encodings.data()), encodings_size);
181-
}
182-
183168
auto TimestampColumnWriter::add_value(ParsedMessage::variable_t& value) -> size_t {
184169
auto const [timestamp, encoding] = std::get<std::pair<epochtime_t, uint64_t>>(value);
185170
auto const encoded_timestamp_size{m_timestamps.add_value(timestamp)};

0 commit comments

Comments
 (0)