Skip to content

Commit fd6bf33

Browse files
committed
Merge branch feature/add_http_sub_example into release/1.0.6
Title: Go SDK 1.0.6 版本 本次代码评审主要涉及更新了消息队列订阅逻辑,增加了HTTP订阅示例,调整了错误处理和代码结构,优化了注释和版本记录,同时放宽了消息保留周期的检查,移除了对最大消息大小的硬性限制,并新增了HTTP授权验证的示例代码。 Link: https://code.alibaba-inc.com/messaging/aliyun-mns-go-sdk/codereview/19011184
2 parents be479b7 + dbd95f7 commit fd6bf33

File tree

8 files changed

+199
-32
lines changed

8 files changed

+199
-32
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Change log
22

3+
## 1.0.6
4+
- Added an example of HTTP endpoint subscription in `topic_example.go`.
5+
- Added an example of HTTP authorization in `http_authorization.go`.
6+
- Removed the check for message body size to allow for larger messages.
7+
38
## 1.0.5
49
- update the minimum Go version declared in go.mod to fix build failures.
510

README-CN.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Aliyun MNS Go SDK
22

3-
[![Github version](https://badgen.net/badge/color/1.0.5/green?label=version)](https://badgen.net/badge/color/1.0.5/green?label=version)
3+
[![Github version](https://badgen.net/badge/color/1.0.6/green?label=version)](https://badgen.net/badge/color/1.0.6/green?label=version)
44

55
Aliyun MNS Go SDK 是 MNS 在 Go 编译语言的官方 SDK
66

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Aliyun MNS Go SDK
22

3-
[![Github version](https://badgen.net/badge/color/1.0.5/green?label=version)](https://badgen.net/badge/color/1.0.5/green?label=version)
3+
[![Github version](https://badgen.net/badge/color/1.0.6/green?label=version)](https://badgen.net/badge/color/1.0.6/green?label=version)
44

55
The Aliyun MNS Go SDK is the official SDK for MNS in the Go programming language
66

errors.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ var (
7272
ERR_MNS_MAX_MESSAGE_SIZE_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 128, "max message size is not in range of (1024~65536)")
7373
ERR_MNS_MSG_RETENTION_PERIOD_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 129, "message retention period is not in range of (60~129600)")
7474
ERR_MNS_MSG_VISIBILITY_TIMEOUT_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 130, "message visibility timeout is not in range of (1~43200)")
75-
ERR_MNS_MSG_POOLLING_WAIT_SECONDS_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 131, "message poolling wait seconds is not in range of (0~30)")
75+
ERR_MNS_MSG_POOLLING_WAIT_SECONDS_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 131, "message polling wait seconds is not in range of (0~30)")
7676
ERR_MNS_RET_NUMBER_RANGE_ERROR = errors.TN(ALI_MNS_ERR_NS, 132, "list param of ret number is not in range of (1~1000)")
7777
ERR_MNS_QUEUE_ALREADY_EXIST_AND_HAVE_SAME_ATTR = errors.TN(ALI_MNS_ERR_NS, 133, "mns queue already exist, and the attribute is the same, queue name: {{.name}}")
7878
ERR_MNS_BATCH_OP_FAIL = errors.TN(ALI_MNS_ERR_NS, 136, "mns queue batch operation fail")

example/http_authorization.go

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package main
2+
3+
import (
4+
"crypto"
5+
"crypto/rsa"
6+
"crypto/sha1"
7+
"crypto/x509"
8+
"encoding/base64"
9+
"encoding/pem"
10+
"fmt"
11+
"io/ioutil"
12+
"net/http"
13+
"sort"
14+
"strings"
15+
)
16+
17+
func main() {
18+
headers := map[string]string{
19+
"Connection": "keep-alive",
20+
"x-mns-message-id": "AC10020B0019681A95159E591D10****",
21+
"Content-Type": "text/plain;charset=utf-8",
22+
"Content-md5": "5B4682CCA07FFB080FFE0A9D9821****",
23+
"x-mns-topic-owner": "****",
24+
"x-mns-topic-name": "HTTP-test",
25+
"x-mns-subscriber": "****",
26+
"x-mns-subscription-name": "sub-test-header",
27+
"x-mns-publish-time": "1730368640272",
28+
"x-mns-request-id": "672354803035301900003***",
29+
"Date": "Thu, 31 Oct 2024 09:57:20 GMT",
30+
"x-mns-version": "2015-06-06",
31+
"User-Agent": "Aliyun Notification Service Agent",
32+
"x-mns-signing-cert-url": "aHR0cHM6Ly9tbnN0ZXN0Lm9zcy1jbi1oYW5nemhvdS5hbGl5dW5****",
33+
"Authorization": "pg5Prc+ADujqjHbK1XKMK+o+aZjtkAntpR19s2B0T1k1deilZ5UgUFoIsKmLbgirN+1m2srdh****",
34+
"Host": "****",
35+
"Content-length": "40",
36+
"Accept": "*/*",
37+
}
38+
39+
method := "POST"
40+
path := "/api/test"
41+
if authenticateWithHeaderMap(method, path, toLowercaseKeys(headers)) {
42+
fmt.Println("Signature verification succeeded")
43+
} else {
44+
fmt.Println("Signature verification failed")
45+
}
46+
}
47+
48+
func authenticateWithResponse(method, path string, resp *http.Response) bool {
49+
headersMap := headersToMap(resp)
50+
return authenticateWithHeaderMap(method, path, headersMap)
51+
}
52+
53+
func authenticateWithHeaderMap(method, path string, headers map[string]string) bool {
54+
// Get string to sign
55+
var serviceHeaders []string
56+
for k, v := range headers {
57+
if strings.HasPrefix(strings.ToLower(k), "x-mns-") {
58+
serviceHeaders = append(serviceHeaders, fmt.Sprintf("%s:%s", k, v))
59+
}
60+
}
61+
62+
sort.Strings(serviceHeaders)
63+
serviceStr := strings.Join(serviceHeaders, "\n")
64+
var signHeaderList []string
65+
for _, key := range []string{"content-md5", "content-type", "date"} {
66+
if val, ok := headers[key]; ok {
67+
signHeaderList = append(signHeaderList, val)
68+
} else {
69+
signHeaderList = append(signHeaderList, "")
70+
}
71+
}
72+
73+
str2sign := fmt.Sprintf("%s\n%s\n%s\n%s", method, strings.Join(signHeaderList, "\n"), serviceStr, path)
74+
fmt.Println("String to sign:", str2sign)
75+
76+
// 获取证书的URL
77+
certURLBase64, ok := headers["x-mns-signing-cert-url"]
78+
if !ok {
79+
fmt.Println("x-mns-signing-cert-url Header not found")
80+
return false
81+
}
82+
83+
certURLBytes, err := base64.StdEncoding.DecodeString(certURLBase64)
84+
if err != nil {
85+
fmt.Println("Failed to decode base64 cert URL:", err)
86+
return false
87+
}
88+
89+
certURL := string(certURLBytes)
90+
fmt.Println("x-mns-signing-cert-url:\t", certURL)
91+
92+
// 根据URL获取证书,并从证书中获取公钥
93+
resp, err := http.Get(certURL)
94+
if err != nil {
95+
fmt.Println("Failed to fetch certificate:", err)
96+
return false
97+
}
98+
//goland:noinspection GoUnhandledErrorResult
99+
defer resp.Body.Close()
100+
101+
//goland:noinspection GoDeprecation
102+
certData, err := ioutil.ReadAll(resp.Body)
103+
if err != nil {
104+
fmt.Println("Failed to read certificate:", err)
105+
return false
106+
}
107+
108+
block, _ := pem.Decode(certData)
109+
if block == nil || block.Type != "CERTIFICATE" {
110+
fmt.Println("Failed to decode PEM block containing the certificate")
111+
return false
112+
}
113+
114+
cert, err := x509.ParseCertificate(block.Bytes)
115+
if err != nil {
116+
fmt.Println("Failed to parse certificate:", err)
117+
return false
118+
}
119+
120+
pubKey, ok := cert.PublicKey.(*rsa.PublicKey)
121+
if !ok {
122+
fmt.Println("Failed to cast public key to RSA public key")
123+
return false
124+
}
125+
126+
// 对Authorization字段做Base64解码
127+
signatureBase64, ok := headers["authorization"]
128+
if !ok {
129+
fmt.Println("Authorization Header not found")
130+
return false
131+
}
132+
133+
signature, err := base64.StdEncoding.DecodeString(signatureBase64)
134+
if err != nil {
135+
fmt.Println("Failed to decode base64 signature:", err)
136+
return false
137+
}
138+
139+
// 认证
140+
hash := sha1.New()
141+
hash.Write([]byte(str2sign))
142+
digest := hash.Sum(nil)
143+
err = rsa.VerifyPKCS1v15(pubKey, crypto.SHA1, digest, signature)
144+
if err != nil {
145+
fmt.Println("Signature verification failed:", err)
146+
return false
147+
}
148+
149+
return true
150+
}
151+
152+
func headersToMap(resp *http.Response) map[string]string {
153+
headersMap := make(map[string]string)
154+
for key, values := range resp.Header {
155+
// 连接多个值为一个字符串,使用逗号分隔
156+
// map 键值全小写
157+
lowercaseKey := strings.ToLower(key)
158+
headersMap[lowercaseKey] = strings.Join(values, ",")
159+
}
160+
return headersMap
161+
}
162+
163+
// HTTP header 不区分大小写,将 map 的 keys 转换为全小写
164+
func toLowercaseKeys(input map[string]string) map[string]string {
165+
output := make(map[string]string)
166+
for k, v := range input {
167+
lowercaseKey := strings.ToLower(k)
168+
output[lowercaseKey] = v
169+
}
170+
return output
171+
}

example/topic_example.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ func main() {
1313
endpoint := "http://xxx.mns.cn-hangzhou.aliyuncs.com"
1414
queueName := "test-queue"
1515
topicName := "test-topic"
16-
subName := "test-sub"
16+
queueSubName := "test-sub-queue"
17+
httpSubName := "test-sub-http"
1718
client := ali_mns.NewClient(endpoint)
1819

1920
// 1. create a queue for receiving pushed messages
@@ -33,15 +34,26 @@ func main() {
3334
return
3435
}
3536

36-
// 3. subscribe to topic, the endpoint is set to be a queue in this example
3737
topic := ali_mns.NewMNSTopic(topicName, client)
38-
sub := ali_mns.MessageSubsribeRequest{
38+
// 3. subscribe to topic, the endpoint is queue
39+
queueSub := ali_mns.MessageSubsribeRequest{
3940
Endpoint: topic.GenerateQueueEndpoint(queueName),
4041
NotifyContentFormat: ali_mns.SIMPLIFIED,
4142
}
4243

43-
// topic.Unsubscribe("SubscriptionNameA")
44-
err = topic.Subscribe(subName, sub)
44+
// 4. subscribe to topic, the endpoint is HTTP(S)
45+
httpSub := ali_mns.MessageSubsribeRequest{
46+
Endpoint: "http://www.baidu.com",
47+
NotifyContentFormat: ali_mns.SIMPLIFIED,
48+
}
49+
50+
err = topic.Subscribe(queueSubName, queueSub)
51+
if err != nil && !ali_mns.ERR_MNS_SUBSCRIPTION_ALREADY_EXIST_AND_HAVE_SAME_ATTR.IsEqual(err) {
52+
fmt.Println(err)
53+
return
54+
}
55+
56+
err = topic.Subscribe(httpSubName, httpSub)
4557
if err != nil && !ali_mns.ERR_MNS_SUBSCRIPTION_ALREADY_EXIST_AND_HAVE_SAME_ATTR.IsEqual(err) {
4658
fmt.Println(err)
4759
return
@@ -65,7 +77,7 @@ func main() {
6577

6678
time.Sleep(time.Duration(2) * time.Second)
6779

68-
// 4. now publish message
80+
// 5. now publish message
6981
msg := ali_mns.MessagePublishRequest{
7082
MessageBody: "hello topic <\"aliyun-mns-go-sdk\">",
7183
MessageAttributes: &ali_mns.MessageAttributes{
@@ -81,7 +93,7 @@ func main() {
8193
return
8294
}
8395

84-
// 5. receive the message from queue
96+
// 6. receive the message from queue
8597
queue := ali_mns.NewMNSQueue(queueName, client)
8698
endChan := make(chan int)
8799
respChan := make(chan ali_mns.MessageReceiveResponse)

queue_manager.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,6 @@ func checkDelaySeconds(seconds int32) (err error) {
4040
return
4141
}
4242

43-
func checkMaxMessageSize(maxSize int32) (err error) {
44-
if maxSize < 1024 || maxSize > 262144 {
45-
err = ERR_MNS_MAX_MESSAGE_SIZE_RANGE_ERROR.New()
46-
return
47-
}
48-
return
49-
}
50-
5143
func checkMessageRetentionPeriod(retentionPeriod int32) (err error) {
5244
if retentionPeriod < 60 || retentionPeriod > 1296000 {
5345
err = ERR_MNS_MSG_RETENTION_PERIOD_RANGE_ERROR.New()
@@ -79,13 +71,10 @@ func NewMNSQueueManager(client MNSClient) AliQueueManager {
7971
}
8072
}
8173

82-
func checkAttributes(delaySeconds int32, maxMessageSize int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32) (err error) {
74+
func checkAttributes(delaySeconds int32, messageRetentionPeriod int32, visibilityTimeout int32, pollingWaitSeconds int32) (err error) {
8375
if err = checkDelaySeconds(delaySeconds); err != nil {
8476
return
8577
}
86-
if err = checkMaxMessageSize(maxMessageSize); err != nil {
87-
return
88-
}
8978
if err = checkMessageRetentionPeriod(messageRetentionPeriod); err != nil {
9079
return
9180
}
@@ -110,7 +99,6 @@ func (p *MNSQueueManager) CreateQueue(queueName string, delaySeconds int32, maxM
11099
}
111100

112101
if err = checkAttributes(delaySeconds,
113-
maxMessageSize,
114102
messageRetentionPeriod,
115103
visibilityTimeout,
116104
pollingWaitSeconds); err != nil {
@@ -144,7 +132,6 @@ func (p *MNSQueueManager) SetQueueAttributes(queueName string, delaySeconds int3
144132
}
145133

146134
if err = checkAttributes(delaySeconds,
147-
maxMessageSize,
148135
messageRetentionPeriod,
149136
visibilityTimeout,
150137
pollingWaitSeconds); err != nil {

topic_manager.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,6 @@ func (p *MNSTopicManager) CreateTopic(topicName string, maxMessageSize int32, lo
5050
return
5151
}
5252

53-
if err = checkMaxMessageSize(maxMessageSize); err != nil {
54-
return
55-
}
56-
5753
message := CreateTopicRequest{
5854
MaxMessageSize: maxMessageSize,
5955
LoggingEnabled: loggingEnabled,
@@ -77,10 +73,6 @@ func (p *MNSTopicManager) SetTopicAttributes(topicName string, maxMessageSize in
7773
return
7874
}
7975

80-
if err = checkMaxMessageSize(maxMessageSize); err != nil {
81-
return
82-
}
83-
8476
message := CreateTopicRequest{
8577
MaxMessageSize: maxMessageSize,
8678
LoggingEnabled: loggingEnabled,

0 commit comments

Comments
 (0)