Skip to content

Commit 5130ca4

Browse files
authored
Merge branch 'main' into helm-admin
2 parents fdf5f23 + 255f232 commit 5130ca4

File tree

5 files changed

+89
-6
lines changed

5 files changed

+89
-6
lines changed

components/core/src/clp_s/ArchiveReader.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,29 @@ void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& netw
3030
}
3131

3232
m_archive_reader_adaptor = std::make_shared<ArchiveReaderAdaptor>(archive_path, network_auth);
33+
initialize_archive_reader();
34+
}
35+
36+
auto ArchiveReader::open(
37+
std::shared_ptr<clp::ReaderInterface> single_file_archive_reader,
38+
std::string_view archive_id
39+
) -> void {
40+
if (m_is_open) {
41+
throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__);
42+
}
43+
m_is_open = true;
44+
45+
if (nullptr == single_file_archive_reader || archive_id.empty()) {
46+
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
47+
}
48+
m_archive_id = archive_id;
49+
50+
m_archive_reader_adaptor
51+
= std::make_shared<ArchiveReaderAdaptor>(std::move(single_file_archive_reader));
52+
initialize_archive_reader();
53+
}
3354

55+
auto ArchiveReader::initialize_archive_reader() -> void {
3456
if (auto const rc = m_archive_reader_adaptor->load_archive_metadata(); ErrorCodeSuccess != rc) {
3557
throw OperationFailed(rc, __FILENAME__, __LINE__);
3658
}

components/core/src/clp_s/ArchiveReader.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ class ArchiveReader {
4141
*/
4242
void open(Path const& archive_path, NetworkAuthOption const& network_auth);
4343

44+
/**
45+
* Opens a single-file archive for reading from an already open `clp::ReaderInterface`.
46+
* @param single_file_archive_reader The already opened archive reader
47+
* @param archive_id The unique name or identifier for the archive
48+
*/
49+
auto open(
50+
std::shared_ptr<clp::ReaderInterface> single_file_archive_reader,
51+
std::string_view archive_id
52+
) -> void;
53+
4454
/**
4555
* Reads the dictionaries and metadata.
4656
*/
@@ -164,6 +174,11 @@ class ArchiveReader {
164174
}
165175

166176
private:
177+
/**
178+
* Reads archive metadata and prepares the archive reader for subsequent archive reads.
179+
*/
180+
auto initialize_archive_reader() -> void;
181+
167182
/**
168183
* Initializes a schema reader passed by reference to become a reader for a given schema.
169184
* @param reader

components/core/src/clp_s/ArchiveReaderAdaptor.cpp

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
#include <spdlog/spdlog.h>
1515

1616
#include "../clp/BoundedReader.hpp"
17+
#include "../clp/ErrorCode.hpp"
1718
#include "../clp/FileReader.hpp"
1819
#include "archive_constants.hpp"
20+
#include "ErrorCode.hpp"
1921
#include "InputConfig.hpp"
2022
#include "RangeIndexWriter.hpp"
2123
#include "SingleFileArchiveDefs.hpp"
@@ -27,7 +29,6 @@ ArchiveReaderAdaptor::ArchiveReaderAdaptor(
2729
)
2830
: m_archive_path{archive_path},
2931
m_network_auth{network_auth},
30-
m_single_file_archive{false},
3132
m_timestamp_dictionary{std::make_shared<TimestampDictionaryReader>()} {
3233
if (InputSource::Filesystem != archive_path.source
3334
|| std::filesystem::is_regular_file(archive_path.path))
@@ -36,6 +37,21 @@ ArchiveReaderAdaptor::ArchiveReaderAdaptor(
3637
}
3738
}
3839

40+
ArchiveReaderAdaptor::ArchiveReaderAdaptor(
41+
std::shared_ptr<clp::ReaderInterface> single_file_archive_reader
42+
)
43+
: m_single_file_archive{true},
44+
m_timestamp_dictionary{std::make_shared<TimestampDictionaryReader>()},
45+
m_reader{std::move(single_file_archive_reader)} {
46+
if (nullptr == m_reader) {
47+
throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__);
48+
}
49+
50+
if (auto const rc = m_reader->try_seek_from_begin(0); clp::ErrorCode::ErrorCode_Success != rc) {
51+
throw OperationFailed(ErrorCodeFailure, __FILENAME__, __LINE__);
52+
}
53+
}
54+
3955
ErrorCode
4056
ArchiveReaderAdaptor::try_read_archive_file_info(ZstdDecompressor& decompressor, size_t size) {
4157
std::vector<char> buffer(size);
@@ -253,6 +269,10 @@ ErrorCode ArchiveReaderAdaptor::try_read_archive_metadata(ZstdDecompressor& deco
253269
}
254270

255271
std::shared_ptr<clp::ReaderInterface> ArchiveReaderAdaptor::try_create_reader_at_header() {
272+
if (nullptr != m_reader) {
273+
return m_reader;
274+
}
275+
256276
if (InputSource::Filesystem == m_archive_path.source && false == m_single_file_archive) {
257277
try {
258278
return std::make_shared<clp::FileReader>(

components/core/src/clp_s/ArchiveReaderAdaptor.hpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,20 @@ class ArchiveReaderAdaptor {
5353
: TraceableException(error_code, filename, line_number) {}
5454
};
5555

56+
// Constructors
57+
/**
58+
* Creates an adaptor for an archive identified by path and source type.
59+
* @param archive_path Path/URL for a directory archive or a single-file archive.
60+
* @param network_auth Authentication options for network inputs.
61+
*/
5662
explicit ArchiveReaderAdaptor(Path const& archive_path, NetworkAuthOption const& network_auth);
5763

64+
/**
65+
* Creates an adaptor around an already opened single-file archive reader.
66+
* @param single_file_archive_reader An already-opened single-file archive stream.
67+
*/
68+
explicit ArchiveReaderAdaptor(std::shared_ptr<clp::ReaderInterface> single_file_archive_reader);
69+
5870
/**
5971
* Loads metadata for an archive including the header and metadata section. This method must be
6072
* invoked before checking out any section of an archive, or calling `get_timestamp_dictionary`.

docs/src/user-docs/guides-using-log-ingestor.md

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,26 @@ will be routed through CLP's [API server](./guides-using-the-api-server.md) in a
5151

5252
### Fault tolerance
5353

54-
:::{warning}
55-
**The current version of `log-ingestor` does not provide fault tolerance.**
54+
`log-ingestor` is designed to tolerate unexpected crashes or restarts without losing information
55+
about ingestion jobs or the files that have been submitted for compression. Note that this does not
56+
include fault tolerance of the components external to `log-ingestor`. Specifically, `log-ingestor`
57+
guarantees the following, even in the presence of crashes or restarts of `log-ingestor`:
5658

57-
If `log-ingestor` crashes or is restarted, all in-progress ingestion jobs and their associated state
58-
will be lost, and must be restored manually. Robust fault tolerance for the ingestion pipeline is
59-
planned for a future release.
59+
* Any ingestion job successfully submitted to `log-ingestor` will run continuously.
60+
* Within an ingestion job, any files that have been found on S3 or received as messages from an SQS
61+
queue will eventually be submitted for compression.
62+
63+
:::{note}
64+
`log-ingestor` **DOES NOT** guarantee the following after a crash or restart:
65+
66+
* Any file submitted for compression (that can be compressed successfully) will eventually be
67+
compressed successfully.
68+
* This is because failures of the compression cluster are external to `log-ingestor`. Future
69+
versions of CLP will address this limitation.
70+
* Any file submitted for compression will be compressed *only* once.
71+
* This is because for [SQS listener](#sqs-listener) ingestion jobs, the processes for deleting
72+
messages from the SQS queue and recording the files for ingestion are not synchronized. As a
73+
result, a failure during this process may cause the same file to be ingested multiple times.
6074
:::
6175

6276
---

0 commit comments

Comments
 (0)