From 7d68a4c1da233ec5d875273fda48f9dc42b5f7bf Mon Sep 17 00:00:00 2001 From: saada Date: Thu, 14 Aug 2025 13:04:43 -0400 Subject: [PATCH 1/6] Add POST/PUT request support and optional parsing features - Add support for POST/PUT HTTP methods with JSON payload - Fix proxy authentication parameter names (proxy_username/proxy_password) - Add optional parsing mode to bypass SSE parsing for raw data streaming These changes enable more flexible usage patterns while maintaining backward compatibility with existing GET-only SSE implementations. --- lib/ld-eventsource/client.rb | 29 +++++++++++++++---- lib/ld-eventsource/impl/basic_event_parser.rb | 22 ++++++++++++++ 2 files changed, 46 insertions(+), 5 deletions(-) create mode 100644 lib/ld-eventsource/impl/basic_event_parser.rb diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index c018151..aee69fe 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -1,4 +1,5 @@ require "ld-eventsource/impl/backoff" +require "ld-eventsource/impl/basic_event_parser" require "ld-eventsource/impl/buffered_line_reader" require "ld-eventsource/impl/event_parser" require "ld-eventsource/events" @@ -49,6 +50,9 @@ class Client # The default value for `reconnect_reset_interval` in {#initialize}. DEFAULT_RECONNECT_RESET_INTERVAL = 60 + # The default HTTP method for requests. + DEFAULT_HTTP_METHOD = "GET" + # # Creates a new SSE client. # @@ -84,6 +88,9 @@ class Client # @param socket_factory [#open] (nil) an optional factory object for creating sockets, # if you want to use something other than the default `TCPSocket`; it must implement # `open(uri, timeout)` to return a connected `Socket` + # @param http_method [String] (DEFAULT_HTTP_METHOD) the HTTP method to use for requests + # @param http_payload [Hash] ({}) JSON payload to send with requests (only used with POST/PUT methods) + # @param parse [Boolean] (true) whether to parse SSE events or pass through raw chunks # @yieldparam [Client] client the new client instance, before opening the connection # def initialize(uri, @@ -92,17 +99,23 @@ def initialize(uri, read_timeout: DEFAULT_READ_TIMEOUT, reconnect_time: DEFAULT_RECONNECT_TIME, reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL, + http_method: DEFAULT_HTTP_METHOD, + http_payload: {}, last_event_id: nil, proxy: nil, logger: nil, - socket_factory: nil) + socket_factory: nil, + parse: true) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @headers = headers.clone @connect_timeout = connect_timeout @read_timeout = read_timeout + @http_method = http_method + @http_payload = http_payload @logger = logger || default_logger + @parse = parse http_client_options = {} if socket_factory http_client_options["socket_class"] = socket_factory @@ -121,6 +134,8 @@ def initialize(uri, http_client_options["proxy"] = { :proxy_address => @proxy.host, :proxy_port => @proxy.port, + :proxy_username => @proxy.user, + :proxy_password => @proxy.password, } end @@ -262,9 +277,9 @@ def connect cxn = nil begin @logger.info { "Connecting to event stream at #{@uri}" } - cxn = @http_client.request("GET", @uri, { - headers: build_headers, - }) + opts = { headers: build_headers } + opts[:json] = @http_payload unless @http_payload.empty? + cxn = @http_client.request(@http_method, @uri, opts) if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") @@ -316,7 +331,11 @@ def read_stream(cxn) end end end - event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id) + if @parse + event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id) + else + event_parser = Impl::BasicEventParser.new(chunks) + end event_parser.items.each do |item| return if @stopped.value diff --git a/lib/ld-eventsource/impl/basic_event_parser.rb b/lib/ld-eventsource/impl/basic_event_parser.rb new file mode 100644 index 0000000..3eb0533 --- /dev/null +++ b/lib/ld-eventsource/impl/basic_event_parser.rb @@ -0,0 +1,22 @@ +require "ld-eventsource/events" + +module SSE + module Impl + class BasicEventParser + + def initialize(chunks) + @chunks = chunks + end + + # Generator that parses the input iterator and returns instances of {StreamEvent} or {SetRetryInterval}. + def items + Enumerator.new do |gen| + @chunks.each do |chunk| + item = StreamEvent.new(chunk.nil? ? :final_message : :message, chunk, nil, nil) + gen.yield item + end + end + end + end + end +end \ No newline at end of file From aaa1442d93b49badadf99d6990927ecc3b542a65 Mon Sep 17 00:00:00 2001 From: saada Date: Thu, 14 Aug 2025 13:30:27 -0400 Subject: [PATCH 2/6] Add optional SSL verification override Add verify_ssl parameter (defaults to true) to allow disabling SSL certificate verification for development, testing, and internal networks. This maintains security by default while providing flexibility when needed. --- lib/ld-eventsource/client.rb | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index aee69fe..c4dd44a 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -7,6 +7,7 @@ require "concurrent/atomics" require "logger" +require "openssl" require "thread" require "uri" require "http" @@ -91,6 +92,7 @@ class Client # @param http_method [String] (DEFAULT_HTTP_METHOD) the HTTP method to use for requests # @param http_payload [Hash] ({}) JSON payload to send with requests (only used with POST/PUT methods) # @param parse [Boolean] (true) whether to parse SSE events or pass through raw chunks + # @param verify_ssl [Boolean] (true) whether to verify SSL certificates; set to false for development/testing # @yieldparam [Client] client the new client instance, before opening the connection # def initialize(uri, @@ -105,7 +107,8 @@ def initialize(uri, proxy: nil, logger: nil, socket_factory: nil, - parse: true) + parse: true, + verify_ssl: true) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @@ -117,6 +120,9 @@ def initialize(uri, @logger = logger || default_logger @parse = parse http_client_options = {} + unless verify_ssl + http_client_options[:ssl] = { verify_mode: OpenSSL::SSL::VERIFY_NONE } + end if socket_factory http_client_options["socket_class"] = socket_factory end From 2e72a420de7d23cebd9cb375d3140e9d1ac1fabd Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 28 Aug 2025 10:12:20 -0400 Subject: [PATCH 3/6] Remove verify_ssl option; can be provided through overrides --- lib/ld-eventsource/client.rb | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index c4dd44a..aee69fe 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -7,7 +7,6 @@ require "concurrent/atomics" require "logger" -require "openssl" require "thread" require "uri" require "http" @@ -92,7 +91,6 @@ class Client # @param http_method [String] (DEFAULT_HTTP_METHOD) the HTTP method to use for requests # @param http_payload [Hash] ({}) JSON payload to send with requests (only used with POST/PUT methods) # @param parse [Boolean] (true) whether to parse SSE events or pass through raw chunks - # @param verify_ssl [Boolean] (true) whether to verify SSL certificates; set to false for development/testing # @yieldparam [Client] client the new client instance, before opening the connection # def initialize(uri, @@ -107,8 +105,7 @@ def initialize(uri, proxy: nil, logger: nil, socket_factory: nil, - parse: true, - verify_ssl: true) + parse: true) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @@ -120,9 +117,6 @@ def initialize(uri, @logger = logger || default_logger @parse = parse http_client_options = {} - unless verify_ssl - http_client_options[:ssl] = { verify_mode: OpenSSL::SSL::VERIFY_NONE } - end if socket_factory http_client_options["socket_class"] = socket_factory end From 0e4112c876ba53c56d81529a48689ce4da0c7dd8 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 28 Aug 2025 10:23:54 -0400 Subject: [PATCH 4/6] Safely add authorization headers --- lib/ld-eventsource/client.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index aba47a2..c87cde8 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -146,9 +146,9 @@ def initialize(uri, base_http_client_options["proxy"] = { :proxy_address => @proxy.host, :proxy_port => @proxy.port, - :proxy_username => @proxy.user, - :proxy_password => @proxy.password, } + base_http_client_options["proxy"][:proxy_username] = @proxy.user unless @proxy.user.empty? + base_http_client_options["proxy"][:proxy_password] = @proxy.password unless @proxy.password.empty? end options = http_client_options.is_a?(Hash) ? base_http_client_options.merge(http_client_options) : base_http_client_options From 8b6b2361d070ca6fb87dd66baddc5f51c49a31fa Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 28 Aug 2025 10:27:11 -0400 Subject: [PATCH 5/6] Remove previous work with parser option --- lib/ld-eventsource/client.rb | 10 +-------- lib/ld-eventsource/impl/basic_event_parser.rb | 22 ------------------- 2 files changed, 1 insertion(+), 31 deletions(-) delete mode 100644 lib/ld-eventsource/impl/basic_event_parser.rb diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index c87cde8..3e2c2b1 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -1,5 +1,4 @@ require "ld-eventsource/impl/backoff" -require "ld-eventsource/impl/basic_event_parser" require "ld-eventsource/impl/buffered_line_reader" require "ld-eventsource/impl/event_parser" require "ld-eventsource/events" @@ -93,7 +92,6 @@ class Client # an Array, it will be converted to JSON and sent as the request body. A string will be sent as a non-JSON # request body. If payload responds to #call, it will be invoked on each # request to generate the payload dynamically. - # @param parse [Boolean] (true) whether to parse SSE events or pass through raw chunks # @param retry_enabled [Boolean] (true) whether to retry connections after failures. If false, the client # will exit after the first connection failure instead of attempting to reconnect. # @param http_client_options [Hash] (nil) additional options to pass to @@ -113,7 +111,6 @@ def initialize(uri, socket_factory: nil, method: DEFAULT_HTTP_METHOD, payload: nil, - parse: true, retry_enabled: true, http_client_options: nil) @uri = URI(uri) @@ -125,7 +122,6 @@ def initialize(uri, @read_timeout = read_timeout @method = method.to_s.upcase @payload = payload - @parse = parse @logger = logger || default_logger base_http_client_options = {} @@ -346,11 +342,7 @@ def read_stream(cxn) end end end - if @parse - event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id) - else - event_parser = Impl::BasicEventParser.new(chunks) - end + event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id) event_parser.items.each do |item| return if @stopped.value diff --git a/lib/ld-eventsource/impl/basic_event_parser.rb b/lib/ld-eventsource/impl/basic_event_parser.rb deleted file mode 100644 index 3eb0533..0000000 --- a/lib/ld-eventsource/impl/basic_event_parser.rb +++ /dev/null @@ -1,22 +0,0 @@ -require "ld-eventsource/events" - -module SSE - module Impl - class BasicEventParser - - def initialize(chunks) - @chunks = chunks - end - - # Generator that parses the input iterator and returns instances of {StreamEvent} or {SetRetryInterval}. - def items - Enumerator.new do |gen| - @chunks.each do |chunk| - item = StreamEvent.new(chunk.nil? ? :final_message : :message, chunk, nil, nil) - gen.yield item - end - end - end - end - end -end \ No newline at end of file From aeb47a7e8a962f2b951882ae95ab7daf27f244a0 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Thu, 28 Aug 2025 10:30:06 -0400 Subject: [PATCH 6/6] empty? -> nil? --- lib/ld-eventsource/client.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index 3e2c2b1..f398b16 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -143,8 +143,8 @@ def initialize(uri, :proxy_address => @proxy.host, :proxy_port => @proxy.port, } - base_http_client_options["proxy"][:proxy_username] = @proxy.user unless @proxy.user.empty? - base_http_client_options["proxy"][:proxy_password] = @proxy.password unless @proxy.password.empty? + base_http_client_options["proxy"][:proxy_username] = @proxy.user unless @proxy.user.nil? + base_http_client_options["proxy"][:proxy_password] = @proxy.password unless @proxy.password.nil? end options = http_client_options.is_a?(Hash) ? base_http_client_options.merge(http_client_options) : base_http_client_options