@@ -3,84 +3,25 @@ package main
33import (
44 "bytes"
55 "context"
6- "encoding/base64"
7- "encoding/json"
86 "flag"
97 "fmt"
10- "io"
118 "log"
12- "net/http"
139 "os"
14- "strings"
1510
1611 "cloud.google.com/go/pubsub"
12+ push "github.com/klassmann/pubsub-push"
1713)
1814
1915const (
2016 credentialsVarName string = "GOOGLE_APPLICATION_CREDENTIALS"
2117 messageMimetype string = "application/json"
2218)
2319
24- type headers []string
25-
26- func (h * headers ) String () string {
27- b := strings.Builder {}
28-
29- for _ , v := range * h {
30- b .WriteString (v )
31- }
32-
33- return b .String ()
34- }
35-
36- func (h * headers ) Set (value string ) error {
37- * h = append (* h , value )
38- return nil
39- }
40-
41- func (h * headers ) applyHeaders (ht * http.Request ) {
42- for _ , v := range * h {
43- parts := strings .Split (v , "=" )
44-
45- if len (parts ) == 2 {
46- ht .Header .Set (parts [0 ], parts [1 ])
47- } else if len (parts ) == 1 {
48- ht .Header .Set (parts [0 ], "" )
49- }
50- }
51- }
52-
5320type settings struct {
5421 ProjectID string
5522 Subscription string
5623 Endpoint string
57- Headers headers
58- }
59-
60- type message struct {
61- MessageID string `json:"messageId"`
62- Data string `json:"data"`
63- Attributes map [string ]string `json:"attributes"`
64- }
65-
66- type request struct {
67- Message message `json:"message"`
68- }
69-
70- func encodeMessage (m * pubsub.Message ) ([]byte , int ) {
71- data := m .Data
72- req := request {}
73- req .Message .Data = base64 .StdEncoding .EncodeToString (data )
74- req .Message .Attributes = m .Attributes
75- req .Message .MessageID = m .ID
76- b , err := json .Marshal (req )
77-
78- if err != nil {
79- log .Fatal (err )
80- return []byte {}, 0
81- }
82-
83- return b , len (b )
24+ Headers push.Headers
8425}
8526
8627func getArguments () * settings {
@@ -100,17 +41,6 @@ func getArguments() *settings {
10041 return & s
10142}
10243
103- func post (url string , contentType string , body io.Reader , h * headers ) (* http.Response , error ) {
104- req , err := http .NewRequest ("POST" , url , body )
105- if err != nil {
106- log .Fatalf ("I was not possible to create a new request: %v\n " , err )
107- return nil , err
108- }
109- req .Header .Set ("Content-type" , contentType )
110- h .applyHeaders (req )
111- return http .DefaultClient .Do (req )
112- }
113-
11444func main () {
11545 settings := getArguments ()
11646
@@ -131,10 +61,10 @@ func main() {
13161 fmt .Printf ("Listening subscription %s:\n " , settings .Subscription )
13262 sub := client .Subscription (settings .Subscription )
13363 err = sub .Receive (ctx , func (ctx context.Context , m * pubsub.Message ) {
134- b , size := encodeMessage (m )
64+ b , size := push . EncodeMessage (m )
13565 buff := bytes .NewBuffer (b )
13666
137- resp , err := post (settings .Endpoint , messageMimetype , buff , & settings .Headers ) //http.Post(settings.Endpoint, messageMimetype, buff )
67+ resp , err := push . PostMessage (settings .Endpoint , messageMimetype , buff , & settings .Headers )
13868
13969 if err != nil {
14070 log .Fatalf ("Error on send message to endpoint: %v\n " , err )
0 commit comments