Skip to content

Commit 409805e

Browse files
committed
Merge branch 'main' into test/node-rdkafka_perf_test
# Conflicts: # packages/kafka/lib/test.spec.ts
2 parents 5de9632 + 74f7612 commit 409805e

File tree

71 files changed

+641
-335
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+641
-335
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ on:
77
pull_request:
88

99
jobs:
10-
call-ci-flow:
10+
general:
1111
strategy:
1212
matrix:
13-
node-version: [18.x, 20.x, 22.x, 23.x]
13+
node-version: [20.x, 22.x, 24.x]
1414
package-name: [
1515
'@message-queue-toolkit/amqp',
1616
'@message-queue-toolkit/core',
@@ -28,7 +28,7 @@ jobs:
2828
package_name: ${{ matrix.package-name }}
2929

3030
automerge:
31-
needs: [ call-ci-flow ]
31+
needs: [ general ]
3232
runs-on: ubuntu-latest
3333
permissions:
3434
pull-requests: write

examples/sns-sqs/biome.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"$schema": "./node_modules/@biomejs/biome/configuration_schema.json",
3+
"extends": [
4+
"./node_modules/@lokalise/biome-config/configs/biome-base.jsonc",
5+
"./node_modules/@lokalise/biome-config/configs/biome-esm.jsonc",
6+
"./node_modules/@lokalise/biome-config/configs/biome-package.jsonc"
7+
]
8+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
services:
2+
3+
localstack:
4+
image: localstack/localstack:4.0.2
5+
network_mode: bridge
6+
hostname: localstack
7+
ports:
8+
- '127.0.0.1:4566:4566' # LocalStack Gateway
9+
- '127.0.0.1:4510-4559:4510-4559' # external services port range
10+
environment:
11+
- SERVICES=sns,sqs,s3,sts
12+
- DEBUG=0
13+
- DATA_DIR=${DATA_DIR-}
14+
- DOCKER_HOST=unix:///var/run/docker.sock
15+
- LOCALSTACK_HOST=localstack
16+
# - LOCALSTACK_API_KEY=someDummyKey
17+
volumes:
18+
- '${TMPDIR:-/tmp}/localstack:/var/log/localstack'
19+
- '/var/run/docker.sock:/var/run/docker.sock'
20+
restart: on-failure
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { afterEach, beforeEach, describe, expect, it } from 'vitest'
2+
import { userConsumer } from './common/Dependencies.ts'
3+
import { publisherManager } from './common/TestPublisherManager.ts'
4+
import { UserConsumer } from './common/UserConsumer.js'
5+
6+
describe('Publish message', () => {
7+
beforeEach(async () => {
8+
await publisherManager.initRegisteredPublishers([UserConsumer.SUBSCRIBED_TOPIC_NAME])
9+
await userConsumer.start()
10+
})
11+
12+
afterEach(async () => {
13+
await userConsumer.close()
14+
})
15+
16+
it('Publishes a message', async () => {
17+
await publisherManager.publish(UserConsumer.SUBSCRIBED_TOPIC_NAME, {
18+
type: 'user.created',
19+
payload: {
20+
id: '456',
21+
name: 'Jane Doe',
22+
},
23+
})
24+
25+
await publisherManager.publish(UserConsumer.SUBSCRIBED_TOPIC_NAME, {
26+
type: 'user.created',
27+
payload: {
28+
id: '123',
29+
name: 'John Doe',
30+
},
31+
})
32+
33+
const receivedMessage = await userConsumer.handlerSpy.waitForMessage({
34+
type: 'user.created',
35+
payload: {
36+
name: 'John Doe',
37+
},
38+
})
39+
40+
expect(receivedMessage.message.payload).toMatchInlineSnapshot(`
41+
{
42+
"id": "123",
43+
"name": "John Doe",
44+
}
45+
`)
46+
})
47+
})
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { SnsConsumerErrorResolver } from '@message-queue-toolkit/sns'
2+
import { beforeAll, describe, it } from 'vitest'
3+
import {
4+
errorReporter,
5+
logger,
6+
snsClient,
7+
sqsClient,
8+
stsClient,
9+
transactionObservabilityManager,
10+
} from './common/Dependencies.ts'
11+
import { publisherManager } from './common/TestPublisherManager.ts'
12+
import { UserConsumer } from './common/UserConsumer.js'
13+
14+
// This test suite illustrates the importance of only initting the consumers you need
15+
// to prevent test execution time from increasing with every new consumer added
16+
describe('Consumer init', () => {
17+
beforeAll(async () => {
18+
await publisherManager.initRegisteredPublishers([UserConsumer.SUBSCRIBED_TOPIC_NAME])
19+
})
20+
21+
it('Inits one consumer', async () => {
22+
const userConsumer = new UserConsumer({
23+
errorReporter,
24+
logger,
25+
transactionObservabilityManager,
26+
consumerErrorResolver: new SnsConsumerErrorResolver(),
27+
sqsClient,
28+
snsClient,
29+
stsClient,
30+
})
31+
32+
await userConsumer.start()
33+
await userConsumer.close()
34+
})
35+
36+
it('Inits twenty consumers', async () => {
37+
const consumers: UserConsumer[] = []
38+
for (let i = 0; i < 20; i++) {
39+
consumers.push(
40+
new UserConsumer({
41+
errorReporter,
42+
logger,
43+
transactionObservabilityManager,
44+
consumerErrorResolver: new SnsConsumerErrorResolver(),
45+
sqsClient,
46+
snsClient,
47+
stsClient,
48+
}),
49+
)
50+
}
51+
52+
for (const userConsumer of consumers) {
53+
await userConsumer.start()
54+
}
55+
56+
for (const userConsumer of consumers) {
57+
await userConsumer.close()
58+
}
59+
})
60+
})
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { SNSClient, type SNSClientConfig } from '@aws-sdk/client-sns'
2+
import { SQSClient } from '@aws-sdk/client-sqs'
3+
import { STSClient } from '@aws-sdk/client-sts'
4+
import { SnsConsumerErrorResolver } from '@message-queue-toolkit/sns'
5+
import { pino } from 'pino'
6+
import { UserConsumer } from './UserConsumer.ts'
7+
8+
export const TEST_AWS_CONFIG: SNSClientConfig = {
9+
endpoint: 'http://s3.localhost.localstack.cloud:4566',
10+
region: 'eu-west-1',
11+
credentials: {
12+
accessKeyId: 'access',
13+
secretAccessKey: 'secret',
14+
},
15+
}
16+
17+
export const sqsClient = new SQSClient(TEST_AWS_CONFIG)
18+
export const snsClient = new SNSClient(TEST_AWS_CONFIG)
19+
export const stsClient = new STSClient(TEST_AWS_CONFIG)
20+
21+
export const errorReporter = { report: () => {} }
22+
export const logger = pino()
23+
export const transactionObservabilityManager = {
24+
start: () => {},
25+
startWithGroup: () => {},
26+
stop: () => {},
27+
addCustomAttributes: () => {},
28+
}
29+
30+
export const userConsumer = new UserConsumer({
31+
errorReporter,
32+
logger,
33+
transactionObservabilityManager,
34+
consumerErrorResolver: new SnsConsumerErrorResolver(),
35+
sqsClient,
36+
snsClient,
37+
stsClient,
38+
})
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import {
2+
type SnsAwareEventDefinition,
3+
enrichMessageSchemaWithBaseStrict,
4+
} from '@message-queue-toolkit/schemas'
5+
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
6+
import { z } from 'zod/v3'
7+
8+
type AllConsumerMessageSchemas<MessageDefinitionTypes extends CommonEventDefinition[]> = z.infer<
9+
MessageDefinitionTypes[number]['consumerSchema']
10+
>
11+
12+
export const USER_SCHEMA = z.object({
13+
id: z.string(),
14+
name: z.string(),
15+
age: z.number().optional(),
16+
})
17+
18+
export const UserEvents = {
19+
created: {
20+
...enrichMessageSchemaWithBaseStrict('user.created', USER_SCHEMA, {
21+
description: 'User was created',
22+
}),
23+
schemaVersion: '1.0.1',
24+
producedBy: ['user-service'],
25+
domain: 'users',
26+
snsTopic: 'user',
27+
},
28+
29+
updated: {
30+
...enrichMessageSchemaWithBaseStrict('user.updated', USER_SCHEMA, {
31+
description: 'User was updated',
32+
}),
33+
schemaVersion: '1.0.1',
34+
producedBy: ['user-service'],
35+
domain: 'users',
36+
snsTopic: 'user',
37+
},
38+
} satisfies Record<string, SnsAwareEventDefinition>
39+
40+
export type UserEventsType = (typeof UserEvents)[keyof typeof UserEvents][]
41+
export type UserEventConsumerPayloadsType = AllConsumerMessageSchemas<UserEventsType>
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import { CommonMetadataFiller, EventRegistry } from '@message-queue-toolkit/core'
2+
import type { AllPublisherMessageSchemas } from '@message-queue-toolkit/schemas'
3+
import { SnsPublisherManager } from '@message-queue-toolkit/sns'
4+
import type { CommonSnsPublisher } from '@message-queue-toolkit/sns'
5+
import { CommonSnsPublisherFactory } from '@message-queue-toolkit/sns'
6+
import { errorReporter, logger, snsClient, stsClient } from './Dependencies.ts'
7+
import { UserEvents, type UserEventsType } from './TestMessages.ts'
8+
9+
const isTest = true
10+
11+
type PublisherTypes = AllPublisherMessageSchemas<UserEventsType>
12+
13+
export const publisherManager = new SnsPublisherManager<
14+
CommonSnsPublisher<PublisherTypes>,
15+
UserEventsType
16+
>(
17+
{
18+
errorReporter,
19+
logger,
20+
eventRegistry: new EventRegistry(Object.values(UserEvents)),
21+
snsClient,
22+
stsClient,
23+
},
24+
{
25+
metadataFiller: new CommonMetadataFiller({
26+
serviceId: 'service',
27+
}),
28+
publisherFactory: new CommonSnsPublisherFactory(),
29+
newPublisherOptions: {
30+
handlerSpy: true,
31+
messageIdField: 'id',
32+
messageTypeField: 'type',
33+
deletionConfig: {
34+
deleteIfExists: isTest, // only enable this in tests
35+
// and ensure that the owning side is doing the deletion.
36+
// if it is enabled both on consumer and publisher side, you are likely to experience confusing behaviour
37+
},
38+
creationConfig: {
39+
updateAttributesIfExists: true,
40+
},
41+
},
42+
},
43+
)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core'
2+
import type { ConsumerMessageSchema } from '@message-queue-toolkit/schemas'
3+
import { AbstractSnsSqsConsumer, type SNSSQSConsumerDependencies } from '@message-queue-toolkit/sns'
4+
import { UserEvents } from './TestMessages.ts'
5+
import { userCreatedHandler } from './handlers/UserCreatedHandler.ts'
6+
import { userUpdatedHandler } from './handlers/UserUpdatedHandler.ts'
7+
8+
type SupportedMessages = ConsumerMessageSchema<
9+
typeof UserEvents.created | typeof UserEvents.updated
10+
>
11+
12+
// biome-ignore lint/complexity/noBannedTypes: to be expanded later
13+
type ExecutionContext = {}
14+
15+
const isTest = true
16+
17+
export class UserConsumer extends AbstractSnsSqsConsumer<SupportedMessages, ExecutionContext> {
18+
public static readonly CONSUMED_QUEUE_NAME = 'user-my_service'
19+
public static readonly SUBSCRIBED_TOPIC_NAME = 'user'
20+
21+
constructor(dependencies: SNSSQSConsumerDependencies) {
22+
super(
23+
dependencies,
24+
{
25+
handlerSpy: true,
26+
handlers: new MessageHandlerConfigBuilder<SupportedMessages, ExecutionContext>()
27+
.addConfig(UserEvents.created, userCreatedHandler, {})
28+
.addConfig(UserEvents.updated, userUpdatedHandler, {})
29+
.build(),
30+
messageTypeField: 'type',
31+
// Consumer creates its own queue
32+
creationConfig: {
33+
queue: {
34+
QueueName: UserConsumer.CONSUMED_QUEUE_NAME,
35+
},
36+
},
37+
deletionConfig: {
38+
deleteIfExists: isTest,
39+
},
40+
locatorConfig: {
41+
// Topic is created by a publisher, consumer relies on it already existing.
42+
// Note that in order for this to work correctly you need to ensure that
43+
// publisher gets initialized first. If consumer will initialize first,
44+
// publisher may delete already existing topic and subscription and break the setup
45+
topicName: UserConsumer.SUBSCRIBED_TOPIC_NAME,
46+
},
47+
// consumer creates its own subscription
48+
subscriptionConfig: {
49+
updateAttributesIfExists: false,
50+
},
51+
},
52+
{},
53+
)
54+
}
55+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import type { Either } from '@lokalise/node-core'
2+
import type { z } from 'zod/v3'
3+
import type { USER_SCHEMA, UserEvents } from '../TestMessages.ts'
4+
5+
let _latestData: z.infer<typeof USER_SCHEMA>
6+
7+
export function userCreatedHandler(
8+
message: z.infer<typeof UserEvents.created.consumerSchema>,
9+
): Promise<Either<'retryLater', 'success'>> {
10+
_latestData = message.payload
11+
12+
return Promise.resolve({ result: 'success' })
13+
}

0 commit comments

Comments
 (0)