Skip to content

Commit 60981bd

Browse files
authored
Reduce usage producer retry counts; fix usage producer fallback logic (#6755)
1 parent c69c347 commit 60981bd

File tree

3 files changed

+23
-13
lines changed

3 files changed

+23
-13
lines changed

.changeset/tender-flowers-refuse.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'hive': patch
3+
---
4+
5+
Correctly set the usage service state to Ready once the entire fallback queue has been processed.

packages/services/usage/src/fallback-queue.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pLimit from 'p-limit';
22
import type { ServiceLogger } from '@hive/service-common';
3+
import * as Sentry from '@sentry/node';
34

45
// Average message size is ~800 KB
56
// 1000 messages = 800 MB
@@ -63,6 +64,11 @@ export function createFallbackQueue(config: {
6364
queue.map(msgValue =>
6465
limit(() =>
6566
config.send(msgValue[0], msgValue[1]).catch(error => {
67+
Sentry.setTags({
68+
message: 'Failed to flush message before stopping',
69+
numOfOperations: msgValue[1],
70+
});
71+
Sentry.captureException(error);
6672
config.logger.error(
6773
{
6874
error: error instanceof Error ? error.message : String(error),

packages/services/usage/src/usage.ts

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ const retryOptions = {
3535
initialRetryTime: 500,
3636
factor: 0.2,
3737
multiplier: 2,
38-
retries: 10,
38+
retries: 5,
3939
} satisfies RetryOptions; // why satisfies? To be able to use `retryOptions.retries` and get `number` instead of `number | undefined`
4040

4141
export function splitReport(report: RawReport, numOfChunks: number) {
@@ -165,7 +165,7 @@ export function createUsage(config: {
165165
return Object.keys(report.map).length;
166166
},
167167
split(report, numOfChunks) {
168-
logger.info('Splitting into %s', numOfChunks);
168+
logger.info('Splitting report into %s (id=%s)', numOfChunks, report.id);
169169
return splitReport(report, numOfChunks);
170170
},
171171
onRetry(reports) {
@@ -212,21 +212,15 @@ export function createUsage(config: {
212212
rawOperationWrites.inc(numOfOperations);
213213
logger.info(`Flushed (id=%s, operations=%s)`, batchId, numOfOperations);
214214
}
215-
216-
changeStatus(Status.Ready);
217215
} catch (error: any) {
218216
rawOperationFailures.inc(numOfOperations);
219217

220218
changeStatus(Status.Unhealthy);
221-
logger.error(`Failed to flush (id=%s, error=%s)`, batchId, error.message);
222-
Sentry.setTags({
219+
logger.error(
220+
`Failed to flush. Adding to fallback queue (id=%s, error=%s)`,
223221
batchId,
224-
message: error.message,
225-
numOfOperations,
226-
});
227-
Sentry.captureException(error);
228-
229-
logger.info('Adding to fallback queue (id=%s)', batchId);
222+
error.message,
223+
);
230224
fallback.add(value, numOfOperations);
231225

232226
throw error;
@@ -255,6 +249,11 @@ export function createUsage(config: {
255249
} finally {
256250
stopTimer();
257251
}
252+
253+
if (fallback.size() === 0) {
254+
logger.info('Fallback queue flushed');
255+
changeStatus(Status.Ready);
256+
}
258257
},
259258
logger: logger.child({ component: 'fallback' }),
260259
});
@@ -306,7 +305,7 @@ export function createUsage(config: {
306305
},
307306
),
308307
readiness() {
309-
return status === Status.Ready && fallback.size() === 0;
308+
return status === Status.Ready;
310309
},
311310
async start() {
312311
logger.info('Starting Kafka producer');

0 commit comments

Comments
 (0)