Skip to content

Commit 80d8975

Browse files
committed
Server-Sent Event with Last-Event-ID
1 parent 66df761 commit 80d8975

File tree

3 files changed

+63
-16
lines changed

3 files changed

+63
-16
lines changed

lib/board-linuxfr.rb

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,24 @@
66
class BoardLinuxfr < Goliath::API
77
plugin RedisPlugin
88

9-
def initialize
10-
Redis.new
11-
end
12-
139
def response(env)
14-
env.logger.debug "New client"
15-
env['subscription'] = status[:channel].subscribe do |msg|
16-
env.stream_send("data: #{msg}\n\n")
17-
end
10+
env.logger.debug "New client: #{env['PATH_INFO']}"
11+
send_msg = ->(msg) {
12+
env.logger.debug " -> #{msg}"
13+
env.stream_send("data: #{msg}\nid: #{msg[:id]}\n\n")
14+
}
15+
event_id = env['HTTP_LAST_EVENT_ID']
16+
chan_name = env['PATH_INFO'].delete('/b/')
17+
env['cache'] = status[:cache][chan_name]
18+
env['chan'] = status[:channels][chan_name]
19+
env['sid'] = env['chan'].subscribe &send_msg
20+
env['timer'] = EM.add_periodic_timer(15) { env.stream_send "::\n\n" }
21+
EM.next_tick { env['cache'].from(event_id, &send_msg) } if event_id
1822
streaming_response(200, {'Content-Type' => 'text/event-stream'})
1923
end
2024

21-
def on_close
22-
return unless env['subscription']
23-
status[:channel].unsubscribe(env['subscription'])
25+
def on_close(env)
26+
env['chan'].unsubscribe env['sid'] if env['sid']
27+
env['timer'].cancel if env['timer']
2428
end
2529
end

lib/board-linuxfr/cache.rb

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
require "goliath/api"
2+
3+
4+
class BoardLinuxfr < Goliath::API
5+
class Cache
6+
CAPACITY = 10
7+
8+
class Entry < Array
9+
def push(item)
10+
super item
11+
shift if size > CAPACITY
12+
end
13+
14+
def from(id, &blk)
15+
found = false
16+
each do |item|
17+
blk.call item if found
18+
found ||= item[:id] == id
19+
end
20+
end
21+
end
22+
23+
def initialize
24+
@keys = Entry.new
25+
@vals = Entry.new
26+
end
27+
28+
def [](key)
29+
n = @keys.index(key)
30+
unless n
31+
@keys.push key
32+
@vals.push Entry.new
33+
n = @keys.length - 1
34+
end
35+
@vals[n]
36+
end
37+
end
38+
end

lib/board-linuxfr/redis_plugin.rb

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,18 @@
22
require "em-synchrony"
33
require "redis"
44
require "redis/connection/synchrony"
5+
require "board-linuxfr/cache"
56

67

78
class BoardLinuxfr < Goliath::API
89
class RedisPlugin
10+
911
def initialize(port, config, status, logger)
1012
logger.info "Initializing the Redis plugin"
11-
@logger = logger
12-
@channel = status[:channel] = EM::Channel.new
13-
@redis = Redis.new
13+
@logger = logger
14+
@chans = status[:channels] = Hash.new { |h,k| h[k] = EM::Channel.new }
15+
@cache = status[:cache] = Cache.new
16+
@redis = Redis.new
1417
end
1518

1619
def run
@@ -22,8 +25,10 @@ def run
2225

2326
on.pmessage do |pattern, chan, msg|
2427
_, chan, id, kind = *chan.split('/')
25-
@channel.push(chan: chan, id: id, kind: kind, msg: msg)
26-
@logger.info "New message: #{chan} #{id} #{kind} #{msg}"
28+
@logger.info "New message: [#{chan}] #{id}. #{kind}> #{msg}"
29+
[@chans, @cache].each do |storage|
30+
storage[chan].push(id: id, kind: kind, msg: msg)
31+
end
2732
end
2833

2934
on.punsubscribe do |pattern, total|

0 commit comments

Comments
 (0)