Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions contract-tests/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
capabilities: [
'headers',
'last-event-id',
'post',
'read-timeout',
'report',
],
}.to_json
end
Expand Down Expand Up @@ -54,6 +56,8 @@
entity = nil
sse = SSE::Client.new(
streamUrl,
method: opts[:method] || "GET",
payload: opts[:body] || nil,
headers: opts[:headers] || {},
last_event_id: opts[:lastEventId],
read_timeout: opts[:readTimeoutMs].nil? ? nil : (opts[:readTimeoutMs].to_f / 1000),
Expand Down
28 changes: 24 additions & 4 deletions lib/ld-eventsource/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ class Client
# @param socket_factory [#open] (nil) an optional factory object for creating sockets,
# if you want to use something other than the default `TCPSocket`; it must implement
# `open(uri, timeout)` to return a connected `Socket`
# @param method [String] ("GET") the HTTP method to use for requests
# @param payload [String, Hash, Array, #call] (nil) optional request payload. If payload is a Hash or
# an Array, it will be converted to JSON and sent as the request body. A string will be sent as a non-JSON
# request body. If payload responds to #call, it will be invoked on each
# request to generate the payload dynamically.
# @yieldparam [Client] client the new client instance, before opening the connection
#
def initialize(uri,
Expand All @@ -95,13 +100,17 @@ def initialize(uri,
last_event_id: nil,
proxy: nil,
logger: nil,
socket_factory: nil)
socket_factory: nil,
method: "GET",
payload: nil)
@uri = URI(uri)
@stopped = Concurrent::AtomicBoolean.new(false)

@headers = headers.clone
@connect_timeout = connect_timeout
@read_timeout = read_timeout
@method = method.to_s.upcase
@payload = payload
@logger = logger || default_logger
http_client_options = {}
if socket_factory
Expand Down Expand Up @@ -262,9 +271,7 @@ def connect
cxn = nil
begin
@logger.info { "Connecting to event stream at #{@uri}" }
cxn = @http_client.request("GET", @uri, {
headers: build_headers,
})
cxn = @http_client.request(@method, @uri, build_opts)
if cxn.status.code == 200
content_type = cxn.content_type.mime_type
if content_type && content_type.start_with?("text/event-stream")
Expand Down Expand Up @@ -358,5 +365,18 @@ def build_headers
h['Last-Event-Id'] = @last_id if !@last_id.nil? && @last_id != ""
h.merge(@headers)
end

def build_opts
return {headers: build_headers} if @payload.nil?

# Resolve payload if it's callable
resolved_payload = @payload.respond_to?(:call) ? @payload.call : @payload

if resolved_payload.is_a?(Hash) || resolved_payload.is_a?(Array)
{headers: build_headers, json: resolved_payload}
else
{headers: build_headers, body: resolved_payload.to_s}
end
end
end
end
290 changes: 290 additions & 0 deletions spec/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -447,4 +447,294 @@ def send_stream_content(res, content, keep_open:)
end
end
end

describe "HTTP method parameter" do
it "defaults to GET method" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

with_client(subject.new(server.base_uri)) do |client|
received_req = requests.pop
expect(received_req.request_method).to eq("GET")
end
end
end

it "uses explicit GET method" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

with_client(subject.new(server.base_uri, method: "GET")) do |client|
received_req = requests.pop
expect(received_req.request_method).to eq("GET")
end
end
end

it "uses explicit POST method" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

with_client(subject.new(server.base_uri, method: "POST")) do |client|
received_req = requests.pop
expect(received_req.request_method).to eq("POST")
end
end
end

it "normalizes method to uppercase" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

with_client(subject.new(server.base_uri, method: "post")) do |client|
received_req = requests.pop
expect(received_req.request_method).to eq("POST")
end
end
end
end

describe "payload parameter" do
it "sends string payload as body" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

payload = "test-string-payload"
with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client|
received_req = requests.pop
expect(received_req.request_method).to eq("POST")
expect(received_req.body).to eq(payload)
end
end
end

it "sends hash payload as JSON" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

payload = {user: "test", id: 123}
with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client|
received_req = requests.pop
expect(received_req.request_method).to eq("POST")
expect(received_req.header["content-type"].first).to include("application/json")
parsed_body = JSON.parse(received_req.body)
expect(parsed_body).to eq({"user" => "test", "id" => 123})
end
end
end

it "sends array payload as JSON" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

payload = ["item1", "item2", "item3"]
with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client|
received_req = requests.pop
expect(received_req.request_method).to eq("POST")
expect(received_req.header["content-type"].first).to include("application/json")
parsed_body = JSON.parse(received_req.body)
expect(parsed_body).to eq(["item1", "item2", "item3"])
end
end
end

it "works with GET method and payload" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

payload = "get-with-payload"
with_client(subject.new(server.base_uri, method: "GET", payload: payload)) do |client|
received_req = requests.pop
expect(received_req.request_method).to eq("GET")
expect(received_req.body).to eq(payload)
end
end
end
end

describe "callable payload parameter" do
it "invokes lambda payload on each request" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: false) # Close to trigger reconnect
end

counter = 0
callable_payload = -> { counter += 1; "request-#{counter}" }

with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload, reconnect_time: reconnect_asap)) do |client|
# Wait for first request
req1 = requests.pop
expect(req1.body).to eq("request-1")

# Wait for reconnect and second request
req2 = requests.pop
expect(req2.body).to eq("request-2")
end
end
end

it "invokes proc payload on each request" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: false)
end

counter = 0
callable_payload = proc { counter += 1; {request_id: counter, timestamp: Time.now.to_i} }

with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload, reconnect_time: reconnect_asap)) do |client|
# Wait for first request
req1 = requests.pop
parsed_body1 = JSON.parse(req1.body)
expect(parsed_body1["request_id"]).to eq(1)

# Wait for reconnect and second request
req2 = requests.pop
parsed_body2 = JSON.parse(req2.body)
expect(parsed_body2["request_id"]).to eq(2)
expect(parsed_body2["timestamp"]).to be >= parsed_body1["timestamp"]
end
end
end

it "invokes custom callable object payload" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

class TestPayloadGenerator
def initialize
@counter = 0
end

def call
@counter += 1
{generator: "test", count: @counter}
end
end

callable_payload = TestPayloadGenerator.new
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client|
received_req = requests.pop
parsed_body = JSON.parse(received_req.body)
expect(parsed_body).to eq({"generator" => "test", "count" => 1})
end
end
end

it "handles callable returning string" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

callable_payload = -> { "dynamic-string-#{rand(1000)}" }
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client|
received_req = requests.pop
expect(received_req.body).to match(/^dynamic-string-\d+$/)
end
end
end

it "handles callable returning hash" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

callable_payload = -> { {type: "dynamic", value: rand(1000)} }
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client|
received_req = requests.pop
expect(received_req.header["content-type"].first).to include("application/json")
parsed_body = JSON.parse(received_req.body)
expect(parsed_body["type"]).to eq("dynamic")
expect(parsed_body["value"]).to be_a(Integer)
end
end
end

it "handles callable returning array" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

callable_payload = -> { ["dynamic", Time.now.to_i] }
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client|
received_req = requests.pop
expect(received_req.header["content-type"].first).to include("application/json")
parsed_body = JSON.parse(received_req.body)
expect(parsed_body[0]).to eq("dynamic")
expect(parsed_body[1]).to be_a(Integer)
end
end
end

it "handles callable returning other types by converting to string" do
with_server do |server|
requests = Queue.new
server.setup_response("/") do |req,res|
requests << req
send_stream_content(res, "", keep_open: true)
end

test_object = Object.new
def test_object.to_s
"custom-object-string"
end

callable_payload = -> { test_object }
with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client|
received_req = requests.pop
expect(received_req.body).to eq("custom-object-string")
end
end
end
end
end