Skip to content

Commit 2016b4c

Browse files
authored
Merge pull request #17 from RSS-Engineering/batch_write_to_backend
Batch write to backend
2 parents 8058047 + 526885b commit 2016b4c

File tree

5 files changed

+69
-3
lines changed

5 files changed

+69
-3
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ NOTE: Rule complexity is limited by the querying capabilities of the backend.
5151

5252
`save()` - store the Model instance to the backend
5353

54+
`batch_save()` - store a list of Model instances to the backend
55+
5456
### DynamoDB
5557

5658
`get(key: Union[Dict, Any])`

pydanticrud/backends/dynamodb.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ def _to_epoch_float(dt):
8686
"array": json.dumps,
8787
}
8888

89-
9089
DESERIALIZE_MAP = {
9190
"number": float,
9291
"boolean": bool,
@@ -95,6 +94,11 @@ def _to_epoch_float(dt):
9594
}
9695

9796

97+
def chunk_list(lst, size):
    """Lazily yield consecutive slices of ``lst`` holding at most ``size`` elements.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``size``.
    ``lst`` must support ``len()`` and slicing; each yielded chunk has the same
    type a slice of ``lst`` has.
    """
    # Offsets are computed once, when iteration starts.
    offsets = range(0, len(lst), size)
    yield from (lst[start:start + size] for start in offsets)
100+
101+
98102
def index_definition(index_name, keys, gsi=False):
99103
schema = {
100104
"IndexName": index_name,
@@ -112,7 +116,7 @@ def index_definition(index_name, keys, gsi=False):
112116

113117
class DynamoSerializer:
114118
def __init__(self, schema):
115-
self.properties = schema["properties"]
119+
self.properties = schema.get("properties")
116120
self.definitions = schema.get("definitions")
117121

118122
def _get_type_possibilities(self, field_name) -> Set[tuple]:
@@ -409,3 +413,28 @@ def save(self, item, condition: Optional[Rule] = None) -> bool:
409413

410414
def delete(self, key):
    """Delete the item identified by ``key`` from the backing table.

    ``key`` is converted with ``_key_param_to_dict`` and passed as the ``Key``
    argument of the table's ``delete_item`` call.
    """
    table = self.get_table()
    key_dict = self._key_param_to_dict(key)
    table.delete_item(Key=key_dict)
416+
417+
def batch_save(self, items: list) -> dict:
    """
    Write multiple records to DynamoDB and return the requests DynamoDB did not process.

    ``batch_write_item`` accepts at most 25 put/delete requests per call, so ``items``
    is split into batches of 25 and every batch is sent as its own, freshly built
    request. ``batch_write_item`` does not support ``ConditionExpression``, so each
    record is written unconditionally.
    Refer docs:
    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_write_item.html

    :param items: model instances to store; each one is converted with
        ``dict(by_alias=True)`` and run through the backend serializer.
    :return: the ``UnprocessedItems`` reported for all batches, merged into one dict
        keyed by table name. Empty when every record was written or ``items`` is empty.
    :raises: whatever serialization or ``batch_write_item`` raises (for example a
        botocore ``ClientError``) propagates unchanged; batches sent before the
        failure are not rolled back.
    """
    # Size limit of a single batch_write operation, refer below.
    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_write_item.html#:~:text=The%20BatchWriteItem%20operation,Data%20Types.
    max_batch_size = 25

    unprocessed_items = {}
    for start in range(0, len(items), max_batch_size):
        batch = items[start:start + max_batch_size]
        # Build a new request list per batch so a request never carries items
        # from previous batches (which would exceed the limit and re-send them).
        put_requests = [
            {"PutRequest": {"Item": self.serializer.serialize_record(item.dict(by_alias=True))}}
            for item in batch
        ]
        response = self.dynamodb.batch_write_item(RequestItems={self.table_name: put_requests})
        # Collect leftovers from every batch, not just the last one.
        for table_name, requests in response.get("UnprocessedItems", {}).items():
            unprocessed_items.setdefault(table_name, []).extend(requests)
    return unprocessed_items

pydanticrud/backends/sqlite.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,3 +191,6 @@ def save(self, item, condition: Optional[Rule] = None) -> bool:
191191

192192
def delete(self, item_key: str):
    """Delete the row whose hash-key column equals ``item_key``."""
    # Table and column names cannot be bound as SQL parameters, so they are
    # interpolated; the key value itself is bound through the ``?`` placeholder.
    statement = f"DELETE FROM {self.table_name} WHERE {self.hash_key} = ?;"
    self._conn.execute(statement, [item_key])
194+
195+
def batch_save(self, items: list) -> dict:
    """Placeholder for batch writes, which this backend does not support yet.

    The signature mirrors the DynamoDB backend's ``batch_save`` (a list of model
    instances in, a dict out) so callers can treat backends uniformly.

    :param items: model instances that would be stored.
    :raises NotImplementedError: always.
    """
    raise NotImplementedError("This functionality is not yet implemented")

pydanticrud/main.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,7 @@ def save(self) -> bool:
7373
@classmethod
def delete(cls, *args, **kwargs):
    """Delete a record by delegating to the class's configured backend.

    All positional and keyword arguments are forwarded unchanged to the
    backend's ``delete``; nothing is returned.
    """
    backend = cls.__backend__
    backend.delete(*args, **kwargs)
76+
77+
@classmethod
def batch_save(cls, *args, **kwargs):
    """Store several model instances at once through the class's configured backend.

    All positional and keyword arguments are forwarded unchanged to the
    backend's ``batch_save`` and its return value is handed back as-is.
    """
    backend = cls.__backend__
    return backend.batch_save(*args, **kwargs)

tests/test_dynamodb.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import random
66

77
import docker
8+
from botocore.exceptions import ClientError
89
from pydantic import BaseModel as PydanticBaseModel, Field, root_validator, ValidationError
910
from pydanticrud import BaseModel, DynamoDbBackend, ConditionCheckFailed
1011
import pytest
@@ -66,6 +67,7 @@ class ComplexKeyModel(BaseModel):
6667
category_id: int
6768
notification_id: str
6869
thread_id: str
70+
body: str = "some random string"
6971

7072
class Config:
7173
title = "ComplexModelTitle123"
@@ -241,7 +243,7 @@ def complex_query_data(complex_table):
241243
accounts = [str(uuid4()) for i in range(4)]
242244

243245
data = [
244-
complex_model_data_generator(account=accounts[i % 4], **p)
246+
complex_model_data_generator(account=accounts[i % 4], body="some random string", **p)
245247
for i, p in enumerate(presets)
246248
]
247249
for datum in data:
@@ -528,3 +530,29 @@ def test_alias_model_validator_ingest(dynamo):
528530
data.pop("typ")
529531
with pytest.raises(ValidationError):
530532
AliasKeyModel(**data)
533+
534+
535+
def test_batch_write(dynamo, complex_table):
    """A batch of ten models is saved with nothing left unprocessed and can be read back."""
    data = [ComplexKeyModel.parse_obj(complex_model_data_generator()) for _ in range(10)]

    unprocessed = ComplexKeyModel.batch_save(data)
    assert unprocessed == {}

    # The first record must be retrievable both by key and by an equivalent query.
    fetched = ComplexKeyModel.get((data[0].account, data[0].sort_date_key))
    queried = ComplexKeyModel.query(
        Rule(f"account == '{data[0].account}' and sort_date_key == '{data[0].sort_date_key}'")
    )
    assert fetched == data[0]
    assert queried.count == 1
    assert queried.records == [data[0]]
547+
548+
549+
def test_message_batch_write_client_exception(dynamo, complex_table):
    """A batch with very large bodies surfaces the backend's ClientError and its message."""
    big_body = "some big string" * 10000
    data = [
        ComplexKeyModel.parse_obj(complex_model_data_generator(body=big_body))
        for _ in range(2)
    ]
    with pytest.raises(ClientError) as exc:
        ComplexKeyModel.batch_save(data)
    error_message = exc.value.response["Error"]["Message"]
    assert error_message == "Item size has exceeded the maximum allowed size"

0 commit comments

Comments
 (0)