Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ export const PRODUCER_TTL_MS = Number(process.env.KAFKA_PRODUCER_TTL_MS) || 0.5

export const PRODUCER_REQUEST_TIMEOUT_MS = Number(process.env.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS) || 10 * 1000 // defaults to 10 seconds

export const FLAGON_NAME = 'actions-kafka-optimize-connection'
export const FLAGON_NAME = 'actions-kafka-optimize-connection'

export const CONNECTIONS_CACHE_SIZE = 500
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ const action: ActionDefinition<Settings, Payload> = {
return getTopics(settings)
}
},
perform: async (_request, { settings, payload, features, statsContext }) => {
await sendData(settings, [payload], features, statsContext)
perform: async (_request, { settings, payload, features, statsContext, subscriptionMetadata }) => {
await sendData(settings, [payload], features, statsContext, subscriptionMetadata)
},
performBatch: async (_request, { settings, payload, features, statsContext }) => {
await sendData(settings, payload, features, statsContext)
performBatch: async (_request, { settings, payload, features, statsContext, subscriptionMetadata }) => {
await sendData(settings, payload, features, statsContext, subscriptionMetadata)
}
}

Expand Down
55 changes: 45 additions & 10 deletions packages/destination-actions/src/destinations/kafka/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { DynamicFieldResponse, IntegrationError, Features } from '@segment/actio
import type { Settings } from './generated-types'
import type { Payload } from './send/generated-types'
import { DEFAULT_PARTITIONER, Message, TopicMessages, SSLConfig, CachedProducer } from './types'
import { PRODUCER_REQUEST_TIMEOUT_MS, PRODUCER_TTL_MS, FLAGON_NAME } from './constants'
import { StatsContext } from '@segment/actions-core/destination-kit'
import { PRODUCER_REQUEST_TIMEOUT_MS, PRODUCER_TTL_MS, FLAGON_NAME, CONNECTIONS_CACHE_SIZE } from './constants'
import { StatsContext, SubscriptionMetadata } from '@segment/actions-core/destination-kit'
import { LRUCache } from 'lru-cache'

export const producersByConfig: Record<string, CachedProducer> = {}

Expand Down Expand Up @@ -177,11 +178,50 @@ export const getOrCreateProducer = async (
return producer
}

const connectionsCache = new LRUCache<string, Producer>({
max: CONNECTIONS_CACHE_SIZE,
ttl: 500,
dispose: (value, _key, _reason) => {
if (value) {
void value.disconnect().then(() => console.log('Kafka producer disconnected from cache eviction'))
}
}
})

// const kafkaProducerCache = new Map<string, Producer>()

export const getOrCreateProducerLRU = async (
settings: Settings,
statsContext: StatsContext | undefined,
subscriptionMetadata: SubscriptionMetadata | undefined
): Promise<Producer> => {
const key = subscriptionMetadata?.destinationConfigId ?? '<unknown>'
const cachedProducer = connectionsCache.get(key)
// const cached = kafkaProducerCache.get(key)

statsContext?.statsClient?.incr('kafka_connection_cache_size', connectionsCache.size, statsContext?.tags)

if (cachedProducer) {
statsContext?.statsClient?.incr('kafka_connection_reused', 1, statsContext?.tags)
await cachedProducer.connect() // this is idempotent, so is safe
return cachedProducer
}

const kafka = getKafka(settings)
const producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })
await producer.connect()
statsContext?.statsClient?.incr('kafka_connection_opened', 1, statsContext?.tags)
// kafkaProducerCache.set(key, producer)
connectionsCache.set(key, producer)
return producer
}

export const sendData = async (
settings: Settings,
payload: Payload[],
features: Features | undefined,
statsContext: StatsContext | undefined
statsContext: StatsContext | undefined,
subscriptionMetadata?: SubscriptionMetadata
) => {
validate(settings)

Expand Down Expand Up @@ -218,7 +258,7 @@ export const sendData = async (

let producer: Producer
if (features && features[FLAGON_NAME]) {
producer = await getOrCreateProducer(settings, statsContext)
producer = await getOrCreateProducerLRU(settings, statsContext, subscriptionMetadata)
} else {
producer = getProducer(settings)
await producer.connect()
Expand All @@ -236,12 +276,7 @@ export const sendData = async (
}
}

if (features && features[FLAGON_NAME]) {
const key = serializeKafkaConfig(settings)
if (producersByConfig[key]) {
producersByConfig[key].lastUsed = Date.now()
}
} else {
if (!features || !features?.[FLAGON_NAME]) {
await producer.disconnect()
}
}
Loading