Skip to content

Commit bd9f9ea

Browse files
authored
Merge pull request #39 from ReactiveDB/feat/transaction
feat: add method: `transaction` for class `Database`
2 parents 856344c + 8faf162 commit bd9f9ea

File tree

6 files changed

+312
-48
lines changed

6 files changed

+312
-48
lines changed

example/rdb/defineSchema.ts

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { RDBType, SchemaDef, Association } from 'reactivedb'
1+
import { RDBType, SchemaDef, Relationship } from 'reactivedb'
22
import Database from './Database'
33

44
interface BasicSchema {
@@ -71,7 +71,7 @@ const basicSchema: SchemaDef<BasicSchema> = {
7171
type: RDBType.OBJECT
7272
},
7373
demo: {
74-
type: Association.oneToOne,
74+
type: Relationship.oneToOne,
7575
virtual: {
7676
name: 'Demo',
7777
where: demoTable => ({
@@ -93,7 +93,7 @@ const demoSchema: SchemaDef<DemoSchema> = {
9393
type: RDBType.LITERAL_ARRAY
9494
},
9595
basics: {
96-
type: Association.oneToMany,
96+
type: Relationship.oneToMany,
9797
virtual: {
9898
name: 'Basic',
9999
where: basicTable => ({
@@ -102,7 +102,7 @@ const demoSchema: SchemaDef<DemoSchema> = {
102102
}
103103
},
104104
other: {
105-
type: Association.oneToOne,
105+
type: Relationship.oneToOne,
106106
virtual: {
107107
name: 'Other',
108108
where: otherTable => ({
@@ -130,7 +130,7 @@ const otherSchema: SchemaDef<OtherSchema> = {
130130
type: RDBType.STRING
131131
},
132132
basic: {
133-
type: Association.oneToOne,
133+
type: Relationship.oneToOne,
134134
virtual: {
135135
name: 'Basic',
136136
where: basicTable => ({
@@ -156,13 +156,5 @@ Database.defineSchema('Basic', basicSchema)
156156
Database.defineSchema('Demo', demoSchema)
157157
Database.defineSchema('Other', otherSchema)
158158

159-
Database.defineHook('Demo', {
160-
destroy: (db, entity) => {
161-
const basicTable = db.getSchema().table('Demo')
162-
return db.delete()
163-
.from(basicTable)
164-
.where(basicTable['_id'].in(entity.basicIds))
165-
}
166-
})
167159

168160
Database.connect()

src/exception/database.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ export const InvalidType =
3030
return new ReactiveDBException(message + '.')
3131
}
3232

33+
export const UnexpectedTransactionUse =
34+
() => new ReactiveDBException('Please use Database#transaction to get a transaction scope first.')
35+
3336
export const PrimaryKeyNotProvided =
3437
() => new ReactiveDBException(`Primary key was not provided.`)
3538

src/interface/index.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Observable } from 'rxjs/Observable'
2+
import { PartialObserver } from 'rxjs/Observer'
23
import { RDBType, Relationship, LeafType, StatementType, JoinMode, DataStoreType } from './enum'
34

45
export type DeepPartial<T> = {
@@ -174,3 +175,16 @@ export type Predicate<T> = {
174175
}
175176

176177
export { StatementType, JoinMode, LeafType, Relationship, DataStoreType, RDBType }
178+
179+
export type TransactionDescriptor<T> = {
180+
[P in keyof T]: PropertyDescriptor
181+
}
182+
183+
export type TransactionHandler = {
184+
commit: () => Observable<ExecutorResult>
185+
abort: () => void
186+
}
187+
188+
export type Transaction<T> = [T, TransactionHandler]
189+
190+
export type TransactionEffects<T = any> = PartialObserver<T> | { next: (x: T) => void, error?: (e: any) => void, complete?: () => void }

src/storage/Database.ts

Lines changed: 130 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,13 @@ import { dispose, contextTableName, fieldIdentifier, hiddenColName } from './sym
1212
import { forEach, clone, contains, tryCatch, hasOwn, getType, assert, identity, warn } from '../utils'
1313
import { createPredicate, createPkClause, mergeTransactionResult, predicatableQuery, lfFactory } from './helper'
1414
import { Relationship, RDBType, DataStoreType, LeafType, StatementType, JoinMode } from '../interface/enum'
15-
import { Record, Field, JoinInfo, Query, Clause, Predicate } from '../interface'
1615
import { SchemaDef, ColumnDef, ParsedSchema, Association, ScopedHandler } from '../interface'
1716
import { ColumnLeaf, NavigatorLeaf, ExecutorResult, UpsertContext, SelectContext } from '../interface'
17+
import { Record, Field, JoinInfo, Query, Clause, Predicate, Transaction, TransactionDescriptor, TransactionEffects } from '../interface'
18+
19+
const transactionErrorHandler = {
20+
error: () => warn(`Execute failed, transaction is already marked for rollback.`)
21+
}
1822

1923
export class Database {
2024

@@ -24,7 +28,8 @@ export class Database {
2428
return tableNames.map((name) => db.getSchema().table(name))
2529
}
2630

27-
public database$: ConnectableObservable<lf.Database>
31+
public readonly database$: ConnectableObservable<lf.Database>
32+
public readonly inTransaction: boolean = false
2833

2934
private schemaDefs = new Map<string, SchemaDef<any>>()
3035
private schemas = new Map<string, ParsedSchema>()
@@ -149,8 +154,14 @@ export class Database {
149154

150155
const { contextIds, queries } = Mutation.aggregate(db, muts, [])
151156
contextIds.forEach(id => this.storedIds.add(id))
152-
return this.executor(db, queries)
153-
.do({ error: () => contextIds.forEach(id => this.storedIds.delete(id)) })
157+
const onError = { error: () => contextIds.forEach(id => this.storedIds.delete(id)) }
158+
159+
if (this.inTransaction) {
160+
this.attachTx(onError)
161+
return this.executor(db, queries)
162+
}
163+
164+
return this.executor(db, queries).do(onError)
154165
})
155166
}
156167

@@ -208,7 +219,7 @@ export class Database {
208219
}
209220

210221
delete<T>(tableName: string, clause: Predicate<T> = {}): Observable<ExecutorResult> {
211-
const [pk, err] = tryCatch<string>(this.findPrimaryKey)(tableName)
222+
const [ pk, err ] = tryCatch<string>(this.findPrimaryKey)(tableName)
212223
if (err) {
213224
return Observable.throw(err)
214225
}
@@ -225,13 +236,21 @@ export class Database {
225236
.concatMap((scopedIds) => {
226237
const query = predicatableQuery(db, table, provider.getPredicate(), StatementType.Delete)
227238

228-
scopedIds.forEach((entity: any) =>
239+
scopedIds.forEach((entity: object) =>
229240
this.storedIds.delete(fieldIdentifier(tableName, entity[pk!])))
241+
const onError = {
242+
error: () => {
243+
scopedIds.forEach((entity: object) =>
244+
this.storedIds.add(fieldIdentifier(tableName, entity[pk!])))
245+
}
246+
}
247+
248+
if (this.inTransaction) {
249+
this.attachTx(onError)
250+
return this.executor(db, [query])
251+
}
230252

231-
return this.executor(db, [query]).do({ error: () => {
232-
scopedIds.forEach((entity: any) =>
233-
this.storedIds.add(fieldIdentifier(tableName, entity[pk!])))
234-
}})
253+
return this.executor(db, [query]).do(onError)
235254
})
236255
})
237256
}
@@ -252,8 +271,14 @@ export class Database {
252271
const { contextIds, queries } = Mutation.aggregate(db, insert, update)
253272
if (queries.length > 0) {
254273
contextIds.forEach(id => this.storedIds.add(id))
255-
return this.executor(db, queries)
256-
.do({ error: () => contextIds.forEach(id => this.storedIds.delete(id)) })
274+
const onError = { error: () => contextIds.forEach(id => this.storedIds.delete(id)) }
275+
276+
if (this.inTransaction) {
277+
this.attachTx(onError)
278+
return this.executor(db, queries)
279+
}
280+
281+
return this.executor(db, queries).do(onError)
257282
} else {
258283
return Observable.of({ result: false, insert: 0, update: 0, delete: 0, select: 0 })
259284
}
@@ -282,19 +307,32 @@ export class Database {
282307
removedIds.push(fieldIdentifier(tableName, entity[schema!.pk]))
283308
})
284309

310+
const onError = {
311+
error: () => removedIds.forEach((id: string) => this.storedIds.add(id))
312+
}
313+
285314
if (disposeHandler) {
286315
const scope = this.createScopedHandler<T>(db, queries, removedIds)
287316
return disposeHandler(rootEntities, scope)
288317
.do(() => removedIds.forEach((id: string) => this.storedIds.delete(id)))
289-
.concatMap(() => this.executor(db, queries))
318+
.concatMap(() => {
319+
if (this.inTransaction) {
320+
this.attachTx(onError)
321+
return this.executor(db, queries)
322+
}
323+
return this.executor(db, queries).do(onError)
324+
})
290325
} else {
291326
removedIds.forEach((id: string) => this.storedIds.delete(id))
292-
return this.executor(db, queries)
327+
328+
if (this.inTransaction) {
329+
this.attachTx(onError)
330+
return this.executor(db, queries)
331+
}
332+
333+
return this.executor(db, queries).do(onError)
293334
}
294335
})
295-
.do({ error: () =>
296-
removedIds.forEach((id: string) => this.storedIds.add(id))
297-
})
298336
})
299337
}
300338

@@ -318,6 +356,81 @@ export class Database {
318356
})
319357
}
320358

359+
attachTx(_: TransactionEffects) {
360+
throw Exception.UnexpectedTransactionUse()
361+
}
362+
363+
executor(db: lf.Database, queries: lf.query.Builder[]) {
364+
const tx = db.createTransaction()
365+
366+
return Observable.fromPromise(tx.exec(queries))
367+
.do(transactionErrorHandler)
368+
.map((ret) => {
369+
return {
370+
result: true,
371+
...mergeTransactionResult(queries, ret)
372+
}
373+
})
374+
}
375+
376+
transaction(): Observable<Transaction<Database>> {
377+
type ProxyProperty = Pick<Database, 'attachTx' | 'executor' | 'inTransaction'>
378+
379+
return this.database$.map(db => {
380+
const tx = db.createTransaction()
381+
const transactionQueries: lf.query.Builder[] = []
382+
const effects: TransactionEffects[] = []
383+
384+
const transactionContext: TransactionDescriptor<ProxyProperty> = {
385+
attachTx: {
386+
get() {
387+
return (handler: TransactionEffects) => {
388+
effects.push(handler)
389+
}
390+
}
391+
},
392+
executor: {
393+
get() {
394+
return (_: lf.Database, queries: lf.query.Builder[]) => {
395+
transactionQueries.push(...queries)
396+
return Observable.of(null)
397+
}
398+
}
399+
},
400+
inTransaction: {
401+
get() {
402+
return true
403+
}
404+
}
405+
}
406+
407+
const customTx = {
408+
commit: () => {
409+
return effects.reduce((acc, curr) => {
410+
return acc.do(curr)
411+
}, Observable.from(tx.exec(transactionQueries)))
412+
.map((r) => {
413+
return {
414+
result: true,
415+
...mergeTransactionResult(transactionQueries, r)
416+
}
417+
})
418+
},
419+
abort: () => {
420+
effects.length = 0
421+
transactionQueries.length = 0
422+
}
423+
}
424+
425+
const ret: Transaction<Database> = [
426+
Object.create(this, transactionContext),
427+
customTx
428+
]
429+
430+
return ret
431+
})
432+
}
433+
321434
private buildTables() {
322435
this.schemaDefs.forEach((schemaDef, tableName) => {
323436
const tableBuilder = this.schemaBuilder!.createTable(tableName)
@@ -745,20 +858,4 @@ export class Database {
745858
}
746859
}
747860

748-
private executor(db: lf.Database, queries: lf.query.Builder[]) {
749-
const tx = db.createTransaction()
750-
const handler = {
751-
error: () => warn(`Execute failed, transaction is already marked for rollback.`)
752-
}
753-
754-
return Observable.fromPromise(tx.exec(queries))
755-
.do(handler)
756-
.map((ret) => {
757-
return {
758-
result: true,
759-
...mergeTransactionResult(queries, ret)
760-
}
761-
})
762-
}
763-
764861
}

test/e2e/database.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
import { DataStoreType } from '../../src/interface'
22
import { Database } from '../../src/storage/Database'
33

4-
export const database = new Database(DataStoreType.INDEXED_DB, true)
4+
export const database = new Database(DataStoreType.MEMORY, true)

0 commit comments

Comments
 (0)