55 "fmt"
66 "github.com/streadway/amqp"
77 "io/ioutil"
8+ "encoding/json"
89 "os"
910 "path"
1011)
1415 queue = flag .String ("queue" , "" , "Ephemeral AMQP queue name" )
1516 maxMessages = flag .Uint ("max-messages" , 1000 , "Maximum number of messages to dump" )
1617 outputDir = flag .String ("output-dir" , "." , "Directory in which to save the dumped messages" )
18+ full = flag .Bool ("full" , false , "Dump the message, its properties and headers" )
1719 verbose = flag .Bool ("verbose" , false , "Print progress" )
1820)
1921
@@ -27,8 +29,6 @@ func main() {
2729}
2830
2931func DumpMessagesFromQueue (amqpURI string , queueName string , maxMessages uint , outputDir string ) error {
30- var err error
31-
3232 if queueName == "" {
3333 return fmt .Errorf ("Must supply queue name" )
3434 }
@@ -55,23 +55,89 @@ func DumpMessagesFromQueue(amqpURI string, queueName string, maxMessages uint, o
5555 false , // autoAck
5656 )
5757 if err != nil {
58- return fmt .Errorf ("Queue Get : %s" , err )
58+ return fmt .Errorf ("Queue get : %s" , err )
5959 }
60- if ok {
61- SaveMessageToFile (msg .Body , outputDir , messagesReceived )
62- } else {
60+
61+ if ! ok {
6362 VerboseLog ("No more messages in queue" )
6463 break
6564 }
65+
66+ err = SaveMessageToFile (msg .Body , outputDir , messagesReceived )
67+ if err != nil {
68+ return fmt .Errorf ("Save message: %s" , err )
69+ }
70+
71+ if * full {
72+ err = SavePropsAndHeadersToFile (msg , outputDir , messagesReceived )
73+ if err != nil {
74+ return fmt .Errorf ("Save props and headers: %s" , err )
75+ }
76+ }
6677 }
6778
6879 return nil
6980}
7081
71- func SaveMessageToFile (body []byte , outputDir string , counter uint ) {
82+ func SaveMessageToFile (body []byte , outputDir string , counter uint ) error {
7283 filePath := GenerateFilePath (outputDir , counter )
73- ioutil .WriteFile (filePath , body , 0644 )
84+ err := ioutil .WriteFile (filePath , body , 0644 )
85+ if err != nil {
86+ return err
87+ }
88+
7489 fmt .Println (filePath )
90+
91+ return nil
92+ }
93+
94+ func GetProperties (msg amqp.Delivery ) map [string ]interface {} {
95+ props := map [string ]interface {}{
96+ "app_id" : msg .AppId ,
97+ "content_encoding" : msg .ContentEncoding ,
98+ "content_type" : msg .ContentType ,
99+ "correlation_id" : msg .CorrelationId ,
100+ "delivery_mode" : msg .DeliveryMode ,
101+ "expiration" : msg .Expiration ,
102+ "message_id" : msg .MessageId ,
103+ "priority" : msg .Priority ,
104+ "reply_to" : msg .ReplyTo ,
105+ "type" : msg .Type ,
106+ "user_id" : msg .UserId ,
107+ }
108+
109+ if ! msg .Timestamp .IsZero () {
110+ props ["timestamp" ] = msg .Timestamp .String ()
111+ }
112+
113+ for k , v := range props {
114+ if v == "" {
115+ delete (props , k )
116+ }
117+ }
118+
119+ return props
120+ }
121+
122+ func SavePropsAndHeadersToFile (msg amqp.Delivery , outputDir string , counter uint ) error {
123+ extras := make (map [string ]interface {})
124+ extras ["properties" ] = GetProperties (msg )
125+ extras ["headers" ] = msg .Headers
126+
127+ data , err := json .MarshalIndent (extras , "" , " " )
128+ if err != nil {
129+ return err
130+ }
131+
132+ filePath := GenerateFilePath (outputDir , counter ) + "-headers+properties.json"
133+ err = ioutil .WriteFile (filePath , data , 0644 )
134+ if err != nil {
135+ return err
136+ }
137+
138+ fmt .Println (filePath )
139+
140+ return nil
75141}
76142
77143func GenerateFilePath (outputDir string , counter uint ) string {
0 commit comments