Skip to content

Commit 88413fa

Browse files
committed
Schema update and websocket fixes
1 parent 49784f5 commit 88413fa

File tree

2 files changed

+33
-29
lines changed

2 files changed

+33
-29
lines changed

controllers/kernel_controller.go

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"log"
88
"net/http"
99
"strings"
10+
"sync"
1011
"time"
1112

1213
"github.com/Thanus-Kumaar/controller_microservice_v2/pkg/jupyter_client"
@@ -163,7 +164,6 @@ func (c *KernelController) KernelChannelsHandler(w http.ResponseWriter, r *http.
163164
gatewayToken := c.JupyterClient.GetAuthToken()
164165

165166
// Construct the target websocket URL
166-
// Note: Replace http with ws or https with wss
167167
wsURL := "ws" + strings.TrimPrefix(gatewayURL, "http")
168168
targetURL := fmt.Sprintf("%s/api/kernels/%s/channels", wsURL, kernelID)
169169

@@ -184,25 +184,24 @@ func (c *KernelController) KernelChannelsHandler(w http.ResponseWriter, r *http.
184184

185185
c.Logger.Info().Str("kernel_id", kernelID).Msg("websocket proxy established")
186186

187-
// Create a channel to signal when one of the proxy goroutines is done
188-
done := make(chan struct{})
187+
var wg sync.WaitGroup
188+
wg.Add(2)
189189

190190
// Goroutine to proxy messages from Frontend to Kernel Gateway
191191
go func() {
192-
defer close(done) // Signal that this goroutine is finished
192+
defer wg.Done()
193193
for {
194194
messageType, p, err := feConn.ReadMessage()
195195
if err != nil {
196196
if !websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
197-
c.Logger.Error().Err(err).Msg("error reading from frontend")
198-
}
199-
if writeErr := kgConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseAbnormalClosure, err.Error())); writeErr != nil {
200-
c.Logger.Error().Err(writeErr).Msg("failed to write close message to kernel gateway")
197+
c.Logger.Warn().Err(err).Msg("error reading from frontend, closing proxy")
201198
}
199+
// Closing the kernel connection will cause the other goroutine's ReadMessage to error out.
200+
kgConn.Close()
202201
return
203202
}
204203
if err := kgConn.WriteMessage(messageType, p); err != nil {
205-
c.Logger.Error().Err(err).Msg("error writing to kernel gateway")
204+
c.Logger.Warn().Err(err).Msg("error writing to kernel gateway, closing proxy")
206205
return
207206
}
208207
c.Logger.Trace().Str("direction", "FE->KG").Int("size", len(p)).Msg("proxied message")
@@ -211,31 +210,26 @@ func (c *KernelController) KernelChannelsHandler(w http.ResponseWriter, r *http.
211210

212211
// Goroutine to proxy messages from Kernel Gateway to Frontend
213212
go func() {
213+
defer wg.Done()
214214
for {
215-
select {
216-
case <-done: // If the other goroutine finished, we're done too.
217-
return
218-
default:
219-
messageType, p, err := kgConn.ReadMessage()
220-
if err != nil {
221-
if !websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
222-
c.Logger.Error().Err(err).Msg("error reading from kernel gateway")
223-
}
224-
if writeErr := feConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseAbnormalClosure, err.Error())); writeErr != nil {
225-
c.Logger.Error().Err(writeErr).Msg("failed to write close message to frontend")
226-
}
227-
return
228-
}
229-
if err := feConn.WriteMessage(messageType, p); err != nil {
230-
c.Logger.Error().Err(err).Msg("error writing to frontend")
231-
return
215+
messageType, p, err := kgConn.ReadMessage()
216+
if err != nil {
217+
if !websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
218+
c.Logger.Warn().Err(err).Msg("error reading from kernel gateway, closing proxy")
232219
}
233-
c.Logger.Trace().Str("direction", "KG->FE").Int("size", len(p)).Msg("proxied message")
220+
// Closing the frontend connection will cause the other goroutine's ReadMessage to error out.
221+
feConn.Close()
222+
return
223+
}
224+
if err := feConn.WriteMessage(messageType, p); err != nil {
225+
c.Logger.Warn().Err(err).Msg("error writing to frontend, closing proxy")
226+
return
234227
}
228+
c.Logger.Trace().Str("direction", "KG->FE").Int("size", len(p)).Msg("proxied message")
235229
}
236230
}()
237231

238-
// Wait for one of the goroutines to finish
239-
<-done
232+
// Wait for both goroutines to finish
233+
wg.Wait()
240234
c.Logger.Info().Str("kernel_id", kernelID).Msg("websocket proxy closed")
241235
}

db/schema.sql

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
-- Drop tables in reverse order of creation to handle dependencies
2+
DROP TABLE IF EXISTS cell_variations CASCADE;
3+
DROP TABLE IF EXISTS evolution_runs CASCADE;
4+
DROP TABLE IF EXISTS cell_outputs CASCADE;
5+
DROP TABLE IF EXISTS cells CASCADE;
6+
DROP TABLE IF EXISTS sessions CASCADE;
7+
DROP TABLE IF EXISTS notebooks CASCADE;
8+
DROP TABLE IF EXISTS problem_statements CASCADE;
9+
DROP TABLE IF EXISTS users CASCADE;
10+
111
CREATE TABLE IF NOT EXISTS users (
212
id UUID PRIMARY KEY,
313
username TEXT NOT NULL,

0 commit comments

Comments
 (0)