Skip to content

Commit b8bdbcb

Browse files
committed
Initial commit
0 parents  commit b8bdbcb

File tree

6 files changed

+227
-0
lines changed

6 files changed

+227
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/bin
2+
/release

LICENSE

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
The MIT License (MIT)
2+
3+
Copyright (c) 2014 Dov Murik
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in
13+
all copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
THE SOFTWARE.

README.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# rabbitmq-dump-queue
2+
3+
Dump messages from a RabbitMQ queue to files, without affecting the queue.
4+
5+
## Installation
6+
7+
### Download a release
8+
9+
Precompiled binary pacakges can be found on the
10+
[releases](https://github.com/dubek/rabbitmq-dump-queue/releases) page.
11+
12+
### Compile from source
13+
14+
If you have [Go](https://golang.org/doc/install) installed, you can install
15+
rabbitmq-dump-queue from source by running:
16+
17+
```
18+
go get github.com/dubek/rabbitmq-dump-queue
19+
```
20+
21+
22+
## Usage
23+
24+
To dump the first 50 messages of queue `incoming_1` to `/tmp`:
25+
26+
```
27+
rabbitmq-dump-queue -url="amqp://user:[email protected]:5672/" -queue=incoming_1 -max-messages=50 -output-dir=/tmp
28+
```
29+
30+
This will create the files `/tmp/msg-0000`, `/tmp/msg-0001`, and so on.
31+
32+
The output filenames are printed one per line to the standard output; this
33+
allows piping the output of rabbitmq-dump-queue to `xargs` or similar utilities
34+
in order to perform further processing on each message (e.g. decompressing,
35+
decoding, etc.).
36+
37+
Running `rabbitmq-dump-queue -help` will list the available command-line
38+
options.
39+
40+
41+
## Message requeuing implementation details
42+
43+
In order to fetch messages from the queue and later return them in the original
44+
order, rabbitmq-dump-queue uses a standard [AMQP `basic.get` API
45+
call](https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get) without
46+
automatic acknowledgements, and it doesn't manually acknowledge the received
47+
messages. Thus, when the AMQP connection is closed (after all the messages
48+
were received and written to files), RabbitMQ returns all the un-acked messages
49+
(all the messages) back to the queue in their original order.
50+
51+
This means that during the time rabbitmq-dump-queue receives and saves the
52+
messages, the messages are not visible to other consumers of the queue. This
53+
duration is usually very short (unless you're downloading a lot of messages),
54+
but make sure your system can handle such a situation (or shut down other
55+
consumers of the queue during the time you use this tool).
56+
57+
Note that the same approach is used by RabbitMQ's management HTTP API (the
58+
`/api/queues/{vhost}/{queue}/get` endpoint with `requeue=true`).
59+
60+
61+
## Contributing
62+
63+
Github pull requests and issues are welcome.
64+
65+
66+
## License
67+
68+
rabbitmq-dump-queue is under the MIT License. See the [LICENSE](LICENSE) file
69+
for details.

main.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"github.com/streadway/amqp"
7+
"io/ioutil"
8+
"os"
9+
"path"
10+
)
11+
12+
var (
13+
uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
14+
queue = flag.String("queue", "", "Ephemeral AMQP queue name")
15+
maxMessages = flag.Uint("max-messages", 1000, "Maximum number of messages to dump")
16+
outputDir = flag.String("output-dir", ".", "Directory in which to save the dumped messages")
17+
verbose = flag.Bool("verbose", false, "Print progress")
18+
)
19+
20+
func main() {
21+
flag.Parse()
22+
err := DumpMessagesFromQueue(*uri, *queue, *maxMessages, *outputDir)
23+
if err != nil {
24+
fmt.Fprintf(os.Stderr, "%s\n", err)
25+
os.Exit(1)
26+
}
27+
}
28+
29+
func DumpMessagesFromQueue(amqpURI string, queueName string, maxMessages uint, outputDir string) error {
30+
var err error
31+
32+
if queueName == "" {
33+
return fmt.Errorf("Must supply queue name")
34+
}
35+
36+
VerboseLog(fmt.Sprintf("Dialing %q", amqpURI))
37+
conn, err := amqp.Dial(amqpURI)
38+
if err != nil {
39+
return fmt.Errorf("Dial: %s", err)
40+
}
41+
42+
defer func() {
43+
conn.Close()
44+
VerboseLog("AMQP connection closed")
45+
}()
46+
47+
channel, err := conn.Channel()
48+
if err != nil {
49+
return fmt.Errorf("Channel: %s", err)
50+
}
51+
52+
VerboseLog(fmt.Sprintf("Pulling messages from queue %q", queueName))
53+
for messagesReceived := uint(0); messagesReceived < maxMessages; messagesReceived++ {
54+
msg, ok, err := channel.Get(queueName,
55+
false, // autoAck
56+
)
57+
if err != nil {
58+
return fmt.Errorf("Queue Get: %s", err)
59+
}
60+
if ok {
61+
SaveMessageToFile(msg.Body, outputDir, messagesReceived)
62+
} else {
63+
VerboseLog("No more messages in queue")
64+
break
65+
}
66+
}
67+
68+
return nil
69+
}
70+
71+
func SaveMessageToFile(body []byte, outputDir string, counter uint) {
72+
filePath := GenerateFilePath(outputDir, counter)
73+
ioutil.WriteFile(filePath, body, 0644)
74+
fmt.Println(filePath)
75+
}
76+
77+
func GenerateFilePath(outputDir string, counter uint) string {
78+
return path.Join(outputDir, fmt.Sprintf("msg-%04d", counter))
79+
}
80+
81+
func VerboseLog(msg string) {
82+
if *verbose {
83+
fmt.Println("*", msg)
84+
}
85+
}

scripts/build

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#!/bin/sh
2+
3+
set -e
4+
5+
ROOT=$(dirname $(dirname $0))
6+
cd $ROOT
7+
FILENAME=rabbitmq-dump-queue
8+
EXT=""
9+
if [ "$GOOS" == "windows" ] ; then
10+
EXT=".exe"
11+
fi
12+
go build -o bin/$FILENAME$EXT .

scripts/release

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#!/bin/sh
2+
3+
set -e
4+
5+
if [ "$1" == "" ] ; then
6+
echo "Usage: $0 VERSION"
7+
exit 1
8+
fi
9+
10+
VERSION=$1
11+
12+
ROOT=$(dirname $(dirname $0))
13+
cd $ROOT
14+
15+
for os in linux darwin windows ; do
16+
for arch in 386 amd64 ; do
17+
export GOOS=$os
18+
export GOARCH=$arch
19+
20+
releasename="rabbitmq-dump-queue-$VERSION-$GOOS-$GOARCH"
21+
releasedir="release/$releasename"
22+
23+
rm -rf bin
24+
scripts/build
25+
26+
rm -rf $releasedir
27+
mkdir -p $releasedir
28+
mv bin/* $releasedir/
29+
cp README.md $releasedir/
30+
cp LICENSE $releasedir/
31+
32+
if [ "$os" == "linux" ] ; then
33+
(cd release && tar czf $releasename.tgz $releasename)
34+
else
35+
(cd release && zip -qr $releasename.zip $releasename)
36+
fi
37+
done
38+
done

0 commit comments

Comments
 (0)