Skip to content

Outbox prisma adapter #231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0353847
AP-5046 WIP prisma adapter.
kamilwylegala Sep 9, 2024
14fbfba
WIP prisma adapter.
kamilwylegala Sep 9, 2024
d55405e
Working test.
kamilwylegala Sep 11, 2024
e93bdad
WIP
kamilwylegala Sep 11, 2024
963014a
Working test for saving outbox entries.
kamilwylegala Nov 26, 2024
7444fab
failing test for updating.
kamilwylegala Nov 26, 2024
9c971fc
Bulk update + insert.
kamilwylegala Nov 27, 2024
5ad155f
Failed entries handling.
kamilwylegala Nov 27, 2024
d9e3d41
fetching entries up to the retry count limit.
kamilwylegala Nov 27, 2024
c61533d
Narrowed down types.
kamilwylegala Nov 27, 2024
552e0b6
lint fix
kamilwylegala Nov 27, 2024
f143384
Use generated db client from test dir.
kamilwylegala Nov 27, 2024
432875b
Build includes building test prisma client.
kamilwylegala Nov 27, 2024
ccf29a6
Fixed import.
kamilwylegala Nov 27, 2024
3f39f92
prisma main dependency.
kamilwylegala Nov 27, 2024
0b7d718
prisma client dev dependency.
kamilwylegala Nov 27, 2024
c2108e3
Build before lint.
kamilwylegala Nov 28, 2024
a7a9cb0
Ignore db client in biome.
kamilwylegala Nov 28, 2024
1b3c72c
Peer prisma.
kamilwylegala Nov 28, 2024
77b1d26
inferred type
kamilwylegala Nov 28, 2024
856ee17
debugging ci
kamilwylegala Nov 28, 2024
45d7310
keep prisma outside test folder in root
kamilwylegala Nov 28, 2024
00ca6d0
Fixed import in spec.
kamilwylegala Nov 28, 2024
33b954b
temp ts ignore.
kamilwylegala Nov 28, 2024
829c9ac
wait for db.
kamilwylegala Nov 28, 2024
1f39dbf
wait for db.
kamilwylegala Nov 28, 2024
8b818c1
wait for db.
kamilwylegala Nov 28, 2024
ab58167
Redundant docker start.
kamilwylegala Nov 28, 2024
ed4ce02
Simplified outbox entry.
kamilwylegala Jan 23, 2025
bc80d7d
Fixed tests to new version of outbox entry.
kamilwylegala Jan 23, 2025
55bdd41
Working on ModelDelegate
kamilwylegala Jan 24, 2025
01975c0
Fixed model delegate.
kamilwylegala Jan 24, 2025
0fc3e7f
Type fix.
kamilwylegala Jan 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/linting.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,9 @@ jobs:
run: |
npm install --ignore-scripts

- name: Build
run: |
npm run build

- name: Run lint
run: npm run lint
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,6 @@ dist
.pnp.*
/.idea
/package-lock.json

# prisma
db-client
23 changes: 13 additions & 10 deletions biome.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
{
"$schema": "./node_modules/@biomejs/biome/configuration_schema.json",
"extends": ["./node_modules/@kibertoad/biome-config/configs/biome-package.json"],
"linter": {
"rules": {
"performance": {
"noBarrelFile": "off",
"noReExportAll": "off"
}
}
}
"$schema": "./node_modules/@biomejs/biome/configuration_schema.json",
"extends": ["./node_modules/@kibertoad/biome-config/configs/biome-package.json"],
"linter": {
"rules": {
"performance": {
"noBarrelFile": "off",
"noReExportAll": "off"
}
}
},
"files": {
"ignore": ["db-client"]
}
}
18 changes: 18 additions & 0 deletions packages/outbox-prisma-adapter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# outbox-prisma-adapter

This package provides a Prisma adapter for the Outbox pattern.

### Development

#### Tests

To run the tests, you need to have a PostgreSQL database running. You can use the following command to start a PostgreSQL database using Docker:

```sh
docker-compose up -d
```

Then update Prisma client:
```sh
npx prisma generate --schema=./test/schema.prisma
```
10 changes: 10 additions & 0 deletions packages/outbox-prisma-adapter/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
services:

postgres:
image: postgres:16.2
environment:
POSTGRES_USER: prisma
POSTGRES_PASSWORD: prisma
POSTGRES_DB: prisma
ports:
- 5432:5432
1 change: 1 addition & 0 deletions packages/outbox-prisma-adapter/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './lib/outbox-prisma-adapter'
167 changes: 167 additions & 0 deletions packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
import type {
OutboxAccumulator,
OutboxEntry,
OutboxStorage,
} from '@message-queue-toolkit/outbox-core'
import { type CommonEventDefinition, getMessageType } from '@message-queue-toolkit/schemas'
import type { PrismaClient } from '@prisma/client'

type ModelDelegate = {
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
create: (args: any) => Promise<any>
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
findMany: (args: any) => Promise<any>
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
createMany: (args: any) => Promise<any>
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
updateMany: (args: any) => Promise<any>
}
Copy link
Collaborator

@CarlosGamero CarlosGamero Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 I am not really happy with using any here, as we know the method signature could we define input and output types to have type check and IDE help?

type OutboxEntryDelegate<Event extends CommonEventDefinition> = {
  create: (args: { data: OutboxEntry<Event> }) => Promise<OutboxEntry<Event>>
  findMany: (args: { where?: Partial<OutboxEntry<Event>> }) => Promise<OutboxEntry<Event>[]>
....
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. It's still a work in progress. I did any here to move quicker. I fully agree, it needs to be addressed. I will work on it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, please take a look 😁


export class OutboxPrismaAdapter<
SupportedEvents extends CommonEventDefinition[],
ModelName extends keyof PrismaClient & string,
> implements OutboxStorage<SupportedEvents>
{
constructor(
private readonly prisma: PrismaClient,
private readonly modelName: ModelName,
) {}
Comment on lines +38 to +46
Copy link
Collaborator

@CarlosGamero CarlosGamero Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 I remember having issue with passing PrismaClient directly as a parameter on prima-utils package, I don't remember exactly the reason, but it was because the object we are using is the autogenerate one and not the default coming from the Prisma package. I fixed it by doing something like:

export class OutboxPrismaAdapter<
  SupportedEvents extends CommonEventDefinition[],
  Prisma extends PrismaClient,
  ...
> implements OutboxStorage<SupportedEvents>
{
  constructor(
    private readonly prisma: Prisma,
    private readonly modelName: ModelName,
  ) {}
...
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I am not sure if the definition of the Model name will work to have IDE autocompletion, wondering if we can make it a but more specific with something like:

type PrismaModelName<T extends PrismaClient> = {
  [K in keyof T]: T[K] extends { create: Function; findMany: Function } ? K : never
}[keyof T]

export class OutboxPrismaAdapter<
  SupportedEvents extends CommonEventDefinition[],
  Prisma extends PrismaClient,
  ModelName extends PrismaModelName<Prisma>,
> implements OutboxStorage<SupportedEvents>
{
  constructor(
    private readonly prisma: Prisma,
    private readonly modelName: ModelName,
  ) {}


createEntry(
outboxEntry: OutboxEntry<SupportedEvents[number]>,
): Promise<OutboxEntry<SupportedEvents[number]>> {
const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 I see we are doing this on all methods, could we maybe create a private method to retrieve the model delegate so we can have the casting in a single place


// @ts-ignore
const messageType = getMessageType(outboxEntry.event)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CarlosGamero While you're looking at the PR, could you check this line? For some reason, TSLint complains about type mismatch here. I believe, the code is exactly the same as in the other packages, thus there are still compilation errors.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course! let me have a look :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed in Slack, For visibility:

Most likely we don't know to save the type in a separate filed and we can save it with the event itself, but just for visibility the issue was that getMessageType is expecting a message type and we are passing an event, if I am reading it fine on code, a message is an extension of event and that's why it was failing

return prismaModel.create({
data: {
id: outboxEntry.id,
type: messageType,
created: outboxEntry.created,
updated: outboxEntry.updated,
data: outboxEntry.data,
status: outboxEntry.status,
retryCount: outboxEntry.retryCount,
},
})
}

async flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void> {
const entries = await outboxAccumulator.getEntries()
const failedEntries = await outboxAccumulator.getFailedEntries()
const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate

const existingEntries = await prismaModel.findMany({
where: {
id: {
in: [...entries.map((entry) => entry.id), ...failedEntries.map((entry) => entry.id)],
},
},
})

await this.prisma.$transaction(async (prisma) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 What do you think about using prismaTransaction (from @lokalise/prisma-utils) here so we can benefit from teh retry mechanism implemented there

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I discussed it with Igor, it's kibertoad namespace, so the preference is to not depend on any lokalise pacakges.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember this conversation, but if it happened, then I disagree with Igor from the past. We use lokalise-namespaced packages liberally in node-service-template, there is nothing wrong with it, and I don't think that @lokalise/prisma-utils is coupled to our internal specifics - as long as it supports key DBs (PostgreSQL and MySQL), it should be fine to use it.
Since this is a dedicated package for prisma adapter, having prisma-related dependencies in it should be fine

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let's use prisma utils.

Back then, it was about using @lokalise/id-utils:

#204 (comment)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, it was a slightly different case, we only needed a subset of the library, of which our id-utils is basically an opinionated wrapper

const prismaModel = prisma[this.modelName] as ModelDelegate
await this.handleSuccesses(prismaModel, entries, existingEntries)
await this.handleFailures(prismaModel, failedEntries, existingEntries)
})
Copy link
Collaborator

@CarlosGamero CarlosGamero Jan 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 How many entries do we expect here? wondering if chunking could be beneficial

}

private async handleSuccesses(
prismaModel: ModelDelegate,
entries: OutboxEntry<SupportedEvents[number]>[],
existingEntries: OutboxEntry<SupportedEvents[number]>[],
) {
const toCreate = entries.filter(
(entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id),
)
const toUpdate = entries.filter((entry) =>
existingEntries.some((existingEntry) => existingEntry.id === entry.id),
)

if (toCreate.length > 0) {
await prismaModel.createMany({
data: toCreate.map((entry) => ({
id: entry.id,
// @ts-ignore
type: getMessageType(entry.event),
created: entry.created,
updated: new Date(),
data: entry.data,
status: 'SUCCESS',
})),
})
}

if (toUpdate.length > 0) {
await prismaModel.updateMany({
where: {
id: {
in: toUpdate.map((entry) => entry.id),
},
},
data: {
status: 'SUCCESS',
updated: new Date(),
},
})
}
}

private async handleFailures(
prismaModel: ModelDelegate,
entries: OutboxEntry<SupportedEvents[number]>[],
existingEntries: OutboxEntry<SupportedEvents[number]>[],
) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟢 This method is almost the same as handleSuccesses wondering if we can combine them by adding another param like isSuccess

const toCreate = entries.filter(
(entry) => !existingEntries.some((existingEntry) => existingEntry.id === entry.id),
)
const toUpdate = entries.filter((entry) =>
existingEntries.some((existingEntry) => existingEntry.id === entry.id),
)

if (toCreate.length > 0) {
await prismaModel.createMany({
data: toCreate.map((entry) => ({
id: entry.id,
// @ts-ignore
type: getMessageType(entry.event),
created: entry.created,
updated: new Date(),
data: entry.data,
status: 'FAILED',
retryCount: 1,
})),
})
}

if (toUpdate.length > 0) {
await prismaModel.updateMany({
where: {
id: {
in: toUpdate.map((entry) => entry.id),
},
},
data: {
status: 'FAILED',
updated: new Date(),
retryCount: {
increment: 1,
},
},
})
}
}

getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]> {
const prismaModel = this.prisma[this.modelName] as unknown as ModelDelegate

return prismaModel.findMany({
where: {
retryCount: {
lte: maxRetryCount,
},
},
})
}
}
66 changes: 66 additions & 0 deletions packages/outbox-prisma-adapter/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{
"name": "@message-queue-toolkit/outbox-prisma-adapter",
"version": "0.1.0",
"private": false,
"license": "MIT",
"description": "OutboxStorage implementation for @message-queue-toolkit/outbox-core package.",
"maintainers": [
{
"name": "Igor Savin",
"email": "[email protected]"
}
],
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "del-cli dist && npm run db:update-client && tsc",
"build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json",
"test": "vitest",
"test:coverage": "npm run docker:start:dev && npm run db:wait && npm test -- --coverage && npm run docker:stop:dev",
"test:ci": "npm run test:coverage",
"lint": "biome check . && tsc --project tsconfig.json --noEmit",
"lint:fix": "biome check --write .",
"docker:start:dev": "docker compose up -d",
"docker:stop:dev": "docker compose down",
"db:wait": "while ! echo \"SELECT 1;\" | prisma db execute --stdin; do sleep 1; done",
"db:update-client": "prisma generate",
"prepublishOnly": "npm run build:release"
},
"peerDependencies": {
"@message-queue-toolkit/core": ">=14.0.0",
"@message-queue-toolkit/outbox-core": ">=0.1.0",
"@message-queue-toolkit/schemas": ">=4.0.0",
"prisma": "^5.22.0",
"@prisma/client": "^5.19.1"
},
"devDependencies": {
"@biomejs/biome": "1.8.3",
"@kibertoad/biome-config": "^1.2.1",
"@prisma/client": "^5.22.0",
"@types/node": "^22.0.0",
"@vitest/coverage-v8": "^2.0.4",
"del-cli": "^5.1.0",
"prisma": "^5.22.0",
"typescript": "^5.5.3",
"uuidv7": "^1.0.2",
"vitest": "^2.0.4",
"zod": "^3.23.8"
},
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
"repository": {
"type": "git",
"url": "git://github.com/kibertoad/message-queue-toolkit.git"
},
"keywords": [
"message",
"queue",
"queues",
"abstract",
"common",
"utils",
"notification",
"outbox",
"pattern"
],
"files": ["README.md", "LICENSE", "dist/*"]
}
20 changes: 20 additions & 0 deletions packages/outbox-prisma-adapter/prisma/schema.prisma
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
datasource db {
provider = "postgresql"
url = "postgresql://prisma:prisma@localhost:5432/prisma?schema=prisma"
}

model OutboxEntry {
id String @id @default(uuid()) @db.Uuid
created DateTime @default(now())
updated DateTime @default(now()) @updatedAt
type String
retryCount Int @default(0) @map("retry_count")
data Json
status String

@@map("outbox_entry")
}

generator client {
provider = "prisma-client-js"
}
Loading
Loading