Skip to content

Commit 894a64c

Browse files
isapegoivandasch
andauthored
GG-33908 IGNITE-14911 Unify timeouts, add support for datetime.timedelta for expiry_policy (#52)
(cherry picked from commit 92a115c) Co-authored-by: Ivan Daschinsky <[email protected]>
1 parent a983639 commit 894a64c

File tree

12 files changed

+151
-79
lines changed

12 files changed

+151
-79
lines changed

docs/async_examples.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,20 +62,20 @@ in cache settings dictionary on creation.
6262
.. literalinclude:: ../examples/expiry_policy.py
6363
:language: python
6464
:dedent: 12
65-
:lines: 72-75
65+
:lines: 73-76
6666

6767
.. literalinclude:: ../examples/expiry_policy.py
6868
:language: python
6969
:dedent: 12
70-
:lines: 81-89
70+
:lines: 82-90
7171

7272
Secondly, expiry policy can be set for all cache operations, which are done under decorator. To create it use
7373
:py:meth:`~pygridgain.cache.BaseCache.with_expire_policy`
7474

7575
.. literalinclude:: ../examples/expiry_policy.py
7676
:language: python
7777
:dedent: 12
78-
:lines: 96-105
78+
:lines: 97-106
7979

8080
Transactions
8181
------------

docs/examples.rst

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,20 +96,20 @@ in cache settings dictionary on creation.
9696
.. literalinclude:: ../examples/expiry_policy.py
9797
:language: python
9898
:dedent: 12
99-
:lines: 31-34
99+
:lines: 32-35
100100

101101
.. literalinclude:: ../examples/expiry_policy.py
102102
:language: python
103103
:dedent: 12
104-
:lines: 40-46
104+
:lines: 41-47
105105

106106
Secondly, expiry policy can be set for all cache operations, which are done under decorator. To create it use
107107
:py:meth:`~pygridgain.cache.BaseCache.with_expire_policy`
108108

109109
.. literalinclude:: ../examples/expiry_policy.py
110110
:language: python
111111
:dedent: 12
112-
:lines: 53-60
112+
:lines: 54-61
113113

114114
Scan
115115
====

examples/expiry_policy.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#
1616
import asyncio
1717
import time
18+
from datetime import timedelta
1819

1920
from pygridgain import Client, AioClient
2021
from pygridgain.datatypes import ExpiryPolicy
@@ -31,7 +32,7 @@ def main():
3132
try:
3233
ttl_cache = client.create_cache({
3334
PROP_NAME: 'test',
34-
PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0)
35+
PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0))
3536
})
3637
except NotSupportedByClusterError:
3738
print("'ExpiryPolicy' API is not supported by cluster. Finishing...")
@@ -51,7 +52,7 @@ def main():
5152
print("Create simple Cache and set TTL through `with_expire_policy`")
5253
simple_cache = client.create_cache('test')
5354
try:
54-
ttl_cache = simple_cache.with_expire_policy(access=1.0)
55+
ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0))
5556
ttl_cache.put(1, 1)
5657
time.sleep(0.5)
5758
print(f"key = {1}, value = {ttl_cache.get(1)}")
@@ -72,7 +73,7 @@ async def async_main():
7273
try:
7374
ttl_cache = await client.create_cache({
7475
PROP_NAME: 'test',
75-
PROP_EXPIRY_POLICY: ExpiryPolicy(create=1.0)
76+
PROP_EXPIRY_POLICY: ExpiryPolicy(create=timedelta(seconds=1.0))
7677
})
7778
except NotSupportedByClusterError:
7879
print("'ExpiryPolicy' API is not supported by cluster. Finishing...")
@@ -94,7 +95,7 @@ async def async_main():
9495
print("Create simple Cache and set TTL through `with_expire_policy`")
9596
simple_cache = await client.create_cache('test')
9697
try:
97-
ttl_cache = simple_cache.with_expire_policy(access=1.0)
98+
ttl_cache = simple_cache.with_expire_policy(access=timedelta(seconds=1.0))
9899
await ttl_cache.put(1, 1)
99100
await asyncio.sleep(0.5)
100101
value = await ttl_cache.get(1)

examples/transactions.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def async_example():
6262

6363
# rollback transaction on timeout.
6464
try:
65-
async with client.tx_start(timeout=1.0, label='long-tx') as tx:
65+
async with client.tx_start(timeout=1000, label='long-tx') as tx:
6666
await cache.put(key, 'fail')
6767
await asyncio.sleep(2.0)
6868
await tx.commit()
@@ -114,7 +114,7 @@ def sync_example():
114114

115115
# rollback transaction on timeout.
116116
try:
117-
with client.tx_start(timeout=1.0, label='long-tx') as tx:
117+
with client.tx_start(timeout=1000, label='long-tx') as tx:
118118
cache.put(key, 'fail')
119119
time.sleep(2.0)
120120
tx.commit()

pygridgain/aio_client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -490,15 +490,15 @@ def get_cluster(self) -> 'AioCluster':
490490

491491
def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
492492
isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
493-
timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'AioTransaction':
493+
timeout: int = 0, label: Optional[str] = None) -> 'AioTransaction':
494494
"""
495495
Start async thin client transaction. **Supported only python 3.7+**
496496
497497
:param concurrency: (optional) transaction concurrency, see
498-
:py:class:`~pygridgain.datatypes.transactions.TransactionConcurrency`
498+
:py:class:`~pygridgain.datatypes.transactions.TransactionConcurrency`,
499499
:param isolation: (optional) transaction isolation level, see
500-
:py:class:`~pygridgain.datatypes.transactions.TransactionIsolation`
501-
:param timeout: (optional) transaction timeout in seconds if float, in millis if int
500+
:py:class:`~pygridgain.datatypes.transactions.TransactionIsolation`,
501+
:param timeout: (optional) transaction timeout in milliseconds,
502502
:param label: (optional) transaction label.
503503
:return: :py:class:`~pygridgain.transaction.AioTransaction` instance.
504504
"""

pygridgain/cache.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
#
16-
import time
16+
import datetime
1717
from typing import Any, Iterable, Optional, Tuple, Union
1818

1919
from .api.tx_api import get_tx_connection
@@ -137,16 +137,16 @@ def cache_id(self) -> int:
137137

138138
def with_expire_policy(
139139
self, expiry_policy: Optional[ExpiryPolicy] = None,
140-
create: Union[int, float] = ExpiryPolicy.UNCHANGED,
141-
update: Union[int, float] = ExpiryPolicy.UNCHANGED,
142-
access: Union[int, float] = ExpiryPolicy.UNCHANGED
140+
create: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED,
141+
update: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED,
142+
access: Union[int, datetime.timedelta] = ExpiryPolicy.UNCHANGED
143143
):
144144
"""
145145
:param expiry_policy: optional :class:`~pygridgain.datatypes.expiry_policy.ExpiryPolicy`
146-
object. If it is set, other params will be ignored.
147-
:param create: create TTL in seconds (float) or milliseconds (int),
148-
:param update: Create TTL in seconds (float) or milliseconds (int),
149-
:param access: Create TTL in seconds (float) or milliseconds (int).
146+
object. If it is set, other params will be ignored,
147+
:param create: TTL for create in milliseconds or :py:class:`~time.timedelta`,
148+
:param update: TTL for update in milliseconds or :py:class:`~time.timedelta`,
149+
:param access: TTL for access in milliseconds or :py:class:`~time.timedelta`,
150150
:return: cache decorator with expiry policy set.
151151
"""
152152
if not self.client.protocol_context.is_expiry_policy_supported():

pygridgain/client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -745,15 +745,15 @@ def get_cluster(self) -> 'Cluster':
745745

746746
def tx_start(self, concurrency: TransactionConcurrency = TransactionConcurrency.PESSIMISTIC,
747747
isolation: TransactionIsolation = TransactionIsolation.REPEATABLE_READ,
748-
timeout: Union[int, float] = 0, label: Optional[str] = None) -> 'Transaction':
748+
timeout: int = 0, label: Optional[str] = None) -> 'Transaction':
749749
"""
750750
Start thin client transaction.
751751
752752
:param concurrency: (optional) transaction concurrency, see
753-
:py:class:`~pygridgain.datatypes.transactions.TransactionConcurrency`
753+
:py:class:`~pygridgain.datatypes.transactions.TransactionConcurrency`,
754754
:param isolation: (optional) transaction isolation level, see
755-
:py:class:`~pygridgain.datatypes.transactions.TransactionIsolation`
756-
:param timeout: (optional) transaction timeout in seconds if float, in millis if int
755+
:py:class:`~pygridgain.datatypes.transactions.TransactionIsolation`,
756+
:param timeout: (optional) transaction timeout in milliseconds,
757757
:param label: (optional) transaction label.
758758
:return: :py:class:`~pygridgain.transaction.Transaction` instance.
759759
"""

pygridgain/datatypes/cache_properties.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# limitations under the License.
1515
#
1616
import ctypes
17+
import math
18+
from typing import Union
1719

1820
from . import ExpiryPolicy
1921
from .prop_codes import *
@@ -137,6 +139,20 @@ async def from_python_async(cls, stream, value):
137139
return cls.from_python(stream, value)
138140

139141

142+
class TimeoutProp(PropBase):
143+
prop_data_class = Long
144+
145+
@classmethod
146+
def from_python(cls, stream, value: int):
147+
if not isinstance(value, int) or value < 0:
148+
raise ValueError(f'Timeout value should be a positive integer, {value} passed instead')
149+
return super().from_python(stream, value)
150+
151+
@classmethod
152+
async def from_python_async(cls, stream, value):
153+
return cls.from_python(stream, value)
154+
155+
140156
class PropName(PropBase):
141157
prop_code = PROP_NAME
142158
prop_data_class = String
@@ -227,9 +243,8 @@ class PropRebalanceDelay(PropBase):
227243
prop_data_class = Long
228244

229245

230-
class PropRebalanceTimeout(PropBase):
246+
class PropRebalanceTimeout(TimeoutProp):
231247
prop_code = PROP_REBALANCE_TIMEOUT
232-
prop_data_class = Long
233248

234249

235250
class PropRebalanceBatchSize(PropBase):
@@ -262,9 +277,8 @@ class PropCacheKeyConfiguration(PropBase):
262277
prop_data_class = CacheKeyConfiguration
263278

264279

265-
class PropDefaultLockTimeout(PropBase):
280+
class PropDefaultLockTimeout(TimeoutProp):
266281
prop_code = PROP_DEFAULT_LOCK_TIMEOUT
267-
prop_data_class = Long
268282

269283

270284
class PropMaxConcurrentAsyncOperation(PropBase):

pygridgain/datatypes/expiry_policy.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# limitations under the License.
1515
#
1616
import ctypes
17+
import math
18+
from datetime import timedelta
1719
from io import SEEK_CUR
1820
from typing import Union
1921

@@ -23,13 +25,16 @@
2325

2426

2527
def _positive(_, attrib, value):
28+
if isinstance(value, timedelta):
29+
value = value.total_seconds() * 1000
30+
2631
if value < 0 and value not in [ExpiryPolicy.UNCHANGED, ExpiryPolicy.ETERNAL]:
2732
raise ValueError(f"'{attrib.name}' value must not be negative")
2833

2934

3035
def _write_duration(stream, value):
31-
if isinstance(value, float):
32-
value = int(value * 1000)
36+
if isinstance(value, timedelta):
37+
value = math.floor(value.total_seconds() * 1000)
3338

3439
stream.write(value.to_bytes(8, byteorder=PROTOCOL_BYTE_ORDER, signed=True))
3540

@@ -45,17 +50,17 @@ class ExpiryPolicy:
4550
#: Set TTL eternal.
4651
ETERNAL = -1
4752

48-
#: Set TTL for create in seconds(float) or millis(int)
49-
create = attr.ib(kw_only=True, default=UNCHANGED,
50-
validator=[attr.validators.instance_of((int, float)), _positive])
53+
#: Set TTL for create in milliseconds or :py:class:`~time.timedelta`
54+
create = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
55+
validator=[attr.validators.instance_of((int, timedelta)), _positive])
5156

52-
#: Set TTL for update in seconds(float) or millis(int)
53-
update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
54-
validator=[attr.validators.instance_of((int, float)), _positive])
57+
#: Set TTL for update in milliseconds or :py:class:`~time.timedelta`
58+
update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
59+
validator=[attr.validators.instance_of((int, timedelta)), _positive])
5560

56-
#: Set TTL for access in seconds(float) or millis(int)
57-
access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, float],
58-
validator=[attr.validators.instance_of((int, float)), _positive])
61+
#: Set TTL for access in milliseconds or :py:class:`~time.timedelta`
62+
access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta],
63+
validator=[attr.validators.instance_of((int, timedelta)), _positive])
5964

6065
class _CType(ctypes.LittleEndianStructure):
6166
_pack_ = 1

pygridgain/transaction.py

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,50 @@
1414
# limitations under the License.
1515
#
1616

17-
import math
18-
from typing import Union
17+
from enum import IntEnum
18+
from typing import Union, Type
1919

2020
from pygridgain.api.tx_api import tx_end, tx_start, tx_end_async, tx_start_async
2121
from pygridgain.datatypes import TransactionIsolation, TransactionConcurrency
2222
from pygridgain.exceptions import CacheError
2323
from pygridgain.utils import status_to_exception
2424

2525

26-
def _convert_to_millis(timeout: Union[int, float]) -> int:
27-
if isinstance(timeout, float):
28-
return math.floor(timeout * 1000)
29-
return timeout
26+
def _validate_int_enum_param(value: Union[int, IntEnum], cls: Type[IntEnum]):
27+
if value not in cls:
28+
raise ValueError(f'{value} not in {cls}')
29+
return value
3030

3131

32-
class Transaction:
32+
def _validate_timeout(value):
33+
if not isinstance(value, int) or value < 0:
34+
raise ValueError(f'Timeout value should be a positive integer, {value} passed instead')
35+
return value
36+
37+
38+
def _validate_label(value):
39+
if value and not isinstance(value, str):
40+
raise ValueError(f'Label should be str, {type(value)} passed instead')
41+
return value
42+
43+
44+
class _BaseTransaction:
45+
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
46+
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
47+
self.client = client
48+
self.concurrency = _validate_int_enum_param(concurrency, TransactionConcurrency)
49+
self.isolation = _validate_int_enum_param(isolation, TransactionIsolation)
50+
self.timeout = _validate_timeout(timeout)
51+
self.label, self.closed = _validate_label(label), False
52+
53+
54+
class Transaction(_BaseTransaction):
3355
"""
3456
Thin client transaction.
3557
"""
3658
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
3759
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
38-
self.client, self.concurrency = client, concurrency
39-
self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
40-
self.label, self.closed = label, False
60+
super().__init__(client, concurrency, isolation, timeout, label)
4161
self.tx_id = self.__start_tx()
4262

4363
def commit(self) -> None:
@@ -78,15 +98,13 @@ def __end_tx(self, committed):
7898
return tx_end(self.tx_id, committed)
7999

80100

81-
class AioTransaction:
101+
class AioTransaction(_BaseTransaction):
82102
"""
83103
Async thin client transaction.
84104
"""
85105
def __init__(self, client, concurrency=TransactionConcurrency.PESSIMISTIC,
86106
isolation=TransactionIsolation.REPEATABLE_READ, timeout=0, label=None):
87-
self.client, self.concurrency = client, concurrency
88-
self.isolation, self.timeout = isolation, _convert_to_millis(timeout)
89-
self.label, self.closed = label, False
107+
super().__init__(client, concurrency, isolation, timeout, label)
90108

91109
def __await__(self):
92110
return (yield from self.__aenter__().__await__())

0 commit comments

Comments
 (0)