Skip to content

Commit c3bebb2

Browse files
committed
Added ability to ack specific messages
1 parent c615543 commit c3bebb2

File tree

6 files changed

+223
-132
lines changed

6 files changed

+223
-132
lines changed

README.md

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,17 @@
22

33
Dump messages from a RabbitMQ queue to files, without affecting the queue.
44

5-
## Installation
6-
7-
### Download a release
5+
This work is derivated from https://github.com/dubek/rabbitmq-dump-queue
86

9-
Precompiled binary packages can be found on the
10-
[releases](https://github.com/dubek/rabbitmq-dump-queue/releases) page.
7+
## Installation
118

129
### Compile from source
1310

1411
If you have [Go](https://golang.org/doc/install) installed, you can install
1512
rabbitmq-dump-queue from source by running:
1613

1714
```
18-
go get github.com/dubek/rabbitmq-dump-queue
15+
go get github.com/luneo7/rabbitmq-dump-queue
1916
```
2017

2118
The `rabbitmq-dump-queue` executable will be created in the `$GOPATH/bin`
@@ -28,41 +25,57 @@ To dump the first 50 messages of queue `incoming_1` to `/tmp`:
2825

2926
rabbitmq-dump-queue -uri="amqp://user:[email protected]:5672/" -queue=incoming_1 -max-messages=50 -output-dir=/tmp
3027

31-
This will create the files `/tmp/msg-0000`, `/tmp/msg-0001`, and so on.
28+
This will create the files `/tmp/msg-0000.json`, `/tmp/msg-0001.json`, and so on.
3229

3330
The output filenames are printed one per line to the standard output; this
3431
allows piping the output of rabbitmq-dump-queue to `xargs` or similar utilities
3532
in order to perform further processing on each message (e.g. decompressing,
3633
decoding, etc.).
3734

38-
To include the AMQP headers and properties in the output, add the `-full`
39-
option to the command-line. This will create the following files:
35+
By default, it will not acknowledge messages, so they will be requeued.
36+
Acknowledging messages using the `-ack=true` switch will *remove* them from the
37+
queue, allowing the user to process new messages (see implementation details).
4038

41-
/tmp/msg-0000
42-
/tmp/msg-0000-headers+properties.json
43-
/tmp/msg-0001
44-
/tmp/msg-0001-headers+properties.json
45-
...
39+
rabbitmq-dump-queue -uri="amqp://user:[email protected]:5672/" -queue=incoming_1 -max-messages=50 -output-dir=/tmp -ack=true
4640

47-
The JSON files have the following structure:
41+
If you want to acknowledge a list of specific messages you can create a JSON file
42+
with the following structure
4843

4944
{
50-
"headers": {
51-
"x-my-private-header": "my-value"
52-
},
53-
"properties": {
54-
"correlation_id": "XYZ-9876",
55-
"delivery_mode": 0,
56-
"priority": 5
57-
}
45+
"messages": [
46+
{
47+
"routingKey" : "testEvent",
48+
"containsString" : "[email protected]",
49+
"contains" : [
50+
{
51+
"key": "fieldKeyInContent1",
52+
"value": "fieldValueInContent"
53+
},
54+
{
55+
"key": "fieldKeyInContent2",
56+
"value": 12345
57+
},
58+
{
59+
"key": "fieldKeyInContent3",
60+
"value": true
61+
}
62+
]
63+
}
64+
]
5865
}
5966

67+
In the above JSON `routingKey` is mandatory, `containsString` and `contains`
68+
are optional. So you can have only `containsString` or `contains` with the
69+
proper values. To use the acknowledge json file you should use the
70+
`-messages-to-ack` parameter:
6071

61-
By default, it will not acknowledge messages, so they will be requeued.
62-
Acknowledging messages using the `-ack=true` switch will *remove* them from the
63-
queue, allowing the user to process new messages (see implementation details).
72+
rabbitmq-dump-queue -uri="amqp://user:[email protected]:5672/" -queue=incoming_1 -max-messages=50 -output-dir=/tmp -messages-to-ack=/tmp/ackmessages.json
6473

65-
rabbitmq-dump-queue -uri="amqp://user:[email protected]:5672/" -queue=incoming_1 -max-messages=50 -output-dir=/tmp -ack=true
74+
75+
`rabbitmq-dump-queue` is expecting a JSON response in the content of the message
76+
if it's not the case you can disable it with `json-content` paramater
77+
78+
rabbitmq-dump-queue -uri="amqp://user:[email protected]:5672/" -queue=incoming_1 -max-messages=50 -output-dir=/tmp -json-content=false
6679

6780
Running `rabbitmq-dump-queue -help` will list the available command-line
6881
options.

ackmessageexample1.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"messages": [
3+
{
4+
"routingKey" : "test",
5+
"containsString" : "[email protected]"
6+
}
7+
]
8+
}

ackmessageexample2.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
{
2+
"messages": [
3+
{
4+
"routingKey" : "test",
5+
"contains" : [
6+
{
7+
"key": "id",
8+
"value": "6276b5b4-81d5-4e04-b58f-f1afd15a9c5d"
9+
}
10+
]
11+
}
12+
]
13+
}

ackmessageexample3.json

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"messages": [
3+
{
4+
"routingKey" : "test",
5+
"containsString" : "[email protected]",
6+
"contains" : [
7+
{
8+
"key": "id",
9+
"value": "6276b5b4-81d5-4e04-b58f-f1afd15a9c5d"
10+
},
11+
{
12+
"key": "checked",
13+
"value": true
14+
},
15+
{
16+
"key": "number",
17+
"value": 123456
18+
}
19+
]
20+
}
21+
]
22+
}

main.go

Lines changed: 114 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,47 @@ import (
88
"io/ioutil"
99
"os"
1010
"path"
11+
"strconv"
1112
"strings"
1213

1314
"github.com/streadway/amqp"
1415
)
1516

1617
var (
17-
uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
18-
insecure_tls = flag.Bool("insecure-tls", false, "Insecure TLS mode: don't check certificates")
19-
queue = flag.String("queue", "", "AMQP queue name")
20-
ack = flag.Bool("ack", false, "Acknowledge messages")
21-
maxMessages = flag.Uint("max-messages", 1000, "Maximum number of messages to dump")
22-
outputDir = flag.String("output-dir", ".", "Directory in which to save the dumped messages")
23-
full = flag.Bool("full", false, "Dump the message, its properties and headers")
24-
verbose = flag.Bool("verbose", false, "Print progress")
18+
uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
19+
insecure_tls = flag.Bool("insecure-tls", false, "Insecure TLS mode: don't check certificates")
20+
queue = flag.String("queue", "", "AMQP queue name")
21+
ack = flag.Bool("ack", false, "Acknowledge messages")
22+
maxMessages = flag.Uint("max-messages", 1000, "Maximum number of messages to dump")
23+
outputDir = flag.String("output-dir", ".", "Directory in which to save the dumped messages")
24+
jsonContent = flag.Bool("json-content", true, "If the content of the message is a JSON object")
25+
verbose = flag.Bool("verbose", false, "Print progress")
26+
messagesToAckFile = flag.String("messages-to-ack", "", "File with messages to ack")
2527
)
2628

29+
type Messages struct {
30+
Messages []Message `json:"messages"`
31+
}
32+
33+
type Message struct {
34+
RoutingKey string `json:"routingKey"`
35+
Contains []Contain `json:"contains"`
36+
ContainsString string `json:"containsString"`
37+
}
38+
39+
type Contain struct {
40+
Key string `json:"key"`
41+
Value interface{} `json:"value"`
42+
}
43+
2744
func main() {
2845
flag.Parse()
2946
if flag.NArg() > 0 {
3047
fmt.Fprintf(os.Stderr, "Error: Unused command line arguments detected.\n")
3148
flag.Usage()
3249
os.Exit(2)
3350
}
34-
err := DumpMessagesFromQueue(*uri, *queue, *maxMessages, *outputDir)
51+
err := DumpMessagesFromQueue(*uri, *queue, *maxMessages, *outputDir, *messagesToAckFile, *jsonContent)
3552
if err != nil {
3653
fmt.Fprintf(os.Stderr, "%s\n", err)
3754
os.Exit(1)
@@ -50,7 +67,28 @@ func dial(amqpURI string) (*amqp.Connection, error) {
5067
return conn, err
5168
}
5269

53-
func DumpMessagesFromQueue(amqpURI string, queueName string, maxMessages uint, outputDir string) error {
70+
func DumpMessagesFromQueue(amqpURI string, queueName string, maxMessages uint, outputDir string, messagesToAckFilePath string, isContentJSON bool) error {
71+
var messages Messages
72+
var ackMessage bool
73+
searchToAck := false
74+
75+
if messagesToAckFilePath != "" {
76+
jsonFile, err := os.Open(messagesToAckFilePath)
77+
if err != nil {
78+
return fmt.Errorf("Opening messages to ack file: %s", err)
79+
} else {
80+
byteValue, err := ioutil.ReadAll(jsonFile)
81+
if err != nil {
82+
return fmt.Errorf("Reding sessages to ack file: %s", err)
83+
}
84+
85+
json.Unmarshal(byteValue, &messages)
86+
jsonFile.Close()
87+
byteValue = nil
88+
searchToAck = true
89+
}
90+
}
91+
5492
if queueName == "" {
5593
return fmt.Errorf("Must supply queue name")
5694
}
@@ -72,8 +110,9 @@ func DumpMessagesFromQueue(amqpURI string, queueName string, maxMessages uint, o
72110

73111
VerboseLog(fmt.Sprintf("Pulling messages from queue %q", queueName))
74112
for messagesReceived := uint(0); messagesReceived < maxMessages; messagesReceived++ {
113+
ackMessage = *ack
75114
msg, ok, err := channel.Get(queueName,
76-
*ack, // autoAck
115+
ackMessage, // autoAck
77116
)
78117
if err != nil {
79118
return fmt.Errorf("Queue get: %s", err)
@@ -84,31 +123,54 @@ func DumpMessagesFromQueue(amqpURI string, queueName string, maxMessages uint, o
84123
break
85124
}
86125

87-
err = SaveMessageToFile(msg.Body, outputDir, messagesReceived)
88-
if err != nil {
89-
return fmt.Errorf("Save message: %s", err)
90-
}
91-
92-
if *full {
93-
err = SavePropsAndHeadersToFile(msg, outputDir, messagesReceived)
94-
if err != nil {
95-
return fmt.Errorf("Save props and headers: %s", err)
126+
if searchToAck {
127+
message_content := string(msg.Body[:])
128+
for i := len(messages.Messages) - 1; i >= 0; i-- {
129+
if msg.RoutingKey == messages.Messages[i].RoutingKey {
130+
containsNeeded := len(messages.Messages[i].Contains)
131+
containsFound := 0
132+
for j := 0; j < containsNeeded; j++ {
133+
search := ""
134+
switch messages.Messages[i].Contains[j].Value.(type) {
135+
case string:
136+
search = "\"" + messages.Messages[i].Contains[j].Key + "\":\"" + messages.Messages[i].Contains[j].Value.(string) + "\""
137+
case float64:
138+
val := messages.Messages[i].Contains[j].Value.(float64)
139+
fmtp := "%." + strconv.Itoa(NumDecPlaces(val)) + "f"
140+
search = "\"" + messages.Messages[i].Contains[j].Key + "\":" + fmt.Sprintf(fmtp, val)
141+
case bool:
142+
search = "\"" + messages.Messages[i].Contains[j].Key + "\":" + fmt.Sprintf("%t", messages.Messages[i].Contains[j].Value.(bool))
143+
}
144+
145+
if search != "" && strings.Contains(message_content, search) {
146+
containsFound++
147+
}
148+
}
149+
150+
if messages.Messages[i].ContainsString != "" {
151+
containsNeeded++
152+
if strings.Contains(message_content, messages.Messages[i].ContainsString) {
153+
containsFound++
154+
}
155+
}
156+
157+
if containsNeeded == containsFound {
158+
msg.Ack(true)
159+
fmt.Printf("Acked msg-%04d\n", messagesReceived)
160+
ackMessage = true
161+
messages.Messages = append(messages.Messages[:i], messages.Messages[i+1:]...)
162+
}
163+
}
96164
}
97165
}
98-
}
99166

100-
return nil
101-
}
167+
err = SaveMessageToFile(msg, outputDir, messagesReceived, isContentJSON, ackMessage)
168+
if err != nil {
169+
return fmt.Errorf("Save message: %s", err)
170+
}
102171

103-
func SaveMessageToFile(body []byte, outputDir string, counter uint) error {
104-
filePath := GenerateFilePath(outputDir, counter)
105-
err := ioutil.WriteFile(filePath, body, 0644)
106-
if err != nil {
107-
return err
108172
}
109173

110-
fmt.Println(filePath)
111-
112174
return nil
113175
}
114176

@@ -125,6 +187,8 @@ func GetProperties(msg amqp.Delivery) map[string]interface{} {
125187
"reply_to": msg.ReplyTo,
126188
"type": msg.Type,
127189
"user_id": msg.UserId,
190+
"exchange": msg.Exchange,
191+
"routing_key": msg.RoutingKey,
128192
}
129193

130194
if !msg.Timestamp.IsZero() {
@@ -140,17 +204,25 @@ func GetProperties(msg amqp.Delivery) map[string]interface{} {
140204
return props
141205
}
142206

143-
func SavePropsAndHeadersToFile(msg amqp.Delivery, outputDir string, counter uint) error {
207+
func SaveMessageToFile(msg amqp.Delivery, outputDir string, counter uint, isContentJSON bool, wasAcked bool) error {
144208
extras := make(map[string]interface{})
145209
extras["properties"] = GetProperties(msg)
146210
extras["headers"] = msg.Headers
211+
extras["acked"] = wasAcked
212+
if isContentJSON {
213+
var content interface{}
214+
json.Unmarshal(msg.Body, &content)
215+
extras["content"] = content
216+
} else {
217+
extras["content"] = string(msg.Body[:])
218+
}
147219

148220
data, err := json.MarshalIndent(extras, "", " ")
149221
if err != nil {
150222
return err
151223
}
152224

153-
filePath := GenerateFilePath(outputDir, counter) + "-headers+properties.json"
225+
filePath := GenerateFilePath(outputDir, counter)
154226
err = ioutil.WriteFile(filePath, data, 0644)
155227
if err != nil {
156228
return err
@@ -162,11 +234,20 @@ func SavePropsAndHeadersToFile(msg amqp.Delivery, outputDir string, counter uint
162234
}
163235

164236
func GenerateFilePath(outputDir string, counter uint) string {
165-
return path.Join(outputDir, fmt.Sprintf("msg-%04d", counter))
237+
return path.Join(outputDir, fmt.Sprintf("msg-%04d.json", counter))
166238
}
167239

168240
func VerboseLog(msg string) {
169241
if *verbose {
170242
fmt.Println("*", msg)
171243
}
172244
}
245+
246+
func NumDecPlaces(v float64) int {
247+
s := strconv.FormatFloat(v, 'f', -1, 64)
248+
i := strings.IndexByte(s, '.')
249+
if i > -1 {
250+
return len(s) - i - 1
251+
}
252+
return 0
253+
}

0 commit comments

Comments
 (0)