@@ -4,7 +4,7 @@ import path from 'path';
4
4
import fs from 'fs' ;
5
5
import Logger , { LogLevel , StreamHandler } from '@matrixai/logger' ;
6
6
import { withF } from '@matrixai/resources' ;
7
- import { errors as locksErrors } from '@matrixai/async-locks' ;
7
+ import { Barrier , errors as locksErrors } from '@matrixai/async-locks' ;
8
8
import DB from '@/DB' ;
9
9
import DBTransaction from '@/DBTransaction' ;
10
10
import * as errors from '@/errors' ;
@@ -260,6 +260,8 @@ describe(DBTransaction.name, () => {
260
260
expect ( await db . get ( 'hello' ) ) . toBeUndefined ( ) ;
261
261
} ) ;
262
262
test ( 'getForUpdate addresses write-skew by promoting gets into same-value puts' , async ( ) => {
263
+ // Ensure deterministic concurrency
264
+ const barrier = await Barrier . createBarrier ( 2 ) ;
263
265
// Snapshot isolation allows write skew anomalies to occur
264
266
// A write skew means that 2 transactions concurrently read from overlapping keys
265
267
// then make disjoint updates to the keys, that breaks a consistency constraint on those keys
@@ -272,13 +274,15 @@ describe(DBTransaction.name, () => {
272
274
const t1 = withF ( [ db . transaction ( ) ] , async ( [ tran ] ) => {
273
275
let balance1 = parseInt ( ( await tran . getForUpdate ( 'balance1' ) ) ! ) ;
274
276
const balance2 = parseInt ( ( await tran . getForUpdate ( 'balance2' ) ) ! ) ;
277
+ await barrier . wait ( ) ;
275
278
balance1 -= 100 ;
276
279
expect ( balance1 + balance2 ) . toBeGreaterThanOrEqual ( 0 ) ;
277
280
await tran . put ( 'balance1' , balance1 . toString ( ) ) ;
278
281
} ) ;
279
282
const t2 = withF ( [ db . transaction ( ) ] , async ( [ tran ] ) => {
280
283
const balance1 = parseInt ( ( await tran . getForUpdate ( 'balance1' ) ) ! ) ;
281
284
let balance2 = parseInt ( ( await tran . getForUpdate ( 'balance2' ) ) ! ) ;
285
+ await barrier . wait ( ) ;
282
286
balance2 -= 100 ;
283
287
expect ( balance1 + balance2 ) . toBeGreaterThanOrEqual ( 0 ) ;
284
288
await tran . put ( 'balance2' , balance2 . toString ( ) ) ;
@@ -298,16 +302,20 @@ describe(DBTransaction.name, () => {
298
302
) . toBe ( true ) ;
299
303
} ) ;
300
304
test ( 'locking to prevent thrashing for racing counters' , async ( ) => {
305
+ // Ensure deterministic concurrency
306
+ const barrier = await Barrier . createBarrier ( 2 ) ;
301
307
await db . put ( 'counter' , '0' ) ;
302
308
let t1 = withF ( [ db . transaction ( ) ] , async ( [ tran ] ) => {
303
309
// Can also use `getForUpdate`, but a conflict exists even for `get`
304
310
let counter = parseInt ( ( await tran . get ( 'counter' ) ) ! ) ;
311
+ await barrier . wait ( ) ;
305
312
counter ++ ;
306
313
await tran . put ( 'counter' , counter . toString ( ) ) ;
307
314
} ) ;
308
315
let t2 = withF ( [ db . transaction ( ) ] , async ( [ tran ] ) => {
309
316
// Can also use `getForUpdate`, but a conflict exists even for `get`
310
317
let counter = parseInt ( ( await tran . get ( 'counter' ) ) ! ) ;
318
+ await barrier . wait ( ) ;
311
319
counter ++ ;
312
320
await tran . put ( 'counter' , counter . toString ( ) ) ;
313
321
} ) ;
0 commit comments