Skip to content
20 changes: 12 additions & 8 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import type {
ParseMessageResult,
PreHandlingOutputs,
Prehandler,
ProcessedMessageMetadata,
QueueConsumer,
QueueConsumerOptions,
TransactionObservabilityManager,
} from '@message-queue-toolkit/core'
import { HandlerContainer, isMessageError, parseMessage } from '@message-queue-toolkit/core'
import type { ChannelModel, Message } from 'amqplib'

import type {
AMQPConsumerDependencies,
AMQPQueueCreationConfig,
Expand Down Expand Up @@ -157,10 +157,6 @@ export abstract class AbstractAmqpConsumer<
// @ts-expect-error
const uniqueTransactionKey = parsedMessage[this.messageIdField]
this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey)
if (this.logMessages) {
const resolvedLogMessage = this.resolveMessageLog(parsedMessage, messageType)
this.logMessage(resolvedLogMessage)
}
this.internalProcessMessage(parsedMessage, messageType)
.then((result) => {
if (result.result === 'success') {
Expand Down Expand Up @@ -267,9 +263,17 @@ export abstract class AbstractAmqpConsumer<
return this._messageSchemaContainer.resolveSchema(message)
}

protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown {
const handler = this.handlerContainer.resolveHandler(messageType)
return handler.messageLogFormatter(message)
protected override resolveMessageLog(
processedMessageMetadata: ProcessedMessageMetadata<MessagePayloadType>,
): unknown | null {
if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) {
return null
}
const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType)
if (!handler.messageLogFormatter) {
return null
}
return handler.messageLogFormatter(processedMessageMetadata.message)
}

// eslint-disable-next-line max-params
Expand Down
6 changes: 0 additions & 6 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,6 @@ export abstract class AbstractAmqpPublisher<
return
}

if (this.logMessages) {
const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown'
const resolvedLogMessage = this.resolveMessageLog(message, messageType)
this.logMessage(resolvedLogMessage)
}

message = this.updateInternalProperties(message)

try {
Expand Down
20 changes: 10 additions & 10 deletions packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,21 @@ describe('AmqpPermissionConsumer', () => {

await newConsumer.close()

expect(logger.loggedMessages.length).toBe(6)
expect(logger.loggedMessages.length).toBe(4)
expect(logger.loggedMessages).toMatchObject([
'Propagating new connection across 0 receivers',
{
id: '1',
messageType: 'add',
},
'timestamp not defined, adding it automatically',
expect.any(Object),
{
id: '1',
messageType: 'add',
timestamp: expect.any(String),
processedMessageMetadata: expect.objectContaining({
processingResult: { status: 'published' },
}),
},
{
processedMessageMetadata: expect.any(String),
processedMessageMetadata: expect.objectContaining({
messageId: '1',
messageType: 'add',
processingResult: { status: 'consumed' },
}),
},
])
})
Expand Down Expand Up @@ -164,6 +163,7 @@ describe('AmqpPermissionConsumer', () => {
id: '1',
messageType: 'add',
}),
messageMetadata: undefined,
},
])
})
Expand Down
17 changes: 12 additions & 5 deletions packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,20 @@ describe('PermissionPublisher', () => {
publisher.publish(message)

await waitAndRetry(() => {
return logger.loggedMessages.length === 2
return logger.loggedMessages.length === 3
})

expect(logger.loggedMessages[1]).toEqual({
id: '1',
messageType: 'add',
})
expect(logger.loggedMessages).toMatchObject([
'Propagating new connection across 0 receivers',
'timestamp not defined, adding it automatically',
{
processedMessageMetadata: expect.objectContaining({
messageId: '1',
messageType: 'add',
processingResult: { status: 'published' },
}),
},
])
})
})

Expand Down
61 changes: 45 additions & 16 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ export abstract class AbstractQueueService<
* Used to know the store-based message deduplication options
*/
protected readonly messageDeduplicationOptionsField: string
/**
* Used to know where metadata is stored - for debug logging purposes only
*/
protected readonly messageMetadataField: string
protected readonly errorReporter: ErrorReporter
public readonly logger: CommonLogger
protected readonly messageIdField: string
Expand Down Expand Up @@ -157,6 +161,7 @@ export abstract class AbstractQueueService<
this.messageDeduplicationIdField = options.messageDeduplicationIdField ?? 'deduplicationId'
this.messageDeduplicationOptionsField =
options.messageDeduplicationOptionsField ?? 'deduplicationOptions'
this.messageMetadataField = options.messageMetadataField ?? 'metadata'
this.creationConfig = options.creationConfig
this.locatorConfig = options.locatorConfig
this.deletionConfig = options.deletionConfig
Expand Down Expand Up @@ -239,15 +244,36 @@ export abstract class AbstractQueueService<
/**
* Format message for logging
*/
protected resolveMessageLog(message: MessagePayloadSchemas, _messageType: string): unknown {
return message
protected resolveMessageLog(
_processedMessageMetadata: ProcessedMessageMetadata<MessagePayloadSchemas>,
): unknown | null {
return null
}

/**
* Log preformatted and potentially presanitized message payload
*/
protected logMessage(messageLogEntry: unknown) {
this.logger.debug(messageLogEntry)
protected logMessageProcessed(
processedMessageMetadata: ProcessedMessageMetadata<MessagePayloadSchemas>,
) {
const processedMessageMetadataLog = {
processingResult: processedMessageMetadata.processingResult,
messageId: processedMessageMetadata.messageId,
messageType: processedMessageMetadata.messageType,
queueName: processedMessageMetadata.queueName,
messageTimestamp: processedMessageMetadata.messageTimestamp,
messageDeduplicationId: processedMessageMetadata.messageDeduplicationId,
messageProcessingStartTimestamp: processedMessageMetadata.messageProcessingStartTimestamp,
messageProcessingEndTimestamp: processedMessageMetadata.messageProcessingEndTimestamp,
messageMetadata: stringValueSerializer(processedMessageMetadata.messageMetadata),
}

const resolvedMessageLog = this.resolveMessageLog(processedMessageMetadata)

this.logger.debug(
{
processedMessageMetadata: processedMessageMetadataLog,
...(resolvedMessageLog ? { message: resolvedMessageLog } : {}),
},
`Finished processing message ${processedMessageMetadata.messageId}`,
)
}

protected handleError(err: unknown, context?: Record<string, unknown>) {
Expand Down Expand Up @@ -284,8 +310,8 @@ export abstract class AbstractQueueService<
messageType,
)

const debugLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug')
if (!debugLoggingEnabled && !this.messageMetricsManager) return
const debugMessageLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug')
if (!debugMessageLoggingEnabled && !this.messageMetricsManager) return

const processedMessageMetadata = this.resolveProcessedMessageMetadata(
message,
Expand All @@ -295,11 +321,8 @@ export abstract class AbstractQueueService<
params.queueName,
messageId,
)
if (debugLoggingEnabled) {
this.logger.debug(
{ processedMessageMetadata: stringValueSerializer(processedMessageMetadata) },
`Finished processing message ${processedMessageMetadata.messageId}`,
)
if (debugMessageLoggingEnabled) {
this.logMessageProcessed(processedMessageMetadata)
}
if (this.messageMetricsManager) {
this.messageMetricsManager.registerProcessedMessage(processedMessageMetadata)
Expand All @@ -321,8 +344,13 @@ export abstract class AbstractQueueService<
const messageType = message ? this.resolveMessageTypeFromMessage(message) : undefined
const messageDeduplicationId =
message && this.messageDeduplicationIdField in message
? // @ts-ignore
message[this.messageDeduplicationId]
? // @ts-expect-error
message[this.messageDeduplicationIdField]
: undefined
const messageMetadata =
message && this.messageMetadataField in message
? // @ts-expect-error
message[this.messageMetadataField]
: undefined

return {
Expand All @@ -335,6 +363,7 @@ export abstract class AbstractQueueService<
messageDeduplicationId,
messageProcessingStartTimestamp,
messageProcessingEndTimestamp,
messageMetadata,
}
}

Expand Down
6 changes: 2 additions & 4 deletions packages/core/lib/queues/HandlerContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ export type Prehandler<MessagePayloadSchema extends object, ExecutionContext, Pr
next: (result: PrehandlerResult) => void,
) => void

export const defaultLogFormatter = <MessagePayloadSchema>(message: MessagePayloadSchema) => message

export type HandlerConfigOptions<
MessagePayloadSchema extends object,
ExecutionContext,
Expand Down Expand Up @@ -98,7 +96,7 @@ export class MessageHandlerConfig<
PrehandlerOutput,
BarrierOutput
>
public readonly messageLogFormatter: LogFormatter<MessagePayloadSchema>
public readonly messageLogFormatter?: LogFormatter<MessagePayloadSchema>
public readonly preHandlerBarrier?: BarrierCallback<
MessagePayloadSchema,
ExecutionContext,
Expand Down Expand Up @@ -126,7 +124,7 @@ export class MessageHandlerConfig<
this.definition = eventDefinition
this.messageType = options?.messageType
this.handler = handler
this.messageLogFormatter = options?.messageLogFormatter ?? defaultLogFormatter
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove default formatter? if not set, we prefer not logging at all?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are still always logging message metadata, and if custom formatter is provided, we are additionally including formatted message there. If it's not provided, we are skipping this additional property

See: https://github.com/kibertoad/message-queue-toolkit/pull/385/changes#diff-b683f7e6db8850fc5b1312907d63672464ca467c4f217a0b29ddc238d2fa88a5R253

this.messageLogFormatter = options?.messageLogFormatter
this.preHandlerBarrier = options?.preHandlerBarrier
this.preHandlers = options?.preHandlers ?? []
}
Expand Down
6 changes: 6 additions & 0 deletions packages/core/lib/types/queueOptionsTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ export type ProcessedMessageMetadata<MessagePayloadSchemas extends object = obje
* ID used for the message deduplication, in case it's enabled
*/
messageDeduplicationId?: string

/**
* Message metadata (see ConsumerMessageMetadataType)
*/
messageMetadata?: Record<string, unknown>
}

export interface MessageMetricsManager<MessagePayloadSchemas extends object = object> {
Expand Down Expand Up @@ -113,6 +118,7 @@ export type CommonQueueOptions = {
messageTimestampField?: string
messageDeduplicationIdField?: string
messageDeduplicationOptionsField?: string
messageMetadataField?: string
handlerSpy?: HandlerSpy<object> | HandlerSpyParams | boolean
logMessages?: boolean
deletionConfig?: DeletionConfig
Expand Down
14 changes: 0 additions & 14 deletions packages/core/test/queues/HandlerContainer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,20 +106,6 @@ describe('MessageHandlerConfigBuilder', () => {
expect(config.messageLogFormatter).toBe(messageLogFormatter)
expect(config.preHandlers).toEqual([])
})

it('should use default log formatter when not provided', () => {
const handler = () => Promise.resolve({ result: 'success' as const })

const config = new MessageHandlerConfig(USER_MESSAGE_SCHEMA, handler)

const testMessage: UserMessage = {
type: 'user.created',
userId: '123',
email: '[email protected]',
}

expect(config.messageLogFormatter(testMessage)).toEqual(testMessage)
})
})
})

Expand Down
15 changes: 12 additions & 3 deletions packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type Either, type ErrorResolver, isError } from '@lokalise/node-core'
import type {
MessageInvalidFormatError,
MessageValidationError,
ProcessedMessageMetadata,
ResolvedMessage,
} from '@message-queue-toolkit/core'
import {
Expand Down Expand Up @@ -861,9 +862,17 @@ export abstract class AbstractPubSubConsumer<
)
}

protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown {
const handler = this.handlerContainer.resolveHandler(messageType)
return handler.messageLogFormatter(message)
protected override resolveMessageLog(
processedMessageMetadata: ProcessedMessageMetadata<MessagePayloadType>,
): unknown | null {
if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) {
return null
}
const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType)
if (!handler.messageLogFormatter) {
return null
}
return handler.messageLogFormatter(processedMessageMetadata.message)
}

protected override isDeduplicationEnabledForMessage(message: MessagePayloadType): boolean {
Expand Down
10 changes: 0 additions & 10 deletions packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@ export abstract class AbstractPubSubPublisher<MessagePayloadType extends object>
const messageProcessingStartTimestamp = Date.now()
const parsedMessage = messageSchemaResult.result.parse(message)

if (this.logMessages) {
const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown'
const resolvedLogMessage = this.resolveMessageLog(message, messageType)
this.logMessage(resolvedLogMessage)
}

message = this.updateInternalProperties(message)
const maybeOffloadedPayloadMessage = await this.offloadMessagePayloadIfNeeded(message, () => {
// Calculate message size for PubSub
Expand Down Expand Up @@ -163,8 +157,4 @@ export abstract class AbstractPubSubPublisher<MessagePayloadType extends object>
protected override resolveSchema(message: MessagePayloadType) {
return this.messageSchemaContainer.resolveSchema(message)
}

protected override resolveMessageLog(message: MessagePayloadType, _messageType: string): unknown {
return message
}
}
6 changes: 0 additions & 6 deletions packages/sns/lib/sns/AbstractSnsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ export abstract class AbstractSnsPublisher<MessagePayloadType extends object>
const topicName =
this.locatorConfig?.topicName ?? this.creationConfig?.topic?.Name ?? 'unknown'

if (this.logMessages) {
const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown'
const resolvedLogMessage = this.resolveMessageLog(message, messageType)
this.logMessage(resolvedLogMessage)
}

const updatedMessage = this.updateInternalProperties(message)

// Resolve FIFO options from original message BEFORE offloading
Expand Down
Loading
Loading