Skip to content

Commit 2828081

Browse files
authored
Merge pull request #6 from coherentpath/add-support-for-item-ordering
Add support for item ordering
2 parents f65bc1f + f28fed6 commit 2828081

File tree

4 files changed

+39
-1
lines changed

4 files changed

+39
-1
lines changed

lib/buffer.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ defmodule Buffer do
4343
* `:max_size` - The maximum size (in bytes) of the buffer before being flushed. By default,
4444
this limit is `:infinity`.
4545
46+
* `:ordering` - The order in which buffered items are returned. The options are `:fifo`
47+
(first-in-first-out) and `:lifo` (last-in-first-out). Defaults to `:fifo`. Using `:lifo`
48+
provides a small performance improvement by skipping the list reversal.
49+
4650
* `:partitioner` - The method by which items are inserted into different partitions. The
4751
options are `:rotating` and `:random` and the former is the default.
4852

lib/buffer/server.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ defmodule Buffer.Server do
1212
:jitter_rate,
1313
:max_length,
1414
:max_size,
15+
:ordering,
1516
:partition,
1617
:size_callback
1718
]

lib/buffer/state.ex

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ defmodule Buffer.State do
66
:flush_meta,
77
:max_length,
88
:max_size,
9+
:ordering,
910
:partition,
1011
:size_callback,
1112
:timeout,
@@ -36,6 +37,7 @@ defmodule Buffer.State do
3637

3738
@doc false
3839
@spec items(t()) :: list()
40+
def items(%{ordering: :lifo} = buffer), do: buffer.buffer
3941
def items(buffer), do: Enum.reverse(buffer.buffer)
4042

4143
@doc false
@@ -46,12 +48,14 @@ defmodule Buffer.State do
4648
{:ok, size_callback} <- get_size_callback(opts),
4749
{:ok, max_length} <- get_max_length(opts, jitter),
4850
{:ok, max_size} <- get_max_size(opts, jitter),
49-
{:ok, timeout} <- get_timeout(opts, jitter) do
51+
{:ok, timeout} <- get_timeout(opts, jitter),
52+
{:ok, ordering} <- get_ordering(opts) do
5053
buffer = %__MODULE__{
5154
flush_callback: flush_callback,
5255
flush_meta: Keyword.get(opts, :flush_meta),
5356
max_length: max_length,
5457
max_size: max_size,
58+
ordering: ordering,
5559
partition: Keyword.get(opts, :partition, 0),
5660
size_callback: size_callback,
5761
timeout: timeout
@@ -103,6 +107,13 @@ defmodule Buffer.State do
103107
validate_limit(Keyword.get(opts, :buffer_timeout, :infinity), jitter)
104108
end
105109

110+
defp get_ordering(opts) do
111+
case Keyword.get(opts, :ordering, :fifo) do
112+
ordering when ordering in [:fifo, :lifo] -> {:ok, ordering}
113+
_ -> {:error, :invalid_ordering}
114+
end
115+
end
116+
106117
defp validate_callback(fun, arity) when is_function(fun, arity), do: {:ok, fun}
107118
defp validate_callback(_, _), do: {:error, :invalid_callback}
108119

test/buffer_test.exs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ defmodule BufferTest do
7070
assert start_ex_buffer(opts) == {:error, :invalid_jitter}
7171
end
7272

73+
test "will not start with an invalid ordering" do
74+
opts = [ordering: :invalid]
75+
76+
assert start_ex_buffer(opts) == {:error, :invalid_ordering}
77+
end
78+
7379
test "will flush an Buffer on termination" do
7480
assert {:ok, buffer} = start_ex_buffer()
7581
assert seed_buffer(buffer) == :ok
@@ -177,6 +183,14 @@ defmodule BufferTest do
177183
assert {:ok, buffer} = start_ex_buffer()
178184
assert Buffer.dump(buffer, partition: -1) == {:error, :invalid_partition}
179185
end
186+
187+
test "will return items in LIFO order when set" do
188+
opts = [ordering: :lifo]
189+
190+
assert {:ok, buffer} = start_ex_buffer(opts)
191+
assert seed_buffer(buffer) == :ok
192+
assert Buffer.dump(buffer) == {:ok, ["baz", "bar", "foo"]}
193+
end
180194
end
181195

182196
describe "flush/2" do
@@ -232,6 +246,14 @@ defmodule BufferTest do
232246
assert {:ok, buffer} = start_ex_buffer()
233247
assert Buffer.flush(buffer, partition: -1) == {:error, :invalid_partition}
234248
end
249+
250+
test "will flush items in LIFO order when set" do
251+
opts = [ordering: :lifo, max_length: 3]
252+
253+
assert {:ok, buffer} = start_ex_buffer(opts)
254+
assert seed_buffer(buffer) == :ok
255+
assert_receive {^buffer, ["baz", "bar", "foo"], _}
256+
end
235257
end
236258

237259
describe "info/2" do

0 commit comments

Comments
 (0)