Skip to content

Commit f31ff7b

Browse files
authored
Merge pull request #77 from purkhusid/multi-table-transact
Add multi table transactions
2 parents b5bcf15 + 1fdc601 commit f31ff7b

File tree

4 files changed

+258
-154
lines changed

4 files changed

+258
-154
lines changed

README.md

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -253,13 +253,9 @@ table.Scan(startedBefore (DateTimeOffset.Now - TimeSpan.FromDays 1.))
253253

254254
(See [`Script.fsx`](src/FSharp.AWS.DynamoDB/Script.fsx) for example timings showing the relative efficiency.)
255255

256-
## `TransactWriteItems`
256+
## `Transaction`
257257

258-
Using [`TransactWriteItems`](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html)
259-
to compose multiple write operations into an aggregate request that will succeed or fail atomically is supported.
260-
See [overview article](https://www.alexdebrie.com/posts/dynamodb-transactions) by [@alexdebrie](https://github.com/alexdebrie)
261-
262-
NOTE: while the underlying API supports combining operations on multiple tables, the exposed API does not.
258+
`FSharp.AWS.DynamoDB` supports DynamoDB transactions via the `Transaction` class.
263259

264260
The supported individual operations are:
265261
- `Check`: `ConditionCheck` - potentially veto the batch if the ([precompiled](#Precomputing-DynamoDB-Expressions)) `condition` is not fulfilled by the item identified by `key`
@@ -271,21 +267,22 @@ The supported individual operations are:
271267
let compile = table.Template.PrecomputeConditionalExpr
272268
let doesntExistCondition = compile <@ fun t -> NOT_EXISTS t.Value @>
273269
let existsCondition = compile <@ fun t -> EXISTS t.Value @>
274-
275270
let key = TableKey.Combined(hashKey, rangeKey)
276-
let requests = [
277-
TransactWrite.Check (key, doesntExistCondition)
278-
TransactWrite.Put (item2, None)
279-
TransactWrite.Put (item3, Some existsCondition)
280-
TransactWrite.Delete (table.Template.ExtractKey item5, None) ]
281-
do! table.TransactWriteItems requests
271+
272+
let transaction = Transaction()
273+
274+
transaction.Check(table, key, doesntExistCondition)
275+
transaction.Put(table, item2, None)
276+
transaction.Put(table, item3, Some existsCondition)
277+
transaction.Delete (table ,table.Template.ExtractKey item5, None)
278+
do! transaction.TransactWriteItems()
282279
```
283280

284281
Failed preconditions (or `TransactWrite.Check`s) are signalled as per the underlying API: via a `TransactionCanceledException`.
285282
Use `TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed` to trap such conditions:
286283

287284
```fsharp
288-
try do! table.TransactWriteItems writes
285+
try do! transaction.TransactWriteItems()
289286
return Some result
290287
with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed -> return None
291288
```

src/FSharp.AWS.DynamoDB/TableContext.fs

Lines changed: 147 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -230,65 +230,6 @@ type private LimitType =
230230
static member AllOrCount(l: int option) = l |> Option.map Count |> Option.defaultValue All
231231
static member DefaultOrCount(l: int option) = l |> Option.map Count |> Option.defaultValue Default
232232

233-
/// <summary>Represents an individual request that can be included in the <c>TransactItems</c> of a <c>TransactWriteItems</c> call.</summary>
234-
[<RequireQualifiedAccess>]
235-
type TransactWrite<'TRecord> =
236-
/// Specify a Check to be run on a specified item.
237-
/// If the condition does not hold, the overall TransactWriteItems request will be Canceled.
238-
| Check of key: TableKey * condition: ConditionExpression<'TRecord>
239-
/// Specify a PutItem operation to be performed, inserting or replacing an item in the Table.
240-
/// If the (optional) precondition does not hold, the overall TransactWriteItems request will be Canceled.
241-
| Put of item: 'TRecord * precondition: ConditionExpression<'TRecord> option
242-
/// Specify an UpdateItem operation to be performed, applying an updater expression on the item identified by the specified `key`, if it exists.
243-
/// If the item exists and the (optional) precondition does not hold, the overall TransactWriteItems request will be Canceled.
244-
| Update of key: TableKey * precondition: ConditionExpression<'TRecord> option * updater: UpdateExpression<'TRecord>
245-
/// Specify a DeleteItem operation to be performed, removing the item identified by the specified `key` if it exists.
246-
/// If the item exists and the (optional) precondition does not hold, the overall TransactWriteItems request will be Canceled.
247-
| Delete of key: TableKey * precondition: ConditionExpression<'TRecord> option
248-
249-
/// Helpers for building a <c>TransactWriteItemsRequest</c> to supply to <c>TransactWriteItems</c>
250-
module TransactWriteItemsRequest =
251-
252-
let private toTransactWriteItem<'TRecord>
253-
tableName
254-
(template: RecordTemplate<'TRecord>)
255-
: TransactWrite<'TRecord> -> TransactWriteItem =
256-
function
257-
| TransactWrite.Check(key, cond) ->
258-
let req = ConditionCheck(TableName = tableName, Key = template.ToAttributeValues key)
259-
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
260-
req.ConditionExpression <- cond.Conditional.Write writer
261-
TransactWriteItem(ConditionCheck = req)
262-
| TransactWrite.Put(item, maybeCond) ->
263-
let req = Put(TableName = tableName, Item = template.ToAttributeValues item)
264-
maybeCond
265-
|> Option.iter (fun cond ->
266-
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
267-
req.ConditionExpression <- cond.Conditional.Write writer)
268-
TransactWriteItem(Put = req)
269-
| TransactWrite.Update(key, maybeCond, updater) ->
270-
let req = Update(TableName = tableName, Key = template.ToAttributeValues key)
271-
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
272-
req.UpdateExpression <- updater.UpdateOps.Write(writer)
273-
maybeCond |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer)
274-
TransactWriteItem(Update = req)
275-
| TransactWrite.Delete(key, maybeCond) ->
276-
let req = Delete(TableName = tableName, Key = template.ToAttributeValues key)
277-
maybeCond
278-
|> Option.iter (fun cond ->
279-
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
280-
req.ConditionExpression <- cond.Conditional.Write writer)
281-
TransactWriteItem(Delete = req)
282-
let internal toTransactItems<'TRecord> tableName template items =
283-
Seq.map (toTransactWriteItem<'TRecord> tableName template) items |> rlist
284-
285-
/// <summary>Exception filter to identify whether a <c>TransactWriteItems</c> call has failed due to
286-
/// one or more of the supplied <c>precondition</c> checks failing.</summary>
287-
let (|TransactionCanceledConditionalCheckFailed|_|): exn -> unit option =
288-
function
289-
| :? TransactionCanceledException as e when e.CancellationReasons.Exists(fun x -> x.Code = "ConditionalCheckFailed") -> Some()
290-
| _ -> None
291-
292233
/// Helpers for identifying Failed Precondition check outcomes emanating from <c>PutItem</c>, <c>UpdateItem</c> or <c>DeleteItem</c>
293234
module Precondition =
294235
/// <summary>Exception filter to identify whether an individual (non-transactional) <c>PutItem</c>, <c>UpdateItem</c> or <c>DeleteItem</c> call's <c>precondition</c> check failing.</summary>
@@ -297,6 +238,7 @@ module Precondition =
297238
| :? ConditionalCheckFailedException -> Some()
298239
| _ -> None
299240

241+
300242
/// DynamoDB client object for performing table operations in the context of given F# record representations
301243
[<Sealed; AutoSerializable(false)>]
302244
type TableContext<'TRecord>
@@ -549,7 +491,6 @@ type TableContext<'TRecord>
549491
/// Record-induced table template
550492
member _.Template = template
551493

552-
553494
/// <summary>
554495
/// Creates a DynamoDB client instance for given F# record and table name.<br/>
555496
/// For creating, provisioning or verification, see <c>VerifyOrCreateTableAsync</c> and <c>VerifyTableAsync</c>.
@@ -900,31 +841,6 @@ type TableContext<'TRecord>
900841
return unprocessed |> Array.map template.ExtractKey
901842
}
902843

903-
904-
/// <summary>
905-
/// Atomically applies a set of 1-100 write operations to the table.<br/>
906-
/// NOTE requests are charged at twice the normal rate in Write Capacity Units.
907-
/// See the DynamoDB <a href="https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html"><c>TransactWriteItems</c> API documentation</a> for full details of semantics and charges.<br/>
908-
/// </summary>
909-
/// <param name="items">Operations to be performed.<br/>
910-
/// Throws <c>ArgumentOutOfRangeException</c> if item count is not between 1 and 100 as required by underlying API.<br/>
911-
/// Use <c>TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed</c> to identify any Precondition Check failures.</param>
912-
/// <param name="clientRequestToken">The <c>ClientRequestToken</c> to supply as an idempotency key (10 minute window).</param>
913-
member _.TransactWriteItems(items: seq<TransactWrite<'TRecord>>, ?clientRequestToken) : Async<unit> = async {
914-
let reqs = TransactWriteItemsRequest.toTransactItems tableName template items
915-
if reqs.Count = 0 || reqs.Count > 100 then
916-
raise <| System.ArgumentOutOfRangeException(nameof items, "must be between 1 and 100 items.")
917-
let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = reqs)
918-
clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x)
919-
let! ct = Async.CancellationToken
920-
let! response = client.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect
921-
maybeReport
922-
|> Option.iter (fun r -> r TransactWriteItems (Seq.toList response.ConsumedCapacity) reqs.Count)
923-
if response.HttpStatusCode <> HttpStatusCode.OK then
924-
failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode
925-
}
926-
927-
928844
/// <summary>
929845
/// Asynchronously queries table with given condition expressions.
930846
/// </summary>
@@ -1402,6 +1318,141 @@ type TableContext<'TRecord>
14021318
else
14031319
t.VerifyTableAsync()
14041320

1321+
member t.Transaction() =
1322+
match metricsCollector with
1323+
| Some metricsCollector -> Transaction(metricsCollector = metricsCollector)
1324+
| None -> Transaction()
1325+
1326+
/// <summary>
1327+
/// Represents a transactional set of operations to be applied atomically to a arbitrary number of DynamoDB tables.
1328+
/// </summary>
1329+
/// <param name="metricsCollector">Function to receive request metrics.</param>
1330+
and Transaction(?metricsCollector: (RequestMetrics -> unit)) =
1331+
let transactionItems = ResizeArray<TransactWriteItem>()
1332+
let mutable (dynamoDbClient: IAmazonDynamoDB) = null
1333+
1334+
let setClient client =
1335+
if dynamoDbClient = null then
1336+
dynamoDbClient <- client
1337+
1338+
let reportMetrics collector (tableName: string) (operation: Operation) (consumedCapacity: ConsumedCapacity list) (itemCount: int) =
1339+
collector
1340+
{ TableName = tableName
1341+
Operation = operation
1342+
ConsumedCapacity = consumedCapacity
1343+
ItemCount = itemCount }
1344+
1345+
let returnConsumedCapacity, maybeReport =
1346+
match metricsCollector with
1347+
| Some sink -> ReturnConsumedCapacity.INDEXES, Some(reportMetrics sink)
1348+
| None -> ReturnConsumedCapacity.NONE, None
1349+
1350+
/// <summary>
1351+
/// Adds a Put operation to the transaction.
1352+
/// </summary>
1353+
/// <param name="tableContext">Table context to operate on.</param>
1354+
/// <param name="item">Item to be put.</param>
1355+
/// <param name="precondition">Optional precondition expression.</param>
1356+
member this.Put<'TRecord>
1357+
(
1358+
tableContext: TableContext<'TRecord>,
1359+
item: 'TRecord,
1360+
?precondition: ConditionExpression<'TRecord>
1361+
) : Transaction =
1362+
setClient tableContext.Client
1363+
let req = Put(TableName = tableContext.TableName, Item = tableContext.Template.ToAttributeValues item)
1364+
precondition
1365+
|> Option.iter (fun cond ->
1366+
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
1367+
req.ConditionExpression <- cond.Conditional.Write writer)
1368+
transactionItems.Add(TransactWriteItem(Put = req))
1369+
this
1370+
1371+
/// <summary>
1372+
/// Adds a ConditionCheck operation to the transaction.
1373+
/// </summary>
1374+
/// <param name="tableContext">Table context to operate on.</param>
1375+
/// <param name="key">Key of item to check.</param>
1376+
/// <param name="condition">Condition to check.</param>
1377+
member this.Check(tableContext: TableContext<'TRecord>, key: TableKey, condition: ConditionExpression<'TRecord>) : Transaction =
1378+
setClient tableContext.Client
1379+
1380+
let req = ConditionCheck(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
1381+
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
1382+
req.ConditionExpression <- condition.Conditional.Write writer
1383+
transactionItems.Add(TransactWriteItem(ConditionCheck = req))
1384+
this
1385+
1386+
/// <summary>
1387+
/// Adds an Update operation to the transaction.
1388+
/// </summary>
1389+
/// <param name="tableContext">Table context to operate on.</param>
1390+
/// <param name="key">Key of item to update.</param>
1391+
/// <param name="updater">Update expression.</param>
1392+
/// <param name="precondition">Optional precondition expression.</param>
1393+
member this.Update
1394+
(
1395+
tableContext: TableContext<'TRecord>,
1396+
key: TableKey,
1397+
updater: UpdateExpression<'TRecord>,
1398+
?precondition: ConditionExpression<'TRecord>
1399+
1400+
) : Transaction =
1401+
setClient tableContext.Client
1402+
1403+
let req = Update(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
1404+
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
1405+
req.UpdateExpression <- updater.UpdateOps.Write(writer)
1406+
precondition |> Option.iter (fun cond -> req.ConditionExpression <- cond.Conditional.Write writer)
1407+
transactionItems.Add(TransactWriteItem(Update = req))
1408+
this
1409+
1410+
/// <summary>
1411+
/// Adds a Delete operation to the transaction.
1412+
/// </summary>
1413+
/// <param name="tableContext">Table context to operate on.</param>
1414+
/// <param name="key">Key of item to delete.</param>
1415+
/// <param name="precondition">Optional precondition expression.</param>
1416+
member this.Delete
1417+
(
1418+
tableContext: TableContext<'TRecord>,
1419+
key: TableKey,
1420+
precondition: option<ConditionExpression<'TRecord>>
1421+
) : Transaction =
1422+
setClient tableContext.Client
1423+
1424+
let req = Delete(TableName = tableContext.TableName, Key = tableContext.Template.ToAttributeValues key)
1425+
precondition
1426+
|> Option.iter (fun cond ->
1427+
let writer = AttributeWriter(req.ExpressionAttributeNames, req.ExpressionAttributeValues)
1428+
req.ConditionExpression <- cond.Conditional.Write writer)
1429+
transactionItems.Add(TransactWriteItem(Delete = req))
1430+
this
1431+
1432+
/// <summary>
1433+
/// Atomically applies a set of 1-100 operations to the table.<br/>
1434+
/// NOTE requests are charged at twice the normal rate in Write Capacity Units.
1435+
/// See the DynamoDB <a href="https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_TransactWriteItems.html"><c>TransactWriteItems</c> API documentation</a> for full details of semantics and charges.<br/>
1436+
/// </summary>
1437+
/// <param name="clientRequestToken">The <c>ClientRequestToken</c> to supply as an idempotency key (10 minute window).</param>
1438+
member _.TransactWriteItems(?clientRequestToken) : Async<unit> = async {
1439+
if (Seq.length transactionItems) = 0 || (Seq.length transactionItems) > 100 then
1440+
raise
1441+
<| System.ArgumentOutOfRangeException(nameof transactionItems, "must be between 1 and 100 items.")
1442+
let req = TransactWriteItemsRequest(ReturnConsumedCapacity = returnConsumedCapacity, TransactItems = (ResizeArray transactionItems))
1443+
clientRequestToken |> Option.iter (fun x -> req.ClientRequestToken <- x)
1444+
let! ct = Async.CancellationToken
1445+
let! response = dynamoDbClient.TransactWriteItemsAsync(req, ct) |> Async.AwaitTaskCorrect
1446+
maybeReport
1447+
|> Option.iter (fun r ->
1448+
response.ConsumedCapacity
1449+
|> Seq.groupBy (fun x -> x.TableName)
1450+
|> Seq.iter (fun (tableName, consumedCapacity) ->
1451+
r tableName Operation.TransactWriteItems (Seq.toList consumedCapacity) (Seq.length transactionItems)))
1452+
if response.HttpStatusCode <> HttpStatusCode.OK then
1453+
failwithf "TransactWriteItems request returned error %O" response.HttpStatusCode
1454+
}
1455+
14051456
// Deprecated factory method, to be removed. Replaced with
14061457
// 1. TableContext<'T> ctor (synchronous)
14071458
// 2. VerifyOrCreateTableAsync OR VerifyTableAsync (explicitly async to signify that verification/creation is a costly and/or privileged operation)
@@ -1489,6 +1540,8 @@ type TableContext internal () =
14891540
)
14901541
|> Async.RunSynchronously
14911542

1543+
1544+
14921545
/// <summary>
14931546
/// Sync-over-Async helpers that can be opted-into when working in scripting scenarios.
14941547
/// For normal usage, the <c>Async</c> versions of any given API is recommended, in order to ensure one
@@ -2096,3 +2149,12 @@ module Scripting =
20962149
member t.UpdateProvisionedThroughput(provisionedThroughput: ProvisionedThroughput) : unit =
20972150
let spec = Throughput.Provisioned provisionedThroughput
20982151
t.UpdateTableIfRequiredAsync(spec) |> Async.Ignore |> Async.RunSynchronously
2152+
2153+
/// Helpers for working with <c>TransactWriteItemsRequest</c>
2154+
module TransactWriteItemsRequest =
2155+
/// <summary>Exception filter to identify whether a <c>TransactWriteItems</c> call has failed due to
2156+
/// one or more of the supplied <c>precondition</c> checks failing.</summary>
2157+
let (|TransactionCanceledConditionalCheckFailed|_|): exn -> unit option =
2158+
function
2159+
| :? TransactionCanceledException as e when e.CancellationReasons.Exists(fun x -> x.Code = "ConditionalCheckFailed") -> Some()
2160+
| _ -> None

tests/FSharp.AWS.DynamoDB.Tests/MetricsCollectorTests.fs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,10 @@ type Tests(fixture: TableFixture) =
106106
collector.Clear()
107107

108108
let item = mkItem (guid ()) (guid ()) 0
109-
let requests = [ TransactWrite.Put(item, Some(compile <@ fun t -> NOT_EXISTS t.RangeKey @>)) ]
110-
111-
do! sut.TransactWriteItems requests
109+
do!
110+
Transaction(collector.Collect)
111+
.Put(sut, item, compile <@ fun t -> NOT_EXISTS t.RangeKey @>)
112+
.TransactWriteItems()
112113

113114
test
114115
<@
@@ -130,13 +131,13 @@ type Tests(fixture: TableFixture) =
130131
let sut = rawTable.WithMetricsCollector(collector.Collect)
131132

132133
let item = mkItem (guid ()) (guid ()) 0
133-
134-
// The check will fail, which triggers a throw from the underlying AWS SDK; there's no way to extract the consumption info in that case
135-
let requests = [ TransactWrite.Put(item, Some(compile <@ fun t -> EXISTS t.RangeKey @>)) ]
136-
137134
let mutable failed = false
138135
try
139-
do! sut.TransactWriteItems requests
136+
do!
137+
// The check will fail, which triggers a throw from the underlying AWS SDK; there's no way to extract the consumption info in that case
138+
Transaction()
139+
.Put(sut, item, compile <@ fun t -> EXISTS t.RangeKey @>)
140+
.TransactWriteItems()
140141
with TransactWriteItemsRequest.TransactionCanceledConditionalCheckFailed ->
141142
failed <- true
142143
true =! failed

0 commit comments

Comments
 (0)