Skip to content

Commit c978573

Browse files
shahzad31CAWilson94
authored andcommitted
[Synthetics] Monitors sync request, retry on huge payload !! (elastic#202467)
## Summary Monitors sync request, retry on huge payload by splitting the payload !! Requests will be tried recursively by splitting payload in half !!
1 parent e165a7b commit c978573

File tree

4 files changed

+99
-19
lines changed

4 files changed

+99
-19
lines changed

x-pack/plugins/observability_solution/synthetics/server/routes/monitor_cruds/add_monitor_project.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { ProjectMonitor } from '../../../common/runtime_types';
1414
import { SYNTHETICS_API_URLS } from '../../../common/constants';
1515
import { ProjectMonitorFormatter } from '../../synthetics_service/project_monitor/project_monitor_formatter';
1616

17-
const MAX_PAYLOAD_SIZE = 1048576 * 50; // 20MiB
17+
const MAX_PAYLOAD_SIZE = 1048576 * 100; // 20MiB
1818

1919
export const addSyntheticsProjectMonitorRoute: SyntheticsRestApiRouteFactory = () => ({
2020
method: 'PUT',

x-pack/plugins/observability_solution/synthetics/server/synthetics_service/service_api_client.test.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,61 @@ describe('callAPI', () => {
470470
url: 'https://service.dev/monitors/sync',
471471
});
472472
});
473+
474+
it('splits the payload into multiple requests if the payload is too large', async () => {
475+
const requests: number[] = [];
476+
const axiosSpy = (axios as jest.MockedFunction<typeof axios>).mockImplementation((req: any) => {
477+
requests.push(req.data.monitors.length);
478+
if (req.data.monitors.length > 100) {
479+
// throw 413 error
480+
return Promise.reject({ response: { status: 413 } });
481+
}
482+
483+
return Promise.resolve({} as any);
484+
});
485+
486+
const apiClient = new ServiceAPIClient(
487+
logger,
488+
{
489+
manifestUrl: 'http://localhost:8080/api/manifest',
490+
tls: { certificate: 'test-certificate', key: 'test-key' } as any,
491+
},
492+
{
493+
isDev: true,
494+
stackVersion: '8.7.0',
495+
cloud: { cloudId: 'test-id', deploymentId: 'deployment-id' },
496+
} as SyntheticsServerSetup
497+
);
498+
499+
apiClient.locations = testLocations;
500+
501+
const output = { hosts: ['https://localhost:9200'], api_key: '12345' };
502+
503+
const monitors = new Array(250).fill({
504+
...request1[0],
505+
locations: [
506+
{
507+
id: 'us_central',
508+
isServiceManaged: true,
509+
},
510+
],
511+
});
512+
513+
await apiClient.syncMonitors({
514+
monitors,
515+
output,
516+
license: licenseMock.license,
517+
location: {
518+
id: 'us_central',
519+
url: 'https://service.dev',
520+
label: 'Test location',
521+
isServiceManaged: true,
522+
},
523+
});
524+
525+
expect(axiosSpy).toHaveBeenCalledTimes(7);
526+
expect(requests).toEqual([250, 125, 125, 63, 62, 63, 62]);
527+
});
473528
});
474529

475530
const testLocations: PublicLocations = [

x-pack/plugins/observability_solution/synthetics/server/synthetics_service/service_api_client.ts

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
*/
77

88
import axios, { AxiosError, AxiosRequestConfig, AxiosResponse } from 'axios';
9-
import { forkJoin, from as rxjsFrom, Observable, of } from 'rxjs';
9+
import { concat, forkJoin, from as rxjsFrom, Observable, of } from 'rxjs';
1010
import { catchError, tap } from 'rxjs';
1111
import * as https from 'https';
1212
import { SslConfig } from '@kbn/server-http-tools';
@@ -215,21 +215,47 @@ export class ServiceAPIClient {
215215

216216
const monitorsByLocation = this.processServiceData(serviceData);
217217

218-
monitorsByLocation.forEach(({ location: { url, id }, monitors, data }) => {
219-
const promise = this.callServiceEndpoint(data, method, url, endpoint);
220-
promises.push(
221-
rxjsFrom(promise).pipe(
218+
monitorsByLocation.forEach(({ location: { url, id }, data }) => {
219+
const sendRequest = (payload: ServicePayload): Observable<any> => {
220+
const promise = this.callServiceEndpoint(payload, method, url, endpoint);
221+
return rxjsFrom(promise).pipe(
222222
tap((result) => {
223-
this.logSuccessMessage(url, method, monitors.length, result);
223+
this.logSuccessMessage(url, method, payload.monitors.length, result);
224224
}),
225225
catchError((err: AxiosError<{ reason: string; status: number }>) => {
226+
if (err.response?.status === 413 && payload.monitors.length > 1) {
227+
// If payload is too large, split it and retry
228+
const mid = Math.ceil(payload.monitors.length / 2);
229+
const firstHalfMonitors = payload.monitors.slice(0, mid);
230+
const secondHalfMonitors = payload.monitors.slice(mid);
231+
232+
this.logger.debug(
233+
`Payload of ${payload.monitors.length} monitors is too large for location ${id}, splitting in half, in chunks of ${mid}`
234+
);
235+
236+
return concat(
237+
sendRequest({
238+
...payload,
239+
monitors: firstHalfMonitors,
240+
}), // Retry with the first half
241+
sendRequest({
242+
...payload,
243+
monitors: secondHalfMonitors,
244+
}) // Retry with the second half
245+
);
246+
}
247+
226248
pushErrors.push({ locationId: id, error: err.response?.data! });
227-
this.logServiceError(err, url, method, monitors.length);
228-
// we don't want to throw an unhandled exception here
249+
this.logServiceError(err, url, method, payload.monitors.length);
250+
251+
// Return an empty observable to prevent unhandled exceptions
229252
return of(true);
230253
})
231-
)
232-
);
254+
);
255+
};
256+
257+
// Start with the initial data payload
258+
promises.push(sendRequest(data));
233259
});
234260

235261
const result = await forkJoin(promises).toPromise();

x-pack/plugins/observability_solution/synthetics/server/synthetics_service/synthetics_service.ts

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,9 @@ export class SyntheticsService {
152152
service.locations = result.locations;
153153
service.apiClient.locations = result.locations;
154154
this.logger.debug(
155-
`Fetched ${service.locations} Synthetics service locations from manifest: ${this.config.manifestUrl}`
155+
`Fetched ${service.locations
156+
.map((loc) => loc.id)
157+
.join(',')} Synthetics service locations from manifest: ${this.config.manifestUrl}`
156158
);
157159
} catch (e) {
158160
this.logger.error(e);
@@ -167,7 +169,7 @@ export class SyntheticsService {
167169
[SYNTHETICS_SERVICE_SYNC_MONITORS_TASK_TYPE]: {
168170
title: 'Synthetics Service - Sync Saved Monitors',
169171
description: 'This task periodically pushes saved monitors to Synthetics Service.',
170-
timeout: '1m',
172+
timeout: '2m',
171173
maxAttempts: 3,
172174

173175
createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => {
@@ -670,22 +672,19 @@ export class SyntheticsService {
670672

671673
if (lastRunAt) {
672674
// log if it has missed last schedule
673-
const diff = moment(lastRunAt).diff(current, 'minutes');
675+
const diff = moment(current).diff(lastRunAt, 'minutes');
674676
const syncInterval = Number((this.config.syncInterval ?? '5m').split('m')[0]);
675677
if (diff > syncInterval) {
676-
const message = `Synthetics monitor sync task has missed its schedule, it last ran ${diff} ago.`;
678+
const message = `Synthetics monitor sync task has missed its schedule, it last ran ${diff} minutes ago.`;
677679
this.logger.warn(message);
678680
sendErrorTelemetryEvents(this.logger, this.server.telemetry, {
679681
message,
680682
reason: 'Failed to run synthetics sync task on schedule',
681683
type: 'syncTaskMissedSchedule',
682684
stackVersion: this.server.stackVersion,
683685
});
684-
} else {
685-
this.logger.debug(
686-
`Synthetics monitor sync task is running as expected, it last ran ${diff} minutes ago.`
687-
);
688686
}
687+
this.logger.debug(`Synthetics monitor sync task last ran ${diff} minutes ago.`);
689688
}
690689
state.lastRunAt = current.toISOString();
691690
} catch (e) {

0 commit comments

Comments
 (0)