Skip to content

Commit ed5d2c5

Browse files
gkamat authored and IanHoang committed
Streaming ingestion enhancements: glob support, path separators, etc.
Signed-off-by: Govind Kamat <[email protected]>
1 parent cf093f6 commit ed5d2c5

File tree

9 files changed

+134
-88
lines changed

9 files changed

+134
-88
lines changed

osbenchmark/benchmark.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1359,12 +1359,19 @@ def main():
13591359

13601360
end = time.time()
13611361
if success:
1362-
console.println("")
1363-
console.info("✅ SUCCESS (took %d seconds)" % (end - start), overline="-", underline="-")
1362+
message = "✅ SUCCESS"
1363+
status = 0
13641364
else:
1365-
console.println("")
1366-
console.info("❌ FAILURE (took %d seconds)" % (end - start), overline="-", underline="-")
1367-
sys.exit(64)
1365+
message = "❌ FAILURE"
1366+
status = 64
1367+
message = message + f" (took {int(round(end - start))} seconds)"
1368+
console.println("")
1369+
console.info(message, overline="-", underline="-")
1370+
if hasattr(args, "results_file") and args.results_file:
1371+
with open(args.results_file, "a") as fh:
1372+
print("\n", message, file=fh)
1373+
1374+
sys.exit(status)
13681375

13691376

13701377
if __name__ == "__main__":

osbenchmark/cloud_provider/vendors/s3_data_producer.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@ def __init__(self, bucket:str, keys, client_options: dict, data_dir=None) -> Non
4646

4747
def _get_keys(self):
4848
rsl = list()
49-
rsl.append(self.keys)
49+
if self.keys[-1] == '*':
50+
keys = self.s3_client.list_objects(Bucket=self.bucket, Prefix=self.keys[:-1])
51+
for key in keys['Contents']:
52+
rsl.append(key['Key'])
53+
else:
54+
rsl.append(self.keys)
5055
return rsl
5156

5257
def _get_next_downloader(self):

osbenchmark/worker_coordinator/runner.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ def _with_assertions(delegate):
281281

282282
def _with_completion(delegate):
    """Wrap ``delegate`` so that it exposes completion information.

    The runner obtained by unwrapping ``delegate`` is inspected: when it has
    both a ``completed`` and a ``task_progress`` attribute, the delegate is
    wrapped in ``WithCompletion`` (which is handed that unwrapped runner);
    otherwise it is wrapped in ``NoCompletion``.
    """
    unwrapped_runner = unwrap(delegate)
    tracks_progress = all(hasattr(unwrapped_runner, attr) for attr in ("completed", "task_progress"))
    if not tracks_progress:
        return NoCompletion(delegate)
    return WithCompletion(delegate, unwrapped_runner)
@@ -296,7 +296,7 @@ def completed(self):
296296
return None
297297

298298
@property
299-
def percent_completed(self):
299+
def task_progress(self):
300300
return None
301301

302302
async def __call__(self, *args):
@@ -323,8 +323,8 @@ def completed(self):
323323
return self.progressable.completed
324324

325325
@property
326-
def percent_completed(self):
327-
return self.progressable.percent_completed
326+
def task_progress(self):
327+
return self.progressable.task_progress
328328

329329
async def __call__(self, *args):
330330
return await self.delegate(*args)
@@ -2351,7 +2351,7 @@ class WaitForTransform(Runner):
23512351
def __init__(self):
23522352
super().__init__()
23532353
self._completed = False
2354-
self._percent_completed = 0.0
2354+
self._task_progress = (0.0, '%')
23552355
self._start_time = None
23562356
self._last_documents_processed = 0
23572357
self._last_processing_time = 0
@@ -2361,8 +2361,8 @@ def completed(self):
23612361
return self._completed
23622362

23632363
@property
2364-
def percent_completed(self):
2365-
return self._percent_completed
2364+
def task_progress(self):
2365+
return self._task_progress
23662366

23672367
async def __call__(self, opensearch, params):
23682368
"""
@@ -2415,10 +2415,10 @@ async def __call__(self, opensearch, params):
24152415
f"Transform [{transform_id}] failed with [{failure_reason}].")
24162416
elif state == "stopped" or wait_for_completion is False:
24172417
self._completed = True
2418-
self._percent_completed = 1.0
2418+
self._task_progress = (1.0, '%')
24192419
else:
2420-
self._percent_completed = stats_response["transforms"][0].get("checkpointing", {}).get("next", {}).get(
2421-
"checkpoint_progress", {}).get("percent_complete", 0.0) / 100.0
2420+
self._task_progress = (stats_response["transforms"][0].get("checkpointing", {}).get("next", {}).get(
2421+
"checkpoint_progress", {}).get("percent_complete", 0.0) / 100.0, '%')
24222422

24232423
documents_processed = transform_stats.get("documents_processed", 0)
24242424
processing_time = transform_stats.get("search_time_in_ms", 0)

osbenchmark/worker_coordinator/worker_coordinator.py

Lines changed: 44 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,18 +1299,24 @@ def update_progress_message(self, task_finished=False):
12991299
def update_progress_message(self, task_finished=False):
    """Publish the progress of the tasks at the current join point.

    Nothing is printed when ``self.quiet`` is set or no step has started
    (``self.current_step < 0``).  Progress is averaged over the most recent
    sample of every client that reports one.  Each reported progress is a
    ``(value, unit)`` tuple: the unit ``'%'`` denotes a fraction in ``[0, 1]``
    that is rendered as a percentage, any other unit is rendered as
    ``[<value> <unit>]``.

    Args:
        task_finished: when true, percentage progress is reported as 100%
            and the progress publisher is finished afterwards.

    Raises:
        AssertionError: if clients report progress in different units.
    """
    if not self.quiet and self.current_step >= 0:
        tasks = ",".join([t.name for t in self.tasks_per_join_point[self.current_step]])

        # we only count clients which actually contribute to progress. If clients are executing tasks eternally in a parallel
        # structure, we should not count them. The reason is that progress depends entirely on the client(s) that execute the
        # task that is completing the parallel structure.
        progress_per_client = []
        for s in self.most_recent_sample_per_client.values():
            progress = s.task_progress
            if progress is None:
                continue
            # Samples of completed tasks may still carry a bare fraction
            # instead of a (value, unit) tuple; treat those as percentages.
            if not isinstance(progress, tuple):
                progress = (progress, '%')
            progress_per_client.append(progress)

        # Compare only the unit component; the values naturally differ
        # between clients.
        units = {unit for _, unit in progress_per_client}
        assert len(units) <= 1, "Encountered mix of disparate units while tracking task progress"
        unit = units.pop() if units else '%'

        # No reporting client (e.g. only eternal tasks) is not an error:
        # fall back to zero progress rather than aborting the benchmark.
        if progress_per_client:
            total_progress = sum(value for value, _ in progress_per_client) / len(progress_per_client)
        else:
            total_progress = 0.0

        if unit != '%':
            self.progress_publisher.print("Running %s" % tasks, "[%4.1f %s]" % (total_progress, unit))
        else:
            if task_finished:
                total_progress = 1.0
            self.progress_publisher.print("Running %s" % tasks, "[%3d%% done]" % (round(total_progress * 100)))
        if task_finished:
            self.progress_publisher.finish()
13161322

@@ -1739,9 +1745,9 @@ def receiveMsg_WakeupMessage(self, msg, sender):
17391745
else:
17401746
if current_samples and len(current_samples) > 0:
17411747
most_recent_sample = current_samples[-1]
1742-
if most_recent_sample.percent_completed is not None:
1748+
if most_recent_sample.task_progress is not None and most_recent_sample.task_progress[1] == '%':
17431749
self.logger.debug("Worker[%s] is executing [%s] (%.2f%% complete).",
1744-
str(self.worker_id), most_recent_sample.task, most_recent_sample.percent_completed * 100.0)
1750+
str(self.worker_id), most_recent_sample.task, most_recent_sample.task_progress[0] * 100.0)
17451751
else:
17461752
# TODO: This could be misleading given that one worker could execute more than one task...
17471753
self.logger.debug("Worker[%s] is executing [%s] (dependent eternal task).",
@@ -1842,13 +1848,13 @@ class DefaultSampler(Sampler):
18421848
"""
18431849

18441850
def add(self, task, client_id, sample_type, meta_data, absolute_time, request_start, latency, service_time,
        client_processing_time, processing_time, throughput, ops, ops_unit, time_period, task_progress,
        dependent_timing=None):
    """Build a ``DefaultSample`` from the arguments and enqueue it without blocking.

    The sample's task start is taken from ``self.start_timestamp``.  If the
    queue is full the sample is dropped and a warning naming the task's
    operation is logged.
    """
    sample = DefaultSample(client_id, absolute_time, request_start, self.start_timestamp, task, sample_type,
                           meta_data, latency, service_time, client_processing_time, processing_time,
                           throughput, ops, ops_unit, time_period, task_progress, dependent_timing)
    try:
        self.q.put_nowait(sample)
    except queue.Full:
        self.logger.warning("Dropping sample for [%s] due to a full sampling queue.", task.operation.name)
18541860

@@ -1857,12 +1863,12 @@ class ProfileMetricsSampler(Sampler):
18571863
Encapsulates management of gathered profile metrics samples.
18581864
"""
18591865

1860-
def add(self, task, client_id, sample_type, meta_data, absolute_time, request_start, time_period, task_progress,
        dependent_timing=None):
    """Build a ``ProfileMetricsSample`` from the arguments and enqueue it without blocking.

    The sample's task start is taken from ``self.start_timestamp``.  If the
    queue is full the sample is dropped and a warning naming the task's
    operation is logged.
    """
    sample = ProfileMetricsSample(client_id, absolute_time, request_start, self.start_timestamp, task,
                                  sample_type, meta_data, time_period, task_progress, dependent_timing)
    try:
        self.q.put_nowait(sample)
    except queue.Full:
        self.logger.warning("Dropping sample for [%s] due to a full sampling queue.", task.operation.name)
18681874

@@ -1872,7 +1878,7 @@ class Sample:
18721878
Basic information used by metrics store to keep track of samples
18731879
"""
18741880
def __init__(self, client_id, absolute_time, request_start, task_start, task, sample_type, request_meta_data,
1875-
time_period, percent_completed, dependent_timing=None):
1881+
time_period, task_progress, dependent_timing=None):
18761882
self.client_id = client_id
18771883
self.absolute_time = absolute_time
18781884
self.request_start = request_start
@@ -1883,7 +1889,7 @@ def __init__(self, client_id, absolute_time, request_start, task_start, task, sa
18831889
self.time_period = time_period
18841890
self._dependent_timing = dependent_timing
18851891
# may be None for eternal tasks!
1886-
self.percent_completed = percent_completed
1892+
self.task_progress = task_progress
18871893

18881894
@property
18891895
def operation_name(self):
@@ -1911,8 +1917,8 @@ class DefaultSample(Sample):
19111917
"""
19121918
def __init__(self, client_id, absolute_time, request_start, task_start, task, sample_type, request_meta_data, latency,
19131919
service_time, client_processing_time, processing_time, throughput, total_ops, total_ops_unit, time_period,
1914-
percent_completed, dependent_timing=None):
1915-
super().__init__(client_id, absolute_time, request_start, task_start, task, sample_type, request_meta_data, time_period, percent_completed, dependent_timing)
1920+
task_progress, dependent_timing=None):
1921+
super().__init__(client_id, absolute_time, request_start, task_start, task, sample_type, request_meta_data, time_period, task_progress, dependent_timing)
19161922
self.latency = latency
19171923
self.service_time = service_time
19181924
self.client_processing_time = client_processing_time
@@ -1927,7 +1933,7 @@ def dependent_timings(self):
19271933
for t in self._dependent_timing:
19281934
yield DefaultSample(self.client_id, t["absolute_time"], t["request_start"], self.task_start, self.task,
19291935
self.sample_type, self.request_meta_data, 0, t["service_time"], 0, 0, 0, self.total_ops,
1930-
self.total_ops_unit, self.time_period, self.percent_completed, None)
1936+
self.total_ops_unit, self.time_period, self.task_progress, None)
19311937

19321938
def __repr__(self, *args, **kwargs):
19331939
return f"[{self.absolute_time}; {self.relative_time}] [client [{self.client_id}]] [{self.task}] " \
@@ -1944,7 +1950,7 @@ def dependent_timings(self):
19441950
if self._dependent_timing:
19451951
for t in self._dependent_timing:
19461952
yield ProfileMetricsSample(self.client_id, t["absolute_time"], t["request_start"], self.task_start, self.task,
1947-
self.sample_type, self.request_meta_data, self.time_period, self.percent_completed, None)
1953+
self.sample_type, self.request_meta_data, self.time_period, self.task_progress, None)
19481954

19491955

19501956
def select_test_procedure(config, t):
@@ -2392,7 +2398,7 @@ async def _execute_request(self, params: dict, expected_scheduled_time: float, t
23922398
}
23932399

23942400
def _process_results(self, result_data: dict, total_start: float, client_state: bool,
2395-
percent_completed: float, add_profile_metric_sample: bool = False) -> bool:
2401+
task_progress: tuple, add_profile_metric_sample: bool = False) -> bool:
23962402
"""Process results from a request."""
23972403
# Handle cases where the request was skipped (no-op)
23982404
if result_data["request_meta_data"].get("skipped_request"):
@@ -2419,7 +2425,7 @@ def _process_results(self, result_data: dict, total_start: float, client_state:
24192425
if result_data["throughput_throttled"] else service_time)
24202426

24212427
runner_completed = getattr(self.runner, "completed", False)
2422-
runner_percent_completed = getattr(self.runner, "percent_completed", None)
2428+
runner_task_progress = getattr(self.runner, "task_progress", None)
24232429

24242430
if self.task_completes_parent:
24252431
completed = runner_completed
@@ -2428,10 +2434,10 @@ def _process_results(self, result_data: dict, total_start: float, client_state:
24282434

24292435
if completed:
24302436
progress = 1.0
2431-
elif runner_percent_completed is not None:
2432-
progress = runner_percent_completed
2437+
elif runner_task_progress is not None:
2438+
progress = runner_task_progress
24332439
else:
2434-
progress = percent_completed
2440+
progress = task_progress
24352441

24362442
if client_state:
24372443
if add_profile_metric_sample:
@@ -2484,7 +2490,7 @@ async def __call__(self, *args, **kwargs):
24842490
self.logger.debug("Entering main loop for client id [%s].", self.client_id)
24852491
profile_metrics_sample_count = 0
24862492
try:
2487-
async for expected_scheduled_time, sample_type, percent_completed, runner, params in schedule:
2493+
async for expected_scheduled_time, sample_type, task_progress, runner, params in schedule:
24882494
self.expected_scheduled_time = expected_scheduled_time
24892495
self.sample_type = sample_type
24902496
self.runner = runner
@@ -2506,7 +2512,7 @@ async def __call__(self, *args, **kwargs):
25062512

25072513
result_data = await self._execute_request(params, expected_scheduled_time, total_start, client_state)
25082514

2509-
completed = self._process_results(result_data, total_start, client_state, percent_completed, add_profile_metric_sample)
2515+
completed = self._process_results(result_data, total_start, client_state, task_progress, add_profile_metric_sample)
25102516

25112517
if completed:
25122518
self.logger.info("Task [%s] is considered completed due to external event.", self.task)
@@ -2914,14 +2920,14 @@ def after_request(self, now, weight, unit, request_meta_data):
29142920
async def __call__(self):
29152921
next_scheduled = 0
29162922
if self.task_progress_control.infinite:
2917-
param_source_knows_progress = hasattr(self.params, "percent_completed")
2923+
param_source_knows_progress = hasattr(self.params, "task_progress")
29182924
while True:
29192925
try:
29202926
next_scheduled = self.sched.next(next_scheduled)
29212927
# does not contribute at all to completion. Hence, we cannot define completion.
2922-
percent_completed = self.params.percent_completed if param_source_knows_progress else None
2928+
task_progress = self.params.task_progress if param_source_knows_progress else None
29232929
# current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
2924-
yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner,
2930+
yield (next_scheduled, self.task_progress_control.sample_type, task_progress, self.runner,
29252931
self.params.params())
29262932
self.task_progress_control.next()
29272933
except StopIteration:
@@ -2933,7 +2939,7 @@ async def __call__(self):
29332939
#current_params = await self.loop.run_in_executor(self.io_pool_exc, self.params.params)
29342940
yield (next_scheduled,
29352941
self.task_progress_control.sample_type,
2936-
self.task_progress_control.percent_completed,
2942+
self.task_progress_control.task_progress,
29372943
self.runner,
29382944
self.params.params())
29392945
self.task_progress_control.next()
@@ -2969,8 +2975,8 @@ def infinite(self):
29692975
return self._time_period is None
29702976

29712977
@property
2972-
def percent_completed(self):
2973-
return self._elapsed / self._duration
2978+
def task_progress(self):
2979+
return (self._elapsed / self._duration, '%')
29742980

29752981
@property
29762982
def completed(self):
@@ -3007,8 +3013,8 @@ def infinite(self):
30073013
return self._iterations is None
30083014

30093015
@property
3010-
def percent_completed(self):
3011-
return (self._it + 1) / self._total_iterations
3016+
def task_progress(self):
3017+
return ((self._it + 1) / self._total_iterations, '%')
30123018

30133019
@property
30143020
def completed(self):

osbenchmark/workload/loader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ def first_existing(root_dirs, f):
272272
document_set.document_archive = first_existing(data_root, document_set.document_archive)
273273
if document_set.document_file:
274274
if corpus.streaming_ingestion:
275-
document_set.document_file = os.path.join(data_root[0], document_set.document_file)
275+
document_set.data_dir = data_root[0]
276276
else:
277277
document_set.document_file = first_existing(data_root, document_set.document_file)
278278

0 commit comments

Comments
 (0)