Skip to content

Commit b211c24

Browse files
authored
Merge pull request #337 from FlowCI/feature/1420
Feature/1420
2 parents e3172bf + f2fd883 commit b211c24

File tree

74 files changed

+1109
-930
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+1109
-930
lines changed

core/src/main/java/com/flowci/core/common/config/AppProperties.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,11 @@ public static class Admin {
115115
@Data
116116
public static class Job {
117117

118-
private Long timeoutInSeconds; // job execution timeout
118+
private int timeoutInSeconds; // job execution timeout
119119

120-
private Long expireInSeconds; // job queue up timeout
120+
private int expireInSeconds; // job queue up timeout
121121

122-
private Long retryWaitingSeconds;
122+
private int retryWaitingSeconds;
123123
}
124124

125125
@Data

core/src/main/java/com/flowci/core/common/config/QueueConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public class QueueConfig {
4747

4848
@Bean
4949
public ThreadPoolTaskExecutor rabbitConsumerExecutor() {
50-
return ThreadHelper.createTaskExecutor(10, 10, 50, "rabbit-t-");
50+
return ThreadHelper.createTaskExecutor(10, 10, 50, "rabbit-event-");
5151
}
5252

5353
@Bean

core/src/main/java/com/flowci/core/common/config/WebSocketConfig.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
4747
*/
4848
private final String stepsTopic = "/topic/steps";
4949

50+
/**
51+
* To subscribe task update for job
52+
* Ex: /topic/tasks/{job id}
53+
*/
54+
private final String tasksTopic = "/topic/tasks";
55+
5056
/**
5157
* To subscribe real time logging for all jobs.
5258
* Ex: /topic/logs
@@ -70,7 +76,15 @@ public void registerStompEndpoints(StompEndpointRegistry registry) {
7076

7177
@Override
7278
public void configureMessageBroker(MessageBrokerRegistry registry) {
73-
registry.enableSimpleBroker(jobsTopic, stepsTopic, logsTopic, agentsTopic, agentHostTopic, gitTestTopic);
79+
registry.enableSimpleBroker(
80+
jobsTopic,
81+
stepsTopic,
82+
tasksTopic,
83+
logsTopic,
84+
agentsTopic,
85+
agentHostTopic,
86+
gitTestTopic
87+
);
7488
registry.setApplicationDestinationPrefixes("/app");
7589
}
7690

@@ -89,6 +103,11 @@ public String topicForSteps() {
89103
return stepsTopic;
90104
}
91105

106+
@Bean("topicForTasks")
107+
public String topicForTasks() {
108+
return tasksTopic;
109+
}
110+
92111
@Bean("topicForLogs")
93112
public String topicForLogs() {
94113
return logsTopic;
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.flowci.core.common.manager;
2+
3+
import com.flowci.core.config.event.GetConfigEvent;
4+
import com.flowci.domain.Input;
5+
import com.flowci.core.secret.event.GetSecretEvent;
6+
import com.flowci.domain.VarType;
7+
import com.flowci.util.ObjectsHelper;
8+
import com.flowci.util.PatternHelper;
9+
import com.flowci.util.StringHelper;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.stereotype.Component;
12+
13+
import java.util.Objects;
14+
15+
@Component
16+
public class VarManager {
17+
18+
@Autowired
19+
private SpringEventManager eventManager;
20+
21+
public boolean verify(Input input, String value) {
22+
if (input.isRequired() && !StringHelper.hasValue(value)) {
23+
return false;
24+
}
25+
26+
if (!input.isRequired() && !StringHelper.hasValue(value)) {
27+
return true;
28+
}
29+
30+
return verify(input.getType(), value);
31+
}
32+
33+
public boolean verify(VarType type, String value) {
34+
switch (type) {
35+
case INT:
36+
return ObjectsHelper.tryParseInt(value);
37+
38+
case BOOL:
39+
return Objects.equals(value, "true") || Objects.equals(value, "false");
40+
41+
case HTTP_URL:
42+
return PatternHelper.isWebURL(value);
43+
44+
case GIT_URL:
45+
return PatternHelper.isGitURL(value);
46+
47+
case EMAIL:
48+
return PatternHelper.isEmail(value);
49+
50+
case CONFIG:
51+
return !eventManager.publish(new GetConfigEvent(this, value)).hasError();
52+
53+
case SECRET:
54+
return !eventManager.publish(new GetSecretEvent(this, value)).hasError();
55+
}
56+
57+
return true;
58+
}
59+
}

core/src/main/java/com/flowci/core/common/rabbit/QueueOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public void declareExchangeAndBind(String exchange, BuiltinExchangeType type, St
2525
super.declareExchangeAndBind(exchange, type, queue, routingKey);
2626
}
2727

28-
public void startConsumer(boolean autoAck, Function<Message, Boolean> onMessage) {
28+
public void startConsumer(boolean autoAck, Function<Message, Boolean> onMessage) throws IOException {
2929
super.startConsumer(queue, autoAck, onMessage);
3030
}
3131

core/src/main/java/com/flowci/core/common/rabbit/RabbitOperations.java

Lines changed: 51 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package com.flowci.core.common.rabbit;
1919

2020
import com.flowci.core.common.config.QueueConfig;
21-
import com.flowci.core.common.helper.ThreadHelper;
2221
import com.flowci.util.StringHelper;
2322
import com.rabbitmq.client.*;
2423
import lombok.Getter;
@@ -28,33 +27,24 @@
2827
import java.io.IOException;
2928
import java.util.HashMap;
3029
import java.util.Map;
31-
import java.util.Objects;
3230
import java.util.concurrent.ConcurrentHashMap;
3331
import java.util.function.Function;
3432

3533
@Log4j2
3634
@Getter
3735
public class RabbitOperations implements AutoCloseable {
3836

39-
private final static int ExecutorQueueSize = 1000;
40-
4137
private final Connection conn;
4238

4339
private final Channel channel;
4440

45-
private final Integer concurrency;
46-
47-
private final ThreadPoolTaskExecutor executor;
48-
49-
// key as queue name, value as instance
50-
private final ConcurrentHashMap<String, QueueConsumer> consumers = new ConcurrentHashMap<>();
41+
// key as queue name, value as consumer tag
42+
private final ConcurrentHashMap<String, String> consumers = new ConcurrentHashMap<>();
5143

52-
public RabbitOperations(Connection conn, Integer concurrency) throws IOException {
44+
public RabbitOperations(Connection conn, int prefetch) throws IOException {
5345
this.conn = conn;
54-
this.concurrency = concurrency;
5546
this.channel = conn.createChannel();
56-
this.channel.basicQos(0, concurrency, false);
57-
this.executor = ThreadHelper.createTaskExecutor(concurrency, concurrency, ExecutorQueueSize, "rabbit-oper-");
47+
this.channel.basicQos(prefetch, false);
5848
}
5949

6050
public void declareExchangeAndBind(String exchange, BuiltinExchangeType type, String queue, String routingKey) throws IOException {
@@ -69,7 +59,7 @@ public void declareExchangeAndBind(String exchange,
6959
Map<String, Object> args,
7060
String queue,
7161
String routingKey) throws IOException {
72-
channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT, durable, autoDelete, args);
62+
channel.exchangeDeclare(exchange, type, durable, autoDelete, args);
7363
channel.queueBind(queue, exchange, routingKey);
7464
}
7565

@@ -78,7 +68,7 @@ public void declare(String queue, boolean durable) throws IOException {
7868
}
7969

8070
public void declare(String queue, boolean durable, Integer maxPriority, String dlExName) throws IOException {
81-
Map<String, Object> props = new HashMap<>(1);
71+
Map<String, Object> props = new HashMap<>(3);
8272
props.put("x-max-priority", maxPriority);
8373
props.put("x-dead-letter-exchange", dlExName);
8474
props.put("x-dead-letter-routing-key", QueueConfig.JobDlRoutingKey);
@@ -118,11 +108,11 @@ public boolean send(String routingKey, byte[] body) {
118108
/**
119109
* Send to routing key with default exchange and priority
120110
*/
121-
public boolean send(String routingKey, byte[] body, Integer priority, Long expireInSecond) {
111+
public boolean send(String routingKey, byte[] body, Integer priority, int expireInSecond) {
122112
try {
123113
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
124114
.priority(priority)
125-
.expiration(Long.toString(expireInSecond * 1000))
115+
.expiration(Integer.toString(expireInSecond * 1000))
126116
.build();
127117

128118
this.channel.basicPublish(StringHelper.EMPTY, routingKey, props, body);
@@ -132,15 +122,47 @@ public boolean send(String routingKey, byte[] body, Integer priority, Long expir
132122
}
133123
}
134124

135-
public void startConsumer(String queue, boolean autoAck, Function<Message, Boolean> onMessage) {
136-
QueueConsumer consumer = consumers.computeIfAbsent(queue, s -> new QueueConsumer(queue, onMessage));
137-
consumer.start(autoAck);
125+
public void startConsumer(String queue, boolean autoAck, Function<Message, Boolean> onMessage) throws IOException {
126+
startConsumer(queue, autoAck, onMessage, null);
127+
}
128+
129+
public void startConsumer(String queue,
130+
boolean autoAck,
131+
Function<Message, Boolean> onMessage,
132+
ThreadPoolTaskExecutor executor) throws IOException {
133+
Consumer consumer = new DefaultConsumer(channel) {
134+
@Override
135+
public void handleDelivery(String consumerTag,
136+
Envelope envelope,
137+
AMQP.BasicProperties properties,
138+
byte[] body) throws IOException {
139+
140+
if (executor != null) {
141+
executor.execute(() -> {
142+
log.debug("======= {} ======", new String(body));
143+
onMessage.apply(new Message(getChannel(), body, envelope));
144+
});
145+
return;
146+
}
147+
148+
onMessage.apply(new Message(getChannel(), body, envelope));
149+
}
150+
};
151+
152+
String tag = getChannel().basicConsume(queue, autoAck, consumer);
153+
consumers.put(queue, tag);
154+
log.info("[Consumer STARTED] queue {} with tag {}", queue, tag);
138155
}
139156

157+
140158
public void removeConsumer(String queue) {
141-
QueueConsumer consumer = consumers.remove(queue);
142-
if (consumer != null) {
143-
consumer.cancel();
159+
String consumerTag = consumers.remove(queue);
160+
if (consumerTag != null) {
161+
try {
162+
getChannel().basicCancel(consumerTag);
163+
} catch (IOException e) {
164+
log.warn(e);
165+
}
144166
}
145167
}
146168

@@ -151,68 +173,19 @@ public void removeConsumer(String queue) {
151173
*/
152174
@Override
153175
public void close() throws Exception {
154-
consumers.forEach((s, queueConsumer) -> queueConsumer.cancel());
155-
channel.close();
156-
executor.shutdown();
157-
}
158-
159-
private class QueueConsumer extends DefaultConsumer {
160-
161-
private final String queue;
162-
163-
private final Function<Message, Boolean> onMessageFunc;
164-
165-
QueueConsumer(String queue, Function<Message, Boolean> onMessageFunc) {
166-
super(channel);
167-
this.queue = queue;
168-
this.onMessageFunc = onMessageFunc;
169-
}
170-
171-
@Override
172-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
173-
throws IOException {
174-
consume(body, envelope);
175-
}
176-
177-
void consume(byte[] body, Envelope envelope) {
178-
executor.execute(() -> {
179-
Boolean ignoreForNow = onMessageFunc.apply(new Message(getChannel(), body, envelope));
180-
});
181-
}
182-
183-
String start(boolean autoAck) {
184-
try {
185-
String tag = getChannel().basicConsume(queue, autoAck, this);
186-
log.info("[Consumer STARTED] queue {} with tag {}", queue, tag);
187-
return tag;
188-
} catch (IOException e) {
189-
log.warn(e.getMessage());
190-
return null;
191-
}
192-
}
193-
194-
boolean cancel() {
176+
consumers.forEach((s, consumerTag) -> {
195177
try {
196-
if (Objects.isNull(getConsumerTag())) {
197-
return true; // not started
198-
}
178+
getChannel().basicCancel(consumerTag);
179+
} catch (IOException ignore) {
199180

200-
onMessageFunc.apply(Message.STOP_SIGN);
201-
getChannel().basicCancel(getConsumerTag());
202-
log.info("[Consumer STOP] queue {} with tag {}", queue, getConsumerTag());
203-
return true;
204-
} catch (IOException e) {
205-
log.warn(e.getMessage());
206-
return false;
207181
}
208-
}
182+
});
183+
channel.close();
209184
}
210185

211186
@Getter
212187
public static class Message {
213188

214-
public static final Message STOP_SIGN = new Message(null, new byte[0], null);
215-
216189
private final Channel channel;
217190

218191
private final byte[] body;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.flowci.core.config.event;
2+
3+
import com.flowci.core.common.event.AbstractSyncEvent;
4+
import com.flowci.core.config.domain.Config;
5+
import lombok.Getter;
6+
7+
@Getter
8+
public class GetConfigEvent extends AbstractSyncEvent<Config> {
9+
10+
private final String name;
11+
12+
public GetConfigEvent(Object source, String name) {
13+
super(source);
14+
this.name = name;
15+
}
16+
}

core/src/main/java/com/flowci/core/config/service/ConfigServiceImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.flowci.core.config.domain.ConfigParser;
99
import com.flowci.core.config.domain.SmtpConfig;
1010
import com.flowci.core.config.domain.SmtpOption;
11+
import com.flowci.core.config.event.GetConfigEvent;
1112
import com.flowci.core.secret.domain.Secret;
1213
import com.flowci.core.secret.event.GetSecretEvent;
1314
import com.flowci.domain.SimpleAuthPair;
@@ -66,6 +67,16 @@ public void onInit(ContextRefreshedEvent ignore) {
6667
}
6768
}
6869

70+
@EventListener
71+
public void onGetConfigEvent(GetConfigEvent event) {
72+
try {
73+
Config config = get(event.getName());
74+
event.setFetched(config);
75+
} catch (NotFoundException e) {
76+
event.setError(e);
77+
}
78+
}
79+
6980
@Override
7081
public Config get(String name) {
7182
Optional<Config> optional = configDao.findByName(name);

0 commit comments

Comments
 (0)