Skip to content

Commit cc97eb8

Browse files
authored
Merge pull request #112 from ARGOeu/devel
Preparing Release 1.2
2 parents 469178d + f34eddc commit cc97eb8

26 files changed

+1871
-263
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ Job Optional cli parameters for ams ingestion related
209209

210210
`--ams.interval` : interval (in ms) between AMS service requests
211211
Other optional cli parameters
212+
`--init.status` : "OK", "MISSING" - initialize statuses for new items to a default value. Optimistically defaults to "OK"
213+
212214
`--daily` : true/false - controls daily regeneration of events (not used in notifications)
213215

214216
`--timeout` : long(ms) - controls default timeout for event regeneration (used in notifications)

bin/ar_job_submit.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ def compose_hdfs_commands(year, month, day, args, config):
8585
# recomputation lies in the hdfs in the form of
8686
# /sync/recomp_TENANTNAME_ReportName_2018-08-02.json
8787
if client.test(urlparse(hdfs_sync+"/recomp_"+args.tenant+"_"+args.report+"_"+args.date+".json").path, exists=True):
88-
hdfs_commands["--rec"] = hdfs_sync+"/recomp_"+args.date+".json"
88+
hdfs_commands["--rec"] = hdfs_sync+"/recomp_"+args.tenant+"_"+args.report+"_"+args.date+".json"
8989
else:
9090
hdfs_commands["--rec"] = hdfs_check_path(hdfs_sync+"/recomp.json", client)
9191

bin/status_job_submit.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ def compose_hdfs_commands(year, month, day, args, config):
7272

7373
# file location of recomputations file (local or hdfs)
7474
# first check if there is a recomputations file for the given date
75-
if client.test(urlparse(hdfs_sync+"/recomp_"+args.date+".json").path, exists=True):
76-
hdfs_commands["--rec"] = hdfs_sync+"/recomp_"+args.date+".json"
75+
if client.test(urlparse(hdfs_sync+"/recomp_"+args.tenant+"_"+args.report+"_"+args.date+".json").path, exists=True):
76+
hdfs_commands["--rec"] = hdfs_sync+"/recomp_"+args.tenant+"_"+args.report+"_"+args.date+".json"
7777
log.info("Using recomputations file for the given date")
7878
else:
7979
hdfs_commands["--rec"] = hdfs_check_path(hdfs_sync+"/recomp.json", client)

bin/utils/argo_mongo.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,54 @@ def __init__(self, args, config, cols):
1919
self.config = config
2020
self.cols = cols
2121

22+
23+
def ensure_status_indexes(self, db):
24+
"""Checks if required indexes exist in specific argo status-related collections
25+
in mongodb and creates any that are missing
26+
27+
Args:
28+
db (obj): pymongo database object
29+
30+
"""
31+
32+
log.info("Checking required indexes in status collections...")
33+
def is_index_included(index_set, index):
34+
"""gets a set of mongodb indexes and checks if specified
35+
mongodb index exists in this set
36+
37+
Args:
38+
index_set (dict): pymongo mongodb index object
39+
index (obj): pymongo index object
40+
41+
Returns:
42+
bool: If index exists in set return true
43+
"""
44+
45+
for name in index_set.keys():
46+
if index_set[name]['key'] == index:
47+
return True
48+
return False
49+
# Used in all status collections
50+
index_report_date = [("report",pymongo.ASCENDING), ("date_integer",pymongo.ASCENDING)]
51+
# Used only in status_metrics collection
52+
index_date_host = [("date_integer",pymongo.ASCENDING), ("report",pymongo.ASCENDING)]
53+
status_collections = ["status_metrics","status_endpoints","status_services","status_endpoint_groups"]
54+
# Check first for index report,date
55+
for status_name in status_collections:
56+
col = db[status_name]
57+
indexes = col.index_information()
58+
if not is_index_included(indexes,index_report_date):
59+
# ensure index
60+
col.create_index(index_report_date,background=True)
61+
log.info("Created (report,date) index in %s.%s",col.database.name,col.name)
62+
63+
# Check for index date,host in status_metrics
64+
col = db["status_metrics"]
65+
if not is_index_included(indexes,index_date_host):
66+
col.create_index(index_date_host,background=True)
67+
log.info("Created (report,date) index in %s.%s",col.database.name,col.name)
68+
69+
2270
def mongo_clean_ar(self, uri):
2371

2472
tenant_report = None
@@ -111,6 +159,9 @@ def mongo_clean_status(self, uri):
111159
# from the uri, retrieve the path section, which represents the db, and ignore the / at the beginning
112160
db = client[uri.path[1:]]
113161

162+
# ensure indexes for status collections
163+
self.ensure_status_indexes(db)
164+
114165
# iterate over the specified collections
115166
for col in self.cols:
116167
if tenant_report is not None:

bin/utils/recomputations.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ def write_output(results, tenant, report, target_date, config):
4444
hdfs_writer = config.get("HDFS", "writer_bin")
4545
hdfs_namenode = config.get("HDFS", "namenode")
4646
hdfs_user = config.get("HDFS", "user")
47-
hdfs_sync = config.get("HDFS", "path_sync").fill(namenode=hdfs_namenode.geturl(), hdfs_user=hdfs_user, tenant=tenant).geturl()
48-
print type(hdfs_sync), hdfs_sync
47+
hdfs_sync = config.get("HDFS", "path_sync").fill(namenode=hdfs_namenode.geturl(), hdfs_user=hdfs_user, tenant=tenant).path
48+
4949
status = subprocess.check_call([hdfs_writer, "put", recomp_filepath, hdfs_sync])
5050
# clear temp local file
5151
os.remove(recomp_filepath)

bin/utils/test_update_cron.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -45,58 +45,58 @@ def test_update_cron(self):
4545

4646
# Test generation of ar cronjob for a specific tenant and report
4747
expected = "#TENANT_A:report1 daily A/R\n"\
48-
+ "5 * * * * foo " + BATCH_AR + " -t TENANT_A -r report1 -d " + YESTERDAY + " -m upsert " + "-c "\
48+
+ "5 5 * * * foo " + BATCH_AR + " -t TENANT_A -r report1 -d " + YESTERDAY + " -m upsert " + "-c "\
4949
+ config.conf_path + "\n"
5050

5151
self.assertEquals(expected, gen_batch_ar(config, "TENANT_A", "report1", "daily", "foo", "upsert"))
5252

5353
# Test generation of ar cronjob for a specific tenant and report
5454
expected = "#TENANT_A:report1 hourly A/R\n"\
55-
+ "5 5 * * * " + BATCH_AR + " -t TENANT_A -r report1 -d " + TODAY + " -m insert " + "-c "\
55+
+ "5 * * * * " + BATCH_AR + " -t TENANT_A -r report1 -d " + TODAY + " -m insert " + "-c "\
5656
+ config.conf_path + "\n"
5757

5858
self.assertEquals(expected, gen_batch_ar(config, "TENANT_A", "report1", "hourly"))
5959

6060
# Test generation of status cronjob for a specific tenant and report
6161
expected = "#TENANT_B:report1 daily Status\n"\
62-
+ "5 * * * * foo " + BATCH_STATUS + " -t TENANT_B -r report1 -d " \
62+
+ "5 5 * * * foo " + BATCH_STATUS + " -t TENANT_B -r report1 -d " \
6363
+ YESTERDAY + " -m upsert " + "-c "\
6464
+ config.conf_path + "\n"
6565

6666
self.assertEquals(expected, gen_batch_status(config, "TENANT_B", "report1", "daily", "foo", "upsert"))
6767

6868
# Test generation of status cronjob for a specific tenant and report
6969
expected = "#TENANT_B:report1 hourly Status\n"\
70-
+ "5 5 * * * " + BATCH_STATUS + " -t TENANT_B -r report1 -d " + TODAY + " -m insert " + "-c "\
70+
+ "5 * * * * " + BATCH_STATUS + " -t TENANT_B -r report1 -d " + TODAY + " -m insert " + "-c "\
7171
+ config.conf_path + "\n"
7272

7373
self.assertEquals(expected, gen_batch_status(config, "TENANT_B", "report1", "hourly"))
7474

7575
# Test generation of cronjobs for a tenant's reports
7676
expected = "#Jobs for TENANT_A\n\n" \
7777
+ "#TENANT_A:report1 hourly A/R\n" \
78-
+ "5 5 * * * " + BATCH_AR + " -t TENANT_A -r report1 -d " + TODAY + " -m insert -c " \
78+
+ "5 * * * * " + BATCH_AR + " -t TENANT_A -r report1 -d " + TODAY + " -m insert -c " \
7979
+ config.conf_path + "\n\n" \
8080
+ "#TENANT_A:report1 daily A/R\n" \
81-
+ "5 * * * * " + BATCH_AR + " -t TENANT_A -r report1 -d " + YESTERDAY + " -m insert -c " \
81+
+ "5 5 * * * " + BATCH_AR + " -t TENANT_A -r report1 -d " + YESTERDAY + " -m insert -c " \
8282
+ config.conf_path + "\n\n" \
8383
+ "#TENANT_A:report1 hourly Status\n" \
84-
+ "5 5 * * * " + BATCH_STATUS + " -t TENANT_A -r report1 -d " + TODAY + " -m insert -c " \
84+
+ "5 * * * * " + BATCH_STATUS + " -t TENANT_A -r report1 -d " + TODAY + " -m insert -c " \
8585
+ config.conf_path + "\n\n" \
8686
+ "#TENANT_A:report1 daily Status\n" \
87-
+ "5 * * * * " + BATCH_STATUS + " -t TENANT_A -r report1 -d " + YESTERDAY + " -m insert -c " \
87+
+ "5 5 * * * " + BATCH_STATUS + " -t TENANT_A -r report1 -d " + YESTERDAY + " -m insert -c " \
8888
+ config.conf_path + "\n\n" \
8989
+ "#TENANT_A:report2 hourly A/R\n" \
90-
+ "5 5 * * * " + BATCH_AR + " -t TENANT_A -r report2 -d " + TODAY + " -m insert -c " \
90+
+ "5 * * * * " + BATCH_AR + " -t TENANT_A -r report2 -d " + TODAY + " -m insert -c " \
9191
+ config.conf_path + "\n\n" \
9292
+ "#TENANT_A:report2 daily A/R\n" \
93-
+ "5 * * * * " + BATCH_AR + " -t TENANT_A -r report2 -d " + YESTERDAY + " -m insert -c " \
93+
+ "5 5 * * * " + BATCH_AR + " -t TENANT_A -r report2 -d " + YESTERDAY + " -m insert -c " \
9494
+ config.conf_path + "\n\n" \
9595
+ "#TENANT_A:report2 hourly Status\n" \
96-
+ "5 5 * * * " + BATCH_STATUS + " -t TENANT_A -r report2 -d " + TODAY + " -m insert -c " \
96+
+ "5 * * * * " + BATCH_STATUS + " -t TENANT_A -r report2 -d " + TODAY + " -m insert -c " \
9797
+ config.conf_path + "\n\n" \
9898
+ "#TENANT_A:report2 daily Status\n" \
99-
+ "5 * * * * " + BATCH_STATUS + " -t TENANT_A -r report2 -d " + YESTERDAY + " -m insert -c " \
99+
+ "5 5 * * * " + BATCH_STATUS + " -t TENANT_A -r report2 -d " + YESTERDAY + " -m insert -c " \
100100
+ config.conf_path + "\n\n" \
101101
+ "\n"
102102

@@ -105,28 +105,28 @@ def test_update_cron(self):
105105
# Test generation of cronjobs for all tenants and all reports
106106
expected2 = "#Jobs for TENANT_B\n\n" \
107107
+ "#TENANT_B:report1 hourly A/R\n" \
108-
+ "5 5 * * * " + BATCH_AR + " -t TENANT_B -r report1 -d " + TODAY + " -m insert -c " \
108+
+ "5 * * * * " + BATCH_AR + " -t TENANT_B -r report1 -d " + TODAY + " -m insert -c " \
109109
+ config.conf_path + "\n\n" \
110110
+ "#TENANT_B:report1 daily A/R\n" \
111-
+ "5 * * * * " + BATCH_AR + " -t TENANT_B -r report1 -d " + YESTERDAY + " -m insert -c " \
111+
+ "5 5 * * * " + BATCH_AR + " -t TENANT_B -r report1 -d " + YESTERDAY + " -m insert -c " \
112112
+ config.conf_path + "\n\n" \
113113
+ "#TENANT_B:report1 hourly Status\n" \
114-
+ "5 5 * * * " + BATCH_STATUS + " -t TENANT_B -r report1 -d " + TODAY + " -m insert -c " \
114+
+ "5 * * * * " + BATCH_STATUS + " -t TENANT_B -r report1 -d " + TODAY + " -m insert -c " \
115115
+ config.conf_path + "\n\n" \
116116
+ "#TENANT_B:report1 daily Status\n" \
117-
+ "5 * * * * " + BATCH_STATUS + " -t TENANT_B -r report1 -d " + YESTERDAY + " -m insert -c " \
117+
+ "5 5 * * * " + BATCH_STATUS + " -t TENANT_B -r report1 -d " + YESTERDAY + " -m insert -c " \
118118
+ config.conf_path + "\n\n" \
119119
+ "#TENANT_B:report2 hourly A/R\n" \
120-
+ "5 5 * * * " + BATCH_AR + " -t TENANT_B -r report2 -d " + TODAY + " -m insert -c " \
120+
+ "5 * * * * " + BATCH_AR + " -t TENANT_B -r report2 -d " + TODAY + " -m insert -c " \
121121
+ config.conf_path + "\n\n" \
122122
+ "#TENANT_B:report2 daily A/R\n" \
123-
+ "5 * * * * " + BATCH_AR + " -t TENANT_B -r report2 -d " + YESTERDAY + " -m insert -c " \
123+
+ "5 5 * * * " + BATCH_AR + " -t TENANT_B -r report2 -d " + YESTERDAY + " -m insert -c " \
124124
+ config.conf_path + "\n\n" \
125125
+ "#TENANT_B:report2 hourly Status\n" \
126-
+ "5 5 * * * " + BATCH_STATUS + " -t TENANT_B -r report2 -d " + TODAY + " -m insert -c " \
126+
+ "5 * * * * " + BATCH_STATUS + " -t TENANT_B -r report2 -d " + TODAY + " -m insert -c " \
127127
+ config.conf_path + "\n\n" \
128128
+ "#TENANT_B:report2 daily Status\n" \
129-
+ "5 * * * * " + BATCH_STATUS + " -t TENANT_B -r report2 -d " + YESTERDAY + " -m insert -c " \
129+
+ "5 5 * * * " + BATCH_STATUS + " -t TENANT_B -r report2 -d " + YESTERDAY + " -m insert -c " \
130130
+ config.conf_path + "\n\n" \
131131
+ "\n"
132132

bin/utils/update_cron.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -91,16 +91,16 @@ def gen_batch_ar(config, tenant, report, iteration="hourly", user=None, mongo_me
9191
"""
9292
description = "{}:{} {} A/R".format(tenant, report, iteration)
9393

94-
if iteration == "hourly":
95-
# generate an hourly job
94+
if iteration == "daily":
95+
# generate a daily job
9696
cron_prefix = get_daily(DAILY_HOUR, HOURLY_MIN)
97-
# hourly jobs target today's date
98-
target_date = TODAY
97+
# daily jobs target the day before
98+
target_date = YESTERDAY
9999
else:
100-
# generate a daily job
100+
# generate an hourly job
101101
cron_prefix = get_hourly(HOURLY_MIN)
102-
# daily jobs target day before
103-
target_date = YESTERDAY
102+
# hourly jobs target today's date
103+
target_date = TODAY
104104

105105
cmd = "{} -t {} -r {} -d {} -m {} -c {}".format(BATCH_AR, tenant, report, target_date, mongo_method,
106106
os.path.abspath(config.conf_path))
@@ -122,16 +122,16 @@ def gen_batch_status(config, tenant, report, iteration="hourly", user=None, mong
122122
"""
123123
description = "{}:{} {} Status".format(tenant, report, iteration)
124124

125-
if iteration == "hourly":
126-
# generate an hourly job
125+
if iteration == "daily":
126+
# generate a daily job
127127
cron_prefix = get_daily(DAILY_HOUR, HOURLY_MIN)
128-
# hourly jobs target today's date
129-
target_date = TODAY
128+
# daily jobs target today's date
129+
target_date = YESTERDAY
130130
else:
131-
# generate a daily job
131+
# generate an hourly job
132132
cron_prefix = get_hourly(HOURLY_MIN)
133-
# daily jobs target day before
134-
target_date = YESTERDAY
133+
# hourly jobs target day before
134+
target_date = TODAY
135135
cmd = "{} -t {} -r {} -d {} -m {} -c {}".format(BATCH_STATUS, tenant, report, target_date, mongo_method,
136136
os.path.abspath(config.conf_path))
137137
return gen_entry(cron_prefix, cmd, user, description)
@@ -257,4 +257,4 @@ def run_cron_update(args):
257257
"-u", "--user", help="config", dest="user", metavar="STRING", required=False, default=None)
258258

259259
# Parse the command line arguments accordingly and introduce them to the run method
260-
sys.exit(run_cron_update(arg_parser.parse_args()))
260+
sys.exit(run_cron_update(arg_parser.parse_args()))

flink_jobs/batch_ar/src/main/java/argo/batch/ArgoArBatch.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,12 @@ public static void main(String[] args) throws Exception {
165165
.withBroadcastSet(mpsDS, "mps").withBroadcastSet(egpDS, "egp").withBroadcastSet(ggpDS, "ggp")
166166
.withBroadcastSet(opsDS, "ops").withBroadcastSet(aprDS, "aps").withBroadcastSet(recDS, "rec");
167167

168+
// Calculate endpoint ar from endpoint timelines
169+
DataSet<EndpointAR> endpointResultDS = endpointTimelinesDS.flatMap(new CalcEndpointAR(params))
170+
.withBroadcastSet(mpsDS, "mps").withBroadcastSet(egpDS, "egp").withBroadcastSet(ggpDS, "ggp")
171+
.withBroadcastSet(aprDS, "apr").withBroadcastSet(recDS, "rec").withBroadcastSet(opsDS, "ops")
172+
.withBroadcastSet(confDS, "conf");
173+
168174
// Calculate service ar from service timelines
169175
DataSet<ServiceAR> serviceResultDS = serviceTimelinesDS.flatMap(new CalcServiceAR(params))
170176
.withBroadcastSet(mpsDS, "mps").withBroadcastSet(egpDS, "egp").withBroadcastSet(ggpDS, "ggp")
@@ -181,11 +187,15 @@ public static void main(String[] args) throws Exception {
181187
String dbURI = params.getRequired("mongo.uri");
182188
String dbMethod = params.getRequired("mongo.method");
183189

184-
// Initialize two mongodb outputs
190+
// Initialize endpoint ar mongo output
191+
MongoEndpointArOutput endpointMongoOut = new MongoEndpointArOutput(dbURI,"endpoint_ar",dbMethod);
192+
// Initialize service ar mongo output
185193
MongoServiceArOutput serviceMongoOut = new MongoServiceArOutput(dbURI,"service_ar",dbMethod);
186-
// Initialize two mongodb outputs
194+
// Initialize endpoint group ar mongo output
187195
MongoEndGroupArOutput egroupMongoOut = new MongoEndGroupArOutput(dbURI,"endpoint_group_ar",dbMethod);
188196

197+
198+
endpointResultDS.output(endpointMongoOut);
189199
serviceResultDS.output(serviceMongoOut);
190200
groupResultDS.output(egroupMongoOut);
191201

0 commit comments

Comments
 (0)