99 "os"
1010
1111 zmq "github.com/alecthomas/gozmq"
12+ "github.com/pkg/errors"
1213)
1314
1415var logger * log.Logger
@@ -37,17 +38,16 @@ type SocketGroup struct {
3738}
3839
3940// PrepareSockets sets up the ZMQ sockets through which the kernel will communicate.
40- func PrepareSockets (connInfo ConnectionInfo ) (sg SocketGroup ) {
41+ func PrepareSockets (connInfo ConnectionInfo ) (SocketGroup , error ) {
4142
42- // TODO handle errors.
43- context , _ := zmq .NewContext ()
44- sg .ShellSocket , _ = context .NewSocket (zmq .ROUTER )
45- sg .ControlSocket , _ = context .NewSocket (zmq .ROUTER )
46- sg .StdinSocket , _ = context .NewSocket (zmq .ROUTER )
47- sg .IOPubSocket , _ = context .NewSocket (zmq .PUB )
43+ // Initialize the Socket Group.
44+ context , sg , err := createSockets ()
45+ if err != nil {
46+ return sg , errors .Wrap (err , "Could not initialize context and Socket Group" )
47+ }
4848
49+ // Bind the sockets.
4950 address := fmt .Sprintf ("%v://%v:%%v" , connInfo .Transport , connInfo .IP )
50-
5151 sg .ShellSocket .Bind (fmt .Sprintf (address , connInfo .ShellPort ))
5252 sg .ControlSocket .Bind (fmt .Sprintf (address , connInfo .ControlPort ))
5353 sg .StdinSocket .Bind (fmt .Sprintf (address , connInfo .StdinPort ))
@@ -57,11 +57,46 @@ func PrepareSockets(connInfo ConnectionInfo) (sg SocketGroup) {
5757 sg .Key = []byte (connInfo .Key )
5858
5959 // Start the heartbeat device
60- HBSocket , _ := context .NewSocket (zmq .REP )
60+ HBSocket , err := context .NewSocket (zmq .REP )
61+ if err != nil {
62+ return sg , errors .Wrap (err , "Could not get the Heartbeat device socket" )
63+ }
6164 HBSocket .Bind (fmt .Sprintf (address , connInfo .HBPort ))
6265 go zmq .Device (zmq .FORWARDER , HBSocket , HBSocket )
6366
64- return
67+ return sg , nil
68+ }
69+
70+ // createSockets initializes the sockets for the socket group based on values from zmq.
71+ func createSockets () (* zmq.Context , SocketGroup , error ) {
72+
73+ context , err := zmq .NewContext ()
74+ if err != nil {
75+ return context , SocketGroup {}, errors .Wrap (err , "Could not create zmq Context" )
76+ }
77+
78+ var sg SocketGroup
79+ sg .ShellSocket , err = context .NewSocket (zmq .ROUTER )
80+ if err != nil {
81+ return context , sg , errors .Wrap (err , "Could not get Shell Socket" )
82+ }
83+
84+ sg .ControlSocket , err = context .NewSocket (zmq .ROUTER )
85+ if err != nil {
86+ return context , sg , errors .Wrap (err , "Could not get Control Socket" )
87+ }
88+
89+ sg .StdinSocket , err = context .NewSocket (zmq .ROUTER )
90+ if err != nil {
91+ return context , sg , errors .Wrap (err , "Could not get Stdin Socket" )
92+ }
93+
94+ sg .IOPubSocket , err = context .NewSocket (zmq .PUB )
95+ if err != nil {
96+ return context , sg , errors .Wrap (err , "Could not get IOPub Socket" )
97+ }
98+
99+ return context , sg , nil
65100}
66101
67102// HandleShellMsg responds to a message on the shell ROUTER socket.
@@ -126,14 +161,16 @@ func RunKernel(connectionFile string, logwriter io.Writer) {
126161 if err != nil {
127162 log .Fatalln (err )
128163 }
129- err = json .Unmarshal (bs , & connInfo )
130- if err != nil {
164+ if err = json .Unmarshal (bs , & connInfo ); err != nil {
131165 log .Fatalln (err )
132166 }
133167 logger .Printf ("%+v\n " , connInfo )
134168
135169 // Set up the ZMQ sockets through which the kernel will communicate
136- sockets := PrepareSockets (connInfo )
170+ sockets , err := PrepareSockets (connInfo )
171+ if err != nil {
172+ log .Fatalln (err )
173+ }
137174
138175 pi := zmq.PollItems {
139176 zmq.PollItem {Socket : sockets .ShellSocket , Events : zmq .POLLIN },
@@ -144,8 +181,7 @@ func RunKernel(connectionFile string, logwriter io.Writer) {
144181 var msgparts [][]byte
145182 // Message receiving loop:
146183 for {
147- _ , err = zmq .Poll (pi , - 1 )
148- if err != nil {
184+ if _ , err = zmq .Poll (pi , - 1 ); err != nil {
149185 log .Fatalln (err )
150186 }
151187 switch {
0 commit comments