Skip to content

Commit 0b3de0c

Browse files
committed
Add manipulator based coalescher to coalesce multiple simultaneous that could be cached
1 parent 1f5d779 commit 0b3de0c

File tree

8 files changed

+230
-33
lines changed

8 files changed

+230
-33
lines changed

lib/cache/cache.ex

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,22 @@ defmodule Cache.Registry do
1717
end
1818
end
1919

20+
def get_or_create_coalescer(key) do
21+
GenServer.call(__MODULE__, {:get_or_create_coalescer, key})
22+
end
23+
24+
def get_coalesce_pid(key) do
25+
case GenServer.call(__MODULE__, {:get_coalesce, key}) do
26+
{:ok, response} -> response
27+
{:not_found} -> nil
28+
end
29+
end
30+
31+
def remove_coalesce_key(key) do
32+
GenServer.call(__MODULE__, {:remove_coalesce, key})
33+
end
34+
2035
def store({_method, _full_path, _get_params, _allowed_groups} = key, response) do
21-
# IO.puts "Going to store new content"
22-
# IO.inspect( key, label: "Key to store under" )
23-
# IO.inspect( response, label: "Response to save" )
2436
GenServer.call(__MODULE__, {:store, key, response})
2537
end
2638

@@ -32,11 +44,42 @@ defmodule Cache.Registry do
3244
# GenServer API
3345
###
3446
def start_link(_) do
35-
GenServer.start_link(__MODULE__, [%{cache: %{}, caches_by_key: %{}}], name: __MODULE__)
47+
GenServer.start_link(__MODULE__, [%{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}],
48+
name: __MODULE__
49+
)
3650
end
3751

3852
def init(_) do
39-
{:ok, %{cache: %{}, caches_by_key: %{}}}
53+
{:ok, %{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}}
54+
end
55+
56+
def handle_call({:get_or_create_coalescer, key}, _from, state) do
57+
case Map.get(state.coalesce_handlers, key, nil) do
58+
nil ->
59+
{:ok, pid} = Coalesce.Registry.start(%{})
60+
new_state = put_in(state[:coalesce_handlers][key], pid)
61+
{:reply, {:created, pid}, new_state}
62+
63+
pid ->
64+
{:reply, {:attach, pid}, state}
65+
end
66+
end
67+
68+
def handle_call({:get_coalesce, key}, _from, state) do
69+
if Map.has_key?(state.coalesce_handlers, key) do
70+
{:reply, {:ok, Map.get(state.coalesce_handlers, key)}, state}
71+
else
72+
{:reply, {:not_found}, state}
73+
end
74+
end
75+
76+
def handle_call({:remove_coalesce, key}, _from, state) do
77+
if Map.has_key?(state.coalesce_handlers, key) do
78+
{_, new_state} = pop_in(state[:coalesce_handlers][key])
79+
{:reply, {:ok}, new_state}
80+
else
81+
{:reply, {:not_found}, state}
82+
end
4083
end
4184

4285
def handle_call({:find_cache, key}, _from, state) do
@@ -48,21 +91,13 @@ defmodule Cache.Registry do
4891
end
4992

5093
def handle_call({:store, request_key, response}, _from, state) do
51-
# IO.inspect( request_key, label: "Request key" )
52-
# IO.inspect( response, label: "Response" )
53-
5494
%{cache_keys: cache_keys, clear_keys: clear_keys} = response
5595

56-
# IO.inspect { :cache_keys, cache_keys }
57-
# IO.inspect { :clear_keys, clear_keys }
58-
5996
state =
6097
state
6198
# update state for clear_keys
6299
|> clear_keys!(clear_keys)
63100

64-
# IO.puts "Executed clear keys"
65-
66101
if cache_keys == [] do
67102
{:reply, :ok, state}
68103
else
@@ -96,6 +131,7 @@ defmodule Cache.Registry do
96131
cache =
97132
Enum.reduce(clear_keys, cache, fn clear_key, cache ->
98133
keys_to_remove = Map.get(caches_by_key, clear_key, [])
134+
99135
cache = Map.drop(cache, keys_to_remove)
100136
cache
101137
end)

lib/cache/coalesce.ex

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
defmodule Coalesce.Registry do
2+
@moduledoc """
3+
Maintains coalescing requests for specific request, dependant on key.
4+
"""
5+
6+
use GenServer
7+
8+
def add_connection(pid, connection) do
9+
{:ok, conn} = GenServer.call(pid, {:add_conn, connection})
10+
conn
11+
end
12+
13+
def assure_status_sent(state, status) do
14+
if not is_nil(status) and is_nil(state.status) and not is_nil(state.headers) do
15+
# First time there was a status
16+
conns =
17+
state.connections
18+
|> Enum.map(fn {conn, from} ->
19+
conn =
20+
Plug.Conn.merge_resp_headers(conn, state.headers)
21+
|> Plug.Conn.send_chunked(status)
22+
23+
{conn, from}
24+
end)
25+
|> Enum.flat_map(fn {conn, from} ->
26+
case push_body_parts(conn, state.body) do
27+
nil -> []
28+
conn -> [{conn, from}]
29+
end
30+
end)
31+
32+
%{state | connections: conns, status: status}
33+
else
34+
state
35+
end
36+
end
37+
38+
def push_body_parts(conn, body_parts) do
39+
Enum.reduce_while(body_parts, conn, fn ch, conn ->
40+
case Plug.Conn.chunk(conn, ch) do
41+
{:ok, conn} -> {:cont, conn}
42+
{:error, :closed} -> {:halt, nil}
43+
end
44+
end)
45+
end
46+
47+
###
48+
# GenServer API
49+
###
50+
def start(_) do
51+
GenServer.start(__MODULE__, [%{}])
52+
end
53+
54+
@impl true
55+
def init(_) do
56+
{:ok, %{connections: [], headers: nil, body: [], status: nil}}
57+
end
58+
59+
@impl true
60+
def handle_call({:add_conn, conn}, from, state) do
61+
conn = if not is_nil(state.status) and not is_nil(state.headers) do
62+
Plug.Conn.merge_resp_headers(conn, state.headers)
63+
|> Plug.Conn.send_chunked(state.status)
64+
|> push_body_parts(state.body)
65+
else
66+
conn
67+
end
68+
69+
conns = [{conn, from} | state.connections]
70+
71+
new_state = %{state | connections: conns}
72+
73+
{:noreply, new_state}
74+
end
75+
76+
@impl true
77+
def handle_cast({:headers, headers, status}, state) do
78+
state_with_headers = %{state | headers: headers}
79+
new_state = assure_status_sent(state_with_headers, status)
80+
81+
{:noreply, new_state}
82+
end
83+
84+
@impl true
85+
def handle_cast({:chunk, data, status}, state) do
86+
new_state = assure_status_sent(state, status)
87+
88+
conns =
89+
Enum.flat_map(new_state.connections, fn {conn, from} ->
90+
case Plug.Conn.chunk(conn, data) do
91+
{:ok, conn} -> [{conn, from}]
92+
{:error, :closed} -> []
93+
end
94+
end)
95+
96+
new_state = %{new_state | connections: conns}
97+
98+
{:noreply, new_state}
99+
end
100+
101+
@impl true
102+
def handle_cast({:finished, status}, state) do
103+
new_state = assure_status_sent(state, status)
104+
105+
Enum.each(new_state.connections, fn {conn, from} -> GenServer.reply(from, {:ok, conn}) end)
106+
107+
{:stop, :normal, new_state}
108+
end
109+
end

lib/manipulators/cache_key_logger.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ defmodule Manipulators.CacheKeyLogger do
3838
end
3939

4040
defp header_value(headers, header_name) do
41-
header = Enum.find(headers, header_name)
41+
header =
42+
headers
43+
|> Enum.find(&match?({^header_name, _}, &1))
4244

4345
if header do
4446
elem(header, 1)
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
defmodule Manipulators.CoalesceResponse do
2+
@moduledoc """
3+
Manipulates the response, notifying the Coalesce.Registry for this request.
4+
"""
5+
6+
alias Cache.Registry, as: Cache
7+
8+
@behaviour ProxyManipulator
9+
10+
@impl true
11+
def headers(headers, {conn_in, conn_out}) do
12+
all_response_headers = Mint.HTTP.get_private(conn_out, :mu_cache_original_headers)
13+
14+
allowed_groups =
15+
all_response_headers
16+
|> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1))
17+
|> elem(1)
18+
|> Poison.decode!()
19+
20+
key = {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}
21+
22+
pid = Cache.get_coalesce_pid(key)
23+
24+
conn_out =
25+
conn_out
26+
|> Mint.HTTP.put_private(:coalesce_pid, pid)
27+
|> Mint.HTTP.put_private(:coalesce_key, key)
28+
29+
GenServer.cast(pid, {:headers, headers, conn_in.status})
30+
31+
{headers, {conn_in, conn_out}}
32+
end
33+
34+
@impl true
35+
def chunk(chunk, {conn_in, conn_out}) do
36+
pid = Mint.HTTP.get_private(conn_out, :coalesce_pid)
37+
GenServer.cast(pid, {:chunk, chunk, conn_in.status})
38+
39+
:skip
40+
end
41+
42+
@impl true
43+
def finish(_, {conn_in, conn_out}) do
44+
pid = Mint.HTTP.get_private(conn_out, :coalesce_pid)
45+
key = Mint.HTTP.get_private(conn_out, :coalesce_key)
46+
Cache.remove_coalesce_key(key)
47+
GenServer.cast(pid, {:finished, conn_in.status})
48+
:skip
49+
end
50+
end

lib/manipulators/remove_cache_related_keys.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ defmodule Manipulators.RemoveCacheRelatedKeys do
99
@behaviour ProxyManipulator
1010

1111
@impl true
12-
def headers(headers, connection) do
12+
def headers(headers_inp, connection) do
1313
new_headers =
14-
headers
14+
headers_inp
1515
|> Enum.reject(fn
1616
{"cache-keys", _} -> true
1717
{"clear-keys", _} -> true

lib/manipulators/store_response.ex

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ defmodule Manipulators.StoreResponse do
6666
all_response_headers
6767
|> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1))
6868
|> elem(1)
69+
|> Poison.decode!()
6970

7071
cache_keys =
7172
all_response_headers
@@ -79,8 +80,6 @@ defmodule Manipulators.StoreResponse do
7980
|> elem(1)
8081
|> Poison.decode!()
8182

82-
# IO.inspect( {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}, label: "Signature to store" )
83-
8483
Cache.store(
8584
{conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups},
8685
%{
@@ -109,8 +108,6 @@ defmodule Manipulators.StoreResponse do
109108
|> elem(1)
110109
|> Poison.decode!()
111110

112-
IO.inspect(clear_keys, label: "Clear keys")
113-
114111
Cache.clear_keys(clear_keys)
115112

116113
true ->

lib/mu_cache_plug.ex

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ defmodule MuCachePlug do
1414
@response_manipulators [
1515
Manipulators.CacheKeyLogger,
1616
Manipulators.StoreResponse,
17-
Manipulators.RemoveCacheRelatedKeys
17+
Manipulators.RemoveCacheRelatedKeys,
18+
19+
# Make sure this is the last one, coalescing responses as generated at this point
20+
Manipulators.CoalesceResponse
1821
]
1922
@manipulators ProxyManipulatorSettings.make_settings(
2023
@request_manipulators,
@@ -49,16 +52,16 @@ defmodule MuCachePlug do
4952
ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
5053

5154
cached_value =
52-
Cache.find_cache({conn.method, full_path, conn.query_string, known_allowed_groups}) ->
55+
Cache.find_cache({conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}) ->
5356
# with allowed groups and a cache, we should use the cache
5457
respond_with_cache(conn, cached_value)
5558

5659
true ->
57-
# without a cache, we should consult the backend
58-
# IO.inspect(
59-
# {conn.method, full_path, conn.query_string, known_allowed_groups}, label: "Cache miss for signature")
60-
61-
ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
60+
key = {conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}
61+
case Cache.get_or_create_coalescer(key) do
62+
{:created, _pid} -> ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
63+
{:attach, pid} -> Coalesce.Registry.add_connection(pid, conn)
64+
end
6265
end
6366
end
6467

mix.lock

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
%{
22
"bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
3-
"castore": {:hex, :castore, "0.1.9", "eb08a94c12ebff92a92d844c6ccd90728dc7662aab9bdc8b3b785ba653c499d5", [:mix], [], "hexpm", "99c3a38ad9c0bab03fee1418c98390da1a31f3b85e317db5840d51a1443d26c8"},
4-
"cowboy": {:hex, :cowboy, "2.8.0", "f3dc62e35797ecd9ac1b50db74611193c29815401e53bac9a5c0577bd7bc667d", [:rebar3], [{:cowlib, "~> 2.9.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "4643e4fba74ac96d4d152c75803de6fad0b3fa5df354c71afdd6cbeeb15fac8a"},
3+
"castore": {:hex, :castore, "0.1.11", "c0665858e0e1c3e8c27178e73dffea699a5b28eb72239a3b2642d208e8594914", [:mix], [], "hexpm", "91b009ba61973b532b84f7c09ce441cba7aa15cb8b006cf06c6f4bba18220081"},
4+
"cowboy": {:hex, :cowboy, "2.9.0", "865dd8b6607e14cf03282e10e934023a1bd8be6f6bacf921a7e2a96d800cd452", [:make, :rebar3], [{:cowlib, "2.11.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "2c729f934b4e1aa149aff882f57c6372c15399a20d54f65c8d67bef583021bde"},
55
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"},
6-
"cowlib": {:hex, :cowlib, "2.9.1", "61a6c7c50cf07fdd24b2f45b89500bb93b6686579b069a89f88cb211e1125c78", [:rebar3], [], "hexpm", "e4175dc240a70d996156160891e1c62238ede1729e45740bdd38064dad476170"},
7-
"credo": {:hex, :credo, "1.5.5", "e8f422026f553bc3bebb81c8e8bf1932f498ca03339856c7fec63d3faac8424b", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "dd8623ab7091956a855dc9f3062486add9c52d310dfd62748779c4315d8247de"},
6+
"cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"},
7+
"credo": {:hex, :credo, "1.5.6", "e04cc0fdc236fefbb578e0c04bd01a471081616e741d386909e527ac146016c6", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "4b52a3e558bd64e30de62a648518a5ea2b6e3e5d2b164ef5296244753fc7eb17"},
88
"dialyxir": {:hex, :dialyxir, "1.1.0", "c5aab0d6e71e5522e77beff7ba9e08f8e02bad90dfbeffae60eaf0cb47e29488", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "07ea8e49c45f15264ebe6d5b93799d4dd56a44036cf42d0ad9c960bc266c0b9a"},
99
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
1010
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
@@ -16,6 +16,6 @@
1616
"plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"},
1717
"plug_mint_proxy": {:git, "https://github.com/madnificent/plug-mint-proxy.git", "cb52954d260117a0b0e65baa8d3f313561bc2cf7", [branch: "feature/separate-example-runner"]},
1818
"poison": {:hex, :poison, "2.2.0", "4763b69a8a77bd77d26f477d196428b741261a761257ff1cf92753a0d4d24a63", [:mix], [], "hexpm", "519bc209e4433961284174c497c8524c001e285b79bdf80212b47a1f898084cc"},
19-
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
20-
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
19+
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
20+
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
2121
}

0 commit comments

Comments
 (0)