Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/libs/MysqlBin.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { echo } from 'coa-echo'
import { CoaError } from 'coa-error'
import { Knex } from './Knex'
import { MysqlCache } from '../services/MysqlCache'
import { CoaMysql } from '../typings'

import { Knex } from './Knex'
export class MysqlBin {
public io: Knex
public config: CoaMysql.Config
Expand Down Expand Up @@ -30,4 +30,25 @@ export class MysqlBin {
this.config = config
this.io = io
}

async safeTransaction<T>(handler: (trx: CoaMysql.Transaction) => Promise<T>): Promise<T> {
let cacheTasks = [] as Array<{ model: MysqlCache<any>, ids: string[], dataList: any[] }>

const result = await this.io.transaction(async (trx: any) => {
// 收集事务更新缓存数据
trx.trxUpdateCacheTaskList = async (model: MysqlCache<any>, ids: string[], dataList: any[]) => {
cacheTasks.push({ model, ids, dataList })
}
return await handler(trx)
})
// 统一清理事务更新缓存
for (const task of cacheTasks) {
await task.model.deleteCache(task.ids, task.dataList)
}

// 初始数组 减少trx未及时销毁时的内存占用
cacheTasks = []

return result
}
}
63 changes: 30 additions & 33 deletions src/services/MysqlCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { secure } from 'coa-secure'
import { MysqlBin } from '../libs/MysqlBin'
import { CoaMysql } from '../typings'
import { MysqlNative } from './MysqlNative'

export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
redisCache: RedisCache

Expand All @@ -15,49 +14,49 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
}

async insert(data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
const id = await super.insert(data, trx)
await this.deleteCache([id], [data])
const id = await super.insert(data, trx);
(trx && (trx as any).trxUpdateCacheTaskList) ? await (trx as any).trxUpdateCacheTaskList(this, [id], [data]) : await this.deleteCache([id], [data])
return id
}

async mInsert(dataList: Array<CoaMysql.SafePartial<Scheme>>, trx?: CoaMysql.Transaction) {
const ids = await super.mInsert(dataList, trx)
await this.deleteCache(ids, dataList)
const ids = await super.mInsert(dataList, trx);
(trx && (trx as any).trxUpdateCacheTaskList) ? await (trx as any).trxUpdateCacheTaskList(this, ids, dataList) : await this.deleteCache(ids, dataList)
return ids
}

async updateById(id: string, data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
const dataList = await this.getCacheChangedDataList([id], data, trx)
const result = await super.updateById(id, data, trx)
if (result) await this.deleteCache([id], dataList)
const result = await super.updateById(id, data, trx);
if (result) { (trx && (trx as any).trxUpdateCacheTaskList) ? await (trx as any).trxUpdateCacheTaskList(this, [id], dataList) : await this.deleteCache([id], dataList) }
return result
}

async updateByIds(ids: string[], data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
const dataList = await this.getCacheChangedDataList(ids, data, trx)
const result = await super.updateByIds(ids, data, trx)
if (result) await this.deleteCache(ids, dataList)
const result = await super.updateByIds(ids, data, trx);
if (result) { (trx && (trx as any).trxUpdateCacheTaskList) ? await (trx as any).trxUpdateCacheTaskList(this, ids, dataList) : await this.deleteCache(ids, dataList) }
return result
}

async updateForQueryById(id: string, query: CoaMysql.Query, data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
const dataList = await this.getCacheChangedDataList([id], data, trx)
const result = await super.updateForQueryById(id, query, data, trx)
if (result) await this.deleteCache([id], dataList)
const result = await super.updateForQueryById(id, query, data, trx);
if (result) { (trx && (trx as any).trxUpdateCacheTaskList) ? await (trx as any).trxUpdateCacheTaskList(this, [id], dataList) : await this.deleteCache([id], dataList) }
return result
}

async upsertById(id: string, data: CoaMysql.SafePartial<Scheme>, trx?: CoaMysql.Transaction) {
const dataList = await this.getCacheChangedDataList([id], data, trx)
const result = await super.upsertById(id, data, trx)
await this.deleteCache([id], dataList)
const result = await super.upsertById(id, data, trx);
(trx && (trx as any).trxUpdateCacheTaskList) ? await (trx as any).trxUpdateCacheTaskList(this, [id], dataList) : await this.deleteCache([id], dataList)
return result
}

async deleteByIds(ids: string[], trx?: CoaMysql.Transaction) {
const dataList = await this.getCacheChangedDataList(ids, undefined, trx)
const result = await super.deleteByIds(ids, trx)
if (result) await this.deleteCache(ids, dataList)
const result = await super.deleteByIds(ids, trx);
if (result) { (trx && (trx as any).trxUpdateCacheTaskList) ? await (trx as any).trxUpdateCacheTaskList(this, ids, dataList) : await this.deleteCache(ids, dataList) }
return result
}

Expand All @@ -66,16 +65,16 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
}

async getById(id: string, pick = this.columns, trx?: CoaMysql.Transaction, ms = this.ms, force = false) {
const result = await this.redisCache.warp(this.getCacheNsp('id'), id, async () => await super.getById(id, this.columns, trx), ms, force)
const result = trx ? await super.getById(id, this.columns, trx) : await this.redisCache.warp(this.getCacheNsp('id'), id, async () => await super.getById(id, this.columns, trx), ms, force)
return this.pickResult(result, pick)
}

async getIdBy(field: string, value: string | number, trx?: CoaMysql.Transaction) {
return await this.redisCache.warp(this.getCacheNsp('index', field), '' + value, async () => await super.getIdBy(field, value, trx))
return trx ? await super.getIdBy(field, value, trx) : await this.redisCache.warp(this.getCacheNsp('index', field), '' + value, async () => await super.getIdBy(field, value, trx))
}

async mGetByIds(ids: string[], pick = this.pick, trx?: CoaMysql.Transaction, ms = this.ms, force = false) {
const result = await this.redisCache.mWarp(this.getCacheNsp('id'), ids, async ids => await super.mGetByIds(ids, this.columns, trx), ms, force)
const result = trx ? await super.mGetByIds(ids, this.columns, trx) : await this.redisCache.mWarp(this.getCacheNsp('id'), ids, async ids => await super.mGetByIds(ids, this.columns, trx), ms, force)
_.forEach(result, (v, k) => {
result[k] = this.pickResult(v, pick)
})
Expand All @@ -88,48 +87,46 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
}

protected async findListCount(finger: Array<CoaMysql.Dic<any>>, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('data')
const cacheId = 'list-count:' + secure.sha1($.sortQueryString(...finger))
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectListCount(query, trx))
return trx ? await super.selectListCount(query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectListCount(query, trx))
}

protected async findIdList(finger: Array<CoaMysql.Dic<any>>, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('data')
const cacheId = 'list:' + secure.sha1($.sortQueryString(...finger))
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdList(query, trx))
return trx ? await super.selectIdList(query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdList(query, trx))
}

protected async findIdSortList(finger: Array<CoaMysql.Dic<any>>, pager: CoaMysql.Pager, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('data')
const cacheId = `sort-list:${pager.rows}:${pager.last}:` + secure.sha1($.sortQueryString(...finger))
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdSortList(pager, query, trx))
return trx ? await super.selectIdSortList(pager, query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdSortList(pager, query, trx))
}

protected async findIdViewList(finger: Array<CoaMysql.Dic<any>>, pager: CoaMysql.Pager, query: CoaMysql.Query, trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('data')
const cacheId = `view-list:${pager.rows}:${pager.page}:` + secure.sha1($.sortQueryString(...finger))
const count = await this.findListCount(finger, query, trx)
return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdViewList(pager, query, trx, count))
return trx ? await super.selectIdViewList(pager, query, trx, count) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdViewList(pager, query, trx, count))
}

protected async mGetCountBy(field: string, ids: string[], trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('count', field)
return await this.redisCache.mWarp(cacheNsp, ids, async ids => {
const queryFunction = async () => {
const rows = (await this.table(trx).select({ id: field }).count({ count: this.key }).whereIn(field, ids).groupBy(field)) as any[]
const result: CoaMysql.Dic<number> = {}
_.forEach(rows, ({ id, count }) => (result[id] = count))
return result
})
}
const result = trx ? await queryFunction() : await this.redisCache.mWarp(this.getCacheNsp('count', field), ids, queryFunction)
return result
}

protected async getCountBy(field: string, value: string, query?: CoaMysql.Query, trx?: CoaMysql.Transaction) {
const cacheNsp = this.getCacheNsp('count', field)
return await this.redisCache.warp(cacheNsp, value, async () => {
const queryFunction = async () => {
const qb = this.table(trx).count({ count: this.key })
query ? query(qb) : qb.where(field, value)
const rows = await qb
return (rows[0]?.count as number) || 0
})
}
const result = trx ? await queryFunction() : await this.redisCache.warp(this.getCacheNsp('count', field), value, queryFunction)
return result
}

protected pickResult<T>(data: T, pick: string[]) {
Expand All @@ -155,7 +152,7 @@ export class MysqlCache<Scheme> extends MysqlNative<Scheme> {
return resultList
}

protected async deleteCache(ids: string[], dataList: Array<CoaMysql.SafePartial<Scheme>>) {
async deleteCache(ids: string[], dataList: Array<CoaMysql.SafePartial<Scheme>>) {
const deleteIds = [] as CoaRedis.CacheDelete[]
deleteIds.push([this.getCacheNsp('id'), ids])
deleteIds.push([this.getCacheNsp('data'), []])
Expand Down
Loading