diff --git a/packages/amqp/lib/AbstractAmqpConsumer.ts b/packages/amqp/lib/AbstractAmqpConsumer.ts index 43192b26..1cbb8d4f 100644 --- a/packages/amqp/lib/AbstractAmqpConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpConsumer.ts @@ -6,13 +6,13 @@ import type { ParseMessageResult, PreHandlingOutputs, Prehandler, + ProcessedMessageMetadata, QueueConsumer, QueueConsumerOptions, TransactionObservabilityManager, } from '@message-queue-toolkit/core' import { HandlerContainer, isMessageError, parseMessage } from '@message-queue-toolkit/core' import type { ChannelModel, Message } from 'amqplib' - import type { AMQPConsumerDependencies, AMQPQueueCreationConfig, @@ -157,10 +157,6 @@ export abstract class AbstractAmqpConsumer< // @ts-expect-error const uniqueTransactionKey = parsedMessage[this.messageIdField] this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey) - if (this.logMessages) { - const resolvedLogMessage = this.resolveMessageLog(parsedMessage, messageType) - this.logMessage(resolvedLogMessage) - } this.internalProcessMessage(parsedMessage, messageType) .then((result) => { if (result.result === 'success') { @@ -267,9 +263,17 @@ export abstract class AbstractAmqpConsumer< return this._messageSchemaContainer.resolveSchema(message) } - protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { - const handler = this.handlerContainer.resolveHandler(messageType) - return handler.messageLogFormatter(message) + protected override resolveMessageLog( + processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) { + return null + } + const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + if (!handler.messageLogFormatter) { + return null + } + return handler.messageLogFormatter(processedMessageMetadata.message) } // eslint-disable-next-line max-params diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index 6366d5ae..e9b7aefd 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -88,12 +88,6 @@ export abstract class AbstractAmqpPublisher< return } - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - message = this.updateInternalProperties(message) try { diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts index a15a2056..0ee623cc 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts @@ -95,22 +95,21 @@ describe('AmqpPermissionConsumer', () => { await newConsumer.close() - expect(logger.loggedMessages.length).toBe(6) + expect(logger.loggedMessages.length).toBe(4) expect(logger.loggedMessages).toMatchObject([ 'Propagating new connection across 0 receivers', - { - id: '1', - messageType: 'add', - }, 'timestamp not defined, adding it automatically', - expect.any(Object), { - id: '1', - messageType: 'add', - timestamp: expect.any(String), + processedMessageMetadata: expect.objectContaining({ + processingResult: { status: 'published' }, + }), }, { - processedMessageMetadata: expect.any(String), + processedMessageMetadata: expect.objectContaining({ + messageId: '1', + messageType: 'add', + processingResult: { status: 'consumed' }, + }), }, ]) }) @@ -164,6 +163,7 @@ describe('AmqpPermissionConsumer', () => { id: '1', messageType: 'add', }), + messageMetadata: undefined, }, ]) }) diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index a7db562f..3822aa82 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -47,13 +47,20 @@ describe('PermissionPublisher', () => { publisher.publish(message) await waitAndRetry(() => { - return logger.loggedMessages.length === 2 + return logger.loggedMessages.length === 3 }) - expect(logger.loggedMessages[1]).toEqual({ - id: '1', - messageType: 'add', - }) + expect(logger.loggedMessages).toMatchObject([ + 'Propagating new connection across 0 receivers', + 'timestamp not defined, adding it automatically', + { + processedMessageMetadata: expect.objectContaining({ + messageId: '1', + messageType: 'add', + processingResult: { status: 'published' }, + }), + }, + ]) }) }) diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 4a9e6a39..2da2573e 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -114,6 +114,10 @@ export abstract class AbstractQueueService< * Used to know the store-based message deduplication options */ protected readonly messageDeduplicationOptionsField: string + /** + * Used to know where metadata is stored - for debug logging purposes only + */ + protected readonly messageMetadataField: string protected readonly errorReporter: ErrorReporter public readonly logger: CommonLogger protected readonly messageIdField: string @@ -157,6 +161,7 @@ export abstract class AbstractQueueService< this.messageDeduplicationIdField = options.messageDeduplicationIdField ?? 'deduplicationId' this.messageDeduplicationOptionsField = options.messageDeduplicationOptionsField ?? 'deduplicationOptions' + this.messageMetadataField = options.messageMetadataField ?? 'metadata' this.creationConfig = options.creationConfig this.locatorConfig = options.locatorConfig this.deletionConfig = options.deletionConfig @@ -239,15 +244,36 @@ export abstract class AbstractQueueService< /** * Format message for logging */ - protected resolveMessageLog(message: MessagePayloadSchemas, _messageType: string): unknown { - return message + protected resolveMessageLog( + _processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + return null } - /** - * Log preformatted and potentially presanitized message payload - */ - protected logMessage(messageLogEntry: unknown) { - this.logger.debug(messageLogEntry) + protected logMessageProcessed( + processedMessageMetadata: ProcessedMessageMetadata, + ) { + const processedMessageMetadataLog = { + processingResult: processedMessageMetadata.processingResult, + messageId: processedMessageMetadata.messageId, + messageType: processedMessageMetadata.messageType, + queueName: processedMessageMetadata.queueName, + messageTimestamp: processedMessageMetadata.messageTimestamp, + messageDeduplicationId: processedMessageMetadata.messageDeduplicationId, + messageProcessingStartTimestamp: processedMessageMetadata.messageProcessingStartTimestamp, + messageProcessingEndTimestamp: processedMessageMetadata.messageProcessingEndTimestamp, + messageMetadata: stringValueSerializer(processedMessageMetadata.messageMetadata), + } + + const resolvedMessageLog = this.resolveMessageLog(processedMessageMetadata) + + this.logger.debug( + { + processedMessageMetadata: processedMessageMetadataLog, + ...(resolvedMessageLog ? { message: resolvedMessageLog } : {}), + }, + `Finished processing message ${processedMessageMetadata.messageId}`, + ) } protected handleError(err: unknown, context?: Record) { @@ -284,8 +310,8 @@ export abstract class AbstractQueueService< messageType, ) - const debugLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug') - if (!debugLoggingEnabled && !this.messageMetricsManager) return + const debugMessageLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug') + if (!debugMessageLoggingEnabled && !this.messageMetricsManager) return const processedMessageMetadata = this.resolveProcessedMessageMetadata( message, @@ -295,11 +321,8 @@ export abstract class AbstractQueueService< params.queueName, messageId, ) - if (debugLoggingEnabled) { - this.logger.debug( - { processedMessageMetadata: stringValueSerializer(processedMessageMetadata) }, - `Finished processing message ${processedMessageMetadata.messageId}`, - ) + if (debugMessageLoggingEnabled) { + this.logMessageProcessed(processedMessageMetadata) } if (this.messageMetricsManager) { this.messageMetricsManager.registerProcessedMessage(processedMessageMetadata) @@ -321,8 +344,13 @@ export abstract class AbstractQueueService< const messageType = message ? this.resolveMessageTypeFromMessage(message) : undefined const messageDeduplicationId = message && this.messageDeduplicationIdField in message - ? // @ts-ignore - message[this.messageDeduplicationId] + ? // @ts-expect-error + message[this.messageDeduplicationIdField] + : undefined + const messageMetadata = + message && this.messageMetadataField in message + ? // @ts-expect-error + message[this.messageMetadataField] : undefined return { @@ -335,6 +363,7 @@ export abstract class AbstractQueueService< messageDeduplicationId, messageProcessingStartTimestamp, messageProcessingEndTimestamp, + messageMetadata, } } diff --git a/packages/core/lib/queues/HandlerContainer.ts b/packages/core/lib/queues/HandlerContainer.ts index cbfb8c0a..9beeefc4 100644 --- a/packages/core/lib/queues/HandlerContainer.ts +++ b/packages/core/lib/queues/HandlerContainer.ts @@ -55,8 +55,6 @@ export type Prehandler void, ) => void -export const defaultLogFormatter = (message: MessagePayloadSchema) => message - export type HandlerConfigOptions< MessagePayloadSchema extends object, ExecutionContext, @@ -98,7 +96,7 @@ export class MessageHandlerConfig< PrehandlerOutput, BarrierOutput > - public readonly messageLogFormatter: LogFormatter + public readonly messageLogFormatter?: LogFormatter public readonly preHandlerBarrier?: BarrierCallback< MessagePayloadSchema, ExecutionContext, @@ -126,7 +124,7 @@ export class MessageHandlerConfig< this.definition = eventDefinition this.messageType = options?.messageType this.handler = handler - this.messageLogFormatter = options?.messageLogFormatter ?? defaultLogFormatter + this.messageLogFormatter = options?.messageLogFormatter this.preHandlerBarrier = options?.preHandlerBarrier this.preHandlers = options?.preHandlers ?? [] } diff --git a/packages/core/lib/types/queueOptionsTypes.ts b/packages/core/lib/types/queueOptionsTypes.ts index 9100af14..d9d8a508 100644 --- a/packages/core/lib/types/queueOptionsTypes.ts +++ b/packages/core/lib/types/queueOptionsTypes.ts @@ -62,6 +62,11 @@ export type ProcessedMessageMetadata } export interface MessageMetricsManager { @@ -113,6 +118,7 @@ export type CommonQueueOptions = { messageTimestampField?: string messageDeduplicationIdField?: string messageDeduplicationOptionsField?: string + messageMetadataField?: string handlerSpy?: HandlerSpy | HandlerSpyParams | boolean logMessages?: boolean deletionConfig?: DeletionConfig diff --git a/packages/core/test/queues/HandlerContainer.spec.ts b/packages/core/test/queues/HandlerContainer.spec.ts index fad743cf..a20c2a9e 100644 --- a/packages/core/test/queues/HandlerContainer.spec.ts +++ b/packages/core/test/queues/HandlerContainer.spec.ts @@ -106,20 +106,6 @@ describe('MessageHandlerConfigBuilder', () => { expect(config.messageLogFormatter).toBe(messageLogFormatter) expect(config.preHandlers).toEqual([]) }) - - it('should use default log formatter when not provided', () => { - const handler = () => Promise.resolve({ result: 'success' as const }) - - const config = new MessageHandlerConfig(USER_MESSAGE_SCHEMA, handler) - - const testMessage: UserMessage = { - type: 'user.created', - userId: '123', - email: 'test@example.com', - } - - expect(config.messageLogFormatter(testMessage)).toEqual(testMessage) - }) }) }) diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index 68fff02f..5be4fff8 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -2,6 +2,7 @@ import { type Either, type ErrorResolver, isError } from '@lokalise/node-core' import type { MessageInvalidFormatError, MessageValidationError, + ProcessedMessageMetadata, ResolvedMessage, } from '@message-queue-toolkit/core' import { @@ -861,9 +862,17 @@ export abstract class AbstractPubSubConsumer< ) } - protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { - const handler = this.handlerContainer.resolveHandler(messageType) - return handler.messageLogFormatter(message) + protected override resolveMessageLog( + processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) { + return null + } + const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + if (!handler.messageLogFormatter) { + return null + } + return handler.messageLogFormatter(processedMessageMetadata.message) } protected override isDeduplicationEnabledForMessage(message: MessagePayloadType): boolean { diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts index d9484b43..5927afa0 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts @@ -68,12 +68,6 @@ export abstract class AbstractPubSubPublisher const messageProcessingStartTimestamp = Date.now() const parsedMessage = messageSchemaResult.result.parse(message) - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - message = this.updateInternalProperties(message) const maybeOffloadedPayloadMessage = await this.offloadMessagePayloadIfNeeded(message, () => { // Calculate message size for PubSub @@ -163,8 +157,4 @@ export abstract class AbstractPubSubPublisher protected override resolveSchema(message: MessagePayloadType) { return this.messageSchemaContainer.resolveSchema(message) } - - protected override resolveMessageLog(message: MessagePayloadType, _messageType: string): unknown { - return message - } } diff --git a/packages/sns/lib/sns/AbstractSnsPublisher.ts b/packages/sns/lib/sns/AbstractSnsPublisher.ts index cc32d0e3..f6c14ad4 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisher.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisher.ts @@ -125,12 +125,6 @@ export abstract class AbstractSnsPublisher const topicName = this.locatorConfig?.topicName ?? this.creationConfig?.topic?.Name ?? 'unknown' - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - const updatedMessage = this.updateInternalProperties(message) // Resolve FIFO options from original message BEFORE offloading diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index 42b7b8d7..d59d31c6 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -5,6 +5,7 @@ import { SetQueueAttributesCommand, } from '@aws-sdk/client-sqs' import type { Either, ErrorResolver } from '@lokalise/node-core' +import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' import { type BarrierResult, type DeadLetterQueueOptions, @@ -25,7 +26,6 @@ import { import type { ConsumerOptions } from 'sqs-consumer' import { Consumer } from 'sqs-consumer' import type { ZodSchema } from 'zod/v4' - import type { SQSMessage } from '../types/MessageTypes.ts' import { hasOffloadedPayload } from '../utils/messageUtils.ts' import { deleteSqs, initSqs } from '../utils/sqsInitter.ts' @@ -384,10 +384,7 @@ export abstract class AbstractSqsConsumer< // @ts-expect-error const uniqueTransactionKey = parsedMessage[this.messageIdField] this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey) - if (this.logMessages) { - const resolvedLogMessage = this.resolveMessageLog(parsedMessage, messageType) - this.logMessage(resolvedLogMessage) - } + const result: Either<'retryLater' | Error, 'success'> = await this.internalProcessMessage( parsedMessage, messageType, @@ -835,9 +832,17 @@ export abstract class AbstractSqsConsumer< ) } - protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { - const handler = this.handlerContainer.resolveHandler(messageType) - return handler.messageLogFormatter(message) + protected override resolveMessageLog( + processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) { + return null + } + const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + if (!handler.messageLogFormatter) { + return null + } + return handler.messageLogFormatter(processedMessageMetadata.message) } protected override resolveMessage(message: SQSMessage) { diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts index 620d821c..f4ed02d4 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts @@ -117,12 +117,6 @@ export abstract class AbstractSqsPublisher const messageProcessingStartTimestamp = Date.now() const parsedMessage = messageSchemaResult.result.parse(message) - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - message = this.updateInternalProperties(message) // Resolve FIFO options from original message BEFORE offloading diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 851d764f..13448db0 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -443,15 +443,16 @@ describe('SqsPermissionConsumer', () => { await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed') - expect(logger.loggedMessages.length).toBe(2) + expect(logger.loggedMessages.length).toBe(1) expect(logger.loggedMessages).toMatchObject([ { - id: '1', - messageType: 'add', - timestamp: expect.any(String), - }, - { - processedMessageMetadata: expect.any(String), + processedMessageMetadata: expect.objectContaining({ + messageId: '1', + messageType: 'add', + processingResult: { + status: 'consumed', + }, + }), }, ]) await newConsumer.close() @@ -511,6 +512,9 @@ describe('SqsPermissionConsumer', () => { messageProcessingStartTimestamp: expect.any(Number), messageProcessingEndTimestamp: expect.any(Number), queueName: SqsPermissionConsumer.QUEUE_NAME, + messageMetadata: { + schemaVersions: '1.0.0', + }, message: expect.objectContaining({ id: '1', messageType: 'add',