Skip to content

Commit 3bb1e7e

Browse files
committed
chore: update ecpool usage
1 parent 566b394 commit 3bb1e7e

File tree

8 files changed

+52
-18
lines changed

8 files changed

+52
-18
lines changed

.ci/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
up:
2-
docker compose --file docker-compose-file/docker-compose-hstreamdb.yaml up --detach --build
2+
docker compose --file docker-compose-file/docker-compose-hstreamdb.yaml up --detach --build --force-recreate
33

44
down:
55
docker compose --file docker-compose-file/docker-compose-hstreamdb.yaml down --remove-orphans

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ coveralls: $(REBAR)
4646
clean:
4747
@rm -rf _build
4848
@rm -rf rebar3
49+
@rm -rf rebar.lock
50+
@rm -rf *.crashdump
4951
@rm -rf *_crash.dump
5052
@rm -rf hstreamdb_erl_*_plt
5153

config/sys.config

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +0,0 @@
1-
[{kernel, [{logger_level, debug}]}].

src/hstreamdb.erl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
-export([
5555
start_reader/2,
5656
stop_reader/1,
57+
wait_for_reader/1,
5758
read_stream_key/3,
5859
read_stream_key/4
5960
]).
@@ -201,6 +202,10 @@ start_reader(Name, ReaderOptions) ->
201202
stop_reader(Name) ->
202203
hstreamdb_readers_sup:stop(Name).
203204

205+
-spec wait_for_reader(ecpool:pool_name()) -> ok | no_such_reader.
206+
wait_for_reader(Name) ->
207+
hstreamdb_reader:wait_for_start(Name).
208+
204209
%% @doc fetch only records that have the same key as the one provided, using
205210
%% server-side filtering.
206211

src/hstreamdb_client.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,8 @@ stop_channel(SupRef, Channel) ->
370370
case stop_channel_pool(SupRef, Channel) of
371371
ok ->
372372
?LOG_DEBUG("[hstreamdb] stop_channel ok: ~p", [Channel]);
373+
{error, not_found} ->
374+
?LOG_DEBUG("[hstreamdb] stop_channel not_found: ~p", [Channel]);
373375
{error, _} = Error ->
374376
?LOG_ERROR("[hstreamdb] stop_channel error: ~p, ~p", [Channel, Error])
375377
end.

src/reader/hstreamdb_reader.erl

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323

2424
-export([
2525
read_key/3,
26-
read_key/4
26+
read_key/4,
27+
wait_for_start/1
2728
]).
2829

2930
-export([
@@ -69,6 +70,17 @@ read_key(Reader, Key, Limits) ->
6970
read_key(Reader, Key, Limits, Fold) ->
7071
do_read_key(Reader, Key, Limits, Fold).
7172

73+
-spec wait_for_start(ecpool:pool_name()) -> ok | no_such_reader.
74+
wait_for_start(Reader) ->
75+
case ecpool:get_client(Reader) of
76+
Pid when is_pid(Pid) -> ok;
77+
false ->
78+
timer:sleep(100),
79+
wait_for_start(Reader);
80+
no_such_pool ->
81+
no_such_reader
82+
end.
83+
7284
%%-------------------------------------------------------------------------------------------------
7385
%% ecpool part
7486

@@ -138,22 +150,26 @@ code_change(_OldVsn, State, _Extra) ->
138150
%%-------------------------------------------------------------------------------------------------
139151

140152
do_read_key(Reader, Key, Limits, {FoldFun, InitAcc}) ->
141-
LookupClient = ecpool_request(Reader, get_lookup_client),
142-
case ?MEASURE({lookup_key, self(), Key}, hstreamdb_client:lookup_key(LookupClient, Key)) of
143-
{ok, {_Host, _Port} = Addr} ->
144-
case ecpool_request(Reader, {get_key_gstream, Key, Addr}) of
145-
{ok, Stream, GStream} ->
146-
?MEASURE(
147-
{fold_key_read_gstream, Stream, Key},
148-
hstreamdb_client:fold_key_read_gstream(
149-
GStream, Stream, Key, Limits, FoldFun, InitAcc
150-
)
151-
);
153+
case ecpool_request(Reader, get_lookup_client) of
154+
{error, _} = Error ->
155+
Error;
156+
LookupClient ->
157+
case ?MEASURE({lookup_key, self(), Key}, hstreamdb_client:lookup_key(LookupClient, Key)) of
158+
{ok, {_Host, _Port} = Addr} ->
159+
case ecpool_request(Reader, {get_key_gstream, Key, Addr}) of
160+
{ok, Stream, GStream} ->
161+
?MEASURE(
162+
{fold_key_read_gstream, Stream, Key},
163+
hstreamdb_client:fold_key_read_gstream(
164+
GStream, Stream, Key, Limits, FoldFun, InitAcc
165+
)
166+
);
167+
{error, _} = Error ->
168+
Error
169+
end;
152170
{error, _} = Error ->
153171
Error
154-
end;
155-
{error, _} = Error ->
156-
Error
172+
end
157173
end.
158174

159175
do_get_key_gstream(
@@ -198,5 +214,7 @@ fold_stream_key_fun(Key) ->
198214
ecpool_request(Reader, Request) ->
199215
ecpool:with_client(
200216
Reader,
201-
fun(Pid) -> gen_server:call(Pid, Request) end
217+
fun(Pid) ->
218+
gen_server:call(Pid, Request)
219+
end
202220
).

test/hstreamdb_producer_SUITE.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ end).
4343
all() ->
4444
hstreamdb_test_helpers:test_cases(?MODULE).
4545

46+
suite() -> [{timetrap, {minutes, 5}}].
47+
4648
init_per_suite(Config) ->
4749
_ = application:ensure_all_started(hstreamdb_erl),
4850
Config.
@@ -799,6 +801,7 @@ t_append_sync_timeout_while_success(Config) ->
799801

800802
Reader = "reader_" ++ atom_to_list(?FUNCTION_NAME),
801803
ok = hstreamdb:start_reader(Reader, ReaderOptions),
804+
ok = hstreamdb:wait_for_reader(Reader),
802805

803806
% Read all records
804807

test/hstreamdb_read_SUITE.erl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
all() ->
2828
hstreamdb_test_helpers:test_cases(?MODULE).
2929

30+
suite() -> [{timetrap, {minutes, 5}}].
31+
3032
init_per_suite(Config) ->
3133
_ = application:ensure_all_started(hstreamdb_erl),
3234
Config.
@@ -64,6 +66,7 @@ t_read_stream_key(Config) ->
6466

6567
Reader = "reader_" ++ atom_to_list(?FUNCTION_NAME),
6668
ok = hstreamdb:start_reader(Reader, ReaderOptions),
69+
ok = hstreamdb:wait_for_reader(Reader),
6770

6871
% Try to read with invalid limits
6972

@@ -150,6 +153,7 @@ t_read_stream_recreated_key(Config) ->
150153

151154
Reader = "reader_" ++ atom_to_list(?FUNCTION_NAME),
152155
ok = hstreamdb:start_reader(Reader, ReaderOptions),
156+
ok = hstreamdb:wait_for_reader(Reader),
153157

154158
Limits = #{
155159
from => #{offset => {specialOffset, 0}},
@@ -221,6 +225,7 @@ t_trim(Config) ->
221225

222226
Reader = "reader_" ++ atom_to_list(?FUNCTION_NAME),
223227
ok = hstreamdb:start_reader(Reader, ReaderOptions),
228+
ok = hstreamdb:wait_for_reader(Reader),
224229

225230
Limits = #{
226231
from => #{offset => {specialOffset, 0}},

0 commit comments

Comments
 (0)