Skip to content

Commit 2191aa1

Browse files
authored
refact(session connection): remove session connection state table (#4617)
* refact(session connection): remove session connection state table
1 parent eb6e1b5 commit 2191aa1

27 files changed

+542
-728
lines changed

internal/daemon/cluster/handlers/worker_service.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -622,16 +622,13 @@ func (ws *workerServiceServer) AuthorizeConnection(ctx context.Context, req *pbs
622622
return nil, status.Errorf(codes.NotFound, "worker not found with name %q", req.GetWorkerId())
623623
}
624624

625-
connectionInfo, connStates, err := connectionRepo.AuthorizeConnection(ctx, req.GetSessionId(), w.GetPublicId())
625+
connectionInfo, err := connectionRepo.AuthorizeConnection(ctx, req.GetSessionId(), w.GetPublicId())
626626
if err != nil {
627627
return nil, err
628628
}
629629
if connectionInfo == nil {
630630
return nil, status.Error(codes.Internal, "Invalid authorize connection response.")
631631
}
632-
if len(connStates) == 0 {
633-
return nil, status.Error(codes.Internal, "Invalid connection state in authorize response.")
634-
}
635632

636633
sessInfo, authzSummary, err := sessionRepo.LookupSession(ctx, req.GetSessionId())
637634
if err != nil {
@@ -648,7 +645,7 @@ func (ws *workerServiceServer) AuthorizeConnection(ctx context.Context, req *pbs
648645

649646
ret := &pbs.AuthorizeConnectionResponse{
650647
ConnectionId: connectionInfo.GetPublicId(),
651-
Status: connStates[0].Status.ProtoVal(),
648+
Status: session.ConnectionStatusFromString(connectionInfo.Status).ProtoVal(),
652649
ConnectionsLeft: authzSummary.ConnectionLimit,
653650
Route: route,
654651
}
@@ -680,7 +677,7 @@ func (ws *workerServiceServer) ConnectConnection(ctx context.Context, req *pbs.C
680677
return nil, status.Errorf(codes.Internal, "error getting session repo: %v", err)
681678
}
682679

683-
connectionInfo, connStates, err := connRepo.ConnectConnection(ctx, session.ConnectWith{
680+
connectionInfo, err := connRepo.ConnectConnection(ctx, session.ConnectWith{
684681
ConnectionId: req.GetConnectionId(),
685682
ClientTcpAddress: req.GetClientTcpAddress(),
686683
ClientTcpPort: req.GetClientTcpPort(),
@@ -696,7 +693,7 @@ func (ws *workerServiceServer) ConnectConnection(ctx context.Context, req *pbs.C
696693
}
697694

698695
return &pbs.ConnectConnectionResponse{
699-
Status: connStates[0].Status.ProtoVal(),
696+
Status: session.ConnectionStatusFromString(connectionInfo.Status).ProtoVal(),
700697
}, nil
701698
}
702699

@@ -742,12 +739,9 @@ func (ws *workerServiceServer) CloseConnection(ctx context.Context, req *pbs.Clo
742739
if v.Connection == nil {
743740
return nil, status.Errorf(codes.Internal, "No connection found while closing one of the connection IDs: %v", closeIds)
744741
}
745-
if len(v.ConnectionStates) == 0 {
746-
return nil, status.Errorf(codes.Internal, "No connection states found while closing one of the connection IDs: %v", closeIds)
747-
}
748742
closeData = append(closeData, &pbs.CloseConnectionResponseData{
749743
ConnectionId: v.Connection.GetPublicId(),
750-
Status: v.ConnectionStates[0].Status.ProtoVal(),
744+
Status: v.ConnectionState.ProtoVal(),
751745
})
752746
}
753747

internal/daemon/cluster/handlers/worker_service_status_test.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ func TestStatus(t *testing.T) {
9797
tofu := session.TestTofu(t)
9898
canceledSess, _, err = repo.ActivateSession(ctx, canceledSess.PublicId, canceledSess.Version, tofu)
9999
require.NoError(t, err)
100-
canceledConn, _, err := connRepo.AuthorizeConnection(ctx, canceledSess.PublicId, worker1.PublicId)
100+
canceledConn, err := connRepo.AuthorizeConnection(ctx, canceledSess.PublicId, worker1.PublicId)
101101
require.NoError(t, err)
102102

103103
canceledSess, err = repo.CancelSession(ctx, canceledSess.PublicId, canceledSess.Version)
@@ -120,7 +120,7 @@ func TestStatus(t *testing.T) {
120120
s := NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, nil, new(sync.Map), kms, new(atomic.Int64), fce)
121121
require.NotNil(t, s)
122122

123-
connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
123+
connection, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
124124
require.NoError(t, err)
125125

126126
cases := []struct {
@@ -562,7 +562,7 @@ func TestStatusSessionClosed(t *testing.T) {
562562
s := NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, nil, new(sync.Map), kms, new(atomic.Int64), fce)
563563
require.NotNil(t, s)
564564

565-
connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
565+
connection, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
566566
require.NoError(t, err)
567567

568568
cases := []struct {
@@ -757,9 +757,9 @@ func TestStatusDeadConnection(t *testing.T) {
757757
s := NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, nil, new(sync.Map), kms, new(atomic.Int64), fce)
758758
require.NotNil(t, s)
759759

760-
connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
760+
connection, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
761761
require.NoError(t, err)
762-
deadConn, _, err := connRepo.AuthorizeConnection(ctx, sess2.PublicId, worker1.PublicId)
762+
deadConn, err := connRepo.AuthorizeConnection(ctx, sess2.PublicId, worker1.PublicId)
763763
require.NoError(t, err)
764764
require.NotEqual(t, deadConn.PublicId, connection.PublicId)
765765

@@ -823,12 +823,10 @@ func TestStatusDeadConnection(t *testing.T) {
823823
),
824824
)
825825

826-
gotConn, states, err := connRepo.LookupConnection(ctx, deadConn.PublicId)
826+
gotConn, err := connRepo.LookupConnection(ctx, deadConn.PublicId)
827827
require.NoError(t, err)
828828
assert.Equal(t, session.ConnectionSystemError, session.ClosedReason(gotConn.ClosedReason))
829-
assert.Equal(t, 2, len(states))
830-
assert.Nil(t, states[0].EndTime)
831-
assert.Equal(t, session.StatusClosed, states[0].Status)
829+
assert.Equal(t, session.StatusClosed, session.ConnectionStatusFromString(gotConn.Status))
832830
}
833831

834832
func TestStatusWorkerWithKeyId(t *testing.T) {
@@ -927,7 +925,7 @@ func TestStatusWorkerWithKeyId(t *testing.T) {
927925
s := NewWorkerServiceServer(serversRepoFn, workerAuthRepoFn, sessionRepoFn, connRepoFn, nil, new(sync.Map), kms, new(atomic.Int64), fce)
928926
require.NotNil(t, s)
929927

930-
connection, _, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
928+
connection, err := connRepo.AuthorizeConnection(ctx, sess.PublicId, worker1.PublicId)
931929
require.NoError(t, err)
932930

933931
cases := []struct {

internal/db/schema/migrations/oss/postgres/0/50_session.up.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ begin;
219219
create trigger insert_new_session_state after insert on session
220220
for each row execute procedure insert_new_session_state();
221221

222+
-- Updated in 90/01_remove_session_connection_state
222223
-- update_connection_state_on_closed_reason() is used in an update insert trigger on the
223224
-- session_connection table. it will valiadate that all the session's
224225
-- connections are closed, and then insert a state of "closed" in

internal/db/schema/migrations/oss/postgres/0/51_connection.up.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ begin;
148148
create trigger default_create_time_column before insert on session_connection
149149
for each row execute procedure default_create_time();
150150

151+
-- Removed in 90/01_remove_session_connection_state.up.sql
151152
-- insert_new_connection_state() is used in an after insert trigger on the
152153
-- session_connection table. it will insert a state of "authorized" in
153154
-- session_connection_state for the new session connection.

internal/db/schema/migrations/oss/postgres/15/01_wh_rename_key_columns.up.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ begin;
404404
drop trigger wh_insert_session_connection_state on session_connection_state;
405405
drop function wh_insert_session_connection_state;
406406

407+
-- Updated in 90/01_remove_session_connection_state.up.sql
407408
create function wh_insert_session_connection_state() returns trigger
408409
as $$
409410
declare

internal/db/schema/migrations/oss/postgres/27/01_disable_terminate_session.up.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ begin;
77
drop trigger update_connection_state_on_closed_reason on session_connection;
88
drop function update_connection_state_on_closed_reason();
99

10+
-- Removed in 90/01_remove_session_connection_state.up.sql
1011
create function update_connection_state_on_closed_reason() returns trigger
1112
as $$
1213
begin

internal/db/schema/migrations/oss/postgres/27/02_wh_session_facts.up.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ begin;
77
drop trigger wh_insert_session_connection on session_connection;
88
drop function wh_insert_session_connection();
99

10+
-- Updated in 90/01_remove_session_connection_state
1011
create function wh_insert_session_connection() returns trigger
1112
as $$
1213
declare

0 commit comments

Comments
 (0)