Skip to content

Commit 4f021d7

Browse files
[Response Ops][Reporting] Classifying CSV timeouts with row count warnings as user error (#241349)
Resolves #232421

## Summary

Adds a mechanism for flagging user errors during report generation, and uses it to classify CSV reports that time out with a CSV row count warning as user errors.

## To Verify

1. Modify the following to force a timeout with a row count warning:

```
--- a/src/platform/packages/private/kbn-generate-csv/src/generate_csv.ts
+++ b/src/platform/packages/private/kbn-generate-csv/src/generate_csv.ts
@@ -426,7 +426,8 @@ export class CsvGenerator {
         // update iterator
         currentRecord += table.rows.length;
-      } while (totalRecords != null && currentRecord < totalRecords - 1);
+      } while (true);
+      // } while (totalRecords != null && currentRecord < totalRecords - 1);

       // Add warnings to be logged
       if (this.csvContainsFormulas && escapeFormulaValues) {
```

```
--- a/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_report.ts
+++ b/x-pack/platform/plugins/private/reporting/server/lib/tasks/run_report.ts
@@ -406,7 +406,7 @@ export abstract class RunReportTask<TaskParams extends ReportTaskParamsType>
         cancellationToken.cancel();
         jobTimedOut = true;
         resolve();
-      }, this.queueTimeout);
+      }, 150 /* this.queueTimeout*/);
```

2. Start ES and Kibana. Install the sample data logs and then navigate to Discover to export a CSV for the last 15 days.

3. In the logs, you should see the report end in a timeout:

```
[2025-10-31T09:59:57.785-04:00][WARN ][plugins.reporting.csv-searchsource-export.execute-job:f34953e0-c127-4ac3-b3d4-5f917f2567ec] ES scroll returned fewer total hits than expected!
[2025-10-31T09:59:57.785-04:00][WARN ][plugins.reporting.csv-searchsource-export.execute-job:f34953e0-c127-4ac3-b3d4-5f917f2567ec] Search result total hits: 2952.
Row count: 189
[2025-10-31T09:59:57.785-04:00][ERROR][plugins.reporting.runTask.f34953e0-c127-4ac3-b3d4-5f917f2567ec] Error: ReportingError(code: queue_timeout_error)
    at operation (run_report.ts:594:25)
    at processTicksAndRejections (node:internal/process/task_queues:105:5)
    at retryOnError (retry_on_error.ts:32:20)
    at Object.run (run_report.ts:559:13)
    at TaskManagerRunner.run (task_runner.ts:419:22)
[2025-10-31T09:59:58.665-04:00][INFO ][plugins.reporting.runTask] Job f34953e0-c127-4ac3-b3d4-5f917f2567ec failed on its last attempt and will not be retried. Error: ReportingError(code: queue_timeout_error).
[2025-10-31T09:59:58.666-04:00][ERROR][plugins.taskManager] Task report:execute "edd4025a-daae-40c6-979e-52c3477651d4" failed: ReportingError(code: queue_timeout_error)
```

4. Navigate to https://localhost:5601/api/task_manager/metrics?reset=false and see in `task_run.value.by_type.report:execute` that the `user_errors` field is incremented.

5. Undo the code changes and make sure normal report generation works as expected.

---------

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
1 parent 2f2e11e commit 4f021d7

File tree

4 files changed

+147
-25
lines changed

4 files changed

+147
-25
lines changed

src/platform/packages/private/kbn-generate-csv/src/generate_csv.test.ts

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,7 @@ describe('CsvGenerator', () => {
494494
"rows": 0,
495495
},
496496
},
497+
"user_error": undefined,
497498
"warnings": Array [
498499
"Unable to close the Point-In-Time used for search. Check the Kibana server logs.",
499500
],
@@ -1451,6 +1452,84 @@ describe('CsvGenerator', () => {
14511452
);
14521453
});
14531454

1455+
it('will return warning and user_error if search results does not match expected total', async () => {
1456+
const mockJobUsingPitPaging = createMockJob({
1457+
columns: ['date', 'ip', 'message'],
1458+
pagingStrategy: 'pit',
1459+
});
1460+
mockDataClient.search = jest
1461+
.fn()
1462+
.mockImplementationOnce(() =>
1463+
Rx.of({
1464+
rawResponse: getMockRawResponse(
1465+
range(0, (HITS_TOTAL - 20) / 10).map(
1466+
() =>
1467+
({
1468+
fields: {
1469+
date: ['2020-12-31T00:14:28.000Z'],
1470+
ip: ['110.135.176.89'],
1471+
message: ['hit from the initial search'],
1472+
},
1473+
} as unknown as estypes.SearchHit)
1474+
),
1475+
HITS_TOTAL
1476+
),
1477+
})
1478+
)
1479+
.mockImplementation(() =>
1480+
Rx.of({
1481+
rawResponse: getMockRawResponse(
1482+
range(0, HITS_TOTAL / 10).map(
1483+
() =>
1484+
({
1485+
fields: {
1486+
date: ['2020-12-31T00:14:28.000Z'],
1487+
ip: ['110.135.176.89'],
1488+
message: ['hit from a subsequent scroll'],
1489+
},
1490+
} as unknown as estypes.SearchHit)
1491+
)
1492+
),
1493+
})
1494+
);
1495+
1496+
const generateCsv = new CsvGenerator(
1497+
mockJobUsingPitPaging,
1498+
mockConfig,
1499+
mockTaskInstanceFields,
1500+
{
1501+
es: mockEsClient,
1502+
data: mockDataClient,
1503+
uiSettings: uiSettingsClient,
1504+
},
1505+
{
1506+
searchSourceStart: mockSearchSourceService,
1507+
fieldFormatsRegistry: mockFieldFormatsRegistry,
1508+
},
1509+
new CancellationToken(),
1510+
mockLogger,
1511+
stream
1512+
);
1513+
const csvResult = await generateCsv.generateData();
1514+
expect(csvResult).toMatchInlineSnapshot(`
1515+
Object {
1516+
"content_type": "text/csv",
1517+
"csv_contains_formulas": false,
1518+
"error_code": undefined,
1519+
"max_size_reached": false,
1520+
"metrics": Object {
1521+
"csv": Object {
1522+
"rows": 108,
1523+
},
1524+
},
1525+
"user_error": true,
1526+
"warnings": Array [
1527+
"Encountered an error with the number of CSV rows generated from the search: expected 100, received 108.",
1528+
],
1529+
}
1530+
`);
1531+
});
1532+
14541533
it('will return partial data if the scroll or search fails', async () => {
14551534
mockDataClient.search = jest.fn().mockImplementation(() => {
14561535
throw new esErrors.ResponseError({
@@ -1488,6 +1567,7 @@ describe('CsvGenerator', () => {
14881567
"rows": 0,
14891568
},
14901569
},
1570+
"user_error": undefined,
14911571
"warnings": Array [
14921572
"Received a 500 response from Elasticsearch: my error",
14931573
"Encountered an error with the number of CSV rows generated from the search: expected rows were indeterminable, received 0.",
@@ -1540,6 +1620,7 @@ describe('CsvGenerator', () => {
15401620
"rows": 0,
15411621
},
15421622
},
1623+
"user_error": undefined,
15431624
"warnings": Array [
15441625
"Encountered an unknown error: An unknown error",
15451626
"Encountered an error with the number of CSV rows generated from the search: expected rows were indeterminable, received 0.",

src/platform/packages/private/kbn-generate-csv/src/generate_csv.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ export class CsvGenerator {
300300
const indexPatternTitle = index.getIndexPattern();
301301
const builder = new MaxSizeStringBuilder(this.stream, byteSizeValueToNumber(maxSizeBytes), bom);
302302
const warnings: string[] = [];
303+
let userError: boolean | undefined;
303304
let first = true;
304305
let currentRecord = -1;
305306
let totalRecords: number | undefined;
@@ -465,6 +466,7 @@ export class CsvGenerator {
465466
warnings.push(
466467
i18nTexts.csvRowCountError({ expected: totalRecords, received: this.csvRowCount })
467468
);
469+
userError = true;
468470
} else {
469471
warnings.push(i18nTexts.csvRowCountIndeterminable({ received: this.csvRowCount }));
470472
}
@@ -496,6 +498,7 @@ export class CsvGenerator {
496498
csv: { rows: this.csvRowCount },
497499
},
498500
warnings,
501+
user_error: userError,
499502
error_code: reportingError?.code,
500503
};
501504
}

src/platform/packages/private/kbn-reporting/common/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export interface TaskRunResult {
3333
max_size_reached?: boolean;
3434
warnings?: string[];
3535
metrics?: TaskRunMetrics;
36+
user_error?: boolean;
3637
/**
3738
* When running a report task we may finish with warnings that were triggered
3839
* by an error. We can pass the error code via the task run result to the

x-pack/platform/plugins/private/reporting/server/lib/tasks/run_report.ts

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import moment from 'moment';
99
import * as Rx from 'rxjs';
10-
import { timeout } from 'rxjs';
1110
import type { Writable } from 'stream';
1211
import type { FakeRawRequest, Headers } from '@kbn/core-http-server';
1312
import type { UpdateResponse } from '@elastic/elasticsearch/lib/api/types';
@@ -17,6 +16,7 @@ import {
1716
CancellationToken,
1817
KibanaShuttingDownError,
1918
MissingAuthenticationError,
19+
QueueTimeoutError,
2020
numberToDuration,
2121
} from '@kbn/reporting-common';
2222
import type {
@@ -28,6 +28,8 @@ import type {
2828
} from '@kbn/reporting-common/types';
2929
import { ScheduleType, decryptJobHeaders, type ReportingConfigType } from '@kbn/reporting-server';
3030
import {
31+
TaskErrorSource,
32+
createTaskRunError,
3133
throwRetryableError,
3234
type ConcreteTaskInstance,
3335
type RunContext,
@@ -99,6 +101,11 @@ export interface PrepareJobResults {
99101
scheduledReport?: SavedObject<ScheduledReportType>;
100102
}
101103

104+
interface PerformJobResults {
105+
result: TaskRunResult;
106+
timedOut: boolean;
107+
}
108+
102109
type ReportTaskParamsType = Record<string, any>;
103110

104111
export interface MaxAttempts {
@@ -379,7 +386,7 @@ export abstract class RunReportTask<TaskParams extends ReportTaskParamsType>
379386
taskInstanceFields,
380387
cancellationToken,
381388
stream,
382-
}: PerformJobOpts): Promise<TaskRunResult> {
389+
}: PerformJobOpts): Promise<PerformJobResults> {
383390
const exportType = this.exportTypesRegistry.getByJobType(task.jobtype);
384391
if (!exportType) {
385392
throw new Error(`No export type from ${task.jobtype} found to execute report`);
@@ -392,18 +399,30 @@ export abstract class RunReportTask<TaskParams extends ReportTaskParamsType>
392399
encryptedHeaders: task.payload.headers,
393400
});
394401

395-
return Rx.lastValueFrom(
396-
Rx.from(
397-
exportType.runTask({
398-
jobId: task.id,
399-
payload: task.payload,
400-
request,
401-
taskInstanceFields,
402-
cancellationToken,
403-
stream,
404-
})
405-
).pipe(timeout(this.queueTimeout)) // throw an error if a value is not emitted before timeout
406-
);
402+
// We use this internal timeout mechanism (vs relying solely on the task manager cancel function)
403+
// to handle scheduled exports that have been configured to retry multiple times within a single task run
404+
// because task manager does not retry recurring tasks.
405+
let jobTimedOut: boolean = false;
406+
const timerId = setTimeout(() => {
407+
cancellationToken.cancel();
408+
jobTimedOut = true;
409+
}, this.queueTimeout);
410+
411+
const runTaskPromise = exportType.runTask({
412+
jobId: task.id,
413+
payload: task.payload,
414+
request,
415+
taskInstanceFields,
416+
cancellationToken,
417+
stream,
418+
});
419+
420+
try {
421+
const result = await runTaskPromise;
422+
return { result, timedOut: jobTimedOut };
423+
} finally {
424+
clearTimeout(timerId);
425+
}
407426
}
408427

409428
protected async completeJob(
@@ -477,7 +496,8 @@ export abstract class RunReportTask<TaskParams extends ReportTaskParamsType>
477496
// Keep a separate local stack for each task run
478497
return ({ taskInstance, fakeRequest }: RunContext) => {
479498
let jobId: string;
480-
const cancellationToken = new CancellationToken();
499+
let output: PerformJobResults;
500+
let cancellationToken: CancellationToken | undefined;
481501
const { retryAt: taskRetryAt, startedAt: taskStartedAt } = taskInstance;
482502

483503
return {
@@ -543,6 +563,7 @@ export abstract class RunReportTask<TaskParams extends ReportTaskParamsType>
543563
retries,
544564
report,
545565
operation: async (rep: SavedReport) => {
566+
cancellationToken = new CancellationToken();
546567
// keep track of the number of times we try within the task
547568
atmpts = isNumber(atmpts) ? atmpts + 1 : undefined;
548569
const jobContentEncoding = this.getJobContentEncoding(jobType);
@@ -560,17 +581,21 @@ export abstract class RunReportTask<TaskParams extends ReportTaskParamsType>
560581
);
561582
eventLog.logExecutionStart();
562583

563-
const output = await Promise.race<TaskRunResult>([
584+
output = await Promise.race<PerformJobResults>([
564585
this.performJob({
565586
task,
566587
fakeRequest,
567588
taskInstanceFields: { retryAt: taskRetryAt, startedAt: taskStartedAt },
568-
cancellationToken,
589+
cancellationToken: cancellationToken!,
569590
stream,
570591
}),
571592
this.throwIfKibanaShutsDown(),
572593
]);
573594

595+
if (output.timedOut) {
596+
throw new QueueTimeoutError();
597+
}
598+
574599
stream.end();
575600

576601
logger.debug(`Begin waiting for the stream's pending callbacks...`);
@@ -581,27 +606,26 @@ export abstract class RunReportTask<TaskParams extends ReportTaskParamsType>
581606
rep._primary_term = stream.getPrimaryTerm()!;
582607

583608
const byteSize = stream.bytesWritten;
584-
eventLog.logExecutionComplete({ ...(output.metrics ?? {}), byteSize });
609+
eventLog.logExecutionComplete({ ...(output.result.metrics ?? {}), byteSize });
585610

586-
if (output) {
611+
if (output.result) {
587612
logger.debug(`Job output size: ${byteSize} bytes.`);
588613
// Update the job status to "completed"
589614
report = await this.completeJob(rep, isNumber(atmpts) ? atmpts : rep.attempts, {
590-
...output,
615+
...output.result,
591616
size: byteSize,
592617
});
593618

594619
await this.notify(
595620
report,
596621
taskInstance,
597-
output,
622+
output.result,
598623
byteSize,
599624
scheduledReport,
600625
task.payload.spaceId
601626
);
602627
}
603628

604-
// untrack the report for concurrency awareness
605629
logger.debug(`Stopping ${jobId}.`);
606630
},
607631
});
@@ -615,7 +639,9 @@ export abstract class RunReportTask<TaskParams extends ReportTaskParamsType>
615639
}
616640
);
617641

618-
cancellationToken.cancel();
642+
if (cancellationToken) {
643+
cancellationToken.cancel();
644+
}
619645

620646
if (isLastAttempt) {
621647
this.logger.info(
@@ -627,8 +653,17 @@ export abstract class RunReportTask<TaskParams extends ReportTaskParamsType>
627653
);
628654
}
629655

630-
throwRetryableError(failedToExecuteErr, new Date(Date.now() + TIME_BETWEEN_ATTEMPTS));
656+
let error = failedToExecuteErr;
657+
if (
658+
failedToExecuteErr instanceof QueueTimeoutError &&
659+
output?.result.user_error === true
660+
) {
661+
error = createTaskRunError(failedToExecuteErr, TaskErrorSource.USER);
662+
}
663+
664+
throwRetryableError(error, new Date(Date.now() + TIME_BETWEEN_ATTEMPTS));
631665
} finally {
666+
// untrack the report for concurrency awareness
632667
this.opts.reporting.untrackReport(jobId);
633668
logger.debug(`Reports running: ${this.opts.reporting.countConcurrentReports()}.`);
634669
}
@@ -642,7 +677,9 @@ export abstract class RunReportTask<TaskParams extends ReportTaskParamsType>
642677
if (jobId) {
643678
this.logger.get(jobId).warn(`Cancelling job ${jobId}...`);
644679
}
645-
cancellationToken.cancel();
680+
if (cancellationToken) {
681+
cancellationToken.cancel();
682+
}
646683
},
647684
};
648685
};

0 commit comments

Comments
 (0)