Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions batch_write_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,48 @@ package dynago

import (
"context"
"fmt"

"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

const ChunkSize = 25

type BatchPutItemsInput struct {
PartitionKeyValue Attribute
SortKeyValue Attribute
Item any
}

/**
* Used to update records to dynamodb
* @param input slice of record want to put to DB
* @return error
* BatchPutItems writes multiple items to DynamoDB in batches.
* Items are automatically chunked into groups of 25 (DynamoDB's batch limit).
* Each item is marshaled and partition/sort keys are added before writing.
* @param ctx context for the operation
* @param inputs slice of items to put into DynamoDB
* @return error if operation fails
*/

func (t *Client) BatchWriteItems(ctx context.Context, input []map[string]types.AttributeValue) error {
items := make([]types.WriteRequest, 0, len(input))
func (t *Client) BatchPutItems(ctx context.Context, inputs []BatchPutItemsInput) error {
items := make([]types.WriteRequest, 0, len(inputs))
table := t.TableName
for _, model := range input {
items = append(items,
types.WriteRequest{
PutRequest: &types.PutRequest{
Item: model,
},

for _, input := range inputs {
item, err := attributevalue.MarshalMap(input.Item)
if err != nil {
return fmt.Errorf("failed to marshall item; %s", err)
}

for k, v := range t.NewKeys(input.PartitionKeyValue, input.SortKeyValue) {
item[k] = v
}

items = append(items, types.WriteRequest{
PutRequest: &types.PutRequest{
Item: item,
},
)
})
}
chunkedItems := chunkBy(items, ChunkSize)
for _, chunkedBatch := range chunkedItems {
Expand All @@ -40,5 +58,4 @@ func (t *Client) BatchWriteItems(ctx context.Context, input []map[string]types.A
}

return nil

}
1 change: 1 addition & 0 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type WriteAPI interface {
PutItem(ctx context.Context, pk, sk Attribute, item interface{}, opt ...PutOption) error
DeleteItem(ctx context.Context, pk, sk string) error
BatchDeleteItems(ctx context.Context, input []AttributeRecord) []AttributeRecord
BatchPutItems(ctx context.Context, items []BatchPutItemsInput) error
}

type TransactionAPI interface {
Expand Down
208 changes: 208 additions & 0 deletions tests/batch_put_items_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package tests

import (
"context"
"reflect"
"testing"

"github.com/oolio-group/dynago"
)

type BatchTestRecord struct {
ID string
Name string
Age int
}

func TestBatchPutItems(t *testing.T) {
table := prepareTable(t)
ctx := context.TODO()

testCases := []struct {
title string
inputs []dynago.BatchPutItemsInput
expected []BatchTestRecord
}{
{
title: "single item batch",
inputs: []dynago.BatchPutItemsInput{
{
PartitionKeyValue: dynago.StringValue("user_1"),
SortKeyValue: dynago.StringValue("profile_1"),
Item: BatchTestRecord{
ID: "1",
Name: "John Doe",
Age: 30,
},
},
},
expected: []BatchTestRecord{
{
ID: "1",
Name: "John Doe",
Age: 30,
},
},
},
{
title: "multiple items batch",
inputs: []dynago.BatchPutItemsInput{
{
PartitionKeyValue: dynago.StringValue("user_1"),
SortKeyValue: dynago.StringValue("profile_1"),
Item: BatchTestRecord{
ID: "1",
Name: "John Doe",
Age: 30,
},
},
{
PartitionKeyValue: dynago.StringValue("user_2"),
SortKeyValue: dynago.StringValue("profile_2"),
Item: BatchTestRecord{
ID: "2",
Name: "Jane Smith",
Age: 25,
},
},
{
PartitionKeyValue: dynago.StringValue("user_3"),
SortKeyValue: dynago.StringValue("profile_3"),
Item: BatchTestRecord{
ID: "3",
Name: "Bob Johnson",
Age: 35,
},
},
},
expected: []BatchTestRecord{
{
ID: "1",
Name: "John Doe",
Age: 30,
},
{
ID: "2",
Name: "Jane Smith",
Age: 25,
},
{
ID: "3",
Name: "Bob Johnson",
Age: 35,
},
},
},
{
title: "batch with more than 25 items (chunking test)",
inputs: func() []dynago.BatchPutItemsInput {
var inputs []dynago.BatchPutItemsInput
for i := 1; i <= 30; i++ {
inputs = append(inputs, dynago.BatchPutItemsInput{
PartitionKeyValue: dynago.StringValue("batch_test"),
SortKeyValue: dynago.StringValue("item_" + string(rune(i+'0'))),
Item: BatchTestRecord{
ID: string(rune(i + '0')),
Name: "User " + string(rune(i+'0')),
Age: 20 + i,
},
})
}
return inputs
}(),
expected: func() []BatchTestRecord {
var expected []BatchTestRecord
for i := 1; i <= 30; i++ {
expected = append(expected, BatchTestRecord{
ID: string(rune(i + '0')),
Name: "User " + string(rune(i+'0')),
Age: 20 + i,
})
}
return expected
}(),
},
}

for _, tc := range testCases {
t.Run(tc.title, func(t *testing.T) {
t.Parallel()

err := table.BatchPutItems(ctx, tc.inputs)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

// Verify all items were written correctly
for i, input := range tc.inputs {
var result BatchTestRecord
err, found := table.GetItem(ctx, input.PartitionKeyValue, input.SortKeyValue, &result)
if err != nil {
t.Fatalf("unexpected error retrieving item %d: %s", i, err)
}
if !found {
t.Fatalf("item %d not found", i)
}
if !reflect.DeepEqual(tc.expected[i], result) {
t.Errorf("item %d: expected %+v, got %+v", i, tc.expected[i], result)
}
}
})
}
}

func TestBatchPutItemsError(t *testing.T) {
table := prepareTable(t)
ctx := context.TODO()

testCases := []struct {
title string
inputs []dynago.BatchPutItemsInput
expectError bool
}{
{
title: "empty partition key should error",
inputs: []dynago.BatchPutItemsInput{
{
PartitionKeyValue: dynago.StringValue(""),
SortKeyValue: dynago.StringValue("profile_1"),
Item: BatchTestRecord{
ID: "1",
Name: "John Doe",
Age: 30,
},
},
},
expectError: true,
},
{
title: "empty sort key should error",
inputs: []dynago.BatchPutItemsInput{
{
PartitionKeyValue: dynago.StringValue("user_1"),
SortKeyValue: dynago.StringValue(""),
Item: BatchTestRecord{
ID: "1",
Name: "John Doe",
Age: 30,
},
},
},
expectError: true,
},
}

for _, tc := range testCases {
t.Run(tc.title, func(t *testing.T) {
t.Parallel()

err := table.BatchPutItems(ctx, tc.inputs)
if tc.expectError && err == nil {
t.Fatalf("expected error but got none")
}
if !tc.expectError && err != nil {
t.Fatalf("unexpected error: %s", err)
}
})
}
}