Skip to content

Commit cf5fc8a

Browse files
authored
Always using validated message when possible (#316)
* Always using validated message when possible * Lint fix
1 parent 719d816 commit cf5fc8a

File tree

2 files changed

+6
-10
lines changed

2 files changed

+6
-10
lines changed

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ export abstract class AbstractKafkaConsumer<
169169
return this.commitMessage(message)
170170
}
171171

172-
const validatedMessage = parseResult.data
172+
const validatedMessage = { ...message, value: parseResult.data }
173173

174174
const requestContext = this.getRequestContext(message)
175175

@@ -179,33 +179,29 @@ export abstract class AbstractKafkaConsumer<
179179
// exponential backoff -> 2^(retry-1)
180180
if (retries > 0) await setTimeout(Math.pow(2, retries - 1))
181181

182-
consumed = await this.tryToConsume(
183-
{ ...message, value: validatedMessage },
184-
handler.handler,
185-
requestContext,
186-
)
182+
consumed = await this.tryToConsume(validatedMessage, handler.handler, requestContext)
187183
if (consumed) break
188184

189185
retries++
190186
} while (retries < MAX_IN_MEMORY_RETRIES)
191187

192188
if (consumed) {
193189
this.handleMessageProcessed({
194-
message: message,
190+
message: validatedMessage,
195191
processingResult: { status: 'consumed' },
196192
messageProcessingStartTimestamp,
197193
})
198194
} else {
199195
this.handleMessageProcessed({
200-
message: message,
196+
message: validatedMessage,
201197
processingResult: { status: 'error', errorReason: 'handlerError' },
202198
messageProcessingStartTimestamp,
203199
})
204200
}
205201

206202
this.transactionObservabilityManager?.stop(transactionId)
207203

208-
return this.commitMessage(message)
204+
return this.commitMessage(validatedMessage)
209205
}
210206

211207
private async tryToConsume<MessageValue extends object>(

packages/kafka/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.3.1",
3+
"version": "0.3.2",
44
"engines": {
55
"node": ">= 22.14.0"
66
},

0 commit comments

Comments
 (0)