Skip to content

Commit 7be2ddf

Browse files
authored
Merge pull request #156 from rabbitmq/configurable-writer-module
Make writer and replica impl modules pluggable
2 parents d47c4f7 + 57f23bd commit 7be2ddf

File tree

5 files changed

+100
-25
lines changed

5 files changed

+100
-25
lines changed

src/osiris.erl

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@
2323
update_retention/2,
2424
start_cluster/1,
2525
stop_cluster/1,
26+
2627
start_writer/1,
2728
start_replica/2,
29+
stop_member/2,
30+
delete_member/2,
31+
2832
delete_cluster/1,
2933
configure_logger/1,
3034
get_stats/1]).
@@ -103,7 +107,7 @@ start_cluster(Config00 = #{name := Name}) ->
103107
true = osiris_util:validate_base64uri(Name),
104108
%% ensure reference is set
105109
Config0 = maps:merge(#{reference => Name}, Config00),
106-
case osiris_writer:start(Config0) of
110+
case start_writer(Config0) of
107111
{ok, Pid} ->
108112
Config = Config0#{leader_pid => Pid},
109113
case start_replicas(Config) of
@@ -115,22 +119,41 @@ start_cluster(Config00 = #{name := Name}) ->
115119
end.
116120

117121
stop_cluster(Config) ->
118-
ok = osiris_writer:stop(Config),
119-
[ok = osiris_replica:stop(N, Config)
122+
WriterNode = maps:get(leader_node, Config),
123+
ok = stop_member(WriterNode, Config),
124+
[ok = stop_member(N, Config)
120125
|| N <- maps:get(replica_nodes, Config)],
121126
ok.
122127

123128
-spec delete_cluster(config()) -> ok.
124129
delete_cluster(Config) ->
125-
[ok = osiris_replica:delete(R, Config)
126-
|| R <- maps:get(replica_nodes, Config)],
127-
ok = osiris_writer:delete(Config).
130+
[ok = delete_member(N, Config)
131+
|| N <- maps:get(replica_nodes, Config)],
132+
WriterNode = maps:get(leader_node, Config),
133+
ok = delete_member(WriterNode, Config).
128134

135+
-spec start_writer(osiris:config()) ->
136+
supervisor:startchild_ret().
129137
start_writer(Config) ->
130-
osiris_writer:start(Config).
138+
Mod = get_writer_module(Config),
139+
Node = maps:get(leader_node, Config),
140+
osiris_member:start(Mod, Node, Config).
131141

132-
start_replica(Replica, Config) ->
133-
osiris_replica:start(Replica, Config).
142+
-spec start_replica(node(), osiris:config()) ->
143+
supervisor:startchild_ret().
144+
start_replica(Node, Config) ->
145+
Mod = maps:get(replica_mod, Config, osiris_replica),
146+
osiris_member:start(Mod, Node, Config).
147+
148+
-spec stop_member(node(), osiris:config()) ->
149+
ok | {error, not_found}.
150+
stop_member(Node, Config) ->
151+
osiris_member:stop(Node, Config).
152+
153+
-spec delete_member(node(), osiris:config()) ->
154+
ok | {error, not_found}.
155+
delete_member(Node, Config) ->
156+
osiris_member:delete(Node, Config).
134157

135158
-spec write(Pid :: pid(), Data :: data()) -> ok.
136159
write(Pid, Data) ->
@@ -258,7 +281,7 @@ start_replicas(_Config, [], ReplicaPids) ->
258281
{ok, ReplicaPids};
259282
start_replicas(Config, [Node | Nodes], ReplicaPids) ->
260283
try
261-
case osiris_replica:start(Node, Config) of
284+
case start_replica(Node, Config) of
262285
{ok, Pid} ->
263286
start_replicas(Config, Nodes, [Pid | ReplicaPids]);
264287
{ok, Pid, _} ->
@@ -293,3 +316,6 @@ get_stats(Pid)
293316
last_chunk_id => osiris_log_shared:last_chunk_id(Shared)};
294317
get_stats(Pid) when is_pid(Pid) ->
295318
erpc:call(node(Pid), ?MODULE, ?FUNCTION_NAME, [Pid]).
319+
320+
get_writer_module(Config) ->
321+
maps:get(writer_mod, Config, osiris_writer).

src/osiris_member.erl

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
-module(osiris_member).
2+
3+
4+
-include("osiris.hrl").
5+
-export([start/3,
6+
stop/2,
7+
delete/2]).
8+
9+
%% Callbacks
10+
11+
-callback start(node(), osiris:config()) ->
12+
supervisor:startchild_ret().
13+
14+
%% API
15+
16+
-spec start(module(), node(), osiris:config()) ->
17+
supervisor:startchild_ret().
18+
start(Mod, Node, Config) ->
19+
Mod:start(Node, Config).
20+
21+
-spec stop(node(), osiris:config()) ->
22+
ok | {error, not_found}.
23+
stop(Node, Config) ->
24+
?SUP:stop_child(Node, Config).
25+
26+
-spec delete(node(), osiris:config()) ->
27+
ok | {error, term()}.
28+
delete(Node, Config) ->
29+
?SUP:delete_child(Node, Config).

src/osiris_replica.erl

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
-module(osiris_replica).
99

1010
-behaviour(gen_server).
11+
-behaviour(osiris_member).
1112

1213
-include("osiris.hrl").
1314

@@ -17,9 +18,8 @@
1718

1819
%% API functions
1920
-export([start/2,
20-
start_link/1,
21-
stop/2,
22-
delete/2]).
21+
start_link/1
22+
]).
2323
%% Test
2424
-export([get_port/1, combine_ips_hosts/4]).
2525
%% gen_server callbacks
@@ -86,6 +86,8 @@
8686
%%% API functions
8787
%%%===================================================================
8888

89+
-spec start(node(), Config :: osiris:config()) ->
90+
supervisor:startchild_ret().
8991
start(Node, Config = #{name := Name}) when ?IS_STRING(Name) ->
9092
case supervisor:start_child({?SUP, Node},
9193
#{id => Name,
@@ -105,12 +107,6 @@ start(Node, Config = #{name := Name}) when ?IS_STRING(Name) ->
105107
Err
106108
end.
107109

108-
stop(Node, #{name := Name}) ->
109-
?SUP:stop_child(Node, Name).
110-
111-
delete(Node, Config = #{}) ->
112-
?SUP:delete_child(Node, Config).
113-
114110
%%--------------------------------------------------------------------
115111
%% @doc
116112
%% Starts the server

src/osiris_server_sup.erl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
-behaviour(supervisor).
1111

12+
-include("osiris.hrl").
1213
-export([start_link/0]).
1314
-export([init/1,
1415
stop_child/2,
@@ -21,7 +22,9 @@ init([]) ->
2122
Procs = [],
2223
{ok, {{one_for_one, 1, 5}, Procs}}.
2324

24-
stop_child(Node, Name) ->
25+
stop_child(Node, #{name := Name}) ->
26+
stop_child(Node, Name);
27+
stop_child(Node, Name) when ?IS_STRING(Name) ->
2528
try
2629
%% as replicas are temporary we don't have to explicitly
2730
%% delete them

src/osiris_writer.erl

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
-module(osiris_writer).
1010

1111
-behaviour(gen_batch_server).
12+
-behaviour(osiris_member).
1213

1314
-include("osiris.hrl").
1415

@@ -33,6 +34,11 @@
3334
stop/1,
3435
delete/1]).
3536

37+
%% osiris_member impl
38+
-export([start/2,
39+
stop/2,
40+
delete/2]).
41+
3642
-define(C_COMMITTED_OFFSET, ?C_NUM_LOG_FIELDS + 1).
3743
-define(C_READERS, ?C_NUM_LOG_FIELDS + 2).
3844
-define(C_EPOCH, ?C_NUM_LOG_FIELDS + 3).
@@ -73,18 +79,33 @@
7379

7480
-export_type([state/0]).
7581

76-
start(Config = #{name := Name, leader_node := Leader}) ->
77-
supervisor:start_child({?SUP, Leader},
82+
-spec start(node(), Config :: osiris:config()) ->
83+
supervisor:startchild_ret().
84+
start(Node, #{name := Name, leader_node := Node} = Config) ->
85+
supervisor:start_child({?SUP, Node},
7886
#{id => osiris_util:normalise_name(Name),
7987
start => {?MODULE, start_link, [Config]},
8088
restart => temporary,
8189
shutdown => 5000,
8290
type => worker}).
8391

84-
stop(#{name := Name, leader_node := Node}) ->
85-
%% stop child handles name normalisation
86-
?SUP:stop_child(Node, Name).
92+
-spec stop(node(), osiris:config()) ->
93+
ok | {error, not_found}.
94+
stop(Node, #{leader_node := Node} = Config) ->
95+
?SUP:stop_child(Node, Config).
96+
97+
-spec delete(node(), osiris:config()) ->
98+
ok | {error, term()}.
99+
delete(Node, #{leader_node := Node} = Config) ->
100+
?SUP:delete_child(Node, Config).
87101

102+
%% backwards compat
103+
start(#{leader_node := LeaderNode} = Config) ->
104+
start(LeaderNode, Config).
105+
%% backwards compat
106+
stop(#{leader_node := Node} = Config) ->
107+
stop(Node, Config).
108+
%% backwards compat
88109
delete(#{leader_node := Node} = Config) ->
89110
?SUP:delete_child(Node, Config).
90111

0 commit comments

Comments
 (0)