Skip to content

Commit 7d68a4c

Browse files
committed
Add POST/PUT request support and optional parsing features
- Add support for POST/PUT HTTP methods with JSON payload - Fix proxy authentication parameter names (proxy_username/proxy_password) - Add optional parsing mode to bypass SSE parsing for raw data streaming These changes enable more flexible usage patterns while maintaining backward compatibility with existing GET-only SSE implementations.
1 parent 45e2033 commit 7d68a4c

File tree

2 files changed

+46
-5
lines changed

2 files changed

+46
-5
lines changed

lib/ld-eventsource/client.rb

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require "ld-eventsource/impl/backoff"
2+
require "ld-eventsource/impl/basic_event_parser"
23
require "ld-eventsource/impl/buffered_line_reader"
34
require "ld-eventsource/impl/event_parser"
45
require "ld-eventsource/events"
@@ -49,6 +50,9 @@ class Client
4950
# The default value for `reconnect_reset_interval` in {#initialize}.
5051
DEFAULT_RECONNECT_RESET_INTERVAL = 60
5152

53+
# The default HTTP method for requests.
54+
DEFAULT_HTTP_METHOD = "GET"
55+
5256
#
5357
# Creates a new SSE client.
5458
#
@@ -84,6 +88,9 @@ class Client
8488
# @param socket_factory [#open] (nil) an optional factory object for creating sockets,
8589
# if you want to use something other than the default `TCPSocket`; it must implement
8690
# `open(uri, timeout)` to return a connected `Socket`
91+
# @param http_method [String] (DEFAULT_HTTP_METHOD) the HTTP method to use for requests
92+
# @param http_payload [Hash] ({}) JSON payload to send with requests (only used with POST/PUT methods)
93+
# @param parse [Boolean] (true) whether to parse SSE events or pass through raw chunks
8794
# @yieldparam [Client] client the new client instance, before opening the connection
8895
#
8996
def initialize(uri,
@@ -92,17 +99,23 @@ def initialize(uri,
9299
read_timeout: DEFAULT_READ_TIMEOUT,
93100
reconnect_time: DEFAULT_RECONNECT_TIME,
94101
reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL,
102+
http_method: DEFAULT_HTTP_METHOD,
103+
http_payload: {},
95104
last_event_id: nil,
96105
proxy: nil,
97106
logger: nil,
98-
socket_factory: nil)
107+
socket_factory: nil,
108+
parse: true)
99109
@uri = URI(uri)
100110
@stopped = Concurrent::AtomicBoolean.new(false)
101111

102112
@headers = headers.clone
103113
@connect_timeout = connect_timeout
104114
@read_timeout = read_timeout
115+
@http_method = http_method
116+
@http_payload = http_payload
105117
@logger = logger || default_logger
118+
@parse = parse
106119
http_client_options = {}
107120
if socket_factory
108121
http_client_options["socket_class"] = socket_factory
@@ -121,6 +134,8 @@ def initialize(uri,
121134
http_client_options["proxy"] = {
122135
:proxy_address => @proxy.host,
123136
:proxy_port => @proxy.port,
137+
:proxy_username => @proxy.user,
138+
:proxy_password => @proxy.password,
124139
}
125140
end
126141

@@ -262,9 +277,9 @@ def connect
262277
cxn = nil
263278
begin
264279
@logger.info { "Connecting to event stream at #{@uri}" }
265-
cxn = @http_client.request("GET", @uri, {
266-
headers: build_headers,
267-
})
280+
opts = { headers: build_headers }
281+
opts[:json] = @http_payload unless @http_payload.empty?
282+
cxn = @http_client.request(@http_method, @uri, opts)
268283
if cxn.status.code == 200
269284
content_type = cxn.content_type.mime_type
270285
if content_type && content_type.start_with?("text/event-stream")
@@ -316,7 +331,11 @@ def read_stream(cxn)
316331
end
317332
end
318333
end
319-
event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id)
334+
if @parse
335+
event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id)
336+
else
337+
event_parser = Impl::BasicEventParser.new(chunks)
338+
end
320339

321340
event_parser.items.each do |item|
322341
return if @stopped.value
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
require "ld-eventsource/events"
2+
3+
module SSE
4+
module Impl
5+
class BasicEventParser
6+
7+
def initialize(chunks)
8+
@chunks = chunks
9+
end
10+
11+
# Generator that parses the input iterator and returns instances of {StreamEvent} or {SetRetryInterval}.
12+
def items
13+
Enumerator.new do |gen|
14+
@chunks.each do |chunk|
15+
item = StreamEvent.new(chunk.nil? ? :final_message : :message, chunk, nil, nil)
16+
gen.yield item
17+
end
18+
end
19+
end
20+
end
21+
end
22+
end

0 commit comments

Comments
 (0)