diff --git a/contract-tests/service.rb b/contract-tests/service.rb index 0a1d303..30220bf 100644 --- a/contract-tests/service.rb +++ b/contract-tests/service.rb @@ -22,7 +22,9 @@ capabilities: [ 'headers', 'last-event-id', + 'post', 'read-timeout', + 'report', ], }.to_json end @@ -54,6 +56,8 @@ entity = nil sse = SSE::Client.new( streamUrl, + method: opts[:method] || "GET", + payload: opts[:body] || nil, headers: opts[:headers] || {}, last_event_id: opts[:lastEventId], read_timeout: opts[:readTimeoutMs].nil? ? nil : (opts[:readTimeoutMs].to_f / 1000), diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index c018151..8c80aee 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -84,6 +84,11 @@ class Client # @param socket_factory [#open] (nil) an optional factory object for creating sockets, # if you want to use something other than the default `TCPSocket`; it must implement # `open(uri, timeout)` to return a connected `Socket` + # @param method [String] ("GET") the HTTP method to use for requests + # @param payload [String, Hash, Array, #call] (nil) optional request payload. If payload is a Hash or + # an Array, it will be converted to JSON and sent as the request body. A string will be sent as a non-JSON + # request body. If payload responds to #call, it will be invoked on each + # request to generate the payload dynamically. # @yieldparam [Client] client the new client instance, before opening the connection # def initialize(uri, @@ -95,13 +100,17 @@ def initialize(uri, last_event_id: nil, proxy: nil, logger: nil, - socket_factory: nil) + socket_factory: nil, + method: "GET", + payload: nil) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @headers = headers.clone @connect_timeout = connect_timeout @read_timeout = read_timeout + @method = method.to_s.upcase + @payload = payload @logger = logger || default_logger http_client_options = {} if socket_factory @@ -262,9 +271,7 @@ def connect cxn = nil begin @logger.info { "Connecting to event stream at #{@uri}" } - cxn = @http_client.request("GET", @uri, { - headers: build_headers, - }) + cxn = @http_client.request(@method, @uri, build_opts) if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") @@ -358,5 +365,18 @@ def build_headers h['Last-Event-Id'] = @last_id if !@last_id.nil? && @last_id != "" h.merge(@headers) end + + def build_opts + return {headers: build_headers} if @payload.nil? + + # Resolve payload if it's callable + resolved_payload = @payload.respond_to?(:call) ? @payload.call : @payload + + if resolved_payload.is_a?(Hash) || resolved_payload.is_a?(Array) + {headers: build_headers, json: resolved_payload} + else + {headers: build_headers, body: resolved_payload.to_s} + end + end end end diff --git a/spec/client_spec.rb b/spec/client_spec.rb index ea7b849..636b1dc 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -447,4 +447,294 @@ def send_stream_content(res, content, keep_open:) end end end + + describe "HTTP method parameter" do + it "defaults to GET method" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri)) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("GET") + end + end + end + + it "uses explicit GET method" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri, method: "GET")) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("GET") + end + end + end + + it "uses explicit POST method" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri, method: "POST")) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("POST") + end + end + end + + it "normalizes method to uppercase" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + with_client(subject.new(server.base_uri, method: "post")) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("POST") + end + end + end + end + + describe "payload parameter" do + it "sends string payload as body" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + payload = "test-string-payload" + with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("POST") + expect(received_req.body).to eq(payload) + end + end + end + + it "sends hash payload as JSON" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + payload = {user: "test", id: 123} + with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("POST") + expect(received_req.header["content-type"].first).to include("application/json") + parsed_body = JSON.parse(received_req.body) + expect(parsed_body).to eq({"user" => "test", "id" => 123}) + end + end + end + + it "sends array payload as JSON" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + payload = ["item1", "item2", "item3"] + with_client(subject.new(server.base_uri, method: "POST", payload: payload)) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("POST") + expect(received_req.header["content-type"].first).to include("application/json") + parsed_body = JSON.parse(received_req.body) + expect(parsed_body).to eq(["item1", "item2", "item3"]) + end + end + end + + it "works with GET method and payload" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + payload = "get-with-payload" + with_client(subject.new(server.base_uri, method: "GET", payload: payload)) do |client| + received_req = requests.pop + expect(received_req.request_method).to eq("GET") + expect(received_req.body).to eq(payload) + end + end + end + end + + describe "callable payload parameter" do + it "invokes lambda payload on each request" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: false) # Close to trigger reconnect + end + + counter = 0 + callable_payload = -> { counter += 1; "request-#{counter}" } + + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload, reconnect_time: reconnect_asap)) do |client| + # Wait for first request + req1 = requests.pop + expect(req1.body).to eq("request-1") + + # Wait for reconnect and second request + req2 = requests.pop + expect(req2.body).to eq("request-2") + end + end + end + + it "invokes proc payload on each request" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: false) + end + + counter = 0 + callable_payload = proc { counter += 1; {request_id: counter, timestamp: Time.now.to_i} } + + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload, reconnect_time: reconnect_asap)) do |client| + # Wait for first request + req1 = requests.pop + parsed_body1 = JSON.parse(req1.body) + expect(parsed_body1["request_id"]).to eq(1) + + # Wait for reconnect and second request + req2 = requests.pop + parsed_body2 = JSON.parse(req2.body) + expect(parsed_body2["request_id"]).to eq(2) + expect(parsed_body2["timestamp"]).to be >= parsed_body1["timestamp"] + end + end + end + + it "invokes custom callable object payload" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + class TestPayloadGenerator + def initialize + @counter = 0 + end + + def call + @counter += 1 + {generator: "test", count: @counter} + end + end + + callable_payload = TestPayloadGenerator.new + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client| + received_req = requests.pop + parsed_body = JSON.parse(received_req.body) + expect(parsed_body).to eq({"generator" => "test", "count" => 1}) + end + end + end + + it "handles callable returning string" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + callable_payload = -> { "dynamic-string-#{rand(1000)}" } + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client| + received_req = requests.pop + expect(received_req.body).to match(/^dynamic-string-\d+$/) + end + end + end + + it "handles callable returning hash" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + callable_payload = -> { {type: "dynamic", value: rand(1000)} } + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client| + received_req = requests.pop + expect(received_req.header["content-type"].first).to include("application/json") + parsed_body = JSON.parse(received_req.body) + expect(parsed_body["type"]).to eq("dynamic") + expect(parsed_body["value"]).to be_a(Integer) + end + end + end + + it "handles callable returning array" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + callable_payload = -> { ["dynamic", Time.now.to_i] } + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client| + received_req = requests.pop + expect(received_req.header["content-type"].first).to include("application/json") + parsed_body = JSON.parse(received_req.body) + expect(parsed_body[0]).to eq("dynamic") + expect(parsed_body[1]).to be_a(Integer) + end + end + end + + it "handles callable returning other types by converting to string" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + send_stream_content(res, "", keep_open: true) + end + + test_object = Object.new + def test_object.to_s + "custom-object-string" + end + + callable_payload = -> { test_object } + with_client(subject.new(server.base_uri, method: "POST", payload: callable_payload)) do |client| + received_req = requests.pop + expect(received_req.body).to eq("custom-object-string") + end + end + end + end end