Skip to content

Commit 5403ef5

Browse files
committed
save
1 parent 269ac18 commit 5403ef5

File tree

11 files changed

+678
-594
lines changed

11 files changed

+678
-594
lines changed

typeorm/typeorm-store/src/database.ts

Lines changed: 81 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import assert from 'assert'
44
import {DataSource, EntityManager} from 'typeorm'
55
import {ChangeWriter, rollbackBlock} from './utils/changeWriter'
66
import {DatabaseState, FinalTxInfo, HashAndHeight, HotTxInfo} from './interfaces'
7-
import {Store} from './store'
7+
import {CacheMode, FlushMode, ResetMode, Store} from './store'
8+
import {createLogger} from '@subsquid/logger'
9+
import {StateManager} from './utils/stateManager'
810
import {sortMetadatasInCommitOrder} from './utils/commitOrder'
911

1012

@@ -14,14 +16,21 @@ export type IsolationLevel = 'SERIALIZABLE' | 'READ COMMITTED' | 'REPEATABLE REA
1416
export interface TypeormDatabaseOptions {
1517
supportHotBlocks?: boolean
1618
isolationLevel?: IsolationLevel
19+
flushMode?: FlushMode
20+
resetMode?: ResetMode
21+
cacheMode?: CacheMode
1722
stateSchema?: string
1823
projectDir?: string
1924
}
2025

26+
const STATE_MANAGERS: WeakMap<DataSource, StateManager> = new WeakMap()
2127

2228
export class TypeormDatabase {
2329
private statusSchema: string
2430
private isolationLevel: IsolationLevel
31+
private flushMode: FlushMode
32+
private resetMode: ResetMode
33+
private cacheMode: CacheMode
2534
private con?: DataSource
2635
private projectDir: string
2736

@@ -30,6 +39,9 @@ export class TypeormDatabase {
3039
constructor(options?: TypeormDatabaseOptions) {
3140
this.statusSchema = options?.stateSchema || 'squid_processor'
3241
this.isolationLevel = options?.isolationLevel || 'SERIALIZABLE'
42+
this.resetMode = options?.resetMode || 'BATCH'
43+
this.flushMode = options?.flushMode || 'AUTO'
44+
this.cacheMode = options?.cacheMode || 'ALL'
3345
this.supportsHotBlocks = options?.supportHotBlocks !== false
3446
this.projectDir = options?.projectDir || process.cwd()
3547
}
@@ -43,48 +55,46 @@ export class TypeormDatabase {
4355
await this.con.initialize()
4456

4557
try {
46-
return await this.con.transaction('SERIALIZABLE', em => this.initTransaction(em))
47-
} catch(e: any) {
58+
return await this.con.transaction('SERIALIZABLE', (em) => this.initTransaction(em))
59+
} catch (e: any) {
4860
await this.con.destroy().catch(() => {}) // ignore error
4961
this.con = undefined
5062
throw e
5163
}
5264
}
5365

5466
async disconnect(): Promise<void> {
55-
await this.con?.destroy().finally(() => this.con = undefined)
67+
await this.con?.destroy().finally(() => (this.con = undefined))
5668
}
5769

5870
private async initTransaction(em: EntityManager): Promise<DatabaseState> {
5971
let schema = this.escapedSchema()
6072

61-
await em.query(
62-
`CREATE SCHEMA IF NOT EXISTS ${schema}`
63-
)
73+
await em.query(`CREATE SCHEMA IF NOT EXISTS ${schema}`)
6474
await em.query(
6575
`CREATE TABLE IF NOT EXISTS ${schema}.status (` +
66-
`id int4 primary key, ` +
67-
`height int4 not null, ` +
68-
`hash text DEFAULT '0x', ` +
69-
`nonce int4 DEFAULT 0`+
70-
`)`
76+
`id int4 primary key, ` +
77+
`height int4 not null, ` +
78+
`hash text DEFAULT '0x', ` +
79+
`nonce int4 DEFAULT 0` +
80+
`)`
7181
)
72-
await em.query( // for databases created by prev version of typeorm store
82+
await em.query(
83+
// for databases created by prev version of typeorm store
7384
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS hash text DEFAULT '0x'`
7485
)
75-
await em.query( // for databases created by prev version of typeorm store
76-
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS nonce int DEFAULT 0`
77-
)
7886
await em.query(
79-
`CREATE TABLE IF NOT EXISTS ${schema}.hot_block (height int4 primary key, hash text not null)`
87+
// for databases created by prev version of typeorm store
88+
`ALTER TABLE ${schema}.status ADD COLUMN IF NOT EXISTS nonce int DEFAULT 0`
8089
)
90+
await em.query(`CREATE TABLE IF NOT EXISTS ${schema}.hot_block (height int4 primary key, hash text not null)`)
8191
await em.query(
8292
`CREATE TABLE IF NOT EXISTS ${schema}.hot_change_log (` +
83-
`block_height int4 not null references ${schema}.hot_block on delete cascade, ` +
84-
`index int4 not null, ` +
85-
`change jsonb not null, ` +
86-
`PRIMARY KEY (block_height, index)` +
87-
`)`
93+
`block_height int4 not null references ${schema}.hot_block on delete cascade, ` +
94+
`index int4 not null, ` +
95+
`change jsonb not null, ` +
96+
`PRIMARY KEY (block_height, index)` +
97+
`)`
8898
)
8999

90100
let status: (HashAndHeight & {nonce: number})[] = await em.query(
@@ -95,9 +105,7 @@ export class TypeormDatabase {
95105
status.push({height: -1, hash: '0x', nonce: 0})
96106
}
97107

98-
let top: HashAndHeight[] = await em.query(
99-
`SELECT height, hash FROM ${schema}.hot_block ORDER BY height`
100-
)
108+
let top: HashAndHeight[] = await em.query(`SELECT height, hash FROM ${schema}.hot_block ORDER BY height`)
101109

102110
return assertStateInvariants({...status[0], top})
103111
}
@@ -111,15 +119,13 @@ export class TypeormDatabase {
111119

112120
assert(status.length == 1)
113121

114-
let top: HashAndHeight[] = await em.query(
115-
`SELECT hash, height FROM ${schema}.hot_block ORDER BY height`
116-
)
122+
let top: HashAndHeight[] = await em.query(`SELECT hash, height FROM ${schema}.hot_block ORDER BY height`)
117123

118124
return assertStateInvariants({...status[0], top})
119125
}
120126

121127
transact(info: FinalTxInfo, cb: (store: Store) => Promise<void>): Promise<void> {
122-
return this.submit(async em => {
128+
return this.submit(async (em) => {
123129
let state = await this.getState(em)
124130
let {prevHead: prev, nextHead: next} = info
125131

@@ -147,15 +153,21 @@ export class TypeormDatabase {
147153
})
148154
}
149155

150-
transactHot2(info: HotTxInfo, cb: (store: Store, sliceBeg: number, sliceEnd: number) => Promise<void>): Promise<void> {
151-
return this.submit(async em => {
156+
transactHot2(
157+
info: HotTxInfo,
158+
cb: (store: Store, sliceBeg: number, sliceEnd: number) => Promise<void>
159+
): Promise<void> {
160+
return this.submit(async (em) => {
152161
let state = await this.getState(em)
153162
let chain = [state, ...state.top]
154163

155164
assertChainContinuity(info.baseHead, info.newBlocks)
156165
assert(info.finalizedHead.height <= (maybeLast(info.newBlocks) ?? info.baseHead).height)
157166

158-
assert(chain.find(b => b.hash === info.baseHead.hash), RACE_MSG)
167+
assert(
168+
chain.find((b) => b.hash === info.baseHead.hash),
169+
RACE_MSG
170+
)
159171
if (info.newBlocks.length == 0) {
160172
assert(last(chain).hash === info.baseHead.hash, RACE_MSG)
161173
}
@@ -170,18 +182,14 @@ export class TypeormDatabase {
170182
if (info.newBlocks.length) {
171183
let finalizedEnd = info.finalizedHead.height - info.newBlocks[0].height + 1
172184
if (finalizedEnd > 0) {
173-
await this.performUpdates(store => cb(store, 0, finalizedEnd), em)
185+
await this.performUpdates((store) => cb(store, 0, finalizedEnd), em)
174186
} else {
175187
finalizedEnd = 0
176188
}
177189
for (let i = finalizedEnd; i < info.newBlocks.length; i++) {
178190
let b = info.newBlocks[i]
179191
await this.insertHotBlock(em, b)
180-
await this.performUpdates(
181-
store => cb(store, i, i + 1),
182-
em,
183-
new ChangeWriter(em, this.statusSchema, b.height)
184-
)
192+
await this.performUpdates((store) => cb(store, i, i + 1), em, new ChangeWriter(em, this.statusSchema, b.height))
185193
}
186194
}
187195

@@ -196,17 +204,14 @@ export class TypeormDatabase {
196204
}
197205

198206
private deleteHotBlocks(em: EntityManager, finalizedHeight: number): Promise<void> {
199-
return em.query(
200-
`DELETE FROM ${this.escapedSchema()}.hot_block WHERE height <= $1`,
201-
[finalizedHeight]
202-
)
207+
return em.query(`DELETE FROM ${this.escapedSchema()}.hot_block WHERE height <= $1`, [finalizedHeight])
203208
}
204209

205210
private insertHotBlock(em: EntityManager, block: HashAndHeight): Promise<void> {
206-
return em.query(
207-
`INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2)`,
208-
[block.height, block.hash]
209-
)
211+
return em.query(`INSERT INTO ${this.escapedSchema()}.hot_block (height, hash) VALUES ($1, $2)`, [
212+
block.height,
213+
block.hash,
214+
])
210215
}
211216

212217
private async updateStatus(em: EntityManager, nonce: number, next: HashAndHeight): Promise<void> {
@@ -221,36 +226,30 @@ export class TypeormDatabase {
221226

222227
// Will never happen if isolation level is SERIALIZABLE or REPEATABLE_READ,
223228
// but occasionally people use multiprocessor setups and READ_COMMITTED.
224-
assert.strictEqual(
225-
rowsChanged,
226-
1,
227-
RACE_MSG
228-
)
229+
assert.strictEqual(rowsChanged, 1, RACE_MSG)
229230
}
230231

231232
private async performUpdates(
232233
cb: (store: Store) => Promise<void>,
233234
em: EntityManager,
234-
changeTracker?: ChangeWriter
235+
changeWriter?: ChangeWriter
235236
): Promise<void> {
236-
let running = true
237-
238-
let store = new Store(
239-
() => {
240-
assert(running, `too late to perform db updates, make sure you haven't forgot to await on db query`)
241-
return em
242-
},
243-
{
244-
tracker: changeTracker,
245-
commitOrder: this.getCommitOrder()
246-
}
247-
)
237+
let store = new Store({
238+
em,
239+
state: this.getStateManager(),
240+
logger: this.getLogger(),
241+
changes: changeWriter,
242+
cacheMode: this.cacheMode,
243+
flushMode: this.flushMode,
244+
resetMode: this.resetMode,
245+
})
248246

249247
try {
250248
await cb(store)
251249
await store.flush()
250+
if (this.resetMode === 'BATCH') store.reset()
252251
} finally {
253-
running = false
252+
store._close()
254253
}
255254
}
256255

@@ -261,7 +260,7 @@ export class TypeormDatabase {
261260
let con = this.con
262261
assert(con != null, 'not connected')
263262
return await con.transaction(this.isolationLevel, tx)
264-
} catch(e: any) {
263+
} catch (e: any) {
265264
if (e.code == '40001' && retries) {
266265
retries -= 1
267266
} else {
@@ -277,10 +276,22 @@ export class TypeormDatabase {
277276
}
278277

279278
@def
280-
private getCommitOrder() {
281-
let con = this.con
282-
assert(con != null, 'not connected')
283-
return sortMetadatasInCommitOrder(con.entityMetadatas)
279+
private getLogger() {
280+
return createLogger('sqd:typeorm-db')
281+
}
282+
283+
private getStateManager() {
284+
let con = assertNotNull(this.con)
285+
let stateManager = STATE_MANAGERS.get(con)
286+
if (stateManager != null) return stateManager
287+
288+
stateManager = new StateManager({
289+
commitOrder: sortMetadatasInCommitOrder(con),
290+
logger: this.getLogger(),
291+
})
292+
STATE_MANAGERS.set(con, stateManager)
293+
294+
return stateManager
284295
}
285296
}
286297

typeorm/typeorm-store/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
export * from './database'
2-
export {FindManyOptions, FindOneOptions, Store, EntityLiteral} from './store'
2+
export {FindManyOptions, FindOneOptions, Store} from './store'
33
export * from './decorators'
44
export * from './utils/transformers'

0 commit comments

Comments
 (0)