Skip to content

Commit ad8d5b4

Browse files
[metering] move usage reporting service to the usage plugin (#255063)
## Summary Required for of elastic/search-team#13010 Related to #252219 and #252434 Move the `UsageReportingService` and related stuff from the `workflowsExecutionEngine` plugin to the `usage` plugin, so that other consumers of the usage/metering API can re-use the type and logic --------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
1 parent 6e6637c commit ad8d5b4

File tree

18 files changed

+460
-209
lines changed

18 files changed

+460
-209
lines changed

src/platform/plugins/shared/workflows_execution_engine/moon.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ dependsOn:
3535
- '@kbn/workflows-extensions'
3636
- '@kbn/licensing-plugin'
3737
- '@kbn/usage-api-plugin'
38-
- '@kbn/server-http-tools'
3938
- '@kbn/logging-mocks'
4039
tags:
4140
- plugin

src/platform/plugins/shared/workflows_execution_engine/server/metering/constants.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,3 @@ export const BUCKET_SIZE_MINUTES = 5;
1818

1919
/** Duration normalization bucket size in milliseconds. */
2020
export const BUCKET_SIZE_MS = BUCKET_SIZE_MINUTES * 60 * 1000;
21-
22-
/** Maximum number of retry attempts for sending usage records. */
23-
export const METERING_RETRY_ATTEMPTS = 3;
24-
25-
/** Base delay between retries in milliseconds (exponential backoff applied). */
26-
export const METERING_RETRY_BASE_DELAY_MS = 1000;

src/platform/plugins/shared/workflows_execution_engine/server/metering/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,3 @@
88
*/
99

1010
export { WorkflowsMeteringService } from './metering_service';
11-
export type { UsageMetrics, UsageRecord, UsageSource } from './types';
12-
export { UsageReportingService } from './usage_reporting_service';

src/platform/plugins/shared/workflows_execution_engine/server/metering/metering_service.test.ts

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@
99

1010
import type { CloudSetup } from '@kbn/cloud-plugin/server';
1111
import type { Logger } from '@kbn/core/server';
12+
import type { UsageRecord, UsageReportingService } from '@kbn/usage-api-plugin/server';
1213
import type { EsWorkflowExecution } from '@kbn/workflows';
1314
import { ExecutionStatus } from '@kbn/workflows';
1415

1516
import { BUCKET_SIZE_MS, METERING_SOURCE_ID, WORKFLOWS_USAGE_TYPE } from './constants';
1617
import { WorkflowsMeteringService } from './metering_service';
17-
import type { UsageRecord } from './types';
18-
import type { UsageReportingService } from './usage_reporting_service';
1918

2019
const createMockExecution = (
2120
overrides: Partial<EsWorkflowExecution> = {}
@@ -80,8 +79,6 @@ describe('WorkflowsMeteringService', () => {
8079
mockUsageReportingService = createMockUsageReportingService();
8180
mockLogger = createMockLogger();
8281
meteringService = new WorkflowsMeteringService(mockUsageReportingService, mockLogger);
83-
// Mock the internal delay to be instant so tests run fast
84-
jest.spyOn(WorkflowsMeteringService.prototype as any, 'delay').mockResolvedValue(undefined);
8582
});
8683

8784
afterEach(() => {
@@ -312,72 +309,6 @@ describe('WorkflowsMeteringService', () => {
312309
});
313310
});
314311

315-
describe('retry logic', () => {
316-
it('should retry on failure and succeed on second attempt', async () => {
317-
mockUsageReportingService.reportUsage
318-
.mockResolvedValueOnce({ ok: false, status: 500 } as any)
319-
.mockResolvedValueOnce({ ok: true, status: 200 } as any);
320-
321-
const execution = createMockExecution();
322-
const cloudSetup = createMockCloudSetup();
323-
324-
await meteringService.reportWorkflowExecution(execution, cloudSetup);
325-
326-
expect(mockUsageReportingService.reportUsage).toHaveBeenCalledTimes(2);
327-
expect(mockLogger.debug).toHaveBeenCalledWith(
328-
expect.stringContaining('Successfully reported metering')
329-
);
330-
});
331-
332-
it('should log error after all retries exhausted', async () => {
333-
mockUsageReportingService.reportUsage.mockResolvedValue({
334-
ok: false,
335-
status: 503,
336-
} as any);
337-
338-
const execution = createMockExecution();
339-
const cloudSetup = createMockCloudSetup();
340-
341-
await meteringService.reportWorkflowExecution(execution, cloudSetup);
342-
343-
expect(mockUsageReportingService.reportUsage).toHaveBeenCalledTimes(3);
344-
expect(mockLogger.error).toHaveBeenCalledWith(
345-
expect.stringContaining('Failed to report workflow metering')
346-
);
347-
});
348-
349-
it('should retry on network errors', async () => {
350-
mockUsageReportingService.reportUsage
351-
.mockRejectedValueOnce(new Error('ECONNREFUSED'))
352-
.mockResolvedValueOnce({ ok: true, status: 200 } as any);
353-
354-
const execution = createMockExecution();
355-
const cloudSetup = createMockCloudSetup();
356-
357-
await meteringService.reportWorkflowExecution(execution, cloudSetup);
358-
359-
expect(mockUsageReportingService.reportUsage).toHaveBeenCalledTimes(2);
360-
expect(mockLogger.warn).toHaveBeenCalledWith(expect.stringContaining('ECONNREFUSED'));
361-
});
362-
363-
it('should not throw when metering fails (errors are caught internally)', async () => {
364-
mockUsageReportingService.reportUsage.mockRejectedValue(new Error('catastrophic failure'));
365-
366-
const execution = createMockExecution();
367-
const cloudSetup = createMockCloudSetup();
368-
369-
// Should not throw - errors are caught internally
370-
await expect(
371-
meteringService.reportWorkflowExecution(execution, cloudSetup)
372-
).resolves.toBeUndefined();
373-
374-
// Error should be logged
375-
expect(mockLogger.error).toHaveBeenCalledWith(
376-
expect.stringContaining('Failed to report workflow metering')
377-
);
378-
});
379-
});
380-
381312
describe('environment detection', () => {
382313
it('should prefer projectId over deploymentId', async () => {
383314
const execution = createMockExecution();

src/platform/plugins/shared/workflows_execution_engine/server/metering/metering_service.ts

Lines changed: 3 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,11 @@
99

1010
import type { CloudSetup } from '@kbn/cloud-plugin/server';
1111
import type { Logger } from '@kbn/core/server';
12+
import type { UsageRecord, UsageReportingService } from '@kbn/usage-api-plugin/server';
1213
import type { EsWorkflowExecution } from '@kbn/workflows';
1314
import { ExecutionStatus, isTerminalStatus } from '@kbn/workflows';
1415

15-
import {
16-
BUCKET_SIZE_MS,
17-
METERING_RETRY_ATTEMPTS,
18-
METERING_RETRY_BASE_DELAY_MS,
19-
METERING_SOURCE_ID,
20-
WORKFLOWS_USAGE_TYPE,
21-
} from './constants';
22-
import type { UsageRecord } from './types';
23-
import type { UsageReportingService } from './usage_reporting_service';
16+
import { BUCKET_SIZE_MS, METERING_SOURCE_ID, WORKFLOWS_USAGE_TYPE } from './constants';
2417

2518
/**
2619
* Workflows Metering Service - Stage 1 of the billing pipeline.
@@ -73,7 +66,7 @@ export class WorkflowsMeteringService {
7366
const usageRecord = this.buildUsageRecord(execution, instanceGroupId);
7467

7568
try {
76-
await this.sendWithRetry(usageRecord);
69+
await this.usageReportingService.reportUsage([usageRecord]);
7770
} catch (err) {
7871
// Log with billing-relevant details per monitoring requirements:
7972
// project ID, type, and count for impact assessment
@@ -150,51 +143,4 @@ export class WorkflowsMeteringService {
150143

151144
return stepTypes;
152145
}
153-
154-
/**
155-
* Sends a usage record with inline retry and exponential backoff.
156-
* Per billing team guidance: data loss is preferable to overbilling,
157-
* so we retry a few times then give up (logged at error level).
158-
*/
159-
private async sendWithRetry(record: UsageRecord): Promise<void> {
160-
let lastError: Error | undefined;
161-
162-
for (let attempt = 0; attempt < METERING_RETRY_ATTEMPTS; attempt++) {
163-
try {
164-
const response = await this.usageReportingService.reportUsage([record]);
165-
166-
if (response.ok) {
167-
this.logger.debug(
168-
`Successfully reported metering for execution ${record.id} (attempt ${attempt + 1})`
169-
);
170-
return;
171-
}
172-
173-
lastError = new Error(`Usage API responded with status ${response.status}`);
174-
this.logger.warn(
175-
`Metering report attempt ${attempt + 1}/${METERING_RETRY_ATTEMPTS} failed: ${
176-
lastError.message
177-
}`
178-
);
179-
} catch (err) {
180-
lastError = err instanceof Error ? err : new Error(String(err));
181-
this.logger.warn(
182-
`Metering report attempt ${attempt + 1}/${METERING_RETRY_ATTEMPTS} failed: ${
183-
lastError.message
184-
}`
185-
);
186-
}
187-
188-
// Exponential backoff before next retry (skip delay on last attempt)
189-
if (attempt < METERING_RETRY_ATTEMPTS - 1) {
190-
await this.delay(METERING_RETRY_BASE_DELAY_MS * Math.pow(2, attempt));
191-
}
192-
}
193-
194-
throw lastError || new Error('Metering report failed after all retry attempts');
195-
}
196-
197-
private delay(ms: number): Promise<void> {
198-
return new Promise((resolve) => setTimeout(resolve, ms));
199-
}
200146
}

src/platform/plugins/shared/workflows_execution_engine/server/plugin.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import { checkLicense } from './lib/check_license';
3434
import { getAuthenticatedUser } from './lib/get_user';
3535
import { WorkflowExecutionTelemetryClient } from './lib/telemetry/workflow_execution_telemetry_client';
3636
import { WorkflowsMeteringService } from './metering/metering_service';
37-
import { UsageReportingService } from './metering/usage_reporting_service';
3837
import { initializeLogsRepositoryDataStream } from './repositories/logs_repository/data_stream';
3938
import { WorkflowExecutionRepository } from './repositories/workflow_execution_repository';
4039
import type {
@@ -71,7 +70,6 @@ export class WorkflowsExecutionEnginePlugin
7170
{
7271
private readonly logger: Logger;
7372
private readonly config: WorkflowsExecutionEngineConfig;
74-
private readonly kibanaVersion: string;
7573
private concurrencyManager!: ConcurrencyManager;
7674
private setupDependencies?: SetupDependencies;
7775
private meteringService?: WorkflowsMeteringService;
@@ -80,7 +78,6 @@ export class WorkflowsExecutionEnginePlugin
8078
constructor(initializerContext: PluginInitializerContext) {
8179
this.logger = initializerContext.logger.get();
8280
this.config = initializerContext.config.get<WorkflowsExecutionEngineConfig>();
83-
this.kibanaVersion = initializerContext.env.packageInfo.version;
8481
}
8582

8683
public setup(
@@ -101,11 +98,9 @@ export class WorkflowsExecutionEnginePlugin
10198
this.setupDependencies = setupDependencies;
10299

103100
// Initialize metering from the centralized Usage API plugin
104-
const usageApiConfig = plugins.usageApi?.config;
105-
if (usageApiConfig?.enabled && usageApiConfig.url) {
106-
const usageReportingService = new UsageReportingService(usageApiConfig, this.kibanaVersion);
101+
if (plugins.usageApi?.usageReporting) {
107102
this.meteringService = new WorkflowsMeteringService(
108-
usageReportingService,
103+
plugins.usageApi?.usageReporting,
109104
this.logger.get('workflowsMetering')
110105
);
111106
this.logger.debug('Workflows metering service initialized');

src/platform/plugins/shared/workflows_execution_engine/tsconfig.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
"@kbn/workflows-extensions",
3131
"@kbn/licensing-plugin",
3232
"@kbn/usage-api-plugin",
33-
"@kbn/server-http-tools",
3433
"@kbn/logging-mocks"
3534
],
3635
"exclude": ["target/**/*"]

x-pack/platform/plugins/shared/usage_api/moon.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ project:
2020
dependsOn:
2121
- '@kbn/core'
2222
- '@kbn/config-schema'
23+
- '@kbn/logging'
24+
- '@kbn/server-http-tools'
25+
- '@kbn/logging-mocks'
2326
tags:
2427
- plugin
2528
- prod

x-pack/platform/plugins/shared/usage_api/server/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,14 @@
88
import type { PluginInitializerContext } from '@kbn/core/server';
99

1010
export type { UsageApiSetup, UsageApiStart } from './plugin';
11+
export type {
12+
UsageReportingService,
13+
UsageRecord,
14+
UsageMetrics,
15+
UsageSource,
16+
} from './usage_reporting';
1117
export { config } from './config';
18+
1219
export const plugin = async (initializerContext: PluginInitializerContext) => {
1320
const { UsageApiPlugin } = await import('./plugin');
1421
return new UsageApiPlugin(initializerContext);

x-pack/platform/plugins/shared/usage_api/server/mocks.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77

88
import type { UsageApiConfigType } from './config';
9+
import type { UsageReportingService } from './usage_reporting';
910

1011
export const usageApiPluginMock = {
1112
createSetupContract: (configOverride: Partial<UsageApiConfigType> = {}) => {
@@ -14,6 +15,9 @@ export const usageApiPluginMock = {
1415
enabled: false,
1516
...configOverride,
1617
},
18+
usageReporting: {
19+
reportUsage: jest.fn(),
20+
} as unknown as jest.Mocked<UsageReportingService>,
1721
};
1822
},
1923
};

0 commit comments

Comments
 (0)