Skip to content

Commit 25f772d

Browse files
committed
Fix serialization and configuration problem.
1 parent e0d3a53 commit 25f772d

File tree

10 files changed

+101
-25
lines changed

10 files changed

+101
-25
lines changed

streamis-jobmanager/streamis-job-log/flink-streamis-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/flink/FlinkStreamisConfigAutowired.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.apache.commons.lang3.StringUtils;
66
import org.apache.flink.configuration.Configuration;
77
import org.apache.flink.configuration.GlobalConfiguration;
8+
import org.apache.flink.runtime.util.EnvironmentInformation;
89
import org.apache.flink.yarn.configuration.YarnConfigOptions;
910
import org.apache.logging.log4j.Level;
1011
import org.apache.logging.log4j.core.Filter;
@@ -29,11 +30,13 @@ public class FlinkStreamisConfigAutowired implements StreamisConfigAutowired {
2930

3031
public FlinkStreamisConfigAutowired(){
3132
// First to load configuration
32-
this.configuration = loadConfiguration();
33+
// We should sleep and wait for append of the flink-yaml.conf
3334
}
3435
@Override
3536
public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Builder builder) throws Exception{
36-
String applicationName = this.configuration.getString(YarnConfigOptions.APPLICATION_NAME);
37+
this.configuration = loadConfiguration();
38+
String applicationName =
39+
this.configuration.getString(YarnConfigOptions.APPLICATION_NAME);
3740
if (StringUtils.isNotBlank(applicationName)){
3841
builder.setAppName(applicationName);
3942
}
@@ -48,13 +51,17 @@ public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Bui
4851
List<String> filterStrategies = this.configuration.get(LOG_FILTER_STRATEGIES);
4952
for(String filterStrategy : filterStrategies){
5053
if ("LevelMatch".equals(filterStrategy)){
51-
builder.withFilter(LevelMatchFilter.newBuilder()
54+
builder.withFilter(LevelMatchFilter.newBuilder().setOnMatch(Filter.Result.ACCEPT).setOnMismatch(Filter.Result.DENY)
5255
.setLevel(Level.getLevel(this.configuration.getString(LOG_FILTER_LEVEL_MATCH))).build());
5356
} else if ("RegexMatch".equals(filterStrategy)){
5457
builder.withFilter(RegexFilter.createFilter( this.configuration.getString(LOG_FILTER_REGEX),
5558
null, true, Filter.Result.ACCEPT, Filter.Result.DENY));
5659
}
5760
}
61+
String hadoopUser = EnvironmentInformation.getHadoopUser();
62+
if (hadoopUser.equals("<no hadoop dependency found>") || hadoopUser.equals("<unknown>")){
63+
hadoopUser = "";
64+
}
5865
return builder.setRpcConnTimeout(this.configuration.getInteger(LOG_RPC_CONN_TIMEOUT))
5966
.setRpcSocketTimeout(this.configuration.getInteger(LOG_RPC_SOCKET_TIMEOUT))
6067
.setRpcSendRetryCnt(this.configuration.getInteger(LOG_RPC_SEND_RETRY_COUNT))
@@ -63,7 +70,8 @@ public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Bui
6370
.setRpcAuthTokenCodeKey(this.configuration.getString(LOG_RPC_AUTH_TOKEN_CODE_KEY))
6471
.setRpcAuthTokenUserKey(this.configuration.getString(LOG_RPC_AUTH_TOKEN_USER_KEY))
6572
.setRpcAuthTokenCode(this.configuration.getString(LOG_RPC_AUTH_TOKEN_CODE))
66-
.setRpcAuthTokenUser(this.configuration.getString(LOG_RPC_AUTH_TOKEN_USER))
73+
.setRpcAuthTokenUser(this.configuration.getString(LOG_RPC_AUTH_TOKEN_USER,
74+
hadoopUser))
6775
.setRpcCacheSize(this.configuration.getInteger(LOG_RPC_CACHE_SIZE))
6876
.setRpcCacheMaxConsumeThread(this.configuration.getInteger(LOG_PRC_CACHE_MAX_CONSUME_THREAD))
6977
.setRpcBufferSize(this.configuration.getInteger(LOG_RPC_BUFFER_SIZE))
@@ -86,11 +94,12 @@ public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Bui
8694
* the configuration directory of Flink yarn container is always ".",
8795
* @return configuration
8896
*/
89-
private Configuration loadConfiguration(){
90-
String configDir = System.getenv("FLINK_CONF_DIR");
91-
if (null == configDir){
92-
configDir = ".";
93-
}
97+
private synchronized Configuration loadConfiguration(){
98+
// String configDir = System.getenv("FLINK_CONF_DIR");
99+
// if (null == configDir){
100+
// configDir = ".";
101+
// }
102+
String configDir = ".";
94103
Properties properties = System.getProperties();
95104
Enumeration<?> enumeration = properties.propertyNames();
96105
Configuration dynamicConfiguration = new Configuration();

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/StreamisRpcLogAppender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ public static StreamisRpcLogAppender createAppender(@PluginAttribute("name") Str
9999
if (null == applicationName || applicationName.trim().equals("")){
100100
throw new IllegalArgumentException("Application name cannot be empty");
101101
}
102-
LOGGER.info("StreamisRpcLogAppender: init with config {}", Json.toJson(logAppenderConfig, null));
103-
return new StreamisRpcLogAppender(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY, logAppenderConfig);
102+
System.out.println("StreamisRpcLogAppender: init with config" + Json.toJson(logAppenderConfig, null));
103+
return new StreamisRpcLogAppender(name, logAppenderConfig.getFilter(), layout, ignoreExceptions, Property.EMPTY_ARRAY, logAppenderConfig);
104104
}
105105

106106
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/config/StreamisLogAppenderConfig.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import java.util.ArrayList;
77
import java.util.List;
8+
import java.util.Objects;
89
import java.util.Optional;
910

1011
/**
@@ -47,7 +48,9 @@ public static class Builder{
4748
public Builder(String applicationName, Filter filter,
4849
RpcLogSenderConfig rpcLogSenderConfig){
4950
this.applicationName = applicationName;
50-
this.filters.add(filter);
51+
if (Objects.nonNull(filter)) {
52+
this.filters.add(filter);
53+
}
5154
this.rpcLogSenderConfig = Optional.ofNullable(rpcLogSenderConfig).orElse(new RpcLogSenderConfig());
5255
}
5356

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/SendLogExceptionStrategy.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ <V>V doSend(Callable<V> sendOperation, SendBuffer<T> sendBuffer){
4141
if (Objects.isNull(retryDescription) || !retryDescription.canRetry) {
4242
break;
4343
}
44-
4544
}
4645
}
4746
return null;

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/http/AbstractHttpLogSender.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,22 @@
77
import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.buf.SendBuffer;
88
import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.http.request.EntityPostAction;
99
import com.webank.wedatasphere.streamis.jobmanager.log.entities.LogElement;
10+
import org.apache.http.HttpResponse;
1011
import org.apache.http.client.HttpClient;
12+
import org.apache.http.client.HttpResponseException;
13+
import org.apache.http.client.methods.CloseableHttpResponse;
1114
import org.apache.http.conn.ConnectTimeoutException;
15+
import org.apache.logging.log4j.core.util.IOUtils;
1216

1317
import javax.net.ssl.SSLException;
18+
import javax.sound.midi.SysexMessage;
19+
import javax.xml.ws.Response;
20+
import java.io.BufferedReader;
1421
import java.io.IOException;
22+
import java.io.InputStreamReader;
1523
import java.io.InterruptedIOException;
1624
import java.net.UnknownHostException;
25+
import java.nio.charset.StandardCharsets;
1726
import java.util.Optional;
1827
import java.util.concurrent.TimeUnit;
1928
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,7 +64,6 @@ public int retryCount() {
5564

5665
@Override
5766
public SendLogExceptionStrategy.RetryDescription onException(Exception e, SendBuffer<T> sendBuffer) {
58-
e.printStackTrace();
5967
boolean shouldRetry = false;
6068
// Limit of exception number is the same as the retry times
6169
if (exceptionCounter.incrementAndGet() > retryCount()){
@@ -68,6 +76,11 @@ public SendLogExceptionStrategy.RetryDescription onException(Exception e, SendBu
6876
break;
6977
}
7078
}
79+
if (!shouldRetry && e instanceof HttpResponseException){
80+
if (((HttpResponseException) e).getStatusCode() < 500){
81+
shouldRetry = true;
82+
}
83+
}
7184
}
7285
if (shouldRetry && !sender.getOrCreateLogCache().isCacheable()){
7386
// Means that the cache is full
@@ -103,8 +116,26 @@ protected void doSend(E aggregatedEntity, RpcLogSenderConfig rpcSenderConfig) th
103116
EntityPostAction<E> postAction = new EntityPostAction<>(rpcSenderConfig.getAddress(), aggregatedEntity);
104117
RpcAuthConfig authConfig = rpcSenderConfig.getAuthConfig();
105118
postAction.getRequestHeaders().put(authConfig.getTokenUserKey(), authConfig.getTokenUser());
106-
// Ignore the response
107-
postAction.execute(this.globalHttpClient);
119+
HttpResponse response = null;
120+
try {
121+
response = postAction.execute(this.globalHttpClient);
122+
int statusCode = response.getStatusLine().getStatusCode();
123+
if (statusCode > 200){
124+
throw new HttpResponseException(statusCode,
125+
IOUtils.toString(new InputStreamReader(
126+
response.getEntity().getContent(), StandardCharsets.UTF_8)));
127+
}
128+
}finally {
129+
// Close the response and release the conn
130+
if (null != response){
131+
if (response instanceof CloseableHttpResponse){
132+
((CloseableHttpResponse)response).close();
133+
} else {
134+
// Destroy the stream
135+
response.getEntity().getContent().close();
136+
}
137+
}
138+
}
108139
// Init the counter
109140
this.exceptionCounter.set(0);
110141
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/http/request/AbstractHttpAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public HttpResponse execute(HttpClient httpClient) throws IOException {
5757
} catch (URISyntaxException e) {
5858
throw new IllegalArgumentException("URI maybe has wrong format", e);
5959
}
60+
requestHeaders.forEach(requestBase::setHeader);
6061
return httpClient.execute(requestBase);
6162
}
6263
}

streamis-jobmanager/streamis-job-log/streamis-job-log-collector/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/collector/sender/http/request/EntityPostAction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ private void addEntityBody(MultipartEntityBuilder builder, String prefix, JsonNo
7272
}
7373
} else if (node instanceof ValueNode){
7474
ContentType strContent = ContentType.create("text/plain", StandardCharsets.UTF_8);
75-
System.out.println("p: " + prefix + ", data: " + node.asText());
7675
builder.addTextBody(prefix, node.asText(), strContent);
7776
}
7877
}

streamis-jobmanager/streamis-job-log/streamis-job-log-common/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/entities/StreamisLogEvent.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,21 @@ public class StreamisLogEvent implements LogElement, Serializable {
1212
/**
1313
* Log time
1414
*/
15-
private final long logTimeInMills;
15+
private long logTimeInMills;
1616

1717
/**
1818
* Log content
1919
*/
20-
private final String content;
20+
private String content;
2121

2222
/**
2323
* Mark
2424
*/
2525
private int mark;
2626

27+
public StreamisLogEvent(){
28+
29+
}
2730
public StreamisLogEvent(String content, long logTimeInMills){
2831
this.content = content;
2932
this.logTimeInMills = logTimeInMills;
@@ -53,7 +56,19 @@ public int mark() {
5356
return this.mark;
5457
}
5558

56-
public void mark(int mark){
59+
public void setLogTimeStamp(long logTimeInMills) {
60+
this.logTimeInMills = logTimeInMills;
61+
}
62+
63+
public void setContent(String content) {
64+
this.content = content;
65+
}
66+
67+
public void setMark(int mark) {
5768
this.mark = mark;
5869
}
70+
71+
public void setSequenceId(int sequenceId){
72+
// Ignore
73+
}
5974
}

streamis-jobmanager/streamis-job-log/streamis-job-log-common/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/entities/StreamisLogEvents.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ public class StreamisLogEvents implements LogElement, Serializable {
1313
/**
1414
* Log time
1515
*/
16-
private final long logTimeInMills;
16+
private long logTimeInMills;
1717

18-
private final StreamisLogEvent[] events;
18+
private StreamisLogEvent[] events;
19+
public StreamisLogEvents(){
1920

21+
}
2022
public StreamisLogEvents(String applicationName, StreamisLogEvent[] events){
2123
this.appName = applicationName;
2224
this.events = events;
@@ -69,4 +71,20 @@ public String getAppName() {
6971
public StreamisLogEvent[] getEvents() {
7072
return events;
7173
}
74+
75+
public void setAppName(String appName) {
76+
this.appName = appName;
77+
}
78+
79+
public void setLogTimeStamp(long logTimeInMills) {
80+
this.logTimeInMills = logTimeInMills;
81+
}
82+
83+
public void setEvents(StreamisLogEvent[] events) {
84+
this.events = events;
85+
}
86+
87+
public void setSequenceId(int sequenceId){
88+
// Ignore
89+
}
7290
}

streamis-jobmanager/streamis-job-log/streamis-job-log-server/src/main/java/com/webank/wedatasphere/streamis/jobmanager/log/server/storage/bucket/Log4j2JobLogBucket.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,13 @@ private synchronized Logger initLogger(String bucketName, JobLogBucketConfig con
120120
// .withFileOwner()
121121
.withFileName(fileName)
122122
.withFilePattern(resolveFilePattern(fileName, config.getBucketPartCompress()))
123+
.withPolicy(SizeBasedTriggeringPolicy.createPolicy(config.getMaxBucketActivePartSize() + "MB"))
123124
.withStrategy(createRolloverStrategy(log4jConfig, fileName, ROLLOVER_MAX.getValue(), config.getBucketPartHoldTimeInDay()))
124125
.setConfiguration(log4jConfig)
125126
.build();
126127
appender.start();
127128
log4jConfig.addAppender(appender);
128-
LoggerConfig loggerConfig = LoggerConfig.newBuilder().withLevel(Level.ALL)
129+
LoggerConfig loggerConfig = LoggerConfig.newBuilder().withAdditivity(false).withLevel(Level.ALL)
129130
.withRefs(new AppenderRef[]{
130131
AppenderRef.createAppenderRef(bucketName, null, null)
131132
})
@@ -251,13 +252,13 @@ private RolloverStrategy createRolloverStrategy(Configuration configuration,
251252
*/
252253
private String resolveFileName(String bucketRootPath, String bucketName){
253254
// {projectName}.{jobName}
254-
String fileName = FilenameUtils.normalize(bucketName.substring(bucketName.indexOf(".")));
255+
String fileName = FilenameUtils.normalize(bucketName);
255256
String basePath = bucketRootPath;
256257
if (!basePath.endsWith("/")){
257258
basePath += "/";
258259
}
259260
basePath += fileName.replace(".", "/");
260-
return basePath + "/" + fileName;
261+
return basePath + "/" + fileName.substring(bucketName.indexOf(".") + 1) + ".log";
261262
}
262263

263264
/**

0 commit comments

Comments
 (0)