Skip to content

Commit 4926fe5

Browse files
committed
perf: optimize prefix scan from O(N) to O(log N + K)
Switch :concord_store ETS table from :set to :ordered_set to enable lexicographic range traversal. Add a server-side {:prefix_scan, prefix} Ra query that uses :ets.select/2 with range guards to scan only the matching key range, filtering expired entries inline. Wire Query.keys/1 to dispatch this query when the prefix: option is present, avoiding the previous full-table load. Expose Concord.prefix_scan/2 as a public API.
1 parent f764bc4 commit 4926fe5

File tree

4 files changed

+115
-4
lines changed

4 files changed

+115
-4
lines changed

lib/concord.ex

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,33 @@ defmodule Concord do
516516
end
517517
end
518518

519+
@doc """
520+
Returns key-value pairs for all keys starting with the given prefix.
521+
522+
Uses an efficient server-side scan on the ordered ETS table, avoiding
523+
loading all keys into memory. O(log N + K) where K is the number of
524+
matching keys.
525+
526+
## Options
527+
- `:timeout` - Operation timeout in milliseconds (default: 5000)
528+
- `:consistency` - Read consistency level (default: :leader)
529+
530+
## Examples
531+
iex> Concord.prefix_scan("user:")
532+
{:ok, [{"user:1", %{name: "Alice"}}, {"user:2", %{name: "Bob"}}]}
533+
"""
534+
def prefix_scan(prefix, opts \\ []) when is_binary(prefix) do
535+
timeout = Keyword.get(opts, :timeout, @timeout)
536+
consistency = Keyword.get(opts, :consistency, default_consistency())
537+
538+
case query({:prefix_scan, prefix}, timeout, consistency) do
539+
{:ok, {{_index, _term}, query_result}, _} -> query_result
540+
{:timeout, _} -> {:error, :timeout}
541+
{:error, :noproc} -> {:error, :cluster_not_ready}
542+
{:error, reason} -> {:error, reason}
543+
end
544+
end
545+
519546
@doc """
520547
Stores multiple key-value pairs in the cluster atomically.
521548

lib/concord/query.ex

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,13 @@ defmodule Concord.Query do
9595
"""
9696
@spec keys(query_opts()) :: {:ok, [key()]} | {:error, term()}
9797
def keys(opts \\ []) do
98-
with {:ok, all_keys} <- get_all_keys() do
98+
fetch_fn =
99+
case Keyword.get(opts, :prefix) do
100+
nil -> fn -> get_all_keys() end
101+
prefix -> fn -> get_prefix_keys(prefix) end
102+
end
103+
104+
with {:ok, all_keys} <- fetch_fn.() do
99105
filtered_keys =
100106
all_keys
101107
|> apply_key_filters(opts)
@@ -207,6 +213,17 @@ defmodule Concord.Query do
207213
end
208214
end
209215

216+
defp get_prefix_keys(prefix) do
217+
case Concord.prefix_scan(prefix) do
218+
{:ok, pairs} ->
219+
keys = pairs |> Enum.map(fn {k, _v} -> k end) |> Enum.sort()
220+
{:ok, keys}
221+
222+
error ->
223+
error
224+
end
225+
end
226+
210227
defp get_key_values(keys) do
211228
case Concord.get_many(keys) do
212229
{:ok, results} ->

lib/concord/state_machine.ex

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ defmodule Concord.StateMachine do
7979

8080
@impl :ra_machine
8181
def init(_config) do
82-
ensure_ets_table(:concord_store, [:set, :public, :named_table])
82+
ensure_ets_table(:concord_store, [:ordered_set, :public, :named_table])
8383

8484
{:concord_kv, default_state_fields()}
8585
end
@@ -743,6 +743,33 @@ defmodule Concord.StateMachine do
743743
{:ok, Map.new(results)}
744744
end
745745

746+
def query({:prefix_scan, prefix}, {:concord_kv, _data}) when is_binary(prefix) do
747+
now = System.system_time(:second)
748+
# Upper bound: any key starting with `prefix` is lexicographically < prefix <> <<255>>
749+
# because byte values after the prefix will be < 255 for all normal string characters.
750+
end_key = prefix <> <<255>>
751+
752+
match_spec = [
753+
{{:"$1", :"$2"},
754+
[{:">=", :"$1", prefix}, {:"<", :"$1", end_key}],
755+
[{{:"$1", :"$2"}}]}
756+
]
757+
758+
results =
759+
:ets.select(:concord_store, match_spec)
760+
|> Enum.reduce([], fn {key, stored_data}, acc ->
761+
case extract_value(stored_data) do
762+
{value, expires_at} ->
763+
if expired?(expires_at, now), do: acc, else: [{key, value} | acc]
764+
765+
_ ->
766+
acc
767+
end
768+
end)
769+
770+
{:ok, results}
771+
end
772+
746773
def query(:stats, {:concord_kv, _data}) do
747774
info = :ets.info(:concord_store)
748775

@@ -843,7 +870,7 @@ defmodule Concord.StateMachine do
843870

844871
# V1/V2 legacy: bare list of KV tuples
845872
data when is_list(data) ->
846-
ensure_ets_table(:concord_store)
873+
ensure_ets_table(:concord_store, [:ordered_set, :public, :named_table])
847874
:ets.delete_all_objects(:concord_store)
848875
Enum.each(data, fn entry -> :ets.insert(:concord_store, entry) end)
849876
end
@@ -859,7 +886,7 @@ defmodule Concord.StateMachine do
859886

860887
defp rebuild_all_ets_from_snapshot(data) do
861888
# Rebuild main KV store
862-
ensure_ets_table(:concord_store)
889+
ensure_ets_table(:concord_store, [:ordered_set, :public, :named_table])
863890
:ets.delete_all_objects(:concord_store)
864891

865892
Enum.each(Map.get(data, :__kv_data__, []), fn entry ->

test/concord/query_test.exs

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,46 @@ defmodule Concord.QueryTest do
317317
end
318318
end
319319

320+
describe "Concord.prefix_scan/2" do
321+
test "returns key-value pairs for matching prefix" do
322+
assert {:ok, pairs} = Concord.prefix_scan("user:")
323+
assert length(pairs) == 4
324+
keys = Enum.map(pairs, fn {k, _v} -> k end)
325+
assert "user:1" in keys
326+
assert "user:2" in keys
327+
assert "user:10" in keys
328+
assert "user:100" in keys
329+
end
330+
331+
test "does not return keys outside the prefix" do
332+
assert {:ok, pairs} = Concord.prefix_scan("product:")
333+
keys = Enum.map(pairs, fn {k, _v} -> k end)
334+
assert Enum.all?(keys, &String.starts_with?(&1, "product:"))
335+
refute Enum.any?(keys, &String.starts_with?(&1, "user:"))
336+
end
337+
338+
test "returns empty list when no keys match" do
339+
assert {:ok, []} = Concord.prefix_scan("nonexistent:")
340+
end
341+
342+
test "excludes expired keys" do
343+
:ok = Concord.put_with_ttl("expiring:1", "value", 1)
344+
:ok = Concord.put("expiring:2", "permanent")
345+
346+
# Both visible before expiry
347+
assert {:ok, pairs} = Concord.prefix_scan("expiring:")
348+
assert length(pairs) == 2
349+
350+
# Expire the key — TTL of 1s stores expires_at = now + 1.
351+
# The expired? check is `now > expires_at` (strict), so we need at least 2s.
352+
Process.sleep(2_100)
353+
assert {:ok, pairs} = Concord.prefix_scan("expiring:")
354+
keys = Enum.map(pairs, fn {k, _v} -> k end)
355+
refute "expiring:1" in keys
356+
assert "expiring:2" in keys
357+
end
358+
end
359+
320360
describe "performance with larger datasets" do
321361
test "handles queries on larger datasets efficiently" do
322362
# Insert 100 additional keys

0 commit comments

Comments
 (0)