Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions osbenchmark/workload/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,10 +699,11 @@ def __init__(self, workload, params, **kwargs):
raise exceptions.InvalidSyntax("'batch-size' must be numeric")

self.ingest_percentage = self.float_param(params, name="ingest-percentage", default_value=100, min_value=0, max_value=100)
self.looped = params.get("looped", False)
self.param_source = PartitionBulkIndexParamSource(self.corpora, self.batch_size, self.bulk_size,
self.ingest_percentage, self.id_conflicts,
self.conflict_probability, self.on_conflict,
self.recency, self.pipeline, self._params)
self.recency, self.pipeline, self.looped, self._params)

def float_param(self, params, name, default_value, min_value, max_value, min_operator=operator.le):
try:
Expand Down Expand Up @@ -749,7 +750,7 @@ def params(self):

class PartitionBulkIndexParamSource:
def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflicts, conflict_probability,
on_conflict, recency, pipeline=None, original_params=None):
on_conflict, recency, pipeline=None, looped = False, original_params=None):
"""
:param corpora: Specification of affected document corpora.
Expand All @@ -762,6 +763,7 @@ def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflic
:param recency: A number between [0.0, 1.0] indicating whether to bias generation of conflicting ids towards more recent ones.
May be None.
:param pipeline: The name of the ingest pipeline to run.
:param looped: Set to True for looped mode where bulk requests are repeated from the beginning when entire corpus was ingested.
:param original_params: The original dict passed to the parent parameter source.
"""
self.corpora = corpora
Expand All @@ -775,6 +777,7 @@ def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflic
self.on_conflict = on_conflict
self.recency = recency
self.pipeline = pipeline
self.looped = looped
self.original_params = original_params
# this is only intended for unit-testing
self.create_reader = original_params.pop("__create_reader", create_default_reader)
Expand All @@ -798,7 +801,11 @@ def params(self):
# self.internal_params always reads all files. This is necessary to ensure we terminate early in case
# the user has specified ingest percentage.
if not self.streaming_ingestion and self.current_bulk == self.total_bulks:
raise StopIteration()
if self.looped:
self.current_bulk = 0
self._init_internal_params()
else:
raise StopIteration()
self.current_bulk += 1
return next(self.internal_params)

Expand Down
42 changes: 42 additions & 0 deletions tests/workload/params_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,48 @@ def test_create_with_conflict_probability_not_numeric(self):

self.assertEqual("'conflict-probability' must be numeric", ctx.exception.args[0])

def test_looped_mode(self):
    """Verify that a partition in looped mode restarts from the first bulk
    once the whole corpus has been consumed, instead of raising
    StopIteration."""

    # Stub reader factory: always serves the same two single-document
    # bulks, whatever corpus/partition arguments it is created with.
    def _build_static_reader(*args):
        return StaticBulkReader(
            "idx",
            "doc",
            bulks=[
                ['{"location" : [-0.1485188, 51.5250666]}'],
                ['{"location" : [-0.1479949, 51.5252071]}'],
            ],
        )

    test_corpora = [
        workload.DocumentCorpus(
            name="default",
            documents=[
                workload.Documents(
                    source_format=workload.Documents.SOURCE_FORMAT_BULK,
                    number_of_documents=2,
                    target_index="test-idx",
                    target_type="test-type",
                )
            ],
        ),
    ]

    param_source = params.BulkIndexParamSource(
        workload=workload.Workload(name="unit-test", corpora=test_corpora),
        params={
            "bulk-size": 2,
            "looped": True,
            "__create_reader": _build_static_reader,
        },
    )
    client_partition = param_source.partition(0, 1)

    # First request: both documents fit into one bulk of size 2.
    client_partition.params()
    self.assertEqual(1, client_partition.total_bulks)
    self.assertEqual(1, client_partition.current_bulk)

    # Second request: the corpus is exhausted, so looped mode wraps
    # around — counters are reset and advance to 1 again rather than
    # the iteration terminating.
    client_partition.params()
    self.assertEqual(1, client_partition.total_bulks)
    self.assertEqual(1, client_partition.current_bulk)


class BulkDataGeneratorTests(TestCase):

Expand Down
Loading