Skip to content

Commit 5a53dd4

Browse files
authored
Merge pull request #287 from FlowCI/hotfix/283
Hotfix/283
2 parents daa5a21 + b89642d commit 5a53dd4

File tree

14 files changed

+125
-75
lines changed

14 files changed

+125
-75
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ velocity.log
55
deploy.sh
66
api-doc/
77

8+
velocity.log*
9+
810
node_modules/
911
apidoc/
1012
dist/

platform-api/src/main/java/com/flow/platform/api/config/QueueConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
@Configurable
3232
public class QueueConfig {
3333

34+
public final static long DEFAULT_CMD_CALLBACK_QUEUE_PRIORITY = 1L;
35+
3436
@Autowired
3537
private ThreadPoolTaskExecutor taskExecutor;
3638

platform-api/src/main/java/com/flow/platform/api/consumer/CmdCallbackQueueConsumer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.flow.platform.queue.PlatformQueue;
2626
import com.flow.platform.queue.QueueListener;
2727
import com.flow.platform.util.Logger;
28+
import java.util.Objects;
2829
import javax.annotation.PostConstruct;
2930
import org.springframework.beans.factory.annotation.Autowired;
3031
import org.springframework.stereotype.Component;
@@ -53,7 +54,7 @@ public void init() {
5354

5455
@Override
5556
public void onQueueItem(PriorityMessage message) {
56-
if (message == null) {
57+
if (Objects.isNull(message)) {
5758
return;
5859
}
5960

platform-api/src/main/java/com/flow/platform/api/controller/CmdWebhookController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.flow.platform.api.controller;
1818

19+
import com.flow.platform.api.config.QueueConfig;
1920
import com.flow.platform.api.domain.CmdCallbackQueueItem;
2021
import com.flow.platform.api.service.job.JobService;
2122
import com.flow.platform.core.exception.IllegalParameterException;
@@ -64,7 +65,7 @@ public void execute(@RequestBody Cmd cmd, @RequestParam String identifier) {
6465
cmd.getStatus(),
6566
cmd.getId(),
6667
cmd.getCmdResult());
67-
jobService.enqueue(new CmdCallbackQueueItem(jobId, cmd), System.nanoTime());
68+
jobService.enqueue(new CmdCallbackQueueItem(jobId, cmd), QueueConfig.DEFAULT_CMD_CALLBACK_QUEUE_PRIORITY);
6869
} catch (NumberFormatException warn) {
6970
LOGGER.warn("Invalid job id format");
7071
}

platform-control-center/src/main/java/com/flow/platform/cc/consumer/CmdQueueConsumer.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -98,27 +98,22 @@ public void onQueueItem(PriorityMessage message) {
9898
cmd.setRetry(retry);
9999
cmdService.save(cmd);
100100

101-
// re-enqueue
102-
resend(cmd.getId(), retry);
101+
// do not retry
102+
if (retry <= 0) {
103+
return;
104+
}
105+
106+
// retry the message
107+
retry(message);
103108

104109
} catch (Throwable e) {
105110
LOGGER.error("Unexpected exception", e);
106111
}
107112
}
108113

109-
/**
110-
* Re-enqueue cmd and return num of retry
111-
*/
112-
private void resend(final String cmdId, final int retry) {
113-
if (retry <= 0) {
114-
return;
115-
}
116-
117-
ThreadUtil.sleep(RETRY_WAIT_TIME);
118-
119-
// reset cmd status
120-
PriorityMessage message = PriorityMessage.create(cmdId.getBytes(), QueueConfig.MAX_PRIORITY);
114+
private void retry(final PriorityMessage message) {
115+
message.setPriority(QueueConfig.MAX_PRIORITY);
121116
cmdQueue.enqueue(message);
122-
LOGGER.trace("Re-enqueue item %s", cmdId);
117+
ThreadUtil.sleep(RETRY_WAIT_TIME);
123118
}
124119
}

platform-core/src/main/java/com/flow/platform/core/queue/PriorityMessage.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,44 @@ public class PriorityMessage extends Message implements PriorityQueueItem {
2727

2828
private Long priority;
2929

30+
private Long timestamp;
31+
3032
public static PriorityMessage create(byte[] content, long priority) {
3133
MessageProperties properties = new MessageProperties();
3234
return new PriorityMessage(content, properties, priority);
3335
}
3436

3537
public PriorityMessage(Message message) {
3638
super(message.getBody(), message.getMessageProperties());
39+
this.timestamp = System.nanoTime();
3740
}
3841

3942
public PriorityMessage(byte[] body, MessageProperties messageProperties, long priority) {
4043
super(body, messageProperties);
4144
this.priority = priority;
45+
this.timestamp = System.currentTimeMillis();
4246
}
4347

4448
@Override
4549
public Long getPriority() {
4650
return this.priority;
4751
}
4852

53+
public void setPriority(long priority) {
54+
this.priority = priority;
55+
}
56+
57+
@Override
58+
public Long getTimestamp() {
59+
return this.timestamp;
60+
}
61+
62+
public void setTimestamp(long timestamp) {
63+
this.timestamp = timestamp;
64+
}
65+
4966
@Override
5067
public int compareTo(PriorityQueueItem o) {
51-
return getPriority().compareTo(o.getPriority());
68+
return COMPARATOR.compare(this, o);
5269
}
5370
}

platform-core/src/main/java/com/flow/platform/core/queue/RabbitQueue.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.flow.platform.core.context.ContextEvent;
2020
import com.flow.platform.queue.PlatformQueue;
21+
import com.flow.platform.queue.QueueListener;
2122
import com.flow.platform.util.Logger;
2223
import java.net.URI;
2324
import java.net.URISyntaxException;
@@ -54,8 +55,6 @@ public class RabbitQueue extends PlatformQueue<PriorityMessage> implements Conte
5455

5556
private SimpleMessageListenerContainer container;
5657

57-
private volatile AtomicInteger size = new AtomicInteger(0);
58-
5958
public RabbitQueue(ThreadPoolTaskExecutor executor, String host, int maxSize, int maxPriority, String queueName) {
6059
super(executor, maxSize, queueName);
6160
this.host = host;
@@ -98,7 +97,6 @@ public void resume() {
9897
@Override
9998
public void enqueue(PriorityMessage item) {
10099
template.send("", name, item);
101-
size.incrementAndGet();
102100
}
103101

104102
@Override
@@ -118,7 +116,7 @@ public boolean isRunning() {
118116

119117
@Override
120118
public int size() {
121-
return size.get();
119+
return 0;
122120
}
123121

124122
private void initRabbitMQ() throws URISyntaxException {
@@ -157,8 +155,9 @@ private class RabbitMessageListener implements MessageListener {
157155

158156
@Override
159157
public void onMessage(Message message) {
160-
size.decrementAndGet();
161-
listener.onQueueItem(new PriorityMessage(message));
158+
for (QueueListener<PriorityMessage> listener : listeners) {
159+
listener.onQueueItem(new PriorityMessage(message));
160+
}
162161
}
163162
}
164163
}

platform-core/src/test/java/com/flow/platform/core/test/PlatformQueueTest.java

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.flow.platform.core.test;
1818

1919
import com.flow.platform.core.queue.PriorityMessage;
20+
import com.flow.platform.core.util.ThreadUtil;
2021
import com.flow.platform.queue.PlatformQueue;
2122
import com.flow.platform.queue.QueueListener;
2223
import com.flow.platform.util.ObjectWrapper;
@@ -27,8 +28,10 @@
2728
import org.junit.After;
2829
import org.junit.Assert;
2930
import org.junit.Before;
31+
import org.junit.FixMethodOrder;
3032
import org.junit.Test;
3133
import org.junit.runner.RunWith;
34+
import org.junit.runners.MethodSorters;
3235
import org.springframework.beans.factory.annotation.Autowired;
3336
import org.springframework.test.context.ContextConfiguration;
3437
import org.springframework.test.context.junit4.SpringRunner;
@@ -38,6 +41,7 @@
3841
*/
3942
@RunWith(SpringRunner.class)
4043
@ContextConfiguration(classes = {TestConfig.class})
44+
@FixMethodOrder(MethodSorters.JVM)
4145
public class PlatformQueueTest {
4246

4347
@Autowired
@@ -50,10 +54,11 @@ public class PlatformQueueTest {
5054
public void init() {
5155
inMemoryQueue.clean();
5256
inMemoryQueue.cleanListener();
57+
rabbitQueue.cleanListener();
5358
}
5459

5560
@Test
56-
public void should_enqueue_for_in_memory_queue() throws Throwable {
61+
public void should_enqueue_for_rabbit_queue() throws Throwable {
5762
// given: queue listener
5863
CountDownLatch latch = new CountDownLatch(1);
5964
ObjectWrapper<PriorityMessage> result = new ObjectWrapper<>();
@@ -62,32 +67,22 @@ public void should_enqueue_for_in_memory_queue() throws Throwable {
6267
latch.countDown();
6368
};
6469

65-
inMemoryQueue.register(listener);
66-
inMemoryQueue.start();
70+
rabbitQueue.register(listener);
71+
rabbitQueue.start();
6772

6873
// when: enqueue
69-
inMemoryQueue.enqueue(PriorityMessage.create("Hello".getBytes(), 1));
74+
rabbitQueue.enqueue(PriorityMessage.create("hello".getBytes(), 1));
7075

7176
// then:
72-
latch.await(30, TimeUnit.SECONDS);
73-
Assert.assertEquals(0, inMemoryQueue.size());
74-
Assert.assertEquals("Hello", new String(result.getInstance().getBody()));
75-
76-
// when: pause and enqueue again
77-
inMemoryQueue.pause();
78-
inMemoryQueue.enqueue(PriorityMessage.create("Pause".getBytes(), 1));
79-
Assert.assertEquals(1, inMemoryQueue.size());
80-
81-
// then: resume
82-
inMemoryQueue.resume();
83-
Thread.sleep(1000);
84-
Assert.assertEquals(0, inMemoryQueue.size());
77+
latch.await(10, TimeUnit.SECONDS);
78+
Assert.assertNotNull(result.getInstance());
79+
Assert.assertEquals("hello", new String(result.getInstance().getBody(), "UTF-8"));
8580
}
8681

8782
@Test
8883
public void should_enqueue_with_priority_in_memory_queue() throws Throwable {
8984
// given: queue listener
90-
final int size = 2;
85+
final int size = 6;
9186
CountDownLatch latch = new CountDownLatch(size);
9287
List<String> prioritizedList = new ArrayList<>(size);
9388

@@ -98,21 +93,34 @@ public void should_enqueue_with_priority_in_memory_queue() throws Throwable {
9893

9994
// when:
10095
inMemoryQueue.register(listener);
101-
inMemoryQueue.enqueue(PriorityMessage.create("1".getBytes(), 1)); // should be second
102-
inMemoryQueue.enqueue(PriorityMessage.create("2".getBytes(), 10)); // should be first
96+
inMemoryQueue.enqueue(PriorityMessage.create("1".getBytes(), 1));
97+
ThreadUtil.sleep(1);
98+
inMemoryQueue.enqueue(PriorityMessage.create("2".getBytes(), 1));
99+
ThreadUtil.sleep(1);
100+
inMemoryQueue.enqueue(PriorityMessage.create("3".getBytes(), 10));
101+
ThreadUtil.sleep(1);
102+
inMemoryQueue.enqueue(PriorityMessage.create("4".getBytes(), 10));
103+
ThreadUtil.sleep(1);
104+
inMemoryQueue.enqueue(PriorityMessage.create("5".getBytes(), 10));
105+
ThreadUtil.sleep(1);
106+
inMemoryQueue.enqueue(PriorityMessage.create("6".getBytes(), 10));
103107
inMemoryQueue.start();
104108

105109
// then:
106110
boolean await = latch.await(60, TimeUnit.SECONDS);
107111
Assert.assertTrue(await);
108112
Assert.assertEquals(size, prioritizedList.size());
109113

110-
Assert.assertEquals("2", prioritizedList.get(0));
111-
Assert.assertEquals("1", prioritizedList.get(1));
114+
Assert.assertEquals("3", prioritizedList.get(0));
115+
Assert.assertEquals("4", prioritizedList.get(1));
116+
Assert.assertEquals("5", prioritizedList.get(2));
117+
Assert.assertEquals("6", prioritizedList.get(3));
118+
Assert.assertEquals("1", prioritizedList.get(4));
119+
Assert.assertEquals("2", prioritizedList.get(5));
112120
}
113121

114122
@Test
115-
public void should_enqueue_for_rabbit_queue() throws Throwable {
123+
public void should_enqueue_for_in_memory_queue() throws Throwable {
116124
// given: queue listener
117125
CountDownLatch latch = new CountDownLatch(1);
118126
ObjectWrapper<PriorityMessage> result = new ObjectWrapper<>();
@@ -121,27 +129,26 @@ public void should_enqueue_for_rabbit_queue() throws Throwable {
121129
latch.countDown();
122130
};
123131

124-
rabbitQueue.register(listener);
125-
rabbitQueue.start();
132+
inMemoryQueue.register(listener);
133+
inMemoryQueue.start();
126134

127135
// when: enqueue
128-
rabbitQueue.enqueue(PriorityMessage.create("hello".getBytes(), 1));
136+
inMemoryQueue.enqueue(PriorityMessage.create("Hello".getBytes(), 1));
129137

130138
// then:
131-
latch.await(10, TimeUnit.SECONDS);
132-
Assert.assertEquals(0, rabbitQueue.size());
133-
Assert.assertNotNull(result.getInstance());
134-
Assert.assertEquals("hello", new String(result.getInstance().getBody(), "UTF-8"));
139+
latch.await(30, TimeUnit.SECONDS);
140+
Assert.assertEquals(0, inMemoryQueue.size());
141+
Assert.assertEquals("Hello", new String(result.getInstance().getBody()));
135142

136143
// when: pause and enqueue again
137-
rabbitQueue.pause();
138-
rabbitQueue.enqueue(PriorityMessage.create("pause".getBytes(), 1));
139-
Assert.assertEquals(1, rabbitQueue.size());
144+
inMemoryQueue.pause();
145+
inMemoryQueue.enqueue(PriorityMessage.create("Pause".getBytes(), 1));
146+
Assert.assertEquals(1, inMemoryQueue.size());
140147

141148
// then: resume
142-
rabbitQueue.resume();
149+
inMemoryQueue.resume();
143150
Thread.sleep(1000);
144-
Assert.assertEquals(0, rabbitQueue.size());
151+
Assert.assertEquals(0, inMemoryQueue.size());
145152
}
146153

147154
@After

platform-plugin/src/test/java/com/flow/platform/plugin/test/util/DockerUtilTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.flow.platform.plugin.util.docker.Docker;
2121
import java.nio.file.Path;
2222
import java.nio.file.Paths;
23+
import org.junit.Ignore;
2324
import org.junit.Test;
2425

2526
/**
@@ -28,6 +29,7 @@
2829
public class DockerUtilTest extends TestBase {
2930

3031
@Test
32+
@Ignore
3133
public void should_pull_success() {
3234
Docker docker = new Docker();
3335
String image = "flowci/plugin-environment";

platform-queue/src/main/java/com/flow/platform/queue/DefaultQueueMessage.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,31 @@ public class DefaultQueueMessage implements PriorityQueueItem {
2525

2626
private Long priority;
2727

28+
private Long timestamp;
29+
2830
public DefaultQueueMessage(byte[] body, Long priority) {
2931
this.body = body;
3032
this.priority = priority;
33+
this.timestamp = System.currentTimeMillis();
3134
}
3235

3336
@Override
3437
public Long getPriority() {
3538
return priority;
3639
}
3740

41+
@Override
42+
public Long getTimestamp() {
43+
return timestamp;
44+
}
45+
3846
@Override
3947
public byte[] getBody() {
40-
return new byte[0];
48+
return body;
4149
}
4250

4351
@Override
4452
public int compareTo(PriorityQueueItem o) {
45-
return o.getPriority().compareTo(getPriority());
53+
return COMPARATOR.compare(this, o);
4654
}
4755
}

0 commit comments

Comments
 (0)