Skip to content

Commit 6d9f97d

Browse files
authored
ICU-16457 Split Liveness Query In Two (#5628)
* Fix #5602: Ensure controller status update triggers update_time Rework and fix the logic of #5602 query split. Initial implementation failed to update controller status within `server_controller` as `w.Update` did not trigger `update_time` if `address` or `description` were not updated. Now using `w.Exec` with `updateController` query to always set `update_time` to `now()`. Additional Notes: - Updated `repository_controller` tests to verify `update_time` updates even when controller values don't change. - Added `cleanupFunc` to test cases to remove inserted controllers after each test. - Verified via `boundary dev` that `worker.upstreamDialerFunc` errors are not present. Refs: 6c0b3a8 * Resolve Comments
1 parent cd84da3 commit 6d9f97d

File tree

4 files changed

+231
-19
lines changed

4 files changed

+231
-19
lines changed

internal/daemon/controller/tickers.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (c *Controller) startStatusTicking(cancelCtx context.Context) {
3636
return
3737

3838
case <-timer.C:
39-
if err := c.upsertController(cancelCtx); err != nil {
39+
if err := c.updateController(cancelCtx); err != nil {
4040
event.WriteError(cancelCtx, op, err, event.WithInfoMsg("error fetching repository for status update"))
4141
}
4242
timer.Reset(statusInterval)
@@ -57,17 +57,39 @@ func (c *Controller) upsertController(ctx context.Context) error {
5757
controller := server.NewController(c.conf.RawConfig.Controller.Name, opts...)
5858
repo, err := c.ServersRepoFn()
5959
if err != nil {
60-
return errors.Wrap(ctx, err, op, errors.WithMsg("error fetching repository for status update"))
60+
return errors.Wrap(ctx, err, op, errors.WithMsg("error fetching repository for status upsert"))
6161
}
6262

6363
_, err = repo.UpsertController(ctx, controller)
6464
if err != nil {
65-
return errors.Wrap(ctx, err, op, errors.WithMsg("error performing status update"))
65+
return errors.Wrap(ctx, err, op, errors.WithMsg("error performing status upsert"))
6666
}
6767

6868
return nil
6969
}
7070

71+
func (c *Controller) updateController(ctx context.Context) error {
72+
const op = "controller.(Controller).updateController"
73+
var opts []server.Option
74+
if c.conf.RawConfig.Controller.Description != "" {
75+
opts = append(opts, server.WithDescription(c.conf.RawConfig.Controller.Description))
76+
}
77+
if c.conf.RawConfig.Controller.PublicClusterAddr != "" {
78+
opts = append(opts, server.WithAddress(c.conf.RawConfig.Controller.PublicClusterAddr))
79+
}
80+
controller := server.NewController(c.conf.RawConfig.Controller.Name, opts...)
81+
repo, err := c.ServersRepoFn()
82+
if err != nil {
83+
return errors.Wrap(ctx, err, op, errors.WithMsg("error fetching repository for status update"))
84+
}
85+
86+
_, err = repo.UpdateControllerStatus(ctx, controller)
87+
if err != nil {
88+
return errors.Wrap(ctx, err, op, errors.WithMsg("error performing status update"))
89+
}
90+
return nil
91+
}
92+
7193
func (c *Controller) startNonceCleanupTicking(cancelCtx context.Context) {
7294
const op = "controller.(Controller).startNonceCleanupTicking"
7395
timer := time.NewTimer(0)
@@ -76,7 +98,6 @@ func (c *Controller) startNonceCleanupTicking(cancelCtx context.Context) {
7698
case <-cancelCtx.Done():
7799
event.WriteSysEvent(cancelCtx, op, "recovery nonce ticking shutting down")
78100
return
79-
80101
case <-timer.C:
81102
repo, err := c.ServersRepoFn()
82103
if err != nil {
@@ -113,7 +134,6 @@ func (c *Controller) startTerminateCompletedSessionsTicking(cancelCtx context.Co
113134
case <-cancelCtx.Done():
114135
event.WriteSysEvent(cancelCtx, op, "terminating completed sessions ticking shutting down")
115136
return
116-
117137
case <-timer.C:
118138
repo, err := c.SessionRepoFn()
119139
if err != nil {
@@ -148,7 +168,6 @@ func (c *Controller) startCloseExpiredPendingTokens(cancelCtx context.Context) {
148168
case <-cancelCtx.Done():
149169
event.WriteSysEvent(cancelCtx, op, "closing expired pending tokens ticking shutting down")
150170
return
151-
152171
case <-timer.C:
153172
repo, err := c.AuthTokenRepoFn()
154173
if err != nil {
@@ -192,7 +211,6 @@ func (c *Controller) startWorkerConnectionMaintenanceTicking(cancelCtx context.C
192211
case <-cancelCtx.Done():
193212
event.WriteSysEvent(cancelCtx, op, "context done, shutting down")
194213
return
195-
196214
case <-timer.C:
197215
connectionState := m.Connected()
198216
if len(connectionState.WorkerIds()) > 0 {
@@ -208,7 +226,6 @@ func (c *Controller) startWorkerConnectionMaintenanceTicking(cancelCtx context.C
208226
}
209227
connectionState.DisconnectMissingWorkers(knownWorkers)
210228
}
211-
212229
if len(connectionState.UnmappedKeyIds()) > 0 {
213230
repo, err := c.WorkerAuthRepoStorageFn()
214231
if err != nil {
@@ -223,7 +240,6 @@ func (c *Controller) startWorkerConnectionMaintenanceTicking(cancelCtx context.C
223240
connectionState.DisconnectMissingUnmappedKeyIds(authorized)
224241
}
225242
}
226-
227243
timer.Reset(getRandomInterval())
228244
}
229245
}()

internal/server/query.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,4 +223,12 @@ const (
223223
where last_status_time > now() - interval '%d seconds'
224224
and operational_state = 'active';
225225
`
226+
227+
updateController = `
228+
update server_controller
229+
set address = @controller_address,
230+
description = @controller_description,
231+
update_time = now()
232+
where private_id = @controller_private_id;
233+
`
226234
)

internal/server/repository_controller.go

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
// Copyright (c) HashiCorp, Inc.
22
// SPDX-License-Identifier: BUSL-1.1
3-
43
package server
54

65
import (
76
"context"
7+
"database/sql"
88
"fmt"
99

1010
"github.com/hashicorp/boundary/internal/db"
@@ -24,12 +24,10 @@ func (r *Repository) listControllersWithReader(ctx context.Context, reader db.Re
2424
if liveness == 0 {
2525
liveness = DefaultLiveness
2626
}
27-
2827
var where string
2928
if liveness > 0 {
3029
where = fmt.Sprintf("update_time > now() - interval '%d seconds'", uint32(liveness.Seconds()))
3130
}
32-
3331
var controllers []*Controller
3432
if err := reader.SearchWhere(
3533
ctx,
@@ -40,17 +38,14 @@ func (r *Repository) listControllersWithReader(ctx context.Context, reader db.Re
4038
); err != nil {
4139
return nil, errors.Wrap(ctx, err, "workers.listControllersWithReader")
4240
}
43-
4441
return controllers, nil
4542
}
4643

4744
func (r *Repository) UpsertController(ctx context.Context, controller *Controller) (int, error) {
4845
const op = "server.(Repository).UpsertController"
49-
5046
if controller == nil {
5147
return db.NoRowsAffected, errors.New(ctx, errors.InvalidParameter, op, "controller is nil")
5248
}
53-
5449
var rowsUpdated int64
5550
_, err := r.writer.DoTx(
5651
ctx,
@@ -66,7 +61,6 @@ func (r *Repository) UpsertController(ctx context.Context, controller *Controlle
6661
if err != nil {
6762
return errors.Wrap(ctx, err, op+":Upsert")
6863
}
69-
7064
return nil
7165
},
7266
)
@@ -76,3 +70,52 @@ func (r *Repository) UpsertController(ctx context.Context, controller *Controlle
7670

7771
return int(rowsUpdated), nil
7872
}
73+
74+
// UpdateControllerStatus updates the controller's status in the repository.
75+
// This includes updating the address or description of the controller as well
76+
// as updating the update_time attribute, which is required for liveness checks
77+
// as part of a controller's status ticking.
78+
func (r *Repository) UpdateControllerStatus(ctx context.Context, controller *Controller) (int, error) {
79+
const op = "server.(Repository).UpdateControllerStatus"
80+
81+
if controller == nil {
82+
return db.NoRowsAffected, errors.New(ctx, errors.InvalidParameter, op, "controller is nil")
83+
}
84+
if controller.PrivateId == "" {
85+
return db.NoRowsAffected, errors.New(ctx, errors.InvalidParameter, op, "controller private_id is empty")
86+
}
87+
if controller.Address == "" {
88+
return db.NoRowsAffected, errors.New(ctx, errors.InvalidParameter, op, "controller address is empty")
89+
}
90+
91+
var rowsUpdated int
92+
_, err := r.writer.DoTx(
93+
ctx,
94+
db.StdRetryCnt,
95+
db.ExpBackoff{},
96+
func(reader db.Reader, w db.Writer) error {
97+
var err error
98+
rowsUpdated, err = w.Exec(ctx, updateController,
99+
[]any{
100+
sql.Named("controller_address", controller.Address),
101+
sql.Named("controller_description", controller.Description),
102+
sql.Named("controller_private_id", controller.PrivateId),
103+
})
104+
switch {
105+
case err != nil:
106+
return errors.Wrap(ctx, err, op+":Update")
107+
case rowsUpdated > 1:
108+
return errors.New(ctx, errors.MultipleRecords, op, "more than 1 resource would have been updated")
109+
case rowsUpdated == 0:
110+
return errors.New(ctx, errors.RecordNotFound, op, "no resources would have been updated")
111+
default:
112+
return nil
113+
}
114+
},
115+
)
116+
if err != nil {
117+
return db.NoRowsAffected, err
118+
}
119+
120+
return rowsUpdated, nil
121+
}

internal/server/repository_controller_test.go

Lines changed: 148 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
// Copyright (c) HashiCorp, Inc.
22
// SPDX-License-Identifier: BUSL-1.1
3-
43
package server
54

65
import (
@@ -14,6 +13,10 @@ import (
1413
"github.com/stretchr/testify/require"
1514
)
1615

16+
const (
17+
removeControllerSql = `delete from server_controller where private_id = $1`
18+
)
19+
1720
func TestRepository_UpsertController(t *testing.T) {
1821
ctx := context.Background()
1922
conn, _ := db.TestSetup(t, "postgres")
@@ -22,10 +25,8 @@ func TestRepository_UpsertController(t *testing.T) {
2225
testKms := kms.TestKms(t, conn, wrapper)
2326
testRepo, err := NewRepository(ctx, rw, rw, testKms)
2427
require.NoError(t, err)
25-
2628
iamRepo := iam.TestRepo(t, conn, wrapper)
2729
iam.TestScopes(t, iamRepo)
28-
2930
tests := []struct {
3031
name string
3132
controller *Controller
@@ -81,3 +82,147 @@ func TestRepository_UpsertController(t *testing.T) {
8182
})
8283
}
8384
}
85+
86+
func TestRepository_UpdateControllerStatus(t *testing.T) {
87+
ctx := context.Background()
88+
conn, _ := db.TestSetup(t, "postgres")
89+
rw := db.New(conn)
90+
wrapper := db.TestWrapper(t)
91+
testKms := kms.TestKms(t, conn, wrapper)
92+
testRepo, err := NewRepository(ctx, rw, rw, testKms)
93+
require.NoError(t, err)
94+
95+
iamRepo := iam.TestRepo(t, conn, wrapper)
96+
iam.TestScopes(t, iamRepo)
97+
98+
tests := map[string]struct {
99+
originalController *Controller
100+
updatedController *Controller
101+
wantCount int
102+
wantErr bool
103+
cleanUpFunc func(t *testing.T, rw *db.Db, privateId string)
104+
}{
105+
"nil-controller": {
106+
wantErr: true,
107+
},
108+
"empty-id": {
109+
updatedController: NewController("", WithAddress("127.0.0.1")),
110+
wantErr: true,
111+
},
112+
"empty-address": {
113+
updatedController: NewController("test-controller"),
114+
wantErr: true,
115+
},
116+
"controller-not-found": {
117+
updatedController: NewController("test-controller", WithAddress("127.0.0.1"), WithDescription("new ipv4 description")),
118+
wantErr: true,
119+
},
120+
"valid-ipv4-controller": {
121+
originalController: NewController("ipv4-controller", WithAddress("127.0.0.1"), WithDescription("ipv4 description")),
122+
updatedController: NewController("ipv4-controller", WithAddress("127.0.0.2"), WithDescription("new ipv4 description")),
123+
wantCount: 1,
124+
cleanUpFunc: func(t *testing.T, rw *db.Db, privateId string) {
125+
t.Helper()
126+
c, err := rw.Exec(ctx, removeControllerSql, []any{privateId})
127+
require.NoError(t, err)
128+
require.Equal(t, 1, c)
129+
},
130+
},
131+
"valid-ipv6-controller": {
132+
originalController: NewController("test-ipv6-controller", WithAddress("[2001:4860:4860:0:0:0:0:8888]"), WithDescription("ipv6 description")),
133+
updatedController: NewController("test-ipv6-controller", WithAddress("[2001:4860:4860:0:0:0:0:9999]"), WithDescription("new ipv6 description")),
134+
wantCount: 1,
135+
cleanUpFunc: func(t *testing.T, rw *db.Db, privateId string) {
136+
t.Helper()
137+
c, err := rw.Exec(ctx, removeControllerSql, []any{privateId})
138+
require.NoError(t, err)
139+
require.Equal(t, 1, c)
140+
},
141+
},
142+
"valid-abbreviated-ipv6-controller": {
143+
originalController: NewController("test-abbreviated-ipv6-controller", WithAddress("[2001:4860:4860::8888]"), WithDescription("abbreviated ipv6 description")),
144+
updatedController: NewController("test-abbreviated-ipv6-controller", WithAddress("[2001:4860:4860::9999]"), WithDescription("new abbreviated ipv6 description")),
145+
wantCount: 1,
146+
cleanUpFunc: func(t *testing.T, rw *db.Db, privateId string) {
147+
t.Helper()
148+
c, err := rw.Exec(ctx, removeControllerSql, []any{privateId})
149+
require.NoError(t, err)
150+
require.Equal(t, 1, c)
151+
},
152+
},
153+
"valid-controller-short-name": {
154+
originalController: NewController("test", WithAddress("127.0.0.1"), WithDescription("short name description")),
155+
updatedController: NewController("test", WithAddress("127.0.0.2"), WithDescription("new short name description")),
156+
wantCount: 1,
157+
cleanUpFunc: func(t *testing.T, rw *db.Db, privateId string) {
158+
t.Helper()
159+
c, err := rw.Exec(ctx, removeControllerSql, []any{privateId})
160+
require.NoError(t, err)
161+
require.Equal(t, 1, c)
162+
},
163+
},
164+
// Test case for updating a controller with the same attributes validating update_time is updated
165+
"duplicate-ipv4-controller-update": {
166+
originalController: NewController("ipv4-controller", WithAddress("127.0.0.1"), WithDescription("new ipv4 description")),
167+
updatedController: NewController("ipv4-controller", WithAddress("127.0.0.1"), WithDescription("new ipv4 description")),
168+
wantCount: 1,
169+
cleanUpFunc: func(t *testing.T, rw *db.Db, privateId string) {
170+
t.Helper()
171+
c, err := rw.Exec(ctx, removeControllerSql, []any{privateId})
172+
require.NoError(t, err)
173+
require.Equal(t, 1, c)
174+
},
175+
},
176+
"duplicate-ipv6-controller-update": {
177+
originalController: NewController("test-ipv6-controller", WithAddress("[2001:4860:4860:0:0:0:0:8888]"), WithDescription("ipv6 description")),
178+
updatedController: NewController("test-ipv6-controller", WithAddress("[2001:4860:4860:0:0:0:0:8888]"), WithDescription("ipv6 description")),
179+
wantCount: 1,
180+
cleanUpFunc: func(t *testing.T, rw *db.Db, privateId string) {
181+
t.Helper()
182+
c, err := rw.Exec(ctx, removeControllerSql, []any{privateId})
183+
require.NoError(t, err)
184+
require.Equal(t, 1, c)
185+
},
186+
},
187+
}
188+
189+
for name, tt := range tests {
190+
t.Run(name, func(t *testing.T) {
191+
assert, require := assert.New(t), require.New(t)
192+
193+
var originalControllerEntry *Controller
194+
if tt.originalController != nil {
195+
_, err := testRepo.UpsertController(ctx, tt.originalController)
196+
require.NoError(err)
197+
198+
controllerList, err := testRepo.ListControllers(ctx, []Option{}...)
199+
require.NoError(err)
200+
originalControllerEntry = controllerList[0]
201+
}
202+
203+
got, err := testRepo.UpdateControllerStatus(ctx, tt.updatedController)
204+
if tt.wantErr {
205+
require.Error(err)
206+
assert.Equal(0, got)
207+
if tt.cleanUpFunc != nil {
208+
tt.cleanUpFunc(t, rw, tt.updatedController.PrivateId)
209+
}
210+
return
211+
}
212+
require.NoError(err)
213+
assert.Equal(tt.wantCount, got)
214+
215+
controllerList, err := testRepo.ListControllers(ctx, []Option{}...)
216+
require.NoError(err)
217+
require.Len(controllerList, 1)
218+
219+
updatedControllerEntry := controllerList[0]
220+
221+
assert.Equal(tt.updatedController.PrivateId, updatedControllerEntry.PrivateId)
222+
assert.Equal(tt.updatedController.Address, updatedControllerEntry.Address)
223+
assert.Equal(tt.updatedController.Description, updatedControllerEntry.Description)
224+
assert.True(updatedControllerEntry.UpdateTime.AsTime().After(originalControllerEntry.UpdateTime.AsTime()))
225+
tt.cleanUpFunc(t, rw, tt.updatedController.PrivateId)
226+
})
227+
}
228+
}

0 commit comments

Comments
 (0)