|
| 1 | +import { randomUUID } from 'node:crypto' |
1 | 2 | import {
|
2 |
| - type SnsAwareEventDefinition, |
3 |
| - enrichEventSchemaWithBase, |
| 3 | + CommonMetadataFiller, |
| 4 | + DomainEventEmitter, |
| 5 | + EventRegistry, |
| 6 | +} from '@message-queue-toolkit/core' |
| 7 | +import { |
| 8 | + type CommonEventDefinition, |
| 9 | + type CommonEventDefinitionPublisherSchemaType, |
| 10 | + enrichMessageSchemaWithBase, |
4 | 11 | } from '@message-queue-toolkit/schemas'
|
5 |
| -import { describe } from 'vitest' |
| 12 | +import pino, { type Logger } from 'pino' |
| 13 | +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' |
6 | 14 | import { z } from 'zod'
|
7 |
| -import type { OutboxStorage } from './outbox' |
| 15 | +import { type OutboxEntry, OutboxEventEmitter, OutboxProcessor, type OutboxStorage } from './outbox' |
8 | 16 |
|
9 |
| -const TEST_EVENT_SCHEMA = z.object({ |
10 |
| - name: z.string(), |
11 |
| - age: z.number(), |
12 |
| -}) |
| 17 | +const TestEvents = { |
| 18 | + created: { |
| 19 | + ...enrichMessageSchemaWithBase( |
| 20 | + 'entity.created', |
| 21 | + z.object({ |
| 22 | + message: z.string(), |
| 23 | + }), |
| 24 | + ), |
| 25 | + }, |
| 26 | + |
| 27 | + updated: { |
| 28 | + ...enrichMessageSchemaWithBase( |
| 29 | + 'entity.updated', |
| 30 | + z.object({ |
| 31 | + message: z.string(), |
| 32 | + }), |
| 33 | + ), |
| 34 | + }, |
| 35 | +} as const satisfies Record<string, CommonEventDefinition> |
| 36 | + |
| 37 | +type TestEventsType = (typeof TestEvents)[keyof typeof TestEvents][] |
| 38 | + |
| 39 | +const createdEventPayload: CommonEventDefinitionPublisherSchemaType<typeof TestEvents.created> = { |
| 40 | + payload: { |
| 41 | + message: 'msg', |
| 42 | + }, |
| 43 | + type: 'entity.created', |
| 44 | + metadata: { |
| 45 | + originatedFrom: 'service', |
| 46 | + producedBy: 'producer', |
| 47 | + schemaVersion: '1', |
| 48 | + correlationId: randomUUID(), |
| 49 | + }, |
| 50 | +} |
13 | 51 |
|
14 |
| -const testEvents = { |
15 |
| - 'test-event.something-happened': { |
16 |
| - ...enrichEventSchemaWithBase('test-event.something-happened', TEST_EVENT_SCHEMA), |
17 |
| - snsTopic: 'TEST_TOPIC', |
18 |
| - producedBy: ['unit tes'], |
| 52 | +const updatedEventPayload: CommonEventDefinitionPublisherSchemaType<typeof TestEvents.updated> = { |
| 53 | + ...createdEventPayload, |
| 54 | + type: 'entity.updated', |
| 55 | +} |
| 56 | + |
| 57 | +const expectedCreatedPayload = { |
| 58 | + id: expect.any(String), |
| 59 | + timestamp: expect.any(String), |
| 60 | + payload: { |
| 61 | + message: 'msg', |
| 62 | + }, |
| 63 | + type: 'entity.created', |
| 64 | + metadata: { |
| 65 | + correlationId: expect.any(String), |
| 66 | + originatedFrom: 'service', |
| 67 | + producedBy: 'producer', |
| 68 | + schemaVersion: '1', |
19 | 69 | },
|
20 |
| -} satisfies Record<string, SnsAwareEventDefinition> |
| 70 | +} |
| 71 | + |
| 72 | +const expectedUpdatedPayload = { |
| 73 | + ...expectedCreatedPayload, |
| 74 | + type: 'entity.updated', |
| 75 | +} |
| 76 | + |
| 77 | +const TestLogger: Logger = pino() |
| 78 | + |
| 79 | +class InMemoryOutboxStorage<SupportedEvents extends CommonEventDefinition[]> |
| 80 | + implements OutboxStorage<SupportedEvents> |
| 81 | +{ |
| 82 | + public entries: OutboxEntry<SupportedEvents[number]>[] = [] |
21 | 83 |
|
22 |
| -console.log(testEvents) |
| 84 | + create( |
| 85 | + outboxEntry: OutboxEntry<SupportedEvents[number]>, |
| 86 | + ): Promise<OutboxEntry<SupportedEvents[number]>> { |
| 87 | + this.entries = [...this.entries, outboxEntry] |
23 | 88 |
|
24 |
| -const testEventsArray = [...Object.values(testEvents)] satisfies SnsAwareEventDefinition[] |
| 89 | + return Promise.resolve(outboxEntry) |
| 90 | + } |
25 | 91 |
|
26 |
| -type TestEvents = typeof testEventsArray |
| 92 | + getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]> { |
| 93 | + const entries = this.entries.filter((entry) => { |
| 94 | + return entry.status !== 'SUCCESS' && entry.retryCount <= maxRetryCount |
| 95 | + }) |
27 | 96 |
|
28 |
| -class InMemoryOutbox implements OutboxStorage<TestEvents> {} |
| 97 | + return Promise.resolve(entries) |
| 98 | + } |
| 99 | + |
| 100 | + update( |
| 101 | + outboxEntry: OutboxEntry<SupportedEvents[number]>, |
| 102 | + ): Promise<OutboxEntry<SupportedEvents[number]>> { |
| 103 | + this.entries = this.entries.map((entry) => { |
| 104 | + if (entry.id === outboxEntry.id) { |
| 105 | + return outboxEntry |
| 106 | + } |
| 107 | + return entry |
| 108 | + }) |
| 109 | + |
| 110 | + return Promise.resolve(outboxEntry) |
| 111 | + } |
| 112 | +} |
29 | 113 |
|
30 | 114 | describe('outbox', () => {
|
31 |
| - it('saves outbox entry to process it later', async () => {}) |
| 115 | + let outboxProcessor: OutboxProcessor<TestEventsType> |
| 116 | + let eventEmitter: DomainEventEmitter<TestEventsType> |
| 117 | + let outboxEventEmitter: OutboxEventEmitter<TestEventsType> |
| 118 | + let outboxStorage: InMemoryOutboxStorage<TestEventsType> |
| 119 | + |
| 120 | + beforeEach(() => { |
| 121 | + eventEmitter = new DomainEventEmitter({ |
| 122 | + logger: TestLogger, |
| 123 | + errorReporter: { report: () => {} }, |
| 124 | + eventRegistry: new EventRegistry(Object.values(TestEvents)), |
| 125 | + metadataFiller: new CommonMetadataFiller({ |
| 126 | + serviceId: 'test', |
| 127 | + }), |
| 128 | + }) |
| 129 | + |
| 130 | + outboxStorage = new InMemoryOutboxStorage<TestEventsType>() |
| 131 | + outboxEventEmitter = new OutboxEventEmitter<TestEventsType>(outboxStorage) |
| 132 | + outboxProcessor = new OutboxProcessor<TestEventsType>(outboxStorage, eventEmitter, 2) |
| 133 | + }) |
| 134 | + |
| 135 | + afterEach(() => { |
| 136 | + vi.restoreAllMocks() |
| 137 | + }) |
| 138 | + |
| 139 | + it('saves outbox entry to storage', async () => { |
| 140 | + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { |
| 141 | + correlationId: randomUUID(), |
| 142 | + }) |
| 143 | + |
| 144 | + const entries = await outboxStorage.getEntries(2) |
| 145 | + |
| 146 | + expect(entries).toHaveLength(1) |
| 147 | + }) |
| 148 | + |
| 149 | + it('saves outbox entry and process it', async () => { |
| 150 | + await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, { |
| 151 | + correlationId: randomUUID(), |
| 152 | + }) |
| 153 | + |
| 154 | + await outboxProcessor.processOutboxEntries({ |
| 155 | + logger: TestLogger, |
| 156 | + reqId: randomUUID(), |
| 157 | + executorId: randomUUID(), |
| 158 | + }) |
| 159 | + |
| 160 | + const entries = await outboxStorage.getEntries(2) |
| 161 | + |
| 162 | + expect(entries).toHaveLength(0) |
| 163 | + |
| 164 | + expect(outboxStorage.entries).toMatchObject([ |
| 165 | + { |
| 166 | + status: 'SUCCESS', |
| 167 | + }, |
| 168 | + ]) |
| 169 | + }) |
32 | 170 | })
|
0 commit comments