38 changes: 13 additions & 25 deletions packages/kafka/lib/AbstractKafkaConsumer.ts
@@ -149,11 +149,12 @@ export abstract class AbstractKafkaConsumer<
if (!this.consumerStream && !this.messageBatchStream) return false
try {
return this.consumer.isConnected()
/* v8 ignore start */
} catch (_) {
// this should not happen, but if it does, the consumer is not healthy
/* v8 ignore next */
return false
}
/* v8 ignore stop */
}

/**
@@ -165,11 +166,12 @@ export abstract class AbstractKafkaConsumer<
if (!this.consumerStream && !this.messageBatchStream) return false
try {
return this.consumer.isActive()
/* v8 ignore start */
} catch (_) {
// this should not happen, but if it does, the consumer is not healthy
/* v8 ignore next */
return false
}
/* v8 ignore stop */
}

async init(): Promise<void> {
@@ -188,14 +190,19 @@ export abstract class AbstractKafkaConsumer<
})

this.consumerStream = await this.consumer.consume({ ...consumeOptions, topics })
this.consumerStream.on('error', (error) => this.handlerError(error))

if (this.options.batchProcessingEnabled && this.options.batchProcessingOptions) {
this.messageBatchStream = new KafkaMessageBatchStream<
DeserializedMessage<SupportedMessageValues<TopicsConfig>>
>({
batchSize: this.options.batchProcessingOptions.batchSize,
timeoutMilliseconds: this.options.batchProcessingOptions.timeoutMilliseconds,
})
>(
(batch) =>
this.consume(batch.topic, batch.messages).catch((error) => this.handlerError(error)),
this.options.batchProcessingOptions,
)
this.consumerStream.pipe(this.messageBatchStream)
} else {
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
}
} catch (error) {
throw new InternalError({
@@ -204,14 +211,6 @@ export abstract class AbstractKafkaConsumer<
cause: error,
})
}

if (this.options.batchProcessingEnabled && this.messageBatchStream) {
this.handleSyncStreamBatch(this.messageBatchStream).catch((error) => this.handlerError(error))
} else {
this.handleSyncStream(this.consumerStream).catch((error) => this.handlerError(error))
}

this.consumerStream.on('error', (error) => this.handlerError(error))
}

private async handleSyncStream(
@@ -224,16 +223,6 @@ export abstract class AbstractKafkaConsumer<
)
}
}
private async handleSyncStreamBatch(
stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
): Promise<void> {
for await (const messageBatch of stream) {
await this.consume(
messageBatch.topic,
messageBatch.messages as DeserializedMessage<SupportedMessageValues<TopicsConfig>>,
)
}
}

async close(): Promise<void> {
if (!this.consumerStream && !this.messageBatchStream) {
@@ -291,7 +280,6 @@ export abstract class AbstractKafkaConsumer<
const firstMessage = validMessages[0]!
const requestContext = this.getRequestContext(firstMessage)

/* v8 ignore next */
const transactionId = randomUUID()
this.transactionObservabilityManager?.start(this.buildTransactionName(topic), transactionId)

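The `KafkaMessageBatchStream` implementation itself is not part of this excerpt, but the new call sites above (and the tests below) pin down its shape: the constructor now takes the batch handler as its first argument and the `{ batchSize, timeoutMilliseconds }` options as its second, instead of emitting `'data'` events for a consumer loop. Here is a minimal sketch of such a stream, assuming a Node.js `Writable` in object mode; the buffer layout and the `flush`/`flushAll` helpers are illustrative guesses, not code from the PR:

```typescript
import { Writable } from 'node:stream'

export type MessageBatch<T> = {
  topic: string
  partition: number
  messages: T[]
}

type BatchStreamOptions = {
  batchSize: number
  timeoutMilliseconds: number
}

export class KafkaMessageBatchStream<
  T extends { topic: string; partition: number },
> extends Writable {
  // Messages are buffered per topic-partition pair
  private readonly buffers = new Map<string, T[]>()
  private timer: NodeJS.Timeout | undefined
  // Chain of in-flight flushes; guarantees batches are processed one at a time
  private flushing: Promise<void> = Promise.resolve()

  constructor(
    private readonly processBatch: (batch: MessageBatch<T>) => Promise<void>,
    private readonly options: BatchStreamOptions,
  ) {
    super({ objectMode: true })
  }

  override _write(message: T, _encoding: string, callback: (error?: Error | null) => void): void {
    const key = `${message.topic}:${message.partition}`
    const buffer = this.buffers.get(key) ?? []
    buffer.push(message)
    this.buffers.set(key, buffer)

    if (buffer.length >= this.options.batchSize) {
      // Size-based flush: defer the write callback until the handler settles,
      // so the upstream consumer stream experiences backpressure
      this.flush(key).then(() => callback(), callback)
      return
    }

    // Timeout-based flush for partially filled buffers
    this.timer ??= setTimeout(() => {
      this.timer = undefined
      this.flushAll().catch((error) => this.destroy(error))
    }, this.options.timeoutMilliseconds)
    callback()
  }

  private flush(key: string): Promise<void> {
    // Chain onto the previous flush so batches never run concurrently
    this.flushing = this.flushing.then(() => {
      const messages = this.buffers.get(key)
      if (!messages?.length) return
      this.buffers.delete(key)
      const [topic, partition] = key.split(':')
      return this.processBatch({ topic: topic!, partition: Number(partition), messages })
    })
    return this.flushing
  }

  private flushAll(): Promise<void> {
    const flushes = [...this.buffers.keys()].map((key) => this.flush(key))
    return Promise.all(flushes).then(() => undefined)
  }
}
```

Deferring the `_write` callback until the handler's promise settles is what makes `consumerStream.pipe(this.messageBatchStream)` in `init()` apply backpressure: the piped source stops being read while a batch is still being processed, which is exactly what the new backpressure test below asserts.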
175 changes: 130 additions & 45 deletions packages/kafka/lib/utils/KafkaMessageBatchStream.spec.ts
@@ -1,3 +1,5 @@
import { setTimeout } from 'node:timers/promises'
import { waitAndRetry } from '@lokalise/universal-ts-utils/node'
import { KafkaMessageBatchStream, type MessageBatch } from './KafkaMessageBatchStream.ts'

describe('KafkaMessageBatchStream', () => {
@@ -12,22 +14,27 @@ describe('KafkaMessageBatchStream', () => {
}))

// When
const batchStream = new KafkaMessageBatchStream<any>({
batchSize: 3,
timeoutMilliseconds: 10000,
}) // Setting big timeout to check batch size only

const receivedBatches: MessageBatch<any>[] = []

const dataFetchingPromise = new Promise((resolve) => {
batchStream.on('data', (batch) => {
let resolvePromise: () => void
const dataFetchingPromise = new Promise<void>((resolve) => {
resolvePromise = resolve
})

const batchStream = new KafkaMessageBatchStream<any>(
(batch) => {
receivedBatches.push(batch)
// We expect 3 batches and last message waiting in the stream
// We expect 3 batches and the last message waiting in the stream
if (receivedBatches.length >= 3) {
resolve(null)
resolvePromise()
}
})
})
return Promise.resolve()
},
{
batchSize: 3,
timeoutMilliseconds: 10000,
},
) // Setting big timeout to check batch size only

for (const message of messages) {
batchStream.write(message)
@@ -54,24 +61,25 @@ describe('KafkaMessageBatchStream', () => {
}))

// When
const batchStream = new KafkaMessageBatchStream<any>({
batchSize: 1000,
timeoutMilliseconds: 500,
}) // Setting big batch size to check timeout only

const receivedBatches: MessageBatch<any>[] = []
batchStream.on('data', (batch) => {
receivedBatches.push(batch)
})

const batchStream = new KafkaMessageBatchStream<any>(
(batch) => {
receivedBatches.push(batch)
return Promise.resolve()
},
{
batchSize: 1000,
timeoutMilliseconds: 100,
},
) // Setting big batch size to check timeout only

for (const message of messages) {
batchStream.write(message)
}

// Sleep 1 second to let the timeout trigger
await new Promise((resolve) => {
setTimeout(resolve, 1000)
})
// Sleep to let the timeout trigger
await setTimeout(150)

// Then
expect(receivedBatches).toEqual([{ topic, partition: 0, messages }])
@@ -104,16 +112,16 @@ describe('KafkaMessageBatchStream', () => {
]

// When
const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({
batchSize: 2,
timeoutMilliseconds: 10000,
}) // Setting big timeout to check batch size only

const receivedBatchesByTopicPartition: Record<string, any[][]> = {}

let receivedMessagesCounter = 0
const dataFetchingPromise = new Promise((resolve) => {
batchStream.on('data', (batch) => {

let resolvePromise: () => void
const dataFetchingPromise = new Promise<void>((resolve) => {
resolvePromise = resolve
})

const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>(
(batch) => {
const key = `${batch.topic}:${batch.partition}`
if (!receivedBatchesByTopicPartition[key]) {
receivedBatchesByTopicPartition[key] = []
Expand All @@ -123,10 +131,16 @@ describe('KafkaMessageBatchStream', () => {
// We expect 5 batches and the last message waiting in the stream
receivedMessagesCounter++
if (receivedMessagesCounter >= 5) {
resolve(null)
resolvePromise()
}
})
})

return Promise.resolve()
},
{
batchSize: 2,
timeoutMilliseconds: 10000,
},
) // Setting big timeout to check batch size only

for (const message of messages) {
batchStream.write(message)
@@ -177,25 +191,31 @@ describe('KafkaMessageBatchStream', () => {
]

// When
const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>({
batchSize: 2,
timeoutMilliseconds: 10000,
}) // Setting big timeout to check batch size only

const receivedBatches: any[] = []

let receivedBatchesCounter = 0
const dataFetchingPromise = new Promise((resolve) => {
batchStream.on('data', (batch) => {

let resolvePromise: () => void
const dataFetchingPromise = new Promise<void>((resolve) => {
resolvePromise = resolve
})

const batchStream = new KafkaMessageBatchStream<{ topic: string; partition: number }>(
(batch) => {
receivedBatches.push(batch)

// We expect 4 batches (2 per partition)
receivedBatchesCounter++
if (receivedBatchesCounter >= 4) {
resolve(null)
resolvePromise()
}
})
})

return Promise.resolve()
},
{
batchSize: 2,
timeoutMilliseconds: 10000,
},
) // Setting big timeout to check batch size only

for (const message of messages) {
batchStream.write(message)
Expand All @@ -211,4 +231,69 @@ describe('KafkaMessageBatchStream', () => {
{ topic, partition: 1, messages: [messages[5], messages[7]] },
])
})

it('should handle backpressure correctly when timeout flush is slow', async () => {
// Given
const topic = 'test-topic'
const messages = Array.from({ length: 6 }, (_, i) => ({
id: i + 1,
content: `Message ${i + 1}`,
topic,
partition: 0,
}))

const batchStartTimes: number[] = [] // Track start times of batch processing
const batchEndTimes: number[] = [] // Track end times of batch processing
const batchMessageCounts: number[] = [] // Track number of messages per batch
let maxConcurrentBatches = 0 // Track max concurrent batches

let batchesProcessing = 0
const batchStream = new KafkaMessageBatchStream<any>(
async (batch) => {
batchStartTimes.push(Date.now())
batchMessageCounts.push(batch.messages.length)

batchesProcessing++
maxConcurrentBatches = Math.max(maxConcurrentBatches, batchesProcessing)

// Simulate batch processing (50ms per batch)
await setTimeout(50)

batchEndTimes.push(Date.now())
batchesProcessing--
},
{
batchSize: 1000, // Large batch size to never trigger size-based flushing
timeoutMilliseconds: 10, // Short timeout to trigger flush after each message
},
)

// When: Write messages with 20ms delay between them
// Since processing (50ms) is slower than message arrival + timeout, backpressure causes accumulation
for (const message of messages) {
batchStream.write(message)
await setTimeout(20)
}

// Then
// Wait until all 3 batches have been processed
await waitAndRetry(() => batchMessageCounts.length >= 3, 500, 20)

// Backpressure causes messages to accumulate while previous batch processes:
// - Batch 1: Message 1 (flushed at 10ms timeout)
// - Batch 2: Messages 2-4 (accumulated during Batch 1 processing, including Message 4 arriving at ~60ms)
// - Batch 3: Messages 5-6 (accumulated during Batch 2 processing)
expect(batchMessageCounts).toEqual([1, 3, 2])

// Verify that batches never processed in parallel (backpressure working)
expect(maxConcurrentBatches).toBe(1) // Should never process more than 1 batch at a time

// Verify that batches were processed sequentially (each starts after previous ends)
for (let i = 1; i < batchStartTimes.length; i++) {
const previousEndTime = batchEndTimes[i - 1]
const currentStartTime = batchStartTimes[i]
// The current batch must start after the previous batch finished
expect(currentStartTime).toBeGreaterThanOrEqual(previousEndTime ?? 0)
}
})
})
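The expected grouping `[1, 3, 2]` in the backpressure test follows directly from the timing constants: messages arrive every 20 ms, the flush timeout is 10 ms, and each batch takes 50 ms to process, so messages accumulate while the previous flush is still in flight. A rough timeline (timings approximate, assuming the sequential flushing that `maxConcurrentBatches` asserts):

```typescript
// t ~   0 ms  write msg 1 -> timeout fires at ~10 ms; batch [1] processes ~10-60 ms
// t ~  20 ms  write msg 2 \
// t ~  40 ms  write msg 3  > accumulate while batch [1] is in flight
// t ~  60 ms  write msg 4 / -> batch [2, 3, 4] processes ~60-110 ms
// t ~  80 ms  write msg 5 \  accumulate while batch [2, 3, 4] is in flight
// t ~ 100 ms  write msg 6 / -> batch [5, 6] processes ~110-160 ms
```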