Skip to content

Commit 411d226

Browse files
authored
Merge pull request #359 from cthermolia-grnet/logJobId
ARGO-3977 Improve logs generated by the flink jobs
2 parents 19d4273 + fdc9e17 commit 411d226

File tree

13 files changed

+115
-38
lines changed

13 files changed

+115
-38
lines changed

flink_jobs_v2/ams_ingest_metric/src/main/java/argo/streaming/AmsIngestMetric.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535

3636
import argo.avro.MetricData;
3737
import argo.avro.MetricDataOld;
38+
import org.apache.flink.api.common.JobID;
39+
import org.slf4j.MDC;
3840

3941
/**
4042
* Flink Job : Stream metric data from ARGO messaging to Hbase job required cli
@@ -59,6 +61,7 @@ public class AmsIngestMetric {
5961

6062
static Logger LOG = LoggerFactory.getLogger(AmsIngestMetric.class);
6163
private static String runDate;
64+
6265
/**
6366
* Check if flink job has been called with ams rate params
6467
*/
@@ -109,6 +112,7 @@ public static boolean hasArgs(String[] reqArgs, ParameterTool paramTool) {
109112

110113
public static void main(String[] args) throws Exception {
111114

115+
configJID();
112116
// Create flink execution environment
113117
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
114118
see.setParallelism(1);
@@ -138,6 +142,7 @@ public static void main(String[] args) throws Exception {
138142
if (runDate != null) {
139143
runDate = runDate + "T00:00:00.000Z";
140144
}
145+
141146

142147
// Check if checkpointing is desired
143148
if (hasCheckArgs(parameterTool)) {
@@ -253,5 +258,12 @@ public void flatMap(String value, Collector<MetricData> out) throws Exception {
253258

254259
see.execute(jobTitleSB.toString());
255260
}
261+
private static String getJID() {
262+
return JobID.generate().toString();
263+
}
256264

265+
private static void configJID() { //config the JID in the log4j.properties
266+
String jobId = getJID();
267+
MDC.put("JID", jobId);
268+
}
257269
}

flink_jobs_v2/ams_ingest_metric/src/main/resources/log4j.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ log4j.rootLogger=INFO, console
2020

2121
log4j.appender.console=org.apache.log4j.ConsoleAppender
2222
log4j.appender.console.layout=org.apache.log4j.PatternLayout
23-
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
23+
log4j.appender.console.layout.ConversionPattern= %d{HH:mm:ss,SSS} %X{JID} %-5p %-60c %x - %m%n

flink_jobs_v2/ams_ingest_sync/src/main/java/argo/streaming/AmsIngestSync.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import ams.connector.ArgoMessagingSource;
44
import java.util.concurrent.TimeUnit;
5+
import org.apache.flink.api.common.JobID;
56

67
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
78
import org.apache.flink.api.common.time.Time;
@@ -10,6 +11,7 @@
1011
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
14+
import org.slf4j.MDC;
1315

1416
/**
1517
* Flink Streaming JOB for Ingesting Sync Data to HDFS job required cli
@@ -30,7 +32,6 @@ public class AmsIngestSync {
3032
// setup logger
3133
static Logger LOG = LoggerFactory.getLogger(AmsIngestSync.class);
3234
private static String runDate;
33-
3435
/**
3536
* Check if a list of expected cli arguments have been provided to this
3637
* flink job
@@ -56,6 +57,7 @@ public static boolean hasAmsRateArgs(ParameterTool paramTool) {
5657

5758
// main job function
5859
public static void main(String[] args) throws Exception {
60+
configJID();
5961

6062
// Create flink execution environment
6163
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -76,6 +78,8 @@ public static void main(String[] args) throws Exception {
7678
if (runDate != null) {
7779
runDate = runDate + "T00:00:00.000Z";
7880
}
81+
82+
7983

8084
// set ams client batch and interval to default values
8185
int batch = 1;
@@ -118,5 +122,12 @@ public static void main(String[] args) throws Exception {
118122
see.execute(jobTitleSB.toString());
119123

120124
}
125+
private static String getJID() {
126+
return JobID.generate().toString();
127+
}
121128

129+
private static void configJID() {
130+
String jobId = getJID();
131+
MDC.put("JID", jobId);
132+
}
122133
}

flink_jobs_v2/ams_ingest_sync/src/main/java/argo/streaming/SyncHDFSOutputFormat.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,13 @@ class SyncHDFSOutputFormat implements OutputFormat<String> {
4444

4545
// setup logger
4646
static Logger LOG = LoggerFactory.getLogger(SyncHDFSOutputFormat.class);
47-
47+
4848
private static final long serialVersionUID = 1L;
4949

5050
private URI basePath;
5151
private org.apache.hadoop.conf.Configuration hadoopConf;
5252
private FileSystem hdfs;
53+
5354

5455
public void setBasePath(String url) throws URISyntaxException {
5556
this.basePath = new URI(url);
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
log4j.rootLogger=INFO, console
20+
21+
log4j.appender.console=org.apache.log4j.ConsoleAppender
22+
log4j.appender.console.layout=org.apache.log4j.PatternLayout
23+
log4j.appender.console.layout.ConversionPattern= %d{HH:mm:ss,SSS} %X{JID} %-5p %-60c %x - %m%n

flink_jobs_v2/batch_multi/src/main/java/argo/batch/ArgoMultiJob.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@
5151
import trends.status.ServiceTrendsCounter;
5252

5353
import java.util.List;
54+
import org.apache.flink.api.common.JobID;
5455
import org.joda.time.DateTime;
5556
import org.joda.time.DateTimeZone;
57+
import org.slf4j.MDC;
5658

5759
/**
5860
* Implements an ARGO Status Batch Job in flink
@@ -84,7 +86,8 @@
8486
*/
8587
public class ArgoMultiJob {
8688

87-
static Logger LOG = LoggerFactory.getLogger(ArgoMultiJob.class);
89+
static Logger LOG = LoggerFactory.getLogger( ArgoMultiJob.class);
90+
8891
private static String dbURI;
8992
private static String reportID;
9093
private static Integer rankNum;
@@ -97,9 +100,10 @@ public class ArgoMultiJob {
97100
private static boolean calcStatusTrends = false;
98101
private static boolean calcFlipFlops = false;
99102
private static boolean calcTagTrends = false;
100-
103+
101104
public static void main(String[] args) throws Exception {
102105

106+
configJID();
103107
final ParameterTool params = ParameterTool.fromArgs(args);
104108

105109
// set up the execution environment
@@ -142,6 +146,7 @@ public static void main(String[] args) throws Exception {
142146
List<String> confData = confDS.collect();
143147
ReportManager cfgMgr = new ReportManager();
144148
cfgMgr.loadJsonString(confData);
149+
145150
enableComputations(cfgMgr.activeComputations, params);
146151

147152
DataSource<String> opsDS = env.fromElements(amr.getResourceJSON(ApiResource.OPS));
@@ -226,7 +231,7 @@ public static void main(String[] args) throws Exception {
226231
DataSet<StatusMetric> stDetailDS = mdataTotalDS.groupBy("group", "service", "hostname", "metric")
227232
.sortGroup("timestamp", Order.ASCENDING).reduceGroup(new CalcPrevStatus(params))
228233
.withBroadcastSet(mpsDS, "mps").withBroadcastSet(recDS, "rec").withBroadcastSet(opsDS, "ops");
229-
234+
230235
//Create StatusMetricTimeline dataset for endpoints
231236
DataSet<StatusTimeline> statusMetricTimeline = stDetailDS.groupBy("group", "service", "hostname", "metric").sortGroup("timestamp", Order.ASCENDING)
232237
.reduceGroup(new CalcMetricTimeline(params)).withBroadcastSet(mpsDS, "mps").withBroadcastSet(opsDS, "ops")
@@ -466,7 +471,6 @@ private static void enableComputations(ReportManager.ActiveComputations activeCo
466471
calcStatusTrends = isOFF(params, "calcStatusTrends", activeComputations);
467472
calcFlipFlops = isOFF(params, "calcFlipFlops", activeComputations);
468473
calcTagTrends = isOFF(params, "calcTagTrends", activeComputations);
469-
System.out.println("calcTagTrends--- "+calcTagTrends);
470474

471475
if (!calcStatus && !calcAR && !calcStatusTrends && !calcFlipFlops && !calcStatusTrends) {
472476
System.exit(0);
@@ -496,4 +500,14 @@ public static boolean isOFF(ParameterTool params, String paramName, ReportManage
496500
}
497501
}
498502
}
503+
504+
private static String getJID() {
505+
return JobID.generate().toString();
506+
}
507+
508+
private static void configJID() {//config the JID in the log4j.properties
509+
String jobId=getJID();
510+
MDC.put("JID", jobId);
511+
512+
}
499513
}

flink_jobs_v2/batch_multi/src/main/java/argo/batch/PickEndpoints.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class PickEndpoints extends RichFlatMapFunction<MetricData, StatusMetric>
3737

3838
final ParameterTool params;
3939

40+
4041
public PickEndpoints(ParameterTool params) {
4142
this.params = params;
4243
}
@@ -108,6 +109,7 @@ public void open(Configuration parameters) throws IOException, ParseException {
108109
if (!this.thr.get(0).isEmpty()) {
109110
this.thrMgr.parseJSON(this.thr.get(0));
110111
}
112+
111113
}
112114

113115
@Override

flink_jobs_v2/batch_multi/src/main/java/utils/Utils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public static boolean isPreviousDate(String format, Date nowDate, Date firstDate
7474
}
7575
}
7676

77-
public static boolean checkParameters(ParameterTool params, String... vars) {
77+
public static boolean checkParameters(ParameterTool params,String... vars) {
7878

7979
for (String var : vars) {
8080

flink_jobs_v2/batch_multi/src/main/resources/log4j.properties

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ log4j.rootLogger=INFO, console
2020

2121
log4j.appender.console=org.apache.log4j.ConsoleAppender
2222
log4j.appender.console.layout=org.apache.log4j.PatternLayout
23-
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
23+
log4j.appender.console.layout.ConversionPattern= %d{HH:mm:ss,SSS} %X{JID} %-5p %-60c %x - %m%n
24+

flink_jobs_v2/batch_multi/src/test/java/argo/batch/ArgoMultiJobTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.ArrayList;
2828
import java.util.List;
2929
import java.util.stream.Collectors;
30+
import org.apache.flink.api.common.JobID;
3031
import org.apache.flink.api.common.operators.Order;
3132
import org.apache.flink.api.java.DataSet;
3233
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -77,7 +78,7 @@ public class ArgoMultiJobTest {
7778
private static boolean calcStatusTrends = false;
7879
private static boolean calcFlipFlops = false;
7980
private static boolean calcTagTrends = false;
80-
81+
8182
public ArgoMultiJobTest() {
8283
}
8384

@@ -104,7 +105,7 @@ public void testMain() throws Exception {
104105
List<String> confData = confDS.collect();
105106
ReportManager cfgMgr = new ReportManager();
106107
cfgMgr.loadJsonString(confData);
107-
108+
108109
enableComputations(cfgMgr.activeComputations, params);
109110
DataSource<String> apsDS = env.fromElements(amr.getResourceJSON(ApiResource.AGGREGATION));
110111
DataSource<String> opsDS = env.fromElements(amr.getResourceJSON(ApiResource.OPS));

0 commit comments

Comments (0)