diff --git a/packages/destination-actions/src/destinations/kafka/utils.ts b/packages/destination-actions/src/destinations/kafka/utils.ts
index 84098cd18c..5716457bfd 100644
--- a/packages/destination-actions/src/destinations/kafka/utils.ts
+++ b/packages/destination-actions/src/destinations/kafka/utils.ts
@@ -1,4 +1,4 @@
-import { Kafka, ProducerRecord, Partitioners, SASLOptions, KafkaConfig, KafkaJSError, Producer } from 'kafkajs'
+import { Kafka, Partitioners, SASLOptions, KafkaConfig, KafkaJSError, Producer } from 'kafkajs'
 import { DynamicFieldResponse, IntegrationError, Features } from '@segment/actions-core'
 import type { Settings } from './generated-types'
 import type { Payload } from './send/generated-types'
@@ -196,36 +196,35 @@ export const sendData = async (
 ) => {
   validate(settings)
 
-  const groupedPayloads: { [topic: string]: Payload[] } = {}
   const set = new Set()
   payload.forEach((p) => {
     const { topic, partition, default_partition } = p
-    if (!groupedPayloads[topic]) {
-      groupedPayloads[topic] = []
-    }
-    groupedPayloads[topic].push(p)
 
     set.add(`${topic}-${partition}-${default_partition}`)
   })
 
+  const topic = payload[0].topic
+  if (!topic) {
+    throw new IntegrationError('Topic is required', 'TOPIC_REQUIRED', 400)
+  }
 
   if (statsContext) {
     const { statsClient, tags } = statsContext
     statsClient?.histogram('kafka.configurable_batch_keys.unique_keys', set.size, tags) // Add stats to track batch keys for kafka
   }
 
-  const topicMessages: TopicMessages[] = Object.keys(groupedPayloads).map((topic) => ({
+  const topicMessages: TopicMessages = {
     topic,
-    messages: groupedPayloads[topic].map(
-      (payload) =>
+    messages: payload.map(
+      (message) =>
         ({
-          value: JSON.stringify(payload.payload),
-          key: payload.key,
-          headers: payload?.headers ?? undefined,
-          partition: payload?.partition ?? payload?.default_partition ?? undefined,
+          value: JSON.stringify(message.payload),
+          key: message.key,
+          headers: message?.headers ?? undefined,
+          partition: message?.partition ?? message?.default_partition ?? undefined,
           partitionerType: DEFAULT_PARTITIONER
         } as Message)
     )
-  }))
+  }
 
   let producer: Producer
   try {
@@ -248,17 +247,11 @@ export const sendData = async (
     }
   }
 
-  for (const data of topicMessages) {
-    try {
-      await producer.send(data as ProducerRecord)
-    } catch (error) {
-      const kafkaError = getKafkaError(error as Error)
-      throw new IntegrationError(
-        `Kafka Producer Error - ${kafkaError.name}: ${kafkaError.message}`,
-        kafkaError.name,
-        500
-      )
-    }
+  try {
+    await producer.send(topicMessages)
+  } catch (error) {
+    const kafkaError = getKafkaError(error as Error)
+    throw new IntegrationError(`Kafka Producer Error - ${kafkaError.name}: ${kafkaError.message}`, kafkaError.name, 500)
   }
 
   if (features && features[FLAGON_NAME]) {
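
For context on the shape this patch moves to, below is a minimal standalone sketch (not part of the diff) of the single-topic batch send, assuming a locally reachable broker. The clientId, brokers value, and the sendBatch helper are illustrative assumptions; Kafka, Partitioners, Message, and producer.connect/send/disconnect are the actual kafkajs APIs the patched utils.ts relies on.

// Illustrative sketch only: mirrors the new topicMessages shape, where every
// payload in a batch shares one topic and a single producer.send covers the
// whole batch, replacing the old per-topic loop.
import { Kafka, Partitioners, Message } from 'kafkajs'

// Assumed connection details for the example; the real code builds its
// KafkaConfig from destination Settings.
const kafka = new Kafka({ clientId: 'example-client', brokers: ['localhost:9092'] })

// Hypothetical helper showing the single-send pattern.
async function sendBatch(topic: string, batch: { payload: object; key?: string }[]) {
  const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })
  await producer.connect()
  try {
    // One ProducerRecord for the whole batch: one topic, many messages.
    await producer.send({
      topic,
      messages: batch.map(
        (m): Message => ({
          value: JSON.stringify(m.payload),
          key: m.key
        })
      )
    })
  } finally {
    await producer.disconnect()
  }
}

A consequence of this design is that a broker failure now surfaces as a single IntegrationError for the batch, rather than partial sends across multiple topics.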