1+ """Connects to Beam's chat and liveloading."""
2+
13from tornado .websocket import websocket_connect
24from tornado .gen import coroutine
35from tornado .ioloop import PeriodicCallback
1214from functools import partial
1315from json import dumps , loads
1416
15- from re import match
17+ import re
18+ import time
19+
20+ from models import User , session
21+ from datetime import datetime
1622
1723
1824class Beam :
@@ -52,7 +58,7 @@ def _init_logger(self, level="INFO", file_logging=True, **kwargs):
5258 except ImportError :
5359 colored_formatter = formatter
5460 self .logger .warning (
55- "Module 'coloredlogs' unavailable; using ugly logging." )
61+ "Module 'coloredlogs' unavailable; using normal logging." )
5662
5763 stream_handler = StreamHandler ()
5864 stream_handler .setLevel (level )
@@ -69,6 +75,22 @@ def _init_logger(self, level="INFO", file_logging=True, **kwargs):
6975
7076 self .logger .info ("Logger initialized with level '{}'." .format (level ))
7177
78+ def _init_users (self ):
79+ viewers = set (
80+ user ["userId" ] for user in
81+ self .get_chat_users (self .channel_data ["id" ]))
82+
83+ stored_users = set (
84+ user [0 ] for user in session .query (User ).with_entities (User .id ))
85+
86+ for user in viewers - stored_users :
87+ user = User (id = user , joins = 1 )
88+ session .add (user )
89+
90+ session .commit ()
91+
92+ self .logger .info ("Successfully added new users to database." )
93+
7294 def _request (self , url , method = "GET" , ** kwargs ):
7395 """Send HTTP request to Beam."""
7496 response = self .http_session .request (
@@ -95,13 +117,16 @@ def get_chat(self, id):
95117 """Get chat server data."""
96118 return self ._request ("/chats/{id}" .format (id = id ))
97119
98- def connect (self , channel_id , bot_id , silent = False ):
120+ def get_chat_users (self , id ):
121+ return self ._request ("/chats/{id}/users" .format (id = id ))
122+
123+ def connect (self , channel_id , bot_id , quiet = False ):
99124 """Connect to a Beam chat through a websocket."""
100125
101126 self .connection_information = {
102127 "channel_id" : channel_id ,
103128 "bot_id" : bot_id ,
104- "silent " : silent
129+ "quiet " : quiet
105130 }
106131
107132 chat = self .get_chat (channel_id )
@@ -117,7 +142,7 @@ def connect(self, channel_id, bot_id, silent=False):
117142 websocket_connection = websocket_connect (
118143 self .servers [self .server_offset ])
119144
120- if silent :
145+ if quiet is True :
121146 websocket_connection .add_done_callback (
122147 partial (self .authenticate , channel_id ))
123148 else :
@@ -127,6 +152,8 @@ def connect(self, channel_id, bot_id, silent=False):
127152 def authenticate (self , * args ):
128153 """Authenticate session to a Beam chat through a websocket."""
129154
155+ backoff = 0
156+
130157 future = args [- 1 ]
131158 if future .exception () is None :
132159 self .websocket = future .result ()
@@ -135,23 +162,48 @@ def authenticate(self, *args):
135162
136163 self .send_message (* args [:- 1 ], method = "auth" )
137164
165+ if self .quiet :
166+ self .http_session = Session ()
167+
138168 self .read_chat (self .handle )
139169 else :
140- raise ConnectionError (future .exception ())
170+ self .logger .error ("There was an issue connecting." )
171+ self .logger .error ("Trying again in {} seconds." .format (backoff ))
172+
173+ time .sleep (min (2 ** backoff , 60 ))
174+ backoff += 1
175+
176+ self .authenticate (* args )
141177
142178 def send_message (self , * args , method = "msg" ):
143179 """Send a message to a Beam chat through a websocket."""
144180
181+ if self .quiet and method != "auth" :
182+ if self .quiet is True :
183+ return
184+
185+ if method == "msg" :
186+ args = (self .quiet , r'\n' .join (args ))
187+ elif method == "whisper" :
188+ args = (
189+ self .quiet ,
190+ "> {args[0]} | {args[1]}" .format (
191+ args = args ,
192+ )
193+ )
194+ method = "whisper"
195+
145196 if method == "msg" :
146197 for message in args :
147- message_packet = {
148- "type" : "method" ,
149- "method" : "msg" ,
150- "arguments" : (message ,),
151- "id" : self .message_id
152- }
153- self .websocket .write_message (dumps (message_packet ))
154- self .message_id += 1
198+ for chunk in re .findall (r'.{1,250}' , message ):
199+ message_packet = {
200+ "type" : "method" ,
201+ "method" : "msg" ,
202+ "arguments" : (chunk ,),
203+ "id" : self .message_id
204+ }
205+ self .websocket .write_message (dumps (message_packet ))
206+ self .message_id += 1
155207
156208 else :
157209 message_packet = {
@@ -164,8 +216,8 @@ def send_message(self, *args, method="msg"):
164216 self .message_id += 1
165217
166218 if method == "whisper" :
167- self .logger .info ("$ [{bot_name } > {user}] {message}" .format (
168- bot_name = self .config ["auth" ]["username" ],
219+ self .logger .info ("$ [{bot } > {user}] {message}" .format (
220+ bot = self .config ["auth" ]["username" ],
169221 user = args [0 ],
170222 message = args [1 ]))
171223
@@ -184,45 +236,39 @@ def read_chat(self, handler=None):
184236 if message is None :
185237 self .logger .warning (
186238 "Connection to chat server lost. Attempting to reconnect." )
239+
187240 self .server_offset += 1
188241 self .server_offset %= len (self .servers )
242+
189243 self .logger .debug ("Connecting to: {server}." .format (
190244 server = self .servers [self .server_offset ]))
191245
192246 websocket_connection = websocket_connect (
193247 self .servers [self .server_offset ])
194248
195- # NOTE: We'll remove these try/excepts in the future
196- # Current just for debugging, we need to see the returned
197- # values from self.get_chat()
198249 try :
199250 authkey = self .get_chat (
200251 self .connection_information ["channel_id" ])["authkey" ]
201- except TypeError as e :
202- self .logger .warning ("Caught crash-worthy error!" )
203- self .logger .warning (repr (e ))
204- self .logger .warning (self .get_chat (
205- self .connection_information ["channel_id" ]))
206-
207- # Skip this loop
208- continue
209-
210- if self .connection_information ["silent" ]:
211- websocket_connection .add_done_callback (
212- partial (
213- self .authenticate ,
214- self .connection_information ["channel_id" ]
215- )
216- )
252+ except TypeError :
253+ self .logger .error ("Couldn't get the auth key from data." )
254+ self .read_chat (self .handle )
217255 else :
218- websocket_connection .add_done_callback (
219- partial (
220- self .authenticate ,
221- self .connection_information ["channel_id" ],
222- self .connection_information ["bot_id" ],
223- authkey
256+ if self .connection_information ["quiet" ]:
257+ return websocket_connection .add_done_callback (
258+ partial (
259+ self .authenticate ,
260+ self .connection_information ["channel_id" ]
261+ )
262+ )
263+ else :
264+ return websocket_connection .add_done_callback (
265+ partial (
266+ self .authenticate ,
267+ self .connection_information ["channel_id" ],
268+ self .connection_information ["bot_id" ],
269+ authkey
270+ )
224271 )
225- )
226272
227273 else :
228274 response = loads (message )
@@ -234,6 +280,12 @@ def read_chat(self, handler=None):
234280
235281 def connect_to_liveloading (self , channel_id , user_id ):
236282 """Connect to Beam liveloading."""
283+
284+ self .liveloading_connection_information = {
285+ "channel_id" : channel_id ,
286+ "user_id" : user_id
287+ }
288+
237289 liveloading_websocket_connection = websocket_connect (
238290 "wss://realtime.beam.pro/socket.io/?EIO=3&transport=websocket" )
239291 liveloading_websocket_connection .add_done_callback (
@@ -253,6 +305,7 @@ def subscribe_to_liveloading(self, channel_id, user_id, future):
253305 "channel:{channel_id}:followed" ,
254306 "channel:{channel_id}:subscribed" ,
255307 "channel:{channel_id}:resubscribed" ,
308+ "channel:{channel_id}:hosted" ,
256309 "user:{user_id}:update"
257310 )
258311 self .subscribe_to_interfaces (
@@ -269,30 +322,27 @@ def subscribe_to_liveloading(self, channel_id, user_id, future):
269322 else :
270323 self .logger .warning (future .exception ())
271324 self .connect_to_liveloading (channel_id , user_id )
272- # raise ConnectionError(future.exception())
273325
274326 def subscribe_to_interfaces (self , * interfaces ):
275327 """Subscribe to a Beam liveloading interface."""
276- for interface in interfaces :
277- packet = [
278- "put" ,
279- {
280- "method" : "put" ,
281- "headers" : {},
282- "data" : {
283- "slug" : [
284- interface
285- ]
286- },
287- "url" : "/api/v1/live"
288- }
289- ]
290- self .liveloading_websocket .write_message ('420' + dumps (packet ))
328+
329+ packet = [
330+ "put" ,
331+ {
332+ "method" : "put" ,
333+ "headers" : {},
334+ "data" : {
335+ "slug" : interfaces
336+ },
337+ "url" : "/api/v1/live"
338+ }
339+ ]
340+ self .liveloading_websocket .write_message ('420' + dumps (packet ))
291341
292342 def parse_liveloading_message (self , message ):
293343 """Parse a message received from the Beam liveloading websocket."""
294344
295- sections = match ("(\d+)(.+)?$" , message ).groups ()
345+ sections = re . match (r "(\d+)(.+)?$" , message ).groups ()
296346
297347 return {
298348 "code" : sections [0 ],
@@ -318,8 +368,11 @@ def watch_liveloading(self, handler=None):
318368 message = yield self .liveloading_websocket .read_message ()
319369
320370 if message is None :
321- self .logger .info ("There was an error connecting." )
322- raise ConnectionError
371+ self .logger .info ("Connection to Liveloading lost." )
372+ self .logger .info ("Attempting to reconnect." )
373+
374+ return self .connect_to_liveloading (
375+ ** self .liveloading_connection_information )
323376
324377 self .logger .info ("Attempting to reconnect." )
325378 self .watch_liveloading ()
@@ -334,12 +387,25 @@ def watch_liveloading(self, handler=None):
334387 if packet ["data" ][1 ].get ("following" ):
335388 self .logger .info ("- {} followed." .format (
336389 packet ["data" ][1 ]["user" ]["username" ]))
337- self .send_message (
338- "Thanks for the follow, @{}!" .format (
339- packet ["data" ][1 ]["user" ]["username" ]))
390+
391+ user = session .query (User ).filter_by (
392+ id = packet ["data" ][1 ]["user" ]["id" ]).first ()
393+ if user and (datetime .now () - user .follow_date ).days :
394+ self .send_message (
395+ "Thanks for the follow, @{}!" .format (
396+ packet ["data" ][1 ]["user" ]["username" ]))
397+ user .follow_date = datetime .now ()
398+ session .add (user )
399+ session .commit ()
340400 elif packet ["data" ][1 ].get ("subscribed" ):
341401 self .logger .info ("- {} subscribed." .format (
342402 packet ["data" ][1 ]["user" ]["username" ]))
343403 self .send_message (
344404 "Thanks for the subscription, @{}! <3" .format (
345405 packet ["data" ][1 ]["user" ]["username" ]))
406+ elif packet ["data" ][1 ].get ("hoster" ):
407+ self .logger .info ("- {} hosted the channel." .format (
408+ packet ["data" ][1 ]["hoster" ]["token" ]))
409+ self .send_message (
410+ "Thanks for the hosting the channel, @{}!" .format (
411+ packet ["data" ][1 ]["hoster" ]["token" ]))
0 commit comments