diff --git a/lib/ld-eventsource/client.rb b/lib/ld-eventsource/client.rb index c018151..c4dd44a 100644 --- a/lib/ld-eventsource/client.rb +++ b/lib/ld-eventsource/client.rb @@ -1,4 +1,5 @@ require "ld-eventsource/impl/backoff" +require "ld-eventsource/impl/basic_event_parser" require "ld-eventsource/impl/buffered_line_reader" require "ld-eventsource/impl/event_parser" require "ld-eventsource/events" @@ -6,6 +7,7 @@ require "concurrent/atomics" require "logger" +require "openssl" require "thread" require "uri" require "http" @@ -49,6 +51,9 @@ class Client # The default value for `reconnect_reset_interval` in {#initialize}. DEFAULT_RECONNECT_RESET_INTERVAL = 60 + # The default HTTP method for requests. + DEFAULT_HTTP_METHOD = "GET" + # # Creates a new SSE client. # @@ -84,6 +89,10 @@ class Client # @param socket_factory [#open] (nil) an optional factory object for creating sockets, # if you want to use something other than the default `TCPSocket`; it must implement # `open(uri, timeout)` to return a connected `Socket` + # @param http_method [String] (DEFAULT_HTTP_METHOD) the HTTP method to use for requests + # @param http_payload [Hash] ({}) JSON payload to send with requests (only used with POST/PUT methods) + # @param parse [Boolean] (true) whether to parse SSE events or pass through raw chunks + # @param verify_ssl [Boolean] (true) whether to verify SSL certificates; set to false for development/testing # @yieldparam [Client] client the new client instance, before opening the connection # def initialize(uri, @@ -92,18 +101,28 @@ def initialize(uri, read_timeout: DEFAULT_READ_TIMEOUT, reconnect_time: DEFAULT_RECONNECT_TIME, reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL, + http_method: DEFAULT_HTTP_METHOD, + http_payload: {}, last_event_id: nil, proxy: nil, logger: nil, - socket_factory: nil) + socket_factory: nil, + parse: true, + verify_ssl: true) @uri = URI(uri) @stopped = Concurrent::AtomicBoolean.new(false) @headers = headers.clone @connect_timeout = connect_timeout @read_timeout = read_timeout + @http_method = http_method + @http_payload = http_payload @logger = logger || default_logger + @parse = parse http_client_options = {} + unless verify_ssl + http_client_options[:ssl] = { verify_mode: OpenSSL::SSL::VERIFY_NONE } + end if socket_factory http_client_options["socket_class"] = socket_factory end @@ -121,6 +140,8 @@ def initialize(uri, http_client_options["proxy"] = { :proxy_address => @proxy.host, :proxy_port => @proxy.port, + :proxy_username => @proxy.user, + :proxy_password => @proxy.password, } end @@ -262,9 +283,9 @@ def connect cxn = nil begin @logger.info { "Connecting to event stream at #{@uri}" } - cxn = @http_client.request("GET", @uri, { - headers: build_headers, - }) + opts = { headers: build_headers } + opts[:json] = @http_payload unless @http_payload.empty? + cxn = @http_client.request(@http_method, @uri, opts) if cxn.status.code == 200 content_type = cxn.content_type.mime_type if content_type && content_type.start_with?("text/event-stream") @@ -316,7 +337,11 @@ def read_stream(cxn) end end end - event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id) + if @parse + event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id) + else + event_parser = Impl::BasicEventParser.new(chunks) + end event_parser.items.each do |item| return if @stopped.value diff --git a/lib/ld-eventsource/impl/basic_event_parser.rb b/lib/ld-eventsource/impl/basic_event_parser.rb new file mode 100644 index 0000000..3eb0533 --- /dev/null +++ b/lib/ld-eventsource/impl/basic_event_parser.rb @@ -0,0 +1,22 @@ +require "ld-eventsource/events" + +module SSE + module Impl + class BasicEventParser + + def initialize(chunks) + @chunks = chunks + end + + # Generator that parses the input iterator and returns instances of {StreamEvent} or {SetRetryInterval}. + def items + Enumerator.new do |gen| + @chunks.each do |chunk| + item = StreamEvent.new(chunk.nil? ? :final_message : :message, chunk, nil, nil) + gen.yield item + end + end + end + end + end +end \ No newline at end of file