Skip to content

Commit 4feaa7b

Browse files
committed
WIP
Signed-off-by: JCW <[email protected]>
1 parent 45a4f44 commit 4feaa7b

File tree

13 files changed

+330
-67
lines changed

13 files changed

+330
-67
lines changed

include/xrpl/basics/Log.h

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@
2727
#include <boost/filesystem.hpp>
2828

2929
#include <array>
30+
#include <atomic>
3031
#include <chrono>
32+
#include <condition_variable>
3133
#include <fstream>
3234
#include <map>
3335
#include <memory>
3436
#include <mutex>
3537
#include <span>
38+
#include <thread>
3639
#include <utility>
3740

3841
namespace ripple {
@@ -71,10 +74,10 @@ class Logs
7174
operator=(Sink const&) = delete;
7275

7376
void
74-
write(beast::severities::Severity level, std::string_view text) override;
77+
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override;
7578

7679
void
77-
writeAlways(beast::severities::Severity level, std::string_view text)
80+
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr)
7881
override;
7982
};
8083

@@ -132,12 +135,12 @@ class Logs
132135
Does nothing if there is no associated system file.
133136
*/
134137
void
135-
write(std::string_view str);
138+
write(std::string&& str);
136139

137140
/** @} */
138141

139142
private:
140-
std::unique_ptr<std::ofstream> m_stream;
143+
std::optional<std::ofstream> m_stream;
141144
boost::filesystem::path m_path;
142145
};
143146

@@ -153,11 +156,18 @@ class Logs
153156

154157
// Batching members
155158
mutable std::mutex batchMutex_;
156-
public:
159+
beast::lockfree::queue<std::string> messages_;
157160
static constexpr size_t BATCH_BUFFER_SIZE = 64 * 1024; // 64KB buffer
158161
std::array<char, BATCH_BUFFER_SIZE> batchBuffer_{};
159162
std::span<char> writeBuffer_; // Points to available write space
160163
std::span<char> readBuffer_; // Points to data ready to flush
164+
165+
// Log thread members
166+
std::thread logThread_;
167+
std::atomic<bool> stopLogThread_;
168+
std::mutex logMutex_;
169+
std::condition_variable logCondition_;
170+
161171
private:
162172
std::chrono::steady_clock::time_point lastFlush_ =
163173
std::chrono::steady_clock::now();
@@ -208,6 +218,7 @@ class Logs
208218
beast::severities::Severity level,
209219
std::string const& partition,
210220
std::string_view text,
221+
beast::Journal::MessagePoolNode owner,
211222
bool console);
212223

213224
std::string
@@ -261,6 +272,9 @@ class Logs
261272

262273
void
263274
flushBatchUnsafe();
275+
276+
void
277+
logThreadWorker();
264278
};
265279

266280
// Wraps a Journal::Stream to skip evaluation of

include/xrpl/beast/utility/Journal.h

Lines changed: 189 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include <xrpl/beast/utility/instrumentation.h>
2424

25+
#include <thread>
2526
#include <atomic>
2627
#include <charconv>
2728
#include <cstring>
@@ -34,6 +35,164 @@
3435
#include <string_view>
3536
#include <utility>
3637

38+
namespace beast {
39+
40+
class StringBufferPool {
41+
public:
42+
// ----- Empty index marker -----
43+
static constexpr std::uint32_t kEmptyIdx = std::numeric_limits<std::uint32_t>::max();
44+
45+
// ----- Single-word CAS target: {tag | idx} with pack/unpack -----
46+
struct Head {
47+
std::uint32_t tag;
48+
std::uint32_t idx; // kEmptyIdx means empty
49+
50+
static std::uint64_t pack(Head h) noexcept {
51+
return (std::uint64_t(h.tag) << 32) | h.idx;
52+
}
53+
static Head unpack(std::uint64_t v) noexcept {
54+
return Head{ std::uint32_t(v >> 32), std::uint32_t(v) };
55+
}
56+
};
57+
58+
// ----- Internal node -----
59+
struct Node {
60+
std::uint32_t next_idx{kEmptyIdx};
61+
std::uint32_t self_idx{kEmptyIdx};
62+
std::string buf{};
63+
};
64+
static_assert(std::is_standard_layout_v<Node>, "Node must be standard layout");
65+
66+
// ----- User-facing move-only RAII handle -----
67+
class Handle {
68+
public:
69+
Handle() = default;
70+
Handle(Handle&& other) noexcept
71+
: owner_(other.owner_), node_(other.node_) {
72+
other.owner_ = nullptr; other.node_ = nullptr;
73+
}
74+
Handle& operator=(Handle&& other) noexcept {
75+
if (this != &other) {
76+
// Return current if still held
77+
if (owner_ && node_) owner_->give_back(std::move(*this));
78+
owner_ = other.owner_;
79+
node_ = other.node_;
80+
other.owner_ = nullptr;
81+
other.node_ = nullptr;
82+
}
83+
return *this;
84+
}
85+
86+
Handle(const Handle&) = delete;
87+
Handle& operator=(const Handle&) = delete;
88+
89+
~Handle() noexcept {
90+
if (owner_ && node_) owner_->give_back(std::move(*this));
91+
}
92+
93+
bool valid() const noexcept { return node_ != nullptr; }
94+
std::string& string() noexcept { return node_->buf; }
95+
const std::string& string() const noexcept { return node_->buf; }
96+
97+
private:
98+
friend class StringBufferPool;
99+
Handle(StringBufferPool* owner, Node* n) : owner_(owner), node_(n) {}
100+
101+
StringBufferPool* owner_ = nullptr;
102+
Node* node_ = nullptr;
103+
};
104+
105+
explicit StringBufferPool(std::uint32_t grow_by = 20)
106+
: grow_by_(grow_by), head_(Head::pack({0, kEmptyIdx})) {}
107+
108+
// Rent a buffer; grows on demand. Returns move-only RAII handle.
109+
Handle rent() {
110+
for (;;) {
111+
std::uint64_t old64 = head_.load(std::memory_order_acquire);
112+
Head old = Head::unpack(old64);
113+
if (old.idx == kEmptyIdx) { grow_(); continue; } // rare slow path
114+
115+
Node& n = nodes_[old.idx];
116+
std::uint32_t next = n.next_idx;
117+
118+
Head neu{ std::uint32_t(old.tag + 1), next };
119+
if (head_.compare_exchange_weak(old64, Head::pack(neu),
120+
std::memory_order_acq_rel,
121+
std::memory_order_acquire)) {
122+
return {this, &n};
123+
}
124+
}
125+
}
126+
127+
private:
128+
// Only the pool/handle can call this
129+
void give_back(Handle&& h) noexcept {
130+
Node* node = h.node_;
131+
if (!node) return; // already invalid
132+
const std::uint32_t idx = node->self_idx;
133+
134+
node->buf.clear();
135+
136+
for (;;) {
137+
std::uint64_t old64 = head_.load(std::memory_order_acquire);
138+
Head old = Head::unpack(old64);
139+
140+
node->next_idx = old.idx;
141+
142+
Head neu{ std::uint32_t(old.tag + 1), idx };
143+
if (head_.compare_exchange_weak(old64, Head::pack(neu),
144+
std::memory_order_acq_rel,
145+
std::memory_order_acquire)) {
146+
// Invalidate handle (prevents double return)
147+
h.owner_ = nullptr;
148+
h.node_ = nullptr;
149+
return;
150+
}
151+
}
152+
}
153+
154+
void grow_() {
155+
if (Head::unpack(head_.load(std::memory_order_acquire)).idx != kEmptyIdx) return;
156+
std::scoped_lock lk(grow_mu_);
157+
if (Head::unpack(head_.load(std::memory_order_acquire)).idx != kEmptyIdx) return;
158+
159+
const std::uint32_t base = static_cast<std::uint32_t>(nodes_.size());
160+
nodes_.resize(base + grow_by_); // indices [base .. base+grow_by_-1]
161+
162+
// Init nodes and local chain
163+
for (std::uint32_t i = 0; i < grow_by_; ++i) {
164+
std::uint32_t idx = base + i;
165+
Node& n = nodes_[idx];
166+
n.self_idx = idx;
167+
n.next_idx = (i + 1 < grow_by_) ? (idx + 1) : kEmptyIdx;
168+
}
169+
170+
// Splice chain onto global head: [base .. base+grow_by_-1]
171+
const std::uint32_t chain_head = base;
172+
const std::uint32_t chain_tail = base + grow_by_ - 1;
173+
174+
for (;;) {
175+
std::uint64_t old64 = head_.load(std::memory_order_acquire);
176+
Head old = Head::unpack(old64);
177+
178+
nodes_[chain_tail].next_idx = old.idx; // tail -> old head
179+
Head neu{ std::uint32_t(old.tag + 1), chain_head };
180+
181+
if (head_.compare_exchange_weak(old64, Head::pack(neu),
182+
std::memory_order_acq_rel,
183+
std::memory_order_acquire)) {
184+
break;
185+
}
186+
}
187+
}
188+
189+
const std::uint32_t grow_by_;
190+
std::atomic<std::uint64_t> head_; // single 64-bit CAS (Head packed)
191+
std::mutex grow_mu_; // only during growth
192+
std::deque<Node> nodes_; // stable storage for nodes/strings
193+
};
194+
} // namespace beast
195+
37196
namespace ripple::log {
38197
template <typename T>
39198
class LogParameter
@@ -181,10 +340,10 @@ class SimpleJsonWriter
181340
buffer_.append(str);
182341
}
183342

184-
[[nodiscard]] std::string_view
343+
void
185344
finish()
186345
{
187-
return std::string_view{buffer_.c_str(), buffer_.size() - 1};
346+
buffer_.pop_back();
188347
}
189348

190349
private:
@@ -323,18 +482,25 @@ class Journal
323482

324483
class Sink;
325484

485+
using MessagePoolNode = lockfree::queue<std::string>::Node*;
486+
326487
class JsonLogContext
327488
{
328-
std::string buffer_;
489+
MessagePoolNode messageBuffer_;
329490
detail::SimpleJsonWriter messageParamsWriter_;
330491
bool hasMessageParams_ = false;
331492

332493
public:
333-
JsonLogContext() : messageParamsWriter_(buffer_)
494+
explicit JsonLogContext()
495+
: messageBuffer_(rentFromPool())
496+
, messageParamsWriter_(messageBuffer_->data)
334497
{
335-
buffer_.reserve(1024 * 5);
498+
messageBuffer_->data.reserve(1024 * 5);
336499
}
337500

501+
MessagePoolNode
502+
messageBuffer() { return messageBuffer_; }
503+
338504
void
339505
startMessageParams()
340506
{
@@ -379,6 +545,7 @@ class Journal
379545
static std::shared_mutex globalLogAttributesMutex_;
380546
static bool jsonLogsEnabled_;
381547

548+
static lockfree::queue<std::string> messagePool_;
382549
static thread_local JsonLogContext currentJsonLogContext_;
383550

384551
// Invariant: m_sink always points to a valid Sink
@@ -389,12 +556,26 @@ class Journal
389556
std::source_location location,
390557
severities::Severity severity) const;
391558

392-
static std::string_view
559+
static MessagePoolNode
393560
formatLog(std::string const& message);
394561

395562
public:
396563
//--------------------------------------------------------------------------
397564

565+
static MessagePoolNode
566+
rentFromPool()
567+
{
568+
auto node = messagePool_.pop();
569+
if (!node)
570+
{
571+
node = new lockfree::queue<std::string>::Node();
572+
}
573+
return node;
574+
}
575+
576+
static void
577+
returnMessageNode(MessagePoolNode node) { messagePool_.push(node); }
578+
398579
static void
399580
enableStructuredJournal();
400581

@@ -444,7 +625,7 @@ class Journal
444625
level is below the current threshold().
445626
*/
446627
virtual void
447-
write(Severity level, std::string_view text) = 0;
628+
write(Severity level, std::string_view text, MessagePoolNode owner = nullptr) = 0;
448629

449630
/** Bypass filter and write text to the sink at the specified severity.
450631
* Always write the message, but maintain the same formatting as if
@@ -454,7 +635,7 @@ class Journal
454635
* @param text Text to write to sink.
455636
*/
456637
virtual void
457-
writeAlways(Severity level, std::string_view text) = 0;
638+
writeAlways(Severity level, std::string_view text, MessagePoolNode owner = nullptr) = 0;
458639

459640
private:
460641
Severity thresh_;

include/xrpl/beast/utility/WrappedSink.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,17 +88,17 @@ class WrappedSink : public beast::Journal::Sink
8888
}
8989

9090
void
91-
write(beast::severities::Severity level, std::string_view text) override
91+
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
9292
{
9393
using beast::Journal;
94-
sink_.write(level, prefix_ + std::string{text});
94+
sink_.write(level, prefix_ + std::string(text), owner);
9595
}
9696

9797
void
98-
writeAlways(severities::Severity level, std::string_view text) override
98+
writeAlways(severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
9999
{
100100
using beast::Journal;
101-
sink_.writeAlways(level, prefix_ + std::string{text});
101+
sink_.writeAlways(level, prefix_ + std::string(text), owner);
102102
}
103103
};
104104

0 commit comments

Comments
 (0)