Skip to content

Commit 2840e33

Browse files
authored
Merge branch 'master' into support_ttl
2 parents a4ce01e + 2016b4c commit 2840e33

File tree

5 files changed

+69
-3
lines changed

5 files changed

+69
-3
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ NOTE: Rule complexity is limited by the querying capabilities of the backend.
5151

5252
`save()` - store the Model instance to the backend
5353

54+
`batch_save()` - store the List of Model instance to the backend
55+
5456
### DynamoDB
5557

5658
`get(key: Union[Dict, Any])`

pydanticrud/backends/dynamodb.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ def _to_epoch_decimal(dt: datetime) -> Decimal:
9292
"array": json.dumps,
9393
}
9494

95-
9695
DESERIALIZE_MAP = {
9796
"number": float,
9897
"boolean": bool,
@@ -101,6 +100,11 @@ def _to_epoch_decimal(dt: datetime) -> Decimal:
101100
}
102101

103102

103+
def chunk_list(lst, size):
104+
for i in range(0, len(lst), size):
105+
yield lst[i:i + size]
106+
107+
104108
def index_definition(index_name, keys, gsi=False):
105109
schema = {
106110
"IndexName": index_name,
@@ -118,7 +122,7 @@ def index_definition(index_name, keys, gsi=False):
118122

119123
class DynamoSerializer:
120124
def __init__(self, schema, ttl_field=None):
121-
self.properties = schema["properties"]
125+
self.properties = schema.get("properties")
122126
self.definitions = schema.get("definitions")
123127
self.ttl_field = ttl_field
124128

@@ -420,3 +424,28 @@ def save(self, item, condition: Optional[Rule] = None) -> bool:
420424

421425
def delete(self, key):
422426
self.get_table().delete_item(Key=self._key_param_to_dict(key))
427+
428+
def batch_save(self, items: list) -> dict:
429+
"""
430+
This function is to write multiple records in to dynamodb and returns unprocessed records in dict
431+
if something gone wrong with the record.Currently, batch_write is not supporting ConditionExpression
432+
Refer docs:
433+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_write_item.html
434+
"""
435+
# Prepare the batch write requests
436+
request_items = {self.table_name: []}
437+
438+
# chunk list for size limit of 25 items to write using this batch_write operation refer below.
439+
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_write_item.html#:~:text=The%20BatchWriteItem%20operation,Data%20Types.
440+
for chunk in chunk_list(items, 25):
441+
serialized_items = [self.serializer.serialize_record(item.dict(by_alias=True)) for item in chunk]
442+
for serialized_item in serialized_items:
443+
request_items[self.table_name].append({"PutRequest": {"Item": serialized_item}})
444+
try:
445+
response = self.dynamodb.batch_write_item(RequestItems=request_items)
446+
except ClientError as e:
447+
raise e
448+
except (ValueError, TypeError, KeyError) as ex:
449+
raise ex
450+
unprocessed_items = response.get("UnprocessedItems", {})
451+
return unprocessed_items

pydanticrud/backends/sqlite.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,3 +191,6 @@ def save(self, item, condition: Optional[Rule] = None) -> bool:
191191

192192
def delete(self, item_key: str):
193193
self._conn.execute(f"DELETE FROM {self.table_name} WHERE {self.hash_key} = ?;", [item_key])
194+
195+
def batch_save(self, items: dict):
196+
raise NotImplementedError("This functionality is not yet implemented")

pydanticrud/main.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,7 @@ def save(self) -> bool:
7373
@classmethod
7474
def delete(cls, *args, **kwargs):
7575
cls.__backend__.delete(*args, **kwargs)
76+
77+
@classmethod
78+
def batch_save(cls, *args, **kwargs):
79+
return cls.__backend__.batch_save(*args, **kwargs)

tests/test_dynamodb.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import random
66

77
import docker
8+
from botocore.exceptions import ClientError
89
from pydantic import BaseModel as PydanticBaseModel, Field, root_validator, ValidationError
910
from pydanticrud import BaseModel, DynamoDbBackend, ConditionCheckFailed
1011
import pytest
@@ -68,6 +69,7 @@ class ComplexKeyModel(BaseModel):
6869
category_id: int
6970
notification_id: str
7071
thread_id: str
72+
body: str = "some random string"
7173

7274
class Config:
7375
title = "ComplexModelTitle123"
@@ -244,7 +246,7 @@ def complex_query_data(complex_table):
244246
accounts = [str(uuid4()) for i in range(4)]
245247

246248
data = [
247-
complex_model_data_generator(account=accounts[i % 4], **p)
249+
complex_model_data_generator(account=accounts[i % 4], body="some random string", **p)
248250
for i, p in enumerate(presets)
249251
]
250252
for datum in data:
@@ -546,3 +548,29 @@ def test_alias_model_validator_ingest(dynamo):
546548
data.pop("typ")
547549
with pytest.raises(ValidationError):
548550
AliasKeyModel(**data)
551+
552+
553+
def test_batch_write(dynamo, complex_table):
554+
response = {"UnprocessedItems": {}}
555+
data = [ComplexKeyModel.parse_obj(complex_model_data_generator()) for x in range(0, 10)]
556+
un_proc = ComplexKeyModel.batch_save(data)
557+
assert un_proc == response["UnprocessedItems"]
558+
res_get = ComplexKeyModel.get((data[0].account, data[0].sort_date_key))
559+
res_query = ComplexKeyModel.query(
560+
Rule(f"account == '{data[0].account}' and sort_date_key == '{data[0].sort_date_key}'")
561+
)
562+
assert res_get == data[0]
563+
assert res_query.count == 1
564+
assert res_query.records == [data[0]]
565+
566+
567+
def test_message_batch_write_client_exception(dynamo, complex_table):
568+
data = [
569+
ComplexKeyModel.parse_obj(complex_model_data_generator(body="some big string" * 10000))
570+
for x in range(0, 2)
571+
]
572+
with pytest.raises(ClientError) as exc:
573+
ComplexKeyModel.batch_save(data)
574+
assert (
575+
exc.value.response["Error"]["Message"] == "Item size has exceeded the maximum allowed size"
576+
)

0 commit comments

Comments
 (0)