Skip to content

Commit 5f755e4

Browse files
committed
Fix the problem in send buffer and consumer.
1 parent 0cedc39 commit 5f755e4

File tree

11 files changed

+91
-18
lines changed

11 files changed

+91
-18
lines changed

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.apache.logging.log4j.core.layout.PatternLayout;
1919

2020
import java.io.Serializable;
21+
import java.nio.Buffer;
2122
import java.util.ArrayList;
2223
import java.util.List;
2324
import java.util.Objects;

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/StreamisLogAppenderConfigBuilder.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@ public abstract class StreamisLogAppenderConfigBuilder {
1717
* @param senderConfig sender config
1818
* @return
1919
*/
20-
public abstract StreamisLogAppenderConfig build(String applicationName,
21-
Filter filter, RpcLogSenderConfig senderConfig);
20+
public StreamisLogAppenderConfig build(String applicationName,
21+
Filter filter, RpcLogSenderConfig senderConfig){
22+
return null;
23+
}
2224

23-
abstract Map<String, String> loadConfigProps();
25+
26+
public abstract Map<String, String> loadConfigProps();
2427
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/AbstractRpcLogSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ public int drainLogsTo(SendBuffer<T> sendBuffer, int maxElements) {
421421

422422
@SuppressWarnings("unchecked")
423423
private int sendBuf(SendBuffer<T> sendBuffer, Object[] items, int takeIndex, int len){
424-
int send = sendBuffer.writeBuf((T[]) items, takeIndex, len);
424+
int send = sendBuffer.writeBuf(items, takeIndex, len);
425425
if (send < len){
426426
// Buffer full exception
427427
exceptionListener.onException(this, null, "The sender buffer is full," +

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/SendLogCacheConsumer.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,16 @@ public SendLogCacheConsumer(String id, SendLogCache<T> cache,
5353
public void run() {
5454
int remain;
5555
long expireTimeInMills = requireNewFlushTime();
56+
int capacity = sendBuffer.capacity();
5657
while (!this.isTerminated) {
5758
try {
58-
if ((expireTimeInMills > 0 && expireTimeInMills >= System.currentTimeMillis())
59-
|| (remain = this.sendBuffer.remaining()) <= 0) {
59+
remain = this.sendBuffer.remaining();
60+
if ((expireTimeInMills > 0 && expireTimeInMills <= System.currentTimeMillis()) || remain <= 0) {
6061
// Transient to the read mode
61-
sendBuffer.flip();
62-
onFlushAndSend(sendBuffer);
62+
if (remain < capacity) {
63+
sendBuffer.flip();
64+
onFlushAndSend(sendBuffer);
65+
}
6366
expireTimeInMills = requireNewFlushTime();
6467
if (sendBuffer.isReadMode()) {
6568
// Clear the buffer and transient to the write mode, otherwise continue writing
@@ -84,11 +87,16 @@ public void run() {
8487
if (this.isTerminated && e instanceof InterruptedException){
8588
return;
8689
} else {
87-
System.err.println("SendLogCacheConsumer[" + Thread.currentThread().getName() + "] occurred exception [" + e.getLocalizedMessage() + "]");
90+
System.err.println("SendLogCacheConsumer[" + Thread.currentThread().getName() + "] occurred exception [" + e.getLocalizedMessage() + "]");
8891
// For the unknown exception clear the cache
8992
sendBuffer.clear();
9093
expireTimeInMills = requireNewFlushTime();
9194
}
95+
try {
96+
Thread.sleep(500);
97+
} catch (InterruptedException ex) {
98+
// Ignore
99+
}
92100
}
93101
}
94102
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/StreamisRpcLogSender.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ public StreamisRpcLogSender(String applicationName, RpcLogSenderConfig rpcSender
3030
@Override
3131
protected StreamisLogEvents aggregateBuffer(SendBuffer<StreamisLogEvent> sendBuffer) {
3232
int remain = sendBuffer.remaining();
33-
StreamisLogEvent[] logEvents = new StreamisLogEvent[remain];
34-
sendBuffer.readBuf(logEvents, 0, logEvents.length);
35-
return new StreamisLogEvents(applicationName, logEvents);
33+
if (remain > 0) {
34+
StreamisLogEvent[] logEvents = new StreamisLogEvent[remain];
35+
sendBuffer.readBuf(logEvents, 0, logEvents.length);
36+
return new StreamisLogEvents(applicationName, logEvents);
37+
}
38+
return null;
3639
}
3740
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/buf/AbstractSendBuffer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ final int nextPosition(int offset, Flag accessFlag){
103103
}
104104
if (p + offset > limit){
105105
this.position = limit;
106+
} else {
107+
this.position = p + offset;
106108
}
107109
return p;
108110
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/buf/ImmutableSendBuffer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public void capacity(String newCapacity) {
3131

3232
@Override
3333
@SuppressWarnings("all")
34-
public int writeBuf(E[] elements, int srcIndex, int length) {
34+
public int writeBuf(Object[] elements, int srcIndex, int length) {
3535
if (srcIndex < elements.length){
3636
int startPos = nextPosition(Math.min(elements.length - srcIndex, length), Flag.WRITE_MODE);
3737
if (startPos >= 0){
@@ -45,7 +45,7 @@ public int writeBuf(E[] elements, int srcIndex, int length) {
4545

4646
@Override
4747
@SuppressWarnings("all")
48-
public int readBuf(E[] elements, int srcIndex, int length) {
48+
public int readBuf(Object[] elements, int srcIndex, int length) {
4949
if (srcIndex < elements.length){
5050
int startPos = nextPosition(Math.min(elements.length - srcIndex, length), Flag.READ_MODE);
5151
if (startPos >= 0){

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/buf/SendBuffer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public interface SendBuffer<E> {
6464
* @param length the length to read
6565
* @return write num
6666
*/
67-
int writeBuf(E[] elements, int srcIndex, int length);
67+
int writeBuf(Object[] elements, int srcIndex, int length);
6868

6969
/**
7070
* Read buffer element
@@ -79,7 +79,7 @@ public interface SendBuffer<E> {
7979
* @param length the length to write
8080
* @return read num
8181
*/
82-
int readBuf(E[] elements, int srcIndex, int length);
82+
int readBuf(Object[] elements, int srcIndex, int length);
8383

8484
/**
8585
* Compact the buffer, avoid the useless elements

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/http/AbstractHttpLogSender.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.io.IOException;
1616
import java.io.InterruptedIOException;
1717
import java.net.UnknownHostException;
18+
import java.util.Optional;
1819
import java.util.concurrent.TimeUnit;
1920
import java.util.concurrent.atomic.AtomicInteger;
2021
import java.util.concurrent.atomic.AtomicLong;
@@ -55,14 +56,15 @@ public int retryCount() {
5556

5657
@Override
5758
public SendLogExceptionStrategy.RetryDescription onException(Exception e, SendBuffer<T> sendBuffer) {
59+
e.printStackTrace();
5860
boolean shouldRetry = false;
5961
// Limit of exception number is the same as the retry times
6062
if (exceptionCounter.incrementAndGet() > retryCount()){
6163
serverRecoveryTimePoint.set(System.currentTimeMillis() +
6264
TimeUnit.SECONDS.toMillis(rpcSenderConfig.getServerRecoveryTimeInSec()));
6365
} else {
6466
for (Class<?> retryOnException : retryOnExceptions) {
65-
if (retryOnException.isAssignableFrom(e.getClass())) {
67+
if (retryOnException.equals(e.getClass())) {
6668
shouldRetry = true;
6769
break;
6870
}
@@ -76,7 +78,7 @@ public SendLogExceptionStrategy.RetryDescription onException(Exception e, SendBu
7678
sendBuffer.compact( element -> element.mark() > 1);
7779
shouldRetry = false;
7880
}
79-
exceptionListener.onException(sender, e, null);
81+
Optional.ofNullable(exceptionListener).ifPresent(listener -> listener.onException(sender, e, null));
8082
return new RetryDescription(shouldRetry);
8183
}
8284
};
@@ -97,6 +99,7 @@ protected void doSend(E aggregatedEntity, RpcLogSenderConfig rpcSenderConfig) th
9799
return;
98100
}
99101
}
102+
100103
EntityPostAction<E> postAction = new EntityPostAction<>(rpcSenderConfig.getAddress(), aggregatedEntity);
101104
RpcAuthConfig authConfig = rpcSenderConfig.getAuthConfig();
102105
postAction.getRequestHeaders().put(authConfig.getTokenUserKey(), authConfig.getTokenUser());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.webank.wedatasphere.streamis.jobmanager.log.collector;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
public class StreamisLogAppenderTest {
7+
private static final Logger LOG = LoggerFactory.getLogger(StreamisLogAppenderTest.class);
8+
public static void main(String[] args) throws InterruptedException {
9+
while(true){
10+
for(int i = 0; i < 100; i ++){
11+
LOG.info("Stream Log appender test");
12+
}
13+
Thread.sleep(1000);
14+
}
15+
}
16+
}

0 commit comments

Comments
 (0)