Skip to content

Commit fbf4c63

Browse files
committed
Release v0.7.2: Replace global locks with ETS-based locking for improved concurrency
This release replaces :global.trans-based locking with a custom ETS-based locking mechanism in ConcurrencyGate and State modules, improving performance and reliability under high concurrency. Key improvements: - ETS-based locking eliminates dependency on distributed Erlang's global name server, which can introduce latency and contention in single-node deployments - Dedicated lock tables (gemini_concurrency_locks and gemini_rate_limit_locks) track lock ownership per model/key with automatic cleanup of dead lock holders - Lock acquisition uses insert_new for atomic compare-and-set semantics; contending processes spin-wait with 5ms intervals - Dead lock holder cleanup uses delete_object to prevent TOCTOU races when reclaiming locks from terminated processes - Lock release is explicit and guaranteed via try/after blocks around critical sections Implementation details: - ConcurrencyGate.with_lock/2 acquires model-specific lock before permit operations (acquire, release, holder tracking) - State.with_lock/2 acquires budget-key-specific lock before reservation operations (try_reserve_budget, reconcile_reservation) - Both modules create secondary ETS tables with write_concurrency: true for lock storage during lazy initialization - Lock acquisition retries indefinitely with Process.alive? checks to detect and clean up stale lock holders - Test coverage expanded with high-iteration concurrency repro script (repro_concurrency_gate.exs) demonstrating no permit overlap across 500 iterations with 8 concurrent tasks Performance characteristics: - Reduced latency for uncontended lock acquisition (ETS insert_new vs global name server round-trip) - Bounded spin-wait interval (5ms) prevents CPU saturation during lock contention - Automatic lock cleanup eliminates need for manual lock recovery after process crashes Breaking changes: None All existing tests pass; new test validates atomic reservation with improved non_blocking mode timing to avoid flaky race conditions.
1 parent 8ae9f9d commit fbf4c63

File tree

7 files changed

+211
-18
lines changed

7 files changed

+211
-18
lines changed

CHANGELOG.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,17 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [0.7.2] - 2025-12-06
9+
10+
### Fixed
11+
- **Rate limiter race condition**: Replaced `:global.trans/2` with ETS-based spinlock using `:ets.insert_new/2` for proper single-node mutex semantics in both `ConcurrencyGate` and `State` modules
12+
- **TOCTOU race in lock cleanup**: Use `:ets.delete_object/2` instead of `:ets.delete/2` to atomically delete only if PID still matches, preventing lock theft
13+
- **ETS table options**: Changed lock tables to use `write_concurrency: true` instead of `read_concurrency: true` for write-heavy workloads
14+
- **Test synchronization**: Removed flaky `Process.sleep` from atomic reservation test; now awaits non-blocking task2 before releasing task1 for deterministic synchronization
15+
16+
### Changed
17+
- Lock acquisition retry sleep increased from 1ms to 5ms to reduce CPU usage under contention
18+
819
## [0.7.1] - 2025-12-05
920

1021
### Added

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ Add `gemini` to your list of dependencies in `mix.exs`:
5151
```elixir
5252
def deps do
5353
[
54-
{:gemini_ex, "~> 0.7.1"}
54+
{:gemini_ex, "~> 0.7.2"}
5555
]
5656
end
5757
```

lib/gemini/rate_limiter/concurrency_gate.ex

Lines changed: 67 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
1616
alias Gemini.RateLimiter.Config
1717

1818
@ets_table :gemini_concurrency_permits
19+
@lock_table :gemini_concurrency_locks
1920
@adaptive_backoff_factor 0.75
2021
@adaptive_raise_amount 1
2122

@@ -37,12 +38,12 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
3738
"""
3839
@spec init() :: :ok
3940
def init do
40-
ensure_table_exists()
41+
ensure_tables_exist()
4142
:ok
4243
end
4344

4445
# Lazy initialization - ensures table exists before any operation
45-
defp ensure_table_exists do
46+
defp ensure_tables_exist do
4647
case :ets.whereis(@ets_table) do
4748
:undefined ->
4849
try do
@@ -54,6 +55,18 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
5455
_ref ->
5556
:ok
5657
end
58+
59+
case :ets.whereis(@lock_table) do
60+
:undefined ->
61+
try do
62+
:ets.new(@lock_table, [:named_table, :public, :set, write_concurrency: true])
63+
catch
64+
:error, :badarg -> :ok
65+
end
66+
67+
_ref ->
68+
:ok
69+
end
5770
end
5871

5972
@doc """
@@ -76,7 +89,7 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
7689
"""
7790
@spec acquire(model_key(), Config.t()) :: :ok | {:error, atom()}
7891
def acquire(model, %Config{} = config) do
79-
ensure_table_exists()
92+
ensure_tables_exist()
8093

8194
unless Config.concurrency_enabled?(config) do
8295
{:error, :concurrency_disabled}
@@ -93,7 +106,7 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
93106
"""
94107
@spec release(model_key()) :: :ok
95108
def release(model) do
96-
ensure_table_exists()
109+
ensure_tables_exist()
97110

98111
with_lock(model, fn ->
99112
case :ets.lookup(@ets_table, model) do
@@ -118,7 +131,7 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
118131
"""
119132
@spec signal_429(model_key(), Config.t()) :: :ok
120133
def signal_429(model, %Config{adaptive_concurrency: true} = config) do
121-
ensure_table_exists()
134+
ensure_tables_exist()
122135

123136
case :ets.lookup(@ets_table, model) do
124137
[{^model, state}] ->
@@ -149,7 +162,7 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
149162
"""
150163
@spec signal_success(model_key(), Config.t()) :: :ok
151164
def signal_success(model, %Config{adaptive_concurrency: true} = config) do
152-
ensure_table_exists()
165+
ensure_tables_exist()
153166

154167
case :ets.lookup(@ets_table, model) do
155168
[{^model, state}] when not is_nil(state.adaptive_max) ->
@@ -173,7 +186,7 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
173186
"""
174187
@spec get_state(model_key()) :: permit_state() | nil
175188
def get_state(model) do
176-
ensure_table_exists()
189+
ensure_tables_exist()
177190

178191
case :ets.lookup(@ets_table, model) do
179192
[{^model, state}] -> state
@@ -186,7 +199,7 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
186199
"""
187200
@spec available_permits(model_key(), Config.t()) :: non_neg_integer()
188201
def available_permits(model, %Config{} = config) do
189-
ensure_table_exists()
202+
ensure_tables_exist()
190203

191204
case :ets.lookup(@ets_table, model) do
192205
[{^model, state}] ->
@@ -217,6 +230,11 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
217230
_ref -> :ets.delete_all_objects(@ets_table)
218231
end
219232

233+
case :ets.whereis(@lock_table) do
234+
:undefined -> :ok
235+
_ref -> :ets.delete_all_objects(@lock_table)
236+
end
237+
220238
:ok
221239
end
222240

@@ -335,7 +353,7 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
335353
end
336354

337355
def handle_holder_down(model, holder_pid) do
338-
ensure_table_exists()
356+
ensure_tables_exist()
339357

340358
with_lock(model, fn ->
341359
case :ets.lookup(@ets_table, model) do
@@ -371,6 +389,45 @@ defmodule Gemini.RateLimiter.ConcurrencyGate do
371389
end
372390

373391
defp with_lock(model, fun) do
374-
:global.trans({@ets_table, model}, fun)
392+
acquire_lock(model)
393+
394+
try do
395+
fun.()
396+
after
397+
release_lock(model)
398+
end
399+
end
400+
401+
defp acquire_lock(model) do
402+
ensure_tables_exist()
403+
404+
case :ets.insert_new(@lock_table, {model, self()}) do
405+
true ->
406+
:ok
407+
408+
false ->
409+
cleanup_dead_lock_holder(model)
410+
Process.sleep(5)
411+
acquire_lock(model)
412+
end
413+
end
414+
415+
defp release_lock(model) do
416+
:ets.delete(@lock_table, model)
417+
:ok
418+
end
419+
420+
defp cleanup_dead_lock_holder(model) do
421+
case :ets.lookup(@lock_table, model) do
422+
[{^model, pid}] ->
423+
unless Process.alive?(pid) do
424+
# Use delete_object to atomically delete only if PID still matches
425+
# This prevents TOCTOU race where another process acquired the lock
426+
:ets.delete_object(@lock_table, {model, pid})
427+
end
428+
429+
_ ->
430+
:ok
431+
end
375432
end
376433
end

lib/gemini/rate_limiter/state.ex

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ defmodule Gemini.RateLimiter.State do
3636
}
3737

3838
@ets_table :gemini_rate_limit_state
39+
@lock_table :gemini_rate_limit_locks
3940
@default_window_duration_ms 60_000
4041
@default_location "us-central1"
4142

@@ -52,7 +53,7 @@ defmodule Gemini.RateLimiter.State do
5253
:ok
5354
end
5455

55-
# Lazy initialization - ensures table exists before any operation
56+
# Lazy initialization - ensures tables exist before any operation
5657
defp ensure_table_exists do
5758
case :ets.whereis(@ets_table) do
5859
:undefined ->
@@ -66,6 +67,18 @@ defmodule Gemini.RateLimiter.State do
6667
_ref ->
6768
:ok
6869
end
70+
71+
case :ets.whereis(@lock_table) do
72+
:undefined ->
73+
try do
74+
:ets.new(@lock_table, [:named_table, :public, :set, write_concurrency: true])
75+
catch
76+
:error, :badarg -> :ok
77+
end
78+
79+
_ref ->
80+
:ok
81+
end
6982
end
7083

7184
@doc """
@@ -181,7 +194,7 @@ defmodule Gemini.RateLimiter.State do
181194
now = DateTime.utc_now()
182195
window_duration = Keyword.get(opts, :window_duration_ms, @default_window_duration_ms)
183196

184-
:global.trans({@ets_table, {:budget, key}}, fn ->
197+
with_lock({:budget, key}, fn ->
185198
current_window = current_or_new_window(key, now, window_duration)
186199

187200
updated =
@@ -273,7 +286,7 @@ defmodule Gemini.RateLimiter.State do
273286
window_duration = Keyword.get(opts, :window_duration_ms, @default_window_duration_ms)
274287
reserved_tokens = scaled_tokens(estimated_total_tokens, multiplier)
275288

276-
:global.trans({@ets_table, {:budget, key}}, fn ->
289+
with_lock({:budget, key}, fn ->
277290
now = DateTime.utc_now()
278291
window = current_or_new_window(key, now, window_duration)
279292
window_end = DateTime.add(window.window_start, window.window_duration_ms, :millisecond)
@@ -334,7 +347,7 @@ defmodule Gemini.RateLimiter.State do
334347
actual_input = Map.get(usage_map || %{}, :input_tokens, 0)
335348
actual_output = Map.get(usage_map || %{}, :output_tokens, 0)
336349

337-
:global.trans({@ets_table, {:budget, key}}, fn ->
350+
with_lock({:budget, key}, fn ->
338351
now = DateTime.utc_now()
339352
window = current_or_new_window(key, now, window_duration)
340353

@@ -369,6 +382,11 @@ defmodule Gemini.RateLimiter.State do
369382
_ref -> :ets.delete_all_objects(@ets_table)
370383
end
371384

385+
case :ets.whereis(@lock_table) do
386+
:undefined -> :ok
387+
_ref -> :ets.delete_all_objects(@lock_table)
388+
end
389+
372390
:ok
373391
end
374392

@@ -458,4 +476,48 @@ defmodule Gemini.RateLimiter.State do
458476
rate_limiter_config = Application.get_env(:gemini_ex, :rate_limiter, [])
459477
Keyword.get(rate_limiter_config, :default_retry_delay_ms, 60_000)
460478
end
479+
480+
# ETS-based locking to replace :global.trans
481+
defp with_lock(lock_key, fun) do
482+
acquire_lock(lock_key)
483+
484+
try do
485+
fun.()
486+
after
487+
release_lock(lock_key)
488+
end
489+
end
490+
491+
defp acquire_lock(lock_key) do
492+
ensure_table_exists()
493+
494+
case :ets.insert_new(@lock_table, {lock_key, self()}) do
495+
true ->
496+
:ok
497+
498+
false ->
499+
cleanup_dead_lock_holder(lock_key)
500+
Process.sleep(5)
501+
acquire_lock(lock_key)
502+
end
503+
end
504+
505+
defp release_lock(lock_key) do
506+
:ets.delete(@lock_table, lock_key)
507+
:ok
508+
end
509+
510+
defp cleanup_dead_lock_holder(lock_key) do
511+
case :ets.lookup(@lock_table, lock_key) do
512+
[{^lock_key, pid}] ->
513+
unless Process.alive?(pid) do
514+
# Use delete_object to atomically delete only if PID still matches
515+
# This prevents TOCTOU race where another process acquired the lock
516+
:ets.delete_object(@lock_table, {lock_key, pid})
517+
end
518+
519+
_ ->
520+
:ok
521+
end
522+
end
461523
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Gemini.MixProject do
22
use Mix.Project
33

4-
@version "0.7.1"
4+
@version "0.7.2"
55
@source_url "https://github.com/nshkrdotcom/gemini_ex"
66

77
def project do

repro_concurrency_gate.exs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
alias Gemini.RateLimiter.{Manager, ConcurrencyGate}
2+
3+
defmodule ConcurrencyGateRepro do
4+
def run(iterations \\ 500, tasks_per_round \\ 8, sleep_ms \\ 10) do
5+
ConcurrencyGate.init()
6+
Manager.reset_all()
7+
8+
for i <- 1..iterations do
9+
counter = :atomics.new(1, [])
10+
max_seen = :atomics.new(1, [])
11+
model = "repro-gate-#{i}-#{System.unique_integer([:positive])}"
12+
13+
request_fn = fn ->
14+
current = :atomics.add_get(counter, 1, 1)
15+
bump_max(max_seen, current)
16+
Process.sleep(sleep_ms)
17+
:atomics.add(counter, 1, -1)
18+
{:ok, :done}
19+
end
20+
21+
tasks =
22+
for _ <- 1..tasks_per_round do
23+
Task.async(fn ->
24+
Manager.execute(request_fn, model, max_concurrency_per_model: 1)
25+
end)
26+
end
27+
28+
results = Task.await_many(tasks, 5_000)
29+
30+
unless Enum.all?(results, &match?({:ok, _}, &1)),
31+
do: raise("unexpected result: #{inspect(results)}")
32+
33+
peak = :atomics.get(max_seen, 1)
34+
35+
if peak > 1 do
36+
IO.puts("FAIL iteration=#{i} peak=#{peak} tasks=#{tasks_per_round} sleep_ms=#{sleep_ms}")
37+
System.halt(1)
38+
end
39+
end
40+
41+
IO.puts(
42+
"PASS no overlaps after #{iterations} iterations (tasks=#{tasks_per_round}, sleep_ms=#{sleep_ms})"
43+
)
44+
end
45+
46+
defp bump_max(ref, value) do
47+
current = :atomics.get(ref, 1)
48+
49+
if value > current do
50+
:atomics.compare_exchange(ref, 1, current, value)
51+
end
52+
53+
:ok
54+
end
55+
end
56+
57+
ConcurrencyGateRepro.run()

0 commit comments

Comments
 (0)