Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/ld-eventsource/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class Client
# an Array, it will be converted to JSON and sent as the request body. A string will be sent as a non-JSON
# request body. If payload responds to #call, it will be invoked on each
# request to generate the payload dynamically.
# @param retry_enabled [Boolean] (true) whether to retry connections after failures. If false, the client
# will exit after the first connection failure instead of attempting to reconnect.
# @yieldparam [Client] client the new client instance, before opening the connection
#
def initialize(uri,
Expand All @@ -102,9 +104,11 @@ def initialize(uri,
logger: nil,
socket_factory: nil,
method: "GET",
payload: nil)
payload: nil,
retry_enabled: true)
@uri = URI(uri)
@stopped = Concurrent::AtomicBoolean.new(false)
@retry_enabled = retry_enabled

@headers = headers.clone
@connect_timeout = connect_timeout
Expand Down Expand Up @@ -256,6 +260,8 @@ def run_stream
rescue StandardError => e
log_and_dispatch_error(e, "Unexpected error while closing stream")
end

return unless @retry_enabled
end
end

Expand Down
86 changes: 86 additions & 0 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -737,4 +737,90 @@ def test_object.to_s
end
end
end

describe "retry parameter" do
it "defaults to true (retries enabled)" do
events_body = simple_event_1_text
with_server do |server|
attempt = 0
server.setup_response("/") do |req,res|
attempt += 1
if attempt == 1
res.status = 500
res.body = "server error"
res.keep_alive = false
else
send_stream_content(res, events_body, keep_open: true)
end
end

event_sink = Queue.new
error_sink = Queue.new
client = subject.new(server.base_uri, reconnect_time: reconnect_asap) do |c|
c.on_event { |event| event_sink << event }
c.on_error { |error| error_sink << error }
end

with_client(client) do |c|
expect(event_sink.pop).to eq(simple_event_1)
expect(error_sink.pop).to eq(SSE::Errors::HTTPStatusError.new(500, "server error"))
expect(attempt).to eq 2 # Should have retried
end
end
end

it "allows retries when retry_enabled: true" do
events_body = simple_event_1_text
with_server do |server|
attempt = 0
server.setup_response("/") do |req,res|
attempt += 1
if attempt == 1
res.status = 500
res.body = "server error"
res.keep_alive = false
else
send_stream_content(res, events_body, keep_open: true)
end
end

event_sink = Queue.new
error_sink = Queue.new
client = subject.new(server.base_uri, reconnect_time: reconnect_asap, retry_enabled: true) do |c|
c.on_event { |event| event_sink << event }
c.on_error { |error| error_sink << error }
end

with_client(client) do |c|
expect(event_sink.pop).to eq(simple_event_1)
expect(error_sink.pop).to eq(SSE::Errors::HTTPStatusError.new(500, "server error"))
expect(attempt).to eq 2 # Should have retried
end
end
end

it "disables retries when retry_enabled: false" do
with_server do |server|
attempt = 0
server.setup_response("/") do |req,res|
attempt += 1
res.status = 500
res.body = "server error"
res.keep_alive = false
end

error_sink = Queue.new
client = subject.new(server.base_uri, retry_enabled: false) do |c|
c.on_error { |error| error_sink << error }
end

# Give the client some time to attempt connection and fail
sleep(0.5)
client.close

expect(error_sink.pop).to eq(SSE::Errors::HTTPStatusError.new(500, "server error"))
expect(attempt).to eq 1 # Should not have retried
end
end
end
end
Loading