Skip to content

Commit a9f9f67

Browse files
committed
feat(PubSub): Add go-kit SQS transport functions
1 parent fda81ed commit a9f9f67

File tree

6 files changed

+1268
-0
lines changed

6 files changed

+1268
-0
lines changed

pkg/transport/sqs/encode_decode.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package sqs
2+
3+
import (
4+
"context"
5+
6+
"github.com/aws/aws-sdk-go-v2/service/sqs"
7+
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
8+
)
9+
10+
// DecodeRequestFunc extracts a user-domain request object from
11+
// an SQS message object. It is designed to be used in Subscribers.
12+
type DecodeRequestFunc func(context.Context, types.Message) (request interface{}, err error)
13+
14+
// EncodeRequestFunc encodes the passed payload object into
15+
// an SQS message object. It is designed to be used in Publishers.
16+
type EncodeRequestFunc func(context.Context, *sqs.SendMessageInput, interface{}) error
17+
18+
// EncodeResponseFunc encodes the passed response object to
19+
// an SQS message object. It is designed to be used in Subscribers.
20+
type EncodeResponseFunc func(context.Context, *sqs.SendMessageInput, interface{}) error
21+
22+
// DecodeResponseFunc extracts a user-domain response object from
23+
// an SQS message object. It is designed to be used in Publishers.
24+
type DecodeResponseFunc func(context.Context, types.Message) (response interface{}, err error)

pkg/transport/sqs/publisher.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package sqs
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
"github.com/aws/aws-sdk-go-v2/aws"
8+
"github.com/aws/aws-sdk-go-v2/service/sqs"
9+
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
10+
"github.com/go-kit/kit/endpoint"
11+
)
12+
13+
type contextKey int
14+
15+
const (
16+
// ContextKeyResponseQueueURL is the context key that allows fetching
17+
// and setting the response queue URL from and into context.
18+
ContextKeyResponseQueueURL contextKey = iota
19+
)
20+
21+
type (
22+
SQSPublisher interface {
23+
Publish(ctx context.Context, message *sqs.SendMessageInput) (*sqs.SendMessageOutput, error)
24+
}
25+
26+
// Publisher wraps an Publisher client, and provides a method that
27+
// implements endpoint.Endpoint.
28+
Publisher struct {
29+
Handler SQSPublisher
30+
queueURL string
31+
enc EncodeRequestFunc
32+
dec DecodeResponseFunc
33+
before []PublisherRequestFunc
34+
after []PublisherResponseFunc
35+
}
36+
)
37+
38+
// NewPublisher constructs a usable Publisher for a single remote method.
39+
func NewPublisher(
40+
handler SQSPublisher,
41+
queueURL string,
42+
enc EncodeRequestFunc,
43+
dec DecodeResponseFunc,
44+
options ...PublisherOption,
45+
) *Publisher {
46+
p := &Publisher{
47+
Handler: handler,
48+
queueURL: queueURL,
49+
enc: enc,
50+
dec: dec,
51+
}
52+
for _, option := range options {
53+
option(p)
54+
}
55+
return p
56+
}
57+
58+
// PublisherOption sets an optional parameter for clients.
59+
type PublisherOption func(*Publisher)
60+
61+
// PublisherBefore sets the RequestFuncs that are applied to the outgoing SQS
62+
// request before it's invoked.
63+
func PublisherBefore(before ...PublisherRequestFunc) PublisherOption {
64+
return func(p *Publisher) { p.before = append(p.before, before...) }
65+
}
66+
67+
// PublisherAfter sets the ClientResponseFuncs applied to the incoming SQS
68+
// request prior to it being decoded. This is useful for obtaining the response
69+
// and adding any information onto the context prior to decoding.
70+
func PublisherAfter(after ...PublisherResponseFunc) PublisherOption {
71+
return func(p *Publisher) { p.after = append(p.after, after...) }
72+
}
73+
74+
// SetPublisherResponseQueueURL can be used as a before function to add
75+
// provided url as responseQueueURL in context.
76+
func SetPublisherResponseQueueURL(url string) PublisherRequestFunc {
77+
return func(ctx context.Context, _ *sqs.SendMessageInput) context.Context {
78+
return context.WithValue(ctx, ContextKeyResponseQueueURL, url)
79+
}
80+
}
81+
82+
// Endpoint returns a usable endpoint that invokes the remote endpoint.
83+
func (p Publisher) Endpoint() endpoint.Endpoint {
84+
return func(ctx context.Context, request interface{}) (interface{}, error) {
85+
msgInput := sqs.SendMessageInput{
86+
QueueUrl: &p.queueURL,
87+
}
88+
if err := p.enc(ctx, &msgInput, request); err != nil {
89+
return nil, err
90+
}
91+
92+
for _, f := range p.before {
93+
ctx = f(ctx, &msgInput)
94+
}
95+
96+
output, err := p.Handler.Publish(ctx, &msgInput)
97+
if err != nil {
98+
return nil, err
99+
}
100+
101+
var responseMsg types.Message
102+
for _, f := range p.after {
103+
ctx, responseMsg, err = f(ctx, p.Handler, output)
104+
if err != nil {
105+
return nil, err
106+
}
107+
}
108+
109+
response, err := p.dec(ctx, responseMsg)
110+
if err != nil {
111+
return nil, err
112+
}
113+
114+
return response, nil
115+
}
116+
}
117+
118+
// EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a
119+
// JSON object and loads it as the MessageBody of the sqs.SendMessageInput.
120+
// This can be enough for most JSON over SQS communications.
121+
func EncodeJSONRequest(_ context.Context, msg *sqs.SendMessageInput, request interface{}) error {
122+
b, err := json.Marshal(request)
123+
if err != nil {
124+
return err
125+
}
126+
127+
msg.MessageBody = aws.String(string(b))
128+
129+
return nil
130+
}
131+
132+
// NoResponseDecode is a DecodeResponseFunc that can be used when no response is needed.
133+
// It returns nil value and nil error.
134+
func NoResponseDecode(_ context.Context, _ types.Message) (interface{}, error) {
135+
return nil, nil
136+
}

0 commit comments

Comments
 (0)