Skip to content

Commit 0734c08

Browse files
authored
Rework operation poller (#245)
* Replace ListOperations with GetOperation because it has higher limits. Rework the poller code to keep the OperationPolling simple with Submit - submitting operation provider, Poll - polling operations till complete or timeout, and Await - waiting for the all the submitted operation's result. Moving the rate-limiter to the separate file for the clarity and separation of concerns. Adding the limiter for Register/DeregisterInstance api calls. * Add found check in the rate limiters map.
1 parent 7fc195b commit 0734c08

13 files changed

+449
-626
lines changed

Makefile

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,6 @@ ifneq ($(shell test -d $(MOCKS_DESTINATION); echo $$?), 0)
158158
$(MOCKGEN) --source pkg/cloudmap/client.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/client_mock.go --package cloudmap_mock
159159
$(MOCKGEN) --source pkg/cloudmap/cache.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/cache_mock.go --package cloudmap_mock
160160
$(MOCKGEN) --source pkg/cloudmap/operation_poller.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/operation_poller_mock.go --package cloudmap_mock
161-
$(MOCKGEN) --source pkg/cloudmap/operation_collector.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/operation_collector_mock.go --package cloudmap_mock
162161
$(MOCKGEN) --source pkg/cloudmap/api.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/api_mock.go --package cloudmap_mock
163162
$(MOCKGEN) --source pkg/cloudmap/aws_facade.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/aws_facade_mock.go --package cloudmap_mock
164163
$(MOCKGEN) --source integration/janitor/api.go --destination $(MOCKS_DESTINATION)/integration/janitor/api_mock.go --package janitor_mock

integration/janitor/janitor.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (j *cloudMapJanitor) Cleanup(ctx context.Context, nsName string) {
7272
opId, err := j.sdApi.DeleteNamespace(ctx, ns.Id)
7373
if err == nil {
7474
fmt.Println("namespace delete in progress")
75-
_, err = j.sdApi.PollNamespaceOperation(ctx, opId)
75+
_, err = cloudmap.NewOperationPoller(j.sdApi).Poll(ctx, opId)
7676
}
7777
j.checkOrFail(err, "clean up successful", "could not cleanup namespace")
7878
}
@@ -87,17 +87,17 @@ func (j *cloudMapJanitor) deregisterInstances(ctx context.Context, nsName string
8787
fmt.Sprintf("service has %d instances to clean", len(insts)),
8888
"could not list instances to cleanup")
8989

90-
opColl := cloudmap.NewOperationCollector()
90+
opPoller := cloudmap.NewOperationPoller(j.sdApi)
9191
for _, inst := range insts {
9292
instId := aws.ToString(inst.InstanceId)
9393
fmt.Printf("found instance to clean: %s\n", instId)
94-
opColl.Add(func() (opId string, err error) {
94+
opPoller.Submit(ctx, func() (opId string, err error) {
9595
return j.sdApi.DeregisterInstance(ctx, svcId, instId)
9696
})
9797
}
9898

99-
opErr := cloudmap.NewDeregisterInstancePoller(j.sdApi, svcId, opColl.Collect(), opColl.GetStartTime()).Poll(ctx)
100-
j.checkOrFail(opErr, "instances de-registered", "could not cleanup instances")
99+
err = opPoller.Await()
100+
j.checkOrFail(err, "instances de-registered", "could not cleanup instances")
101101
}
102102

103103
func (j *cloudMapJanitor) checkOrFail(err error, successMsg string, failMsg string) {

integration/janitor/janitor_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,15 @@ func TestCleanupHappyCase(t *testing.T) {
3939

4040
tj.mockApi.EXPECT().DeregisterInstance(context.TODO(), test.SvcId, test.EndptId1).
4141
Return(test.OpId1, nil)
42-
tj.mockApi.EXPECT().ListOperations(context.TODO(), gomock.Any()).
43-
Return(map[string]types.OperationStatus{test.OpId1: types.OperationStatusSuccess}, nil)
42+
tj.mockApi.EXPECT().GetOperation(context.TODO(), test.OpId1).
43+
Return(&types.Operation{Status: types.OperationStatusSuccess}, nil)
4444
tj.mockApi.EXPECT().DeleteService(context.TODO(), test.SvcId).
4545
Return(nil)
4646
tj.mockApi.EXPECT().DeleteNamespace(context.TODO(), test.HttpNsId).
4747
Return(test.OpId2, nil)
48-
tj.mockApi.EXPECT().PollNamespaceOperation(context.TODO(), test.OpId2).
49-
Return(test.HttpNsId, nil)
48+
tj.mockApi.EXPECT().GetOperation(context.TODO(), test.OpId2).
49+
Return(&types.Operation{Status: types.OperationStatusSuccess,
50+
Targets: map[string]string{string(types.OperationTargetTypeNamespace): test.HttpNsId}}, nil)
5051

5152
tj.janitor.Cleanup(context.TODO(), test.HttpNsName)
5253
assert.False(t, *tj.failed)

pkg/cloudmap/api.go

Lines changed: 19 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,12 @@ package cloudmap
22

33
import (
44
"context"
5-
"errors"
6-
"fmt"
7-
"time"
8-
9-
"golang.org/x/time/rate"
105

116
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/common"
127
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/model"
138
"github.com/aws/aws-sdk-go-v2/aws"
149
sd "github.com/aws/aws-sdk-go-v2/service/servicediscovery"
1510
"github.com/aws/aws-sdk-go-v2/service/servicediscovery/types"
16-
"k8s.io/apimachinery/pkg/util/wait"
1711
)
1812

1913
const (
@@ -32,9 +26,6 @@ type ServiceDiscoveryApi interface {
3226
// DiscoverInstances returns a list of service instances registered to a given service.
3327
DiscoverInstances(ctx context.Context, nsName string, svcName string, queryParameters map[string]string) (insts []types.HttpInstanceSummary, err error)
3428

35-
// ListOperations returns a map of operations to their status matching a list of filters.
36-
ListOperations(ctx context.Context, opFilters []types.OperationFilter) (operationStatusMap map[string]types.OperationStatus, err error)
37-
3829
// GetOperation returns an operation.
3930
GetOperation(ctx context.Context, operationId string) (operation *types.Operation, err error)
4031

@@ -49,32 +40,25 @@ type ServiceDiscoveryApi interface {
4940

5041
// DeregisterInstance de-registers a service instance in Cloud Map.
5142
DeregisterInstance(ctx context.Context, serviceId string, instanceId string) (operationId string, err error)
52-
53-
// PollNamespaceOperation polls a namespace operation, and returns the namespace ID.
54-
PollNamespaceOperation(ctx context.Context, operationId string) (namespaceId string, err error)
5543
}
5644

5745
type serviceDiscoveryApi struct {
58-
log common.Logger
59-
awsFacade AwsFacade
60-
nsRateLimiter *rate.Limiter
61-
svcRateLimiter *rate.Limiter
62-
opRateLimiter *rate.Limiter
46+
log common.Logger
47+
awsFacade AwsFacade
48+
rateLimiter common.RateLimiter
6349
}
6450

6551
// NewServiceDiscoveryApiFromConfig creates a new AWS Cloud Map API connection manager from an AWS client config.
6652
func NewServiceDiscoveryApiFromConfig(cfg *aws.Config) ServiceDiscoveryApi {
6753
return &serviceDiscoveryApi{
68-
log: common.NewLogger("cloudmap", "api"),
69-
awsFacade: NewAwsFacadeFromConfig(cfg),
70-
nsRateLimiter: rate.NewLimiter(rate.Every(1*time.Second), 5), // 1 per second
71-
svcRateLimiter: rate.NewLimiter(rate.Every(2*time.Second), 10), // 2 per second
72-
opRateLimiter: rate.NewLimiter(rate.Every(100*time.Second), 200), // 100 per second
54+
log: common.NewLogger("cloudmap", "api"),
55+
awsFacade: NewAwsFacadeFromConfig(cfg),
56+
rateLimiter: common.NewDefaultRateLimiter(),
7357
}
7458
}
7559

7660
func (sdApi *serviceDiscoveryApi) GetNamespaceMap(ctx context.Context) (map[string]*model.Namespace, error) {
77-
err := sdApi.nsRateLimiter.Wait(ctx)
61+
err := sdApi.rateLimiter.Wait(ctx, common.ListNamespaces)
7862
if err != nil {
7963
return nil, err
8064
}
@@ -105,7 +89,7 @@ func (sdApi *serviceDiscoveryApi) GetNamespaceMap(ctx context.Context) (map[stri
10589
}
10690

10791
func (sdApi *serviceDiscoveryApi) GetServiceIdMap(ctx context.Context, nsId string) (map[string]string, error) {
108-
err := sdApi.svcRateLimiter.Wait(ctx)
92+
err := sdApi.rateLimiter.Wait(ctx, common.ListServices)
10993
if err != nil {
11094
return nil, err
11195
}
@@ -151,30 +135,8 @@ func (sdApi *serviceDiscoveryApi) DiscoverInstances(ctx context.Context, nsName
151135
return out.Instances, nil
152136
}
153137

154-
func (sdApi *serviceDiscoveryApi) ListOperations(ctx context.Context, opFilters []types.OperationFilter) (map[string]types.OperationStatus, error) {
155-
opStatusMap := make(map[string]types.OperationStatus)
156-
157-
pages := sd.NewListOperationsPaginator(sdApi.awsFacade, &sd.ListOperationsInput{
158-
Filters: opFilters,
159-
})
160-
161-
for pages.HasMorePages() {
162-
output, err := pages.NextPage(ctx)
163-
164-
if err != nil {
165-
return opStatusMap, err
166-
}
167-
168-
for _, sdOp := range output.Operations {
169-
opStatusMap[aws.ToString(sdOp.Id)] = sdOp.Status
170-
}
171-
}
172-
173-
return opStatusMap, nil
174-
}
175-
176138
func (sdApi *serviceDiscoveryApi) GetOperation(ctx context.Context, opId string) (operation *types.Operation, err error) {
177-
err = sdApi.opRateLimiter.Wait(ctx)
139+
err = sdApi.rateLimiter.Wait(ctx, common.GetOperation)
178140
if err != nil {
179141
return nil, err
180142
}
@@ -236,6 +198,11 @@ func (sdApi *serviceDiscoveryApi) getDnsConfig() types.DnsConfig {
236198
}
237199

238200
func (sdApi *serviceDiscoveryApi) RegisterInstance(ctx context.Context, svcId string, instId string, instAttrs map[string]string) (opId string, err error) {
201+
err = sdApi.rateLimiter.Wait(ctx, common.RegisterInstance)
202+
if err != nil {
203+
return "", err
204+
}
205+
239206
regResp, err := sdApi.awsFacade.RegisterInstance(ctx, &sd.RegisterInstanceInput{
240207
Attributes: instAttrs,
241208
InstanceId: &instId,
@@ -250,6 +217,11 @@ func (sdApi *serviceDiscoveryApi) RegisterInstance(ctx context.Context, svcId st
250217
}
251218

252219
func (sdApi *serviceDiscoveryApi) DeregisterInstance(ctx context.Context, svcId string, instId string) (opId string, err error) {
220+
err = sdApi.rateLimiter.Wait(ctx, common.DeregisterInstance)
221+
if err != nil {
222+
return "", err
223+
}
224+
253225
deregResp, err := sdApi.awsFacade.DeregisterInstance(ctx, &sd.DeregisterInstanceInput{
254226
InstanceId: &instId,
255227
ServiceId: &svcId,
@@ -261,31 +233,3 @@ func (sdApi *serviceDiscoveryApi) DeregisterInstance(ctx context.Context, svcId
261233

262234
return aws.ToString(deregResp.OperationId), err
263235
}
264-
265-
func (sdApi *serviceDiscoveryApi) PollNamespaceOperation(ctx context.Context, opId string) (nsId string, err error) {
266-
err = wait.Poll(defaultOperationPollInterval, defaultOperationPollTimeout, func() (done bool, err error) {
267-
sdApi.log.Info("polling operation", "opId", opId)
268-
op, err := sdApi.GetOperation(ctx, opId)
269-
270-
if err != nil {
271-
return true, err
272-
}
273-
274-
if op.Status == types.OperationStatusFail {
275-
return true, fmt.Errorf("failed to create namespace: %s", aws.ToString(op.ErrorMessage))
276-
}
277-
278-
if op.Status == types.OperationStatusSuccess {
279-
nsId = op.Targets[string(types.OperationTargetTypeNamespace)]
280-
return true, nil
281-
}
282-
283-
return false, nil
284-
})
285-
286-
if err == wait.ErrWaitTimeout {
287-
err = errors.New(operationPollTimoutErrorMessage)
288-
}
289-
290-
return nsId, err
291-
}

pkg/cloudmap/api_test.go

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@ import (
55
"errors"
66
"fmt"
77
"testing"
8-
"time"
9-
10-
"golang.org/x/time/rate"
118

129
aboutv1alpha1 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/apis/about/v1alpha1"
1310

@@ -131,26 +128,6 @@ func TestServiceDiscoveryApi_DiscoverInstances_HappyCase(t *testing.T) {
131128
assert.Equal(t, test.EndptId2, *insts[1].InstanceId)
132129
}
133130

134-
func TestServiceDiscoveryApi_ListOperations_HappyCase(t *testing.T) {
135-
mockController := gomock.NewController(t)
136-
defer mockController.Finish()
137-
138-
awsFacade := cloudmapMock.NewMockAwsFacade(mockController)
139-
sdApi := getServiceDiscoveryApi(t, awsFacade)
140-
141-
filters := make([]types.OperationFilter, 0)
142-
awsFacade.EXPECT().ListOperations(context.TODO(), &sd.ListOperationsInput{Filters: filters}).
143-
Return(&sd.ListOperationsOutput{
144-
Operations: []types.OperationSummary{
145-
{Id: aws.String(test.OpId1), Status: types.OperationStatusSuccess},
146-
}}, nil)
147-
148-
ops, err := sdApi.ListOperations(context.TODO(), filters)
149-
assert.Nil(t, err, "No error for happy case")
150-
assert.True(t, len(ops) == 1)
151-
assert.Equal(t, ops[test.OpId1], types.OperationStatusSuccess)
152-
}
153-
154131
func TestServiceDiscoveryApi_GetOperation_HappyCase(t *testing.T) {
155132
mockController := gomock.NewController(t)
156133
defer mockController.Finish()
@@ -316,33 +293,12 @@ func TestServiceDiscoveryApi_DeregisterInstance_Error(t *testing.T) {
316293
assert.Equal(t, sdkErr, err)
317294
}
318295

319-
func TestServiceDiscoveryApi_PollNamespaceOperation_HappyCase(t *testing.T) {
320-
mockController := gomock.NewController(t)
321-
defer mockController.Finish()
322-
323-
awsFacade := cloudmapMock.NewMockAwsFacade(mockController)
324-
awsFacade.EXPECT().GetOperation(context.TODO(), &sd.GetOperationInput{OperationId: aws.String(test.OpId1)}).
325-
Return(&sd.GetOperationOutput{Operation: &types.Operation{Status: types.OperationStatusPending}}, nil)
326-
327-
awsFacade.EXPECT().GetOperation(context.TODO(), &sd.GetOperationInput{OperationId: aws.String(test.OpId1)}).
328-
Return(&sd.GetOperationOutput{Operation: &types.Operation{Status: types.OperationStatusSuccess,
329-
Targets: map[string]string{string(types.OperationTargetTypeNamespace): test.HttpNsId}}}, nil)
330-
331-
sdApi := getServiceDiscoveryApi(t, awsFacade)
332-
333-
nsId, err := sdApi.PollNamespaceOperation(context.TODO(), test.OpId1)
334-
assert.Nil(t, err)
335-
assert.Equal(t, test.HttpNsId, nsId)
336-
}
337-
338296
func getServiceDiscoveryApi(t *testing.T, awsFacade *cloudmapMock.MockAwsFacade) ServiceDiscoveryApi {
339297
scheme := runtime.NewScheme()
340298
scheme.AddKnownTypes(aboutv1alpha1.GroupVersion, &aboutv1alpha1.ClusterProperty{})
341299
return &serviceDiscoveryApi{
342-
log: common.NewLoggerWithLogr(testr.New(t)),
343-
awsFacade: awsFacade,
344-
nsRateLimiter: rate.NewLimiter(rate.Every(1*time.Second), 2), // 1 per second
345-
svcRateLimiter: rate.NewLimiter(rate.Every(2*time.Second), 4), // 2 per second
346-
opRateLimiter: rate.NewLimiter(rate.Every(10*time.Second), 100), // 10 per second
300+
log: common.NewLoggerWithLogr(testr.New(t)),
301+
awsFacade: awsFacade,
302+
rateLimiter: common.NewDefaultRateLimiter(),
347303
}
348304
}

pkg/cloudmap/client.go

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/common"
99
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/model"
1010
"github.com/aws/aws-sdk-go-v2/aws"
11+
"github.com/aws/aws-sdk-go-v2/service/servicediscovery/types"
1112
)
1213

1314
// ServiceDiscoveryClient provides the service endpoint management functionality required by the AWS Cloud Map
@@ -149,27 +150,21 @@ func (sdc *serviceDiscoveryClient) RegisterEndpoints(ctx context.Context, nsName
149150
return err
150151
}
151152

152-
opCollector := NewOperationCollector()
153-
153+
operationPoller := NewOperationPoller(sdc.sdApi)
154154
for _, endpt := range endpts {
155155
endptId := endpt.Id
156156
endptAttrs := endpt.GetCloudMapAttributes()
157-
opCollector.Add(func() (opId string, err error) {
157+
operationPoller.Submit(ctx, func() (opId string, err error) {
158158
return sdc.sdApi.RegisterInstance(ctx, svcId, endptId, endptAttrs)
159159
})
160160
}
161161

162-
err = NewRegisterInstancePoller(sdc.sdApi, svcId, opCollector.Collect(), opCollector.GetStartTime()).Poll(ctx)
163-
164162
// Evict cache entry so next list call reflects changes
165163
sdc.cache.EvictEndpoints(nsName, svcName)
166164

165+
err = operationPoller.Await()
167166
if err != nil {
168-
return err
169-
}
170-
171-
if !opCollector.IsAllOperationsCreated() {
172-
return errors.New("failure while registering endpoints")
167+
return common.Wrap(err, errors.New("failure while registering endpoints"))
173168
}
174169

175170
return nil
@@ -188,29 +183,23 @@ func (sdc *serviceDiscoveryClient) DeleteEndpoints(ctx context.Context, nsName s
188183
return err
189184
}
190185

191-
opCollector := NewOperationCollector()
192-
186+
operationPoller := NewOperationPoller(sdc.sdApi)
193187
for _, endpt := range endpts {
194188
endptId := endpt.Id
195-
// add operation to delete endpoint
196-
opCollector.Add(func() (opId string, err error) {
189+
operationPoller.Submit(ctx, func() (opId string, err error) {
197190
return sdc.sdApi.DeregisterInstance(ctx, svcId, endptId)
198191
})
199192
}
200193

201-
err = NewDeregisterInstancePoller(sdc.sdApi, svcId, opCollector.Collect(), opCollector.GetStartTime()).Poll(ctx)
202-
203194
// Evict cache entry so next list call reflects changes
204195
sdc.cache.EvictEndpoints(nsName, svcName)
205-
if err != nil {
206-
return err
207-
}
208196

209-
if !opCollector.IsAllOperationsCreated() {
210-
return errors.New("failure while de-registering endpoints")
197+
err = operationPoller.Await()
198+
if err != nil {
199+
return common.Wrap(err, errors.New("failure while de-registering endpoints"))
211200
}
212201

213-
return nil
202+
return err
214203
}
215204

216205
func (sdc *serviceDiscoveryClient) getEndpoints(ctx context.Context, nsName string, svcName string) (endpts []*model.Endpoint, err error) {
@@ -315,12 +304,13 @@ func (sdc *serviceDiscoveryClient) createNamespace(ctx context.Context, nsName s
315304
return nil, err
316305
}
317306

318-
nsId, err := sdc.sdApi.PollNamespaceOperation(ctx, opId)
307+
op, err := NewOperationPoller(sdc.sdApi).Poll(ctx, opId)
319308
if err != nil {
320309
return nil, err
321310
}
311+
nsId := op.Targets[string(types.OperationTargetTypeNamespace)]
322312

323-
sdc.log.Info("namespace created", "nsId", nsId)
313+
sdc.log.Info("namespace created", "nsId", nsId, "namespace", nsName)
324314

325315
// Default namespace type HTTP
326316
namespace = &model.Namespace{

0 commit comments

Comments
 (0)