Skip to content

Commit 4a3c98c

Browse files
author
Yang Guo
committed
add unit test for queue priority
1 parent 642791e commit 4a3c98c

File tree

5 files changed

+53
-3
lines changed

5 files changed

+53
-3
lines changed

platform-core/src/main/java/com/flow/platform/core/queue/InMemoryQueue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ public void resume() {
9191
pause = false;
9292
}
9393

94+
@Override
95+
public void clean() {
96+
queue.clear();
97+
}
98+
9499
@Override
95100
public boolean isRunning() {
96101
return !pause && !stop;

platform-core/src/main/java/com/flow/platform/core/queue/PlatformQueue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ public void cleanListener() {
7575
*/
7676
public abstract void resume();
7777

78+
/**
79+
* Remove all items from queue
80+
*/
81+
public abstract void clean();
82+
7883
/**
7984
* Queue processor is running
8085
*/

platform-core/src/main/java/com/flow/platform/core/queue/PriorityMessage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,6 @@ public PriorityMessage(byte[] body, MessageProperties messageProperties) {
4040

4141
@Override
4242
public int compareTo(PriorityMessage o) {
43-
return this.getMessageProperties().getPriority().compareTo(o.getMessageProperties().getPriority());
43+
return o.getMessageProperties().getPriority().compareTo(this.getMessageProperties().getPriority());
4444
}
4545
}

platform-core/src/main/java/com/flow/platform/core/queue/RabbitQueue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ public void enqueue(PriorityMessage item) {
9999
size.incrementAndGet();
100100
}
101101

102+
@Override
103+
public void clean() {
104+
throw new UnsupportedOperationException();
105+
}
106+
102107
@Override
103108
public boolean isRunning() {
104109
return container.isRunning();

platform-core/src/test/java/com/flow/platform/core/test/sysinfo/PlatformQueueTest.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,15 @@
1919
import com.flow.platform.core.queue.PlatformQueue;
2020
import com.flow.platform.core.queue.PriorityMessage;
2121
import com.flow.platform.core.queue.QueueListener;
22-
import com.flow.platform.core.queue.RabbitQueue;
2322
import com.flow.platform.util.ObjectWrapper;
23+
import java.util.ArrayList;
24+
import java.util.List;
2425
import java.util.concurrent.CountDownLatch;
2526
import java.util.concurrent.TimeUnit;
27+
import org.junit.After;
2628
import org.junit.Assert;
2729
import org.junit.Test;
2830
import org.junit.runner.RunWith;
29-
import org.springframework.amqp.core.Message;
3031
import org.springframework.beans.factory.annotation.Autowired;
3132
import org.springframework.test.context.ContextConfiguration;
3233
import org.springframework.test.context.junit4.SpringRunner;
@@ -76,6 +77,31 @@ public void should_enqueue_for_in_memory_queue() throws Throwable {
7677
Assert.assertEquals(0, inMemoryQueue.size());
7778
}
7879

80+
@Test
81+
public void should_enqueue_with_priority_in_memory_queue() throws Throwable {
82+
// given: queue listener
83+
int size = 2;
84+
CountDownLatch latch = new CountDownLatch(size);
85+
List<String> prioritizedList = new ArrayList<>(size);
86+
87+
QueueListener<PriorityMessage> listener = item -> {
88+
latch.countDown();
89+
prioritizedList.add(new String(item.getBody()));
90+
};
91+
92+
// when:
93+
inMemoryQueue.register(listener);
94+
inMemoryQueue.enqueue(PriorityMessage.create("1".getBytes(), 1)); // should be second
95+
inMemoryQueue.enqueue(PriorityMessage.create("2".getBytes(), 10)); // should be first
96+
inMemoryQueue.start();
97+
98+
// then:
99+
latch.await(10, TimeUnit.SECONDS);
100+
Assert.assertEquals(2, prioritizedList.size());
101+
Assert.assertEquals("2", prioritizedList.get(0));
102+
Assert.assertEquals("1", prioritizedList.get(1));
103+
}
104+
79105
@Test
80106
public void should_enqueue_for_rabbit_queue() throws Throwable {
81107
// given: queue listener
@@ -109,4 +135,13 @@ public void should_enqueue_for_rabbit_queue() throws Throwable {
109135
Assert.assertEquals(0, rabbitQueue.size());
110136
}
111137

138+
@After
139+
public void clean() {
140+
inMemoryQueue.stop();
141+
inMemoryQueue.clean();
142+
inMemoryQueue.cleanListener();
143+
144+
rabbitQueue.stop();
145+
}
146+
112147
}

0 commit comments

Comments
 (0)