Skip to content

Commit 5271227

Browse files
committed
Optimistic concurrent control using seq_no and primary_term
1 parent eef09a1 commit 5271227

File tree

3 files changed

+22
-14
lines changed

3 files changed

+22
-14
lines changed

elasticsearch_dsl/document.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,11 @@ def update(self, using=None, index=None, detect_noop=True,
388388
if retry_on_conflict is not None:
389389
doc_meta['retry_on_conflict'] = retry_on_conflict
390390

391+
# Optimistic concurrency control
392+
if 'seq_no' in self.meta and 'primary_term' in self.meta:
393+
doc_meta['if_seq_no'] = self.meta['seq_no']
394+
doc_meta['if_primary_term'] = self.meta['primary_term']
395+
391396
meta = self._get_connection(using).update(
392397
index=self._get_index(index),
393398
body=body,
@@ -430,6 +435,12 @@ def save(self, using=None, index=None, validate=True, skip_empty=True, **kwargs)
430435
for k in DOC_META_FIELDS
431436
if k in self.meta
432437
}
438+
439+
# Optimistic concurrency control
440+
if 'seq_no' in self.meta and 'primary_term' in self.meta:
441+
doc_meta['if_seq_no'] = self.meta['seq_no']
442+
doc_meta['if_primary_term'] = self.meta['primary_term']
443+
433444
doc_meta.update(kwargs)
434445
meta = es.index(
435446
index=self._get_index(index),

elasticsearch_dsl/utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616
EXPAND__TO_DOT = True
1717

1818
DOC_META_FIELDS = frozenset((
19-
'id', 'routing', 'version', 'version_type', 'parent'
19+
'id', 'routing',
2020
))
2121

2222
META_FIELDS = frozenset((
2323
# Elasticsearch metadata fields, except 'type'
24-
'index', 'using', 'score',
24+
'index', 'using', 'score', 'version', 'seq_no', 'primary_term'
2525
)).union(DOC_META_FIELDS)
2626

2727
def _wrap(val, obj_wrapper=None):

test_elasticsearch_dsl/test_integration/test_document.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ def test_update(data_client):
251251
elasticsearch_repo = Repository.get('elasticsearch-dsl-py')
252252
v = elasticsearch_repo.meta.version
253253

254+
old_seq_no = elasticsearch_repo.meta.seq_no
254255
elasticsearch_repo.update(owner={'new_name': 'elastic'}, new_field='testing-update')
255256

256257
assert 'elastic' == elasticsearch_repo.owner.new_name
@@ -263,24 +264,26 @@ def test_update(data_client):
263264
assert 'testing-update' == new_version.new_field
264265
assert 'elastic' == new_version.owner.new_name
265266
assert 'elasticsearch' == new_version.owner.name
267+
assert 'seq_no' in new_version.meta
268+
assert new_version.meta.seq_no != old_seq_no
269+
assert 'primary_term' in new_version.meta
266270

267271

268272
def test_save_updates_existing_doc(data_client):
269273
elasticsearch_repo = Repository.get('elasticsearch-dsl-py')
270274

271275
elasticsearch_repo.new_field = 'testing-save'
272-
v = elasticsearch_repo.meta.version
276+
old_seq_no = elasticsearch_repo.meta.seq_no
273277
assert 'updated' == elasticsearch_repo.save()
274278

275-
# assert version has been updated
276-
assert elasticsearch_repo.meta.version == v + 1
277-
278279
new_repo = data_client.get(index='git', id='elasticsearch-dsl-py')
279280
assert 'testing-save' == new_repo['_source']['new_field']
281+
assert new_repo['_seq_no'] != old_seq_no
282+
assert new_repo['_seq_no'] == elasticsearch_repo.meta.seq_no
280283

281-
def test_save_automatically_uses_versions(data_client):
284+
def test_save_automatically_uses_seq_no_and_primary_term(data_client):
282285
elasticsearch_repo = Repository.get('elasticsearch-dsl-py')
283-
elasticsearch_repo.meta.version += 1
286+
elasticsearch_repo.meta.seq_no += 1
284287

285288
with raises(ConflictError):
286289
elasticsearch_repo.save()
@@ -292,31 +295,25 @@ def assert_doc_equals(expected, actual):
292295

293296
def test_can_save_to_different_index(write_client):
294297
test_repo = Repository(description='testing', meta={'id': 42})
295-
test_repo.meta.version_type = 'external'
296-
test_repo.meta.version = 3
297298
assert test_repo.save(index='test-document')
298299

299300
assert_doc_equals({
300301
'found': True,
301302
'_index': 'test-document',
302303
'_id': '42',
303-
'_version': 3,
304304
'_source': {'description': 'testing'},
305305
},
306306
write_client.get(index='test-document', id=42)
307307
)
308308

309309
def test_save_without_skip_empty_will_include_empty_fields(write_client):
310310
test_repo = Repository(field_1=[], field_2=None, field_3={}, meta={'id': 42})
311-
test_repo.meta.version_type = 'external'
312-
test_repo.meta.version = 3
313311
assert test_repo.save(index='test-document', skip_empty=False)
314312

315313
assert_doc_equals({
316314
'found': True,
317315
'_index': 'test-document',
318316
'_id': '42',
319-
'_version': 3,
320317
'_source': {
321318
"field_1": [],
322319
"field_2": None,

0 commit comments

Comments
 (0)