Skip to content

Commit 1c44345

Browse files
authored
Mark usage-ingestor as unhealthy when consumer crashes (#6609)
1 parent ee70018 commit 1c44345

File tree

2 files changed

+32
-9
lines changed

2 files changed

+32
-9
lines changed

.changeset/lazy-guests-leave.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'hive': patch
3+
---
4+
5+
Mark usage-ingestor as unhealthy when Kafka consumer crashed

packages/services/usage-ingestor/src/ingestor.ts

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ import { createProcessor } from './processor';
1616
import { ClickHouseConfig, createWriter } from './writer';
1717

1818
enum Status {
19-
Waiting,
20-
Connected,
21-
Ready,
22-
Stopped,
19+
Waiting = 'Waiting',
20+
Connected = 'Connected',
21+
Unhealthy = 'Unhealthy',
22+
Ready = 'Ready',
23+
Stopped = 'Stopped',
2324
}
2425

2526
const levelMap = {
@@ -98,8 +99,7 @@ export function createIngestor(config: {
9899

99100
async function stop() {
100101
logger.info('Started Usage Ingestor shutdown...');
101-
102-
status = Status.Stopped;
102+
changeStatus(Status.Stopped);
103103
await consumer.disconnect();
104104
writer.destroy();
105105
logger.info(`Consumer disconnected`);
@@ -114,6 +114,8 @@ export function createIngestor(config: {
114114
consumer.on('consumer.crash', async ev => {
115115
logger.error('Consumer crashed (restart=%s, error=%s)', ev.payload.restart, ev.payload.error);
116116

117+
changeStatus(Status.Unhealthy);
118+
117119
if (ev.payload.restart) {
118120
return;
119121
}
@@ -126,15 +128,22 @@ export function createIngestor(config: {
126128
logger.warn('Consumer disconnected');
127129
});
128130

131+
consumer.on('consumer.fetch', async () => {
132+
if (status !== Status.Ready) {
133+
logger.info('Consumer successfully fetched messages after being in status: %s', status);
134+
changeStatus(Status.Ready);
135+
}
136+
});
137+
129138
async function start() {
130139
logger.info('Starting Usage Ingestor...');
131140

132-
status = Status.Waiting;
141+
changeStatus(Status.Waiting);
133142

134143
logger.info('Connecting Kafka Consumer');
135144
await consumer.connect();
136145

137-
status = Status.Connected;
146+
changeStatus(Status.Connected);
138147

139148
logger.info('Subscribing to Kafka topic: %s', config.kafka.topic);
140149
await consumer.subscribe({
@@ -164,7 +173,7 @@ export function createIngestor(config: {
164173
},
165174
});
166175
logger.info('Kafka is ready');
167-
status = Status.Ready;
176+
changeStatus(Status.Ready);
168177
}
169178

170179
const processor = createProcessor({ logger });
@@ -175,6 +184,15 @@ export function createIngestor(config: {
175184

176185
let status: Status = Status.Waiting;
177186

187+
function changeStatus(newStatus: Status) {
188+
if (status === newStatus) {
189+
return;
190+
}
191+
192+
logger.info('Changing status to %s', newStatus);
193+
status = newStatus;
194+
}
195+
178196
return {
179197
readiness() {
180198
return status === Status.Ready;

0 commit comments

Comments
 (0)