
Commit 99ccd10
Release v0.6.3: Enhanced configurability and reliability improvements
- Added partitionable concurrency gates with per-tenant/location keys to prevent cross-tenant starvation.
- Concurrency permit wait timeout is now configurable via `permit_timeout_ms` (defaults to `:infinity`). Permit holders are monitored, and permits are automatically reclaimed if processes die without releasing.
- Per-request timeout overrides for HTTP and streaming operations; global default timeout increased from 30s to 120s.
- Streaming gains a tunable backoff ceiling (`max_backoff_ms`), a connect timeout, and a configurable ManagerV2 cleanup delay.
- Context cache TTL defaults are now configurable via the application environment, as is the rate limiter's retry delay fallback (used when API responses lack explicit retry timing).
- Fixed streaming client memory leaks by removing `persistent_term` state tracking. SSE parse errors now properly surface as errors instead of silently dropping.
- All timeout and concurrency parameters support per-call overrides while maintaining sensible global defaults. Documentation updated throughout to reflect the new configuration options and behavioral changes.
1 parent 35937a0 commit 99ccd10
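Taken together, the release notes map onto a small configuration surface. Below is a sketch of the relevant application-environment settings; the key names come from this commit's changelog and docs, but the `default_retry_delay_ms` and `cleanup_delay_ms` values shown are illustrative, not confirmed defaults.

```elixir
# config/config.exs — sketch of the 0.6.3 configuration surface.
# Key names follow this commit's changelog/docs; some values are illustrative.

import Config

config :gemini_ex,
  # Global HTTP/stream timeout, raised from 30_000 to 120_000 ms in this release
  timeout: 120_000

config :gemini_ex, :rate_limiter,
  max_concurrency_per_model: 4,
  # :infinity (the default) means callers queue indefinitely for a permit
  permit_timeout_ms: :infinity,
  # Fallback retry delay when an API response carries no explicit retry timing
  default_retry_delay_ms: 5_000

config :gemini_ex, :streaming,
  max_backoff_ms: 10_000,
  connect_timeout: 5_000,
  # How long ManagerV2 waits before cleaning up finished stream state
  cleanup_delay_ms: 1_000

config :gemini_ex, :context_cache,
  # Default TTL applied when no :ttl/:expire_time is given per call
  default_ttl_seconds: 3_600
```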

File tree

18 files changed: +430 −138 lines changed

CHANGELOG.md

Lines changed: 19 additions & 0 deletions
@@ -5,6 +5,25 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.6.3] - 2025-12-05
+
+### Added
+- Concurrency gate is now partitionable via `concurrency_key` (e.g., per-tenant or per-location) instead of a single global queue per model.
+- Concurrency permit wait is configurable via `permit_timeout_ms`; default is now `:infinity` (no queue drop). Per-call overrides supported.
+- Per-request timeout overrides for HTTP and streaming; global default HTTP/stream timeout raised to 120_000ms.
+- Streaming knobs: `max_backoff_ms`, `connect_timeout`, and configurable cleanup delay for ManagerV2 (`config :gemini_ex, :streaming, cleanup_delay_ms: ...`).
+- Configurable context cache TTL defaults via `config :gemini_ex, :context_cache, default_ttl_seconds: ...`.
+- Configurable retry delay fallback via `config :gemini_ex, :rate_limiter, default_retry_delay_ms: ...`.
+- Permit leak protection: holders are monitored and reclaimed if the process dies without releasing.
+
+### Changed
+- Default HTTP/stream timeout increased from 30_000ms to 120_000ms.
+- Concurrency gate uses configurable `permit_timeout_ms` (default `:infinity`) instead of a fixed 60s timeout.
+
+### Fixed
+- Streaming client no longer leaks `:persistent_term` state; SSE parse errors now surface instead of being silently dropped.
+- Streaming backoff ceiling and connect timeout are tunable; SSE parsing failures return errors.
+
 ## [0.6.2] - 2025-12-05
 
 ### Fixed

README.md

Lines changed: 15 additions & 2 deletions
@@ -47,7 +47,7 @@ Add `gemini` to your list of dependencies in `mix.exs`:
 ```elixir
 def deps do
   [
-    {:gemini_ex, "~> 0.6.2"}
+    {:gemini_ex, "~> 0.6.3"}
   ]
 end
 ```
@@ -150,12 +150,23 @@ Gemini.Streaming.resume_stream(stream_id)
 Gemini.Streaming.stop_stream(stream_id)
 ```
 
-### Rate Limiting (built-in)
+Streaming knobs: pass `timeout:` (per attempt, default `config :gemini_ex, :timeout` = 120_000), `max_retries:` (default 3), `max_backoff_ms:` (default 10_000), and `connect_timeout:` (default 5_000). Manager cleanup delay can be tuned via `config :gemini_ex, :streaming, cleanup_delay_ms: ...`.
+
+### Rate Limiting & Concurrency (built-in)
 
 - Enabled by default: requests block when over budget; non-blocking mode returns `{:error, {:rate_limited, retry_at, details}}` with `retry_at` set to the window end.
 - Oversized requests (estimate exceeds budget) return `reason: :over_budget, request_too_large: true` immediately—no retry loop.
 - Cached context tokens are counted toward budgets. When you precompute cache size, you can pass `estimated_cached_tokens:` alongside `estimated_input_tokens:` to budget correctly before the API reports usage.
 - Optional `max_budget_wait_ms` caps how long blocking calls sleep for a full window; if the cap is hit and the window is still full, you get a `rate_limited` error with `retry_at` set to the actual window end.
+- Concurrency gate: `max_concurrency_per_model` plus `permit_timeout_ms` (default `:infinity`, per-call override). `non_blocking: true` is the fail-fast path (returns `{:error, :no_permit_available}` immediately).
+- Partition the gate with `concurrency_key:` (e.g., tenant/location) to avoid cross-tenant starvation; default key is the model name.
+- Permit leak protection: holders are monitored; if a holder dies without releasing, its permits are reclaimed automatically.
+
+### Timeouts (HTTP & Streaming)
+
+- Global HTTP/stream timeout default is 120_000ms via `config :gemini_ex, :timeout`.
+- Per-call override: `timeout:` on any request/stream.
+- Streaming extras: `max_retries`, `max_backoff_ms` (default 10_000), `connect_timeout` (default 5_000).
 
 ### Advanced Generation Configuration
 
@@ -265,6 +276,8 @@ alias Gemini.Types.Content
 )
 ```
 
+**TTL defaults:** The default cache TTL is configurable via `config :gemini_ex, :context_cache, default_ttl_seconds: ...` (defaults to 3_600). You can also override per call with `default_ttl_seconds:` or pass `:ttl`/`:expire_time` explicitly.
+
 **Models that support explicit caching:**
 - `gemini-2.5-flash`
 - `gemini-2.5-pro`
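The per-call overrides documented above can be combined on a single request. A sketch, using option names from this commit's docs; the prompt and values are illustrative:

```elixir
# Sketch: combining the new per-call overrides on one request.
# Option names come from this commit's docs; values are illustrative.
{:ok, response} =
  Gemini.generate("Summarize this document",
    timeout: 300_000,              # per-request HTTP timeout override
    permit_timeout_ms: 60_000,     # cap the wait for a concurrency permit
    concurrency_key: "tenant_42",  # partition the gate per tenant
    max_concurrency_per_model: 8   # widen the gate for this call
  )
```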

STREAMING.md

Lines changed: 3 additions & 3 deletions
@@ -308,8 +308,8 @@ defp listen_for_events do
       {:stream_complete, _stream_id} ->
         IO.puts("\n\n✅ Stream completed!")
     after
-      30_000 ->
-        IO.puts("\n⏰ Stream timeout after 30 seconds")
+      timeout ->
+        IO.puts("\n⏰ Stream timeout after #{timeout / 1000} seconds (configurable)")
     end
   end
 ```
@@ -360,4 +360,4 @@ end
 3. **Low Latency**: Gap between API generation and CLI display should be <50ms
 4. **No Buffering**: Text should stream continuously, not in large blocks
 
-The current issue where "all text dumps out at the end" should be completely resolved with this streaming implementation.
+The current issue where "all text dumps out at the end" should be completely resolved with this streaming implementation.
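With the hard-coded 30s replaced by a `timeout` variable, the listener can be parameterized end to end. A sketch of how the surrounding function might thread the value through; the function shape and the `extract_text/1` helper are assumed for illustration, and only the `after` clause comes from the diff:

```elixir
# Sketch: a listener parameterized by a configurable timeout.
# `extract_text/1` is a hypothetical helper; the after clause mirrors the diff.
defp listen_for_events(timeout \\ Application.get_env(:gemini_ex, :timeout, 120_000)) do
  receive do
    {:stream_event, _stream_id, event} ->
      IO.write(extract_text(event))
      listen_for_events(timeout)

    {:stream_complete, _stream_id} ->
      IO.puts("\n\n✅ Stream completed!")
  after
    timeout ->
      IO.puts("\n⏰ Stream timeout after #{timeout / 1000} seconds (configurable)")
  end
end
```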

config/config.exs

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@ config :gemini_ex,
   # Uncomment to override: default_model: "your-model-name",
 
   # HTTP timeout in milliseconds
-  timeout: 30_000,
+  timeout: 120_000,
 
   # Enable telemetry events
   telemetry_enabled: true

docs/guides/rate_limiting.md

Lines changed: 14 additions & 0 deletions
@@ -41,6 +41,7 @@ Configure globally via application environment:
 ```elixir
 config :gemini_ex, :rate_limiter,
   max_concurrency_per_model: 4,  # nil or 0 disables concurrency gating
+  permit_timeout_ms: :infinity,  # default: no cap on queue wait; set a number to cap
   max_attempts: 3,               # Retry attempts for transient errors
   base_backoff_ms: 1000,         # Base backoff duration
   jitter_factor: 0.25,           # Jitter range (±25%)
@@ -69,6 +70,16 @@
 
 # Override concurrency limit
 {:ok, response} = Gemini.generate("Hello", max_concurrency_per_model: 8)
+
+# Override permit wait timeout (defaults to :infinity)
+{:ok, response} = Gemini.generate("Hello", permit_timeout_ms: 600_000)
+
+# Partition the concurrency gate (e.g., by tenant/location)
+{:ok, response} = Gemini.generate("Hello", concurrency_key: "tenant_a")
+
+# Fail fast instead of waiting
+{:error, {:rate_limited, nil, %{reason: :no_permit_available}}} =
+  Gemini.generate("Hello", non_blocking: true)
 ```
 
 ## Quick Start
@@ -186,6 +197,9 @@ For most applications, start with a profile and adjust:
 - Seeing 429s? Lower both concurrency and budget
 - Underutilizing quota? Raise budget, enable adaptive concurrency
 
+### Concurrency semantics
+
+The concurrency gate is per model by default (all callers to the same model share a queue). Use `concurrency_key:` to partition by tenant/location. `permit_timeout_ms` defaults to `:infinity`; a waiter only errors if you explicitly set a finite cap and it expires. Use `non_blocking: true` to fail fast instead of queueing.
 ## Structured Errors
 
 Rate limit errors include retry information:
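One practical consequence of the fail-fast path: a caller can match the structured error and degrade gracefully instead of queueing indefinitely. A sketch, where the error shape comes from the guide above and the fallback policy is illustrative:

```elixir
# Sketch: try fail-fast first, then queue with a bounded wait.
case Gemini.generate("Hello", non_blocking: true) do
  {:ok, response} ->
    {:ok, response}

  {:error, {:rate_limited, _retry_at, %{reason: :no_permit_available}}} ->
    # Gate is saturated: queue for a permit, but give up after 60s
    Gemini.generate("Hello", permit_timeout_ms: 60_000)

  {:error, _other} = error ->
    error
end
```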

lib/gemini/apis/context_cache.ex

Lines changed: 7 additions & 2 deletions
@@ -312,8 +312,8 @@ defmodule Gemini.APIs.ContextCache do
         %{ttl: "#{opts[:ttl]}s"}
 
       true ->
-        # Default 1 hour TTL
-        %{ttl: "3600s"}
+        default_ttl = Keyword.get(opts, :default_ttl_seconds, default_ttl_seconds())
+        %{ttl: "#{default_ttl}s"}
     end
   end
 
@@ -333,6 +333,11 @@
       end
   end
 
+  defp default_ttl_seconds do
+    context_cache_config = Application.get_env(:gemini_ex, :context_cache, [])
+    Keyword.get(context_cache_config, :default_ttl_seconds, 3_600)
+  end
+
   defp maybe_add_tools(map, opts) do
     case Keyword.get(opts, :tools) do
       tools when is_list(tools) and length(tools) > 0 ->
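The TTL resolution order implied by the diff is: explicit `:ttl`/`:expire_time` first, then a per-call `:default_ttl_seconds`, then the application environment, then the built-in 3_600. A sketch exercising each level; the `create/2` call shape is assumed for illustration:

```elixir
# Sketch: TTL precedence after this change (call shapes are illustrative).

# App-wide default — belongs in config/config.exs or runtime.exs:
#   config :gemini_ex, :context_cache, default_ttl_seconds: 7_200

# Per-call default override (used only when no explicit TTL is given)
{:ok, cache} = Gemini.APIs.ContextCache.create(contents, default_ttl_seconds: 1_800)

# Explicit TTL always wins over both defaults
{:ok, cache} = Gemini.APIs.ContextCache.create(contents, ttl: 600)
```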

lib/gemini/apis/tokens.ex

Lines changed: 1 addition & 1 deletion
@@ -174,7 +174,7 @@ defmodule Gemini.APIs.Tokens do
   def count_batch(inputs, opts \\ []) when is_list(inputs) do
     # Use Task.async_stream for parallel processing
     max_concurrency = Keyword.get(opts, :max_concurrency, 5)
-    timeout = Keyword.get(opts, :timeout, 30_000)
+    timeout = Keyword.get(opts, :timeout, Config.timeout())
 
     try do
       results =
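`count_batch/2` now inherits the global `Config.timeout()` (120_000 ms by default) rather than a hard-coded 30s, while an explicit `timeout:` still wins. A usage sketch; the input and return shapes are assumed for illustration:

```elixir
# Sketch: per-task timeout resolution in count_batch/2 after this change.
inputs = ["first prompt", "second prompt", "third prompt"]

# Falls back to Config.timeout() — 120_000 ms by default
{:ok, counts} = Gemini.APIs.Tokens.count_batch(inputs)

# An explicit per-call timeout still takes precedence
{:ok, counts} = Gemini.APIs.Tokens.count_batch(inputs, timeout: 30_000, max_concurrency: 10)
```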

lib/gemini/client/http.ex

Lines changed: 55 additions & 22 deletions
@@ -106,11 +106,13 @@ defmodule Gemini.Client.HTTP do
 
     Telemetry.execute([:gemini, :request, :start], measurements, metadata)
 
+    timeout = Keyword.get(opts, :timeout, Config.timeout())
+
     req_opts = [
       method: method,
       url: url,
       headers: headers,
-      receive_timeout: Config.timeout(),
+      receive_timeout: timeout,
       json: body
     ]
 
@@ -184,10 +186,12 @@
 
     Telemetry.execute([:gemini, :stream, :start], measurements, metadata)
 
+    timeout = Keyword.get(opts, :timeout, Config.timeout())
+
     req_opts = [
       url: sse_url,
       headers: headers,
-      receive_timeout: Config.timeout(),
+      receive_timeout: timeout,
       json: body,
       into: :self
     ]
@@ -196,18 +200,35 @@
     result =
       case Req.post(req_opts) do
        {:ok, %Req.Response{status: status, body: body}} when status in 200..299 ->
-          events = parse_sse_stream(body)
+          result =
+            case parse_sse_stream(body) do
+              {:ok, events} ->
+                duration = Telemetry.calculate_duration(start_time)
+
+                stop_measurements = %{
+                  total_duration: duration,
+                  total_chunks: length(events)
+                }
 
-          duration = Telemetry.calculate_duration(start_time)
+                Telemetry.execute(
+                  [:gemini, :stream, :stop],
+                  stop_measurements,
+                  metadata
+                )
 
-          stop_measurements = %{
-            total_duration: duration,
-            total_chunks: length(events)
-          }
+                {:ok, events}
 
-          Telemetry.execute([:gemini, :stream, :stop], stop_measurements, metadata)
+              {:error, parse_error} ->
+                Telemetry.execute(
+                  [:gemini, :stream, :exception],
+                  measurements,
+                  Map.put(metadata, :reason, parse_error)
+                )
 
-          {:ok, events}
+                {:error, parse_error}
+            end
+
+          result
 
        {:ok, %Req.Response{status: status}} ->
          error = {:http_error, status, "Stream request failed"}
@@ -251,19 +272,23 @@ defmodule Gemini.Client.HTTP do
   @doc """
   Raw streaming POST with full URL (used by streaming manager).
   """
-  def stream_post_raw(url, body, headers, _opts \\ []) do
+  def stream_post_raw(url, body, headers, opts \\ []) do
+    timeout = Keyword.get(opts, :timeout, Config.timeout())
+
     req_opts = [
       url: url,
       headers: headers,
-      receive_timeout: Config.timeout(),
+      receive_timeout: timeout,
       json: body,
       into: :self
     ]
 
     case Req.post(req_opts) do
       {:ok, %Req.Response{status: status, body: body}} when status in 200..299 ->
-        events = parse_sse_stream(body)
-        {:ok, events}
+        case parse_sse_stream(body) do
+          {:ok, events} -> {:ok, events}
+          {:error, parse_error} -> {:error, parse_error}
+        end
 
       {:ok, %Req.Response{status: status}} ->
         {:error, Error.http_error(status, "Stream request failed")}
@@ -369,16 +394,24 @@
 
   # Parse Server-Sent Events format
   defp parse_sse_stream(data) when is_binary(data) do
-    data
-    |> String.split("\n\n")
-    |> Enum.filter(&(String.trim(&1) != ""))
-    |> Enum.map(&parse_sse_event/1)
-    |> Enum.filter(&(&1 != nil))
-  rescue
-    _ -> []
+    try do
+      events =
+        data
+        |> String.split("\n\n")
+        |> Enum.filter(&(String.trim(&1) != ""))
+        |> Enum.map(&parse_sse_event/1)
+        |> Enum.filter(&(&1 != nil))
+
+      {:ok, events}
+    rescue
+      exception ->
+        {:error,
+         Error.invalid_response("Failed to parse SSE stream: #{Exception.message(exception)}")}
+    end
   end
 
-  defp parse_sse_stream(_), do: []
+  defp parse_sse_stream(_),
+    do: {:error, Error.invalid_response("Invalid SSE stream payload")}
 
   defp parse_sse_event(event_data) do
     lines = String.split(event_data, "\n")
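Because `parse_sse_stream/1` now returns `{:ok, events}` or `{:error, error}` instead of a bare list, callers of the raw streaming entry point must match on the tuple. A minimal caller-side sketch; the function names come from the diff, while the handling and error shape are illustrative:

```elixir
# Sketch: consuming the tuple-shaped result from stream_post_raw/4.
case Gemini.Client.HTTP.stream_post_raw(url, body, headers, timeout: 120_000) do
  {:ok, events} ->
    # Each event is an already-parsed SSE chunk
    Enum.each(events, &IO.inspect/1)

  {:error, error} ->
    # SSE parse failures and HTTP errors now surface here instead of
    # silently yielding an empty event list
    IO.puts(:stderr, "Streaming failed: #{inspect(error)}")
end
```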
