|
| 1 | +package main |
| 2 | + |
| 3 | +import ( |
| 4 | + "encoding/json" |
| 5 | + "fmt" |
| 6 | + "io/ioutil" |
| 7 | + "os" |
| 8 | + "os/exec" |
| 9 | + "strings" |
| 10 | + "testing" |
| 11 | + |
| 12 | + "github.com/streadway/amqp" |
| 13 | +) |
| 14 | + |
| 15 | +const ( |
| 16 | + TEST_AMQP_URI = "amqp://guest:[email protected]:5672/" |
| 17 | + TEST_QUEUE_NAME = "test-rabbitmq-dump-queue" |
| 18 | +) |
| 19 | + |
| 20 | +func makeAmqpMessage(i int) amqp.Publishing { |
| 21 | + headers := make(amqp.Table) |
| 22 | + headers["my-header"] = fmt.Sprintf("my-value-%d", i) |
| 23 | + return amqp.Publishing{ |
| 24 | + Headers: headers, |
| 25 | + ContentType: "text/plain", |
| 26 | + Priority: 4, |
| 27 | + MessageId: fmt.Sprintf("msgid-%d", i), |
| 28 | + Body: []byte(fmt.Sprintf("message-%d-body", i)), |
| 29 | + } |
| 30 | +} |
| 31 | + |
| 32 | +// Publish 10 messages to the queue |
| 33 | +func populateTestQueue(t *testing.T, messagesToPublish int) { |
| 34 | + conn, err := amqp.Dial(TEST_AMQP_URI) |
| 35 | + if err != nil { |
| 36 | + t.Fatalf("Dial: %s", err) |
| 37 | + } |
| 38 | + defer conn.Close() |
| 39 | + |
| 40 | + channel, err := conn.Channel() |
| 41 | + if err != nil { |
| 42 | + t.Fatalf("Channel: %s", err) |
| 43 | + } |
| 44 | + |
| 45 | + _, err = channel.QueueDeclare(TEST_QUEUE_NAME, true, false, false, false, nil) |
| 46 | + if err != nil { |
| 47 | + t.Fatalf("QueueDeclare: %s", err) |
| 48 | + } |
| 49 | + |
| 50 | + _, err = channel.QueuePurge(TEST_QUEUE_NAME, false) |
| 51 | + if err != nil { |
| 52 | + t.Fatalf("QueuePurge: %s", err) |
| 53 | + } |
| 54 | + |
| 55 | + for i := 0; i < messagesToPublish; i++ { |
| 56 | + err = channel.Publish("", TEST_QUEUE_NAME, false, false, makeAmqpMessage(i)) |
| 57 | + if err != nil { |
| 58 | + t.Fatalf("Publish: %s", err) |
| 59 | + } |
| 60 | + } |
| 61 | +} |
| 62 | + |
| 63 | +func deleteTestQueue(t *testing.T) { |
| 64 | + conn, err := amqp.Dial(TEST_AMQP_URI) |
| 65 | + if err != nil { |
| 66 | + t.Fatalf("Dial: %s", err) |
| 67 | + } |
| 68 | + defer conn.Close() |
| 69 | + |
| 70 | + channel, err := conn.Channel() |
| 71 | + if err != nil { |
| 72 | + t.Fatalf("Channel: %s", err) |
| 73 | + } |
| 74 | + |
| 75 | + _, err = channel.QueueDelete(TEST_QUEUE_NAME, false, false, false) |
| 76 | + if err != nil { |
| 77 | + t.Fatalf("QueueDelete: %s", err) |
| 78 | + } |
| 79 | +} |
| 80 | + |
| 81 | +func getTestQueueLength(t *testing.T) int { |
| 82 | + conn, err := amqp.Dial(TEST_AMQP_URI) |
| 83 | + if err != nil { |
| 84 | + t.Fatalf("Dial: %s", err) |
| 85 | + } |
| 86 | + defer conn.Close() |
| 87 | + |
| 88 | + channel, err := conn.Channel() |
| 89 | + if err != nil { |
| 90 | + t.Fatalf("Channel: %s", err) |
| 91 | + } |
| 92 | + |
| 93 | + queue, err := channel.QueueInspect(TEST_QUEUE_NAME) |
| 94 | + if err != nil { |
| 95 | + t.Fatalf("QueueInspect: %s", err) |
| 96 | + } |
| 97 | + |
| 98 | + return queue.Messages |
| 99 | +} |
| 100 | + |
| 101 | +func run(t *testing.T, commandLine string) string { |
| 102 | + queueLengthBeforeDump := getTestQueueLength(t) |
| 103 | + args := strings.Split(commandLine, " ") |
| 104 | + output, err := exec.Command("./rabbitmq-dump-queue", args...).CombinedOutput() |
| 105 | + if err != nil { |
| 106 | + t.Fatalf("run: %s: %s", err, string(output)) |
| 107 | + } |
| 108 | + queueLengthAfterDump := getTestQueueLength(t) |
| 109 | + if queueLengthAfterDump != queueLengthBeforeDump { |
| 110 | + t.Errorf("Queue length changed after rabbitmq-dump-queue: expected %d but got %d", queueLengthBeforeDump, queueLengthAfterDump) |
| 111 | + } |
| 112 | + return string(output) |
| 113 | +} |
| 114 | + |
| 115 | +func verifyFileContent(t *testing.T, filename, expectedContent string) { |
| 116 | + content, err := ioutil.ReadFile(filename) |
| 117 | + if err != nil { |
| 118 | + t.Fatalf("Error reading %s: %s", filename, err) |
| 119 | + } |
| 120 | + if expectedContent != string(content) { |
| 121 | + t.Errorf("Wrong content for %s: expected '%s', got '%s'", filename, expectedContent, string(content)) |
| 122 | + } |
| 123 | +} |
| 124 | + |
| 125 | +func TestNormal(t *testing.T) { |
| 126 | + os.MkdirAll("tmp-test", 0775) |
| 127 | + defer os.RemoveAll("tmp-test") |
| 128 | + populateTestQueue(t, 10) |
| 129 | + defer deleteTestQueue(t) |
| 130 | + output := run(t, "-uri="+TEST_AMQP_URI+" -queue="+TEST_QUEUE_NAME+" -max-messages=3 -output-dir=tmp-test") |
| 131 | + expectedOutput := "tmp-test/msg-0000\n" + |
| 132 | + "tmp-test/msg-0001\n" + |
| 133 | + "tmp-test/msg-0002\n" |
| 134 | + if output != expectedOutput { |
| 135 | + t.Errorf("Wrong output: expected '%s' but got '%s'", expectedOutput, output) |
| 136 | + } |
| 137 | + verifyFileContent(t, "tmp-test/msg-0000", "message-0-body") |
| 138 | + verifyFileContent(t, "tmp-test/msg-0001", "message-1-body") |
| 139 | + verifyFileContent(t, "tmp-test/msg-0002", "message-2-body") |
| 140 | + _, err := os.Stat("tmp-test/msg-0003") |
| 141 | + if !os.IsNotExist(err) { |
| 142 | + t.Errorf("Expected msg-0003 to not exist: %v", err) |
| 143 | + } |
| 144 | +} |
| 145 | + |
| 146 | +func TestEmptyQueue(t *testing.T) { |
| 147 | + os.MkdirAll("tmp-test", 0775) |
| 148 | + defer os.RemoveAll("tmp-test") |
| 149 | + populateTestQueue(t, 0) |
| 150 | + defer deleteTestQueue(t) |
| 151 | + output := run(t, "-uri="+TEST_AMQP_URI+" -queue="+TEST_QUEUE_NAME+" -max-messages=3 -output-dir=tmp-test") |
| 152 | + expectedOutput := "" |
| 153 | + if output != expectedOutput { |
| 154 | + t.Errorf("Wrong output: expected '%s' but got '%s'", expectedOutput, output) |
| 155 | + } |
| 156 | +} |
| 157 | + |
| 158 | +func TestMaxMessagesLargerThanQueueLength(t *testing.T) { |
| 159 | + os.MkdirAll("tmp-test", 0775) |
| 160 | + defer os.RemoveAll("tmp-test") |
| 161 | + populateTestQueue(t, 3) |
| 162 | + defer deleteTestQueue(t) |
| 163 | + output := run(t, "-uri="+TEST_AMQP_URI+" -queue="+TEST_QUEUE_NAME+" -max-messages=9 -output-dir=tmp-test") |
| 164 | + expectedOutput := "tmp-test/msg-0000\n" + |
| 165 | + "tmp-test/msg-0001\n" + |
| 166 | + "tmp-test/msg-0002\n" |
| 167 | + if output != expectedOutput { |
| 168 | + t.Errorf("Wrong output: expected '%s' but got '%s'", expectedOutput, output) |
| 169 | + } |
| 170 | +} |
| 171 | + |
| 172 | +func TestFull(t *testing.T) { |
| 173 | + os.MkdirAll("tmp-test", 0775) |
| 174 | + defer os.RemoveAll("tmp-test") |
| 175 | + populateTestQueue(t, 10) |
| 176 | + defer deleteTestQueue(t) |
| 177 | + output := run(t, "-uri="+TEST_AMQP_URI+" -queue="+TEST_QUEUE_NAME+" -max-messages=3 -output-dir=tmp-test -full") |
| 178 | + expectedOutput := "tmp-test/msg-0000\n" + |
| 179 | + "tmp-test/msg-0000-headers+properties.json\n" + |
| 180 | + "tmp-test/msg-0001\n" + |
| 181 | + "tmp-test/msg-0001-headers+properties.json\n" + |
| 182 | + "tmp-test/msg-0002\n" + |
| 183 | + "tmp-test/msg-0002-headers+properties.json\n" |
| 184 | + if output != expectedOutput { |
| 185 | + t.Errorf("Wrong output: expected '%s' but got '%s'", expectedOutput, output) |
| 186 | + } |
| 187 | + verifyFileContent(t, "tmp-test/msg-0000", "message-0-body") |
| 188 | + jsonContent, err := ioutil.ReadFile("tmp-test/msg-0000-headers+properties.json") |
| 189 | + if err != nil { |
| 190 | + t.Fatalf("Error reading tmp-test/msg-0000-headers+properties.json: %s", err) |
| 191 | + } |
| 192 | + var v map[string]interface{} |
| 193 | + err = json.Unmarshal(jsonContent, &v) |
| 194 | + if err != nil { |
| 195 | + t.Fatalf("Error unmarshaling JSON: %s", err) |
| 196 | + } |
| 197 | + |
| 198 | + headers, ok := v["headers"].(map[string]interface{}) |
| 199 | + if !ok { |
| 200 | + t.Fatalf("Wrong data type for 'headers' in JSON") |
| 201 | + } |
| 202 | + if headers["my-header"] != "my-value-0" { |
| 203 | + t.Errorf("Wrong value for my-header, got: %v", headers["my-header"]) |
| 204 | + } |
| 205 | + |
| 206 | + properties, ok := v["properties"].(map[string]interface{}) |
| 207 | + if !ok { |
| 208 | + t.Fatalf("Wrong data type for 'properties' in JSON") |
| 209 | + } |
| 210 | + if properties["priority"] != 4.0 || // JSON numbers are floats |
| 211 | + properties["content_type"] != "text/plain" || |
| 212 | + properties["message_id"] != "msgid-0" { |
| 213 | + t.Errorf("Wrong property value: properties = %#v", properties) |
| 214 | + } |
| 215 | +} |
0 commit comments