generated from MapColonies/ts-server-boilerplate
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtileMergeTaskManager.spec.ts
More file actions
507 lines (388 loc) · 19.3 KB
/
tileMergeTaskManager.spec.ts
File metadata and controls
507 lines (388 loc) · 19.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
/* eslint-disable jest/no-commented-out-tests */
import { randomUUID } from 'crypto';
import nock from 'nock';
import { TileOutputFormat } from '@map-colonies/raster-shared';
import { ConflictError } from '@map-colonies/error-types';
import { configMock, registerDefaultConfig } from '../../mocks/configMock';
import { ingestionNewJob } from '../../mocks/jobsMockData';
import type { MergeTaskParameters, JobResumeState, MergeTilesTaskParams } from '../../../../src/common/interfaces';
import { IngestionCreateTasksTask } from '../../../../src/utils/zod/schemas/job.schema';
import { createFakeTask } from '../../mocks/tasksMockData';
import {
createMergeTilesTaskParams,
createTaskGenerator,
type MergeTilesTaskBuilderContext,
setupMergeTilesTaskBuilderTest,
} from './tileMergeTaskManagerSetup';
describe('tileMergeTaskManager', () => {
const buildTasksParams = createMergeTilesTaskParams();
let testContext: MergeTilesTaskBuilderContext;
let mockInitTask: IngestionCreateTasksTask;
beforeEach(() => {
registerDefaultConfig();
testContext = setupMergeTilesTaskBuilderTest();
mockInitTask = createFakeTask<IngestionCreateTasksTask['parameters']>();
});
afterEach(() => {
jest.clearAllMocks();
});
describe('buildTasks', () => {
it('should handle initTask (fresh start)', async () => {
const { tileMergeTaskManager } = testContext;
// No taskIndex set - should default to fresh start
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
// Collect tasks to verify fresh start works correctly
const taskSample: MergeTaskParameters[] = [];
const taskResumeSample: JobResumeState[] = [];
const maxSamples = 5;
for await (const task of tasks) {
taskSample.push(task.mergeTasksGenerator);
taskResumeSample.push(task.latestTaskIndex);
if (taskSample.length >= maxSamples) {
break;
}
}
// Fresh start should generate tasks normally
expect(taskSample).toHaveLength(5);
taskResumeSample.forEach((taskIndex) => {
expect(taskIndex.zoomLevel).toBeGreaterThanOrEqual(0);
expect(taskIndex.lastInsertedTaskIndex).toBeGreaterThanOrEqual(0);
});
taskSample.forEach((task) => {
expect(task.sources.length).toBeGreaterThan(0);
expect(task.isNewTarget).toBe(true);
expect(task.targetFormat).toBe(TileOutputFormat.PNG);
expect(task.sources.length).toBeGreaterThan(0);
expect(task.sources[0].path).toBe('layerRelativePath');
expect(task.batches.length).toBeGreaterThan(0);
});
});
it('should handle errors in buildTasks correctly', () => {
const { tileMergeTaskManager } = testContext;
const invalidBuildTasksParams: MergeTilesTaskParams = {
...buildTasksParams,
inputFiles: {
...buildTasksParams.inputFiles,
gpkgFilesPath: ['file1.gpkg', 'file2.gpkg'],
},
};
const action = () => tileMergeTaskManager.buildTasks(invalidBuildTasksParams, mockInitTask);
expect(action).toThrow();
});
it('resume state initialization - should handle initTask with valid resume parameters', async () => {
const { tileMergeTaskManager } = testContext;
const resumeZoomLevel = 6;
mockInitTask.parameters.latestTaskState = {
zoomLevel: resumeZoomLevel,
lastInsertedTaskIndex: 2,
};
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
// Collect a small sample of tasks to verify business logic
const taskSample: MergeTaskParameters[] = [];
const taskResumeSample: JobResumeState[] = [];
const maxSamples = 5;
for await (const task of tasks) {
taskSample.push(task.mergeTasksGenerator);
taskResumeSample.push(task.latestTaskIndex);
if (taskSample.length >= maxSamples) {
break;
}
}
// Explicit business logic assertions
expect(taskSample).toHaveLength(5);
taskResumeSample.forEach((taskIndex) => {
expect(taskIndex.zoomLevel).toBeLessThanOrEqual(resumeZoomLevel);
expect(taskIndex.lastInsertedTaskIndex).toBeGreaterThanOrEqual(0);
});
taskSample.forEach((task) => {
expect(task.sources.length).toBeGreaterThan(0);
expect(task.batches.length).toBeGreaterThan(0);
});
});
it('resume state initialization - should handle initTask with boundary resume parameters', async () => {
const { tileMergeTaskManager } = testContext;
mockInitTask.parameters.latestTaskState = {
zoomLevel: 0, // Minimum zoom
lastInsertedTaskIndex: 0, // Start index
};
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
// Collect tasks to verify boundary conditions work
const taskSample: MergeTaskParameters[] = [];
const taskResumeSample: JobResumeState[] = [];
const maxSamples = 2;
for await (const task of tasks) {
taskSample.push(task.mergeTasksGenerator);
taskResumeSample.push(task.latestTaskIndex);
if (taskSample.length >= maxSamples) {
break;
}
}
expect(taskSample.length).toBeGreaterThan(0);
taskResumeSample.forEach((taskIndex) => {
expect(taskIndex.zoomLevel).toBe(0);
expect(taskIndex.lastInsertedTaskIndex).toBeGreaterThanOrEqual(0);
});
taskSample.forEach((task) => {
expect(task.sources.length).toBeGreaterThan(0);
expect(task.batches.length).toBeGreaterThan(0);
});
});
it('resume state initialization - should handle high skip index values', async () => {
const { tileMergeTaskManager } = testContext;
mockInitTask.parameters.latestTaskState = {
zoomLevel: 4,
lastInsertedTaskIndex: 999, // High skip value
};
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
// Try to collect tasks with high skip value
const taskSample: MergeTaskParameters[] = [];
const taskResumeSample: JobResumeState[] = [];
for await (const task of tasks) {
taskSample.push(task.mergeTasksGenerator);
taskResumeSample.push(task.latestTaskIndex);
}
// High skip values should not crash the system
expect(taskSample.length).toBeGreaterThanOrEqual(0); // Likely 0 since skip value is very high
// If any tasks are generated, they should be valid
taskSample.forEach((task) => {
expect(task.targetFormat).toBe(TileOutputFormat.PNG);
expect(task.isNewTarget).toBe(true);
});
taskResumeSample.forEach((taskIndex) => {
expect(taskIndex.zoomLevel).toBeLessThanOrEqual(4);
expect(taskIndex.lastInsertedTaskIndex).toBeGreaterThanOrEqual(0);
});
const zoomLevels = taskResumeSample.map((taskIndex) => taskIndex.zoomLevel);
expect(zoomLevels.filter((zoom) => zoom === 0).length).toBeGreaterThan(0);
expect(zoomLevels.filter((zoom) => zoom === 1).length).toBeGreaterThan(0);
expect(zoomLevels.filter((zoom) => zoom === 2).length).toBeGreaterThan(0);
expect(zoomLevels.filter((zoom) => zoom === 3).length).toBeGreaterThan(0);
});
it('should handle malformed resume parameters gracefully', async () => {
const { tileMergeTaskManager } = testContext;
// Set malformed taskIndex
mockInitTask.parameters.latestTaskState = {
zoomLevel: -1, // Invalid zoom level
lastInsertedTaskIndex: -5, // Invalid index
};
// Should not throw error - implementation should handle gracefully
expect(() => {
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
expect(tasks).toBeDefined();
}).not.toThrow();
// Verify no tasks are generated with malformed parameters
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
const taskSample: MergeTaskParameters[] = [];
const taskResumeSample: JobResumeState[] = [];
const maxSamples = 5;
for await (const task of tasks) {
taskSample.push(task.mergeTasksGenerator);
taskResumeSample.push(task.latestTaskIndex);
if (taskSample.length >= maxSamples) {
break;
}
}
// Expect no tasks to be pushed with malformed parameters
expect(taskSample).toHaveLength(0);
// Spy on updateTask to verify it's not called
const updateTaskSpy = jest.spyOn(tileMergeTaskManager['queueClient'].jobManagerClient, 'updateTask');
const jobId = randomUUID();
await tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks);
// Expect updateJob to not be called when no tasks are processed
expect(updateTaskSpy).not.toHaveBeenCalled();
updateTaskSpy.mockRestore();
});
it('should complete full recovery cycle: build -> push with resume state', async () => {
const { tileMergeTaskManager } = testContext;
const jobId = randomUUID();
// Simulate recovery scenario
mockInitTask.parameters.latestTaskState = {
zoomLevel: 3,
lastInsertedTaskIndex: 0,
};
const jobManagerBaseUrl = configMock.get<string>('jobManagement.config.jobManagerBaseUrl');
const createTasksPath = `/jobs/${jobId}/tasks`;
const updateTaskPath = `/jobs/${jobId}/tasks/${mockInitTask.id}`;
nock(jobManagerBaseUrl).post(createTasksPath).reply(200).persist();
nock(jobManagerBaseUrl).put(updateTaskPath).reply(200).persist();
// Execute full recovery cycle
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
let error: Error | null = null;
try {
await tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks);
} catch (err) {
error = err as Error;
}
// Verify successful completion
expect(error).toBeNull();
});
});
// describe('calculateIntersectionState', () => {
// // Test case 1: No intersection found
// it('should return null intersection when no overlap is found', () => {
// const { tileMergeTaskManager } = testContext;
// const { input } = testData[0];
// const result = tileMergeTaskManager['calculateIntersectionState'](input.state, input.subGroupFootprints);
// expect(result.currentIntersection).toBeNull();
// expect(result.accumulatedIntersection).toBeNull();
// });
// // Test case 2: Intersection found, no accumulated overlap
// it('should return intersection when found with no previous accumulated overlap', () => {
// const { tileMergeTaskManager } = testContext;
// const { input } = testData[1];
// const result = tileMergeTaskManager['calculateIntersectionState'](input.state, input.subGroupFootprints);
// const isCurrentEqualAccumulated = booleanEqual(result.currentIntersection as Feature, result.accumulatedIntersection as Feature);
// expect(result.currentIntersection).not.toBeNull();
// expect(result.accumulatedIntersection).not.toBeNull();
// expect(isCurrentEqualAccumulated).toBe(true);
// });
// // Test case 3: Intersection found, with accumulated overlap, new intersection
// it('should return new intersection and updated accumulated overlap', () => {
// const { tileMergeTaskManager } = testContext;
// const { input } = testData[2];
// const result = tileMergeTaskManager['calculateIntersectionState'](input.state, input.subGroupFootprints);
// const isCurrentEqualAccumulated = booleanEqual(result.currentIntersection as Feature, result.accumulatedIntersection as Feature);
// expect(result.currentIntersection).not.toBeNull();
// expect(result.accumulatedIntersection).not.toBeNull();
// expect(isCurrentEqualAccumulated).toBe(false);
// });
// // Test case 4: Intersection found, with accumulated overlap, no new intersection
// it('should return null intersection when new intersection is fully within accumulated overlap', () => {
// const { tileMergeTaskManager } = testContext;
// const { input } = testData[2];
// input.state.accumulatedIntersection = {
// type: 'Polygon',
// coordinates: [
// [
// [2, 2],
// [3, 2],
// [3, 3],
// [2, 3],
// [2, 2],
// ],
// ],
// };
// const result = tileMergeTaskManager['calculateIntersectionState'](input.state, input.subGroupFootprints);
// expect(result.currentIntersection).toBeNull();
// expect(result.accumulatedIntersection).not.toBeNull();
// });
// });
describe('pushTasks', () => {
it('should push tasks in batches correctly', async () => {
const { tileMergeTaskManager } = testContext;
const jobId = randomUUID();
const jobManagerBaseUrl = configMock.get<string>('jobManagement.config.jobManagerBaseUrl');
const path = `/jobs/${jobId}/tasks`;
const updatePath = `/jobs/${jobId}/tasks/${mockInitTask.id}`;
nock(jobManagerBaseUrl).post(path).reply(200).persist();
nock(jobManagerBaseUrl).put(updatePath).reply(200).persist();
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
let error: Error | null = null;
try {
await tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks);
} catch (err) {
error = err as Error;
}
expect(error).toBeNull();
});
it('should push leftover tasks correctly', async () => {
const { tileMergeTaskManager } = testContext;
const taskBatchSize = configMock.get<number>('jobManagement.ingestion.tasks.tilesMerging.taskBatchSize');
const numberOfTasks = 3;
const enqueueTasksTotal = Math.ceil(numberOfTasks / taskBatchSize);
const jobId = randomUUID();
const jobManagerBaseUrl = configMock.get<string>('jobManagement.config.jobManagerBaseUrl');
const path = `/jobs/${jobId}/tasks`;
const updatePath = `/jobs/${jobId}/tasks/${mockInitTask.id}`;
nock(jobManagerBaseUrl).post(path).reply(200).persist();
nock(jobManagerBaseUrl).put(updatePath).reply(200).persist();
const tasks = createTaskGenerator(numberOfTasks);
const enqueueTasksSpy = jest.spyOn(tileMergeTaskManager as unknown as { enqueueTasks: jest.Func }, 'enqueueTasks');
let error: Error | null = null;
try {
await tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks);
} catch (err) {
error = err as Error;
}
expect(error).toBeNull();
expect(enqueueTasksSpy).toHaveBeenCalledTimes(enqueueTasksTotal);
});
it('should handle errors in pushTasks correctly', async () => {
const { tileMergeTaskManager } = testContext;
const jobId = randomUUID();
const jobManagerBaseUrl = configMock.get<string>('jobManagement.config.jobManagerBaseUrl');
const path = `/jobs/${jobId}/tasks`;
nock(jobManagerBaseUrl).post(path).reply(500).persist();
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
const action = async () => tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks);
await expect(action).rejects.toThrow();
});
it('should update initTask progress after pushTasks', async () => {
const { tileMergeTaskManager } = testContext;
const jobId = randomUUID();
// Setup HTTP mocks for job manager interactions
const jobManagerBaseUrl = configMock.get<string>('jobManagement.config.jobManagerBaseUrl');
const createTasksPath = `/jobs/${jobId}/tasks`;
const updateTaskPath = `/jobs/${jobId}/tasks/${mockInitTask.id}`;
nock(jobManagerBaseUrl).post(createTasksPath).reply(200).persist();
nock(jobManagerBaseUrl).put(updateTaskPath).reply(200).persist();
// Spy on the job manager client methods
const updateTaskSpy = jest.spyOn(tileMergeTaskManager['queueClient'].jobManagerClient, 'updateTask');
const createTaskSpy = jest.spyOn(tileMergeTaskManager['queueClient'].jobManagerClient, 'createTaskForJob');
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
await tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks);
// Verify progress tracking occurred
expect(updateTaskSpy).toHaveBeenCalled();
expect(updateTaskSpy.mock.calls.length).toBeGreaterThan(0);
updateTaskSpy.mockRestore();
createTaskSpy.mockRestore();
});
it('resume state initialization - should handle progress tracking with resume state on pushTasks', async () => {
const { tileMergeTaskManager } = testContext;
const jobId = randomUUID();
// Set resume state
mockInitTask.parameters.latestTaskState = {
zoomLevel: 4,
lastInsertedTaskIndex: 1,
};
const jobManagerBaseUrl = configMock.get<string>('jobManagement.config.jobManagerBaseUrl');
const createTasksPath = `/jobs/${jobId}/tasks`;
const updateTaskPath = `/jobs/${jobId}/tasks/${mockInitTask.id}`;
nock(jobManagerBaseUrl).post(createTasksPath).reply(200).persist();
nock(jobManagerBaseUrl).put(updateTaskPath).reply(200).persist();
const updateTaskSpy = jest.spyOn(tileMergeTaskManager['queueClient'].jobManagerClient, 'updateTask');
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
await tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks);
// Verify that progress tracking works even with resume state
expect(updateTaskSpy).toHaveBeenCalled();
updateTaskSpy.mockRestore();
});
it('should handle recovery when HTTP requests fail', async () => {
const { tileMergeTaskManager } = testContext;
const jobId = randomUUID();
// Setup HTTP mocks to simulate failures
const jobManagerBaseUrl = configMock.get<string>('jobManagement.config.jobManagerBaseUrl');
const createTasksPath = `/jobs/${jobId}/tasks`;
nock(jobManagerBaseUrl).post(createTasksPath).reply(500).persist();
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
// Should reject when HTTP requests fail
await expect(tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks)).rejects.toThrow();
});
it('should handle ConflictError during batch publishing and stop immediately', async () => {
const { tileMergeTaskManager } = testContext;
const jobId = randomUUID();
// Setup HTTP mock to return 409 Conflict
const jobManagerBaseUrl = configMock.get<string>('jobManagement.config.jobManagerBaseUrl');
const createTasksPath = `/jobs/${jobId}/tasks`;
nock(jobManagerBaseUrl).post(createTasksPath).reply(409, { message: 'Job already aborted' }).persist();
const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask);
// Spy on enqueueTasks to verify it's only called once (stops after conflict)
const enqueueTasksSpy = jest.spyOn(tileMergeTaskManager as unknown as { enqueueTasks: jest.Func }, 'enqueueTasks');
// Should throw ConflictError when job manager returns 409
await expect(tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks)).rejects.toThrow(ConflictError);
// Verify that enqueueTasks was called (at least once before the conflict)
expect(enqueueTasksSpy).toHaveBeenCalled();
enqueueTasksSpy.mockRestore();
});
});
});