1+ """Connects to Beam's chat and liveloading."""
2+
13from tornado .websocket import websocket_connect
24from tornado .gen import coroutine
35from tornado .ioloop import PeriodicCallback
1214from functools import partial
1315from json import dumps , loads
1416
15- from re import match
17+ import re
18+ import time
19+
20+ from models import User , session
21+ from datetime import datetime
1622
1723
1824class Beam :
@@ -52,7 +58,7 @@ def _init_logger(self, level="INFO", file_logging=True, **kwargs):
5258 except ImportError :
5359 colored_formatter = formatter
5460 self .logger .warning (
55- "Module 'coloredlogs' unavailable; using ugly logging." )
61+ "Module 'coloredlogs' unavailable; using normal logging." )
5662
5763 stream_handler = StreamHandler ()
5864 stream_handler .setLevel (level )
@@ -69,6 +75,22 @@ def _init_logger(self, level="INFO", file_logging=True, **kwargs):
6975
7076 self .logger .info ("Logger initialized with level '{}'." .format (level ))
7177
78+ def _init_users (self ):
79+ viewers = set (
80+ user ["userId" ] for user in
81+ self .get_chat_users (self .channel_data ["id" ]))
82+
83+ stored_users = set (
84+ user [0 ] for user in session .query (User ).with_entities (User .id ))
85+
86+ for user in viewers - stored_users :
87+ user = User (id = user , joins = 1 )
88+ session .add (user )
89+
90+ session .commit ()
91+
92+ self .logger .info ("Successfully added new users to database." )
93+
7294 def _request (self , url , method = "GET" , ** kwargs ):
7395 """Send HTTP request to Beam."""
7496 response = self .http_session .request (
@@ -95,13 +117,16 @@ def get_chat(self, id):
95117 """Get chat server data."""
96118 return self ._request ("/chats/{id}" .format (id = id ))
97119
98- def connect (self , channel_id , bot_id , silent = False ):
120+ def get_chat_users (self , id ):
121+ return self ._request ("/chats/{id}/users" .format (id = id ))
122+
123+ def connect (self , channel_id , bot_id , quiet = False ):
99124 """Connect to a Beam chat through a websocket."""
100125
101126 self .connection_information = {
102127 "channel_id" : channel_id ,
103128 "bot_id" : bot_id ,
104- "silent " : silent
129+ "quiet " : quiet
105130 }
106131
107132 chat = self .get_chat (channel_id )
@@ -117,7 +142,7 @@ def connect(self, channel_id, bot_id, silent=False):
117142 websocket_connection = websocket_connect (
118143 self .servers [self .server_offset ])
119144
120- if silent :
145+ if quiet is True :
121146 websocket_connection .add_done_callback (
122147 partial (self .authenticate , channel_id ))
123148 else :
@@ -127,6 +152,8 @@ def connect(self, channel_id, bot_id, silent=False):
127152 def authenticate (self , * args ):
128153 """Authenticate session to a Beam chat through a websocket."""
129154
155+ backoff = 0
156+
130157 future = args [- 1 ]
131158 if future .exception () is None :
132159 self .websocket = future .result ()
@@ -135,23 +162,48 @@ def authenticate(self, *args):
135162
136163 self .send_message (* args [:- 1 ], method = "auth" )
137164
165+ if self .quiet :
166+ self .http_session = Session ()
167+
138168 self .read_chat (self .handle )
139169 else :
140- raise ConnectionError (future .exception ())
170+ self .logger .error ("There was an issue connecting." )
171+ self .logger .error ("Trying again in {} seconds." .format (backoff ))
172+
173+ time .sleep (min (2 ** backoff , 60 ))
174+ backoff += 1
175+
176+ self .authenticate (* args )
141177
142178 def send_message (self , * args , method = "msg" ):
143179 """Send a message to a Beam chat through a websocket."""
144180
181+ if self .quiet and method != "auth" :
182+ if self .quiet is True :
183+ return
184+
185+ if method == "msg" :
186+ args = (self .quiet , r'\n' .join (args ))
187+ elif method == "whisper" :
188+ args = (
189+ self .quiet ,
190+ "> {args[0]} | {args[1]}" .format (
191+ args = args ,
192+ )
193+ )
194+ method = "whisper"
195+
145196 if method == "msg" :
146197 for message in args :
147- message_packet = {
148- "type" : "method" ,
149- "method" : "msg" ,
150- "arguments" : (message ,),
151- "id" : self .message_id
152- }
153- self .websocket .write_message (dumps (message_packet ))
154- self .message_id += 1
198+ for chunk in re .findall (r'.{1,250}' , message ):
199+ message_packet = {
200+ "type" : "method" ,
201+ "method" : "msg" ,
202+ "arguments" : (chunk ,),
203+ "id" : self .message_id
204+ }
205+ self .websocket .write_message (dumps (message_packet ))
206+ self .message_id += 1
155207
156208 else :
157209 message_packet = {
@@ -164,8 +216,8 @@ def send_message(self, *args, method="msg"):
164216 self .message_id += 1
165217
166218 if method == "whisper" :
167- self .logger .info ("$ [{bot_name } > {user}] {message}" .format (
168- bot_name = self .config ["auth" ]["username" ],
219+ self .logger .info ("$ [{bot } > {user}] {message}" .format (
220+ bot = self .config ["auth" ]["username" ],
169221 user = args [0 ],
170222 message = args [1 ]))
171223
@@ -184,44 +236,56 @@ def read_chat(self, handler=None):
184236 if message is None :
185237 self .logger .warning (
186238 "Connection to chat server lost. Attempting to reconnect." )
239+
187240 self .server_offset += 1
188241 self .server_offset %= len (self .servers )
242+
189243 self .logger .debug ("Connecting to: {server}." .format (
190244 server = self .servers [self .server_offset ]))
191245
192246 websocket_connection = websocket_connect (
193247 self .servers [self .server_offset ])
194248
195- authkey = self .get_chat (
196- self .connection_information ["channel_id" ])["authkey" ]
197-
198- if self .connection_information ["silent" ]:
199- websocket_connection .add_done_callback (
200- partial (
201- self .authenticate ,
202- self .connection_information ["channel_id" ]
203- )
204- )
249+ try :
250+ authkey = self .get_chat (
251+ self .connection_information ["channel_id" ])["authkey" ]
252+ except TypeError :
253+ self .logger .error ("Couldn't get the auth key from data." )
254+ self .read_chat (self .handle )
205255 else :
206- websocket_connection .add_done_callback (
207- partial (
208- self .authenticate ,
209- self .connection_information ["channel_id" ],
210- self .connection_information ["bot_id" ],
211- authkey
256+ if self .connection_information ["quiet" ]:
257+ return websocket_connection .add_done_callback (
258+ partial (
259+ self .authenticate ,
260+ self .connection_information ["channel_id" ]
261+ )
262+ )
263+ else :
264+ return websocket_connection .add_done_callback (
265+ partial (
266+ self .authenticate ,
267+ self .connection_information ["channel_id" ],
268+ self .connection_information ["bot_id" ],
269+ authkey
270+ )
212271 )
213- )
214272
215- response = loads (message )
273+ else :
274+ response = loads (message )
216275
217- self .logger .debug ("CHAT: {}" .format (response ))
276+ self .logger .debug ("CHAT: {}" .format (response ))
218277
219- if callable (handler ):
220- handler (response )
278+ if callable (handler ):
279+ handler (response )
221280
222281 def connect_to_liveloading (self , channel_id , user_id ):
223282 """Connect to Beam liveloading."""
224283
284+ self .liveloading_connection_information = {
285+ "channel_id" : channel_id ,
286+ "user_id" : user_id
287+ }
288+
225289 liveloading_websocket_connection = websocket_connect (
226290 "wss://realtime.beam.pro/socket.io/?EIO=3&transport=websocket" )
227291 liveloading_websocket_connection .add_done_callback (
@@ -241,6 +305,7 @@ def subscribe_to_liveloading(self, channel_id, user_id, future):
241305 "channel:{channel_id}:followed" ,
242306 "channel:{channel_id}:subscribed" ,
243307 "channel:{channel_id}:resubscribed" ,
308+ "channel:{channel_id}:hosted" ,
244309 "user:{user_id}:update"
245310 )
246311 self .subscribe_to_interfaces (
@@ -255,31 +320,29 @@ def subscribe_to_liveloading(self, channel_id, user_id, future):
255320
256321 self .watch_liveloading ()
257322 else :
258- raise ConnectionError (future .exception ())
323+ self .logger .warning (future .exception ())
324+ self .connect_to_liveloading (channel_id , user_id )
259325
260326 def subscribe_to_interfaces (self , * interfaces ):
261327 """Subscribe to a Beam liveloading interface."""
262328
263- for interface in interfaces :
264- packet = [
265- "put" ,
266- {
267- "method" : "put" ,
268- "headers" : {},
269- "data" : {
270- "slug" : [
271- interface
272- ]
273- },
274- "url" : "/api/v1/live"
275- }
276- ]
277- self .liveloading_websocket .write_message ('420' + dumps (packet ))
329+ packet = [
330+ "put" ,
331+ {
332+ "method" : "put" ,
333+ "headers" : {},
334+ "data" : {
335+ "slug" : interfaces
336+ },
337+ "url" : "/api/v1/live"
338+ }
339+ ]
340+ self .liveloading_websocket .write_message ('420' + dumps (packet ))
278341
279342 def parse_liveloading_message (self , message ):
280343 """Parse a message received from the Beam liveloading websocket."""
281344
282- sections = match ("(\d+)(.+)?$" , message ).groups ()
345+ sections = re . match (r "(\d+)(.+)?$" , message ).groups ()
283346
284347 return {
285348 "code" : sections [0 ],
@@ -305,7 +368,14 @@ def watch_liveloading(self, handler=None):
305368 message = yield self .liveloading_websocket .read_message ()
306369
307370 if message is None :
308- raise ConnectionError
371+ self .logger .info ("Connection to Liveloading lost." )
372+ self .logger .info ("Attempting to reconnect." )
373+
374+ return self .connect_to_liveloading (
375+ ** self .liveloading_connection_information )
376+
377+ self .logger .info ("Attempting to reconnect." )
378+ self .watch_liveloading ()
309379
310380 packet = self .parse_liveloading_message (message )
311381
@@ -317,12 +387,25 @@ def watch_liveloading(self, handler=None):
317387 if packet ["data" ][1 ].get ("following" ):
318388 self .logger .info ("- {} followed." .format (
319389 packet ["data" ][1 ]["user" ]["username" ]))
320- self .send_message (
321- "Thanks for the follow, @{}!" .format (
322- packet ["data" ][1 ]["user" ]["username" ]))
390+
391+ user = session .query (User ).filter_by (
392+ id = packet ["data" ][1 ]["user" ]["id" ]).first ()
393+ if user and (datetime .now () - user .follow_date ).days :
394+ self .send_message (
395+ "Thanks for the follow, @{}!" .format (
396+ packet ["data" ][1 ]["user" ]["username" ]))
397+ user .follow_date = datetime .now ()
398+ session .add (user )
399+ session .commit ()
323400 elif packet ["data" ][1 ].get ("subscribed" ):
324401 self .logger .info ("- {} subscribed." .format (
325402 packet ["data" ][1 ]["user" ]["username" ]))
326403 self .send_message (
327404 "Thanks for the subscription, @{}! <3" .format (
328405 packet ["data" ][1 ]["user" ]["username" ]))
406+ elif packet ["data" ][1 ].get ("hoster" ):
407+ self .logger .info ("- {} hosted the channel." .format (
408+ packet ["data" ][1 ]["hoster" ]["token" ]))
409+ self .send_message (
410+ "Thanks for the hosting the channel, @{}!" .format (
411+ packet ["data" ][1 ]["hoster" ]["token" ]))
0 commit comments