Skip to content

Commit a81f6d3

Browse files
committed
MVP implementation
1 parent 156ee43 commit a81f6d3

File tree

5 files changed

+108
-62
lines changed

5 files changed

+108
-62
lines changed

lib/cache/cache.ex

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,25 @@ defmodule Cache.Registry do
1717
end
1818
end
1919

20-
def create_coalescer(key) do
21-
GenServer.call(__MODULE__, {:start_coalesce, key})
20+
def get_or_create_coalescer(key) do
21+
GenServer.call(__MODULE__, {:get_or_create_coalescer, key})
2222
end
2323

2424
def get_coalesce_pid(key) do
25-
GenServer.call(__MODULE__, {:get_coalesce, key})
25+
IO.inspect(key, label: "Getting coalesce")
26+
27+
case GenServer.call(__MODULE__, {:get_coalesce, key})
28+
|> IO.inspect(label: "got return value") do
29+
{:ok, response} -> response
30+
{:not_found} -> nil
31+
end
2632
end
2733

2834
def remove_coalesce_key(key) do
2935
GenServer.call(__MODULE__, {:remove_coalesce, key})
3036
end
3137

3238
def store({_method, _full_path, _get_params, _allowed_groups} = key, response) do
33-
# IO.puts "Going to store new content"
34-
# IO.inspect( key, label: "Key to store under" )
35-
# IO.inspect( response, label: "Response to save" )
3639
GenServer.call(__MODULE__, {:store, key, response})
3740
end
3841

@@ -53,18 +56,18 @@ defmodule Cache.Registry do
5356
{:ok, %{cache: %{}, caches_by_key: %{}, coalesce_handlers: %{}}}
5457
end
5558

59+
def handle_call({:get_or_create_coalescer, key}, _from, state) do
60+
case Map.get(state.coalesce_handlers, key, nil) do
61+
nil ->
62+
{:ok, pid} = Coalesce.Registry.start(%{})
63+
new_state = put_in(state[:coalesce_handlers][key], pid)
64+
{:reply, {:created, pid}, new_state}
5665

57-
def handle_call({:start_coalesce, key}, _from, state) do
58-
if Map.has_key?(state.coalesce_handlers, key) do
59-
{:reply, {:alread_started}, state}
60-
else
61-
{:ok, pid} = Coalesce.Registry.start(%{})
62-
new_state = put_in(state[:coalesce_handlers][key], pid)
63-
{:reply, {:ok, pid}, new_state}
66+
pid ->
67+
{:reply, {:attach, pid}, state}
6468
end
6569
end
6670

67-
6871
def handle_call({:get_coalesce, key}, _from, state) do
6972
if Map.has_key?(state.coalesce_handlers, key) do
7073
{:reply, {:ok, Map.get(state.coalesce_handlers, key)}, state}
@@ -73,10 +76,8 @@ defmodule Cache.Registry do
7376
end
7477
end
7578

76-
7779
def handle_call({:remove_coalesce, key}, _from, state) do
7880
if Map.has_key?(state.coalesce_handlers, key) do
79-
handlers = Map.pop(state.coalesce_handlers, key)
8081
{_, new_state} = pop_in(state[:coalesce_handlers][key])
8182
{:reply, {:ok}, new_state}
8283
else
@@ -85,23 +86,21 @@ defmodule Cache.Registry do
8586
end
8687

8788
def handle_call({:find_cache, key}, _from, state) do
88-
if Map.has_key?(state.cache, key) do
89+
if has_key?(state, key) do
8990
{:reply, {:ok, Map.get(state.cache, key)}, state}
9091
else
9192
{:reply, {:not_found}, state}
9293
end
9394
end
9495

9596
def handle_call({:store, request_key, response}, _from, state) do
96-
9797
%{cache_keys: cache_keys, clear_keys: clear_keys} = response
9898

9999
state =
100100
state
101101
# update state for clear_keys
102102
|> clear_keys!(clear_keys)
103103

104-
105104
if cache_keys == [] do
106105
{:reply, :ok, state}
107106
else
@@ -135,6 +134,7 @@ defmodule Cache.Registry do
135134
cache =
136135
Enum.reduce(clear_keys, cache, fn clear_key, cache ->
137136
keys_to_remove = Map.get(caches_by_key, clear_key, [])
137+
138138
cache = Map.drop(cache, keys_to_remove)
139139
cache
140140
end)

lib/cache/coalesce.ex

Lines changed: 73 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ defmodule Coalesce.Registry do
55

66
# TODO define coalesce status
77
# With headers, body (in chunks), status code something else?
8-
9-
8+
def add_connection(pid, connection) do
9+
{:ok, conn} = GenServer.call(pid, {:add_conn, connection})
10+
conn
11+
end
1012

1113
use GenServer
1214
###
@@ -16,51 +18,97 @@ defmodule Coalesce.Registry do
1618
GenServer.start(__MODULE__, [%{}])
1719
end
1820

21+
def assure_status_sent(state, status) do
22+
if not is_nil(status) and is_nil(state.status) and not is_nil(state.headers) do
23+
# First time there was a status
24+
conns =
25+
state.connections
26+
|> Enum.map(fn {conn, from} ->
27+
conn =
28+
Plug.Conn.merge_resp_headers(conn, state.headers)
29+
|> Plug.Conn.send_chunked(status)
30+
31+
{conn, from}
32+
end)
33+
|> Enum.flat_map(fn {conn, from} ->
34+
case push_body_parts(conn, state.body) do
35+
nil -> []
36+
conn -> [{conn, from}]
37+
end
38+
end)
39+
40+
%{state | connections: conns, status: status}
41+
else
42+
state
43+
end
44+
end
45+
46+
def push_body_parts(conn, body_parts) do
47+
Enum.reduce_while(body_parts, conn, fn ch, conn ->
48+
case Plug.Conn.chunk(conn, ch) do
49+
{:ok, conn} -> {:cont, conn}
50+
{:error, :closed} -> {:halt, nil}
51+
end
52+
end)
53+
end
54+
1955
@impl true
2056
def init(s) do
2157
IO.inspect(s, label: "init config")
22-
{:ok, %{connections: [], state: %{headers: nil, body: []}}}
58+
{:ok, %{connections: [], headers: nil, body: [], status: nil}}
2359
end
2460

2561
# TODO
2662
@impl true
27-
def handle_call({:add_conn, _connection}, _from, state) do
28-
# Put connection in state
29-
# Make connection up to date
63+
def handle_call({:add_conn, conn}, from, state) do
64+
IO.inspect("adding connection to coalescer")
65+
66+
if not is_nil(state.status) and not is_nil(state.headers) do
67+
conn =
68+
Plug.Conn.merge_resp_headers(conn, state.headers)
69+
|> Plug.Conn.send_chunked(state.status)
70+
|> push_body_parts(state.body)
71+
end
3072

31-
{:reply, :ok, state}
73+
conns = [{conn, from} | state.connections]
74+
75+
new_state = %{state | connections: conns}
76+
77+
{:noreply, new_state}
3278
end
3379

3480
@impl true
35-
def handle_cast({:headers, headers}, state) do
36-
IO.inspect(headers, label: "coalescing cast headers")
81+
def handle_cast({:headers, headers, status}, state) do
82+
state_with_headers = %{state | headers: headers}
83+
new_state = assure_status_sent(state_with_headers, status)
3784

38-
{:noreply, put_in(state.state.headers, headers)}
85+
{:noreply, new_state}
3986
end
4087

41-
4288
# TODO
4389
@impl true
44-
def handle_cast({:chunk, data}, state) do
45-
IO.inspect(data, label: "coalescing cast chunk")
90+
def handle_cast({:chunk, data, status}, state) do
91+
new_state = assure_status_sent(state, status)
4692

47-
# foreach connection
48-
# frontend_conn
49-
# |> Plug.Conn.chunk(chunk)
93+
conns =
94+
Enum.flat_map(new_state.connections, fn {conn, from} ->
95+
case Plug.Conn.chunk(conn, data) do
96+
{:ok, conn} -> [{conn, from}]
97+
{:error, :closed} -> []
98+
end
99+
end)
50100

51-
{:noreply, state}
52-
end
101+
new_state = %{new_state | connections: conns}
53102

103+
{:noreply, new_state}
104+
end
54105

55-
# TODO
56106
@impl true
57107
def handle_cast({:finished, status}, state) do
58-
IO.inspect({state, status}, label: "coalescing cast finish")
59-
# foreach connection
60-
# frontend_conn
61-
# |> Plug.Conn.send_resp(return_status, "")
108+
new_state = assure_status_sent(state, status)
109+
110+
Enum.each(state.connections, fn {conn, from} -> GenServer.reply(from, {:ok, conn}) end)
62111

63-
{:noreply, state}
64-
# {:stop, :normal, state}
112+
{:stop, :normal, state}
65113
end
66114
end

lib/manipulators/coalesce_response.ex

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,26 @@ defmodule Manipulators.CoalesceResponse do
1111
all_response_headers
1212
|> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1))
1313
|> elem(1)
14+
|> Poison.decode!()
1415

1516
key = {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}
1617

17-
{:ok, pid} = Cache.create_coalescer(key) |> IO.inspect(label: "did it make the journey?")
18+
pid = Cache.get_coalesce_pid(key) |> IO.inspect(label: "did it make the journey?")
1819

19-
conn_out = conn_out
20-
|> Mint.HTTP.put_private(:coalesce_pid, pid)
21-
|> Mint.HTTP.put_private(:coalesce_key, key)
20+
conn_out =
21+
conn_out
22+
|> Mint.HTTP.put_private(:coalesce_pid, pid)
23+
|> Mint.HTTP.put_private(:coalesce_key, key)
2224

23-
GenServer.cast(pid, {:headers, headers})
25+
GenServer.cast(pid, {:headers, headers, conn_in.status})
2426

2527
{headers, {conn_in, conn_out}}
2628
end
2729

2830
@impl true
29-
def chunk(chunk, {_conn_in, conn_out}) do
30-
IO.inspect({chunk, nil}, label: "chunk")
31-
31+
def chunk(chunk, {conn_in, conn_out}) do
3232
pid = Mint.HTTP.get_private(conn_out, :coalesce_pid)
33-
GenServer.cast(pid, {:chunk, chunk})
33+
GenServer.cast(pid, {:chunk, chunk, conn_in.status})
3434

3535
:skip
3636
end

lib/manipulators/store_response.ex

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ defmodule Manipulators.StoreResponse do
6666
all_response_headers
6767
|> Enum.find({nil, "[]"}, &match?({"mu-auth-allowed-groups", _}, &1))
6868
|> elem(1)
69+
|> Poison.decode!()
6970

7071
cache_keys =
7172
all_response_headers
@@ -79,8 +80,6 @@ defmodule Manipulators.StoreResponse do
7980
|> elem(1)
8081
|> Poison.decode!()
8182

82-
# IO.inspect( {conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups}, label: "Signature to store" )
83-
8483
Cache.store(
8584
{conn_in.method, conn_in.request_path, conn_in.query_string, allowed_groups},
8685
%{
@@ -109,8 +108,6 @@ defmodule Manipulators.StoreResponse do
109108
|> elem(1)
110109
|> Poison.decode!()
111110

112-
IO.inspect(clear_keys, label: "Clear keys")
113-
114111
Cache.clear_keys(clear_keys)
115112

116113
true ->

lib/mu_cache_plug.ex

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,16 @@ defmodule MuCachePlug do
5252
ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
5353

5454
cached_value =
55-
Cache.find_cache({conn.method, full_path, conn.query_string, known_allowed_groups}) ->
55+
Cache.find_cache({conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}) ->
5656
# with allowed groups and a cache, we should use the cache
5757
respond_with_cache(conn, cached_value)
5858

5959
true ->
60-
# without a cache, we should consult the backend
61-
# IO.inspect(
62-
# {conn.method, full_path, conn.query_string, known_allowed_groups}, label: "Cache miss for signature")
63-
ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
60+
key = {conn.method, full_path, conn.query_string, Poison.decode!(known_allowed_groups)}
61+
case Cache.get_or_create_coalescer(key) do
62+
{:created, _pid} -> ConnectionForwarder.forward(conn, path, "http://backend/", @manipulators)
63+
{:attach, pid} -> Coalesce.Registry.add_connection(pid, conn)
64+
end
6465
end
6566
end
6667

0 commit comments

Comments
 (0)