Skip to content

Commit 2dcc542

Browse files
committed
Add looped mode support for bulk ingestion operation. (#968)
Signed-off-by: Rishabh Singh <[email protected]> (cherry picked from commit 0522e68)
1 parent bbbb123 commit 2dcc542

File tree

4 files changed

+55
-5
lines changed

4 files changed

+55
-5
lines changed

.ci/variables.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44
"PY39": "3.9.10",
55
"PY310": "3.10.6",
66
"PY311": "3.11.1",
7-
"MIN_PY_VER": "3.8.12"
7+
"MIN_PY_VER": "3.10.6"
88
}
99
}

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ SHELL = /bin/bash
1919
PYTHON = python3
2020
PIP = pip3
2121
VERSIONS = $(shell jq -r '.python_versions | .[]' .ci/variables.json | sed '$$d')
22-
VERSION38 = $(shell jq -r '.python_versions | .[]' .ci/variables.json | sed '$$d' | grep 3\.8)
22+
VERSION38 = $(shell jq -r '.python_versions | .[]' .ci/variables.json | sed '$$d' | grep 3\.10)
2323
PYENV_ERROR = "\033[0;31mIMPORTANT\033[0m: Please install pyenv and run \033[0;31meval \"\$$(pyenv init -)\"\033[0m.\n"
2424

2525
all: develop

osbenchmark/workload/params.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -696,10 +696,11 @@ def __init__(self, workload, params, **kwargs):
696696
raise exceptions.InvalidSyntax("'batch-size' must be numeric")
697697

698698
self.ingest_percentage = self.float_param(params, name="ingest-percentage", default_value=100, min_value=0, max_value=100)
699+
self.looped = params.get("looped", False)
699700
self.param_source = PartitionBulkIndexParamSource(self.corpora, self.batch_size, self.bulk_size,
700701
self.ingest_percentage, self.id_conflicts,
701702
self.conflict_probability, self.on_conflict,
702-
self.recency, self.pipeline, self._params)
703+
self.recency, self.pipeline, self.looped, self._params)
703704

704705
def float_param(self, params, name, default_value, min_value, max_value, min_operator=operator.le):
705706
try:
@@ -745,7 +746,7 @@ def params(self):
745746

746747
class PartitionBulkIndexParamSource:
747748
def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflicts, conflict_probability,
748-
on_conflict, recency, pipeline=None, original_params=None):
749+
on_conflict, recency, pipeline=None, looped = False, original_params=None):
749750
"""
750751
751752
:param corpora: Specification of affected document corpora.
@@ -758,6 +759,7 @@ def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflic
758759
:param recency: A number between [0.0, 1.0] indicating whether to bias generation of conflicting ids towards more recent ones.
759760
May be None.
760761
:param pipeline: The name of the ingest pipeline to run.
762+
:param looped: Set to True for looped mode, where bulk requests are repeated from the beginning once the entire corpus has been ingested.
761763
:param original_params: The original dict passed to the parent parameter source.
762764
"""
763765
self.corpora = corpora
@@ -771,6 +773,7 @@ def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflic
771773
self.on_conflict = on_conflict
772774
self.recency = recency
773775
self.pipeline = pipeline
776+
self.looped = looped
774777
self.original_params = original_params
775778
# this is only intended for unit-testing
776779
self.create_reader = original_params.pop("__create_reader", create_default_reader)
@@ -793,7 +796,12 @@ def params(self):
793796
# self.internal_params always reads all files. This is necessary to ensure we terminate early in case
794797
# the user has specified ingest percentage.
795798
if self.current_bulk == self.total_bulks:
796-
raise StopIteration()
799+
if self.looped:
800+
self.current_bulk = 0
801+
self._init_internal_params()
802+
else:
803+
raise StopIteration()
804+
797805
self.current_bulk += 1
798806
return next(self.internal_params)
799807

tests/workload/params_test.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1158,6 +1158,48 @@ def test_create_with_conflict_probability_not_numeric(self):
11581158

11591159
self.assertEqual("'conflict-probability' must be numeric", ctx.exception.args[0])
11601160

1161+
def test_looped_mode(self):
1162+
def create_unit_test_reader(*args):
1163+
return StaticBulkReader(
1164+
"idx",
1165+
"doc",
1166+
bulks=[
1167+
['{"location" : [-0.1485188, 51.5250666]}'],
1168+
['{"location" : [-0.1479949, 51.5252071]}'],
1169+
],
1170+
)
1171+
corpora = [
1172+
workload.DocumentCorpus(
1173+
name="default",
1174+
documents=[
1175+
workload.Documents(
1176+
source_format=workload.Documents.SOURCE_FORMAT_BULK,
1177+
number_of_documents=2,
1178+
target_index="test-idx",
1179+
target_type="test-type",
1180+
)
1181+
],
1182+
),
1183+
]
1184+
1185+
source = params.BulkIndexParamSource(
1186+
workload=workload.Workload(name="unit-test", corpora=corpora),
1187+
params={
1188+
"bulk-size": 2,
1189+
"looped": True,
1190+
"__create_reader": create_unit_test_reader,
1191+
},
1192+
)
1193+
partition = source.partition(0, 1)
1194+
partition.params()
1195+
# should issue 1 bulk with the size of 2
1196+
assert partition.total_bulks == 1
1197+
assert partition.current_bulk == 1
1198+
partition.params()
1199+
# should have looped back to the beginning
1200+
assert partition.total_bulks == 1
1201+
assert partition.current_bulk == 1
1202+
11611203

11621204
class BulkDataGeneratorTests(TestCase):
11631205

0 commit comments

Comments
 (0)