Skip to content

Commit f65bc1f

Browse files
authored
Merge pull request #5 from coherentpath/expose-timeout-for-buffer-functions
Expose timeout for buffer functions
2 parents 2f536d0 + 9dbb63d commit f65bc1f

File tree

2 files changed

+43
-23
lines changed

2 files changed

+43
-23
lines changed

lib/buffer.ex

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,18 @@ defmodule Buffer do
9999
## Options
100100
101101
* `:partition` - The specific partition to dump. Defaults to `:all`.
102+
103+
* `:timeout` - The timeout for the GenServer call in milliseconds. Defaults to `5000`.
102104
"""
103105
@spec dump(GenServer.server(), keyword()) :: {:ok, list()} | {:error, atom()}
104106
def dump(buffer, opts \\ []) do
107+
timeout = Keyword.get(opts, :timeout, 5000)
108+
105109
with {:ok, {_, parts}} <- fetch_buffer(buffer),
106110
{:ok, part} <- validate_partition(opts, parts) do
107111
case part do
108-
:all -> {:ok, Enum.reduce(1..parts, [], &(&2 ++ do_dump_part(buffer, &1 - 1)))}
109-
part -> {:ok, do_dump_part(buffer, part)}
112+
:all -> {:ok, Enum.reduce(1..parts, [], &(&2 ++ do_dump_part(buffer, &1 - 1, timeout)))}
113+
part -> {:ok, do_dump_part(buffer, part, timeout)}
110114
end
111115
end
112116
end
@@ -120,6 +124,8 @@ defmodule Buffer do
120124
* `:async` - Whether or not the flush will be async. Defaults to `true`.
121125
122126
* `:partition` - The specific partition to flush. Defaults to `:all`.
127+
128+
* `:timeout` - The timeout for the GenServer call in milliseconds. Defaults to `5000`.
123129
"""
124130
@spec flush(GenServer.server(), keyword()) :: :ok | {:error, atom()}
125131
def flush(buffer, opts \\ []) do
@@ -138,25 +144,33 @@ defmodule Buffer do
138144
## Options
139145
140146
* `:partition` - The specific partition to return info for. Defaults to `:all`.
147+
148+
* `:timeout` - The timeout for the GenServer call in milliseconds. Defaults to `5000`.
141149
"""
142150
@spec info(GenServer.server(), keyword()) :: {:ok, list()} | {:error, atom()}
143151
def info(buffer, opts \\ []) do
152+
timeout = Keyword.get(opts, :timeout, 5000)
153+
144154
with {:ok, {_, parts}} <- fetch_buffer(buffer),
145155
{:ok, part} <- validate_partition(opts, parts) do
146156
case part do
147-
:all -> {:ok, Enum.map(1..parts, &do_info_part(buffer, &1 - 1))}
148-
part -> {:ok, [do_info_part(buffer, part)]}
157+
:all -> {:ok, Enum.map(1..parts, &do_info_part(buffer, &1 - 1, timeout))}
158+
part -> {:ok, [do_info_part(buffer, part, timeout)]}
149159
end
150160
end
151161
end
152162

153163
@doc """
154164
Inserts the given item into the given `Buffer`.
165+
166+
## Options
167+
168+
* `timeout` - The timeout for the GenServer call in milliseconds. Defaults to `5000`.
155169
"""
156-
@spec insert(GenServer.server(), term()) :: :ok | {:error, atom()}
157-
def insert(buffer, item) do
170+
@spec insert(GenServer.server(), term(), timeout()) :: :ok | {:error, atom()}
171+
def insert(buffer, item, timeout \\ 5000) do
158172
with {:ok, {partitioner, _}} <- fetch_buffer(buffer) do
159-
do_insert(buffer, partitioner, item)
173+
do_insert(buffer, partitioner, item, timeout)
160174
end
161175
end
162176

@@ -169,6 +183,8 @@ defmodule Buffer do
169183
Defaults to `true`. If set to `false`, all items in the batch will be inserted
170184
regardless of flush conditions being met. Afterwards, if a limit has been exceeded,
171185
the buffer will be flushed async.
186+
187+
* `:timeout` - The timeout for the GenServer call in milliseconds. Defaults to `5000`.
172188
"""
173189
@spec insert_batch(GenServer.server(), Enumerable.t(), keyword()) ::
174190
{:ok, non_neg_integer()} | {:error, atom()}
@@ -254,10 +270,10 @@ defmodule Buffer do
254270

255271
defp build_key(buffer), do: {__MODULE__, buffer}
256272

257-
defp do_dump_part(buffer, partition) do
273+
defp do_dump_part(buffer, partition, timeout) do
258274
buffer
259275
|> buffer_partition_name(partition)
260-
|> Server.dump()
276+
|> Server.dump(timeout)
261277
end
262278

263279
defp do_flush_part(buffer, partition, opts) do
@@ -266,16 +282,16 @@ defmodule Buffer do
266282
|> Server.flush(opts)
267283
end
268284

269-
defp do_info_part(buffer, partition) do
285+
defp do_info_part(buffer, partition, timeout) do
270286
buffer
271287
|> buffer_partition_name(partition)
272-
|> Server.info()
288+
|> Server.info(timeout)
273289
end
274290

275-
defp do_insert(buffer, partitioner, item) do
291+
defp do_insert(buffer, partitioner, item, timeout) do
276292
buffer
277293
|> buffer_partition_name(partitioner.())
278-
|> Server.insert(item)
294+
|> Server.insert(item, timeout)
279295
end
280296

281297
defp do_insert_batch(buffer, partitioner, items, opts) do

lib/buffer/server.ex

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,34 +28,38 @@ defmodule Buffer.Server do
2828
end
2929

3030
@doc false
31-
@spec dump(GenServer.server()) :: list()
32-
def dump(buffer), do: GenServer.call(buffer, :dump)
31+
@spec dump(GenServer.server(), timeout()) :: list()
32+
def dump(buffer, timeout \\ 5000), do: GenServer.call(buffer, :dump, timeout)
3333

3434
@doc false
3535
@spec flush(GenServer.server(), keyword()) :: :ok
3636
def flush(buffer, opts \\ []) do
37+
timeout = Keyword.get(opts, :timeout, 5000)
38+
3739
if Keyword.get(opts, :async, true) do
38-
GenServer.call(buffer, :async_flush)
40+
GenServer.call(buffer, :async_flush, timeout)
3941
else
40-
GenServer.call(buffer, :sync_flush)
42+
GenServer.call(buffer, :sync_flush, timeout)
4143
end
4244
end
4345

4446
@doc false
45-
@spec info(GenServer.server()) :: map()
46-
def info(buffer), do: GenServer.call(buffer, :info)
47+
@spec info(GenServer.server(), timeout()) :: map()
48+
def info(buffer, timeout \\ 5000), do: GenServer.call(buffer, :info, timeout)
4749

4850
@doc false
49-
@spec insert(GenServer.server(), term()) :: :ok
50-
def insert(buffer, item), do: GenServer.call(buffer, {:insert, item})
51+
@spec insert(GenServer.server(), term(), timeout()) :: :ok
52+
def insert(buffer, item, timeout \\ 5000), do: GenServer.call(buffer, {:insert, item}, timeout)
5153

5254
@doc false
5355
@spec insert_batch(GenServer.server(), Enumerable.t(), keyword()) :: :ok
5456
def insert_batch(buffer, items, opts \\ []) do
57+
timeout = Keyword.get(opts, :timeout, 5000)
58+
5559
if Keyword.get(opts, :safe_flush, true) do
56-
GenServer.call(buffer, {:safe_insert_batch, items})
60+
GenServer.call(buffer, {:safe_insert_batch, items}, timeout)
5761
else
58-
GenServer.call(buffer, {:unsafe_insert_batch, items})
62+
GenServer.call(buffer, {:unsafe_insert_batch, items}, timeout)
5963
end
6064
end
6165

0 commit comments

Comments
 (0)