diff --git a/.ci/variables.json b/.ci/variables.json index 52f6e3daf..831804d61 100644 --- a/.ci/variables.json +++ b/.ci/variables.json @@ -4,6 +4,6 @@ "PY39": "3.9.10", "PY310": "3.10.6", "PY311": "3.11.1", - "MIN_PY_VER": "3.8.12" + "MIN_PY_VER": "3.10.6" } } diff --git a/.pylintrc b/.pylintrc index 285a676ea..980c75848 100644 --- a/.pylintrc +++ b/.pylintrc @@ -173,7 +173,8 @@ disable=print-statement, W0613, W0621, invalid-docstring-quote, - raise-missing-from + raise-missing-from, + unsubscriptable-object # Enable the message, report, category or checker with the given id(s). You can diff --git a/Makefile b/Makefile index f55e885ed..fed58b674 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ SHELL = /bin/bash PYTHON = python3 PIP = pip3 VERSIONS = $(shell jq -r '.python_versions | .[]' .ci/variables.json | sed '$$d') -VERSION38 = $(shell jq -r '.python_versions | .[]' .ci/variables.json | sed '$$d' | grep 3\.8) +VERSION38 = $(shell jq -r '.python_versions | .[]' .ci/variables.json | sed '$$d' | grep 3\.10) PYENV_ERROR = "\033[0;31mIMPORTANT\033[0m: Please install pyenv and run \033[0;31meval \"\$$(pyenv init -)\"\033[0m.\n" all: develop diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py index ec28d8cd4..aa0203858 100644 --- a/osbenchmark/workload/params.py +++ b/osbenchmark/workload/params.py @@ -696,10 +696,11 @@ def __init__(self, workload, params, **kwargs): raise exceptions.InvalidSyntax("'batch-size' must be numeric") self.ingest_percentage = self.float_param(params, name="ingest-percentage", default_value=100, min_value=0, max_value=100) + self.looped = params.get("looped", False) self.param_source = PartitionBulkIndexParamSource(self.corpora, self.batch_size, self.bulk_size, self.ingest_percentage, self.id_conflicts, self.conflict_probability, self.on_conflict, - self.recency, self.pipeline, self._params) + self.recency, self.pipeline, self.looped, self._params) def float_param(self, params, name, default_value, min_value, 
max_value, min_operator=operator.le): try: @@ -745,7 +746,7 @@ def params(self): class PartitionBulkIndexParamSource: def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflicts, conflict_probability, - on_conflict, recency, pipeline=None, original_params=None): + on_conflict, recency, pipeline=None, looped = False, original_params=None): """ :param corpora: Specification of affected document corpora. @@ -758,6 +759,7 @@ def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflic :param recency: A number between [0.0, 1.0] indicating whether to bias generation of conflicting ids towards more recent ones. May be None. :param pipeline: The name of the ingest pipeline to run. + :param looped: Set to True for looped mode, in which bulk requests restart from the beginning once the entire corpus has been ingested. :param original_params: The original dict passed to the parent parameter source. """ self.corpora = corpora @@ -771,6 +773,7 @@ def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflic self.on_conflict = on_conflict self.recency = recency self.pipeline = pipeline + self.looped = looped self.original_params = original_params # this is only intended for unit-testing self.create_reader = original_params.pop("__create_reader", create_default_reader) @@ -793,7 +796,12 @@ def params(self): # self.internal_params always reads all files. This is necessary to ensure we terminate early in case # the user has specified ingest percentage. 
if self.current_bulk == self.total_bulks: - raise StopIteration() + if self.looped: + self.current_bulk = 0 + self._init_internal_params() + else: + raise StopIteration() + self.current_bulk += 1 return next(self.internal_params) diff --git a/tests/workload/params_test.py b/tests/workload/params_test.py index 1badddc9a..dbbb942bc 100644 --- a/tests/workload/params_test.py +++ b/tests/workload/params_test.py @@ -1158,6 +1158,48 @@ def test_create_with_conflict_probability_not_numeric(self): self.assertEqual("'conflict-probability' must be numeric", ctx.exception.args[0]) + def test_looped_mode(self): + def create_unit_test_reader(*args): + return StaticBulkReader( + "idx", + "doc", + bulks=[ + ['{"location" : [-0.1485188, 51.5250666]}'], + ['{"location" : [-0.1479949, 51.5252071]}'], + ], + ) + corpora = [ + workload.DocumentCorpus( + name="default", + documents=[ + workload.Documents( + source_format=workload.Documents.SOURCE_FORMAT_BULK, + number_of_documents=2, + target_index="test-idx", + target_type="test-type", + ) + ], + ), + ] + + source = params.BulkIndexParamSource( + workload=workload.Workload(name="unit-test", corpora=corpora), + params={ + "bulk-size": 2, + "looped": True, + "__create_reader": create_unit_test_reader, + }, + ) + partition = source.partition(0, 1) + partition.params() + # should issue 1 bulk with the size of 2 + assert partition.total_bulks == 1 + assert partition.current_bulk == 1 + partition.params() + # should have looped back to the beginning + assert partition.total_bulks == 1 + assert partition.current_bulk == 1 + class BulkDataGeneratorTests(TestCase):