Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ dependsOn:
- '@kbn/workflows-extensions'
- '@kbn/licensing-plugin'
- '@kbn/usage-api-plugin'
- '@kbn/server-http-tools'
- '@kbn/logging-mocks'
tags:
- plugin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,3 @@ export const BUCKET_SIZE_MINUTES = 5;

/** Duration normalization bucket size in milliseconds. */
export const BUCKET_SIZE_MS = BUCKET_SIZE_MINUTES * 60 * 1000;

/** Maximum number of retry attempts for sending usage records. */
export const METERING_RETRY_ATTEMPTS = 3;

/** Base delay between retries in milliseconds (exponential backoff applied). */
export const METERING_RETRY_BASE_DELAY_MS = 1000;
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,3 @@
*/

export { WorkflowsMeteringService } from './metering_service';
export type { UsageMetrics, UsageRecord, UsageSource } from './types';
export { UsageReportingService } from './usage_reporting_service';
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@

import type { CloudSetup } from '@kbn/cloud-plugin/server';
import type { Logger } from '@kbn/core/server';
import type { UsageRecord, UsageReportingService } from '@kbn/usage-api-plugin/server';
import type { EsWorkflowExecution } from '@kbn/workflows';
import { ExecutionStatus } from '@kbn/workflows';

import { BUCKET_SIZE_MS, METERING_SOURCE_ID, WORKFLOWS_USAGE_TYPE } from './constants';
import { WorkflowsMeteringService } from './metering_service';
import type { UsageRecord } from './types';
import type { UsageReportingService } from './usage_reporting_service';

const createMockExecution = (
overrides: Partial<EsWorkflowExecution> = {}
Expand Down Expand Up @@ -80,8 +79,6 @@ describe('WorkflowsMeteringService', () => {
mockUsageReportingService = createMockUsageReportingService();
mockLogger = createMockLogger();
meteringService = new WorkflowsMeteringService(mockUsageReportingService, mockLogger);
// Mock the internal delay to be instant so tests run fast
jest.spyOn(WorkflowsMeteringService.prototype as any, 'delay').mockResolvedValue(undefined);
});

afterEach(() => {
Expand Down Expand Up @@ -312,72 +309,6 @@ describe('WorkflowsMeteringService', () => {
});
});

describe('retry logic', () => {
it('should retry on failure and succeed on second attempt', async () => {
mockUsageReportingService.reportUsage
.mockResolvedValueOnce({ ok: false, status: 500 } as any)
.mockResolvedValueOnce({ ok: true, status: 200 } as any);

const execution = createMockExecution();
const cloudSetup = createMockCloudSetup();

await meteringService.reportWorkflowExecution(execution, cloudSetup);

expect(mockUsageReportingService.reportUsage).toHaveBeenCalledTimes(2);
expect(mockLogger.debug).toHaveBeenCalledWith(
expect.stringContaining('Successfully reported metering')
);
});

it('should log error after all retries exhausted', async () => {
mockUsageReportingService.reportUsage.mockResolvedValue({
ok: false,
status: 503,
} as any);

const execution = createMockExecution();
const cloudSetup = createMockCloudSetup();

await meteringService.reportWorkflowExecution(execution, cloudSetup);

expect(mockUsageReportingService.reportUsage).toHaveBeenCalledTimes(3);
expect(mockLogger.error).toHaveBeenCalledWith(
expect.stringContaining('Failed to report workflow metering')
);
});

it('should retry on network errors', async () => {
mockUsageReportingService.reportUsage
.mockRejectedValueOnce(new Error('ECONNREFUSED'))
.mockResolvedValueOnce({ ok: true, status: 200 } as any);

const execution = createMockExecution();
const cloudSetup = createMockCloudSetup();

await meteringService.reportWorkflowExecution(execution, cloudSetup);

expect(mockUsageReportingService.reportUsage).toHaveBeenCalledTimes(2);
expect(mockLogger.warn).toHaveBeenCalledWith(expect.stringContaining('ECONNREFUSED'));
});

it('should not throw when metering fails (errors are caught internally)', async () => {
mockUsageReportingService.reportUsage.mockRejectedValue(new Error('catastrophic failure'));

const execution = createMockExecution();
const cloudSetup = createMockCloudSetup();

// Should not throw - errors are caught internally
await expect(
meteringService.reportWorkflowExecution(execution, cloudSetup)
).resolves.toBeUndefined();

// Error should be logged
expect(mockLogger.error).toHaveBeenCalledWith(
expect.stringContaining('Failed to report workflow metering')
);
});
});

describe('environment detection', () => {
it('should prefer projectId over deploymentId', async () => {
const execution = createMockExecution();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,11 @@

import type { CloudSetup } from '@kbn/cloud-plugin/server';
import type { Logger } from '@kbn/core/server';
import type { UsageRecord, UsageReportingService } from '@kbn/usage-api-plugin/server';
import type { EsWorkflowExecution } from '@kbn/workflows';
import { ExecutionStatus, isTerminalStatus } from '@kbn/workflows';

import {
BUCKET_SIZE_MS,
METERING_RETRY_ATTEMPTS,
METERING_RETRY_BASE_DELAY_MS,
METERING_SOURCE_ID,
WORKFLOWS_USAGE_TYPE,
} from './constants';
import type { UsageRecord } from './types';
import type { UsageReportingService } from './usage_reporting_service';
import { BUCKET_SIZE_MS, METERING_SOURCE_ID, WORKFLOWS_USAGE_TYPE } from './constants';

/**
* Workflows Metering Service - Stage 1 of the billing pipeline.
Expand Down Expand Up @@ -73,7 +66,7 @@ export class WorkflowsMeteringService {
const usageRecord = this.buildUsageRecord(execution, instanceGroupId);

try {
await this.sendWithRetry(usageRecord);
await this.usageReportingService.reportUsage([usageRecord]);
} catch (err) {
// Log with billing-relevant details per monitoring requirements:
// project ID, type, and count for impact assessment
Expand Down Expand Up @@ -150,51 +143,4 @@ export class WorkflowsMeteringService {

return stepTypes;
}

/**
* Sends a usage record with inline retry and exponential backoff.
* Per billing team guidance: data loss is preferable to overbilling,
* so we retry a few times then give up (logged at error level).
*/
private async sendWithRetry(record: UsageRecord): Promise<void> {
let lastError: Error | undefined;

for (let attempt = 0; attempt < METERING_RETRY_ATTEMPTS; attempt++) {
Comment on lines -159 to -162
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

retry logic moved to the usageReportingService

try {
const response = await this.usageReportingService.reportUsage([record]);

if (response.ok) {
this.logger.debug(
`Successfully reported metering for execution ${record.id} (attempt ${attempt + 1})`
);
return;
}

lastError = new Error(`Usage API responded with status ${response.status}`);
this.logger.warn(
`Metering report attempt ${attempt + 1}/${METERING_RETRY_ATTEMPTS} failed: ${
lastError.message
}`
);
} catch (err) {
lastError = err instanceof Error ? err : new Error(String(err));
this.logger.warn(
`Metering report attempt ${attempt + 1}/${METERING_RETRY_ATTEMPTS} failed: ${
lastError.message
}`
);
}

// Exponential backoff before next retry (skip delay on last attempt)
if (attempt < METERING_RETRY_ATTEMPTS - 1) {
await this.delay(METERING_RETRY_BASE_DELAY_MS * Math.pow(2, attempt));
}
}

throw lastError || new Error('Metering report failed after all retry attempts');
}

private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import { checkLicense } from './lib/check_license';
import { getAuthenticatedUser } from './lib/get_user';
import { WorkflowExecutionTelemetryClient } from './lib/telemetry/workflow_execution_telemetry_client';
import { WorkflowsMeteringService } from './metering/metering_service';
import { UsageReportingService } from './metering/usage_reporting_service';
import { initializeLogsRepositoryDataStream } from './repositories/logs_repository/data_stream';
import { WorkflowExecutionRepository } from './repositories/workflow_execution_repository';
import type {
Expand Down Expand Up @@ -71,7 +70,6 @@ export class WorkflowsExecutionEnginePlugin
{
private readonly logger: Logger;
private readonly config: WorkflowsExecutionEngineConfig;
private readonly kibanaVersion: string;
private concurrencyManager!: ConcurrencyManager;
private setupDependencies?: SetupDependencies;
private meteringService?: WorkflowsMeteringService;
Expand All @@ -80,7 +78,6 @@ export class WorkflowsExecutionEnginePlugin
constructor(initializerContext: PluginInitializerContext) {
this.logger = initializerContext.logger.get();
this.config = initializerContext.config.get<WorkflowsExecutionEngineConfig>();
this.kibanaVersion = initializerContext.env.packageInfo.version;
}

public setup(
Expand All @@ -101,11 +98,9 @@ export class WorkflowsExecutionEnginePlugin
this.setupDependencies = setupDependencies;

// Initialize metering from the centralized Usage API plugin
const usageApiConfig = plugins.usageApi?.config;
if (usageApiConfig?.enabled && usageApiConfig.url) {
const usageReportingService = new UsageReportingService(usageApiConfig, this.kibanaVersion);
if (plugins.usageApi?.usageReporting) {
this.meteringService = new WorkflowsMeteringService(
usageReportingService,
plugins.usageApi?.usageReporting,
this.logger.get('workflowsMetering')
);
this.logger.debug('Workflows metering service initialized');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
"@kbn/workflows-extensions",
"@kbn/licensing-plugin",
"@kbn/usage-api-plugin",
"@kbn/server-http-tools",
"@kbn/logging-mocks"
],
"exclude": ["target/**/*"]
Expand Down
3 changes: 3 additions & 0 deletions x-pack/platform/plugins/shared/usage_api/moon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ project:
dependsOn:
- '@kbn/core'
- '@kbn/config-schema'
- '@kbn/logging'
- '@kbn/server-http-tools'
- '@kbn/logging-mocks'
tags:
- plugin
- prod
Expand Down
7 changes: 7 additions & 0 deletions x-pack/platform/plugins/shared/usage_api/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@
import type { PluginInitializerContext } from '@kbn/core/server';

export type { UsageApiSetup, UsageApiStart } from './plugin';
export type {
UsageReportingService,
UsageRecord,
UsageMetrics,
UsageSource,
} from './usage_reporting';
export { config } from './config';

export const plugin = async (initializerContext: PluginInitializerContext) => {
const { UsageApiPlugin } = await import('./plugin');
return new UsageApiPlugin(initializerContext);
Expand Down
4 changes: 4 additions & 0 deletions x-pack/platform/plugins/shared/usage_api/server/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import type { UsageApiConfigType } from './config';
import type { UsageReportingService } from './usage_reporting';

export const usageApiPluginMock = {
createSetupContract: (configOverride: Partial<UsageApiConfigType> = {}) => {
Expand All @@ -14,6 +15,9 @@ export const usageApiPluginMock = {
enabled: false,
...configOverride,
},
usageReporting: {
reportUsage: jest.fn(),
} as unknown as jest.Mocked<UsageReportingService>,
};
},
};
Loading
Loading