Skip to content

Commit 68c1d30

Browse files
authored
Merge pull request #23 from coajs/fix-transaction-cache
feat: add support safe transaction
2 parents 40f4295 + 4b6043b commit 68c1d30

File tree

7 files changed

+166
-44
lines changed

7 files changed

+166
-44
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { RedisCache } from "coa-redis"
2+
import { MysqlBin } from "../libs/MysqlBin"
3+
import { CoaMysql } from "../typings"
4+
5+
export class MysqlSafeTransaction {
6+
private readonly bin: MysqlBin
7+
private readonly cache: RedisCache
8+
9+
constructor(bin: MysqlBin, cache: RedisCache) {
10+
this.bin = bin
11+
this.cache = cache
12+
}
13+
14+
async safeTransaction<T>(handler: (trx: CoaMysql.Transaction) => Promise<T>): Promise<T> {
15+
let clearCacheNsps: any[] = []
16+
const result = await this.bin.io.transaction(async (trx: CoaMysql.Transaction) => {
17+
trx.__isSafeTransaction = true
18+
trx.clearCacheNsps = []
19+
20+
const result = await handler(trx)
21+
22+
clearCacheNsps = trx.clearCacheNsps || []
23+
return result
24+
})
25+
26+
if (clearCacheNsps.length > 0) { await this.cache.mDelete(clearCacheNsps) }
27+
return result
28+
}
29+
}

src/index.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
export { CoaMysql } from './typings'
1+
export { MysqlSafeTransaction } from './components/MysqlSafeTransaction'
2+
export { MysqlStorage } from './components/MysqlStorage'
3+
export { MysqlUuid } from './components/MysqlUuid'
24
export { MysqlBin } from './libs/MysqlBin'
3-
export { MysqlNative } from './services/MysqlNative'
45
export { MysqlCache } from './services/MysqlCache'
5-
export { MysqlUuid } from './components/MysqlUuid'
6-
export { MysqlStorage } from './components/MysqlStorage'
6+
export { MysqlNative } from './services/MysqlNative'
7+
export { CoaMysql } from './typings'
8+

src/libs/MysqlBin.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import { echo } from 'coa-echo'
22
import { CoaError } from 'coa-error'
3-
import { Knex } from './Knex'
43
import { CoaMysql } from '../typings'
5-
4+
import { Knex } from './Knex'
65
export class MysqlBin {
76
public io: Knex
87
public config: CoaMysql.Config

src/services/MysqlCache.ts

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { secure } from 'coa-secure'
55
import { MysqlBin } from '../libs/MysqlBin'
66
import { CoaMysql } from '../typings'
77
import { MysqlNative } from './MysqlNative'
8-
98
export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
109
redisCache: RedisCache
1110

@@ -15,49 +14,49 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
1514
}
1615

1716
async insert(data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
18-
const id = await super.insert(data, trx)
19-
await this.deleteCache([id], [data])
17+
const id = await super.insert(data, trx);
18+
await this.deleteCache([id], [data], trx)
2019
return id
2120
}
2221

2322
async mInsert(dataList: Array<CoaMysql.SafePartial<Scheme>>, trx?: CoaMysql.Transaction) {
24-
const ids = await super.mInsert(dataList, trx)
25-
await this.deleteCache(ids, dataList)
23+
const ids = await super.mInsert(dataList, trx);
24+
await this.deleteCache(ids, dataList, trx)
2625
return ids
2726
}
2827

2928
async updateById(id: string, data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
3029
const dataList = await this.getCacheChangedDataList([id], data, trx)
3130
const result = await super.updateById(id, data, trx)
32-
if (result) await this.deleteCache([id], dataList)
31+
if (result) await this.deleteCache([id], dataList, trx)
3332
return result
3433
}
3534

3635
async updateByIds(ids: string[], data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
3736
const dataList = await this.getCacheChangedDataList(ids, data, trx)
38-
const result = await super.updateByIds(ids, data, trx)
39-
if (result) await this.deleteCache(ids, dataList)
37+
const result = await super.updateByIds(ids, data, trx);
38+
if (result) await this.deleteCache(ids, dataList, trx)
4039
return result
4140
}
4241

4342
async updateForQueryById(id: string, query: CoaMysql.Query, data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
4443
const dataList = await this.getCacheChangedDataList([id], data, trx)
45-
const result = await super.updateForQueryById(id, query, data, trx)
46-
if (result) await this.deleteCache([id], dataList)
44+
const result = await super.updateForQueryById(id, query, data, trx);
45+
if (result) await this.deleteCache([id], dataList, trx)
4746
return result
4847
}
4948

5049
async upsertById(id: string, data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
5150
const dataList = await this.getCacheChangedDataList([id], data, trx)
52-
const result = await super.upsertById(id, data, trx)
53-
await this.deleteCache([id], dataList)
51+
const result = await super.upsertById(id, data, trx);
52+
await this.deleteCache([id], dataList, trx)
5453
return result
5554
}
5655

5756
async deleteByIds(ids: string[], trx?: CoaMysql.Transaction) {
5857
const dataList = await this.getCacheChangedDataList(ids, undefined, trx)
59-
const result = await super.deleteByIds(ids, trx)
60-
if (result) await this.deleteCache(ids, dataList)
58+
const result = await super.deleteByIds(ids, trx);
59+
await this.deleteCache(ids, dataList, trx)
6160
return result
6261
}
6362

@@ -66,16 +65,16 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
6665
}
6766

6867
async getById(id: string, pick = this.columns, trx?: CoaMysql.Transaction, ms = this.ms, force = false) {
69-
const result = await this.redisCache.warp(this.getCacheNsp('id'), id, async () => await super.getById(id, this.columns, trx), ms, force)
68+
const result = trx?.__isSafeTransaction ? await super.getById(id, this.columns, trx) : await this.redisCache.warp(this.getCacheNsp('id'), id, async () => await super.getById(id, this.columns, trx), ms, force)
7069
return this.pickResult(result, pick)
7170
}
7271

7372
async getIdBy(field: string, value: string | number, trx?: CoaMysql.Transaction) {
74-
return await this.redisCache.warp(this.getCacheNsp('index', field), '' + value, async () => await super.getIdBy(field, value, trx))
73+
return trx?.__isSafeTransaction ? await super.getIdBy(field, value, trx) : await this.redisCache.warp(this.getCacheNsp('index', field), '' + value, async () => await super.getIdBy(field, value, trx))
7574
}
7675

7776
async mGetByIds(ids: string[], pick = this.pick, trx?: CoaMysql.Transaction, ms = this.ms, force = false) {
78-
const result = await this.redisCache.mWarp(this.getCacheNsp('id'), ids, async ids => await super.mGetByIds(ids, this.columns, trx), ms, force)
77+
const result = trx?.__isSafeTransaction ? await super.mGetByIds(ids, this.columns, trx) : await this.redisCache.mWarp(this.getCacheNsp('id'), ids, async ids => await super.mGetByIds(ids, this.columns, trx), ms, force)
7978
_.forEach(result, (v, k) => {
8079
result[k] = this.pickResult(v, pick)
8180
})
@@ -88,48 +87,46 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
8887
}
8988

9089
protected async findListCount(finger: Array<CoaMysql.Dic<any>>, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
91-
const cacheNsp = this.getCacheNsp('data')
9290
const cacheId = 'list-count:' + secure.sha1($.sortQueryString(...finger))
93-
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectListCount(query, trx))
91+
return trx?.__isSafeTransaction ? await super.selectListCount(query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectListCount(query, trx))
9492
}
9593

9694
protected async findIdList(finger: Array<CoaMysql.Dic<any>>, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
97-
const cacheNsp = this.getCacheNsp('data')
9895
const cacheId = 'list:' + secure.sha1($.sortQueryString(...finger))
99-
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdList(query, trx))
96+
return trx?.__isSafeTransaction ? await super.selectIdList(query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdList(query, trx))
10097
}
10198

10299
protected async findIdSortList(finger: Array<CoaMysql.Dic<any>>, pager: CoaMysql.Pager, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
103-
const cacheNsp = this.getCacheNsp('data')
104100
const cacheId = `sort-list:${pager.rows}:${pager.last}:` + secure.sha1($.sortQueryString(...finger))
105-
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdSortList(pager, query, trx))
101+
return trx?.__isSafeTransaction ? await super.selectIdSortList(pager, query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdSortList(pager, query, trx))
106102
}
107103

108104
protected async findIdViewList(finger: Array<CoaMysql.Dic<any>>, pager: CoaMysql.Pager, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
109-
const cacheNsp = this.getCacheNsp('data')
110105
const cacheId = `view-list:${pager.rows}:${pager.page}:` + secure.sha1($.sortQueryString(...finger))
111106
const count = await this.findListCount(finger, query, trx)
112-
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdViewList(pager, query, trx, count))
107+
return trx?.__isSafeTransaction ? await super.selectIdViewList(pager, query, trx, count) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdViewList(pager, query, trx, count))
113108
}
114109

115110
protected async mGetCountBy(field: string, ids: string[], trx?: CoaMysql.Transaction) {
116-
const cacheNsp = this.getCacheNsp('count', field)
117-
return await this.redisCache.mWarp(cacheNsp, ids, async ids => {
111+
const queryFunction = async () => {
118112
const rows = (await this.table(trx).select({ id: field }).count({ count: this.key }).whereIn(field, ids).groupBy(field)) as any[]
119113
const result: CoaMysql.Dic<number> = {}
120114
_.forEach(rows, ({ id, count }) => (result[id] = count))
121115
return result
122-
})
116+
}
117+
const result = trx?.__isSafeTransaction ? await queryFunction() : await this.redisCache.mWarp(this.getCacheNsp('count', field), ids, queryFunction)
118+
return result
123119
}
124120

125121
protected async getCountBy(field: string, value: string, query?: CoaMysql.Query, trx?: CoaMysql.Transaction) {
126-
const cacheNsp = this.getCacheNsp('count', field)
127-
return await this.redisCache.warp(cacheNsp, value, async () => {
122+
const queryFunction = async () => {
128123
const qb = this.table(trx).count({ count: this.key })
129124
query ? query(qb) : qb.where(field, value)
130125
const rows = await qb
131126
return (rows[0]?.count as number) || 0
132-
})
127+
}
128+
const result = trx?.__isSafeTransaction ? await queryFunction() : await this.redisCache.warp(this.getCacheNsp('count', field), value, queryFunction)
129+
return result
133130
}
134131

135132
protected pickResult<T>(data: T, pick: string[]) {
@@ -155,10 +152,15 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
155152
return resultList
156153
}
157154

158-
protected async deleteCache(ids: string[], dataList: Array<CoaMysql.SafePartial<Scheme>>) {
155+
async deleteCache(ids: string[], dataList: Array<CoaMysql.SafePartial<Scheme>>, trx?: CoaMysql.Transaction) {
159156
const deleteIds = [] as CoaRedis.CacheDelete[]
160-
deleteIds.push([this.getCacheNsp('id'), ids])
161-
deleteIds.push([this.getCacheNsp('data'), []])
157+
if (trx?.__isSafeTransaction) {
158+
(trx as any)?.clearCacheNsps.push([this.getCacheNsp('id'), ids]);
159+
(trx as any)?.clearCacheNsps.push([this.getCacheNsp('data'), []])
160+
} else {
161+
deleteIds.push([this.getCacheNsp('id'), ids])
162+
deleteIds.push([this.getCacheNsp('data'), []])
163+
}
162164
_.forEach(this.caches, (items, name) => {
163165
// name可能为index,count,或自定义
164166
items.forEach(item => {
@@ -169,9 +171,11 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
169171
data?.[key] && ids.push(data[key])
170172
})
171173
ids.push(...keys.slice(1))
172-
ids.length && deleteIds.push([this.getCacheNsp(name, key), ids])
174+
if (ids.length) {
175+
(trx?.__isSafeTransaction) ? (trx as any)?.clearCacheNsps.push([this.getCacheNsp(name, key), ids]) : deleteIds.push([this.getCacheNsp(name, key), ids])
176+
}
173177
})
174178
})
175-
await this.redisCache.mDelete(deleteIds)
179+
if (!trx?.__isSafeTransaction) await this.redisCache.mDelete(deleteIds)
176180
}
177181
}

src/test/demo.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,4 +144,4 @@ await User.mGetByIds(['id001', 'id002'], ['name']) // 返回 {id001:{userId:'id0
144144
await User.truncate() // 无返回值,主要不报错即成功截断整个表
145145

146146
// 自定义方法
147-
await User.customMethod() // 执行自定义方法
147+
await User.customMethod() // 执行自定义方法

src/test/demoSafeTransaction.ts

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/* eslint-disable @typescript-eslint/no-redeclare */
2+
// @ts-nocheck
3+
import { $, _ } from 'coa-helper'
4+
import { RedisBin, RedisCache } from 'coa-redis'
5+
import { MysqlBin, MysqlCache, MysqlSafeTransaction } from '..'
6+
// MySQL配置
7+
const mysqlConfig = {
8+
host: '127.0.0.1',
9+
port: 3306,
10+
user: 'root',
11+
password: '',
12+
charset: 'utf8mb4',
13+
trace: true,
14+
debug: false,
15+
databases: {
16+
main: { database: 'coa-mysql', ms: 7 * 24 * 3600 * 1000 },
17+
},
18+
}
19+
20+
const redisConfig = {
21+
host: '127.0.0.1',
22+
port: 6379,
23+
password: '',
24+
db: 1,
25+
prefix: 'coa-mysql',
26+
trace: false,
27+
28+
}
29+
const redisBin = new RedisBin(redisConfig)
30+
const redisCache = new RedisCache(redisBin)
31+
32+
// 初始化Mysql基本连接,后续所有模型均依赖此实例
33+
const mysqlBin = new MysqlBin(mysqlConfig)
34+
const safeTransaction = new MysqlSafeTransaction(mysqlBin, redisCache)
35+
36+
const userScheme = {
37+
userId: '' as string,
38+
name: '' as string,
39+
mobile: '' as string,
40+
avatar: '' as string,
41+
gender: 1 as number,
42+
language: '' as string,
43+
status: 1 as number,
44+
created: 0 as number,
45+
updated: 0 as number,
46+
}
47+
48+
49+
// 定义User类型(通过默认结构自动生成)
50+
type UserScheme = typeof userScheme
51+
52+
53+
// 通过基类初始化
54+
const User = new (class extends MysqlCache<UserScheme> {
55+
constructor() {
56+
super(
57+
{
58+
name: 'User', // 表名,默认会转化为下划线(snackCase)形式,如 User->user UserPhoto->user_photo
59+
title: '用户表', // 表的备注名称
60+
scheme: userScheme, // 表的默认结构
61+
pick: ['userId', 'name'], // 查询列表时显示的字段信息
62+
caches: { index: ['name'], count: ['userId', 'name'] },
63+
64+
},
65+
mysqlBin,
66+
redisCache,
67+
)
68+
}
69+
})()
70+
71+
const a = async () => {
72+
await User.checkById('411******728253X')
73+
await $.timeout(3000)
74+
await safeTransaction.safeTransaction(async (trx: CoaMysql.Transaction) => {
75+
await User.updateById('411******728253X', { name: 'mmm' })
76+
for (let index = 0; index < 10; index++) {
77+
const userId = `${_.now()}-${index}-Y`
78+
await User.insert({ userId, name: 'heyifan2' }, trx)
79+
}
80+
})
81+
await $.timeout(3000)
82+
const b = await User.checkById('411******728253X')
83+
console.log(b);
84+
}
85+
a()

src/typings.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ export namespace CoaMysql {
77
export type SafePartial<T> = T extends {} ? Partial<T> : any
88
export type Query = (qb: Knex.QueryBuilder) => void
99
export type QueryBuilder = Knex.QueryBuilder
10-
export type Transaction = Knex.Transaction
10+
export interface Transaction extends Knex.Transaction {
11+
__isSafeTransaction?: boolean
12+
clearCacheNsps?: any[]
13+
}
1114
export interface Pager {
1215
rows: number
1316
last: number

0 commit comments

Comments
 (0)