-import { CompressionTypes, Kafka, logLevel, Partitioners, RetryOptions } from 'kafkajs';
+import net from 'net';
+import tls from 'tls';
+import {
+  CompressionTypes,
+  ISocketFactory,
+  Kafka,
+  logLevel,
+  Partitioners,
+  RetryOptions,
+} from 'kafkajs';
import { traceInlineSync, type ServiceLogger } from '@hive/service-common';
import type { RawOperationMap, RawReport } from '@hive/usage-common';
import { compress } from '@hive/usage-common';
@@ -32,7 +41,7 @@ const levelMap = {

const retryOptions = {
  maxRetryTime: 30_000,
-  initialRetryTime: 500,
+  initialRetryTime: 300,
  factor: 0.2,
  multiplier: 2,
  retries: 5,
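Note: with multiplier 2 and factor 0.2, these options back off exponentially with roughly ±20% jitter, so the five retries wait on the order of 300 ms, 600 ms, 1.2 s, 2.4 s and 4.8 s, capped by maxRetryTime. A rough TypeScript sketch of that reading (an approximation, not KafkaJS's exact internals):

// Rough illustration only; KafkaJS applies jitter of roughly ±factor on top of this.
const approxRetryDelayMs = (attempt: number) => Math.min(30_000, 300 * 2 ** attempt);
// attempt 0..4 -> ~300, 600, 1200, 2400, 4800 ms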
@@ -103,6 +112,25 @@ export function createUsage(config: {
}) {
  const { logger } = config;

+  // Default KafkaJS socketFactory implementation with minor optimizations for Azure
+  // https://github.com/tulios/kafkajs/blob/master/src/network/socketFactory.js
+  const socketFactory: ISocketFactory = ({ host, port, ssl, onConnect }) => {
+    const socket = ssl
+      ? tls.connect(
+          Object.assign({ host, port }, !net.isIP(host) ? { servername: host } : {}, ssl),
+          onConnect,
+        )
+      : net.connect({ host, port }, onConnect);
+
+    // This is equivalent to Kafka's "connections.max.idle.ms"
+    socket.setKeepAlive(true, 180_000);
+    // Disable Nagle's algorithm for higher throughput, since this logic
+    // already buffers messages into large payloads.
+    socket.setNoDelay(true);
+
+    return socket;
+  };
+
  const kafka = new Kafka({
    clientId: 'usage',
    brokers: [config.kafka.connection.broker],
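Note: the !net.isIP(host) guard is carried over from the stock KafkaJS socket factory; TLS SNI servernames must be hostnames, so servername is only passed when the broker address is not a raw IP. A minimal standalone sketch of that check (the helper name is hypothetical):

// Hypothetical helper mirroring the guard above: set SNI only for hostnames, never raw IPs.
const sniOptions = (host: string) => (net.isIP(host) ? {} : { servername: host });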
@@ -140,10 +168,11 @@ export function createUsage(config: {
      };
    },
    // settings recommended by Azure EventHub https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-configurations
-    requestTimeout: 60_000, //
+    requestTimeout: 30_000,
    connectionTimeout: 5_000,
    authenticationTimeout: 5_000,
    retry: retryOptions,
+    socketFactory,
  });

  const producer = kafka.producer({
@@ -268,10 +297,25 @@ export function createUsage(config: {
    status = newStatus;
  }

+  producer.on(producer.events.CONNECT, () => {
+    logger.info(`Kafka producer: connected`);
+
+    if (status === Status.Unhealthy) {
+      changeStatus(Status.Ready);
+    }
+  });
+
  producer.on(producer.events.REQUEST_TIMEOUT, () => {
    logger.info('Kafka producer: request timeout');
  });

+  producer.on(producer.events.DISCONNECT, () => {
+    logger.info(`Kafka producer: disconnected`);
+    if (status === Status.Ready) {
+      changeStatus(Status.Unhealthy);
+    }
+  });
+
  async function stop() {
    logger.info('Started Usage shutdown...');

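Note: the new CONNECT and DISCONNECT handlers flip the module's Status between Ready and Unhealthy, presumably so that whatever health or readiness check the service exposes reflects the producer's connection state. A hypothetical sketch of how such a flag could be consumed (the probe below is illustrative and not part of this change; status and Status come from the surrounding module):

// Hypothetical readiness probe reading the Status flag toggled above (not in this commit).
import http from 'http';
http
  .createServer((_req, res) => {
    res.writeHead(status === Status.Ready ? 200 : 503);
    res.end();
  })
  .listen(9090);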
@@ -281,7 +325,6 @@ export function createUsage(config: {
    await fallback.stop();
    logger.info(`Fallback stopped`);
    await producer.disconnect();
-    logger.info(`Producer disconnected`);

    logger.info('Usage stopped');
  }