Skip to content

Commit 84da6b7

Browse files
feat(datastore): add readTime param to lookup/runQuery (#913)
1 parent c32a84f commit 84da6b7

File tree

5 files changed

+192
-18
lines changed

5 files changed

+192
-18
lines changed

datastore/gcloud/aio/datastore/datastore.py

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ class Datastore:
7070
_api_root: str
7171
_api_is_dev: bool
7272

73+
Timeout = Union[int, float]
74+
7375
def __init__(
7476
self, project: Optional[str] = None,
7577
service_file: Optional[Union[str, IO[AnyStr]]] = None,
@@ -159,7 +161,7 @@ def make_mutation(
159161
async def allocateIds(
160162
self, keys: List[Key],
161163
session: Optional[Session] = None,
162-
timeout: int = 10,
164+
timeout: Timeout = 10,
163165
) -> List[Key]:
164166
project = await self.project()
165167
url = f'{self._api_root}/projects/{project}:allocateIds'
@@ -187,7 +189,7 @@ async def allocateIds(
187189
# TODO: support readwrite vs readonly transaction types
188190
async def beginTransaction(
189191
self, session: Optional[Session] = None,
190-
timeout: int = 10,
192+
timeout: Timeout = 10,
191193
) -> str:
192194
project = await self.project()
193195
url = f'{self._api_root}/projects/{project}:beginTransaction'
@@ -210,7 +212,7 @@ async def commit(
210212
transaction: Optional[str] = None,
211213
mode: Mode = Mode.TRANSACTIONAL,
212214
session: Optional[Session] = None,
213-
timeout: int = 10,
215+
timeout: Timeout = 10,
214216
) -> Dict[str, Any]:
215217
project = await self.project()
216218
url = f'{self._api_root}/projects/{project}:commit'
@@ -249,7 +251,7 @@ async def export(
249251
namespaces: Optional[List[str]] = None,
250252
labels: Optional[Dict[str, str]] = None,
251253
session: Optional[Session] = None,
252-
timeout: int = 10,
254+
timeout: Timeout = 10,
253255
) -> DatastoreOperation:
254256
project = await self.project()
255257
url = f'{self._api_root}/projects/{project}:export'
@@ -282,7 +284,7 @@ async def export(
282284
async def get_datastore_operation(
283285
self, name: str,
284286
session: Optional[Session] = None,
285-
timeout: int = 10,
287+
timeout: Timeout = 10,
286288
) -> DatastoreOperation:
287289
url = f'{self._api_root}/{name}'
288290

@@ -297,19 +299,21 @@ async def get_datastore_operation(
297299

298300
return self.datastore_operation_kind.from_repr(data)
299301

302+
# pylint: disable=too-many-locals
300303
# https://cloud.google.com/datastore/docs/reference/data/rest/v1/projects/lookup
301304
async def lookup(
302305
self, keys: List[Key],
303306
transaction: Optional[str] = None,
304307
newTransaction: Optional[TransactionOptions] = None,
305308
consistency: Consistency = Consistency.STRONG,
306-
session: Optional[Session] = None, timeout: int = 10,
309+
read_time: Optional[str] = None,
310+
session: Optional[Session] = None, timeout: Timeout = 10,
307311
) -> LookUpResult:
308312
project = await self.project()
309313
url = f'{self._api_root}/projects/{project}:lookup'
310314

311315
read_options = self._build_read_options(
312-
consistency, newTransaction, transaction)
316+
consistency, newTransaction, transaction, read_time)
313317

314318
payload = json.dumps({
315319
'keys': [k.to_repr() for k in keys],
@@ -350,27 +354,35 @@ def _build_lookup_result(self, data: Dict[str, Any]) -> LookUpResult:
350354
if 'transaction' in data:
351355
new_transaction: str = data['transaction']
352356
result['transaction'] = new_transaction
357+
if 'readTime' in data:
358+
read_time: str = data['readTime']
359+
result['readTime'] = read_time
353360
return result
354361

355362
# https://cloud.google.com/datastore/docs/reference/data/rest/v1/ReadOptions
356363
def _build_read_options(self,
357364
consistency: Consistency,
358365
newTransaction: Optional[TransactionOptions],
359-
transaction: Optional[str]) -> Dict[str, Any]:
366+
transaction: Optional[str],
367+
read_time: Optional[str],
368+
) -> Dict[str, Any]:
360369
# TODO: expose ReadOptions directly to users
361370
if transaction:
362371
return {'transaction': transaction}
363372

364373
if newTransaction:
365374
return {'newTransaction': newTransaction.to_repr()}
366375

376+
if read_time:
377+
return {'readTime': read_time}
378+
367379
return {'readConsistency': consistency.value}
368380

369381
# https://cloud.google.com/datastore/docs/reference/data/rest/v1/projects/reserveIds
370382
async def reserveIds(
371383
self, keys: List[Key], database_id: str = '',
372384
session: Optional[Session] = None,
373-
timeout: int = 10,
385+
timeout: Timeout = 10,
374386
) -> None:
375387
project = await self.project()
376388
url = f'{self._api_root}/projects/{project}:reserveIds'
@@ -393,7 +405,7 @@ async def reserveIds(
393405
async def rollback(
394406
self, transaction: str,
395407
session: Optional[Session] = None,
396-
timeout: int = 10,
408+
timeout: Timeout = 10,
397409
) -> None:
398410
project = await self.project()
399411
url = f'{self._api_root}/projects/{project}:rollback'
@@ -417,26 +429,26 @@ async def runQuery(
417429
self, query: BaseQuery,
418430
explain_options: Optional[ExplainOptions] = None,
419431
transaction: Optional[str] = None,
432+
newTransaction: Optional[TransactionOptions] = None,
420433
consistency: Consistency = Consistency.EVENTUAL,
434+
read_time: Optional[str] = None,
421435
session: Optional[Session] = None,
422-
timeout: int = 10,
436+
timeout: Timeout = 10,
423437
) -> QueryResult:
424438

425439
project = await self.project()
426440
url = f'{self._api_root}/projects/{project}:runQuery'
427441

428-
if transaction:
429-
options = {'transaction': transaction}
430-
else:
431-
options = {'readConsistency': consistency.value}
442+
read_options = self._build_read_options(
443+
consistency, newTransaction, transaction, read_time)
432444

433445
payload_dict = {
434446
'partitionId': {
435447
'projectId': project,
436448
'namespaceId': self.namespace,
437449
},
438450
query.json_key: query.to_repr(),
439-
'readOptions': options,
451+
'readOptions': read_options,
440452
}
441453

442454
if explain_options:

datastore/gcloud/aio/datastore/query.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ def __eq__(self, other: Any) -> bool:
194194

195195

196196
class QueryResultBatch:
197+
# pylint: disable=too-many-instance-attributes
197198
entity_result_kind = EntityResult
198199

199200
def __init__(
@@ -203,6 +204,7 @@ def __init__(
203204
more_results: MoreResultsType = MoreResultsType.UNSPECIFIED,
204205
skipped_cursor: str = '', skipped_results: int = 0,
205206
snapshot_version: str = '',
207+
read_time: Optional[str] = None,
206208
) -> None:
207209
self.end_cursor = end_cursor
208210

@@ -212,6 +214,7 @@ def __init__(
212214
self.skipped_cursor = skipped_cursor
213215
self.skipped_results = skipped_results
214216
self.snapshot_version = snapshot_version
217+
self.read_time = read_time
215218

216219
def __eq__(self, other: Any) -> bool:
217220
if not isinstance(other, QueryResultBatch):
@@ -224,7 +227,8 @@ def __eq__(self, other: Any) -> bool:
224227
and self.more_results == other.more_results
225228
and self.skipped_cursor == other.skipped_cursor
226229
and self.skipped_results == other.skipped_results
227-
and self.snapshot_version == other.snapshot_version,
230+
and self.snapshot_version == other.snapshot_version
231+
and self.read_time == other.read_time,
228232
)
229233

230234
def __repr__(self) -> str:
@@ -242,12 +246,15 @@ def from_repr(cls, data: Dict[str, Any]) -> 'QueryResultBatch':
242246
skipped_cursor = data.get('skippedCursor', '')
243247
skipped_results = data.get('skippedResults', 0)
244248
snapshot_version = data.get('snapshotVersion', '')
249+
read_time = data.get('readTime')
250+
245251
return cls(
246252
end_cursor, entity_result_type=entity_result_type,
247253
entity_results=entity_results, more_results=more_results,
248254
skipped_cursor=skipped_cursor,
249255
skipped_results=skipped_results,
250256
snapshot_version=snapshot_version,
257+
read_time=read_time,
251258
)
252259

253260
def to_repr(self) -> Dict[str, Any]:
@@ -262,7 +269,8 @@ def to_repr(self) -> Dict[str, Any]:
262269
data['skippedCursor'] = self.skipped_cursor
263270
if self.snapshot_version:
264271
data['snapshotVersion'] = self.snapshot_version
265-
272+
if self.read_time:
273+
data['readTime'] = self.read_time
266274
return data
267275

268276

datastore/tests/integration/smoke_test.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
import uuid
23

34
import pytest
@@ -657,3 +658,95 @@ async def test_analyze_query_explain(
657658
finally:
658659
for key in test_entities:
659660
await ds.delete(key, session=s)
661+
662+
663+
@pytest.mark.asyncio
664+
async def test_lookup_with_read_time(
665+
creds: str, kind: str, project: str) -> None:
666+
test_value = f'test_read_time_{uuid.uuid4()}'
667+
key = Key(project, [PathElement(kind, name=test_value)])
668+
669+
async with Session() as s:
670+
ds = Datastore(project=project, service_file=creds, session=s)
671+
672+
# insert and read without readTime
673+
time_before_insert = datetime.datetime.now(datetime.timezone.utc)
674+
await ds.insert(key,
675+
{'value': test_value, 'timestamp': 'after'},
676+
session=s)
677+
678+
result = await ds.lookup([key], session=s)
679+
assert len(result['found']) == 1
680+
assert result['found'][0].entity.properties['value'] == test_value
681+
assert isinstance(result['readTime'], str)
682+
683+
# lookup entity version w/ readTime
684+
current_time = datetime.datetime.now(datetime.timezone.utc)
685+
current_time_str = current_time.isoformat().replace('+00:00', 'Z')
686+
result_with_datetime = await ds.lookup([key],
687+
read_time=current_time_str,
688+
session=s)
689+
assert len(result_with_datetime.get('found', [])) == 1
690+
assert isinstance(result_with_datetime['readTime'], str)
691+
692+
# lookup entity before insertion timestamp
693+
past_time = time_before_insert - datetime.timedelta(seconds=10)
694+
past_time_str = past_time.isoformat().replace('+00:00', 'Z')
695+
result_past = await ds.lookup([key],
696+
read_time=past_time_str,
697+
session=s)
698+
assert len(result_past.get('found', [])) == 0
699+
assert len(result_past.get('missing', [])) == 1
700+
701+
await ds.delete(key, session=s)
702+
703+
704+
# pylint: disable=too-many-locals
705+
@pytest.mark.asyncio
706+
async def test_run_query_with_read_time(
707+
creds: str, kind: str, project: str) -> None:
708+
test_value = f'read_time_test_{uuid.uuid4()}'
709+
710+
async with Session() as s:
711+
ds = Datastore(project=project, service_file=creds, session=s)
712+
713+
before_insert = datetime.datetime.now(datetime.timezone.utc)
714+
key = Key(project, [PathElement(kind, name=test_value)])
715+
await ds.insert(key, {'test_field': test_value}, session=s)
716+
717+
# insert and query for entity
718+
query = Query(
719+
kind=kind,
720+
query_filter=Filter(PropertyFilter(
721+
prop='test_field',
722+
operator=PropertyFilterOperator.EQUAL,
723+
value=Value(test_value)
724+
))
725+
)
726+
result_current = await ds.runQuery(query, session=s)
727+
728+
assert len(result_current.result_batch.entity_results) == 1
729+
assert result_current.result_batch.entity_results[0].entity.properties[
730+
'test_field'] == test_value
731+
732+
# query w/ readTime
733+
current = datetime.datetime.now(datetime.timezone.utc)
734+
current_str = current.isoformat().replace('+00:00', 'Z')
735+
result_with_datetime = await ds.runQuery(query,
736+
read_time=current_str,
737+
session=s)
738+
assert len(result_with_datetime.result_batch.entity_results) == 1
739+
740+
# verify readTime != empty and is a string
741+
assert isinstance(result_with_datetime.result_batch.read_time, str)
742+
assert result_with_datetime.result_batch.read_time is not None
743+
744+
# query w/ readTime before insertion time
745+
past_time = before_insert - datetime.timedelta(seconds=10)
746+
past_time_str = past_time.isoformat().replace('+00:00', 'Z')
747+
result_past = await ds.runQuery(query,
748+
read_time=past_time_str,
749+
session=s)
750+
assert len(result_past.result_batch.entity_results) == 0
751+
752+
await ds.delete(key, session=s)

datastore/tests/unit/datastore_test.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import pytest
2+
from gcloud.aio.datastore import Consistency
23
from gcloud.aio.datastore import Datastore
34
from gcloud.aio.datastore import Key
45
from gcloud.aio.datastore import Operation
@@ -16,6 +17,30 @@ def test_make_mutation_from_value_object(key):
1617

1718
assert results['insert']['properties']['value'] == value.to_repr()
1819

20+
# pylint: disable=protected-access
21+
@staticmethod
22+
def test_build_read_options_priority():
23+
ds = Datastore()
24+
dt_str = '2025-01-01T12:00:00Z'
25+
26+
# transaction > readTime > consistency
27+
result = ds._build_read_options(
28+
Consistency.STRONG, None, 'txn123', dt_str
29+
)
30+
assert result == {'transaction': 'txn123'}
31+
32+
# readTime > consistency
33+
result = ds._build_read_options(
34+
Consistency.STRONG, None, None, dt_str
35+
)
36+
assert result == {'readTime': '2025-01-01T12:00:00Z'}
37+
38+
# fall back to consistency
39+
result = ds._build_read_options(
40+
Consistency.STRONG, None, None, None
41+
)
42+
assert result == {'readConsistency': 'STRONG'}
43+
1944
@staticmethod
2045
@pytest.fixture(scope='session')
2146
def key() -> Key:

0 commit comments

Comments
 (0)