Skip to content

Commit 728120b

Browse files
metrics(ender): refine processing and publishing metrics (#3309)
1 parent 754d77d commit 728120b

File tree

4 files changed

+42
-3
lines changed

4 files changed

+42
-3
lines changed

indexer/packages/kafka/src/batch-kafka-producer.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,20 +72,34 @@ export class BatchKafkaProducer {
7272
this.producer.send({ topic: this.topic, messages: this.producerMessages }),
7373
);
7474
}
75+
const elapsed = Date.now() - startTime;
76+
7577
logger.info({
7678
at: 'BatchMessenger#sendBatch',
7779
message: 'Produced kafka batch',
7880
currentSize: this.currentSize,
81+
topic: this.topic,
82+
sendTime: elapsed,
83+
});
84+
85+
// avoid logging encoded message contents at INFO level
86+
logger.debug({
87+
at: 'BatchMessenger#sendBatch',
88+
message: 'Produced kafka batch with messages',
89+
currentSize: this.currentSize,
7990
producerMessages: JSON.stringify(this.producerMessages),
8091
recalculatedCurrentSize: this.producerMessages.reduce(
8192
(acc: number, msg: ProducerMessage) => acc + msg.value.byteLength,
8293
0,
8394
),
8495
topic: this.topic,
85-
sendTime: Date.now() - startTime,
96+
sendTime: elapsed,
8697
});
98+
8799
stats.gauge(`${config.SERVICE_NAME}.kafka_batch_size`, this.currentSize);
100+
// TODO: this metric is useless it only captures creation of promises
88101
stats.timing(`${config.SERVICE_NAME}.kafka_batch_send_time`, Date.now() - startTime);
102+
89103
this.producerMessages = [];
90104
this.currentSize = 0;
91105
}

indexer/services/ender/src/lib/block-processor.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ export class BlockProcessor {
271271
}
272272

273273
private async processEvents(): Promise<KafkaPublisher> {
274-
const kafkaPublisher: KafkaPublisher = new KafkaPublisher();
274+
const start: number = Date.now();
275275

276276
await Promise.all(this.sqlEventPromises).then((values) => {
277277
for (let i: number = 0; i < this.block.events.length; i++) {
@@ -287,7 +287,6 @@ export class BlockProcessor {
287287
}
288288
});
289289

290-
const start: number = Date.now();
291290
let success = false;
292291
let resultRow: pg.QueryResultRow;
293292
try {
@@ -318,17 +317,27 @@ export class BlockProcessor {
318317
}
319318

320319
// Create a block message from the current block
320+
const kafkaPublisher: KafkaPublisher = new KafkaPublisher();
321321
kafkaPublisher.addEvent(this.createBlockHeightMsg());
322322

323323
// in genesis, handle sync events first, then batched events.
324324
// in other blocks, handle batched events first, then sync events.
325+
const startHandlingEvents: number = Date.now();
326+
325327
if (this.block.height === 0) {
326328
await this.syncHandlers.process(kafkaPublisher, resultRow);
327329
await this.batchedHandlers.process(kafkaPublisher, resultRow);
328330
} else {
329331
await this.batchedHandlers.process(kafkaPublisher, resultRow);
330332
await this.syncHandlers.process(kafkaPublisher, resultRow);
331333
}
334+
335+
stats.timing(
336+
`${config.SERVICE_NAME}.handle_events.timing`,
337+
Date.now() - startHandlingEvents,
338+
STATS_NO_SAMPLING,
339+
);
340+
332341
return kafkaPublisher;
333342
}
334343
}

indexer/services/ender/src/lib/kafka-publisher.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ export class KafkaPublisher {
185185
}
186186

187187
public async publish() {
188+
const start = Date.now();
188189
const allTopicKafkaMessages:
189190
TopicKafkaMessages[] = this.generateAllTopicKafkaMessages();
190191

@@ -204,6 +205,11 @@ export class KafkaPublisher {
204205
},
205206
),
206207
);
208+
stats.timing(
209+
`${config.SERVICE_NAME}.kafka_publish.timing`,
210+
Date.now() - start,
211+
STATS_NO_SAMPLING,
212+
);
207213
}
208214

209215
private generateAllTopicKafkaMessages(): TopicKafkaMessages[] {

indexer/services/ender/src/lib/on-message.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,20 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
9595
);
9696
const kafkaPublisher: KafkaPublisher = await blockProcessor.process();
9797

98+
// TODO: KafkaPublisher is not idempotent, so we can't send non-candles messages here
99+
98100
const candlesGenerator: CandlesGenerator = new CandlesGenerator(
99101
kafkaPublisher,
100102
DateTime.fromJSDate(indexerTendermintBlock.time!),
101103
txId,
102104
);
103105
const candles: CandleFromDatabase[] = await candlesGenerator.updateCandles();
106+
107+
// TODO: ideally, use a candles-only kafka publisher to send here
108+
104109
await Transaction.commit(txId);
105110
stats.gauge(`${config.SERVICE_NAME}.processing_block_height`, indexerTendermintBlock.height);
111+
106112
// Update caches after transaction is committed
107113
updateBlockCache(blockHeight);
108114
_.forEach(candles, updateCandleCacheWithCandle);
@@ -114,13 +120,16 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
114120
'kafkaPublisher.publish',
115121
);
116122
}
123+
117124
logger.info({
118125
at: 'onMessage#onMessage',
119126
message: 'Successfully processed block',
120127
height: blockHeight,
121128
});
122129
success = true;
130+
123131
} catch (error) {
132+
124133
await Transaction.rollback(txId);
125134
await refreshDataCaches();
126135
stats.increment(`${config.SERVICE_NAME}.update_event_tables.failure`, 1);
@@ -143,6 +152,7 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
143152
}
144153
// Throw error so the message is not acked and will be reprocessed
145154
throw error;
155+
146156
} finally {
147157
const done: number = Date.now();
148158
stats.timing(

0 commit comments

Comments
 (0)