diff --git a/src/platform/plugins/shared/workflows_execution_engine/moon.yml b/src/platform/plugins/shared/workflows_execution_engine/moon.yml index 77f5f1505a794..a9acf4f729252 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/moon.yml +++ b/src/platform/plugins/shared/workflows_execution_engine/moon.yml @@ -35,7 +35,6 @@ dependsOn: - '@kbn/workflows-extensions' - '@kbn/licensing-plugin' - '@kbn/usage-api-plugin' - - '@kbn/server-http-tools' - '@kbn/logging-mocks' tags: - plugin diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/metering/constants.ts b/src/platform/plugins/shared/workflows_execution_engine/server/metering/constants.ts index d8d9afc7254c9..7e265f85443cd 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/metering/constants.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/metering/constants.ts @@ -18,9 +18,3 @@ export const BUCKET_SIZE_MINUTES = 5; /** Duration normalization bucket size in milliseconds. */ export const BUCKET_SIZE_MS = BUCKET_SIZE_MINUTES * 60 * 1000; - -/** Maximum number of retry attempts for sending usage records. */ -export const METERING_RETRY_ATTEMPTS = 3; - -/** Base delay between retries in milliseconds (exponential backoff applied). */ -export const METERING_RETRY_BASE_DELAY_MS = 1000; diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/metering/index.ts b/src/platform/plugins/shared/workflows_execution_engine/server/metering/index.ts index 42a1f453e5300..9b43c0c8cffe4 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/metering/index.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/metering/index.ts @@ -8,5 +8,3 @@ */ export { WorkflowsMeteringService } from './metering_service'; -export type { UsageMetrics, UsageRecord, UsageSource } from './types'; -export { UsageReportingService } from './usage_reporting_service'; diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/metering/metering_service.test.ts b/src/platform/plugins/shared/workflows_execution_engine/server/metering/metering_service.test.ts index 16258259efb4e..eb36dc770d426 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/metering/metering_service.test.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/metering/metering_service.test.ts @@ -9,13 +9,12 @@ import type { CloudSetup } from '@kbn/cloud-plugin/server'; import type { Logger } from '@kbn/core/server'; +import type { UsageRecord, UsageReportingService } from '@kbn/usage-api-plugin/server'; import type { EsWorkflowExecution } from '@kbn/workflows'; import { ExecutionStatus } from '@kbn/workflows'; import { BUCKET_SIZE_MS, METERING_SOURCE_ID, WORKFLOWS_USAGE_TYPE } from './constants'; import { WorkflowsMeteringService } from './metering_service'; -import type { UsageRecord } from './types'; -import type { UsageReportingService } from './usage_reporting_service'; const createMockExecution = ( overrides: Partial = {} @@ -80,8 +79,6 @@ describe('WorkflowsMeteringService', () => { mockUsageReportingService = createMockUsageReportingService(); mockLogger = createMockLogger(); meteringService = new WorkflowsMeteringService(mockUsageReportingService, mockLogger); - // Mock the internal delay to be instant so tests run fast - jest.spyOn(WorkflowsMeteringService.prototype as any, 'delay').mockResolvedValue(undefined); }); afterEach(() => { @@ -312,72 +309,6 @@ describe('WorkflowsMeteringService', () => { }); }); - describe('retry logic', () => { - it('should retry on failure and succeed on second attempt', async () => { - mockUsageReportingService.reportUsage - .mockResolvedValueOnce({ ok: false, status: 500 } as any) - .mockResolvedValueOnce({ ok: true, status: 200 } as any); - - const execution = createMockExecution(); - const cloudSetup = createMockCloudSetup(); - - await meteringService.reportWorkflowExecution(execution, cloudSetup); - - expect(mockUsageReportingService.reportUsage).toHaveBeenCalledTimes(2); - expect(mockLogger.debug).toHaveBeenCalledWith( - expect.stringContaining('Successfully reported metering') - ); - }); - - it('should log error after all retries exhausted', async () => { - mockUsageReportingService.reportUsage.mockResolvedValue({ - ok: false, - status: 503, - } as any); - - const execution = createMockExecution(); - const cloudSetup = createMockCloudSetup(); - - await meteringService.reportWorkflowExecution(execution, cloudSetup); - - expect(mockUsageReportingService.reportUsage).toHaveBeenCalledTimes(3); - expect(mockLogger.error).toHaveBeenCalledWith( - expect.stringContaining('Failed to report workflow metering') - ); - }); - - it('should retry on network errors', async () => { - mockUsageReportingService.reportUsage - .mockRejectedValueOnce(new Error('ECONNREFUSED')) - .mockResolvedValueOnce({ ok: true, status: 200 } as any); - - const execution = createMockExecution(); - const cloudSetup = createMockCloudSetup(); - - await meteringService.reportWorkflowExecution(execution, cloudSetup); - - expect(mockUsageReportingService.reportUsage).toHaveBeenCalledTimes(2); - expect(mockLogger.warn).toHaveBeenCalledWith(expect.stringContaining('ECONNREFUSED')); - }); - - it('should not throw when metering fails (errors are caught internally)', async () => { - mockUsageReportingService.reportUsage.mockRejectedValue(new Error('catastrophic failure')); - - const execution = createMockExecution(); - const cloudSetup = createMockCloudSetup(); - - // Should not throw - errors are caught internally - await expect( - meteringService.reportWorkflowExecution(execution, cloudSetup) - ).resolves.toBeUndefined(); - - // Error should be logged - expect(mockLogger.error).toHaveBeenCalledWith( - expect.stringContaining('Failed to report workflow metering') - ); - }); - }); - describe('environment detection', () => { it('should prefer projectId over deploymentId', async () => { const execution = createMockExecution(); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/metering/metering_service.ts b/src/platform/plugins/shared/workflows_execution_engine/server/metering/metering_service.ts index 35aa7b1540fef..4b5cada4d94c8 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/metering/metering_service.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/metering/metering_service.ts @@ -9,18 +9,11 @@ import type { CloudSetup } from '@kbn/cloud-plugin/server'; import type { Logger } from '@kbn/core/server'; +import type { UsageRecord, UsageReportingService } from '@kbn/usage-api-plugin/server'; import type { EsWorkflowExecution } from '@kbn/workflows'; import { ExecutionStatus, isTerminalStatus } from '@kbn/workflows'; -import { - BUCKET_SIZE_MS, - METERING_RETRY_ATTEMPTS, - METERING_RETRY_BASE_DELAY_MS, - METERING_SOURCE_ID, - WORKFLOWS_USAGE_TYPE, -} from './constants'; -import type { UsageRecord } from './types'; -import type { UsageReportingService } from './usage_reporting_service'; +import { BUCKET_SIZE_MS, METERING_SOURCE_ID, WORKFLOWS_USAGE_TYPE } from './constants'; /** * Workflows Metering Service - Stage 1 of the billing pipeline. @@ -73,7 +66,7 @@ export class WorkflowsMeteringService { const usageRecord = this.buildUsageRecord(execution, instanceGroupId); try { - await this.sendWithRetry(usageRecord); + await this.usageReportingService.reportUsage([usageRecord]); } catch (err) { // Log with billing-relevant details per monitoring requirements: // project ID, type, and count for impact assessment @@ -150,51 +143,4 @@ export class WorkflowsMeteringService { return stepTypes; } - - /** - * Sends a usage record with inline retry and exponential backoff. - * Per billing team guidance: data loss is preferable to overbilling, - * so we retry a few times then give up (logged at error level). - */ - private async sendWithRetry(record: UsageRecord): Promise { - let lastError: Error | undefined; - - for (let attempt = 0; attempt < METERING_RETRY_ATTEMPTS; attempt++) { - try { - const response = await this.usageReportingService.reportUsage([record]); - - if (response.ok) { - this.logger.debug( - `Successfully reported metering for execution ${record.id} (attempt ${attempt + 1})` - ); - return; - } - - lastError = new Error(`Usage API responded with status ${response.status}`); - this.logger.warn( - `Metering report attempt ${attempt + 1}/${METERING_RETRY_ATTEMPTS} failed: ${ - lastError.message - }` - ); - } catch (err) { - lastError = err instanceof Error ? err : new Error(String(err)); - this.logger.warn( - `Metering report attempt ${attempt + 1}/${METERING_RETRY_ATTEMPTS} failed: ${ - lastError.message - }` - ); - } - - // Exponential backoff before next retry (skip delay on last attempt) - if (attempt < METERING_RETRY_ATTEMPTS - 1) { - await this.delay(METERING_RETRY_BASE_DELAY_MS * Math.pow(2, attempt)); - } - } - - throw lastError || new Error('Metering report failed after all retry attempts'); - } - - private delay(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); - } } diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/plugin.ts b/src/platform/plugins/shared/workflows_execution_engine/server/plugin.ts index d52becee7c104..d255b4b481537 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/plugin.ts +++ b/src/platform/plugins/shared/workflows_execution_engine/server/plugin.ts @@ -34,7 +34,6 @@ import { checkLicense } from './lib/check_license'; import { getAuthenticatedUser } from './lib/get_user'; import { WorkflowExecutionTelemetryClient } from './lib/telemetry/workflow_execution_telemetry_client'; import { WorkflowsMeteringService } from './metering/metering_service'; -import { UsageReportingService } from './metering/usage_reporting_service'; import { initializeLogsRepositoryDataStream } from './repositories/logs_repository/data_stream'; import { WorkflowExecutionRepository } from './repositories/workflow_execution_repository'; import type { @@ -71,7 +70,6 @@ export class WorkflowsExecutionEnginePlugin { private readonly logger: Logger; private readonly config: WorkflowsExecutionEngineConfig; - private readonly kibanaVersion: string; private concurrencyManager!: ConcurrencyManager; private setupDependencies?: SetupDependencies; private meteringService?: WorkflowsMeteringService; @@ -80,7 +78,6 @@ export class WorkflowsExecutionEnginePlugin constructor(initializerContext: PluginInitializerContext) { this.logger = initializerContext.logger.get(); this.config = initializerContext.config.get(); - this.kibanaVersion = initializerContext.env.packageInfo.version; } public setup( @@ -101,11 +98,9 @@ export class WorkflowsExecutionEnginePlugin this.setupDependencies = setupDependencies; // Initialize metering from the centralized Usage API plugin - const usageApiConfig = plugins.usageApi?.config; - if (usageApiConfig?.enabled && usageApiConfig.url) { - const usageReportingService = new UsageReportingService(usageApiConfig, this.kibanaVersion); + if (plugins.usageApi?.usageReporting) { this.meteringService = new WorkflowsMeteringService( - usageReportingService, + plugins.usageApi?.usageReporting, this.logger.get('workflowsMetering') ); this.logger.debug('Workflows metering service initialized'); diff --git a/src/platform/plugins/shared/workflows_execution_engine/tsconfig.json b/src/platform/plugins/shared/workflows_execution_engine/tsconfig.json index 6dba66665a4b1..0ca7623670c21 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/tsconfig.json +++ b/src/platform/plugins/shared/workflows_execution_engine/tsconfig.json @@ -30,7 +30,6 @@ "@kbn/workflows-extensions", "@kbn/licensing-plugin", "@kbn/usage-api-plugin", - "@kbn/server-http-tools", "@kbn/logging-mocks" ], "exclude": ["target/**/*"] diff --git a/x-pack/platform/plugins/shared/usage_api/moon.yml b/x-pack/platform/plugins/shared/usage_api/moon.yml index e601444cf4ed5..621bd14ed0956 100644 --- a/x-pack/platform/plugins/shared/usage_api/moon.yml +++ b/x-pack/platform/plugins/shared/usage_api/moon.yml @@ -20,6 +20,9 @@ project: dependsOn: - '@kbn/core' - '@kbn/config-schema' + - '@kbn/logging' + - '@kbn/server-http-tools' + - '@kbn/logging-mocks' tags: - plugin - prod diff --git a/x-pack/platform/plugins/shared/usage_api/server/index.ts b/x-pack/platform/plugins/shared/usage_api/server/index.ts index a358818c172fc..a6570882cb1b5 100644 --- a/x-pack/platform/plugins/shared/usage_api/server/index.ts +++ b/x-pack/platform/plugins/shared/usage_api/server/index.ts @@ -8,7 +8,14 @@ import type { PluginInitializerContext } from '@kbn/core/server'; export type { UsageApiSetup, UsageApiStart } from './plugin'; +export type { + UsageReportingService, + UsageRecord, + UsageMetrics, + UsageSource, +} from './usage_reporting'; export { config } from './config'; + export const plugin = async (initializerContext: PluginInitializerContext) => { const { UsageApiPlugin } = await import('./plugin'); return new UsageApiPlugin(initializerContext); diff --git a/x-pack/platform/plugins/shared/usage_api/server/mocks.ts b/x-pack/platform/plugins/shared/usage_api/server/mocks.ts index 2d4164e52d560..da8ce30577261 100644 --- a/x-pack/platform/plugins/shared/usage_api/server/mocks.ts +++ b/x-pack/platform/plugins/shared/usage_api/server/mocks.ts @@ -6,6 +6,7 @@ */ import type { UsageApiConfigType } from './config'; +import type { UsageReportingService } from './usage_reporting'; export const usageApiPluginMock = { createSetupContract: (configOverride: Partial = {}) => { @@ -14,6 +15,9 @@ export const usageApiPluginMock = { enabled: false, ...configOverride, }, + usageReporting: { + reportUsage: jest.fn(), + } as unknown as jest.Mocked, }; }, }; diff --git a/x-pack/platform/plugins/shared/usage_api/server/plugin.test.ts b/x-pack/platform/plugins/shared/usage_api/server/plugin.test.ts index 5cbd1f605b017..964ad6bada04a 100644 --- a/x-pack/platform/plugins/shared/usage_api/server/plugin.test.ts +++ b/x-pack/platform/plugins/shared/usage_api/server/plugin.test.ts @@ -8,6 +8,7 @@ import { coreMock } from '@kbn/core/server/mocks'; import type { UsageApiConfigType } from './config'; import { UsageApiPlugin } from './plugin'; +import { UsageReportingService } from './usage_reporting'; describe('Usage API Plugin', () => { const setupPlugin = (configParts: Partial = {}) => { @@ -27,41 +28,32 @@ describe('Usage API Plugin', () => { describe('interface', () => { it('it should return enabled false when no config is provided', () => { const { setup } = setupPlugin(); - expect(setup).toMatchInlineSnapshot(` - Object { - "config": Object { - "enabled": false, - "tls": undefined, - "url": undefined, - }, - } - `); + expect(setup.config).toEqual({ + enabled: false, + tls: undefined, + url: undefined, + }); + expect(setup.usageReporting).toBeUndefined(); }); it('it should return enabled false when config is provided but url is not set', () => { const { setup } = setupPlugin({ enabled: true }); - expect(setup).toMatchInlineSnapshot(` - Object { - "config": Object { - "enabled": false, - "tls": undefined, - "url": undefined, - }, - } - `); + expect(setup.config).toEqual({ + enabled: false, + tls: undefined, + url: undefined, + }); + expect(setup.usageReporting).toBeUndefined(); }); it('it should return enabled true when config is provided and url is set', () => { const { setup } = setupPlugin({ enabled: true, url: 'https://usage-api.example' }); - expect(setup).toMatchInlineSnapshot(` - Object { - "config": Object { - "enabled": true, - "tls": undefined, - "url": "https://usage-api.example", - }, - } - `); + expect(setup.config).toEqual({ + enabled: true, + tls: undefined, + url: 'https://usage-api.example', + }); + expect(setup.usageReporting).toBeInstanceOf(UsageReportingService); }); it('it should return tls when tls is provided', () => { @@ -70,28 +62,26 @@ describe('Usage API Plugin', () => { url: 'https://usage-api.example', tls: { certificate: 'certificate', key: 'key', ca: 'ca' }, }); - expect(setup).toMatchInlineSnapshot(` - Object { - "config": Object { - "enabled": true, - "tls": Object { - "ca": "ca", - "certificate": "certificate", - "key": "key", - }, - "url": "https://usage-api.example", - }, - } - `); + expect(setup.config).toEqual({ + enabled: true, + tls: { ca: 'ca', certificate: 'certificate', key: 'key' }, + url: 'https://usage-api.example', + }); + expect(setup.usageReporting).toBeInstanceOf(UsageReportingService); }); }); }); describe('#start', () => { describe('interface', () => { - it('snapshot', () => { + it('should not expose usageReporting when disabled', () => { const { start } = setupPlugin(); - expect(start).toMatchInlineSnapshot(`undefined`); + expect(start.usageReporting).toBeUndefined(); + }); + + it('should expose usageReporting when enabled', () => { + const { start } = setupPlugin({ enabled: true, url: 'https://usage-api.example' }); + expect(start.usageReporting).toBeInstanceOf(UsageReportingService); }); }); }); diff --git a/x-pack/platform/plugins/shared/usage_api/server/plugin.ts b/x-pack/platform/plugins/shared/usage_api/server/plugin.ts index b5dbaa720194f..4fbb342b097f6 100644 --- a/x-pack/platform/plugins/shared/usage_api/server/plugin.ts +++ b/x-pack/platform/plugins/shared/usage_api/server/plugin.ts @@ -6,8 +6,9 @@ */ import type { Plugin, PluginInitializerContext } from '@kbn/core/server'; - +import type { Logger } from '@kbn/logging'; import type { UsageApiConfigType } from './config'; +import { UsageReportingService } from './usage_reporting'; /** * Setup contract @@ -17,29 +18,57 @@ export interface UsageApiSetup { * Configuration for the Usage API. */ config: UsageApiConfigType; + /** + * Usage reporting service for reporting usage metrics. + * Only exposed if usage reporting is enabled and available. + */ + usageReporting?: UsageReportingService; } /** * Start contract */ -export type UsageApiStart = void; +export interface UsageApiStart { + /** + * Usage reporting service for reporting usage metrics. + * Only exposed if usage reporting is enabled and available. + */ + usageReporting?: UsageReportingService; +} export class UsageApiPlugin implements Plugin { private readonly config: UsageApiConfigType; + private readonly logger: Logger; + private usageReporting?: UsageReportingService; constructor(private readonly context: PluginInitializerContext) { this.config = this.context.config.get(); + this.logger = this.context.logger.get(); } public setup(): UsageApiSetup { + const kibanaVersion = this.context.env.packageInfo.version; + const enabled = this.config.enabled && !!this.config.url; + if (enabled) { + this.usageReporting = new UsageReportingService({ + config: this.config, + kibanaVersion, + logger: this.logger.get('usageReporting'), + }); + } return { config: { enabled: this.config.enabled && !!this.config.url, url: this.config.url, tls: this.config.tls, }, + usageReporting: this.usageReporting, }; } - public start(): UsageApiStart {} + public start(): UsageApiStart { + return { + usageReporting: this.usageReporting, + }; + } } diff --git a/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/constants.ts b/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/constants.ts new file mode 100644 index 0000000000000..280834affb016 --- /dev/null +++ b/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/constants.ts @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +/** Maximum number of retry attempts for sending usage records. */ +export const METERING_RETRY_ATTEMPTS = 3; + +/** Base delay between retries in milliseconds (exponential backoff applied). */ +export const METERING_RETRY_BASE_DELAY_MS = 1000; diff --git a/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/index.ts b/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/index.ts new file mode 100644 index 0000000000000..6d7dd1ef6edee --- /dev/null +++ b/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export { UsageReportingService, type UsageReportingConfig } from './usage_reporting_service'; +export type { UsageRecord, UsageSource, UsageMetrics } from './types'; diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/metering/types.ts b/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/types.ts similarity index 82% rename from src/platform/plugins/shared/workflows_execution_engine/server/metering/types.ts rename to x-pack/platform/plugins/shared/usage_api/server/usage_reporting/types.ts index abfd6f153f0c4..7dbfb012d32f9 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/metering/types.ts +++ b/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/types.ts @@ -1,10 +1,8 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ /** diff --git a/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/usage_reporting_service.test.ts b/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/usage_reporting_service.test.ts new file mode 100644 index 0000000000000..410c0f4fa9857 --- /dev/null +++ b/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/usage_reporting_service.test.ts @@ -0,0 +1,283 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import https from 'https'; +import type { Response } from 'node-fetch'; +import { loggerMock, type MockedLogger } from '@kbn/logging-mocks'; +import { UsageReportingService, type UsageReportingConfig } from './usage_reporting_service'; +import type { UsageRecord } from './types'; +import { METERING_RETRY_ATTEMPTS, METERING_RETRY_BASE_DELAY_MS } from './constants'; + +jest.mock('node-fetch'); +jest.mock('@kbn/server-http-tools', () => ({ + SslConfig: jest.fn().mockImplementation(() => ({ + rejectUnauthorized: true, + certificate: 'mock-cert-content', + key: 'mock-key-content', + certificateAuthorities: ['mock-ca-content'], + })), + sslSchema: { + validate: jest.fn().mockReturnValue({}), + }, +})); + +const fetchMock = jest.requireMock('node-fetch').default as jest.Mock; + +const createRecord = (id = 'rec-1'): UsageRecord => ({ + id, + usage_timestamp: '2025-01-01T00:00:00Z', + creation_timestamp: '2025-01-01T00:00:01Z', + usage: { + type: 'workflow_execution', + quantity: 1, + }, + source: { + id: 'workflows', + instance_group_id: 'project-123', + }, +}); + +const createConfig = (overrides: Partial = {}): UsageReportingConfig => ({ + enabled: true, + url: 'http://usage-api.local/v1/usage', + ...overrides, +}); + +const okResponse = (): Partial => ({ ok: true, status: 200 }); +const errorResponse = (status = 500): Partial => ({ ok: false, status }); + +const advanceThroughRetries = async () => { + for (let i = 0; i < METERING_RETRY_ATTEMPTS - 1; i++) { + await jest.advanceTimersByTimeAsync(METERING_RETRY_BASE_DELAY_MS * Math.pow(2, i)); + } +}; + +describe('UsageReportingService', () => { + let logger: MockedLogger; + + beforeEach(() => { + jest.useFakeTimers(); + logger = loggerMock.create(); + fetchMock.mockReset(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + const createService = (configOverrides: Partial = {}) => + new UsageReportingService({ + config: createConfig(configOverrides), + kibanaVersion: '9.0.0', + logger, + }); + + describe('reportUsage', () => { + it('sends records successfully on first attempt', async () => { + fetchMock.mockResolvedValueOnce(okResponse()); + const service = createService(); + + await service.reportUsage([createRecord()]); + + expect(fetchMock).toHaveBeenCalledTimes(1); + expect(logger.debug).toHaveBeenCalledWith( + expect.stringContaining('Successfully reported metering') + ); + }); + + it('retries on non-ok response and succeeds', async () => { + fetchMock.mockResolvedValueOnce(errorResponse(503)).mockResolvedValueOnce(okResponse()); + + const service = createService(); + const promise = service.reportUsage([createRecord()]); + + await jest.advanceTimersByTimeAsync(METERING_RETRY_BASE_DELAY_MS); + await promise; + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(logger.warn).toHaveBeenCalledTimes(1); + expect(logger.debug).toHaveBeenCalledTimes(1); + }); + + it('retries on network error and succeeds', async () => { + fetchMock + .mockRejectedValueOnce(new Error('ECONNREFUSED')) + .mockResolvedValueOnce(okResponse()); + + const service = createService(); + const promise = service.reportUsage([createRecord()]); + + await jest.advanceTimersByTimeAsync(METERING_RETRY_BASE_DELAY_MS); + await promise; + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('ECONNREFUSED')); + }); + + it('throws after exhausting all retry attempts (non-ok response)', async () => { + fetchMock.mockResolvedValue(errorResponse(500)); + + const service = createService(); + const promise = service.reportUsage([createRecord()]); + promise.catch(() => {}); + + await advanceThroughRetries(); + + await expect(promise).rejects.toThrow('Usage API responded with status 500'); + expect(fetchMock).toHaveBeenCalledTimes(METERING_RETRY_ATTEMPTS); + expect(logger.warn).toHaveBeenCalledTimes(METERING_RETRY_ATTEMPTS); + }); + + it('throws after exhausting all retry attempts (network error)', async () => { + fetchMock.mockRejectedValue(new Error('socket hang up')); + + const service = createService(); + const promise = service.reportUsage([createRecord()]); + promise.catch(() => {}); + + await advanceThroughRetries(); + + await expect(promise).rejects.toThrow('socket hang up'); + expect(fetchMock).toHaveBeenCalledTimes(METERING_RETRY_ATTEMPTS); + }); + + it('wraps non-Error throwables', async () => { + fetchMock.mockRejectedValue('string error'); + + const service = createService(); + const promise = service.reportUsage([createRecord()]); + promise.catch(() => {}); + + await advanceThroughRetries(); + + await expect(promise).rejects.toThrow('string error'); + }); + + it('applies exponential backoff between retries', async () => { + fetchMock + .mockResolvedValueOnce(errorResponse(503)) + .mockResolvedValueOnce(errorResponse(503)) + .mockResolvedValueOnce(okResponse()); + + const service = createService(); + const promise = service.reportUsage([createRecord()]); + + expect(fetchMock).toHaveBeenCalledTimes(1); + + await jest.advanceTimersByTimeAsync(METERING_RETRY_BASE_DELAY_MS); + expect(fetchMock).toHaveBeenCalledTimes(2); + + await jest.advanceTimersByTimeAsync(METERING_RETRY_BASE_DELAY_MS * 2); + await promise; + + expect(fetchMock).toHaveBeenCalledTimes(3); + }); + + it('logs record ids on success', async () => { + fetchMock.mockResolvedValueOnce(okResponse()); + const service = createService(); + + await service.reportUsage([createRecord('a'), createRecord('b')]); + + expect(logger.debug).toHaveBeenCalledWith(expect.stringContaining('a, b')); + }); + }); + + describe('request format', () => { + it('sends POST with JSON content-type and user-agent', async () => { + fetchMock.mockResolvedValueOnce(okResponse()); + const service = createService(); + + await service.reportUsage([createRecord()]); + + const [url, reqInit] = fetchMock.mock.calls[0]; + expect(url).toBe('http://usage-api.local/v1/usage'); + expect(reqInit.method).toBe('post'); + expect(reqInit.headers).toEqual({ + 'Content-Type': 'application/json', + 'User-Agent': 'Kibana/9.0.0 node-fetch', + }); + }); + + it('serializes the records as JSON body', async () => { + fetchMock.mockResolvedValueOnce(okResponse()); + const service = createService(); + const records = [createRecord('r1'), createRecord('r2')]; + + await service.reportUsage(records); + + const body = JSON.parse(fetchMock.mock.calls[0][1].body); + expect(body).toHaveLength(2); + expect(body[0].id).toBe('r1'); + expect(body[1].id).toBe('r2'); + }); + + it('does not attach an https agent for http URLs', async () => { + fetchMock.mockResolvedValueOnce(okResponse()); + const service = createService({ url: 'http://usage-api.local/v1/usage' }); + + await service.reportUsage([createRecord()]); + + expect(fetchMock.mock.calls[0][1].agent).toBeUndefined(); + }); + + it('attaches an https agent for https URLs', async () => { + fetchMock.mockResolvedValueOnce(okResponse()); + const service = createService({ + url: 'https://usage-api.local/v1/usage', + tls: { certificate: 'cert', key: 'key', ca: 'ca' }, + }); + + await service.reportUsage([createRecord()]); + + expect(fetchMock.mock.calls[0][1].agent).toBeInstanceOf(https.Agent); + }); + }); + + describe('URL validation', () => { + it('throws if url is not configured', async () => { + const service = createService({ url: undefined }); + const promise = service.reportUsage([createRecord()]); + promise.catch(() => {}); + + await advanceThroughRetries(); + + await expect(promise).rejects.toThrow('Usage API URL not configured'); + expect(fetchMock).not.toHaveBeenCalled(); + }); + }); + + describe('TLS / httpAgent', () => { + it('throws if tls config is not provided for https URL', async () => { + const service = createService({ + url: 'https://usage-api.local/v1/usage', + tls: undefined, + }); + const promise = service.reportUsage([createRecord()]); + promise.catch(() => {}); + + await advanceThroughRetries(); + + await expect(promise).rejects.toThrow('Usage API TLS configuration not provided'); + }); + + it('reuses the same agent across calls', async () => { + fetchMock.mockResolvedValue(okResponse()); + const service = createService({ + url: 'https://usage-api.local/v1/usage', + tls: { certificate: 'cert', key: 'key', ca: 'ca' }, + }); + + await service.reportUsage([createRecord()]); + await service.reportUsage([createRecord()]); + + const agent1 = fetchMock.mock.calls[0][1].agent; + const agent2 = fetchMock.mock.calls[1][1].agent; + expect(agent1).toBe(agent2); + }); + }); +}); diff --git a/src/platform/plugins/shared/workflows_execution_engine/server/metering/usage_reporting_service.ts b/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/usage_reporting_service.ts similarity index 50% rename from src/platform/plugins/shared/workflows_execution_engine/server/metering/usage_reporting_service.ts rename to x-pack/platform/plugins/shared/usage_api/server/usage_reporting/usage_reporting_service.ts index 7a989f0c63b47..e784ac691ab55 100644 --- a/src/platform/plugins/shared/workflows_execution_engine/server/metering/usage_reporting_service.ts +++ b/x-pack/platform/plugins/shared/usage_api/server/usage_reporting/usage_reporting_service.ts @@ -1,20 +1,17 @@ /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the "Elastic License - * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side - * Public License v 1"; you may not use this file except in compliance with, at - * your election, the "Elastic License 2.0", the "GNU Affero General Public - * License v3.0 only", or the "Server Side Public License, v 1". + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. */ import https from 'https'; import type { RequestInit, Response } from 'node-fetch'; - import fetch from 'node-fetch'; - +import type { Logger } from '@kbn/logging'; import { SslConfig, sslSchema } from '@kbn/server-http-tools'; - import type { UsageRecord } from './types'; +import { METERING_RETRY_ATTEMPTS, METERING_RETRY_BASE_DELAY_MS } from './constants'; /** * Config shape accepted by UsageReportingService. @@ -41,13 +38,65 @@ export interface UsageReportingConfig { */ export class UsageReportingService { private agent: https.Agent | undefined; + private readonly config: UsageReportingConfig; + private readonly kibanaVersion: string; + private readonly logger: Logger; + + constructor({ + config, + kibanaVersion, + logger, + }: { + config: UsageReportingConfig; + kibanaVersion: string; + logger: Logger; + }) { + this.config = config; + this.kibanaVersion = kibanaVersion; + this.logger = logger; + } - constructor( - private readonly config: UsageReportingConfig, - private readonly kibanaVersion: string - ) {} + /** + * Sends a usage record with inline retry and exponential backoff. + * Per billing team guidance: data loss is preferable to overbilling, + * so we retry a few times then give up (logged at error level). + */ + public async reportUsage(records: UsageRecord[]): Promise { + let lastError: Error | undefined; + + for (let attempt = 0; attempt < METERING_RETRY_ATTEMPTS; attempt++) { + try { + const response = await this._sendUsage(records); + + if (response.ok) { + this.logger.debug( + `Successfully reported metering for records ${records + .map((r) => r.id) + .join(', ')} (attempt ${attempt + 1})` + ); + return; + } + + throw new Error(`Usage API responded with status ${response.status}`); + } catch (err) { + lastError = err instanceof Error ? err : new Error(String(err)); + } + + this.logger.warn( + `Metering report attempt ${attempt + 1}/${METERING_RETRY_ATTEMPTS} failed: ${ + lastError.message + }` + ); + + if (attempt < METERING_RETRY_ATTEMPTS - 1) { + await delay(METERING_RETRY_BASE_DELAY_MS * Math.pow(2, attempt)); + } + } + + throw lastError || new Error('Metering report failed after all retry attempts'); + } - public async reportUsage(records: UsageRecord[]): Promise { + private async _sendUsage(records: UsageRecord[]): Promise { const reqArgs: RequestInit = { method: 'post', body: JSON.stringify(records), @@ -58,7 +107,7 @@ export class UsageReportingService { }; if (this.usageApiUrl.startsWith('https')) { - reqArgs.agent = this.httpAgent; + reqArgs.agent = this.httpsAgent; } return fetch(this.usageApiUrl, reqArgs); @@ -72,7 +121,7 @@ export class UsageReportingService { return url; } - private get httpAgent(): https.Agent { + private get httpsAgent(): https.Agent { if (this.agent) { return this.agent; } @@ -102,3 +151,5 @@ export class UsageReportingService { return this.agent; } } + +const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); diff --git a/x-pack/platform/plugins/shared/usage_api/tsconfig.json b/x-pack/platform/plugins/shared/usage_api/tsconfig.json index af1d24b889f8e..f8a95a958c6a2 100644 --- a/x-pack/platform/plugins/shared/usage_api/tsconfig.json +++ b/x-pack/platform/plugins/shared/usage_api/tsconfig.json @@ -10,6 +10,9 @@ "kbn_references": [ "@kbn/core", "@kbn/config-schema", + "@kbn/logging", + "@kbn/server-http-tools", + "@kbn/logging-mocks", ], "exclude": [ "target/**/*",