Skip to content

Commit 446ce2b

Browse files
committed
Add multi-socket Select plugin
1 parent dc93384 commit 446ce2b

File tree

3 files changed

+129
-82
lines changed

3 files changed

+129
-82
lines changed

spockbot/plugins/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from spockbot.plugins.core import auth, event, net, taskmanager, ticker, timers
1+
from spockbot.plugins.core import auth, event, net, select, \
2+
taskmanager, ticker, timers
23
from spockbot.plugins.helpers import auxiliary, channels, chat, clientinfo, \
34
craft, entities, interact, inventory, movement, \
45
pathfinding, physics, start, world
@@ -7,6 +8,7 @@
78
('auth', auth.AuthPlugin),
89
('event', event.EventPlugin),
910
('net', net.NetPlugin),
11+
('select', select.SelectPlugin),
1012
('taskmanager', taskmanager.TaskManager),
1113
('ticker', ticker.TickerPlugin),
1214
('timers', timers.TimersPlugin),

spockbot/plugins/core/net.py

Lines changed: 56 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
"""
66

77
import logging
8-
import select
98
import socket
109
import time
1110

@@ -36,44 +35,11 @@ def decrypt(self, data):
3635
return self.decryptifier.update(data)
3736

3837

39-
class SelectSocket(socket.socket):
40-
"""
41-
Provides an asynchronous socket with a poll method built on
42-
top of select.select for cross-platform compatiability
43-
"""
44-
def __init__(self, timer):
45-
super(SelectSocket, self).__init__(socket.AF_INET, socket.SOCK_STREAM)
46-
self.sending = False
47-
self.timer = timer
48-
49-
def poll(self):
50-
flags = []
51-
if self.sending:
52-
self.sending = False
53-
slist = [(self,), (self,), (self,)]
54-
else:
55-
slist = [(self,), (), (self,)]
56-
timeout = self.timer.get_timeout()
57-
if timeout >= 0:
58-
slist.append(timeout)
59-
try:
60-
rlist, wlist, xlist = select.select(*slist)
61-
except select.error as e:
62-
logger.error("SELECTSOCKET: Socket Error: %s", str(e))
63-
rlist, wlist, xlist = [], [], []
64-
if rlist:
65-
flags.append('SOCKET_RECV')
66-
if wlist:
67-
flags.append('SOCKET_SEND')
68-
if xlist:
69-
flags.append('SOCKET_ERR')
70-
return flags
71-
72-
7338
class NetCore(object):
74-
def __init__(self, sock, event):
39+
def __init__(self, sock, event, select):
7540
self.sock = sock
7641
self.event = event
42+
self.select = select
7743
self.host = None
7844
self.port = None
7945
self.connected = False
@@ -84,21 +50,24 @@ def __init__(self, sock, event):
8450
self.sbuff = b''
8551
self.rbuff = BoundBuffer()
8652

53+
def reset(self, sock):
54+
self.__init__(sock, self.event, self.select)
55+
8756
def connect(self, host='localhost', port=25565):
8857
self.host = host
8958
self.port = port
9059
try:
91-
logger.debug("NETCORE: Attempting to connect to host: %s port: %s",
60+
logger.debug('NETCORE: Attempting to connect to host: %s port: %s',
9261
host, port)
9362
# Set the connect to be a blocking operation
9463
self.sock.setblocking(True)
95-
self.sock.connect((self.host, self.port))
64+
self.sock.connect((host, port))
9665
self.sock.setblocking(False)
9766
self.connected = True
98-
self.event.emit('net_connect', (self.host, self.port))
99-
logger.debug("NETCORE: Connected to host: %s port: %s", host, port)
67+
self.event.emit('net_connect', (host, port))
68+
logger.debug('NETCORE: Connected to host: %s port: %s', host, port)
10069
except socket.error as error:
101-
logger.error("NETCORE: Error on Connect")
70+
logger.error('NETCORE: Error on Connect')
10271
self.event.emit('SOCKET_ERR', error)
10372

10473
def set_proto_state(self, state):
@@ -115,7 +84,7 @@ def push(self, packet):
11584
self.sbuff += (self.cipher.encrypt(data) if self.encrypted else data)
11685
self.event.emit(packet.ident, packet)
11786
self.event.emit(packet.str_ident, packet)
118-
self.sock.sending = True
87+
self.select.schedule_sending(self.sock)
11988

12089
def push_packet(self, ident, data):
12190
self.push(mcpacket.Packet(ident, data))
@@ -152,21 +121,19 @@ def disable_crypto(self):
152121
self.cipher = None
153122
self.encrypted = False
154123

155-
def reset(self, sock):
156-
self.__init__(sock, self.event)
157-
158124

159125
@pl_announce('Net')
160126
class NetPlugin(PluginBase):
161-
requires = ('Event', 'Timers')
127+
requires = ('Event', 'Select', 'Timers')
162128
defaults = {
163129
'bufsize': 4096,
164130
'sock_quit': True,
165131
}
166132
events = {
167133
'event_tick': 'tick',
168-
'SOCKET_RECV': 'handle_recv',
169-
'SOCKET_SEND': 'handle_send',
134+
'select_recv': 'handle_recv',
135+
'select_send': 'handle_send',
136+
'select_err': 'handle_err',
170137
'SOCKET_ERR': 'handle_err',
171138
'SOCKET_HUP': 'handle_hup',
172139
'PLAY<Disconnect': 'handle_disconnect',
@@ -181,25 +148,25 @@ def __init__(self, ploader, settings):
181148
super(NetPlugin, self).__init__(ploader, settings)
182149
self.bufsize = self.settings['bufsize']
183150
self.sock_quit = self.settings['sock_quit']
184-
self.sock = SelectSocket(self.timers)
185-
self.net = NetCore(self.sock, self.event)
151+
self.sock = None
152+
self.net = NetCore(self.sock, self.event, self.select)
153+
self.reset_sock()
186154
self.sock_dead = False
187155
ploader.provides('Net', self.net)
188156

189157
def tick(self, name, data):
190158
if self.net.connected:
191-
for flag in self.sock.poll():
192-
self.event.emit(flag)
159+
self.net.select.poll()
193160
else:
194161
timeout = self.timers.get_timeout()
195162
if timeout == -1:
196163
time.sleep(1)
197164
else:
198165
time.sleep(timeout)
199166

200-
# SOCKET_RECV - Socket is ready to recieve data
201-
def handle_recv(self, name, data):
202-
if self.net.connected:
167+
def handle_recv(self, name, fileno):
168+
"""Socket is ready to recieve data"""
169+
if self.net.connected and fileno == self.net.sock.fileno():
203170
try:
204171
data = self.sock.recv(self.bufsize)
205172
if not data:
@@ -209,60 +176,68 @@ def handle_recv(self, name, data):
209176
except socket.error as error:
210177
self.event.emit('SOCKET_ERR', error)
211178

212-
# SOCKET_SEND - Socket is ready to send data and Send buffer contains
213-
# data to send
214-
def handle_send(self, name, data):
215-
if self.net.connected:
179+
def handle_send(self, name, fileno):
180+
"""Socket is ready to send data and send buffer has data to send"""
181+
if self.net.connected and fileno == self.net.sock.fileno():
216182
try:
217183
sent = self.sock.send(self.net.sbuff)
218184
self.net.sbuff = self.net.sbuff[sent:]
219185
if self.net.sbuff:
220-
self.sock.sending = True
186+
self.net.select.schedule_sending(self.sock)
221187
except socket.error as error:
222188
self.event.emit('SOCKET_ERR', error)
223189

224-
# SOCKET_ERR - Socket Error has occured
225-
def handle_err(self, name, data):
226-
self.sock.close()
227-
self.sock = SelectSocket(self.timers)
228-
self.net.reset(self.sock)
229-
logger.error("NETPLUGIN: Socket Error: %s", data)
230-
self.event.emit('net_disconnect', data)
231-
if self.sock_quit and not self.event.kill_event:
232-
self.sock_dead = True
233-
self.event.kill()
190+
def handle_select_err(self, name, fileno):
191+
if self.net.connected and fileno == self.net.sock.fileno():
192+
self.event.emit('SOCKET_ERR', 'select error')
193+
194+
def handle_err(self, name, error):
195+
"""Socket Error has occured"""
196+
logger.error('NETPLUGIN: Socket Error: %s', error)
197+
self.reset_sock()
198+
self.event.emit('net_disconnect', error)
199+
self.check_quit()
234200

235-
# SOCKET_HUP - Socket has hung up
236201
def handle_hup(self, name, data):
237-
self.sock.close()
238-
self.sock = SelectSocket(self.timers)
202+
"""Socket has hung up"""
203+
logger.error('NETPLUGIN: Socket has hung up')
204+
self.reset_sock()
205+
self.event.emit('net_disconnect', 'Socket Hung Up')
206+
self.check_quit()
207+
208+
def reset_sock(self):
209+
if self.sock:
210+
self.sock.close()
211+
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
212+
self.net.select.register_socket(self.sock)
239213
self.net.reset(self.sock)
240-
logger.error("NETPLUGIN: Socket has hung up")
241-
self.event.emit('net_disconnect', "Socket Hung Up")
214+
215+
def check_quit(self):
242216
if self.sock_quit and not self.event.kill_event:
243217
self.sock_dead = True
244218
self.event.kill()
245219

246-
# Handshake - Change to whatever the next state is going to be
247220
def handle_handshake(self, name, packet):
221+
"""Change to whatever the next state is going to be"""
248222
self.net.set_proto_state(packet.data['next_state'])
249223

250-
# Login Success - Change to Play state
251224
def handle_login_success(self, name, packet):
225+
"""Change to Play state"""
252226
self.net.set_proto_state(proto.PLAY_STATE)
253227

254-
# Handle Set Compression packets
255228
def handle_comp(self, name, packet):
229+
"""Handle Set Compression packets"""
256230
self.net.set_comp_state(packet.data['threshold'])
257231

258232
def handle_disconnect(self, name, packet):
259-
logger.debug("NETPLUGIN: Disconnected: %s", packet.data['reason'])
233+
logger.debug('NETPLUGIN: Disconnected: %s', packet.data['reason'])
260234
self.event.emit('net_disconnect', packet.data['reason'])
261235

262-
# Kill event - Try to shutdown the socket politely
263236
def handle_kill(self, name, data):
237+
"""Try to shutdown the socket politely"""
264238
if self.net.connected:
265-
logger.debug("NETPLUGIN: Kill event received, closing socket")
239+
logger.debug('NETPLUGIN: Kill event received, closing socket')
266240
if not self.sock_dead:
267241
self.sock.shutdown(socket.SHUT_WR)
268242
self.sock.close()
243+
self.net.select.unregister_socket(self.net.sock)

spockbot/plugins/core/select.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""
2+
Provides an asynchronous multi-socket selector with a poll method
3+
built on top of select.select for cross-platform compatibility.
4+
5+
After polling select, two events are emitted for each socket and kind-of-ready,
6+
``select_<kind>`` and ``select_<kind>_<sock.fileno()>``, where
7+
``<kind>`` is one of ``recv, send, err``.
8+
9+
The event payload is always the fileno of the corresponding socket.
10+
(The event plugin deep-copies the payload, but sockets are not serializable)
11+
12+
Note that the event loop is stopped during selecting. This is good in that
13+
the loop does not consume 100% CPU, but it means you have to register
14+
at least a slow timer if you do stuff on ``event_tick`` and
15+
expect it to be emitted frequently.
16+
"""
17+
18+
import logging
19+
import select
20+
21+
from spockbot.plugins.base import PluginBase, pl_announce
22+
23+
logger = logging.getLogger('spockbot')
24+
25+
26+
@pl_announce('Select')
27+
class SelectPlugin(PluginBase):
28+
requires = ('Event', 'Timers')
29+
30+
def __init__(self, ploader, settings):
31+
super(SelectPlugin, self).__init__(ploader, settings)
32+
self.sockets = set()
33+
self.sending = set()
34+
ploader.provides('Select', self)
35+
36+
def register_socket(self, sock):
37+
"""``poll()``ing will emit events when this socket is ready."""
38+
self.sockets.add(sock)
39+
40+
def unregister_socket(self, sock):
41+
self.sockets.remove(sock)
42+
43+
def schedule_sending(self, sock):
44+
"""Emit one event the next time this socket is ready to send."""
45+
self.sending.add(sock)
46+
47+
def poll(self):
48+
timeout = self.timers.get_timeout()
49+
if timeout < 0:
50+
timeout = 5 # do not hang
51+
52+
select_args = [
53+
tuple(self.sockets),
54+
tuple(self.sending),
55+
tuple(self.sockets),
56+
timeout,
57+
]
58+
self.sending.clear()
59+
60+
try:
61+
ready_lists = select.select(*select_args)
62+
except select.error as e:
63+
logger.error('SELECTSOCKET: Socket Error: "%s" %s', str(e), e.args)
64+
return
65+
66+
for ready_socks, kind in zip(ready_lists, ('recv', 'send', 'err')):
67+
for sock in ready_socks:
68+
self.event.emit('select_%s' % kind, sock.fileno())
69+
self.event.emit('select_%s_%s' % (kind, sock.fileno()),
70+
sock.fileno())

0 commit comments

Comments
 (0)