Skip to content

Commit edee0d6

Browse files
author
Yang Guo
committed
do not handle rabiit size
1 parent beb89fe commit edee0d6

File tree

3 files changed

+3
-16
lines changed

3 files changed

+3
-16
lines changed

platform-core/src/main/java/com/flow/platform/core/queue/RabbitQueue.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ public class RabbitQueue extends PlatformQueue<PriorityMessage> implements Conte
5454

5555
private SimpleMessageListenerContainer container;
5656

57-
private volatile AtomicInteger size = new AtomicInteger(0);
58-
5957
public RabbitQueue(ThreadPoolTaskExecutor executor, String host, int maxSize, int maxPriority, String queueName) {
6058
super(executor, maxSize, queueName);
6159
this.host = host;
@@ -98,7 +96,6 @@ public void resume() {
9896
@Override
9997
public void enqueue(PriorityMessage item) {
10098
template.send("", name, item);
101-
size.incrementAndGet();
10299
}
103100

104101
@Override
@@ -118,7 +115,7 @@ public boolean isRunning() {
118115

119116
@Override
120117
public int size() {
121-
return size.get();
118+
return 0;
122119
}
123120

124121
private void initRabbitMQ() throws URISyntaxException {
@@ -157,7 +154,6 @@ private class RabbitMessageListener implements MessageListener {
157154

158155
@Override
159156
public void onMessage(Message message) {
160-
size.decrementAndGet();
161157
listener.onQueueItem(new PriorityMessage(message));
162158
}
163159
}

platform-core/src/test/java/com/flow/platform/core/test/PlatformQueueTest.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -140,16 +140,6 @@ public void should_enqueue_for_rabbit_queue() throws Throwable {
140140
Assert.assertEquals(0, rabbitQueue.size());
141141
Assert.assertNotNull(result.getInstance());
142142
Assert.assertEquals("hello", new String(result.getInstance().getBody(), "UTF-8"));
143-
144-
// when: pause and enqueue again
145-
rabbitQueue.pause();
146-
rabbitQueue.enqueue(PriorityMessage.create("pause".getBytes(), 1));
147-
Assert.assertEquals(1, rabbitQueue.size());
148-
149-
// then: resume
150-
rabbitQueue.resume();
151-
Thread.sleep(1000);
152-
Assert.assertEquals(0, rabbitQueue.size());
153143
}
154144

155145
@After

platform-queue/src/main/java/com/flow/platform/queue/PriorityQueueItem.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616

1717
package com.flow.platform.queue;
1818

19+
import java.io.Serializable;
1920
import java.util.Comparator;
2021
import java.util.Objects;
2122

2223
/**
2324
* @author yang
2425
*/
25-
public interface PriorityQueueItem extends Comparable<PriorityQueueItem> {
26+
public interface PriorityQueueItem extends Serializable, Comparable<PriorityQueueItem> {
2627

2728
ItemComparator COMPARATOR = new ItemComparator();
2829

0 commit comments

Comments
 (0)