Skip to content

Commit 90ae387

Browse files
authored
Merge pull request #3314 from dolthub/aaron/server-handle-connectionauthenticated-callback
server/handler: Add ConnectionAuthenticated callback which Vitess can call once the connection is authenticated.
2 parents c444651 + 016bf0e commit 90ae387

File tree

3 files changed

+101
-0
lines changed

3 files changed

+101
-0
lines changed

server/context.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ func (s *SessionManager) AddConn(conn *mysql.Conn) {
107107
s.wg.Add(1)
108108
}
109109

110+
// Called once a connection is authenticated and in a ready
111+
// state. Responsible for creating the session associated with the
112+
// connection and registering the session, with appropriate
113+
// authentication information, with the process list.
114+
func (s *SessionManager) ConnReady(ctx context.Context, conn *mysql.Conn) error {
115+
sess, err := s.getOrCreateSession(ctx, conn)
116+
if err != nil {
117+
return err
118+
}
119+
s.processlist.ConnectionReady(sess)
120+
return nil
121+
}
122+
110123
// NewSession creates a Session for the given connection and saves it to the session pool.
111124
func (s *SessionManager) NewSession(ctx context.Context, conn *mysql.Conn) error {
112125
s.mu.Lock()
@@ -177,6 +190,18 @@ func (s *SessionManager) SetDB(ctx context.Context, conn *mysql.Conn, dbName str
177190
}
178191
}
179192

193+
// We do this here and in ConnReady.
194+
//
195+
// Previously, Vitess did not have a ConnectionAuthenticated
196+
// callback on the Handler and the only time we updated the
197+
// authenticated user information in the processlist was on
198+
// ComInitDB. This resulted in "unathenticated user" being
199+
// shown in the process list if a connection chose to run
200+
// queries without issuing ComInitDB.
201+
//
202+
// Calling this here makes certain the current database
203+
// updates StartedAt and allows the newly selected database to
204+
// be correctly reflected in the process list.
180205
s.processlist.ConnectionReady(sess)
181206
return nil
182207
}

server/handler.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,19 @@ func (h *Handler) ConnectionAborted(_ *mysql.Conn, _ string) error {
104104
return nil
105105
}
106106

107+
// Called when a new connection successfully
108+
// authenticated. Responsible for creating the session associated with
109+
// the connection in the session manager and updating processlist with
110+
// the authenticated user and remote address.
111+
func (h *Handler) ConnectionAuthenticated(c *mysql.Conn) error {
112+
err := h.sm.ConnReady(context.Background(), c)
113+
if err != nil {
114+
logrus.Errorf("unable to register new authenticated connection: %s", err.Error())
115+
err = sql.CastSQLError(err)
116+
}
117+
return err
118+
}
119+
107120
func (h *Handler) ComInitDB(c *mysql.Conn, schemaName string) error {
108121
// SetDB itself handles session and processlist operation lifecycle callbacks.
109122
err := h.sm.SetDB(context.Background(), c, schemaName)

server/handler_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1997,3 +1997,66 @@ func (h *testHook) Fire(entry *logrus.Entry) error {
19971997
}
19981998
return nil
19991999
}
2000+
2001+
func TestHandlerNewConnectionProcessListInteractions(t *testing.T) {
2002+
e, pro := setupMemDB(require.New(t))
2003+
dbFunc := pro.Database
2004+
2005+
handler := &Handler{
2006+
e: e,
2007+
sm: NewSessionManager(
2008+
sql.NewContext,
2009+
testSessionBuilder(pro),
2010+
sql.NoopTracer,
2011+
dbFunc,
2012+
sql.NewMemoryManager(nil),
2013+
sqle.NewProcessList(),
2014+
"foo",
2015+
),
2016+
readTimeout: time.Second,
2017+
}
2018+
2019+
// Process List starts empty.
2020+
procs := handler.sm.processlist.Processes()
2021+
assert.Len(t, procs, 0)
2022+
2023+
// A new connection is in Connect state and shows "unauthenticated user" as the user.
2024+
abortedConn := newConn(1)
2025+
handler.NewConnection(abortedConn)
2026+
procs = handler.sm.processlist.Processes()
2027+
if assert.Len(t, procs, 1) {
2028+
assert.Equal(t, "unauthenticated user", procs[0].User)
2029+
assert.Equal(t, sql.ProcessCommandConnect, procs[0].Command)
2030+
}
2031+
2032+
// The connection being aborted does not effect the process list.
2033+
handler.ConnectionAborted(abortedConn, "")
2034+
procs = handler.sm.processlist.Processes()
2035+
assert.Len(t, procs, 1)
2036+
2037+
// After the ConnectionAborted called, the ConnectionClosed callback does
2038+
// remove the connection from the processlist.
2039+
handler.ConnectionClosed(abortedConn)
2040+
procs = handler.sm.processlist.Processes()
2041+
assert.Len(t, procs, 0)
2042+
2043+
// A new connection gets updated with the authenticated user
2044+
// and command Sleep when ConnectionAuthenticated is called.
2045+
authenticatedConn := newConn(2)
2046+
handler.NewConnection(authenticatedConn)
2047+
authenticatedConn.User = "authenticated_user"
2048+
handler.ConnectionAuthenticated(authenticatedConn)
2049+
procs = handler.sm.processlist.Processes()
2050+
if assert.Len(t, procs, 1) {
2051+
assert.Equal(t, "authenticated_user", procs[0].User)
2052+
assert.Equal(t, sql.ProcessCommandSleep, procs[0].Command)
2053+
assert.Equal(t, "", procs[0].Database)
2054+
}
2055+
2056+
// After ComInitDB, the selected database is also reflected.
2057+
handler.ComInitDB(authenticatedConn, "test")
2058+
procs = handler.sm.processlist.Processes()
2059+
if assert.Len(t, procs, 1) {
2060+
assert.Equal(t, "test", procs[0].Database)
2061+
}
2062+
}

0 commit comments

Comments
 (0)