Skip to content

Commit 06d7998

Browse files
bozgosuri
authored andcommitted
provider/cluster: ignore stale, un-owned leases. (#382)
fixes #380
1 parent 660aefe commit 06d7998

File tree

2 files changed

+49
-8
lines changed

2 files changed

+49
-8
lines changed

provider/cluster/service.go

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,11 @@ func NewService(ctx context.Context, session session.Session, bus event.Bus, cli
5050
return nil, err
5151
}
5252

53-
deployments, err := client.Deployments()
53+
deployments, err := findDeployments(ctx, log, client, session)
5454
if err != nil {
55-
log.Error("fetching deployments", "err", err)
5655
sub.Close()
5756
return nil, err
5857
}
59-
log.Info("found managed deployments", "count", len(deployments))
60-
for _, deployment := range deployments {
61-
log.Debug("deployment", "lease", deployment.LeaseID(), "mgroup", deployment.ManifestGroup().Name)
62-
}
6358

6459
inventory, err := newInventoryService(config, log, lc.ShuttingDown(), sub, client, deployments)
6560
if err != nil {
@@ -262,3 +257,45 @@ func (s *service) teardownLease(lid types.LeaseID) {
262257
s.log.Error("tearing down lease deployment", "err", err, "lease", lid)
263258
}
264259
}
260+
261+
func findDeployments(ctx context.Context, log log.Logger, client Client, session session.Session) ([]Deployment, error) {
262+
deployments, err := client.Deployments()
263+
if err != nil {
264+
log.Error("fetching deployments", "err", err)
265+
return nil, err
266+
}
267+
268+
leaseList, err := session.Query().Leases(ctx)
269+
if err != nil {
270+
log.Error("fetching deployments", "err", err)
271+
return nil, err
272+
}
273+
274+
leases := make(map[string]*types.Lease, len(leaseList.Items))
275+
for _, lease := range leaseList.Items {
276+
if !lease.Provider.Equal(session.Provider().Address) {
277+
continue
278+
}
279+
if lease.State != types.Lease_ACTIVE {
280+
continue
281+
}
282+
leases[lease.Path()] = lease
283+
}
284+
285+
log.Info("found leases", "num-active", len(leases), "num-skipped", len(leaseList.Items)-len(leases))
286+
287+
dcount := len(deployments)
288+
for idx, deployment := range deployments {
289+
if _, ok := leases[deployment.LeaseID().Path()]; !ok {
290+
continue
291+
}
292+
293+
deployments = append(deployments[:idx], deployments[idx+1:]...)
294+
295+
log.Debug("deployment", "lease", deployment.LeaseID(), "mgroup", deployment.ManifestGroup().Name)
296+
}
297+
298+
log.Info("found deployments", "num-active", len(deployments), "num-skipped", dcount-len(deployments))
299+
300+
return deployments, nil
301+
}

provider/cluster/service_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func withServiceTestSetup(t *testing.T, fn func(event.Bus, types.LeaseID)) {
117117
Return(cluster.NullClient().Inventory()).
118118
Maybe()
119119

120-
c, err := cluster.NewService(ctx, providerSession(t), bus, client)
120+
c, err := cluster.NewService(ctx, providerSession(t, lease), bus, client)
121121
require.NoError(t, err)
122122
testutil.WaitReady(t, c.Ready())
123123

@@ -143,10 +143,14 @@ func withServiceTestSetup(t *testing.T, fn func(event.Bus, types.LeaseID)) {
143143
mock.AssertExpectationsForObjects(t, client)
144144
}
145145

146-
func providerSession(t *testing.T) session.Session {
146+
func providerSession(t *testing.T, leases ...*types.Lease) session.Session {
147147
log := testutil.Logger()
148148
txc := new(txumocks.Client)
149149
qc := new(qmocks.Client)
150+
151+
qc.On("Leases", mock.Anything).
152+
Return(&types.Leases{Items: leases}, nil)
153+
150154
provider := testutil.Provider(testutil.Address(t), 1)
151155
return session.New(log, provider, txc, qc)
152156
}

0 commit comments

Comments
 (0)