Skip to content

Commit 4b1767c

Browse files
authored
Merge pull request dubek#7 from msteggink/master
Add support for `-ack=true` (messages are acked and removed from the queue).
2 parents 2ac37cd + 00bd598 commit 4b1767c

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,18 @@ The JSON files have the following structure:
5757
}
5858
}
5959

60+
61+
By default, it will not acknowledge messages, so they will be requeued.
62+
Acknowledging messages using the `-ack=true` switch will *remove* them from the
63+
queue, allowing the user to process new messages (see implementation details).
64+
65+
rabbitmq-dump-queue -uri="amqp://user:[email protected]:5672/" -queue=incoming_1 -max-messages=50 -output-dir=/tmp -ack=true
66+
6067
Running `rabbitmq-dump-queue -help` will list the available command-line
6168
options.
6269

6370

71+
6472
## Message requeuing implementation details
6573

6674
In order to fetch messages from the queue and later return them in the original

main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ var (
1717
uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
1818
insecure_tls = flag.Bool("insecure-tls", false, "Insecure TLS mode: don't check certificates")
1919
queue = flag.String("queue", "", "AMQP queue name")
20+
ack = flag.Bool("ack", false, "Acknowledge messages")
2021
maxMessages = flag.Uint("max-messages", 1000, "Maximum number of messages to dump")
2122
outputDir = flag.String("output-dir", ".", "Directory in which to save the dumped messages")
2223
full = flag.Bool("full", false, "Dump the message, its properties and headers")
@@ -27,7 +28,7 @@ func main() {
2728
flag.Parse()
2829
if flag.NArg() > 0 {
2930
fmt.Fprintf(os.Stderr, "Error: Unused command line arguments detected.\n")
30-
flag.Usage();
31+
flag.Usage()
3132
os.Exit(2)
3233
}
3334
err := DumpMessagesFromQueue(*uri, *queue, *maxMessages, *outputDir)
@@ -72,7 +73,7 @@ func DumpMessagesFromQueue(amqpURI string, queueName string, maxMessages uint, o
7273
VerboseLog(fmt.Sprintf("Pulling messages from queue %q", queueName))
7374
for messagesReceived := uint(0); messagesReceived < maxMessages; messagesReceived++ {
7475
msg, ok, err := channel.Get(queueName,
75-
false, // autoAck
76+
*ack, // autoAck
7677
)
7778
if err != nil {
7879
return fmt.Errorf("Queue get: %s", err)

main_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,37 @@ func verifyFileContent(t *testing.T, filename, expectedContent string) {
122122
}
123123
}
124124

125+
func TestAcknowledge(t *testing.T) {
126+
os.MkdirAll("tmp-test", 0775)
127+
defer os.RemoveAll("tmp-test")
128+
populateTestQueue(t, 10)
129+
defer deleteTestQueue(t)
130+
output, err := exec.Command("./rabbitmq-dump-queue", "-uri="+TEST_AMQP_URI, "-queue="+TEST_QUEUE_NAME, "-max-messages=3", "-output-dir=tmp-test", "-ack=true").CombinedOutput()
131+
if err != nil {
132+
t.Fatalf("run: %s: %s", err, string(output))
133+
}
134+
expectedOutput := "tmp-test/msg-0000\n" +
135+
"tmp-test/msg-0001\n" +
136+
"tmp-test/msg-0002\n"
137+
if string(output) != expectedOutput {
138+
t.Errorf("Wrong output: expected '%s' but got '%s'", expectedOutput, output)
139+
}
140+
output2, err2 := exec.Command("./rabbitmq-dump-queue", "-uri="+TEST_AMQP_URI, "-queue="+TEST_QUEUE_NAME, "-max-messages=10", "-output-dir=tmp-test", "-ack=true").CombinedOutput()
141+
if err2 != nil {
142+
t.Fatalf("run: %s: %s", err, string(output))
143+
}
144+
expectedOutput2 := "tmp-test/msg-0000\n" +
145+
"tmp-test/msg-0001\n" +
146+
"tmp-test/msg-0002\n" +
147+
"tmp-test/msg-0003\n" +
148+
"tmp-test/msg-0004\n" +
149+
"tmp-test/msg-0005\n" +
150+
"tmp-test/msg-0006\n"
151+
if string(output2) != expectedOutput2 {
152+
t.Errorf("Wrong output: expected '%s' but got '%s'", expectedOutput2, output2)
153+
}
154+
}
155+
125156
func TestNormal(t *testing.T) {
126157
os.MkdirAll("tmp-test", 0775)
127158
defer os.RemoveAll("tmp-test")

0 commit comments

Comments
 (0)