Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 56 additions & 47 deletions owrx/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Drained(WebSocketException):
class WebSocketClosed(WebSocketException):
pass

class InterruptOrTimeout(WebSocketException):
pass

class WebSocketConnection(object):
connections = []
Expand Down Expand Up @@ -175,60 +177,67 @@ def read_loop(self):
def protected_read(num):
data = self.handler.rfile.read(num)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all reading should be moved into the loop; just initialize data to an empty array and start with a select. that way the code should be easier to read.

if data is None:
raise Drained()
if len(data) != num:
data = bytes()
while self.open and len(data) < num:
(read, _, _) = select.select([self.interruptPipeRecv, self.handler.rfile], [], [], 15)
if self.handler.rfile in read:
data += self.handler.rfile.read(num - len(data))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data length should be checked after every read. if the rfile was in the select, but has 0 data available, this typically means the socket has been closed.

else:
if len(data) == 0:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The else branch comes into effect if the interrupt pipe has been triggered, so typically there's no need to check the data length or have different types of exceptions here.

raise InterruptOrTimeout()
else:
raise IncompleteRead()
if len(data) < num:
raise IncompleteRead()
return data

self.open = True
while self.open:
(read, _, _) = select.select([self.interruptPipeRecv, self.handler.rfile], [], [], 15)
if self.handler.rfile in read:
available = True
try:
header = protected_read(2)
self.resetPing()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd suggest moving this to protected_read, too - it should be reset on every bit of data that comes from the client.

while self.open and available:
opcode = header[0] & 0x0F
length = header[1] & 0x7F
mask = (header[1] & 0x80) >> 7
if length == 126:
header = protected_read(2)
length = (header[0] << 8) + header[1]
if mask:
masking_key = protected_read(4)
data = None
if length > 0:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of implementing the check here, just pass length=0 to protected_read and let it return early with an empty byte array. i'd say that's more elegant and removes some distracting conditions here.

data = protected_read(length)
if mask:
data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)])
if opcode == OPCODE_TEXT_MESSAGE:
message = data.decode("utf-8")
try:
header = protected_read(2)
opcode = header[0] & 0x0F
length = header[1] & 0x7F
mask = (header[1] & 0x80) >> 7
if length == 126:
header = protected_read(2)
length = (header[0] << 8) + header[1]
if mask:
masking_key = protected_read(4)
data = protected_read(length)
if mask:
data = bytes([b ^ masking_key[index % 4] for (index, b) in enumerate(data)])
if opcode == OPCODE_TEXT_MESSAGE:
message = data.decode("utf-8")
try:
self.messageHandler.handleTextMessage(self, message)
except Exception:
logger.exception("Exception in websocket handler handleTextMessage()")
elif opcode == OPCODE_BINARY_MESSAGE:
try:
self.messageHandler.handleBinaryMessage(self, data)
except Exception:
logger.exception("Exception in websocket handler handleBinaryMessage()")
elif opcode == OPCODE_PING:
self.sendPong()
elif opcode == OPCODE_PONG:
# since every read resets the ping timer, there's nothing to do here.
pass
elif opcode == OPCODE_CLOSE:
logger.debug("websocket close frame received; closing connection")
self.open = False
else:
logger.warning("unsupported opcode: {0}".format(opcode))
except Drained:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drained exception is no longer used, so the class definition (further up) should be removed as well

available = False
except IncompleteRead:
logger.warning("incomplete read on websocket; closing connection")
self.open = False
except OSError:
logger.exception("OSError while reading data; closing connection")
self.open = False
self.messageHandler.handleTextMessage(self, message)
except Exception:
logger.exception("Exception in websocket handler handleTextMessage()")
elif opcode == OPCODE_BINARY_MESSAGE:
try:
self.messageHandler.handleBinaryMessage(self, data)
except Exception:
logger.exception("Exception in websocket handler handleBinaryMessage()")
elif opcode == OPCODE_PING:
self.sendPong()
elif opcode == OPCODE_PONG:
# since every read resets the ping timer, there's nothing to do here.
pass
elif opcode == OPCODE_CLOSE:
logger.debug("websocket close frame received; closing connection")
self.open = False
else:
logger.warning("unsupported opcode: {0}".format(opcode))
except InterruptOrTimeout:
pass
except IncompleteRead:
logger.warning("incomplete read on websocket; closing connection")
self.open = False
except OSError:
logger.exception("OSError while reading data; closing connection")
self.open = False

self.interruptPipeSend.close()
self.interruptPipeSend = None
Expand Down