Skip to content

Commit 1fad912

Browse files
authored
Add examples to the project (#277)
1 parent ad6c6a3 commit 1fad912

17 files changed

+367
-12
lines changed

examples/sns-sqs/biome.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"$schema": "./node_modules/@biomejs/biome/configuration_schema.json",
3+
"extends": [
4+
"./node_modules/@lokalise/biome-config/configs/biome-base.jsonc",
5+
"./node_modules/@lokalise/biome-config/configs/biome-esm.jsonc",
6+
"./node_modules/@lokalise/biome-config/configs/biome-package.jsonc"
7+
]
8+
}

examples/sns-sqs/docker-compose.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
services:
2+
3+
localstack:
4+
image: localstack/localstack:4.0.2
5+
network_mode: bridge
6+
hostname: localstack
7+
ports:
8+
- '127.0.0.1:4566:4566' # LocalStack Gateway
9+
- '127.0.0.1:4510-4559:4510-4559' # external services port range
10+
environment:
11+
- SERVICES=sns,sqs,s3,sts
12+
- DEBUG=0
13+
- DATA_DIR=${DATA_DIR-}
14+
- DOCKER_HOST=unix:///var/run/docker.sock
15+
- LOCALSTACK_HOST=localstack
16+
# - LOCALSTACK_API_KEY=someDummyKey
17+
volumes:
18+
- '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
19+
- '/var/run/docker.sock:/var/run/docker.sock'
20+
restart: on-failure
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
2+
import { userConsumer } from './common/Dependencies.ts'
3+
import { publisherManager } from './common/TestPublisherManager.ts'
4+
import { UserConsumer } from './common/UserConsumer.js'
5+
6+
describe('Publish message', () => {
7+
beforeEach(async () => {
8+
await publisherManager.initRegisteredPublishers([UserConsumer.SUBSCRIBED_TOPIC_NAME])
9+
await userConsumer.start()
10+
})
11+
12+
afterEach(async () => {
13+
await userConsumer.close()
14+
})
15+
16+
it('Publishes a message', async () => {
17+
await publisherManager.publish(UserConsumer.SUBSCRIBED_TOPIC_NAME, {
18+
type: 'user.created',
19+
payload: {
20+
id: '123',
21+
name: 'John Doe',
22+
},
23+
})
24+
25+
const receivedMessage = await userConsumer.handlerSpy.waitForMessage({
26+
type: 'user.created',
27+
})
28+
29+
expect(receivedMessage.message.payload).toMatchInlineSnapshot(`
30+
{
31+
"id": "123",
32+
"name": "John Doe",
33+
}
34+
`)
35+
})
36+
})
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { SNSClient, type SNSClientConfig } from '@aws-sdk/client-sns'
2+
import { SQSClient } from '@aws-sdk/client-sqs'
3+
import { STSClient } from '@aws-sdk/client-sts'
4+
import { SnsConsumerErrorResolver } from '@message-queue-toolkit/sns'
5+
import { pino } from 'pino'
6+
import { UserConsumer } from './UserConsumer.ts'
7+
8+
export const TEST_AWS_CONFIG: SNSClientConfig = {
9+
endpoint: 'http://s3.localhost.localstack.cloud:4566',
10+
region: 'eu-west-1',
11+
credentials: {
12+
accessKeyId: 'access',
13+
secretAccessKey: 'secret',
14+
},
15+
}
16+
17+
export const sqsClient = new SQSClient(TEST_AWS_CONFIG)
18+
export const snsClient = new SNSClient(TEST_AWS_CONFIG)
19+
export const stsClient = new STSClient(TEST_AWS_CONFIG)
20+
21+
export const errorReporter = { report: () => {} }
22+
export const logger = pino()
23+
export const transactionObservabilityManager = {
24+
start: () => {},
25+
startWithGroup: () => {},
26+
stop: () => {},
27+
addCustomAttributes: () => {},
28+
}
29+
30+
export const userConsumer = new UserConsumer({
31+
errorReporter,
32+
logger,
33+
transactionObservabilityManager,
34+
consumerErrorResolver: new SnsConsumerErrorResolver(),
35+
sqsClient,
36+
snsClient,
37+
stsClient,
38+
})
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import {
2+
type SnsAwareEventDefinition,
3+
enrichMessageSchemaWithBaseStrict,
4+
} from '@message-queue-toolkit/schemas'
5+
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
6+
import { z } from 'zod'
7+
8+
type AllConsumerMessageSchemas<MessageDefinitionTypes extends CommonEventDefinition[]> = z.infer<
9+
MessageDefinitionTypes[number]['consumerSchema']
10+
>
11+
12+
export const USER_SCHEMA = z.object({
13+
id: z.string(),
14+
name: z.string(),
15+
age: z.number().optional(),
16+
})
17+
18+
export const UserEvents = {
19+
created: {
20+
...enrichMessageSchemaWithBaseStrict('user.created', USER_SCHEMA, {
21+
description: 'User was created',
22+
}),
23+
schemaVersion: '1.0.1',
24+
producedBy: ['user-service'],
25+
domain: 'users',
26+
snsTopic: 'user',
27+
},
28+
29+
updated: {
30+
...enrichMessageSchemaWithBaseStrict('user.updated', USER_SCHEMA, {
31+
description: 'User was updated',
32+
}),
33+
schemaVersion: '1.0.1',
34+
producedBy: ['user-service'],
35+
domain: 'users',
36+
snsTopic: 'user',
37+
},
38+
} satisfies Record<string, SnsAwareEventDefinition>
39+
40+
export type UserEventsType = (typeof UserEvents)[keyof typeof UserEvents][]
41+
export type UserEventConsumerPayloadsType = AllConsumerMessageSchemas<UserEventsType>
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { CommonMetadataFiller, EventRegistry } from '@message-queue-toolkit/core'
2+
import type { allPublisherMessageSchemas } from '@message-queue-toolkit/schemas'
3+
import { SnsPublisherManager } from '@message-queue-toolkit/sns'
4+
import type { CommonSnsPublisher } from '@message-queue-toolkit/sns'
5+
import { CommonSnsPublisherFactory } from '@message-queue-toolkit/sns'
6+
import { errorReporter, logger, snsClient, stsClient } from './Dependencies.ts'
7+
import { UserEvents, type UserEventsType } from './TestMessages.ts'
8+
9+
const isTest = true
10+
11+
type PublisherTypes = allPublisherMessageSchemas<UserEventsType>
12+
13+
export const publisherManager = new SnsPublisherManager<
14+
CommonSnsPublisher<PublisherTypes>,
15+
UserEventsType
16+
>(
17+
{
18+
errorReporter,
19+
logger,
20+
eventRegistry: new EventRegistry(Object.values(UserEvents)),
21+
snsClient,
22+
stsClient,
23+
},
24+
{
25+
metadataFiller: new CommonMetadataFiller({
26+
serviceId: 'service',
27+
}),
28+
publisherFactory: new CommonSnsPublisherFactory(),
29+
newPublisherOptions: {
30+
handlerSpy: true,
31+
messageIdField: 'id',
32+
messageTypeField: 'type',
33+
deletionConfig: {
34+
deleteIfExists: isTest, // only enable this in tests
35+
// and ensure that the owning side is doing the deletion.
36+
// if it is enabled both on consumer and publisher side, you are likely to experience confusing behaviour
37+
},
38+
creationConfig: {
39+
updateAttributesIfExists: true,
40+
},
41+
},
42+
},
43+
)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
2+
import type { ConsumerMessageSchema } from '@message-queue-toolkit/schemas'
3+
import { AbstractSnsSqsConsumer, type SNSSQSConsumerDependencies } from '@message-queue-toolkit/sns'
4+
import { UserEvents } from './TestMessages.ts'
5+
import { userCreatedHandler } from './handlers/UserCreatedHandler.ts'
6+
import { userUpdatedHandler } from './handlers/UserUpdatedHandler.ts'
7+
8+
type SupportedMessages = ConsumerMessageSchema<
9+
typeof UserEvents.created | typeof UserEvents.updated
10+
>
11+
12+
// biome-ignore lint/complexity/noBannedTypes: to be expanded later
13+
type ExecutionContext = {}
14+
15+
export class UserConsumer extends AbstractSnsSqsConsumer<SupportedMessages, ExecutionContext> {
16+
public static readonly CONSUMED_QUEUE_NAME = 'user-my_service'
17+
public static readonly SUBSCRIBED_TOPIC_NAME = 'user'
18+
19+
constructor(dependencies: SNSSQSConsumerDependencies) {
20+
super(
21+
dependencies,
22+
{
23+
handlerSpy: true,
24+
handlers: new MessageHandlerConfigBuilder<SupportedMessages, ExecutionContext>()
25+
.addConfig(UserEvents.created, userCreatedHandler, {})
26+
.addConfig(UserEvents.updated, userUpdatedHandler, {})
27+
.build(),
28+
messageTypeField: 'type',
29+
creationConfig: {
30+
queue: {
31+
QueueName: UserConsumer.CONSUMED_QUEUE_NAME,
32+
},
33+
},
34+
locatorConfig: {
35+
topicName: UserConsumer.SUBSCRIBED_TOPIC_NAME,
36+
},
37+
subscriptionConfig: {
38+
updateAttributesIfExists: false,
39+
},
40+
},
41+
{},
42+
)
43+
}
44+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import type { Either } from '@lokalise/node-core'
2+
import type { z } from 'zod'
3+
import type { USER_SCHEMA, UserEvents } from '../TestMessages.ts'
4+
5+
let _latestData: z.infer<typeof USER_SCHEMA>
6+
7+
export function userCreatedHandler(
8+
message: z.infer<typeof UserEvents.created.consumerSchema>,
9+
): Promise<Either<'retryLater', 'success'>> {
10+
_latestData = message.payload
11+
12+
return Promise.resolve({ result: 'success' })
13+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import type { Either } from '@lokalise/node-core'
2+
import type { z } from 'zod'
3+
import type { USER_SCHEMA, UserEvents } from '../TestMessages.ts'
4+
5+
let _latestData: z.infer<typeof USER_SCHEMA>
6+
7+
export function userUpdatedHandler(
8+
message: z.infer<typeof UserEvents.updated.consumerSchema>,
9+
): Promise<Either<'retryLater', 'success'>> {
10+
_latestData = message.payload
11+
12+
return Promise.resolve({ result: 'success' })
13+
}

examples/sns-sqs/package.json

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
{
2+
"name": "@message-queue-toolkit/sns-sqs-examples",
3+
"version": "1.0.0",
4+
"description": "",
5+
"scripts": {
6+
"build": "tsc",
7+
"lint": "biome check . && tsc",
8+
"lint:fix": "biome check --write .",
9+
"docker:start": "docker compose up -d",
10+
"docker:stop": "docker compose down",
11+
"test": "vitest --coverage"
12+
},
13+
"type": "module",
14+
"repository": {
15+
"type": "git",
16+
"url": "https://github.com/kibertoad/message-queue-toolkit.git"
17+
},
18+
"dependencies": {
19+
"@aws-sdk/client-sns": "^3.812.0",
20+
"@aws-sdk/client-sts": "^3.812.0",
21+
"@aws-sdk/client-sqs": "^3.812.0",
22+
"@message-queue-toolkit/core": "^21.1.1",
23+
"@message-queue-toolkit/schemas": "^6.1.0",
24+
"@message-queue-toolkit/sns": "22.0.1",
25+
"@message-queue-toolkit/sqs": "21.0.1",
26+
"pino": "^9.7.0",
27+
"zod": "^3.24.4"
28+
},
29+
"devDependencies": {
30+
"@biomejs/biome": "1.9.4",
31+
"@lokalise/biome-config": "^2.0.0",
32+
"@types/node": "^22.15.18",
33+
"@lokalise/tsconfig": "^1.3.0",
34+
"@vitest/coverage-v8": "^3.1.3",
35+
"typescript": "^5.8.3",
36+
"vitest": "^3.1.3"
37+
},
38+
"private": true
39+
}

0 commit comments

Comments
 (0)