From 7ac2e2cce5ec80b2076728871e9376434bc469de Mon Sep 17 00:00:00 2001
From: Arijit Ray <35370469+itsarijitray@users.noreply.github.com>
Date: Tue, 9 Sep 2025 18:20:50 +0530
Subject: [PATCH 1/3] Add lru cache

---
 .../src/destinations/kafka/constants.ts  |  4 +-
 .../src/destinations/kafka/send/index.ts |  8 ++--
 .../src/destinations/kafka/utils.ts      | 47 +++++++++++++++++--
 3 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/packages/destination-actions/src/destinations/kafka/constants.ts b/packages/destination-actions/src/destinations/kafka/constants.ts
index 2fa9fe172d3..c1ea41dbcba 100644
--- a/packages/destination-actions/src/destinations/kafka/constants.ts
+++ b/packages/destination-actions/src/destinations/kafka/constants.ts
@@ -2,4 +2,6 @@ export const PRODUCER_TTL_MS = Number(process.env.KAFKA_PRODUCER_TTL_MS) || 0.5
 
 export const PRODUCER_REQUEST_TIMEOUT_MS = Number(process.env.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS) || 10 * 1000 // defaults to 10 seconds
 
-export const FLAGON_NAME = 'actions-kafka-optimize-connection'
\ No newline at end of file
+export const FLAGON_NAME = 'actions-kafka-optimize-connection'
+
+export const CONNECTIONS_CACHE_SIZE = 500
diff --git a/packages/destination-actions/src/destinations/kafka/send/index.ts b/packages/destination-actions/src/destinations/kafka/send/index.ts
index 190e8a03208..01d7cad1f7a 100644
--- a/packages/destination-actions/src/destinations/kafka/send/index.ts
+++ b/packages/destination-actions/src/destinations/kafka/send/index.ts
@@ -65,11 +65,11 @@ const action: ActionDefinition<Settings, Payload> = {
       return getTopics(settings)
     }
   },
-  perform: async (_request, { settings, payload, features, statsContext }) => {
-    await sendData(settings, [payload], features, statsContext)
+  perform: async (_request, { settings, payload, features, statsContext, subscriptionMetadata }) => {
+    await sendData(settings, [payload], features, statsContext, subscriptionMetadata)
   },
-  performBatch: async (_request, { settings, payload, features, statsContext }) => {
-    await sendData(settings, payload, features, statsContext)
+  performBatch: async (_request, { settings, payload, features, statsContext, subscriptionMetadata }) => {
+    await sendData(settings, payload, features, statsContext, subscriptionMetadata)
   }
 }
 
diff --git a/packages/destination-actions/src/destinations/kafka/utils.ts b/packages/destination-actions/src/destinations/kafka/utils.ts
index 78c0ccab59a..712a9984f6f 100644
--- a/packages/destination-actions/src/destinations/kafka/utils.ts
+++ b/packages/destination-actions/src/destinations/kafka/utils.ts
@@ -3,8 +3,9 @@ import { DynamicFieldResponse, IntegrationError, Features } from '@segment/actio
 import type { Settings } from './generated-types'
 import type { Payload } from './send/generated-types'
 import { DEFAULT_PARTITIONER, Message, TopicMessages, SSLConfig, CachedProducer } from './types'
-import { PRODUCER_REQUEST_TIMEOUT_MS, PRODUCER_TTL_MS, FLAGON_NAME } from './constants'
-import { StatsContext } from '@segment/actions-core/destination-kit'
+import { PRODUCER_REQUEST_TIMEOUT_MS, PRODUCER_TTL_MS, FLAGON_NAME, CONNECTIONS_CACHE_SIZE } from './constants'
+import { StatsContext, SubscriptionMetadata } from '@segment/actions-core/destination-kit'
+import { LRUCache } from 'lru-cache'
 
 export const producersByConfig: Record<string, CachedProducer> = {}
 
@@ -177,11 +178,49 @@ export const getOrCreateProducer = async (
   return producer
 }
 
+const connectionsCache = new LRUCache<string, boolean>({
+  max: CONNECTIONS_CACHE_SIZE,
+  ttl: PRODUCER_TTL_MS
+})
+
+const kafkaProducerCache = new Map<string, Producer>()
+
+export const getOrCreateProducerLRU = async (
+  settings: Settings,
+  statsContext: StatsContext | undefined,
+  subscriptionMetadata?: SubscriptionMetadata
+): Promise<Producer> => {
+  const key = subscriptionMetadata?.destinationConfigId ?? ''
+  const isCachedProducer = connectionsCache.get(key)
+  const cached = kafkaProducerCache.get(key)
+
+  statsContext?.statsClient?.incr('kafka_connection_cache_size', connectionsCache.size, statsContext?.tags)
+
+  if (isCachedProducer && cached) {
+    statsContext?.statsClient?.incr('kafka_connection_reused', 1, statsContext?.tags)
+    await cached?.connect() // this is idempotent, so is safe
+    return cached
+  } else {
+    statsContext?.statsClient?.incr('kafka_connection_closed', 1, statsContext?.tags)
+    kafkaProducerCache.delete(key)
+    await cached?.disconnect()
+  }
+
+  const kafka = getKafka(settings)
+  const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })
+  await producer.connect()
+  statsContext?.statsClient?.incr('kafka_connection_opened', 1, statsContext?.tags)
+  kafkaProducerCache.set(key, producer)
+  connectionsCache.set(key, true)
+  return producer
+}
+
 export const sendData = async (
   settings: Settings,
   payload: Payload[],
   features: Features | undefined,
-  statsContext: StatsContext | undefined
+  statsContext: StatsContext | undefined,
+  subscriptionMetadata?: SubscriptionMetadata
 ) => {
   validate(settings)
 
@@ -218,7 +257,7 @@ export const sendData = async (
 
   let producer: Producer
   if (features && features[FLAGON_NAME]) {
-    producer = await getOrCreateProducer(settings, statsContext)
+    producer = await getOrCreateProducerLRU(settings, statsContext, subscriptionMetadata)
   } else {
     producer = getProducer(settings)
     await producer.connect()

From 5c7d3a50148379925fa56fc157392542c012832c Mon Sep 17 00:00:00 2001
From: Arijit Ray <35370469+itsarijitray@users.noreply.github.com>
Date: Tue, 9 Sep 2025 18:59:33 +0530
Subject: [PATCH 2/3] Add lru cache

---
 .../destination-actions/src/destinations/kafka/utils.ts | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/packages/destination-actions/src/destinations/kafka/utils.ts b/packages/destination-actions/src/destinations/kafka/utils.ts
index 712a9984f6f..12416111864 100644
--- a/packages/destination-actions/src/destinations/kafka/utils.ts
+++ b/packages/destination-actions/src/destinations/kafka/utils.ts
@@ -275,12 +275,7 @@ export const sendData = async (
     }
   }
 
-  if (features && features[FLAGON_NAME]) {
-    const key = serializeKafkaConfig(settings)
-    if (producersByConfig[key]) {
-      producersByConfig[key].lastUsed = Date.now()
-    }
-  } else {
+  if (!features || !features?.[FLAGON_NAME]) {
     await producer.disconnect()
   }
 }

From 4886d977793089ba681f3101a7d8e55b1e4af2cb Mon Sep 17 00:00:00 2001
From: Arijit Ray <35370469+itsarijitray@users.noreply.github.com>
Date: Wed, 10 Sep 2025 23:18:21 +0530
Subject: [PATCH 3/3] Add LRU dispose hook

---
 .../src/destinations/kafka/utils.ts | 31 ++++++++++---------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git a/packages/destination-actions/src/destinations/kafka/utils.ts b/packages/destination-actions/src/destinations/kafka/utils.ts
index 12416111864..62c8598bf2a 100644
--- a/packages/destination-actions/src/destinations/kafka/utils.ts
+++ b/packages/destination-actions/src/destinations/kafka/utils.ts
@@ -178,40 +178,41 @@ export const getOrCreateProducer = async (
   return producer
 }
 
-const connectionsCache = new LRUCache<string, boolean>({
+const connectionsCache = new LRUCache<string, Producer>({
   max: CONNECTIONS_CACHE_SIZE,
-  ttl: PRODUCER_TTL_MS
+  ttl: 500,
+  dispose: (value, _key, _reason) => {
+    if (value) {
+      void value.disconnect().then(() => console.log('Kafka producer disconnected from cache eviction'))
+    }
+  }
 })
 
-const kafkaProducerCache = new Map<string, Producer>()
+// const kafkaProducerCache = new Map<string, Producer>()
 
 export const getOrCreateProducerLRU = async (
   settings: Settings,
   statsContext: StatsContext | undefined,
-  subscriptionMetadata?: SubscriptionMetadata
+  subscriptionMetadata: SubscriptionMetadata | undefined
 ): Promise<Producer> => {
   const key = subscriptionMetadata?.destinationConfigId ?? ''
-  const isCachedProducer = connectionsCache.get(key)
-  const cached = kafkaProducerCache.get(key)
+  const cachedProducer = connectionsCache.get(key)
+  // const cached = kafkaProducerCache.get(key)
 
   statsContext?.statsClient?.incr('kafka_connection_cache_size', connectionsCache.size, statsContext?.tags)
 
-  if (isCachedProducer && cached) {
+  if (cachedProducer) {
     statsContext?.statsClient?.incr('kafka_connection_reused', 1, statsContext?.tags)
-    await cached?.connect() // this is idempotent, so is safe
-    return cached
-  } else {
-    statsContext?.statsClient?.incr('kafka_connection_closed', 1, statsContext?.tags)
-    kafkaProducerCache.delete(key)
-    await cached?.disconnect()
+    await cachedProducer.connect() // this is idempotent, so is safe
+    return cachedProducer
   }
 
   const kafka = getKafka(settings)
   const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })
   await producer.connect()
   statsContext?.statsClient?.incr('kafka_connection_opened', 1, statsContext?.tags)
-  kafkaProducerCache.set(key, producer)
-  connectionsCache.set(key, true)
+  // kafkaProducerCache.set(key, producer)
+  connectionsCache.set(key, producer)
   return producer
 }