-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathpusherhub.lua
More file actions
429 lines (415 loc) · 16.4 KB
/
pusherhub.lua
File metadata and controls
429 lines (415 loc) · 16.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
--------------------
-- PusherHub
-- opensource websocket messaging for CoronaSDK and Pusher.com
--
-- Authors: Jack9
--
-- License: WTFPL
--------------------
local socket = require("socket")
local mime = require("mime")
local crypto = require("crypto")
-- Module-wide singleton holding all connection state. Every function in this
-- file is stored on (and closes over) this one table.
local self = {
    -- Connection lifecycle: 0 (connecting), 1 (open), 2 (closing), and 3 (closed).
    readyState = 3,

    -- Transport
    sock = nil,      -- TCP socket object created in new()
    server = nil,    -- host name passed to new()
    port = nil,      -- port passed to new()
    headers = '',    -- handshake request text assembled in new()
    buffer = '',     -- received bytes not yet consumed by enterFrame()

    -- Credentials supplied to new()
    key = nil,
    secret = nil,
    app_id = nil,

    -- Session
    socket_id = nil, -- filled in when "pusher:connection_established" arrives
    channels = {},   -- channel name -> { events = bindings } (see subscribe)

    -- User callbacks, installed by new()
    readyCallback = nil,
    disconnectCallback = nil,
    pushererrorCallback = nil,
}
-- Left-pads the string form of `str` with `char` until it is `len` characters long.
-- @param str  value to pad; converted with tostring()
-- @param len  desired total length; if the text is already that long (or longer)
--             nothing is added
-- @param char padding character, a single space when nil
-- @return the padded string
self.lpad = function(str, len, char)
    local text = tostring(str)
    local fill = char
    if fill == nil then
        fill = ' '
    end
    local padding = string.rep(fill, len - #text)
    return padding..text
end
-- Interprets a byte string as an unsigned big-endian integer.
-- The previous implementation glued together per-byte bit strings that were
-- not padded to 8 bits, so e.g. "\1\0" decoded as 1 instead of 256, and an
-- all-zero input produced nil. The value is now accumulated arithmetically.
-- @param str byte string (most significant byte first)
-- @return the decoded number, or nil for an empty string (as before)
-- Note: values are ordinary Lua numbers, so very large 8-byte inputs are
-- limited by the number representation of the running Lua version.
self.bytesToDec = function(str)
    local count = string.len(str)
    if count == 0 then
        return nil
    end
    local value = 0
    for i = 1, count do
        value = value * 256 + string.byte(str, i)
    end
    return value
end
-- Returns the binary representation of a positive integer as a string of
-- "0"/"1" characters, most significant bit first and without leading zeros.
-- For 0 (or any value <= 0) the result is the empty string, which callers pad
-- with lpad() when they need a fixed width.
-- @param num non-negative integer
-- @return string of binary digits
self.toBits = function(num)
    local digits = {} -- collected least significant bit first
    while num > 0 do
        -- `rest` used to leak into the global environment; keep it local.
        local rest = num % 2
        -- Store characters rather than numbers so the output never depends
        -- on how the running Lua version formats numeric values.
        digits[#digits + 1] = (rest == 1) and "1" or "0"
        num = math.floor(num / 2)
    end
    -- Flip to most-significant-first order.
    return string.reverse(table.concat(digits))
end
-- Extracts the opcode and the 7-bit payload length field from the first two
-- bytes of a websocket frame (RFC 6455 section 5.2).
-- The earlier bit-string version only gave the right opcode when the FIN bit
-- was set (it assumed an 8-character bit string) and left the mask bit inside
-- the length; plain arithmetic on the byte values avoids both problems.
-- @param headerbytes string whose first two bytes are the frame header
-- @return opcode (0-15) and payload length field (0-127; 126 and 127 mean an
--         extended length follows), or nil, nil when fewer than two bytes
--         were supplied
self.parseHeader = function(headerbytes)
    local first, second = string.byte(headerbytes, 1, 2)
    if first == nil or second == nil then
        return nil, nil
    end
    local opcode = first % 16       -- low nibble of byte 1
    local payloadlen = second % 128 -- byte 2 without the mask bit
    --print("opcode:",opcode,"payloadlen:",payloadlen)
    return opcode, payloadlen
end
-- Debug helper: expands a string into an array of its numeric byte values.
-- @param byes string to inspect
-- @return table where entry i is the byte value of character i
self.handleBytes = function(byes)
    local values = {}
    local total = string.len(byes)
    local index = 1
    while index <= total do
        values[#values + 1] = string.byte(byes, index)
        index = index + 1
    end
    return values
end
-- Builds a single, unfragmented websocket frame around `str`.
-- See http://tools.ietf.org/html/rfc6455#section-5 (5.2)
--
-- Fixes relative to the previous bit-string implementation:
--   * text frames now carry opcode 0x1 (the old bits "0010" are 0x2, binary)
--   * pong frames now carry opcode 0xA (the old bits "1001" are 0x9, ping)
--   * the 16-bit length form is used up to 65535 only; 65536 does not fit in
--     16 bits and previously produced a corrupt header
--
-- @param str  payload; nil is treated as an empty payload
-- @param pong when truthy a pong control frame is produced, otherwise a text frame
-- @return the frame header followed by the payload, as one string
--
-- NOTE(review): frames are still sent unmasked (mask bit 0), exactly as
-- before. RFC 6455 section 5.1 requires clients to mask; confirm whether the
-- target server tolerates unmasked frames before relying on this elsewhere.
self.makeFrame = function(str,pong)
    if str == nil then
        str = ''
    end

    -- Byte 1: FIN = 1, RSV1-3 = 0, then the 4-bit opcode.
    local opcode = 1
    if pong then
        opcode = 10
    end
    local parts = { string.char(128 + opcode) }

    -- Byte 2 onward: mask bit 0 plus the payload length in one of three forms.
    local strLen = string.len(str) -- message length in bytes
    local extBytes = 0             -- size of the extended length field
    if strLen <= 125 then
        parts[#parts + 1] = string.char(strLen)
    elseif strLen <= 65535 then
        parts[#parts + 1] = string.char(126) -- 16-bit length follows
        extBytes = 2
    else
        parts[#parts + 1] = string.char(127) -- 64-bit length follows
        extBytes = 8
        -- TODO: Construct continuation frames for when data is too large?! Don't know what limit pusher has.
    end

    -- Extended length, most significant byte first (network byte order).
    if extBytes > 0 then
        local lenBytes = {}
        local remaining = strLen
        for i = extBytes, 1, -1 do
            lenBytes[i] = string.char(remaining % 256)
            remaining = math.floor(remaining / 256)
        end
        parts[#parts + 1] = table.concat(lenBytes)
    end

    -- Leading bytes are the header for the frame, payload follows unmasked.
    return table.concat(parts)..str
end
-- Produces base64( SHA-1( key .. GUID ) ) using the fixed GUID from the
-- websocket specification. new() uses the result as the Sec-WebSocket-Key
-- request header value.
-- @param key string mixed into the digest
-- @return the base64 text (only the first value returned by mime.b64)
self.websocketresponse6 = function(key)
    -- This string is hardcoded per websocket specification
    local WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
    -- Third argument `true`: presumably asks for the raw (non-hex) digest;
    -- confirm against the crypto library documentation.
    local digest = crypto.digest(crypto.sha1, key..WEBSOCKET_GUID, true)
    local encoded = mime.b64(digest)
    return encoded
end
-- Configures the singleton hub and opens the connection to the server.
--
-- @param params table with:
--   server, key, secret, app_id  (required)
--   port                         (optional, defaults to 80)
--   headers                      (optional array of handshake header lines)
--   readyCallback, disconnectCallback, pushererrorCallback (optional functions)
-- @return the hub table (always, also on failure; on failure disconnect() has
--         already been called and readyState is 3)
--
-- Changes relative to the previous version:
--   * callbacks are installed before validation, so the disconnect() call on
--     the validation failure path no longer hits a nil disconnectCallback
--   * header lines are terminated with CRLF as HTTP requires, instead of LF
--   * the handshake text, receive buffer and socket_id are rebuilt/reset on
--     every call instead of accumulating across reconnects
--   * a failed handshake send is reported instead of being ignored
self.new = function (params) -- constructor method
    params = params or {}
    params.port = params.port or 80

    -- Callbacks first: every failure path below ends in disconnect().
    self.readyCallback = params.readyCallback or function() end
    self.disconnectCallback = params.disconnectCallback or function() end
    self.pushererrorCallback = params.pushererrorCallback or function() self.disconnect(); end

    if not params.server or not params.key or not params.secret or not params.app_id then
        print("PusherHub requires server, key, secret, app_id, readyCallback function and defaults to port 80")
        self.disconnect()
        return self
    end

    -- Headers are legitimate, since Chrome uses them in pusher.com's test page
    params.headers = params.headers or {
        'GET /app/'..params.key..'?protocol=6&client=lua&version=2.0&flash=false HTTP/1.1',
        'Host: '..params.server,
        'Sec-WebSocket-Key: '..self.websocketresponse6(params.app_id), -- anything is fine, might as well use the app_id for something
        'Upgrade: websocket',
        'Connection: Upgrade',
        'Sec-WebSocket-Version: 13',
        --'Pragma: no-cache',
        --'Authentication: Basic '..(mime.b64(params.key..":"..params.secret)),
        'Origin: http://somedomain.com',
        --'User-Agent: Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/27.0.1453.116 Safari/537.36', -- headers based on Chrome
        --'Sec-WebSocket-Extensions: x-webkit-deflate-frame',
        --'Cache-Control: no-cache',
    }

    self.server = params.server
    self.port = params.port
    self.key = params.key
    self.secret = params.secret
    self.app_id = params.app_id

    -- Fresh session state: nothing from an earlier connection may leak in.
    self.socket_id = nil
    self.buffer = ''

    -- Every header line ends in CRLF and an empty line closes the request.
    local lines = {}
    for i=1,#params.headers do
        lines[i] = params.headers[i]
    end
    lines[#lines+1] = ''
    lines[#lines+1] = ''
    self.headers = table.concat(lines, "\r\n")

    self.sock = socket.tcp()
    if self.sock == nil then
        print("can't get a valid tcp socket")
        self.disconnect()
        return self
    end
    self.sock:settimeout(1) -- minimum 1 !!!
    self.sock:setoption( 'tcp-nodelay', true ) -- If I want to send 1 byte, I should be able to!
    --self.sock:setoption( 'keepalive', true ) -- No need. We have ping/pong from the spec

    self.readyState = 0
    local _,erro = self.sock:connect(self.server, self.port)
    if erro then
        print("error connecting")
        self.disconnect()
        return self
    end

    -- Send the upgrade request; the reply is consumed later by enterFrame().
    --print(self.headers)
    local bytesSent, sendErr = self.sock:send(self.headers)
    if bytesSent == nil then
        print("error sending handshake", sendErr)
        self.disconnect()
        return self
    end
    --print("sent bytes:",bytesSent)

    self.origParams = params -- kept for reconnect()
    Runtime:addEventListener('enterFrame', self)
    return self
end
-- Subscribes to a channel and registers its event handlers.
--
-- @param params table with:
--   channel       channel name (defaults to 'test_channel'); a "private-"
--                 prefix marks it private, a "presence-" prefix together with
--                 channel_data marks it as a presence channel
--   bindings      table mapping event name -> handler function
--   channel_data  table describing the user (presence channels only)
-- @return true when the subscribe message was handed to publish(), false when
--         an authenticated channel was requested before a socket_id exists
--
-- Uses the global `json` table (this file does not require it itself).
--
-- Changes relative to the previous version:
--   * channel_data is only attached for presence channels; private channels
--     no longer get an encoded nil attached
--   * the presence signature is computed over exactly the channel_data string
--     that is sent, instead of a double-encoded copy with every backslash
--     removed (which differs whenever the JSON contains an escape sequence)
--   * missing params / bindings / socket_id no longer raise errors
self.subscribe = function(params)
    params = params or {}
    params.channel = params.channel or 'test_channel'
    params.private = false
    params.presence = false
    if string.sub(params.channel,1,8) == "private-" then
        params.private = true
    end
    if params.channel_data and string.sub(params.channel,1,9) == "presence-" then
        params.presence = true
    end

    local needs_auth = params.private or params.presence
    if needs_auth and self.socket_id == nil then
        -- The signature includes the socket_id, which only exists once
        -- "pusher:connection_established" has been received.
        print("PusherHub: cannot subscribe to "..params.channel.." before the connection is established")
        return false
    end

    self.channels[params.channel] = { -- this could get ugly if you resubscribe with new events
        events = params.bindings or {}
    }

    local msg = {
        event = "pusher:subscribe",
        data = {
            channel = params.channel,
        },
    }
    if needs_auth then
        --pusher.com docs say HMAC::SHA256.hexdigest(secret, string_to_sign),
        local string_to_sign = self.socket_id..":"..params.channel
        if params.presence then
            -- Encode once and use the same text for signing and sending, so
            -- the receiver can reproduce the signature byte for byte.
            local channel_data = json.encode(params.channel_data)
            string_to_sign = string_to_sign..":"..channel_data
            msg.data.channel_data = channel_data
        end
        msg.data.auth = self.key..":"..crypto.hmac( crypto.sha256, string_to_sign, self.secret )
    end

    --print(msg)
    self.publish(json.encode(msg))
    return true
end
-- Drops the handlers for a channel and tells the server to unsubscribe.
-- Once the entry is removed from self.channels no handler for that channel
-- is looked up any more, so you won't get to see your own parting.
--
-- The previous version indexed self.channels[chan] unconditionally and raised
-- an error for channels that were never subscribed. Its clearing loop used
-- `#` on the bindings table, which is keyed by event name, so it never ran.
--
-- @param chan channel name; nil is ignored
-- Uses the global `json` table (this file does not require it itself).
self.unsubscribe = function(chan)
    if chan == nil then
        return
    end
    self.channels[chan] = nil
    local msg = {
        ["event"] = "pusher:unsubscribe",
        ["data"] = {
            ["channel"] = chan,
        },
    }
    self.publish(json.encode(msg))
end
-- Untested
-- Re-runs new() with the parameters of the last connection.
--
-- Changes relative to the previous version:
--   * does nothing (beyond logging) when there are no stored parameters yet
--   * the old enterFrame listener is removed and the old socket is closed
--     before connecting again
--   * the listener is no longer added a second time here: new() registers it
--     itself on success, and after a failed attempt no listener should remain
--
-- @return the hub table
self.reconnect = function()
    print("PusherHub: attempt to reconnect...")
    if self.origParams == nil then
        -- origParams is only stored by new() once a connection was opened.
        print("PusherHub: nothing to reconnect to, call new() first")
        return self
    end
    Runtime:removeEventListener('enterFrame', self)
    if self.sock ~= nil then
        self.sock:close()
    end
    return self.new(self.origParams)
end
-- Tears the connection down: stops the enterFrame polling, closes the TCP
-- socket, clears the session state and finally notifies disconnectCallback.
--
-- Changes relative to the previous version:
--   * the socket lives in self.sock; the old code tested and closed
--     self.socket, a field that is never assigned, so the socket stayed open
--   * the receive buffer is cleared so a later connection starts clean
--   * a missing disconnectCallback no longer raises an error
--
-- The closed socket object is deliberately left in self.sock so that other
-- functions that index it keep getting an object rather than nil.
self.disconnect = function()
    self.readyState = 2
    print("websocket closing")
    Runtime:removeEventListener('enterFrame',self)
    if self.sock ~= nil then
        self.sock:close()
    end
    self.socket_id = nil
    self.buffer = ''
    self.readyState = 3
    if type(self.disconnectCallback) == "function" then
        self.disconnectCallback()
    end
end
-- Wraps `msg` in a websocket frame and writes it to the socket.
-- The previous version ignored the result of send() and always returned
-- true, and raised an error when no socket had been created yet.
-- @param msg string payload (normally JSON text)
-- @return true when send() accepted the frame; false plus the error value
--         reported by send() (or a description) otherwise
self.publish = function(msg)
    print("publishing",msg)
    if self.sock == nil then
        print("PusherHub: cannot publish, no socket")
        return false, "no socket"
    end
    local num_byes, err = self.sock:send(self.makeFrame(msg))
    if num_byes == nil then
        print("PusherHub: publish failed", err)
        return false, err
    end
    --print("bytes published",num_byes)
    return true
end
-- Decodes the `data` member of a Pusher message. String values are parsed as
-- JSON; anything else is handed back unchanged.
local function decodePusherData(data)
    if type(data) == "string" then
        return json.decode(data)
    end
    return data
end

-- Parses one frame payload as a Pusher JSON message and routes it to the
-- matching callback or channel binding. Payloads that do not decode to a
-- table are ignored.
local function dispatchPusherMessage(payload)
    if payload == '' then
        return
    end
    local msg = json.decode(payload)
    if type(msg) ~= "table" then -- not valid json
        return
    end
    if msg.event == "pusher:connection_established" then
        -- Startup Connection parsing, specific to pusher.com
        local info = decodePusherData(msg.data)
        if type(info) == "table" then
            self.socket_id = info.socket_id
            self.readyState = 1
            self.readyCallback() -- should call subscribe, I hope! If not, whatever.
        end
    elseif msg.event == "pusher:error" then
        -- This is a pusher protocol error. Not fatal. Default behavior is disconnect()
        msg.data = decodePusherData(msg.data)
        self.pushererrorCallback(msg)
    else
        -- This is the catch-all binding code. If you have a handler, it gets called.
        local channel = self.channels[msg.channel]
        local events = channel and channel.events
        local handler = nil
        if type(events) == "table" then
            handler = events[msg.event]
        end
        if type(handler) == "function" then
            msg.data = decodePusherData(msg.data)
            handler(msg)
        end
    end
end

-- Runtime 'enterFrame' listener: polls the socket without blocking on
-- select(), appends whatever was read to self.buffer and then processes the
-- websocket frames found in the buffer.
--
-- Changes relative to the previous version:
--   * the payload slice no longer includes one byte too many
--   * `bits` is no longer written to the global environment
--   * JSON decoding and dispatch only happen for text frames (opcode 1)
--   * a ping is answered with its raw payload instead of the decoded JSON value
--   * a buffer shorter than a frame header, missing bindings and undecodable
--     data no longer raise errors
--   * processing stops once a handler or a close frame has disconnected
--
-- NOTE(review): the read loop is unchanged. receive() is called without a
-- pattern, which in LuaSocket appears to mean line-oriented reading; frame
-- bytes equal to a line feed may therefore be dropped - confirm against the
-- LuaSocket documentation. Frames that are only partly in the buffer are
-- still consumed as-is rather than waited for.
--
-- @param evt the enterFrame event (unused)
self.enterFrame = function(evt)
    local got_something_new = false
    local skt, e, p
    local input = socket.select({ self.sock },nil, 0) -- this is a way not to block runtime while reading socket. zero timeout does the trick
    for _,v in ipairs(input) do
        while true do
            skt, e, p = v:receive()
            -- MOST USEFUL DEBUG is this and print(util.xmp(chrs)) in handleBytes
            --print("reading",skt,e,p) -- the "timeout" in the console is from "e" which is ok
            -- if there is a p (msg) we don't read the e
            -- except, if p is "closed" it has no header frame. Specific to pusher.com Strange.
            if p == "closed" then
                p = ''
                e = tostring(e).."closed"
            end
            if (p ~= nil and p ~= '') then
                got_something_new = true -- probably a good msg
                self.buffer = self.buffer..p
                break
            elseif (e ~= nil and e ~= '') then
                -- generally a non-nil error code is fatal
                -- We are being told, this is closed. So we're done.
                print("read an error")
                self.disconnect()
                break
            elseif (skt) then
                -- complete line (e.g. streamed handshake headers): ignored
            end
        end -- /while-do

        -- now, checking if a message is present in buffer...
        while got_something_new do -- this is for a case of several messages stocked in the buffer
            if string.len(self.buffer) < 2 then
                -- Not even a complete 2-byte header yet; keep it for later.
                break
            end
            -- Standard message for pusher.com (some bytes header, then json)
            local opcode, payloadlen = self.parseHeader(string.sub(self.buffer,1,2)) -- first 2 bytes tell us everything
            local payloadstart = 3 -- no more metadata
            local payloadsize = payloadlen
            if payloadlen == 126 then -- we have 2 more bytes of metadata
                payloadstart = 5
                payloadsize = self.bytesToDec(string.sub(self.buffer,3,4)) or 0
            elseif payloadlen == 127 then -- we have 8 more bytes of metadata (not 2)
                payloadstart = 11
                payloadsize = self.bytesToDec(string.sub(self.buffer,3,10)) or 0
            end

            -- The payload occupies payloadstart .. payloadstart+payloadsize-1;
            -- the next frame begins right after it.
            local nextframe = payloadstart + payloadsize
            local payload = string.sub(self.buffer, payloadstart, nextframe - 1)
            -- Consume the frame before running handlers, so a handler error
            -- cannot leave it in the buffer to be processed again.
            self.buffer = string.sub(self.buffer, nextframe)

            if opcode == 0 then
                -- TODO: continuation support
                print("continuation frames are not yet supported!")
            elseif opcode == 1 then -- typical text frame
                dispatchPusherMessage(payload)
            elseif opcode == 2 then
                -- Pusher does not support binary frames
                print("bad opcode, ignoring")
            elseif opcode == 8 then -- this is a close message via opcode
                print("heard close message")
                self.disconnect()
            elseif opcode == 9 then -- this is a ping
                -- In response we will make a 0xA or [138 0]
                print("sending pong!")
                -- per spec, send back whatever came with the ping
                self.sock:send(self.makeFrame(payload,true))
            elseif opcode == 10 then -- 0xA is their pong to our pong, pongpong!
                -- TODO: Implement a timeout if no pongpong in 30s
                print("got pongpong")
            end

            -- disconnect() leaves readyState at 3: nothing more to do then.
            if self.readyState == 3 or self.buffer == '' then
                got_something_new = false
            end
        end
    end
end -- /enterFrame
-- Module result: the single shared hub table defined at the top of this file.
return self
--[[
-- Example Usage
print("connecting to chat server...")
mychathub = nil -- global
mychathub = pusherhub.new({
app_id = '12345', -- Example
key = '278d425bdf160c739803', -- Example http://pusher.com/docs/auth_signatures
secret = '7ad3773142a6692b25b8', -- Example http://pusher.com/docs/auth_signatures
server = "ws.pusherapp.com",
port = 80,
disconnectCallback = function()
scene:dispatchEvent("chatDisconnect",scene)
end,
pushererrorCallback = function()
scene:dispatchEvent("chatError", scene)
scene:dispatchEvent("chatDisconnect",scene)
end,
readyCallback = function()
print("Connected to chat server.")
print("Attempting to join Gen Chat...")
mychathub.subscribe({
channel = "test_channel",
bindings = {
["client-message"] = function(msg1)
print("test client-message",msg1)
end,
["pusher_internal:subscription_succeeded"] = function(msg2) -- Msg2 is a table
print("test pusher_internal:subscription_succeeded",msg2.event)
print("Joined Gen Chat.")
end
}
})
end
})
]]--