Skip to content

Commit 6f332d5

Browse files
committed
RPC(Http) module classes in collector
1 parent dbb06e9 commit 6f332d5

File tree

16 files changed

+752
-39
lines changed

16 files changed

+752
-39
lines changed

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
<properties>
1515
<maven.compiler.source>8</maven.compiler.source>
1616
<maven.compiler.target>8</maven.compiler.target>
17+
<httpclient.version>4.5.13</httpclient.version>
18+
<httpmine.version>4.5.4</httpmine.version>
19+
<jackson-databind.version>2.13.2.2</jackson-databind.version>
1720
</properties>
1821

1922
<dependencies>
@@ -22,5 +25,22 @@
2225
<artifactId>streamis-job-log-common</artifactId>
2326
<version>0.2.0</version>
2427
</dependency>
28+
<!--http client module-->
29+
<dependency>
30+
<groupId>org.apache.httpcomponents</groupId>
31+
<artifactId>httpclient</artifactId>
32+
<version>${httpclient.version}</version>
33+
</dependency>
34+
<dependency>
35+
<groupId>org.apache.httpcomponents</groupId>
36+
<artifactId>httpmime</artifactId>
37+
<version>${httpmine.version}</version>
38+
</dependency>
39+
<!-- jackson module -->
40+
<dependency>
41+
<groupId>com.fasterxml.jackson.core</groupId>
42+
<artifactId>jackson-databind</artifactId>
43+
<version>${jackson-databind.version}</version>
44+
</dependency>
2545
</dependencies>
2646
</project>

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/AbstractRpcLogSender.java

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@
55
import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.buf.SendBuffer;
66
import com.webank.wedatasphere.streamis.jobmanager.log.entities.LogElement;
77

8-
import java.util.List;
9-
import java.util.Objects;
10-
import java.util.Optional;
11-
import java.util.UUID;
8+
import java.util.*;
129
import java.util.concurrent.*;
1310
import java.util.concurrent.atomic.AtomicInteger;
1411
import java.util.concurrent.locks.Condition;
@@ -139,8 +136,17 @@ private class RpcLogContext{
139136
*/
140137
private int consumers = 0;
141138

139+
/**
140+
* Futures of consumers
141+
*/
142+
private final Map<String, SendLogCacheConsumer<T>> sendLogCacheConsumers = new ConcurrentHashMap<>();
143+
/**
144+
* Context lock
145+
*/
146+
private final ReentrantLock ctxLock;
142147
public RpcLogContext(SendLogCache<T> logCache){
143148
this.logCache = logCache;
149+
this.ctxLock = new ReentrantLock();
144150
this.consumePool = new ThreadPoolExecutor(0, maxCacheConsume,
145151
60L, TimeUnit.SECONDS,
146152
new SynchronousQueue<>(), new ThreadFactory() {
@@ -161,33 +167,58 @@ public Thread newThread(Runnable r) {
161167
});
162168
}
163169

164-
public synchronized void startCacheConsumer(){
165-
if (consumers >= maxCacheConsume){
166-
throw new IllegalStateException("Over the limit number of cache consumers: [" + maxCacheConsume + "]");
167-
}
168-
String id = UUID.randomUUID().toString();
169-
SendBuffer<T> sendBuffer = new ImmutableSendBuffer<>(sendBufSize);
170-
this.consumePool.submit(new SendLogCacheConsumer<T>(id, logCache, sendBuffer, rpcSenderConfig) {
171-
@Override
172-
protected void onFlushAndSend(SendBuffer<T> sendBuffer) {
173-
// First to aggregate the buffer
174-
E aggEntity = aggregateBuffer(sendBuffer);
175-
Optional.ofNullable(getSendLogExceptionStrategy()).ifPresent(
176-
strategy -> strategy.doSend(() -> {
177-
doSend(aggEntity, rpcSenderConfig);
178-
return null;
179-
}, sendBuffer));
170+
public void startCacheConsumer(){
171+
this.ctxLock.lock();
172+
try {
173+
if (consumers >= maxCacheConsume) {
174+
throw new IllegalStateException("Over the limit number of cache consumers: [" + maxCacheConsume + "]");
180175
}
181-
});
182-
this.consumers ++;
176+
String id = UUID.randomUUID().toString();
177+
SendBuffer<T> sendBuffer = new ImmutableSendBuffer<>(sendBufSize);
178+
SendLogCacheConsumer<T> consumer = new SendLogCacheConsumer<T>(id, logCache, sendBuffer, rpcSenderConfig) {
179+
@Override
180+
protected void onFlushAndSend(SendBuffer<T> sendBuffer) {
181+
// First to aggregate the buffer
182+
E aggEntity = aggregateBuffer(sendBuffer);
183+
Optional.ofNullable(getSendLogExceptionStrategy()).ifPresent(
184+
strategy -> strategy.doSend(() -> {
185+
doSend(aggEntity, rpcSenderConfig);
186+
return null;
187+
}, sendBuffer));
188+
}
189+
};
190+
Future<?> future = this.consumePool.submit(consumer);
191+
consumer.setFuture(future);
192+
sendLogCacheConsumers.put(id, consumer);
193+
this.consumers++;
194+
} finally {
195+
this.ctxLock.unlock();
196+
}
183197
}
184198

185199
public SendLogCache<T> getLogCache(){
186200
return this.logCache;
187201
}
188202

189-
public void destroyCacheConsumer(){
203+
/**
204+
* Destroy cache consumer
205+
* @param id id
206+
*/
207+
public void destroyCacheConsumer(String id){
208+
SendLogCacheConsumer<T> consumer = sendLogCacheConsumers.remove(id);
209+
consumer.shutdown();
210+
}
190211

212+
/**
213+
* Destroy all the consumers
214+
*/
215+
public void destroyCacheConsumers(){
216+
this.ctxLock.lock();
217+
try {
218+
sendLogCacheConsumers.forEach( (key, consumer)-> consumer.shutdown());
219+
} finally {
220+
this.ctxLock.unlock();
221+
}
191222
}
192223
}
193224
/**

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/RpcSenderConfig.java

Lines changed: 162 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,181 @@ public class RpcSenderConfig {
88
/**
99
* Send address
1010
*/
11-
private String sendAddress;
11+
private String address;
12+
13+
/**
14+
* Key of token-code
15+
*/
16+
private String tokenCodeKey = "Token-Code";
17+
18+
/**
19+
* Key of token-user
20+
*/
21+
private String tokenUserKey = "Token-User";
22+
23+
/**
24+
* Token user
25+
*/
26+
private String tokenUser = System.getProperty("user.name");
27+
28+
/**
29+
* Token code
30+
*/
31+
private String tokenCode = "STREAM_LOG";
32+
33+
/**
34+
* Timeout of connecting
35+
*/
36+
private int connectionTimeout = 3000;
37+
38+
/**
39+
* Timeout of reading from socket
40+
*/
41+
private int socketTimeout = 15000;
42+
1243
/**
1344
* Size of send cache
1445
*/
15-
private int sendCacheSize = 150;
46+
private int cacheSize = 150;
1647

1748
/**
1849
* Size of send buffer
1950
*/
20-
private int sendBufferSize = 50;
51+
private int bufferSize = 50;
2152

2253
/**
2354
* Expire time of send buffer
2455
*/
25-
private long sendBufferExpireTimeInSec = -1;
56+
private long bufferExpireTimeInSec = -1;
57+
58+
/**
59+
* Retry count of sending
60+
*/
61+
private int sendRetryCnt = 3;
62+
63+
/**
64+
* The time for server recovery
65+
*/
66+
private int serverRecoveryTimeInSec = 5;
67+
/**
68+
* Retry max delay time of sender
69+
*/
70+
private int maxDelayTimeInSec;
71+
72+
/**
73+
* Max number of consuming thread
74+
*/
75+
private int maxConsumeThread = 10;
76+
77+
public long getBufferExpireTimeInSec() {
78+
return bufferExpireTimeInSec;
79+
}
80+
81+
public void setBufferExpireTimeInSec(long bufferExpireTimeInSec) {
82+
this.bufferExpireTimeInSec = bufferExpireTimeInSec;
83+
}
84+
85+
public String getAddress() {
86+
return address;
87+
}
88+
89+
public void setAddress(String address) {
90+
this.address = address;
91+
}
92+
93+
public int getCacheSize() {
94+
return cacheSize;
95+
}
96+
97+
public void setCacheSize(int cacheSize) {
98+
this.cacheSize = cacheSize;
99+
}
100+
101+
public int getBufferSize() {
102+
return bufferSize;
103+
}
104+
105+
public void setBufferSize(int bufferSize) {
106+
this.bufferSize = bufferSize;
107+
}
108+
109+
public int getSendRetryCnt() {
110+
return sendRetryCnt;
111+
}
112+
113+
public void setSendRetryCnt(int sendRetryCnt) {
114+
this.sendRetryCnt = sendRetryCnt;
115+
}
116+
117+
public int getConnectionTimeout() {
118+
return connectionTimeout;
119+
}
120+
121+
public void setConnectionTimeout(int connectionTimeout) {
122+
this.connectionTimeout = connectionTimeout;
123+
}
124+
125+
public int getSocketTimeout() {
126+
return socketTimeout;
127+
}
128+
129+
public void setSocketTimeout(int socketTimeout) {
130+
this.socketTimeout = socketTimeout;
131+
}
132+
133+
public int getMaxConsumeThread() {
134+
return maxConsumeThread;
135+
}
136+
137+
public void setMaxConsumeThread(int maxConsumeThread) {
138+
this.maxConsumeThread = maxConsumeThread;
139+
}
140+
141+
public String getTokenCodeKey() {
142+
return tokenCodeKey;
143+
}
144+
145+
public void setTokenCodeKey(String tokenCodeKey) {
146+
this.tokenCodeKey = tokenCodeKey;
147+
}
148+
149+
public String getTokenCode() {
150+
return tokenCode;
151+
}
152+
153+
public void setTokenCode(String tokenCode) {
154+
this.tokenCode = tokenCode;
155+
}
156+
157+
public int getMaxDelayTimeInSec() {
158+
return maxDelayTimeInSec;
159+
}
160+
161+
public void setMaxDelayTimeInSec(int maxDelayTimeInSec) {
162+
this.maxDelayTimeInSec = maxDelayTimeInSec;
163+
}
164+
165+
public String getTokenUserKey() {
166+
return tokenUserKey;
167+
}
168+
169+
public void setTokenUserKey(String tokenUserKey) {
170+
this.tokenUserKey = tokenUserKey;
171+
}
172+
173+
public String getTokenUser() {
174+
return tokenUser;
175+
}
176+
177+
public void setTokenUser(String tokenUser) {
178+
this.tokenUser = tokenUser;
179+
}
26180

27-
public long getSendBufferExpireTimeInSec() {
28-
return sendBufferExpireTimeInSec;
181+
public int getServerRecoveryTimeInSec() {
182+
return serverRecoveryTimeInSec;
29183
}
30184

31-
public void setSendBufferExpireTimeInSec(long sendBufferExpireTimeInSec) {
32-
this.sendBufferExpireTimeInSec = sendBufferExpireTimeInSec;
185+
public void setServerRecoveryTimeInSec(int serverRecoveryTimeInSec) {
186+
this.serverRecoveryTimeInSec = serverRecoveryTimeInSec;
33187
}
34188
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/SendLogCacheConsumer.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.buf.SendBuffer;
44
import com.webank.wedatasphere.streamis.jobmanager.log.entities.LogElement;
55

6+
import java.util.concurrent.Future;
67
import java.util.concurrent.TimeUnit;
78

89
/**
@@ -29,14 +30,20 @@ public abstract class SendLogCacheConsumer<T extends LogElement> implements Runn
2930
private final SendBuffer<T> sendBuffer;
3031

3132
private final String id;
33+
34+
/**
35+
* Future for execution
36+
*/
37+
private Future<?> future;
38+
3239
public SendLogCacheConsumer(String id, SendLogCache<T> cache,
3340
SendBuffer<T> sendBuffer,
3441
RpcSenderConfig rpcSenderConfig){
3542
this.id = id;
3643
this.cache = cache;
3744
this.sendBuffer = sendBuffer;
38-
this.bufferExpireTimeInMills = rpcSenderConfig.getSendBufferExpireTimeInSec() > 0 ? TimeUnit.SECONDS
39-
.toMillis(rpcSenderConfig.getSendBufferExpireTimeInSec()) : -1;
45+
this.bufferExpireTimeInMills = rpcSenderConfig.getBufferExpireTimeInSec() > 0 ? TimeUnit.SECONDS
46+
.toMillis(rpcSenderConfig.getBufferExpireTimeInSec()) : -1;
4047

4148
}
4249

@@ -88,6 +95,14 @@ public void shutdown(){
8895
this.isTerminated = true;
8996
}
9097

98+
public Future<?> getFuture() {
99+
return future;
100+
}
101+
102+
public void setFuture(Future<?> future) {
103+
this.future = future;
104+
}
105+
91106
private long requireNewFlushTime(){
92107
return bufferExpireTimeInMills > 0 ? System.currentTimeMillis() + bufferExpireTimeInMills : -1;
93108
}

0 commit comments

Comments
 (0)