Skip to content

Commit 7474e46

Browse files
authored
Merge pull request #370 from ARGOeu/devel
Devel
2 parents 615f8cb + 9bb5e7e commit 7474e46

File tree

21 files changed

+253
-859
lines changed

21 files changed

+253
-859
lines changed

flink_jobs_v2/ApiResourceManager/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
<parent>
55
<groupId>flink.jobs.v2</groupId>
66
<artifactId>flink_jobs_v2</artifactId>
7-
<version>2.1.0</version>
7+
<version>2.1.1</version>
88
</parent>
9-
<version>2.1.0</version>
9+
<version>2.1.1</version>
1010
<groupId>api.resource.manager</groupId>
1111
<artifactId>ApiResourceManager</artifactId>
1212
<packaging>jar</packaging>
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package argo.amr;
22

33
public enum ApiResource {
4-
CONFIG, OPS, METRIC, AGGREGATION, THRESHOLDS, TOPOENDPOINTS, TOPOGROUPS, WEIGHTS, DOWNTIMES, RECOMPUTATIONS, MTAGS
4+
CONFIG, OPS, METRIC, AGGREGATION, THRESHOLDS, TOPOENDPOINTS, TOPOGROUPS, WEIGHTS, DOWNTIMES, RECOMPUTATIONS, MTAGS, TENANTFEED
55
}

flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResourceManager.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class ApiResourceManager {
3333
private String weightsID;
3434
private RequestManager requestManager;
3535
private ApiResponseParser apiResponseParser;
36+
private boolean isCombined;
3637
//private boolean verify;
3738
//private int timeoutSec;
3839

@@ -47,8 +48,8 @@ public ApiResourceManager(String endpoint, String token) {
4748
this.reportID = "";
4849
this.date = "";
4950
this.weightsID = "";
50-
this.tenant="";
51-
this.egroup="";
51+
this.tenant = "";
52+
this.egroup = "";
5253
this.requestManager = new RequestManager("", this.token);
5354
this.apiResponseParser = new ApiResponseParser(this.reportName, this.metricID, this.aggregationID, this.opsID, this.threshID, this.tenant, this.egroup);
5455
}
@@ -182,8 +183,6 @@ public String getEgroup() {
182183
public void setEgroup(String egroup) {
183184
this.egroup = egroup;
184185
}
185-
186-
187186

188187
/**
189188
* Retrieves the remote report configuration based on reportID main class
@@ -297,7 +296,6 @@ public void getRemoteDowntimes() {
297296
String path = "https://%s/api/v2/downtimes?date=%s";
298297
String fullURL = String.format(path, this.endpoint, this.date);
299298
String content = this.requestManager.getResource(fullURL);
300-
301299
this.data.put(ApiResource.DOWNTIMES, this.apiResponseParser.getJsonData(content, false));
302300

303301
}
@@ -353,8 +351,8 @@ public void parseReport() {
353351
this.opsID = this.apiResponseParser.getOpsID();
354352
this.threshID = this.apiResponseParser.getThreshID();
355353
this.reportName = this.apiResponseParser.getReportName();
356-
this.tenant=this.apiResponseParser.getTenant();
357-
this.egroup=this.apiResponseParser.getEgroup();
354+
this.tenant = this.apiResponseParser.getTenant();
355+
this.egroup = this.apiResponseParser.getEgroup();
358356
}
359357

360358
/**
@@ -458,12 +456,43 @@ public MetricProfile[] getListMetrics() {
458456
return rArr;
459457
}
460458

459+
/**
460+
* Retrieves the remote report configuration based on reportID main class
461+
* attribute and stores the content in the enum map
462+
*/
463+
public void getRemoteTenantFeed() {
464+
String path = "https://%s/api/v2/feeds/data";
465+
String fullURL = String.format(path, this.endpoint);
466+
String content = this.requestManager.getResource(fullURL);
467+
if (content != null) {
468+
this.data.put(ApiResource.TENANTFEED, this.apiResponseParser.getJsonData(content, true));
469+
}
470+
}
471+
472+
public String[] getListTenants() {
473+
List<String> results = new ArrayList<String>();
474+
if (!this.data.containsKey(ApiResource.TENANTFEED)) {
475+
String[] rArr = new String[results.size()];
476+
rArr = results.toArray(rArr);
477+
return rArr;
478+
}
479+
480+
String content = this.data.get(ApiResource.TENANTFEED);
481+
results = this.apiResponseParser.getListTenants(content);
482+
String[] rArr = new String[results.size()];
483+
rArr = results.toArray(rArr);
484+
return rArr;
485+
}
486+
461487
/**
462488
* Executes all steps to retrieve the complete amount of the available
463489
* profile, topology, weights and downtime information from argo-web-api
464490
*/
465491
public void getRemoteAll() {
466492
// Start with report and configuration
493+
if (isCombined) {
494+
this.getRemoteTenantFeed();
495+
}
467496
this.getRemoteConfig();
468497
// parse remote report config to be able to get the other profiles
469498

@@ -489,4 +518,12 @@ public void getRemoteAll() {
489518
this.getRemoteMetricTags();
490519
}
491520

521+
public boolean isIsCombined() {
522+
return isCombined;
523+
}
524+
525+
public void setIsCombined(boolean isCombined) {
526+
this.isCombined = isCombined;
527+
}
528+
492529
}

flink_jobs_v2/ApiResourceManager/src/main/java/argo/amr/ApiResponseParser.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
import com.google.gson.JsonElement;
1515
import com.google.gson.JsonObject;
1616
import com.google.gson.JsonParser;
17+
1718
import java.util.ArrayList;
18-
import java.util.EnumMap;
1919
import java.util.HashMap;
2020
import java.util.List;
2121
import java.util.Map;
@@ -103,13 +103,11 @@ public void setEgroup(String egroup) {
103103
this.egroup = egroup;
104104
}
105105

106-
107106
/**
108107
* Extract first JSON item from data JSON array in api response
109108
*
110109
* @param content JSON content of the full repsonse (status + data)
111110
* @return First available item in data array as JSON string representation
112-
*
113111
*/
114112
public String getJsonData(String content, boolean asArray) {
115113

@@ -118,10 +116,17 @@ public String getJsonData(String content, boolean asArray) {
118116
JsonElement jElement = jsonParser.parse(content);
119117
JsonObject jRoot = jElement.getAsJsonObject();
120118
// Get the data array and the first item
119+
121120
if (asArray) {
121+
if (jRoot.get("data") == null) {
122+
return null;
123+
}
122124
return jRoot.get("data").toString();
123125
}
124126
JsonArray jData = jRoot.get("data").getAsJsonArray();
127+
if (!jData.iterator().hasNext()) {
128+
return null;
129+
}
125130
JsonElement jItem = jData.get(0);
126131
return jItem.toString();
127132
}
@@ -163,6 +168,21 @@ public void parseReport(String content) {
163168

164169
}
165170

171+
public List<String> getListTenants(String content) {
172+
List<String> results = new ArrayList<String>();
173+
174+
JsonParser jsonParser = new JsonParser();
175+
JsonElement jElement = jsonParser.parse(content);
176+
JsonArray jArray = jElement.getAsJsonArray();
177+
JsonObject jRoot = jArray.get(0).getAsJsonObject();
178+
JsonArray tenants = jRoot.get("tenants").getAsJsonArray();
179+
for (int i = 0; i < tenants.size(); i++) {
180+
String jItem = tenants.get(i).getAsString();
181+
results.add(jItem);
182+
}
183+
return results;
184+
}
185+
166186
/**
167187
* Parses the Downtime content retrieved from argo-web-api and provides a
168188
* list of Downtime avro objects to be used in the next steps of the

flink_jobs_v2/ProfilesManager/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
<parent>
55
<groupId>flink.jobs.v2</groupId>
66
<artifactId>flink_jobs_v2</artifactId>
7-
<version>2.1.0</version>
7+
<version>2.1.1</version>
88
</parent>
99
<groupId>profiles.manager</groupId>
1010
<artifactId>ProfilesManager</artifactId>
11-
<version>2.1.0</version>
11+
<version>2.1.1</version>
1212
<packaging>jar</packaging>
1313
<dependencies>
1414
<dependency>

flink_jobs_v2/Timelines/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44
<parent>
55
<groupId>flink.jobs.v2</groupId>
66
<artifactId>flink_jobs_v2</artifactId>
7-
<version>2.1.0</version>
7+
<version>2.1.1</version>
88
</parent>
99
<groupId>timeline.manager</groupId>
1010
<artifactId>Timelines</artifactId>
11-
<version>2.1.0</version>
11+
<version>2.1.1</version>
1212
<packaging>jar</packaging>
1313
<dependencies>
1414
<dependency>

flink_jobs_v2/ams-connector/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
<parent>
1717
<groupId>flink.jobs.v2</groupId>
1818
<artifactId>flink_jobs_v2</artifactId>
19-
<version>2.1.0</version>
19+
<version>2.1.1</version>
2020
</parent>
2121

2222
<groupId>ams.connector</groupId>
23-
<version>2.1.0</version>
23+
<version>2.1.1</version>
2424
<name>ams.connector</name>
2525
<description>Connect to AMS</description>
2626

flink_jobs_v2/ams-connector/src/main/java/ams/connector/ArgoMessagingSource.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ public class ArgoMessagingSource extends RichSourceFunction<String> {
3030
private boolean useProxy = false;
3131
private String proxyURL = "";
3232
private transient Object rateLck; // lock for waiting to establish rate
33+
private boolean advanceOffset = true;
34+
3335

3436
private volatile boolean isRunning = true;
3537

@@ -49,6 +51,21 @@ public ArgoMessagingSource(String endpoint, String port, String token, String pr
4951
this.runDate=runDate;
5052

5153
}
54+
55+
// second constructor with advanceOffset parametter
56+
public ArgoMessagingSource(String endpoint, String port, String token, String project, String sub, int batch, Long interval, String runDate, boolean advanceOffset) {
57+
this.endpoint = endpoint;
58+
this.port = port;
59+
this.token = token;
60+
this.project = project;
61+
this.sub = sub;
62+
this.interval = interval;
63+
this.batch = batch;
64+
this.verify = true;
65+
this.runDate=runDate;
66+
this.advanceOffset = advanceOffset;
67+
68+
}
5269

5370
/**
5471
* Set verify to true or false. If set to false AMS client will be able to contact AMS endpoints that use self-signed certificates
@@ -115,8 +132,16 @@ public void open(Configuration parameters) throws Exception {
115132
if (this.useProxy) {
116133
client.setProxy(this.proxyURL);
117134
}
118-
int offset=client.offset(); //get the offset of the subscription, that corresponds to the date
119-
client.modifyOffset(offset); //mofify the offset of the subscription to point to the offset index of the date. if date is null then the index points to the latest offset (max)
135+
136+
// if advanceOffset is set to true (default) advance the offset to latest or based to the run date provided
137+
if (advanceOffset) {
138+
// get the offset of the subscription, that corresponds to the date
139+
int offset=client.offset();
140+
// mofify the offset of the subscription to point to the offset index of the date.
141+
// if date is null then the index points to the latest offset (max)
142+
client.modifyOffset(offset);
143+
}
144+
120145
} catch (KeyManagementException e) {
121146
e.printStackTrace();
122147
} catch (NoSuchAlgorithmException e) {

flink_jobs_v2/ams_ingest_metric/pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ language governing permissions and limitations under the License. -->
1414

1515
<groupId>argo.streaming</groupId>
1616
<artifactId>ams-ingest-metric</artifactId>
17-
<version>2.1.0</version>
17+
<version>2.1.1</version>
1818
<packaging>jar</packaging>
1919

2020
<name>ARGO AMS Ingest Metric Data job</name>
@@ -63,7 +63,7 @@ language governing permissions and limitations under the License. -->
6363
<dependency>
6464
<groupId>ams.connector</groupId>
6565
<artifactId>ams-connector</artifactId>
66-
<version>2.1.0</version>
66+
<version>2.1.1</version>
6767
<type>jar</type>
6868
</dependency>
6969
<dependency>
@@ -382,4 +382,4 @@ language governing permissions and limitations under the License. -->
382382
</configuration> </plugin> </plugins> </pluginManagement> -->
383383

384384
</build>
385-
</project>
385+
</project>

0 commit comments

Comments
 (0)