Skip to content

Commit 0522e68

Browse files
authored
Add looped mode support for bulk ingestion operation. (#968)
Signed-off-by: Rishabh Singh <[email protected]>
1 parent b380ddd commit 0522e68

File tree

2 files changed

+52
-3
lines changed

2 files changed

+52
-3
lines changed

osbenchmark/workload/params.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -699,10 +699,11 @@ def __init__(self, workload, params, **kwargs):
699699
raise exceptions.InvalidSyntax("'batch-size' must be numeric")
700700

701701
self.ingest_percentage = self.float_param(params, name="ingest-percentage", default_value=100, min_value=0, max_value=100)
702+
self.looped = params.get("looped", False)
702703
self.param_source = PartitionBulkIndexParamSource(self.corpora, self.batch_size, self.bulk_size,
703704
self.ingest_percentage, self.id_conflicts,
704705
self.conflict_probability, self.on_conflict,
705-
self.recency, self.pipeline, self._params)
706+
self.recency, self.pipeline, self.looped, self._params)
706707

707708
def float_param(self, params, name, default_value, min_value, max_value, min_operator=operator.le):
708709
try:
@@ -749,7 +750,7 @@ def params(self):
749750

750751
class PartitionBulkIndexParamSource:
751752
def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflicts, conflict_probability,
752-
on_conflict, recency, pipeline=None, original_params=None):
753+
on_conflict, recency, pipeline=None, looped = False, original_params=None):
753754
"""
754755
755756
:param corpora: Specification of affected document corpora.
@@ -762,6 +763,7 @@ def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflic
762763
:param recency: A number between [0.0, 1.0] indicating whether to bias generation of conflicting ids towards more recent ones.
763764
May be None.
764765
:param pipeline: The name of the ingest pipeline to run.
766+
:param looped: Set to True to enable looped mode, in which bulk requests start over from the beginning once the entire corpus has been ingested.
765767
:param original_params: The original dict passed to the parent parameter source.
766768
"""
767769
self.corpora = corpora
@@ -775,6 +777,7 @@ def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflic
775777
self.on_conflict = on_conflict
776778
self.recency = recency
777779
self.pipeline = pipeline
780+
self.looped = looped
778781
self.original_params = original_params
779782
# this is only intended for unit-testing
780783
self.create_reader = original_params.pop("__create_reader", create_default_reader)
@@ -798,7 +801,11 @@ def params(self):
798801
# self.internal_params always reads all files. This is necessary to ensure we terminate early in case
799802
# the user has specified ingest percentage.
800803
if not self.streaming_ingestion and self.current_bulk == self.total_bulks:
801-
raise StopIteration()
804+
if self.looped:
805+
self.current_bulk = 0
806+
self._init_internal_params()
807+
else:
808+
raise StopIteration()
802809
self.current_bulk += 1
803810
return next(self.internal_params)
804811

tests/workload/params_test.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1174,6 +1174,48 @@ def test_create_with_conflict_probability_not_numeric(self):
11741174

11751175
self.assertEqual("'conflict-probability' must be numeric", ctx.exception.args[0])
11761176

1177+
def test_looped_mode(self):
1178+
def create_unit_test_reader(*args):
1179+
return StaticBulkReader(
1180+
"idx",
1181+
"doc",
1182+
bulks=[
1183+
['{"location" : [-0.1485188, 51.5250666]}'],
1184+
['{"location" : [-0.1479949, 51.5252071]}'],
1185+
],
1186+
)
1187+
corpora = [
1188+
workload.DocumentCorpus(
1189+
name="default",
1190+
documents=[
1191+
workload.Documents(
1192+
source_format=workload.Documents.SOURCE_FORMAT_BULK,
1193+
number_of_documents=2,
1194+
target_index="test-idx",
1195+
target_type="test-type",
1196+
)
1197+
],
1198+
),
1199+
]
1200+
1201+
source = params.BulkIndexParamSource(
1202+
workload=workload.Workload(name="unit-test", corpora=corpora),
1203+
params={
1204+
"bulk-size": 2,
1205+
"looped": True,
1206+
"__create_reader": create_unit_test_reader,
1207+
},
1208+
)
1209+
partition = source.partition(0, 1)
1210+
partition.params()
1211+
# should issue 1 bulk with the size of 2
1212+
assert partition.total_bulks == 1
1213+
assert partition.current_bulk == 1
1214+
partition.params()
1215+
# should have looped back to the beginning
1216+
assert partition.total_bulks == 1
1217+
assert partition.current_bulk == 1
1218+
11771219

11781220
class BulkDataGeneratorTests(TestCase):
11791221

0 commit comments

Comments
 (0)