diff --git a/src/components/MysqlSafeTransaction.ts b/src/components/MysqlSafeTransaction.ts new file mode 100644 index 0000000..aeae061 --- /dev/null +++ b/src/components/MysqlSafeTransaction.ts @@ -0,0 +1,29 @@ +import { RedisCache } from "coa-redis" +import { MysqlBin } from "../libs/MysqlBin" +import { CoaMysql } from "../typings" + +export class MysqlSafeTransaction { + private readonly bin: MysqlBin + private readonly cache: RedisCache + + constructor(bin: MysqlBin, cache: RedisCache) { + this.bin = bin + this.cache = cache + } + + async safeTransaction(handler: (trx: CoaMysql.Transaction) => Promise): Promise { + let clearCacheNsps: any[] = [] + const result = await this.bin.io.transaction(async (trx: CoaMysql.Transaction) => { + trx.__isSafeTransaction = true + trx.clearCacheNsps = [] + + const result = await handler(trx) + + clearCacheNsps = trx.clearCacheNsps || [] + return result + }) + + if (clearCacheNsps.length > 0) { await this.cache.mDelete(clearCacheNsps) } + return result + } +} \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 4aeee94..351f65e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,8 @@ -export { CoaMysql } from './typings' +export { MysqlSafeTransaction } from './components/MysqlSafeTransaction' +export { MysqlStorage } from './components/MysqlStorage' +export { MysqlUuid } from './components/MysqlUuid' export { MysqlBin } from './libs/MysqlBin' -export { MysqlNative } from './services/MysqlNative' export { MysqlCache } from './services/MysqlCache' -export { MysqlUuid } from './components/MysqlUuid' -export { MysqlStorage } from './components/MysqlStorage' +export { MysqlNative } from './services/MysqlNative' +export { CoaMysql } from './typings' + diff --git a/src/libs/MysqlBin.ts b/src/libs/MysqlBin.ts index 21c8f35..141b79d 100644 --- a/src/libs/MysqlBin.ts +++ b/src/libs/MysqlBin.ts @@ -1,8 +1,7 @@ import { echo } from 'coa-echo' import { CoaError } from 'coa-error' -import { Knex } from './Knex' import { CoaMysql } from '../typings' - +import { Knex } from './Knex' export class MysqlBin { public io: Knex public config: CoaMysql.Config diff --git a/src/services/MysqlCache.ts b/src/services/MysqlCache.ts index 7ccd92c..cf1f7f3 100644 --- a/src/services/MysqlCache.ts +++ b/src/services/MysqlCache.ts @@ -5,7 +5,6 @@ import { secure } from 'coa-secure' import { MysqlBin } from '../libs/MysqlBin' import { CoaMysql } from '../typings' import { MysqlNative } from './MysqlNative' - export class MysqlCache extends MysqlNative { redisCache: RedisCache @@ -15,49 +14,49 @@ export class MysqlCache extends MysqlNative { } async insert(data: CoaMysql.SafePartial, trx?: CoaMysql.Transaction) { - const id = await super.insert(data, trx) - await this.deleteCache([id], [data]) + const id = await super.insert(data, trx); + await this.deleteCache([id], [data], trx) return id } async mInsert(dataList: Array>, trx?: CoaMysql.Transaction) { - const ids = await super.mInsert(dataList, trx) - await this.deleteCache(ids, dataList) + const ids = await super.mInsert(dataList, trx); + await this.deleteCache(ids, dataList, trx) return ids } async updateById(id: string, data: CoaMysql.SafePartial, trx?: CoaMysql.Transaction) { const dataList = await this.getCacheChangedDataList([id], data, trx) const result = await super.updateById(id, data, trx) - if (result) await this.deleteCache([id], dataList) + if (result) await this.deleteCache([id], dataList, trx) return result } async updateByIds(ids: string[], data: CoaMysql.SafePartial, trx?: CoaMysql.Transaction) { const dataList = await this.getCacheChangedDataList(ids, data, trx) - const result = await super.updateByIds(ids, data, trx) - if (result) await this.deleteCache(ids, dataList) + const result = await super.updateByIds(ids, data, trx); + if (result) await this.deleteCache(ids, dataList, trx) return result } async updateForQueryById(id: string, query: CoaMysql.Query, data: CoaMysql.SafePartial, trx?: CoaMysql.Transaction) { const dataList = await this.getCacheChangedDataList([id], data, trx) - const result = await super.updateForQueryById(id, query, data, trx) - if (result) await this.deleteCache([id], dataList) + const result = await super.updateForQueryById(id, query, data, trx); + if (result) await this.deleteCache([id], dataList, trx) return result } async upsertById(id: string, data: CoaMysql.SafePartial, trx?: CoaMysql.Transaction) { const dataList = await this.getCacheChangedDataList([id], data, trx) - const result = await super.upsertById(id, data, trx) - await this.deleteCache([id], dataList) + const result = await super.upsertById(id, data, trx); + await this.deleteCache([id], dataList, trx) return result } async deleteByIds(ids: string[], trx?: CoaMysql.Transaction) { const dataList = await this.getCacheChangedDataList(ids, undefined, trx) - const result = await super.deleteByIds(ids, trx) - if (result) await this.deleteCache(ids, dataList) + const result = await super.deleteByIds(ids, trx); + await this.deleteCache(ids, dataList, trx) return result } @@ -66,16 +65,16 @@ export class MysqlCache extends MysqlNative { } async getById(id: string, pick = this.columns, trx?: CoaMysql.Transaction, ms = this.ms, force = false) { - const result = await this.redisCache.warp(this.getCacheNsp('id'), id, async () => await super.getById(id, this.columns, trx), ms, force) + const result = trx?.__isSafeTransaction ? await super.getById(id, this.columns, trx) : await this.redisCache.warp(this.getCacheNsp('id'), id, async () => await super.getById(id, this.columns, trx), ms, force) return this.pickResult(result, pick) } async getIdBy(field: string, value: string | number, trx?: CoaMysql.Transaction) { - return await this.redisCache.warp(this.getCacheNsp('index', field), '' + value, async () => await super.getIdBy(field, value, trx)) + return trx?.__isSafeTransaction ? await super.getIdBy(field, value, trx) : await this.redisCache.warp(this.getCacheNsp('index', field), '' + value, async () => await super.getIdBy(field, value, trx)) } async mGetByIds(ids: string[], pick = this.pick, trx?: CoaMysql.Transaction, ms = this.ms, force = false) { - const result = await this.redisCache.mWarp(this.getCacheNsp('id'), ids, async ids => await super.mGetByIds(ids, this.columns, trx), ms, force) + const result = trx?.__isSafeTransaction ? await super.mGetByIds(ids, this.columns, trx) : await this.redisCache.mWarp(this.getCacheNsp('id'), ids, async ids => await super.mGetByIds(ids, this.columns, trx), ms, force) _.forEach(result, (v, k) => { result[k] = this.pickResult(v, pick) }) @@ -88,48 +87,46 @@ export class MysqlCache extends MysqlNative { } protected async findListCount(finger: Array>, query: CoaMysql.Query, trx?: CoaMysql.Transaction) { - const cacheNsp = this.getCacheNsp('data') const cacheId = 'list-count:' + secure.sha1($.sortQueryString(...finger)) - return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectListCount(query, trx)) + return trx?.__isSafeTransaction ? await super.selectListCount(query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectListCount(query, trx)) } protected async findIdList(finger: Array>, query: CoaMysql.Query, trx?: CoaMysql.Transaction) { - const cacheNsp = this.getCacheNsp('data') const cacheId = 'list:' + secure.sha1($.sortQueryString(...finger)) - return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdList(query, trx)) + return trx?.__isSafeTransaction ? await super.selectIdList(query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdList(query, trx)) } protected async findIdSortList(finger: Array>, pager: CoaMysql.Pager, query: CoaMysql.Query, trx?: CoaMysql.Transaction) { - const cacheNsp = this.getCacheNsp('data') const cacheId = `sort-list:${pager.rows}:${pager.last}:` + secure.sha1($.sortQueryString(...finger)) - return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdSortList(pager, query, trx)) + return trx?.__isSafeTransaction ? await super.selectIdSortList(pager, query, trx) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdSortList(pager, query, trx)) } protected async findIdViewList(finger: Array>, pager: CoaMysql.Pager, query: CoaMysql.Query, trx?: CoaMysql.Transaction) { - const cacheNsp = this.getCacheNsp('data') const cacheId = `view-list:${pager.rows}:${pager.page}:` + secure.sha1($.sortQueryString(...finger)) const count = await this.findListCount(finger, query, trx) - return await this.redisCache.warp(cacheNsp, cacheId, async () => await super.selectIdViewList(pager, query, trx, count)) + return trx?.__isSafeTransaction ? await super.selectIdViewList(pager, query, trx, count) : await this.redisCache.warp(this.getCacheNsp('data'), cacheId, async () => await super.selectIdViewList(pager, query, trx, count)) } protected async mGetCountBy(field: string, ids: string[], trx?: CoaMysql.Transaction) { - const cacheNsp = this.getCacheNsp('count', field) - return await this.redisCache.mWarp(cacheNsp, ids, async ids => { + const queryFunction = async () => { const rows = (await this.table(trx).select({ id: field }).count({ count: this.key }).whereIn(field, ids).groupBy(field)) as any[] const result: CoaMysql.Dic = {} _.forEach(rows, ({ id, count }) => (result[id] = count)) return result - }) + } + const result = trx?.__isSafeTransaction ? await queryFunction() : await this.redisCache.mWarp(this.getCacheNsp('count', field), ids, queryFunction) + return result } protected async getCountBy(field: string, value: string, query?: CoaMysql.Query, trx?: CoaMysql.Transaction) { - const cacheNsp = this.getCacheNsp('count', field) - return await this.redisCache.warp(cacheNsp, value, async () => { + const queryFunction = async () => { const qb = this.table(trx).count({ count: this.key }) query ? query(qb) : qb.where(field, value) const rows = await qb return (rows[0]?.count as number) || 0 - }) + } + const result = trx?.__isSafeTransaction ? await queryFunction() : await this.redisCache.warp(this.getCacheNsp('count', field), value, queryFunction) + return result } protected pickResult(data: T, pick: string[]) { @@ -155,10 +152,15 @@ export class MysqlCache extends MysqlNative { return resultList } - protected async deleteCache(ids: string[], dataList: Array>) { + async deleteCache(ids: string[], dataList: Array>, trx?: CoaMysql.Transaction) { const deleteIds = [] as CoaRedis.CacheDelete[] - deleteIds.push([this.getCacheNsp('id'), ids]) - deleteIds.push([this.getCacheNsp('data'), []]) + if (trx?.__isSafeTransaction) { + (trx as any)?.clearCacheNsps.push([this.getCacheNsp('id'), ids]); + (trx as any)?.clearCacheNsps.push([this.getCacheNsp('data'), []]) + } else { + deleteIds.push([this.getCacheNsp('id'), ids]) + deleteIds.push([this.getCacheNsp('data'), []]) + } _.forEach(this.caches, (items, name) => { // name可能为index,count,或自定义 items.forEach(item => { @@ -169,9 +171,11 @@ export class MysqlCache extends MysqlNative { data?.[key] && ids.push(data[key]) }) ids.push(...keys.slice(1)) - ids.length && deleteIds.push([this.getCacheNsp(name, key), ids]) + if (ids.length) { + (trx?.__isSafeTransaction) ? (trx as any)?.clearCacheNsps.push([this.getCacheNsp(name, key), ids]) : deleteIds.push([this.getCacheNsp(name, key), ids]) + } }) }) - await this.redisCache.mDelete(deleteIds) + if (!trx?.__isSafeTransaction) await this.redisCache.mDelete(deleteIds) } } diff --git a/src/test/demo.ts b/src/test/demo.ts index 3e1c899..010b7f1 100644 --- a/src/test/demo.ts +++ b/src/test/demo.ts @@ -144,4 +144,4 @@ await User.mGetByIds(['id001', 'id002'], ['name']) // 返回 {id001:{userId:'id0 await User.truncate() // 无返回值,主要不报错即成功截断整个表 // 自定义方法 -await User.customMethod() // 执行自定义方法 +await User.customMethod() // 执行自定义方法 \ No newline at end of file diff --git a/src/test/demoSafeTransaction.ts b/src/test/demoSafeTransaction.ts new file mode 100644 index 0000000..74aa9e1 --- /dev/null +++ b/src/test/demoSafeTransaction.ts @@ -0,0 +1,85 @@ +/* eslint-disable @typescript-eslint/no-redeclare */ +// @ts-nocheck +import { $, _ } from 'coa-helper' +import { RedisBin, RedisCache } from 'coa-redis' +import { MysqlBin, MysqlCache, MysqlSafeTransaction } from '..' +// MySQL配置 +const mysqlConfig = { + host: '127.0.0.1', + port: 3306, + user: 'root', + password: '', + charset: 'utf8mb4', + trace: true, + debug: false, + databases: { + main: { database: 'coa-mysql', ms: 7 * 24 * 3600 * 1000 }, + }, +} + +const redisConfig = { + host: '127.0.0.1', + port: 6379, + password: '', + db: 1, + prefix: 'coa-mysql', + trace: false, + +} +const redisBin = new RedisBin(redisConfig) +const redisCache = new RedisCache(redisBin) + +// 初始化Mysql基本连接,后续所有模型均依赖此实例 +const mysqlBin = new MysqlBin(mysqlConfig) +const safeTransaction = new MysqlSafeTransaction(mysqlBin, redisCache) + +const userScheme = { + userId: '' as string, + name: '' as string, + mobile: '' as string, + avatar: '' as string, + gender: 1 as number, + language: '' as string, + status: 1 as number, + created: 0 as number, + updated: 0 as number, +} + + +// 定义User类型(通过默认结构自动生成) +type UserScheme = typeof userScheme + + +// 通过基类初始化 +const User = new (class extends MysqlCache { + constructor() { + super( + { + name: 'User', // 表名,默认会转化为下划线(snackCase)形式,如 User->user UserPhoto->user_photo + title: '用户表', // 表的备注名称 + scheme: userScheme, // 表的默认结构 + pick: ['userId', 'name'], // 查询列表时显示的字段信息 + caches: { index: ['name'], count: ['userId', 'name'] }, + + }, + mysqlBin, + redisCache, + ) + } +})() + +const a = async () => { + await User.checkById('411******728253X') + await $.timeout(3000) + await safeTransaction.safeTransaction(async (trx: CoaMysql.Transaction) => { + await User.updateById('411******728253X', { name: 'mmm' }) + for (let index = 0; index < 10; index++) { + const userId = `${_.now()}-${index}-Y` + await User.insert({ userId, name: 'heyifan2' }, trx) + } + }) + await $.timeout(3000) + const b = await User.checkById('411******728253X') + console.log(b); +} +a() diff --git a/src/typings.ts b/src/typings.ts index 9408788..a91254d 100644 --- a/src/typings.ts +++ b/src/typings.ts @@ -7,7 +7,10 @@ export namespace CoaMysql { export type SafePartial = T extends {} ? Partial : any export type Query = (qb: Knex.QueryBuilder) => void export type QueryBuilder = Knex.QueryBuilder - export type Transaction = Knex.Transaction + export interface Transaction extends Knex.Transaction { + __isSafeTransaction?: boolean + clearCacheNsps?: any[] + } export interface Pager { rows: number last: number