Skip to content

Commit 0936be5

Browse files
author
Harinath
committed
Added batch_save method to backend.
1 parent 3d7695b commit 0936be5

File tree

7 files changed

+189
-8
lines changed

7 files changed

+189
-8
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ NOTE: Rule complexity is limited by the querying capabilities of the backend.
5151

5252
`save()` - store the Model instance to the backend
5353

54+
`batch_save()` - store the List of Model instance to the backend
55+
5456
### DynamoDB
5557

5658
`get(key: Union[Dict, Any])`

pydanticrud/backends/dynamodb.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from ..main import IterableResult
1313
from ..exceptions import DoesNotExist, ConditionCheckFailed
14+
from ..dynamo_type_serializer import DynamoTypeSerializer
1415

1516
log = logging.getLogger(__name__)
1617

@@ -86,7 +87,6 @@ def _to_epoch_float(dt):
8687
"array": json.dumps,
8788
}
8889

89-
9090
DESERIALIZE_MAP = {
9191
"number": float,
9292
"boolean": bool,
@@ -95,6 +95,10 @@ def _to_epoch_float(dt):
9595
}
9696

9797

98+
def chunk_list(lst, size):
99+
return [lst[i: i + size] for i in range(0, len(lst), size)]
100+
101+
98102
def index_definition(index_name, keys, gsi=False):
99103
schema = {
100104
"IndexName": index_name,
@@ -110,9 +114,10 @@ def index_definition(index_name, keys, gsi=False):
110114
return schema
111115

112116

113-
class DynamoSerializer:
117+
class DynamoSerializer(DynamoTypeSerializer):
114118
def __init__(self, schema):
115-
self.properties = schema["properties"]
119+
super().__init__()
120+
self.properties = schema.get("properties")
116121
self.definitions = schema.get("definitions")
117122

118123
def _get_type_possibilities(self, field_name) -> Set[tuple]:
@@ -409,3 +414,32 @@ def save(self, item, condition: Optional[Rule] = None) -> bool:
409414

410415
def delete(self, key):
411416
self.get_table().delete_item(Key=self._key_param_to_dict(key))
417+
418+
def batch_save(self, items: list) -> dict:
419+
"""
420+
This function is to write multiple records in to dynamodb and returns unprocessed records in dict
421+
if something gone wrong with the record.This will by default try 3 time if any unprocessed
422+
records exists in result. Currently, batch_write is not supporting ConditionExpression
423+
Refer docs:
424+
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_write_item.html
425+
"""
426+
# Prepare the batch write requests
427+
request_items = {self.table_name: []}
428+
429+
# chunk list for size limit of 25 items to write using this batch_write operation refer below.
430+
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/batch_write_item.html#:~:text=The%20BatchWriteItem%20operation,Data%20Types.
431+
for chunk in chunk_list(items, 20):
432+
serialized_items = [self.serializer.serialize_record(item.dict(by_alias=True)) for item in chunk]
433+
for serialized_item in serialized_items:
434+
request_items[self.table_name].append({"PutRequest":
435+
{"Item": serialized_item }#self.serializer.serialize_item(serialized_item)}
436+
})
437+
try:
438+
response = self.dynamodb.batch_write_item(RequestItems=request_items)
439+
except ClientError as e:
440+
raise e
441+
except (ValueError, TypeError, KeyError) as ex:
442+
raise ex
443+
unprocessed_items = response.get("UnprocessedItems", {})
444+
# Return any unprocessed items
445+
return unprocessed_items

pydanticrud/backends/sqlite.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,3 +191,6 @@ def save(self, item, condition: Optional[Rule] = None) -> bool:
191191

192192
def delete(self, item_key: str):
193193
self._conn.execute(f"DELETE FROM {self.table_name} WHERE {self.hash_key} = ?;", [item_key])
194+
195+
def batch_save(self, items: dict):
196+
pass
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import decimal
2+
from typing import Any, Mapping, Union
3+
4+
NoneType = type(None)
5+
6+
class DynamoTypeSerializer:
7+
def __init__(self):
8+
self._methods = {
9+
bool: serialize_bool,
10+
decimal.Decimal: serialize_number,
11+
dict: self.serialize_mapping,
12+
int: serialize_number,
13+
list: self.serialize_list,
14+
Mapping: self.serialize_mapping,
15+
NoneType: serialize_none,
16+
str: serialize_str,
17+
}
18+
19+
def serialize_item(self, item) -> dict:
20+
return {k: self.serialize(v) for k, v in item.items()}
21+
22+
def serialize_mapping(self, value: Mapping) -> dict:
23+
return {
24+
'M': {k: self.serialize(v) for k, v in value.items()}
25+
}
26+
27+
def serialize_list(self, value: Union[list, tuple]) -> dict:
28+
return {'L': [self.serialize(item) for item in value]}
29+
30+
def serialize(self, value: Any):
31+
value_type = type(value)
32+
try:
33+
route = self._methods[value_type]
34+
except KeyError:
35+
for type_route, route in self._methods.items():
36+
if issubclass(value_type, type_route):
37+
self._methods[value_type] = route
38+
return route(value)
39+
else:
40+
return route(value)
41+
raise TypeError(f'The value {value} is not Dynamodb serializable type.')
42+
43+
def serialize_bool(value: bool) -> dict:
44+
return {'BOOL': value}
45+
46+
def serialize_number(value: Union[int, float, decimal.Decimal]) -> dict:
47+
return {'N': str(value)}
48+
49+
def serialize_none(value: None) -> dict:
50+
return {'NULL': True}
51+
52+
def serialize_str(value: str) -> dict:
53+
return {'S': value}
54+

pydanticrud/main.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,7 @@ def save(self) -> bool:
7373
@classmethod
7474
def delete(cls, *args, **kwargs):
7575
cls.__backend__.delete(*args, **kwargs)
76+
77+
@classmethod
78+
def batch_save(cls, *args, **kwargs):
79+
return cls.__backend__.batch_save(*args, **kwargs)
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from decimal import Decimal
2+
3+
from pydanticrud.backends.dynamodb import DynamoSerializer
4+
5+
serializer = DynamoSerializer(schema={})
6+
def test_int():
7+
assert serializer.serialize(0) == {'N': '0'}
8+
assert serializer.serialize(34) == {'N': '34'}
9+
10+
11+
def test_decimal():
12+
assert serializer.serialize(Decimal('1.2345')) == {'N': '1.2345'}
13+
assert serializer.serialize(Decimal('0.00')) == {'N': '0.00'}
14+
15+
def test_str():
16+
assert serializer.serialize('') == {'S': ''}
17+
assert serializer.serialize('hello') == {'S': 'hello'}
18+
assert serializer.serialize('hello world') == {'S': 'hello world'}
19+
20+
def test_none():
21+
return serializer.serialize(None) == {'NULL': True}
22+
23+
def test_bools():
24+
assert serializer.serialize(True) == {'BOOL': True}
25+
assert serializer.serialize(False) == {'BOOL': False}
26+
27+
28+
def test_list():
29+
assert serializer.serialize([1, 'abc', None, None, True]) == {
30+
'L': [{'N': '1'}, {'S': 'abc'}, {'NULL': True}, {'NULL': True},
31+
{'BOOL': True}]
32+
}
33+
34+
def test_maps():
35+
assert serializer.serialize({
36+
'hello': 'world',
37+
'another': 123,
38+
'loop': {'Working': True}
39+
}) == {
40+
'M': {
41+
'hello': {'S': 'world'},
42+
'another': {'N': '123'},
43+
'loop': {'M': {'Working': {'BOOL': True}}}
44+
}
45+
}
46+
47+
def test_item():
48+
assert serializer.serialize_item({
49+
'Hello': 'World',
50+
'another': 123,
51+
'loop': {'Working': True}
52+
}) == {
53+
'Hello': {'S': 'World'},
54+
'another': {'N': '123'},
55+
'loop': {'M': {'Working': {'BOOL': True}}}
56+
}

tests/test_dynamodb.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import random
66

77
import docker
8+
from botocore.exceptions import ClientError
89
from pydantic import BaseModel as PydanticBaseModel, Field, root_validator, ValidationError
910
from pydanticrud import BaseModel, DynamoDbBackend, ConditionCheckFailed
1011
import pytest
@@ -66,6 +67,7 @@ class ComplexKeyModel(BaseModel):
6667
category_id: int
6768
notification_id: str
6869
thread_id: str
70+
body: str = 'some random string'
6971

7072
class Config:
7173
title = "ComplexModelTitle123"
@@ -396,7 +398,7 @@ def test_query_with_hash_key_complex(dynamo, complex_query_data):
396398

397399
@pytest.mark.parametrize('order', ('asc', 'desc'))
398400
def test_ordered_query_with_hash_key_complex(dynamo, complex_query_data, order):
399-
middle_record = complex_query_data[(len(complex_query_data)//2)]
401+
middle_record = complex_query_data[(len(complex_query_data) // 2)]
400402
res = ComplexKeyModel.query(
401403
Rule(f"account == '{middle_record['account']}' and sort_date_key >= '{middle_record['sort_date_key']}'"),
402404
order=order
@@ -414,8 +416,9 @@ def test_ordered_query_with_hash_key_complex(dynamo, complex_query_data, order):
414416
@pytest.mark.parametrize('order', ('asc', 'desc'))
415417
def test_pagination_query_with_hash_key_complex(dynamo, complex_query_data, order):
416418
page_size = 2
417-
middle_record = complex_query_data[(len(complex_query_data)//2)]
418-
query_rule = Rule(f"account == '{middle_record['account']}' and sort_date_key >= '{middle_record['sort_date_key']}'")
419+
middle_record = complex_query_data[(len(complex_query_data) // 2)]
420+
query_rule = Rule(
421+
f"account == '{middle_record['account']}' and sort_date_key >= '{middle_record['sort_date_key']}'")
419422
res = ComplexKeyModel.query(query_rule, order=order, limit=page_size)
420423
res_data = [(m.account, m.sort_date_key) for m in res]
421424
check_data = sorted([
@@ -432,13 +435,13 @@ def test_pagination_query_with_hash_key_complex(dynamo, complex_query_data, orde
432435
(m["account"], m["sort_date_key"])
433436
for m in complex_query_data
434437
if m["account"] == middle_record['account'] and m["sort_date_key"] >= middle_record['sort_date_key']
435-
], reverse=order == 'desc')[page_size:page_size*2]
438+
], reverse=order == 'desc')[page_size:page_size * 2]
436439
assert res_data == check_data
437440

438441

439442
def test_pagination_query_with_index_complex(dynamo, complex_query_data):
440443
page_size = 2
441-
middle_record = complex_query_data[(len(complex_query_data)//2)]
444+
middle_record = complex_query_data[(len(complex_query_data) // 2)]
442445
query_rule = Rule(f"account == '{middle_record['account']}' and category_id >= {middle_record['category_id']}")
443446
check_data = ComplexKeyModel.query(query_rule)
444447
res = ComplexKeyModel.query(query_rule, limit=page_size)
@@ -528,3 +531,28 @@ def test_alias_model_validator_ingest(dynamo):
528531
data.pop("typ")
529532
with pytest.raises(ValidationError):
530533
AliasKeyModel(**data)
534+
535+
536+
def test_batch_write(dynamo, complex_table):
537+
response = {"UnprocessedItems": {}}
538+
data = [
539+
ComplexKeyModel.parse_obj(complex_model_data_generator())
540+
for x in range(0, 10)
541+
]
542+
un_proc = ComplexKeyModel.batch_save(data)
543+
assert un_proc == response["UnprocessedItems"]
544+
res_get = ComplexKeyModel.get((data[0].account, data[0].sort_date_key))
545+
res_query = ComplexKeyModel.query(Rule(f"account == '{data[0].account}' and sort_date_key == '{data[0].sort_date_key}'"))
546+
assert res_get == data[0]
547+
assert res_query.count == 1
548+
assert res_query.records == [data[0]]
549+
550+
551+
def test_message_batch_write_client_exception(dynamo, complex_table):
552+
data = [
553+
ComplexKeyModel.parse_obj(complex_model_data_generator(body="some big string"*10000))
554+
for x in range(0, 2)
555+
]
556+
with pytest.raises(ClientError) as exc:
557+
ComplexKeyModel.batch_save(data)
558+
assert exc.value.response['Error']['Message'] == 'Item size has exceeded the maximum allowed size'

0 commit comments

Comments
 (0)