13
13
# the COPYING file in the top-level directory.
14
14
#
15
15
16
- import asyncore
17
16
import socket
18
17
import threading
19
18
from collections import deque
20
19
import time
21
20
22
21
23
- class ConsoleSocket (asyncore . dispatcher ):
22
+ class ConsoleSocket (socket . socket ):
24
23
"""
25
24
ConsoleSocket represents a socket attached to a char device.
26
25
27
- Drains the socket and places the bytes into an in memory buffer
28
- for later processing.
26
+ Optionally (if drain==True), drains the socket and places the bytes
27
+ into an in memory buffer for later processing.
29
28
30
29
Optionally a file path can be passed in and we will also
31
30
dump the characters to this file for debugging purposes.
32
31
"""
33
- def __init__ (self , address , file = None ):
32
+ def __init__ (self , address , file = None , drain = False ):
34
33
self ._recv_timeout_sec = 300
35
34
self ._sleep_time = 0.5
36
35
self ._buffer = deque ()
37
- self ._asyncore_thread = None
38
- self ._sock = socket .socket (socket .AF_UNIX , socket .SOCK_STREAM )
39
- self ._sock .connect (address )
36
+ socket .socket .__init__ (self , socket .AF_UNIX , socket .SOCK_STREAM )
37
+ self .connect (address )
40
38
self ._logfile = None
41
39
if file :
42
40
self ._logfile = open (file , "w" )
43
- asyncore .dispatcher .__init__ (self , sock = self ._sock )
44
41
self ._open = True
45
- self ._thread_start ()
42
+ if drain :
43
+ self ._drain_thread = self ._thread_start ()
44
+ else :
45
+ self ._drain_thread = None
46
46
47
- def _thread_start (self ):
48
- """Kick off a thread to wait on the asyncore.loop"""
49
- if self ._asyncore_thread is not None :
50
- return
51
- self ._asyncore_thread = threading .Thread (target = asyncore .loop ,
52
- kwargs = {'timeout' :1 })
53
- self ._asyncore_thread .daemon = True
54
- self ._asyncore_thread .start ()
47
+ def _drain_fn (self ):
48
+ """Drains the socket and runs while the socket is open."""
49
+ while self ._open :
50
+ try :
51
+ self ._drain_socket ()
52
+ except socket .timeout :
53
+ # The socket is expected to timeout since we set a
54
+ # short timeout to allow the thread to exit when
55
+ # self._open is set to False.
56
+ time .sleep (self ._sleep_time )
55
57
56
- def handle_close (self ):
57
- """redirect close to base class"""
58
- # Call the base class close, but not self.close() since
59
- # handle_close() occurs in the context of the thread which
60
- # self.close() attempts to join.
61
- asyncore .dispatcher .close (self )
58
+ def _thread_start (self ):
59
+ """Kick off a thread to drain the socket."""
60
+ # Configure socket to not block and timeout.
61
+ # This allows our drain thread to not block
62
+ # on recieve and exit smoothly.
63
+ socket .socket .setblocking (self , False )
64
+ socket .socket .settimeout (self , 1 )
65
+ drain_thread = threading .Thread (target = self ._drain_fn )
66
+ drain_thread .daemon = True
67
+ drain_thread .start ()
68
+ return drain_thread
62
69
63
70
def close (self ):
64
71
"""Close the base object and wait for the thread to terminate"""
65
72
if self ._open :
66
73
self ._open = False
67
- asyncore .dispatcher .close (self )
68
- if self ._asyncore_thread is not None :
69
- thread , self ._asyncore_thread = self ._asyncore_thread , None
74
+ if self ._drain_thread is not None :
75
+ thread , self ._drain_thread = self ._drain_thread , None
70
76
thread .join ()
77
+ socket .socket .close (self )
71
78
if self ._logfile :
72
79
self ._logfile .close ()
73
80
self ._logfile = None
74
81
75
- def handle_read (self ):
82
+ def _drain_socket (self ):
76
83
"""process arriving characters into in memory _buffer"""
77
- data = asyncore . dispatcher .recv (self , 1 )
84
+ data = socket . socket .recv (self , 1 )
78
85
# latin1 is needed since there are some chars
79
86
# we are receiving that cannot be encoded to utf-8
80
87
# such as 0xe2, 0x80, 0xA6.
@@ -85,27 +92,38 @@ def handle_read(self):
85
92
for c in string :
86
93
self ._buffer .extend (c )
87
94
88
- def recv (self , buffer_size = 1 ):
95
+ def recv (self , bufsize = 1 ):
89
96
"""Return chars from in memory buffer.
90
97
Maintains the same API as socket.socket.recv.
91
98
"""
99
+ if self ._drain_thread is None :
100
+ # Not buffering the socket, pass thru to socket.
101
+ return socket .socket .recv (self , bufsize )
92
102
start_time = time .time ()
93
- while len (self ._buffer ) < buffer_size :
103
+ while len (self ._buffer ) < bufsize :
94
104
time .sleep (self ._sleep_time )
95
105
elapsed_sec = time .time () - start_time
96
106
if elapsed_sec > self ._recv_timeout_sec :
97
107
raise socket .timeout
98
- chars = '' .join ([self ._buffer .popleft () for i in range (buffer_size )])
108
+ chars = '' .join ([self ._buffer .popleft () for i in range (bufsize )])
99
109
# We choose to use latin1 to remain consistent with
100
110
# handle_read() and give back the same data as the user would
101
111
# receive if they were reading directly from the
102
112
# socket w/o our intervention.
103
113
return chars .encode ("latin1" )
104
114
105
- def set_blocking (self ):
106
- """Maintain compatibility with socket API"""
107
- pass
115
+ def setblocking (self , value ):
116
+ """When not draining we pass thru to the socket,
117
+ since when draining we control socket blocking.
118
+ """
119
+ if self ._drain_thread is None :
120
+ socket .socket .setblocking (self , value )
108
121
109
122
def settimeout (self , seconds ):
110
- """Set current timeout on recv"""
111
- self ._recv_timeout_sec = seconds
123
+ """When not draining we pass thru to the socket,
124
+ since when draining we control the timeout.
125
+ """
126
+ if seconds is not None :
127
+ self ._recv_timeout_sec = seconds
128
+ if self ._drain_thread is None :
129
+ socket .socket .settimeout (self , seconds )
0 commit comments