
Commit 13d4e03

refine compaction design
1 parent 6b7a1f3 commit 13d4e03

File tree

3 files changed: +105 −97 lines changed

docs/internals/COMPACTION.md
src/ra_kv.erl
test/ra_kv_SUITE.erl


docs/internals/COMPACTION.md

Lines changed: 64 additions & 91 deletions
@@ -7,19 +7,19 @@ This is a living document capturing current work on log compaction.
 
 Compaction in Ra is intrinsically linked to the snapshotting
 feature. Standard Raft snapshotting removes all entries in the Ra log
-that precedes the snapshot index where the snapshot is a full representation of
+below the snapshot index where the snapshot is a full representation of
 the state machine state.
 
 The high level idea of compacting in Ra is that instead of deleting all
 segment data that precedes the snapshot index the snapshot data can emit a list
 of live raft indexes which will be kept, either in their original segments
-or written to new compacted segments. the data for these indexes can then
-be omitted from the snapshot to reduce its size and write amplification.
-
+or written to new compacted segments. The data for these indexes can then
+be omitted from the snapshot to reduce its size and the write amplification
+incurred by writing the snapshot.
 
 ### Log sections
 
-Two named sections of the log then emerge.
+Two named sections of the log then emerge:
 
 #### Normal log section
 
@@ -43,14 +43,17 @@ and will run immediately after each snapshot is taken.
 The run will start with the oldest segment and move towards the newest segment
 in the compacting log section. Every segment that has no entries in the live
 indexes list returned by the snapshot state will be deleted. Standard Raft
-log truncation is achieved by returning and empty list of live indexes.
+log truncation is achieved by returning an empty list of live indexes.
+
+TODO: how to ensure segments containing overwritten entries only are cleaned
+up.
 
 ### Compacted segments: naming (phase 3 compaction)
 
 Segment files in a Ra log have numeric names incremented as they are written.
 This is essential as the order is required to ensure log integrity.
 
-Desired Properties of phase 3 compaction:
+Desired Properties of phase 3:
 
 * Retain immutability, entries will never be deleted from a segment. Instead they
 will be written to a new segment.
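
Editorial aside: a minimal sketch of the deletion rule in this hunk — a segment can be deleted when its index range contains none of the snapshot's live indexes. The `{Name, {From, To}}` segment shape and the function name are illustrative assumptions, not Ra's actual internals.

```erlang
%% Sketch only: Segments is a list of {Name, {FromIdx, ToIdx}} covering the
%% compacting section; LiveIndexes is the sparse list of raft indexes the
%% snapshot state reported as still live.
deletable_segments(Segments, LiveIndexes) ->
    [Name || {Name, {From, To}} <- Segments,
             not lists:any(fun(Idx) -> Idx >= From andalso Idx =< To end,
                           LiveIndexes)].
```

An empty live-index list (plain Raft truncation, as noted above) makes every segment in the compacting section deletable.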
@@ -61,21 +64,44 @@ will be written to a new segment.
 Segments will be compacted when 2 or more adjacent segments fit into a single
 segment.
 
-The new segment will have the naming format `OLD-NEW.segment`
+During compaction the target segment will have the naming format `002-003-004.compacting`
+such that each source segment name (002, 003, 004) is present in the compacting name.
+An upper limit on the maximum number of source segments will have to be set to
+ensure the compacting file name doesn't get unreasonably long, e.g. 8.
+
+Once the compacting segment has been synced, the lowest numbered segment will
+be hard linked to the compacting segment. Each of the compacted
+higher numbered segments (003, 004) will then have a symlink created (e.g. `003.link`)
+pointing to the lowest numbered segment (002),
+then the link is renamed to the source file name: `003.link -> 003` (NB: not atomic).
+
+`002-003-004.compacting` is then deleted (but 002 is still hard linked so the data
+will remain).
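
Editorial aside: a rough sketch of the link/rename dance just described, using OTP `file` primitives. Directory fsyncs, error handling and the real segment writer plumbing are omitted, and the temporary `.link` names and the function name are assumptions for illustration only.

```erlang
%% Sketch: the compacting file has been written and synced; make the compacted
%% data reachable under the lowest source name, point the higher source names
%% at it via symlinks, then drop the .compacting name.
promote_compacted(Compacting, [Lowest | Higher]) ->
    %% hard link the lowest numbered segment name to the compacted data
    ok = file:make_link(Compacting, Lowest ++ ".link"),
    ok = file:rename(Lowest ++ ".link", Lowest),
    %% symlink each higher numbered source name to the lowest one
    %% (create e.g. "003.link", then rename over "003" - not atomic overall)
    _ = [begin
             ok = file:make_symlink(Lowest, Src ++ ".link"),
             ok = file:rename(Src ++ ".link", Src)
         end || Src <- Higher],
    %% the lowest segment still hard links the data, so the extra name can go
    ok = file:delete(Compacting).
```

Called as e.g. `promote_compacted("002-003-004.compacting", ["002.segment", "003.segment", "004.segment"])`, assuming names sort in segment order.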
+
+This naming format means it is easy to identify partially compacted segments
+after an unclean exit. All `*.compacting` files with a link count of 1 will
+be deleted as it is not clear at what stage the unclean exit occurred.
+
+If a compacting file has a link count of 2 (or more?) then the compacting writes
+completed and the lowest numbered segment was hard linked to the compacting
+segment. We don't know if all symlinks were created correctly, so we need to ensure
+this during recovery.
+
+Once we've ensured there are hard links or symlinks for all the source files the
+compacting file can be deleted.
+
+The symlinks are there so that any pending read references to the old
+segment names remain valid for some time afterwards, but the disk space for the
+source segments will still be reclaimed once the links replace the files.
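
Editorial aside: a sketch of that recovery check, assuming the source segment names can be derived from the `*.compacting` file name. The hard-link count is available as `#file_info.links` from `kernel/include/file.hrl`; `ensure_links/2` is a hypothetical helper standing in for the re-linking step described above.

```erlang
-include_lib("kernel/include/file.hrl").

recover_compacting(Dir) ->
    _ = [begin
             Path = filename:join(Dir, File),
             {ok, #file_info{links = Links}} = file:read_file_info(Path),
             case Links of
                 1 ->
                     %% never got as far as hard linking the lowest source
                     %% segment: the partial compaction can be discarded
                     ok = file:delete(Path);
                 _ ->
                     %% data is reachable via the lowest segment name;
                     %% re-create any missing symlinks, then drop the file
                     ok = ensure_links(Dir, File),
                     ok = file:delete(Path)
             end
         end || File <- filelib:wildcard("*.compacting", Dir)],
    ok.
```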
 
-This means that a single segment can only be compacted once e.g
-`001.segment -> 001-001.segment` as after this there is no new name available
-and it has to wait until it can be compacted with the adjacent segment. Single
-segment compaction could be optional and only triggered when a substantial,
-say 75% or more entries / data can be deleted.
+Some time later the symbolic links can be removed.
+
+Single segment compaction would work the same way, as we can directly rename
+e.g. the compacted segment `001.compacting` to `001.segment` without breaking
+any references to the segment. Single segment compaction should only be triggered
+when a certain limit has been reached, e.g. > 50% of indexes can be cleared up.
 
-This naming format means it is easy to identify dead segments after an unclean
-exit.
 
-During compaction a different extension will be used: `002-004.compacting` and
-after an unclean shutdown any such files will be removed. Once synced it will be
-renamed to `.segment` and some time after the source files will be deleted (Once
-the Ra server has updated its list of segments).
 
 #### When does phase 3 compaction run?
 
@@ -84,9 +110,22 @@ Options:
 * On a timer
 * After phase 1 if needed based on a ratio of live to dead indexes in the compacting section
 * After phase 1 if needed based on disk use / ratio of live data to dead.
+* Explicitly through a new ra API.
 
 ![segments](compaction2.jpg)
 
+### Phase 4 compaction (optional)
+
+At some point the set of live indexes could become completely sparse (no
+adjacent indexes) and large, which is sub-optimal memory wise.
+
+At this point the state machine could implement a "rewrite" command (or we
+provide one in Ra) to rewrite a subset or all of the indexes at the head of
+the Ra log to "clump" their indexes better together.
+
+This is of course optional and has replication costs, but it could perhaps be a
+manually triggered maintenance option.
+
 ### Ra Server log worker responsibilities
 
 * Write checkpoints and snapshots
@@ -162,93 +201,28 @@ the flush request comes in.
 With the snapshot now defined as the snapshot state + live preceding raft indexes
 the default snapshot replication approach will need to change.
 
-The snapshot sender (Ra log worker??) needs to negotiate with the follower to
-discover which preceding raft indexes the follower does not yet have. Then it would
-go on and replicate these before or after (??) sending the snapshot itself.
-
-T: probably before as if a new snapshot has been taken locally we'd most likely
-skip some raft index replication on the second attempt.
-
-Q: How will the follower write the live indexes preceding the snapshot?
-If the main Ra process does it this introduces a 3rd modifier of the Ra log
-and there may be concurrent Ra log writes from the snapshot writer at this point.
-
-It can't write them to the WAL as they are not contiguous unless we allow
-such writes.
+The snapshot sender process (currently transient) first sends all live
+entries for the given snapshot, then performs normal chunk-based
+snapshot replication.
 
-The Ra process can write them to a set of temporary segment files then call into
-the segment writer to rename into the set of segments.
-No this can't work with the
-live indexes logic the segment writer applies as it relies on the mem tables
-ranges to decide which indexes to flush.
 
-having pending segment flushes when receiving
-
-the ra process truncates the mem table on snapshot installations
-so that the segment writer avoids
-writing any of the live index preceding the snapshot.
-
-If this is done before live indexes are replicated if the Ra process then waits
-for the mt delete to complete then
-
-Idea: the ra server could truncate the mt as soon as a snapshot installation
-starts to minimise subsequent mem table flushes. Typically this means emptying
-the memtable completely (the ra server could execute the delete perhaps to ensure).
-
-Scenario: pending mem table flushes when snapshot installation comes in.
-
-Need to ensure the old pending data in the WAL / memtable isn't flushed _after_
-the received live indexes are written to a segment and that segment is renamed
-into the list of segments.
-
-Options:
-
-1. Call into segment writer to rename the temporary named segment(s) into the
-main sequence of segments. This command will return the full list of new segments.
-If the memtable has been truncated before this is done by the time the rename
-returns we know there wont be any more segment records being written.
-We can't update the `ra_log_snapshot_state` table until _after_ this as the
-segment writer may assume the live indexes should be in the memtable.
-Also need to handle concurrent compaction?
-Downside: BLOCKING, segwriter can at at times run a backlog.
-
-2.
-
-
-##### Steps when receiving a new valid `install_snapshot_rpc`
-
-* Once the `last` part of the snapshot has been installed and recovered as the
-mem table state we can calculated which live indexes are needed to complete
-the snapshot. The Ra follower remains in `receiving_snapshot` state and replies
-back with a `ra_seq` of the required indexes.
-* these are received and written to the log as normal
-process itself
-
-
-#### Alternative snapshot install procedure
+#### Snapshot install procedure
 
 * Sender begins by negotiating which live indexes are needed. It is
 probably sufficient for the receiver to return its `last_applied` index and the
 sender will send all sparse entries after that index.
 * Then it proceeds to send the live indexes _before_ the snapshot (so in its
-natural order if you like).
+natural log order).
 * The receiving ra process then writes these commands to the WAL as normal but
 using a special command / flag to avoid the WAL triggering its gap detection.
 Ideally the specialised command would include the previous idx so that we can
 still do gap detection in the sparse sequence (should all sends include the prior
 sequence so this is the only mode?).
 * The sparse writes are written to a new memtable using a new `ra_mt:sparse_write/2`
 API that bypasses gap validation and stores a sparse sequence instead of a range.
-* Alt the live indexes replication could be done after the snapshot is complete
-as it is easy for the follower to work out which live indexes it needs.
-when it receives the `last` snapshot chunk it then replies with a special
-continuation command instead of `install_snapshot_result{}` which will initiate
-the live index replication. NB the snapshot sender process may need to call
-into the leader process to get read plans as entries _could_ be in the memtable.
+
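
Editorial aside: a minimal sketch of the selection rule implied by the first bullet above (and by the worked example in the next section) — the follower needs every live index above its `last_applied`. The function name is illustrative, not an existing Ra API.

```erlang
%% LiveIndexes: the snapshot's sparse, ascending list of live raft indexes.
%% Anything at or below the follower's last_applied is already applied there.
needed_live_indexes(LiveIndexes, LastApplied) ->
    [Idx || Idx <- LiveIndexes, Idx > LastApplied].
```

With a `last_applied` of 1100 this yields the `[1200, 1777]` result quoted in the example below.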

 #### How to work out which live indexes the follower needs
-WA
-Gnarly example:
 
 Follower term indexes:
 
@@ -267,7 +241,6 @@ If follower `last_applied` is: 1100 then follower needs `[1200, 1777]`
 
 #### How to store live indexes with snapshot
 
-* New section in snapshot file format.
 * Separate file (that can be rebuilt if needed from the snapshot).
 
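Editorial aside: a small sketch of the "separate file" option kept above — persist the live-index sequence next to the snapshot and rebuild it from the recovered machine state when missing. The file name and the `Module:live_indexes/1` callback shape (as added to `ra_kv` in this commit) are assumptions for illustration.

```erlang
%% Sketch: persist the live index list beside the snapshot so it can be read
%% cheaply, and rebuild it from the machine state if it is missing.
write_live_indexes(SnapDir, LiveIndexes) ->
    file:write_file(filename:join(SnapDir, "live_indexes"),
                    term_to_binary(LiveIndexes)).

read_live_indexes(SnapDir, MacMod, MacState) ->
    case file:read_file(filename:join(SnapDir, "live_indexes")) of
        {ok, Bin} ->
            {ok, binary_to_term(Bin)};
        {error, enoent} ->
            %% e.g. ra_kv:live_indexes(MacState) in this commit
            {ok, MacMod:live_indexes(MacState)}
    end.
```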

src/ra_kv.erl

Lines changed: 38 additions & 6 deletions
@@ -19,7 +19,9 @@
 start_cluster/3,
 add_member/3,
 
-put/4
+put/4,
+get/3,
+query_get/3
 ]).
 
 
@@ -48,15 +50,15 @@
 -spec start_cluster(atom(), atom(), map()) ->
     {ok, [ra_server_id()], [ra_server_id()]} |
     {error, cluster_not_formed}.
-start_cluster(System, Name, #{members := ServerIds})
-  when is_atom(Name) andalso
+start_cluster(System, ClusterName, #{members := ServerIds})
+  when is_atom(ClusterName) andalso
       is_atom(System) ->
     Machine = {module, ?MODULE, #{}},
     Configs = [begin
-                   UId = ra:new_uid(ra_lib:to_binary(Name)),
+                   UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
                    #{id => Id,
                      uid => UId,
-                     cluster_name => Name,
+                     cluster_name => ClusterName,
                      log_init_args => #{uid => UId},
                      initial_members => ServerIds,
                      machine => Machine}
@@ -95,14 +97,43 @@ put(ServerId, Key, Value, Timeout) ->
 end.
 
 
-%% get performs a consistent query that returns the index, hash and member set
+%% @doc get performs a consistent query that returns the index, hash and member set
 %% then perform an aux query to actually get the data for a given index.
 %% if addressing a follower (say there is a local one) then the read may need
 %% to wait if the index isn't yet available locally (term also needs to be checked)
 %% or check that the machine state has the right index for a given key before
 %% reading the value from the log
+-spec get(ra:server_id(), key(), non_neg_integer()) ->
+    {ok, map(), value()} | {error, term()} | {timeout, ra:server_id()}.
+get(ServerId, Key, Timeout) ->
+    case ra:consistent_query(ServerId, {?MODULE, query_get,
+                                        [element(1, ServerId), Key]}, Timeout) of
+        {ok, {ok, Idx, Members}, LeaderId} ->
+            case ra_server_proc:read_entries(LeaderId, [Idx],
+                                             undefined, Timeout) of
+                {ok, {#{Idx := {Idx, Term,
+                                {'$usr', Meta, #put{value = Value}, _}}}, Flru}} ->
+                    _ = ra_flru:evict_all(Flru),
+                    {ok, Meta#{index => Idx,
+                               members => Members,
+                               term => Term}, Value};
+                Err ->
+                    Err
+            end;
+        Err ->
+            Err
+    end.
 
 
+query_get(ClusterName, Key, #?STATE{keys = Keys}) ->
+    Members = ra_leaderboard:lookup_members(ClusterName),
+    case Keys of
+        #{Key := [Idx | _]} ->
+            {ok, Idx, Members};
+        _ ->
+            {error, not_found}
+    end.
+
 %% state machine
 
 init(_) ->
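
Editorial aside: a usage sketch of the `get/3` flow added above. The metadata keys come from the `Meta#{index => ..., members => ..., term => ...}` map it builds; `KvId` is a server id as used in the test suite change below, and the put/get values are hypothetical.

```erlang
%% hypothetical usage, assuming a cluster started via ra_kv:start_cluster/3
{ok, #{}} = ra_kv:put(KvId, <<"k1">>, <<"v1">>, 5000),
{ok, #{index := Idx, term := Term, members := Members}, <<"v1">>} =
    ra_kv:get(KvId, <<"k1">>, 5000).
```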
@@ -124,6 +155,7 @@ live_indexes(#?STATE{keys = Keys}) ->
               end, [], Keys).
 
 -record(aux, {}).
+
 init_aux(_) ->
     #aux{}.
 
test/ra_kv_SUITE.erl

Lines changed: 3 additions & 0 deletions
@@ -70,6 +70,9 @@ basics(_Config) ->
     [{ok, #{}} = ra_kv:put(KvId, K2, I, 5000)
      || I <- lists:seq(1, 10000)],
 
+    ct:pal("kv get ~p", [ra_kv:get(KvId, <<"k1">>, 5000)]),
+    ct:pal("leaderboard ~p", [ets:tab2list(ra_leaderboard)]),
+
     ?assertMatch({ok, #{machine := #{num_keys := 2}}, KvId},
                  ra:member_overview(KvId)),
     ra_log_wal:force_roll_over(ra_log_wal),
