Skip to content

Commit 08e6b8e

Browse files
committed
fix: address 7 PR review issues for correctness and determinism
- Fix write_backup compression flag (always-on bug) - Fix put_if (5-tuple CAS) to update secondary indexes - Fix put_if (6-tuple compat) to update secondary indexes - Fix delete_if to remove entries from secondary indexes - Fix V2 ETS fallback to rebuild index tables - Fix meta_time fallback to use 0 instead of wall-clock time - Fix put_many error detection pattern to match {key, {:error, _}} - Add tests for put_if/delete_if index maintenance - Fix tests to provide system_time in meta for deterministic time
1 parent a3e3c0b commit 08e6b8e

File tree

4 files changed

+131
-18
lines changed

4 files changed

+131
-18
lines changed

lib/concord/backup.ex

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ defmodule Concord.Backup do
3333

3434
require Logger
3535

36+
alias Concord.Compression
37+
alias Concord.Index
38+
alias Concord.Index.Extractor
39+
3640
@backup_extension ".backup"
3741
@default_backup_dir "./backups"
3842

@@ -337,7 +341,8 @@ defmodule Concord.Backup do
337341
filename = "concord_backup_#{timestamp}#{@backup_extension}"
338342
backup_path = Path.join(backup_dir, filename)
339343

340-
encoded = :erlang.term_to_binary(backup_data, [:compressed | if(compress, do: [], else: [])])
344+
opts = if compress, do: [:compressed], else: []
345+
encoded = :erlang.term_to_binary(backup_data, opts)
341346

342347
case File.write(backup_path, encoded) do
343348
:ok -> {:ok, backup_path}
@@ -439,6 +444,30 @@ defmodule Concord.Backup do
439444
end)
440445
end
441446

447+
# Rebuild index ETS tables from index definitions and KV data
448+
indexes = Map.get(snapshot_data, :indexes, %{})
449+
kv_data = Map.get(snapshot_data, :kv_data, [])
450+
451+
Enum.each(indexes, fn {name, extractor} ->
452+
table = Index.index_table_name(name)
453+
454+
if :ets.whereis(table) == :undefined do
455+
:ets.new(table, [:bag, :public, :named_table])
456+
else
457+
:ets.delete_all_objects(table)
458+
end
459+
460+
Enum.each(kv_data, fn {key, stored} ->
461+
value =
462+
case stored do
463+
%{value: v} -> Compression.decompress(v)
464+
_ -> stored
465+
end
466+
467+
Extractor.index_value(table, key, value, extractor)
468+
end)
469+
end)
470+
442471
:telemetry.execute(
443472
[:concord, :backup, :restored],
444473
%{entry_count: metadata.entry_count},

lib/concord/state_machine.ex

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ defmodule Concord.StateMachine do
4444
# Extract deterministic timestamp in seconds from Ra metadata.
4545
# Ra's system_time is milliseconds set by the leader at proposal time.
4646
defp meta_time(meta) do
47-
ms = Map.get(meta, :system_time, System.system_time(:millisecond))
47+
ms = Map.get(meta, :system_time, 0)
4848
div(ms, 1000)
4949
end
5050

@@ -221,9 +221,19 @@ defmodule Concord.StateMachine do
221221
if expired?(current_expires_at, now) do
222222
{:error, :not_found}
223223
else
224-
if Compression.decompress(current_value) == expected do
224+
old_decompressed = Compression.decompress(current_value)
225+
226+
if old_decompressed == expected do
225227
formatted_value = format_value(value, expires_at)
226228
:ets.insert(:concord_store, {key, formatted_value})
229+
230+
update_indexes_on_put(
231+
data,
232+
key,
233+
old_decompressed,
234+
Compression.decompress(value)
235+
)
236+
227237
:ok
228238
else
229239
{:error, :condition_failed}
@@ -262,8 +272,10 @@ defmodule Concord.StateMachine do
262272

263273
result =
264274
check_conditional_operation(key, expected, condition_fn, now, fn ->
275+
old_value = get_decompressed_value(key)
265276
formatted_value = format_value(value, expires_at)
266277
:ets.insert(:concord_store, {key, formatted_value})
278+
update_indexes_on_put(data, key, old_value, Compression.decompress(value))
267279
:ok
268280
end)
269281

@@ -284,7 +296,13 @@ defmodule Concord.StateMachine do
284296

285297
result =
286298
check_conditional_operation(key, expected, condition_fn, now, fn ->
299+
old_value = get_decompressed_value(key)
287300
:ets.delete(:concord_store, key)
301+
302+
if old_value != nil do
303+
remove_from_all_indexes(data, key, old_value)
304+
end
305+
288306
:ok
289307
end)
290308

@@ -390,7 +408,7 @@ defmodule Concord.StateMachine do
390408
:ok ->
391409
results = execute_put_many_batch(operations, data)
392410

393-
case Enum.find(results, fn {status, _} -> status == :error end) do
411+
case Enum.find(results, fn {_key, result} -> match?({:error, _}, result) end) do
394412
nil ->
395413
duration = System.monotonic_time() - start_time
396414

test/concord/bulk_operations_test.exs

Lines changed: 77 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,20 +73,22 @@ defmodule Concord.BulkOperationsTest do
7373
end
7474

7575
test "apply_command handles get_many", %{state: state} do
76-
meta = %{index: 1}
76+
now_ms = System.system_time(:millisecond)
77+
now_s = div(now_ms, 1000)
78+
meta = %{index: 1, system_time: now_ms}
7779

7880
# Setup some data
7981
:ets.insert(:concord_store, {"key1", %{value: "value1", expires_at: nil}})
8082

8183
:ets.insert(
8284
:concord_store,
83-
{"key2", %{value: "value2", expires_at: System.system_time(:second) + 3600}}
85+
{"key2", %{value: "value2", expires_at: now_s + 3600}}
8486
)
8587

8688
# Expired
8789
:ets.insert(
8890
:concord_store,
89-
{"key3", %{value: "value3", expires_at: System.system_time(:second) - 1}}
91+
{"key3", %{value: "value3", expires_at: now_s - 1}}
9092
)
9193

9294
{new_state, result, _effects} =
@@ -126,12 +128,13 @@ defmodule Concord.BulkOperationsTest do
126128
end
127129

128130
test "apply_command handles touch_many", %{state: state} do
129-
meta = %{index: 1}
130-
current_time = System.system_time(:second)
131+
now_ms = System.system_time(:millisecond)
132+
now_s = div(now_ms, 1000)
133+
meta = %{index: 1, system_time: now_ms}
131134

132135
# Setup some data with existing TTL
133-
:ets.insert(:concord_store, {"key1", format_value("value1", current_time + 100)})
134-
:ets.insert(:concord_store, {"key2", format_value("value2", current_time + 200)})
136+
:ets.insert(:concord_store, {"key1", format_value("value1", now_s + 100)})
137+
:ets.insert(:concord_store, {"key2", format_value("value2", now_s + 200)})
135138
# No TTL
136139
:ets.insert(:concord_store, {"key3", format_value("value3", nil)})
137140

@@ -141,13 +144,11 @@ defmodule Concord.BulkOperationsTest do
141144
assert result == {:ok, [{"key1", :ok}, {"key2", :ok}]}
142145
assert new_state == state
143146

144-
# Verify TTLs were extended - check they're greater than original time
147+
# Verify TTLs were extended — now_s + additional_ttl
145148
[{_, updated1}] = :ets.lookup(:concord_store, "key1")
146149
[{_, updated2}] = :ets.lookup(:concord_store, "key2")
147-
# Extended beyond original
148-
assert updated1.expires_at > current_time + 100
149-
# Extended beyond original
150-
assert updated2.expires_at > current_time + 200
150+
assert updated1.expires_at == now_s + 3600
151+
assert updated2.expires_at == now_s + 7200
151152
end
152153

153154
test "apply_command handles touch_many with non-existent keys", %{state: state} do
@@ -275,6 +276,70 @@ defmodule Concord.BulkOperationsTest do
275276
assert :ets.lookup(table, "second@test.com") == [{"second@test.com", ["user:1"]}]
276277
end
277278

279+
test "put_if (5-tuple CAS) updates secondary indexes", %{state: state} do
280+
meta = %{index: 1}
281+
282+
# Create index
283+
{state_with_index, :ok, _} =
284+
StateMachine.apply_command(meta, {:create_index, "by_email", {:map_get, :email}}, state)
285+
286+
# Insert initial record
287+
{state2, :ok, _} =
288+
StateMachine.apply_command(
289+
%{index: 2},
290+
{:put, "user:1", %{email: "old@test.com"}, nil},
291+
state_with_index
292+
)
293+
294+
table = Concord.Index.index_table_name("by_email")
295+
assert :ets.lookup(table, "old@test.com") == [{"old@test.com", ["user:1"]}]
296+
297+
# Use put_if CAS to update — expected matches the current value
298+
{_state3, :ok, _} =
299+
StateMachine.apply_command(
300+
%{index: 3},
301+
{:put_if, "user:1", %{email: "new@test.com"}, nil, %{email: "old@test.com"}},
302+
state2
303+
)
304+
305+
# Old index entry removed, new one present
306+
assert :ets.lookup(table, "old@test.com") == []
307+
assert :ets.lookup(table, "new@test.com") == [{"new@test.com", ["user:1"]}]
308+
end
309+
310+
test "delete_if removes entries from secondary indexes", %{state: state} do
311+
meta = %{index: 1}
312+
313+
# Create index
314+
{state_with_index, :ok, _} =
315+
StateMachine.apply_command(meta, {:create_index, "by_email", {:map_get, :email}}, state)
316+
317+
# Insert a record
318+
{state2, :ok, _} =
319+
StateMachine.apply_command(
320+
%{index: 2},
321+
{:put, "user:1", %{email: "alice@test.com"}, nil},
322+
state_with_index
323+
)
324+
325+
table = Concord.Index.index_table_name("by_email")
326+
assert :ets.lookup(table, "alice@test.com") == [{"alice@test.com", ["user:1"]}]
327+
328+
# delete_if with a condition function that matches
329+
condition_fn = fn _value -> true end
330+
331+
{_state3, :ok, _} =
332+
StateMachine.apply_command(
333+
%{index: 3},
334+
{:delete_if, "user:1", nil, condition_fn},
335+
state2
336+
)
337+
338+
# Key deleted and index entry removed
339+
assert :ets.lookup(:concord_store, "user:1") == []
340+
assert :ets.lookup(table, "alice@test.com") == []
341+
end
342+
278343
test "query handles get_many", %{state: state} do
279344
# Setup some data
280345
:ets.insert(:concord_store, {"key1", %{value: "value1", expires_at: nil}})

test/concord/ttl_test.exs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,8 +224,9 @@ defmodule Concord.TTLTest do
224224
end
225225

226226
test "apply/3 handles cleanup_expired", %{state: state} do
227-
meta = %{index: 1}
228-
current_time = System.system_time(:second)
227+
now_ms = System.system_time(:millisecond)
228+
current_time = div(now_ms, 1000)
229+
meta = %{index: 1, system_time: now_ms}
229230

230231
# Insert test data
231232
:ets.insert(

0 commit comments

Comments
 (0)