Skip to content

Commit e691754

Browse files
authored
Modularize Cloud Map client (#31)
Encapsulate Cloud map client logic into modular and maintainable interfaces. AWS facade provides minimal SDK API surface to facilitate mocking. Service discovery API acts as a thin wrapper to handle conversion of SD request/response data structures. Operation poller simplifies logic to monitor multiple inflight operation status. Operation collector asynchronously collects operations to poll. Client and Endpoint Manager have been unified due to simplified design gains following above.
1 parent 390d3e6 commit e691754

File tree

8 files changed

+688
-446
lines changed

8 files changed

+688
-446
lines changed

Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/confi
9393
MOCKS_DESTINATION=mocks
9494
generate-mocks: mockgen
9595
$(MOCKGEN) --source pkg/cloudmap/client.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/client_mock.go --package cloudmap
96+
$(MOCKGEN) --source pkg/cloudmap/operation_poller.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/operation_poller_mock.go --package cloudmap
97+
$(MOCKGEN) --source pkg/cloudmap/operation_collector.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/operation_collector_mock.go --package cloudmap
98+
$(MOCKGEN) --source pkg/cloudmap/api.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/api_mock.go --package cloudmap
99+
$(MOCKGEN) --source pkg/cloudmap/aws_facade.go --destination $(MOCKS_DESTINATION)/pkg/cloudmap/aws_facade_mock.go --package cloudmap
100+
96101

97102
CONTROLLER_GEN = $(shell pwd)/bin/controller-gen
98103
controller-gen: ## Download controller-gen locally if necessary.

pkg/cloudmap/api.go

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package cloudmap
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/model"
7+
"github.com/aws/aws-sdk-go-v2/aws"
8+
sd "github.com/aws/aws-sdk-go-v2/service/servicediscovery"
9+
"github.com/aws/aws-sdk-go-v2/service/servicediscovery/types"
10+
"github.com/go-logr/logr"
11+
"k8s.io/apimachinery/pkg/util/wait"
12+
ctrl "sigs.k8s.io/controller-runtime"
13+
)
14+
15+
// ServiceDiscoveryApi handles the AWS Cloud Map API request and response processing logic, and converts results to
16+
// internal data structures. It manages all interactions with the AWS SDK.
17+
type ServiceDiscoveryApi interface {
18+
// ListNamespaces returns a list of all namespaces.
19+
ListNamespaces(ctx context.Context) (namespaces []*Resource, err error)
20+
21+
// ListServices returns a list of services for a given namespace.
22+
ListServices(ctx context.Context, namespaceId string) (services []*Resource, err error)
23+
24+
// ListInstances returns a list of service instances registered to a given service.
25+
ListInstances(ctx context.Context, serviceId string) ([]*model.Endpoint, error)
26+
27+
// ListOperations returns a map of operations to their status matching a list of filters.
28+
ListOperations(ctx context.Context, opFilters []types.OperationFilter) (operationStatusMap map[string]types.OperationStatus, err error)
29+
30+
// GetOperation returns an operation.
31+
GetOperation(ctx context.Context, operationId string) (operation *types.Operation, err error)
32+
33+
// CreateHttpNamespace creates a HTTP namespace in AWS Cloud Map for a given name.
34+
CreateHttpNamespace(ctx context.Context, namespaceName string) (operationId string, err error)
35+
36+
// CreateService creates a named service in AWS Cloud Map under the given namespace.
37+
CreateService(ctx context.Context, namespaceId string, serviceName string) (serviceId string, err error)
38+
39+
// RegisterInstance registers a service instance in AWS Cloud Map.
40+
RegisterInstance(ctx context.Context, serviceId string, instanceId string, instanceAttrs map[string]string) (operationId string, err error)
41+
42+
// DeregisterInstance de-registers a service instance in Cloud Map.
43+
DeregisterInstance(ctx context.Context, serviceId string, instanceId string) (operationId string, err error)
44+
45+
// PollCreateNamespace polls a create namespace operation, and returns the namespace ID.
46+
PollCreateNamespace(ctx context.Context, operationId string) (namespaceId string, err error)
47+
}
48+
49+
type serviceDiscoveryApi struct {
50+
log logr.Logger
51+
awsFacade AwsFacade
52+
}
53+
54+
// Resource encapsulates a ID/name pair
55+
type Resource struct {
56+
Id string
57+
Name string
58+
}
59+
60+
// NewServiceDiscoveryApiFromConfig creates a new AWS Cloud Map API connection manager from an AWS client config.
61+
func NewServiceDiscoveryApiFromConfig(cfg *aws.Config) ServiceDiscoveryApi {
62+
return &serviceDiscoveryApi{
63+
log: ctrl.Log.WithName("cloudmap"),
64+
awsFacade: NewAwsFacadeFromConfig(cfg),
65+
}
66+
}
67+
68+
func (sdApi *serviceDiscoveryApi) ListNamespaces(ctx context.Context) ([]*Resource, error) {
69+
namespaces := make([]*Resource, 0)
70+
pages := sd.NewListNamespacesPaginator(sdApi.awsFacade, &sd.ListNamespacesInput{})
71+
72+
for pages.HasMorePages() {
73+
output, err := pages.NextPage(ctx)
74+
if err != nil {
75+
return namespaces, err
76+
}
77+
78+
for _, ns := range output.Namespaces {
79+
namespaces = append(namespaces, &Resource{
80+
Id: aws.ToString(ns.Id),
81+
Name: aws.ToString(ns.Name),
82+
})
83+
}
84+
}
85+
86+
return namespaces, nil
87+
}
88+
89+
func (sdApi *serviceDiscoveryApi) ListServices(ctx context.Context, nsId string) ([]*Resource, error) {
90+
svcs := make([]*Resource, 0)
91+
92+
filter := types.ServiceFilter{
93+
Name: types.ServiceFilterNameNamespaceId,
94+
Values: []string{nsId},
95+
}
96+
sdApi.log.Info("paginating", "nsId", nsId)
97+
98+
pages := sd.NewListServicesPaginator(sdApi.awsFacade, &sd.ListServicesInput{Filters: []types.ServiceFilter{filter}})
99+
100+
for pages.HasMorePages() {
101+
output, err := pages.NextPage(ctx)
102+
if err != nil {
103+
return svcs, err
104+
}
105+
106+
for _, svc := range output.Services {
107+
svcs = append(svcs, &Resource{
108+
Id: aws.ToString(svc.Id),
109+
Name: aws.ToString(svc.Name),
110+
})
111+
}
112+
}
113+
114+
return svcs, nil
115+
}
116+
117+
func (sdApi *serviceDiscoveryApi) ListInstances(ctx context.Context, svcId string) ([]*model.Endpoint, error) {
118+
endpts := make([]*model.Endpoint, 0)
119+
120+
pages := sd.NewListInstancesPaginator(sdApi.awsFacade, &sd.ListInstancesInput{ServiceId: &svcId})
121+
122+
for pages.HasMorePages() {
123+
output, err := pages.NextPage(ctx)
124+
if err != nil {
125+
return endpts, err
126+
}
127+
128+
for _, inst := range output.Instances {
129+
endpt, endptErr := model.NewEndpointFromInstance(&inst)
130+
131+
if endptErr != nil {
132+
sdApi.log.Info(fmt.Sprintf("skipping instance %s to endpoint conversion: %s", *inst.Id, endptErr.Error()))
133+
continue
134+
}
135+
136+
endpts = append(endpts, endpt)
137+
}
138+
}
139+
140+
return endpts, nil
141+
}
142+
143+
func (sdApi *serviceDiscoveryApi) ListOperations(ctx context.Context, opFilters []types.OperationFilter) (opStatusMap map[string]types.OperationStatus, err error) {
144+
opStatusMap = make(map[string]types.OperationStatus, 0)
145+
146+
pages := sd.NewListOperationsPaginator(sdApi.awsFacade, &sd.ListOperationsInput{
147+
Filters: opFilters,
148+
})
149+
150+
for pages.HasMorePages() {
151+
output, err := pages.NextPage(ctx)
152+
153+
if err != nil {
154+
return opStatusMap, err
155+
}
156+
157+
for _, sdOp := range output.Operations {
158+
opStatusMap[aws.ToString(sdOp.Id)] = sdOp.Status
159+
}
160+
}
161+
162+
return opStatusMap, nil
163+
}
164+
165+
func (sdApi *serviceDiscoveryApi) GetOperation(ctx context.Context, opId string) (operation *types.Operation, err error) {
166+
opResp, err := sdApi.awsFacade.GetOperation(ctx, &sd.GetOperationInput{OperationId: &opId})
167+
168+
if err != nil {
169+
return nil, err
170+
}
171+
172+
return opResp.Operation, nil
173+
}
174+
175+
func (sdApi *serviceDiscoveryApi) CreateHttpNamespace(ctx context.Context, nsName string) (opId string, err error) {
176+
output, err := sdApi.awsFacade.CreateHttpNamespace(ctx, &sd.CreateHttpNamespaceInput{
177+
Name: &nsName,
178+
})
179+
180+
if err != nil {
181+
return "", err
182+
}
183+
184+
return aws.ToString(output.OperationId), nil
185+
}
186+
187+
func (sdApi *serviceDiscoveryApi) CreateService(ctx context.Context, nsId string, svcName string) (svcId string, err error) {
188+
output, err := sdApi.awsFacade.CreateService(ctx, &sd.CreateServiceInput{
189+
NamespaceId: &nsId,
190+
Name: &svcName})
191+
192+
if err != nil {
193+
return "", err
194+
}
195+
196+
svcId = aws.ToString(output.Service.Id)
197+
sdApi.log.Info("service created", "svcId", svcId)
198+
return svcId, nil
199+
}
200+
201+
func (sdApi *serviceDiscoveryApi) RegisterInstance(ctx context.Context, svcId string, instId string, instAttrs map[string]string) (opId string, err error) {
202+
regResp, err := sdApi.awsFacade.RegisterInstance(ctx, &sd.RegisterInstanceInput{
203+
Attributes: instAttrs,
204+
InstanceId: &instId,
205+
ServiceId: &svcId,
206+
})
207+
208+
if err != nil {
209+
return "", err
210+
}
211+
212+
return aws.ToString(regResp.OperationId), nil
213+
}
214+
215+
func (sdApi *serviceDiscoveryApi) DeregisterInstance(ctx context.Context, svcId string, instId string) (opId string, err error) {
216+
deregResp, err := sdApi.awsFacade.DeregisterInstance(ctx, &sd.DeregisterInstanceInput{
217+
InstanceId: &instId,
218+
ServiceId: &svcId,
219+
})
220+
221+
if err != nil {
222+
return "", err
223+
}
224+
225+
return aws.ToString(deregResp.OperationId), err
226+
227+
}
228+
229+
func (sdApi *serviceDiscoveryApi) PollCreateNamespace(ctx context.Context, opId string) (nsId string, err error) {
230+
return nsId, wait.Poll(defaultOperationPollInterval, defaultOperationPollTimeout, func() (done bool, pollErr error) {
231+
sdApi.log.Info("polling operation", "opId", opId)
232+
op, opErr := sdApi.GetOperation(ctx, opId)
233+
234+
if opErr != nil {
235+
return true, opErr
236+
}
237+
238+
if op.Status == types.OperationStatusFail {
239+
return true, fmt.Errorf("failed to create namespace: %s", aws.ToString(op.ErrorMessage))
240+
}
241+
242+
if op.Status == types.OperationStatusSuccess {
243+
nsId = op.Targets[string(types.OperationTargetTypeNamespace)]
244+
sdApi.log.Info("namespace created", "nsId", nsId)
245+
return true, nil
246+
}
247+
248+
return false, nil
249+
})
250+
}

pkg/cloudmap/aws_facade.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package cloudmap
2+
3+
import (
4+
"context"
5+
"github.com/aws/aws-sdk-go-v2/aws"
6+
sd "github.com/aws/aws-sdk-go-v2/service/servicediscovery"
7+
)
8+
9+
// AwsFacade wraps the minimal surface area of ServiceDiscovery API calls for the AWS SDK
10+
// required by the AWS Cloud Map client. This enables mock generation for unit testing.
11+
type AwsFacade interface {
12+
// ListNamespaces provides ServiceDiscovery ListNamespaces wrapper interface for paginator.
13+
ListNamespaces(context.Context, *sd.ListNamespacesInput, ...func(*sd.Options)) (*sd.ListNamespacesOutput, error)
14+
15+
// ListServices provides ServiceDiscovery ListServices wrapper interface for paginator.
16+
ListServices(context.Context, *sd.ListServicesInput, ...func(options *sd.Options)) (*sd.ListServicesOutput, error)
17+
18+
// ListInstances provides ServiceDiscovery ListInstances wrapper interface for paginator.
19+
ListInstances(context.Context, *sd.ListInstancesInput, ...func(*sd.Options)) (*sd.ListInstancesOutput, error)
20+
21+
// ListOperations provides ServiceDiscovery ListOperations wrapper interface for paginator.
22+
ListOperations(context.Context, *sd.ListOperationsInput, ...func(*sd.Options)) (*sd.ListOperationsOutput, error)
23+
24+
// GetOperation provides ServiceDiscovery GetOperation wrapper interface.
25+
GetOperation(context.Context, *sd.GetOperationInput, ...func(*sd.Options)) (*sd.GetOperationOutput, error)
26+
27+
// CreateHttpNamespace provides ServiceDiscovery CreateHttpNamespace wrapper interface.
28+
CreateHttpNamespace(context.Context, *sd.CreateHttpNamespaceInput, ...func(*sd.Options)) (*sd.CreateHttpNamespaceOutput, error)
29+
30+
// CreateService provides ServiceDiscovery CreateService wrapper interface.
31+
CreateService(context.Context, *sd.CreateServiceInput, ...func(*sd.Options)) (*sd.CreateServiceOutput, error)
32+
33+
// RegisterInstance provides ServiceDiscovery RegisterInstance wrapper interface.
34+
RegisterInstance(context.Context, *sd.RegisterInstanceInput, ...func(*sd.Options)) (*sd.RegisterInstanceOutput, error)
35+
36+
// DeregisterInstance provides ServiceDiscovery DeregisterInstance wrapper interface.
37+
DeregisterInstance(context.Context, *sd.DeregisterInstanceInput, ...func(*sd.Options)) (*sd.DeregisterInstanceOutput, error)
38+
}
39+
40+
type awsFacade struct {
41+
*sd.Client
42+
}
43+
44+
// NewAwsFacadeFromConfig creates a new AWS facade from an AWS client config.
45+
func NewAwsFacadeFromConfig(cfg *aws.Config) AwsFacade {
46+
return &awsFacade{sd.NewFromConfig(*cfg)}
47+
}

0 commit comments

Comments
 (0)