Skip to content

Commit b6731f3

Browse files
committed
Fixed starting sequence
1 parent a0be399 commit b6731f3

File tree

3 files changed

+80
-48
lines changed

3 files changed

+80
-48
lines changed

ipykernel/debugger.py

Lines changed: 74 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,25 @@
11
import logging
22
import os
33

4+
import zmq
45
from zmq.utils import jsonapi
5-
#from traitlets import Instance
6-
#from traitlets.config.configurable import SingletonConfigurable
76

8-
from asyncio import (Event, Queue)
7+
from tornado.queues import Queue
8+
from tornado.locks import Event
99

1010
class DebugpyMessageQueue:
1111

12-
HEADER = 'Content Length: '
12+
HEADER = 'Content-Length: '
1313
HEADER_LENGTH = 16
1414
SEPARATOR = '\r\n\r\n'
1515
SEPARATOR_LENGTH = 4
1616

17-
def __init__(self, event_callback):
17+
def __init__(self, event_callback, log):
1818
self.tcp_buffer = ''
1919
self._reset_tcp_pos()
2020
self.event_callback = event_callback
21-
self.message_queue = Queue
21+
self.message_queue = Queue()
22+
self.log = log
2223

2324
def _reset_tcp_pos(self):
2425
self.header_pos = -1
@@ -27,42 +28,56 @@ def _reset_tcp_pos(self):
2728
self.message_pos = -1
2829

2930
def _put_message(self, raw_msg):
31+
self.log.debug('QUEUE - _put_message:')
3032
msg = jsonapi.loads(raw_msg)
31-
if mes['type'] == 'event':
33+
if msg['type'] == 'event':
34+
self.log.debug('QUEUE - received event:')
35+
self.log.debug(msg)
3236
self.event_callback(msg)
3337
else:
38+
self.log.debug('QUEUE - put message:')
39+
self.log.debug(msg)
3440
self.message_queue.put_nowait(msg)
3541

3642
def put_tcp_frame(self, frame):
3743
self.tcp_buffer += frame
38-
# TODO: not sure this is required
39-
#self.tcp_buffer += frame.decode("utf-8")
4044

45+
self.log.debug('QUEUE - received frame')
4146
# Finds header
4247
if self.header_pos == -1:
43-
self.header_pos = self.tcp_buffer.find(DebugpyMessageQueue.HEADER, hint)
48+
self.header_pos = self.tcp_buffer.find(DebugpyMessageQueue.HEADER)
4449
if self.header_pos == -1:
4550
return
4651

52+
self.log.debug('QUEUE - found header at pos %i', self.header_pos)
53+
4754
#Finds separator
4855
if self.separator_pos == -1:
49-
hint = self.header_pos + DebugpyMessageQueue.HEADER_lenth
56+
hint = self.header_pos + DebugpyMessageQueue.HEADER_LENGTH
5057
self.separator_pos = self.tcp_buffer.find(DebugpyMessageQueue.SEPARATOR, hint)
5158
if self.separator_pos == -1:
5259
return
5360

61+
self.log.debug('QUEUE - found separator at pos %i', self.separator_pos)
62+
5463
if self.message_pos == -1:
55-
size_pos = self.header_pos + DebugpyMessageQueue.HEADER_lenth
64+
size_pos = self.header_pos + DebugpyMessageQueue.HEADER_LENGTH
5665
self.message_pos = self.separator_pos + DebugpyMessageQueue.SEPARATOR_LENGTH
57-
self.message_size = int(self.tcp_buf[size_pos:self.separator_pos])
66+
self.message_size = int(self.tcp_buffer[size_pos:self.separator_pos])
5867

59-
if len(self.tcp_buffer - self.message_pos) < self.message_size:
68+
self.log.debug('QUEUE - found message at pos %i', self.message_pos)
69+
self.log.debug('QUEUE - message size is %i', self.message_size)
70+
71+
if len(self.tcp_buffer) - self.message_pos < self.message_size:
6072
return
6173

62-
self._put_message(self.tcp_buf[self.message_pos:self.message_size])
63-
if len(self.tcp_buffer - self_message_pos) == self.message_size:
64-
self.reset_buffer()
74+
self._put_message(self.tcp_buffer[self.message_pos:self.message_pos + self.message_size])
75+
if len(self.tcp_buffer) - self.message_pos == self.message_size:
76+
self.log.debug('QUEUE - resetting tcp_buffer')
77+
self.tcp_buffer = ''
78+
self._reset_tcp_pos()
6579
else:
80+
self.log.debug('QUEUE - slicing tcp_buffer')
6681
self.tcp_buffer = self.tcp_buffer[self.message_pos + self.message_size:]
6782
self._reset_tcp_pos()
6883

@@ -72,29 +87,40 @@ async def get_message(self):
7287

7388
class DebugpyClient:
7489

75-
def __init__(self, debugpy_stream, event_callback):
90+
def __init__(self, log, debugpy_stream, event_callback):
91+
self.log = log
7692
self.debugpy_stream = debugpy_stream
93+
self.routing_id = None
7794
self.event_callback = event_callback
78-
self.message_queue = DebugpyMessageQueue(self._forward_event)
95+
self.message_queue = DebugpyMessageQueue(self._forward_event, self.log)
7996
self.wait_for_attach = True
8097
self.init_event = Event()
98+
self.init_event_seq = -1
8199

82100
def _forward_event(self, msg):
83101
if msg['event'] == 'initialized':
84102
self.init_event.set()
103+
self.init_event_seq = msg['seq']
85104
self.event_callback(msg)
86105

87106
def _send_request(self, msg):
107+
if self.routing_id is None:
108+
self.routing_id = self.debugpy_stream.socket.getsockopt(zmq.ROUTING_ID)
88109
content = jsonapi.dumps(msg)
89-
content_length = len(content)
90-
buf = DebugpyMessageQueue.HEADER + content_length + DebugpyMessageQueue.SEPARATOR + content_msg
91-
self.debugpy_stream.send(buf) # TODO: pass routing_id
110+
content_length = str(len(content))
111+
buf = (DebugpyMessageQueue.HEADER + content_length + DebugpyMessageQueue.SEPARATOR).encode('ascii')
112+
buf += content
113+
self.log.debug("DEBUGPYCLIENT:")
114+
self.log.debug(self.routing_id)
115+
self.log.debug(buf)
116+
self.debugpy_stream.send_multipart((self.routing_id, buf))
117+
#self.debugpy_stream.send(buf) # TODO: pass routing_id
92118

93-
async def _wait_for_reponse(self):
119+
async def _wait_for_response(self):
94120
# Since events are never pushed to the message_queue
95121
# we can safely assume the next message in queue
96122
# will be an answer to the previous request
97-
return await self.message_queue.get()
123+
return await self.message_queue.get_message()
98124

99125
async def _handle_init_sequence(self):
100126
# 1] Waits for initialized event
@@ -103,7 +129,7 @@ async def _handle_init_sequence(self):
103129
# 2] Sends configurationDone request
104130
configurationDone = {
105131
'type': 'request',
106-
'seq': int(self.init_event_message['seq']) + 1,
132+
'seq': int(self.init_event_seq) + 1,
107133
'command': 'configurationDone'
108134
}
109135
self._send_request(configurationDone)
@@ -125,7 +151,9 @@ async def send_dap_request(self, msg):
125151
self.wait_for_attach = False
126152
return rep
127153
else:
128-
rep = await self._wait_for_reponse()
154+
rep = await self._wait_for_response()
155+
self.log.debug('DEBUGPYCLIENT - returning:')
156+
self.log.debug(rep)
129157
return rep
130158

131159
class Debugger:
@@ -143,10 +171,9 @@ class Debugger:
143171
'debugInfo', 'inspectVariables'
144172
]
145173

146-
#log = Instance(logging.Logger, allow_none=True)
147-
148-
def __init__(self, debugpy_stream, event_callback, shell_socket, session):
149-
self.debugpy_client = DebugpyClient(debugpy_stream, event_callback)
174+
def __init__(self, log, debugpy_stream, event_callback, shell_socket, session):
175+
self.log = log
176+
self.debugpy_client = DebugpyClient(log, debugpy_stream, event_callback)
150177
self.shell_socket = shell_socket
151178
self.session = session
152179
self.is_started = False
@@ -162,6 +189,9 @@ def __init__(self, debugpy_stream, event_callback, shell_socket, session):
162189
self.breakpoint_list = {}
163190
self.stopped_threads = []
164191

192+
self.debugpy_host = '127.0.0.1'
193+
self.debugpy_port = 0
194+
165195
async def _forward_message(self, msg):
166196
return await self.debugpy_client.send_dap_request(msg)
167197

@@ -170,33 +200,38 @@ def tcp_client(self):
170200
return self.debugpy_client
171201

172202
def start(self):
173-
endpoint = self.debugpy_client.debugpy_stream.socket.getsockopt(zmq.LAST_ENDPOINT)
174-
index = endpoit.rfind(':')
175-
port = endpoint[index+1:]
203+
socket = self.debugpy_client.debugpy_stream.socket
204+
socket.bind_to_random_port('tcp://' + self.debugpy_host)
205+
endpoint = socket.getsockopt(zmq.LAST_ENDPOINT).decode('utf-8')
206+
socket.unbind(endpoint)
207+
index = endpoint.rfind(':')
208+
self.debugpy_port = endpoint[index+1:]
176209
code = 'import debugpy;'
177-
code += 'debugpy.listen(("127.0.0.1",' + port + '))'
210+
code += 'debugpy.listen(("' + self.debugpy_host + '",' + self.debugpy_port + '))'
178211
content = {
179212
'code': code,
180-
'slient': True
213+
'silent': True
181214
}
182215
self.session.send(self.shell_socket, 'execute_request', content,
183216
None, (self.shell_socket.getsockopt(zmq.ROUTING_ID)))
184217

185-
return False
218+
self.session.recv(self.shell_socket, mode=0)
219+
socket.connect(endpoint)
220+
return True
186221

187222
def stop(self):
188223
# TODO
189224
pass
190225

191-
def dumpCell(self, message):
226+
async def dumpCell(self, message):
192227
return {}
193228

194229
async def setBreakpoints(self, message):
195230
source = message['arguments']['source']['path'];
196231
self.breakpoint_list[source] = message['arguments']['breakpoints']
197232
return await self._forward_message(message);
198233

199-
def source(self, message):
234+
async def source(self, message):
200235
reply = {
201236
'type': 'response',
202237
'request_seq': message['seq'],
@@ -268,11 +303,9 @@ async def debugInfo(self, message):
268303
'stoppedThreads': self.stopped_threads
269304
}
270305
}
271-
#self.log.info("returning reply %s", reply)
272-
print("DEBUGGER: ", reply)
273306
return reply
274307

275-
def inspectVariables(self, message):
308+
async def inspectVariables(self, message):
276309
# TODO
277310
return {}
278311

ipykernel/kernelapp.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,9 +298,6 @@ def init_control(self, context):
298298

299299
self.debugpy_socket = context.socket(zmq.STREAM)
300300
self.debugpy_socket.linger = 1000
301-
self.debugpy_port = 0
302-
self.debugpy_port = self._bind_socket(self.debugpy_socket, self.debugpy_port)
303-
self.log.debug("debugpy STREAM Channel on port: %i" % self.debugpy_port)
304301

305302
self.debug_shell_socket = context.socket(zmq.DEALER)
306303
self.debug_shell_socket.linger = 1000

ipykernel/kernelbase.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,15 +179,18 @@ def __init__(self, **kwargs):
179179
for msg_type in self.control_msg_types:
180180
self.control_handlers[msg_type] = getattr(self, msg_type)
181181

182-
self.debugger = Debugger(self.debugpy_stream,
182+
self.debugger = Debugger(self.log,
183+
self.debugpy_stream,
183184
self._publish_debug_event,
184185
self.debug_shell_socket,
185186
self.session)
186187

187188
@gen.coroutine
188189
def dispatch_debugpy(self, msg):
189-
self.log.debug("Debugpy received: %s", msg)
190-
selg.debugger.tcp_client.receive_dap_frame(msg)
190+
# The first frame is the socket id, we can drop it
191+
frame = msg[1].bytes.decode('utf-8')
192+
self.log.debug("Debugpy received: %s", frame)
193+
self.debugger.tcp_client.receive_dap_frame(frame)
191194

192195
@gen.coroutine
193196
def dispatch_control(self, msg):
@@ -548,7 +551,6 @@ def execute_request(self, stream, ident, parent):
548551
)
549552
)
550553

551-
self.log.debug("EXECUTE_REPLY: %s", reply_content)
552554
# Flush output before sending the reply.
553555
sys.stdout.flush()
554556
sys.stderr.flush()

0 commit comments

Comments
 (0)