Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions lib/ld-eventsource/client.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
require "ld-eventsource/impl/backoff"
require "ld-eventsource/impl/basic_event_parser"
require "ld-eventsource/impl/buffered_line_reader"
require "ld-eventsource/impl/event_parser"
require "ld-eventsource/events"
require "ld-eventsource/errors"

require "concurrent/atomics"
require "logger"
require "openssl"
require "thread"
require "uri"
require "http"
Expand Down Expand Up @@ -49,6 +51,9 @@ class Client
# The default value for `reconnect_reset_interval` in {#initialize}.
DEFAULT_RECONNECT_RESET_INTERVAL = 60

# The default HTTP method for requests.
DEFAULT_HTTP_METHOD = "GET"

#
# Creates a new SSE client.
#
Expand Down Expand Up @@ -84,6 +89,10 @@ class Client
# @param socket_factory [#open] (nil) an optional factory object for creating sockets,
# if you want to use something other than the default `TCPSocket`; it must implement
# `open(uri, timeout)` to return a connected `Socket`
# @param http_method [String] (DEFAULT_HTTP_METHOD) the HTTP method to use for requests
# @param http_payload [Hash] ({}) JSON payload to send with requests (only used with POST/PUT methods)
# @param parse [Boolean] (true) whether to parse SSE events or pass through raw chunks
# @param verify_ssl [Boolean] (true) whether to verify SSL certificates; set to false for development/testing
# @yieldparam [Client] client the new client instance, before opening the connection
#
def initialize(uri,
Expand All @@ -92,18 +101,28 @@ def initialize(uri,
read_timeout: DEFAULT_READ_TIMEOUT,
reconnect_time: DEFAULT_RECONNECT_TIME,
reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL,
http_method: DEFAULT_HTTP_METHOD,
http_payload: {},
last_event_id: nil,
proxy: nil,
logger: nil,
socket_factory: nil)
socket_factory: nil,
parse: true,
verify_ssl: true)
@uri = URI(uri)
@stopped = Concurrent::AtomicBoolean.new(false)

@headers = headers.clone
@connect_timeout = connect_timeout
@read_timeout = read_timeout
@http_method = http_method
@http_payload = http_payload
@logger = logger || default_logger
@parse = parse
http_client_options = {}
unless verify_ssl
http_client_options[:ssl] = { verify_mode: OpenSSL::SSL::VERIFY_NONE }
end
if socket_factory
http_client_options["socket_class"] = socket_factory
end
Expand All @@ -121,6 +140,8 @@ def initialize(uri,
http_client_options["proxy"] = {
:proxy_address => @proxy.host,
:proxy_port => @proxy.port,
:proxy_username => @proxy.user,
:proxy_password => @proxy.password,
}
end

Expand Down Expand Up @@ -262,9 +283,9 @@ def connect
cxn = nil
begin
@logger.info { "Connecting to event stream at #{@uri}" }
cxn = @http_client.request("GET", @uri, {
headers: build_headers,
})
opts = { headers: build_headers }
opts[:json] = @http_payload unless @http_payload.empty?
cxn = @http_client.request(@http_method, @uri, opts)
if cxn.status.code == 200
content_type = cxn.content_type.mime_type
if content_type && content_type.start_with?("text/event-stream")
Expand Down Expand Up @@ -316,7 +337,11 @@ def read_stream(cxn)
end
end
end
event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id)
if @parse
event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id)
else
event_parser = Impl::BasicEventParser.new(chunks)
end

event_parser.items.each do |item|
return if @stopped.value
Expand Down
22 changes: 22 additions & 0 deletions lib/ld-eventsource/impl/basic_event_parser.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
require "ld-eventsource/events"

module SSE
module Impl
class BasicEventParser

def initialize(chunks)
@chunks = chunks
end

# Generator that parses the input iterator and returns instances of {StreamEvent} or {SetRetryInterval}.
def items
Enumerator.new do |gen|
@chunks.each do |chunk|
item = StreamEvent.new(chunk.nil? ? :final_message : :message, chunk, nil, nil)
gen.yield item
end
end
end
end
end
end