Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions server/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,19 @@ func (s *SessionManager) AddConn(conn *mysql.Conn) {
s.wg.Add(1)
}

// Called once a connection is authenticated and in a ready
// state. Responsible for creating the session associated with the
// connection and registering the session, with appropriate
// authentication information, with the process list.
func (s *SessionManager) ConnReady(ctx context.Context, conn *mysql.Conn) error {
sess, err := s.getOrCreateSession(ctx, conn)
if err != nil {
return err
}
s.processlist.ConnectionReady(sess)
return nil
}

// NewSession creates a Session for the given connection and saves it to the session pool.
func (s *SessionManager) NewSession(ctx context.Context, conn *mysql.Conn) error {
s.mu.Lock()
Expand Down Expand Up @@ -177,6 +190,18 @@ func (s *SessionManager) SetDB(ctx context.Context, conn *mysql.Conn, dbName str
}
}

// We do this here and in ConnReady.
//
// Previously, Vitess did not have a ConnectionAuthenticated
// callback on the Handler and the only time we updated the
// authenticated user information in the processlist was on
// ComInitDB. This resulted in "unathenticated user" being
// shown in the process list if a connection chose to run
// queries without issuing ComInitDB.
//
// Calling this here makes certain the current database
// updates StartedAt and allows the newly selected database to
// be correctly reflected in the process list.
s.processlist.ConnectionReady(sess)
return nil
}
Expand Down
13 changes: 13 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,19 @@ func (h *Handler) ConnectionAborted(_ *mysql.Conn, _ string) error {
return nil
}

// Called when a new connection successfully
// authenticated. Responsible for creating the session associated with
// the connection in the session manager and updating processlist with
// the authenticated user and remote address.
func (h *Handler) ConnectionAuthenticated(c *mysql.Conn) error {
err := h.sm.ConnReady(context.Background(), c)
if err != nil {
logrus.Errorf("unable to register new authenticated connection: %s", err.Error())
err = sql.CastSQLError(err)
}
return err
}

func (h *Handler) ComInitDB(c *mysql.Conn, schemaName string) error {
// SetDB itself handles session and processlist operation lifecycle callbacks.
err := h.sm.SetDB(context.Background(), c, schemaName)
Expand Down
63 changes: 63 additions & 0 deletions server/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1997,3 +1997,66 @@ func (h *testHook) Fire(entry *logrus.Entry) error {
}
return nil
}

func TestHandlerNewConnectionProcessListInteractions(t *testing.T) {
e, pro := setupMemDB(require.New(t))
dbFunc := pro.Database

handler := &Handler{
e: e,
sm: NewSessionManager(
sql.NewContext,
testSessionBuilder(pro),
sql.NoopTracer,
dbFunc,
sql.NewMemoryManager(nil),
sqle.NewProcessList(),
"foo",
),
readTimeout: time.Second,
}

// Process List starts empty.
procs := handler.sm.processlist.Processes()
assert.Len(t, procs, 0)

// A new connection is in Connect state and shows "unauthenticated user" as the user.
abortedConn := newConn(1)
handler.NewConnection(abortedConn)
procs = handler.sm.processlist.Processes()
if assert.Len(t, procs, 1) {
assert.Equal(t, "unauthenticated user", procs[0].User)
assert.Equal(t, sql.ProcessCommandConnect, procs[0].Command)
}

// The connection being aborted does not effect the process list.
handler.ConnectionAborted(abortedConn, "")
procs = handler.sm.processlist.Processes()
assert.Len(t, procs, 1)

// After the ConnectionAborted called, the ConnectionClosed callback does
// remove the connection from the processlist.
handler.ConnectionClosed(abortedConn)
procs = handler.sm.processlist.Processes()
assert.Len(t, procs, 0)

// A new connection gets updated with the authenticated user
// and command Sleep when ConnectionAuthenticated is called.
authenticatedConn := newConn(2)
handler.NewConnection(authenticatedConn)
authenticatedConn.User = "authenticated_user"
handler.ConnectionAuthenticated(authenticatedConn)
procs = handler.sm.processlist.Processes()
if assert.Len(t, procs, 1) {
assert.Equal(t, "authenticated_user", procs[0].User)
assert.Equal(t, sql.ProcessCommandSleep, procs[0].Command)
assert.Equal(t, "", procs[0].Database)
}

// After ComInitDB, the selected database is also reflected.
handler.ComInitDB(authenticatedConn, "test")
procs = handler.sm.processlist.Processes()
if assert.Len(t, procs, 1) {
assert.Equal(t, "test", procs[0].Database)
}
}