diff --git a/packages/destination-actions/src/destinations/kafka/send/__tests__/index.test.ts b/packages/destination-actions/src/destinations/kafka/send/__tests__/index.test.ts index 4dcf3690bd..b5b444eb84 100644 --- a/packages/destination-actions/src/destinations/kafka/send/__tests__/index.test.ts +++ b/packages/destination-actions/src/destinations/kafka/send/__tests__/index.test.ts @@ -57,7 +57,8 @@ const testData = { }, mapping: { topic: 'test-topic', - payload: { '@path': '$.' } + payload: { '@path': '$.' }, + batch_bytes: 1000000 } } diff --git a/packages/destination-actions/src/destinations/kafka/send/generated-types.ts b/packages/destination-actions/src/destinations/kafka/send/generated-types.ts index 8a68e951c4..9736968bdc 100644 --- a/packages/destination-actions/src/destinations/kafka/send/generated-types.ts +++ b/packages/destination-actions/src/destinations/kafka/send/generated-types.ts @@ -37,4 +37,8 @@ export interface Payload { * The keys to use for batching the events. */ batch_keys?: string[] + /** + * The number of bytes to batch together. Default is 1MB. Maximum value varies by kafka cluster. The less you batch, the more requests will be sent to your Kafka cluster. + */ + batch_bytes: number } diff --git a/packages/destination-actions/src/destinations/kafka/send/index.ts b/packages/destination-actions/src/destinations/kafka/send/index.ts index 190e8a0320..5ac08807fe 100644 --- a/packages/destination-actions/src/destinations/kafka/send/index.ts +++ b/packages/destination-actions/src/destinations/kafka/send/index.ts @@ -58,6 +58,16 @@ const action: ActionDefinition = { required: false, multiple: true, default: ['topic', 'partition', 'default_partition'] + }, + batch_bytes: { + type: 'number', + label: 'Batch Bytes', + description: + 'The number of bytes to batch together. Default is 1MB. Maximum value varies by kafka cluster. The less you batch, the more requests will be sent to your Kafka cluster.', + default: 1000000, // 1MB, + required: false, + minimum: 1, + unsafe_hidden: false } }, dynamicFields: { diff --git a/packages/destination-actions/src/destinations/kafka/utils.ts b/packages/destination-actions/src/destinations/kafka/utils.ts index 84098cd18c..a2dc2324eb 100644 --- a/packages/destination-actions/src/destinations/kafka/utils.ts +++ b/packages/destination-actions/src/destinations/kafka/utils.ts @@ -207,11 +207,10 @@ export const sendData = async ( groupedPayloads[topic].push(p) set.add(`${topic}-${partition}-${default_partition}`) }) - if (statsContext) { - const { statsClient, tags } = statsContext - statsClient?.histogram('kafka.configurable_batch_keys.unique_keys', set.size, tags) - // Add stats to track batch keys for kafka - } + const statsClient = statsContext?.statsClient + const tags = statsContext?.tags + statsClient?.histogram('kafka.configurable_batch_keys.unique_keys', set.size, tags) + // Add stats to track batch keys for kafka const topicMessages: TopicMessages[] = Object.keys(groupedPayloads).map((topic) => ({ topic, @@ -250,6 +249,8 @@ export const sendData = async ( for (const data of topicMessages) { try { + const batch_bytes = Buffer.byteLength(JSON.stringify(data.messages), 'utf8') + statsClient?.histogram('kafka.batch_bytes', batch_bytes, tags) await producer.send(data as ProducerRecord) } catch (error) { const kafkaError = getKafkaError(error as Error)