Skip to content

Commit 7e91d8b

Browse files
committed
lsm: switch to structured logging
Additionally add a logs_to_trace script that, from the log output, creates a trace JSON file viewable in chrome://tracing to visualize what is going on.
1 parent 7032ce1 commit 7e91d8b

File tree

5 files changed

+176
-19
lines changed

5 files changed

+176
-19
lines changed

src/v/lsm/db/gc_actor.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
namespace lsm::db {
1818

1919
ss::future<> gc_actor::process(gc_message msg) {
20+
vlog(log.trace, "gc_actor_process_start");
2021
auto gen = _persistence->list_files();
2122
chunked_hash_set<internal::file_handle> seen_gc_files;
23+
int deleted = 0;
2224
while (auto file_handle_opt = co_await gen()) {
2325
if (_as.abort_requested()) {
2426
co_return;
@@ -59,11 +61,13 @@ ss::future<> gc_actor::process(gc_message msg) {
5961
co_await _table_cache->evict(file_handle);
6062
co_await _persistence->remove_file(file_handle);
6163
_pending_deletes.erase(it);
64+
++deleted;
6265
}
66+
vlog(log.trace, "gc_actor_process_end deleted={}", deleted);
6367
}
6468

6569
// Error hook for the GC actor: report the failure using the structured
// key=value log format shared with process(). Declared noexcept, so it must
// not throw; it only logs and returns, letting the actor carry on.
// NOTE(review): assumes `on_error` is invoked by the actor framework when
// process() fails — confirm against the actor base class.
void gc_actor::on_error(std::exception_ptr ex) noexcept {
    vlog(log.warn, "gc_actor_process_end error=\"{}\"", ex);
}
6872

6973
} // namespace lsm::db

src/v/lsm/db/impl.cc

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include "lsm/sst/block_cache.h"
2626
#include "lsm/sst/builder.h"
2727

28-
#include <seastar/core/coroutine.hh>
2928
#include <seastar/core/sleep.hh>
3029
#include <seastar/coroutine/as_future.hh>
3130

@@ -54,6 +53,7 @@ impl::impl(ctor, io::persistence p, ss::lw_shared_ptr<internal::options> o)
5453

5554
ss::future<std::unique_ptr<impl>> impl::open(
5655
ss::lw_shared_ptr<internal::options> opts, io::persistence persistence) {
56+
vlog(log.trace, "open_start");
5757
auto db = std::make_unique<impl>(
5858
ctor{}, std::move(persistence), std::move(opts));
5959
co_await db->_gc_actor.start();
@@ -65,21 +65,22 @@ ss::future<std::unique_ptr<impl>> impl::open(
6565
}
6666
// If we're readonly, we don't need to start any compaction loop.
6767
if (db->_opts->readonly) {
68+
vlog(log.trace, "open_end readonly=true");
6869
co_return db;
6970
}
7071
db->_background_work = ss::with_scheduling_group(
7172
db->_opts->compaction_scheduling_group, [db = db.get()] {
7273
return ss::do_until(
7374
[db] { return db->_as.abort_requested(); },
7475
[db] {
75-
vlog(log.trace, "waiting for background work");
7676
return db->_start_background_work_signal.wait(db->_as)
7777
.then([db] {
7878
db->_background_work_running = true;
79-
vlog(log.trace, "start background compaction");
79+
vlog(log.trace, "compaction_loop_start");
8080
return db->run_background_compaction();
8181
})
8282
.then_wrapped([db](ss::future<> fut) {
83+
vlog(log.trace, "compaction_loop_end");
8384
db->_background_work_running = false;
8485
db->maybe_schedule_compaction();
8586
db->_background_work_finished_signal.broadcast();
@@ -89,18 +90,19 @@ ss::future<std::unique_ptr<impl>> impl::open(
8990
} catch (const abort_requested_exception& ex) {
9091
vlog(
9192
log.debug,
92-
"LSM background loop got abort request: {}",
93+
"compaction_loop_error abort=true error=\"{}\"",
9394
ex.what());
9495
} catch (const io_error_exception& ex) {
9596
vlog(
9697
log.warn,
97-
"LSM background loop hit IO error: {}",
98+
"compaction_loop_error io=true error=\"{}\"",
9899
ex.what());
99100
} catch (...) {
100101
auto ep = std::current_exception();
101102
vlog(
102103
log.error,
103-
"Unexpected error in LSM background loop: {}",
104+
"compaction_loop_error unexpected=true "
105+
"error=\"{}\"",
104106
ep);
105107
}
106108
// Signal so that we immediately retry
@@ -110,6 +112,7 @@ ss::future<std::unique_ptr<impl>> impl::open(
110112
});
111113
});
112114
});
115+
vlog(log.trace, "open_end readonly=false");
113116
co_return db;
114117
}
115118

@@ -133,7 +136,7 @@ ss::future<> impl::make_room_for_write() {
133136
&& _versions->current()->num_files(0_level)
134137
> _opts->level_zero_slowdown_writes_trigger) {
135138
// We're in throttling mode
136-
vlog(log.debug, "throttling writes due to number of L0 files");
139+
vlog(log.debug, "throttling_writes reason=l0_file_count");
137140
try {
138141
co_await ss::sleep_abortable(std::chrono::seconds(1), _as);
139142
} catch (...) {
@@ -151,7 +154,7 @@ ss::future<> impl::make_room_for_write() {
151154
co_return;
152155
}
153156
if (_imm) {
154-
vlog(log.warn, "blocking writes as in memory buffers are full");
157+
vlog(log.warn, "blocking_writes reason=memtable_full");
155158
// We are over the write buffer limit and we have a pending
156159
// memtable flush, wait for it to finish.
157160
co_await _background_work_finished_signal.wait(_as);
@@ -160,16 +163,13 @@ ss::future<> impl::make_room_for_write() {
160163
if (
161164
_versions->current()->num_files(0_level)
162165
> _opts->level_zero_stop_writes_trigger) {
163-
vlog(
164-
log.warn,
165-
"too many L0 files, writing for compaction to finish before "
166-
"allowing more writes");
166+
vlog(log.warn, "blocking_writes reason=l0_full");
167167
// We've hit our L0 file limit, wait for compaction to finish.
168168
co_await _background_work_finished_signal.wait(_as);
169169
continue;
170170
}
171171
// We're over our limit, let's make a new memtable
172-
vlog(log.trace, "scheduling memtable flush");
172+
vlog(log.trace, "scheduling_memtable_flush");
173173
_imm = std::exchange(_mem, ss::make_lw_shared<memtable>());
174174
maybe_schedule_compaction();
175175
}
@@ -265,6 +265,7 @@ ss::future<> impl::flush() {
265265
}
266266

267267
ss::future<> impl::close() {
268+
vlog(log.trace, "close_start");
268269
_as.request_abort_ex(abort_requested_exception("database closing"));
269270
auto fut = std::exchange(_background_work, std::nullopt);
270271
if (fut) {
@@ -274,12 +275,14 @@ ss::future<> impl::close() {
274275
co_await _table_cache->close();
275276
co_await _persistence.data->close();
276277
co_await _persistence.metadata->close();
278+
vlog(log.trace, "close_end");
277279
if (fut && fut->failed()) {
278280
std::rethrow_exception(fut->get_exception());
279281
}
280282
}
281283

282284
ss::future<> impl::recover() {
285+
vlog(log.trace, "recover_start");
283286
co_await _versions->recover();
284287
// If requested, then pre-open all the files we know about.
285288
if (auto max_fibers = _opts->max_pre_open_fibers) {
@@ -291,12 +294,15 @@ ss::future<> impl::recover() {
291294
all_files.push_back(std::move(file));
292295
}
293296
}
297+
vlog(log.trace, "recover_pre_open_start files={}", all_files.size());
294298
co_await ss::max_concurrent_for_each(
295299
all_files, max_fibers, [this](ss::lw_shared_ptr<file_meta_data> f) {
296300
return _table_cache->create_iterator(f->handle, f->file_size)
297301
.discard_result();
298302
});
303+
vlog(log.trace, "recover_pre_open_end files={}", all_files.size());
299304
}
305+
vlog(log.trace, "recover_end");
300306
}
301307

302308
void impl::maybe_schedule_compaction() {
@@ -364,6 +370,17 @@ ss::future<> impl::run_background_compaction() {
364370
co_return;
365371
}
366372
auto compaction = *std::move(maybe_compaction);
373+
auto input_level = compaction.level();
374+
auto output_level = input_level + 1_level;
375+
auto num_input_files
376+
= compaction.num_input_files(compaction::which::input_level)
377+
+ compaction.num_input_files(compaction::which::output_level);
378+
vlog(
379+
log.trace,
380+
"compaction_start input_level={} output_level={} input_files={}",
381+
input_level,
382+
output_level,
383+
num_input_files);
367384
if (compaction.is_trivial_move()) {
368385
auto input_level_files = compaction.num_input_files(
369386
compaction::which::input_level);
@@ -382,7 +399,16 @@ ss::future<> impl::run_background_compaction() {
382399
.oldest_seqno = file->oldest_seqno,
383400
.newest_seqno = file->newest_seqno,
384401
});
402+
vlog(log.trace, "manifest_write_start");
385403
co_await _versions->log_and_apply(std::move(*compaction.edit()));
404+
vlog(log.trace, "manifest_write_end");
405+
vlog(
406+
log.trace,
407+
"compaction_end input_level={} output_level={} output_files=1 "
408+
"output_bytes={} trivial_move=true",
409+
input_level,
410+
output_level,
411+
file->file_size);
386412
co_return;
387413
}
388414
compaction_state state{
@@ -395,7 +421,6 @@ ss::future<> impl::run_background_compaction() {
395421
.smallest_snapshot = _snapshots.oldest_seqno().value_or(
396422
_versions->last_seqno().value()),
397423
};
398-
auto output_level = compaction.level() + 1_level;
399424
auto max_file_size = _opts->levels[output_level].max_file_size;
400425
sst::builder::options sst_options{
401426
.block_size = _opts->sst_block_size,
@@ -493,14 +518,24 @@ ss::future<> impl::run_background_compaction() {
493518
.newest_seqno = output.newest,
494519
});
495520
}
521+
vlog(log.trace, "manifest_write_start");
496522
co_await _versions->log_and_apply(std::move(*edit));
523+
vlog(log.trace, "manifest_write_end");
497524
// Go and cleanup any obsolete files where both the epoch, and file ID is
498525
// less than what we have committed, and is not in our working set.
499526
co_await _gc_actor.tell(
500527
gc_message{
501528
.highest_used_file_id = _versions->highest_used_file_id(),
502529
.live_files = _versions->get_live_files(),
503530
});
531+
vlog(
532+
log.trace,
533+
"compaction_end input_level={} output_level={} output_files={} "
534+
"output_bytes={}",
535+
input_level,
536+
output_level,
537+
state.outputs.size(),
538+
state.total_bytes);
504539
}
505540

506541
ss::future<> impl::flush_memtable() {
@@ -512,6 +547,12 @@ ss::future<> impl::flush_memtable() {
512547
? 0_level
513548
: v->pick_level_for_memtable_output(
514549
imm->min_key().user_key(), imm->max_key().user_key());
550+
auto mem_bytes = imm->approximate_memory_usage();
551+
vlog(
552+
log.trace,
553+
"flush_memtable_start level={} mem_bytes={}",
554+
level,
555+
mem_bytes);
515556
sst::builder::options sst_options{
516557
.block_size = _opts->sst_block_size,
517558
.filter_period = _opts->sst_filter_period,
@@ -525,6 +566,10 @@ ss::future<> impl::flush_memtable() {
525566
&_as);
526567
if (!result) {
527568
_versions->reuse_file_id(id);
569+
vlog(
570+
log.trace,
571+
"flush_memtable_end level={} file_bytes=0 empty=true",
572+
level);
528573
co_return;
529574
}
530575
version_edit edit(*_opts);
@@ -538,7 +583,14 @@ ss::future<> impl::flush_memtable() {
538583
.oldest_seqno = result->oldest_seqno,
539584
.newest_seqno = result->newest_seqno,
540585
});
586+
vlog(log.trace, "manifest_write_start");
541587
co_await _versions->log_and_apply(std::move(edit));
588+
vlog(log.trace, "manifest_write_end");
589+
vlog(
590+
log.trace,
591+
"flush_memtable_end level={} file_bytes={}",
592+
level,
593+
result->file_size);
542594
// Now that the new version has been applied, it's safe to remove the
543595
// immutable memtable, as readers will pick up the new file instead.
544596
//

src/v/lsm/db/table_cache.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,10 +134,7 @@ class table_cache::impl {
134134
, _cache(compute_cache_config(max_entries), eviction(this))
135135
, _block_cache(std::move(block_cache))
136136
, _cleanup_queue([](const std::exception_ptr& ex) {
137-
vlog(
138-
log.error,
139-
"expected exception on table cache cleanup queue: {}",
140-
ex);
137+
vlog(log.error, "table_cache_cleanup_error error=\"{}\"", ex);
141138
}) {}
142139

143140
ss::future<std::unique_ptr<internal::iterator>>

src/v/lsm/db/tests/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
trace.json
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Convert LSM trace logs to Chrome trace JSON format.
4+
5+
Usage:
6+
./logs_to_trace.py < logfile.log > trace.json
7+
./logs_to_trace.py logfile.log > trace.json
8+
9+
Open the output in chrome://tracing or https://ui.perfetto.dev/
10+
"""
11+
12+
import json
13+
import re
14+
import sys
15+
from datetime import datetime
16+
17+
18+
# Seastar log line shape:
#   LEVEL TIMESTAMP [shard N:CONTEXT] LOGGER - FILE:LINE - MESSAGE
# Only lines from the "lsm" logger, attributed to a .cc file, are matched.
_LOG_LINE_RE = (
    r"^(?P<level>\w+)\s+"
    r"(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\s+"
    r"\[shard (?P<shard>\d+):\w+\]\s+"
    r"lsm\s+-\s+"
    r"(?P<file>\w+\.cc):(?P<line>\d+)\s+-\s+"
    r"(?P<message>.+)$"
)
LOG_PATTERN = re.compile(_LOG_LINE_RE)
27+
28+
29+
def parse_timestamp(ts_str: str) -> int:
    """Convert a seastar log timestamp to microseconds since the epoch.

    NOTE(review): the log line carries no timezone, so the value is parsed
    as naive local time; absolute values are zone-dependent, but relative
    ordering within a single log is consistent, which is all tracing needs.
    """
    parsed = datetime.strptime(ts_str, "%Y-%m-%d %H:%M:%S,%f")
    micros = parsed.timestamp() * 1_000_000
    return int(micros)
33+
34+
35+
def parse_attrs(message: str) -> "dict[str, str | int]":
    """Parse key=value attributes from a log message.

    Supports bare values (key=value) and double-quoted values
    (key="value with spaces"). Values that parse as integers are stored
    as int; everything else stays a str.

    Note: the original annotation claimed dict[str, str], but the int()
    branch below means values may be int as well.

    Args:
        message: the free-text portion of a structured log line.

    Returns:
        Mapping of attribute name to its int or str value.
    """
    attrs: "dict[str, str | int]" = {}
    # Match key=value or key="quoted value"; group 2 is the quoted form,
    # group 3 the bare form — exactly one of them is present per match.
    for match in re.finditer(r'(\w+)=(?:"([^"]*)"|(\S+))', message):
        key = match.group(1)
        value = match.group(2) if match.group(2) is not None else match.group(3)
        try:
            attrs[key] = int(value)
        except ValueError:
            attrs[key] = value
    return attrs
47+
48+
49+
def main():
    """Read seastar log lines and print a Chrome trace JSON document.

    Input is taken from the file named as the first CLI argument, or from
    stdin when no argument is given. Output goes to stdout and can be
    loaded in chrome://tracing or https://ui.perfetto.dev/.
    """
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as src:
            raw_lines = src.readlines()
    else:
        raw_lines = sys.stdin.readlines()

    trace_events = []
    for raw in raw_lines:
        parsed = LOG_PATTERN.match(raw.strip())
        if parsed is None:
            continue

        message = parsed.group("message")
        tokens = message.split()
        if not tokens:
            continue

        op = tokens[0]

        # The *_start/*_end naming convention maps onto Chrome trace
        # phases: B=begin, E=end; anything else is an instant event.
        if op.endswith("_start"):
            name, phase = op[: -len("_start")], "B"
        elif op.endswith("_end"):
            name, phase = op[: -len("_end")], "E"
        else:
            name, phase = op, "i"

        event = {
            "name": name,
            "cat": "lsm",
            "ph": phase,
            "ts": parse_timestamp(parsed.group("timestamp")),
            "pid": 1,
            "tid": int(parsed.group("shard")),
            "args": parse_attrs(message),
        }
        if phase == "i":
            # Instant events need a scope: g=global, p=process, t=thread.
            event["s"] = "t"

        trace_events.append(event)

    print(json.dumps({"traceEvents": trace_events}, indent=2))


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)