Skip to content

Commit 1d6b207

Browse files
committed
Clean up comments and remove unused channel synchronization
1 parent 055434b commit 1d6b207

File tree

1 file changed

+29
-24
lines changed

1 file changed

+29
-24
lines changed

kernel.go

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ type kernelInfo struct {
7474
HelpLinks []helpLink `json:"help_links"`
7575
}
7676

77-
// shutdownReply encodes a boolean indication of stutdown/restart.
77+
// shutdownReply encodes a boolean indication of shutdown/restart.
7878
type shutdownReply struct {
7979
Restart bool `json:"restart"`
8080
}
@@ -113,11 +113,12 @@ func runKernel(connectionFile string) {
113113
log.Fatal(err)
114114
}
115115

116-
// channelsWG waits for all channel handlers to shutdown.
117-
var channelsWG sync.WaitGroup
116+
// TODO connect all channel handlers to a WaitGroup to ensure shutdown before returning from runKernel.
118117

119118
// Start up the heartbeat handler.
120-
shutdownHeartbeat := runHeartbeat(sockets.HBSocket, &channelsWG)
119+
startHeartbeat(sockets.HBSocket, &sync.WaitGroup{})
120+
121+
// TODO gracefully shutdown the heartbeat handler on kernel shutdown by closing the chan returned by startHeartbeat.
121122

122123
poller := zmq.NewPoller()
123124
poller.Add(sockets.ShellSocket, zmq.POLLIN)
@@ -129,7 +130,6 @@ func runKernel(connectionFile string) {
129130

130131
// Start a message receiving loop.
131132
for {
132-
133133
polled, err := poller.Poll(-1)
134134
if err != nil {
135135
log.Fatal(err)
@@ -177,12 +177,6 @@ func runKernel(connectionFile string) {
177177
}
178178
}
179179
}
180-
181-
// Request that the heartbeat channel handler be shutdown.
182-
shutdownHeartbeat()
183-
184-
// Wait for the channel handlers to finish shutting down.
185-
channelsWG.Wait()
186180
}
187181

188182
// prepareSockets sets up the ZMQ sockets through which the kernel
@@ -198,26 +192,36 @@ func prepareSockets(connInfo ConnectionInfo) (SocketGroup, error) {
198192
// Initialize the socket group.
199193
var sg SocketGroup
200194

195+
// Create the shell socket, a request-reply socket that may receive messages from multiple frontend for
196+
// code execution, introspection, auto-completion, etc.
201197
sg.ShellSocket, err = context.NewSocket(zmq.ROUTER)
202198
if err != nil {
203199
return sg, err
204200
}
205201

202+
// Create the control socket. This socket is a duplicate of the shell socket where messages on this channel
203+
// should jump ahead of queued messages on the shell socket.
206204
sg.ControlSocket, err = context.NewSocket(zmq.ROUTER)
207205
if err != nil {
208206
return sg, err
209207
}
210208

209+
// Create the stdin socket, a request-reply socket used to request user input from a front-end. This is analogous
210+
// to a standard input stream.
211211
sg.StdinSocket, err = context.NewSocket(zmq.ROUTER)
212212
if err != nil {
213213
return sg, err
214214
}
215215

216+
// Create the iopub socket, a publisher for broadcasting data like stdout/stderr output, displaying execution
217+
// results or errors, kernel status, etc. to connected subscribers.
216218
sg.IOPubSocket, err = context.NewSocket(zmq.PUB)
217219
if err != nil {
218220
return sg, err
219221
}
220222

223+
// Create the heartbeat socket, a request-reply socket that only allows alternating recv-send (request-reply)
224+
// calls. It should echo the byte strings it receives to let the requester know the kernel is still alive.
221225
sg.HBSocket, err = context.NewSocket(zmq.REP)
222226
if err != nil {
223227
return sg, err
@@ -279,6 +283,7 @@ func sendKernelInfo(receipt msgReceipt) error {
279283
// handleExecuteRequest runs code from an execute_request method,
280284
// and sends the various reply messages.
281285
func handleExecuteRequest(ir *classic.Interp, receipt msgReceipt) error {
286+
282287
// Extract the data from the request.
283288
reqcontent := receipt.Msg.Content.(map[string]interface{})
284289
code := reqcontent["code"].(string)
@@ -382,6 +387,7 @@ func handleExecuteRequest(ir *classic.Interp, receipt msgReceipt) error {
382387
// doEval evaluates the code in the interpreter. This function captures an uncaught panic
383388
// as well as the values of the last statement/expression.
384389
func doEval(ir *classic.Interp, code string) (_ []interface{}, err error) {
390+
385391
// Capture a panic from the evaluation if one occurs and store it in the `err` return parameter.
386392
defer func() {
387393
if r := recover(); r != nil {
@@ -394,11 +400,14 @@ func doEval(ir *classic.Interp, code string) (_ []interface{}, err error) {
394400

395401
// Prepare and perform the multiline evaluation.
396402
env := ir.Env
403+
397404
// Don't show the gomacro prompt.
398405
env.Options &^= base.OptShowPrompt
406+
399407
// Don't swallow panics as they are recovered above and handled with a Jupyter `error` message instead.
400408
env.Options &^= base.OptTrapPanic
401-
// Reset the error line so that error messages correspond to the lines from the cell
409+
410+
// Reset the error line so that error messages correspond to the lines from the cell.
402411
env.Line = 0
403412

404413
// Parse the input code (and don't preform gomacro's macroexpansion).
@@ -425,7 +434,7 @@ func doEval(ir *classic.Interp, code string) (_ []interface{}, err error) {
425434
result, results := ir.EvalAst(src)
426435

427436
// If the source ends with an expression, then the result of the execution is the value of the expression. In the
428-
// event that all return values are nil, then the
437+
// event that all return values are nil, the result is also nil.
429438
if srcEndsWithExpr {
430439
// `len(results) == 0` implies a single result stored in `result`.
431440
if len(results) == 0 {
@@ -472,11 +481,11 @@ func handleShutdownRequest(receipt msgReceipt) {
472481
os.Exit(0)
473482
}
474483

475-
// runHeartbeat starts a go-routine for handling heartbeat ping messages sent over the given `hbSocket`. The `wg`'s
476-
// `Done` method is invoked after the thread is completely shutdown. To request a shutdown the returned `func()` can
477-
// be called.
478-
func runHeartbeat(hbSocket *zmq.Socket, wg *sync.WaitGroup) func() {
479-
quit := make(chan bool)
484+
// startHeartbeat starts a go-routine for handling heartbeat ping messages sent over the given `hbSocket`. The `wg`'s
485+
// `Done` method is invoked after the thread is completely shutdown. To request a shutdown the returned `shutdown` channel
486+
// can be closed.
487+
func startHeartbeat(hbSocket *zmq.Socket, wg *sync.WaitGroup) (shutdown chan struct{}) {
488+
quit := make(chan struct{})
480489

481490
// Start the handler that will echo any received messages back to the sender.
482491
wg.Add(1)
@@ -507,17 +516,13 @@ func runHeartbeat(hbSocket *zmq.Socket, wg *sync.WaitGroup) func() {
507516
}
508517

509518
// Send the received byte string back to let the front-end know that the kernel is alive.
510-
_, err = hbSocket.SendBytes(pingMsg, 0)
511-
if err != nil {
519+
if _, err = hbSocket.SendBytes(pingMsg, 0); err != nil {
512520
log.Printf("Error sending heartbeat pong bytes: %b\n", err)
513521
}
514522
}
515523
}
516524
}
517525
}()
518526

519-
// Wrap the quit channel in a function that writes `true` to the channel to shutdown the handler.
520-
return func() {
521-
quit <- true
522-
}
527+
return quit
523528
}

0 commit comments

Comments
 (0)