Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-and-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
tags:
- 'v*'
workflow_dispatch:

jobs:
build_and_push:
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
}
},
"queue": {
"jobManagerBaseUrl": "http//localhost:8080",
"heartbeatBaseUrl": "http//localhost:8081",
"jobManagerBaseUrl": "http://localhost:8080",
"heartbeatBaseUrl": "http://localhost:8081",
"heartbeatIntervalMs": 1000,
"dequeueIntervalMs": 3000
},
Expand Down
321 changes: 269 additions & 52 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export const SERVICES = {
ERROR_HANDLER: Symbol('ErrorHandler'),
STRATEGY_FACTORY: Symbol('StrategyFactory'),
TASK_VALIDATOR: Symbol('TaskValidator'),
POLLING_PAIRS: Symbol('PollingPairs'),
// =============================================================================
// TODO: When we move to the new job-manager, we will use @map-colonies/jobnik-sdk
// The tokens below are kept for future migration.
Expand Down
35 changes: 34 additions & 1 deletion src/containerConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import { instancePerContainerCachingFactory } from 'tsyringe';
import { DependencyContainer } from 'tsyringe/dist/typings/types';
import jsLogger, { Logger } from '@map-colonies/js-logger';
import { IWorker, JobnikSDK } from '@map-colonies/jobnik-sdk';
import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import { InjectionObject, registerDependencies } from '@common/dependencyRegistration';
import { SERVICES, SERVICE_NAME } from '@common/constants';
import { getTracing } from '@common/tracing';
import type { QueueConfig } from './cleaner/types';
import { ConfigType, getConfig } from './common/config';
import { workerBuilder } from './worker';
import { StrategyFactory } from './cleaner/strategies';
import { StrategyFactory, TilesDeletionStrategy } from './cleaner/strategies';
import { ErrorHandler } from './cleaner/errors';

export interface RegisterOptions {
override?: InjectionObject<unknown>[];
Expand Down Expand Up @@ -49,6 +52,24 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise
}),
},
},
{
token: SERVICES.QUEUE_CLIENT,
provider: {
useFactory: instancePerContainerCachingFactory((container) => {
const logger = container.resolve<Logger>(SERVICES.LOGGER);
const config = container.resolve<ConfigType>(SERVICES.CONFIG);
const queueConfig = config.get('queue') as QueueConfig;

return new QueueClient(
logger,
queueConfig.jobManagerBaseUrl,
queueConfig.heartbeatBaseUrl,
queueConfig.dequeueIntervalMs,
queueConfig.heartbeatIntervalMs
);
}),
},
},
{
token: SERVICES.WORKER,
provider: {
Expand All @@ -61,6 +82,18 @@ export const registerExternalValues = async (options?: RegisterOptions): Promise
useClass: StrategyFactory,
},
},
{
token: SERVICES.ERROR_HANDLER,
provider: {
useClass: ErrorHandler,
},
},
{
token: configInstance.get('jobDefinitions.tasks.tilesDeletion.type') as unknown as string, //TODO: when we create worker config schema we can move this to a constant and remove the cast
provider: {
useClass: TilesDeletionStrategy,
},
},
{
token: 'onSignal',
provider: {
Expand Down
15 changes: 1 addition & 14 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1 @@
import type { IWorker } from '@map-colonies/jobnik-sdk';
import type { FactoryFunction } from 'tsyringe';

/**
* Worker factory function.
* TODO: Implement worker creation once TaskPoller is implemented.
* For now, this is a stub to satisfy the containerConfig registration.
*/
export const workerBuilder: FactoryFunction<IWorker> = () => {
// TODO: Replace with actual worker implementation
// When TaskPoller is ready, this will create and configure the worker
// For the skeleton, we return a minimal stub
throw new Error('Worker not implemented - TaskPoller integration pending');
};
export { workerBuilder } from './worker/workerBuilder';
144 changes: 144 additions & 0 deletions src/worker/taskPoller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { setTimeout as sleep } from 'timers/promises';
import { inject, injectable } from 'tsyringe';
import type { Logger } from '@map-colonies/js-logger';
import type { TaskHandler as QueueClient, ITaskResponse } from '@map-colonies/mc-priority-queue';
import type { IWorker } from '@map-colonies/jobnik-sdk';
import { SERVICES } from '@common/constants';
import type { ConfigType } from '@common/config';
import type { PollingPairConfig } from '../cleaner/types';
import type { StrategyFactory } from '../cleaner/strategies';
import { UnrecoverableError, type ErrorHandler } from '../cleaner/errors';

/**
* TaskPoller - Simple bridge to implement IWorker using the old mc-priority-queue SDK
*/
@injectable()
class TaskPoller implements IWorker {
private shouldStop = false;
private readonly dequeueIntervalMs: number;

public constructor(
@inject(SERVICES.LOGGER) private readonly logger: Logger,
@inject(SERVICES.CONFIG) config: ConfigType,
@inject(SERVICES.QUEUE_CLIENT) private readonly queueClient: QueueClient,
@inject(SERVICES.STRATEGY_FACTORY) private readonly strategyFactory: StrategyFactory,
@inject(SERVICES.ERROR_HANDLER) private readonly errorHandler: ErrorHandler,
@inject(SERVICES.POLLING_PAIRS) private readonly pollingPairs: PollingPairConfig[]
) {
this.dequeueIntervalMs = config.get('queue.dequeueIntervalMs') as unknown as number; //TODO:when we create worker config schema we can remove the cast
}

public async start(): Promise<void> {
this.shouldStop = false;
await this.poll();
}

public async stop(): Promise<void> {
this.shouldStop = true;
await Promise.resolve();
}

// IWorker event methods - delegated to internal EventEmitter (no-op since nothing listens)
public on(): this {
return this;
}

public off(): this {
return this;
}

public once(): this {
return this;
}

public removeAllListeners(): this {
return this;
}

private async poll(): Promise<void> {
while (!this.shouldStop) {
const result = await this.tryDequeue();

if (!result) {
await sleep(this.dequeueIntervalMs);
continue;
}

await this.processTask(result);
}
}

private async tryDequeue(): Promise<
| {
task: ITaskResponse<unknown>;
pair: PollingPairConfig;
}
| undefined
> {
for (const pair of this.pollingPairs) {
if (this.shouldStop) {
return undefined;
}

try {
const task = await this.queueClient.dequeue<unknown>(pair.jobType, pair.taskType);
if (task) {
this.logger.info({ msg: 'Task dequeued', taskId: task.id, taskType: task.type, jobId: task.jobId });
return { task, pair };
}
} catch (error) {
this.logger.error({ msg: 'Dequeue error', error });
}
}

return undefined;
}

private async processTask(dequeued: { task: ITaskResponse<unknown>; pair: PollingPairConfig }): Promise<void> {
const { task, pair } = dequeued;
const startTime = Date.now();

this.logger.debug({ msg: 'Task started', taskId: task.id, jobId: task.jobId });

try {
if (task.attempts >= pair.maxAttempts) {
throw new UnrecoverableError(`Task exceeded max attempts: ${task.attempts}/${pair.maxAttempts}`);
}

const strategy = this.strategyFactory.resolveWithContext({
jobId: task.jobId,
taskId: task.id,
jobType: pair.jobType,
taskType: pair.taskType,
});

const validated = strategy.validate(task.parameters);
await strategy.execute(validated);

await this.queueClient.ack(task.jobId, task.id);

const duration = Date.now() - startTime;
this.logger.info({ msg: 'Task completed', taskId: task.id, duration });
} catch (error) {
await this.handleTaskFailure(error, task, pair);
}
}

private async handleTaskFailure(error: unknown, task: ITaskResponse<unknown>, pair: PollingPairConfig): Promise<void> {
const decision = this.errorHandler.handleError({
jobId: task.jobId,
taskId: task.id,
attemptNumber: task.attempts,
maxAttempts: pair.maxAttempts,
error: error instanceof Error ? error : new Error(String(error)),
});

try {
await this.queueClient.reject(task.jobId, task.id, decision.shouldRetry, decision.reason);
} catch (error) {
this.logger.error({ msg: 'Failed to reject task', taskId: task.id, error });
}
}
}

export { TaskPoller };
22 changes: 22 additions & 0 deletions src/worker/workerBuilder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import type { FactoryFunction } from 'tsyringe';
import type { IWorker } from '@map-colonies/jobnik-sdk';
import { SERVICES } from '@common/constants';
import type { ConfigType } from '@common/config';
import type { WorkerCapabilities, JobDefinitions, PollingPairConfig } from '../cleaner/types';
import { buildPollingPairs } from '../cleaner/utils';
import { TaskPoller } from './taskPoller';

const workerBuilder: FactoryFunction<IWorker> = (container) => {
const config = container.resolve<ConfigType>(SERVICES.CONFIG);

const jobDefinitions = config.get('jobDefinitions') as JobDefinitions; //TODO:when we create worker config schema we can remove the cast
const workerCapabilities = config.get('worker.capabilities') as unknown as WorkerCapabilities; //TODO:when we create worker config schema we can remove the cast

const pollingPairs = buildPollingPairs(jobDefinitions, workerCapabilities);

container.register<PollingPairConfig[]>(SERVICES.POLLING_PAIRS, { useValue: pollingPairs });

return container.resolve(TaskPoller);
};

export { workerBuilder };
2 changes: 2 additions & 0 deletions tests/helpers/fakes/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './taskFakes';
export * from './pairFakes';
11 changes: 11 additions & 0 deletions tests/helpers/fakes/pairFakes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { faker } from '@faker-js/faker';
import type { PollingPairConfig } from '@src/cleaner/types';

export function buildPair(overrides: Partial<PollingPairConfig> = {}): PollingPairConfig {
return {
jobType: faker.word.noun(),
taskType: faker.word.noun(),
maxAttempts: faker.number.int({ min: 2, max: 10 }),
...overrides,
};
}
19 changes: 19 additions & 0 deletions tests/helpers/fakes/taskFakes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { faker } from '@faker-js/faker';
import { OperationStatus, type ITaskResponse } from '@map-colonies/mc-priority-queue';

export function buildTask(overrides: Partial<ITaskResponse<unknown>> = {}): ITaskResponse<unknown> {
return {
id: faker.string.uuid(),
jobId: faker.string.uuid(),
type: 'tiles-deletion',
description: faker.lorem.sentence(),
parameters: {},
created: faker.date.past().toISOString(),
updated: faker.date.recent().toISOString(),
status: OperationStatus.IN_PROGRESS,
reason: '',
attempts: 1,
resettable: true,
...overrides,
};
}
76 changes: 72 additions & 4 deletions tests/helpers/mocks.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { vi } from 'vitest';
import type { Logger } from '@map-colonies/js-logger';
import type { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue';
import type { ConfigType } from '../../src/common/config';
import type { ITaskStrategy, StrategyFactory } from '../../src/cleaner/strategies';
import type { ErrorHandler } from '../../src/cleaner/errors';
import type { ErrorDecision, PollingPairConfig } from '../../src/cleaner/types';
import { TaskPoller } from '../../src/worker/taskPoller';

// ─── Logger ──────────────────────────────────────────────────────────────────

/**
* Creates a mock Logger instance for testing.
* All logger methods are vi.fn() spies that can be asserted against.
*/
export function createMockLogger(): Logger {
return {
debug: vi.fn(),
Expand All @@ -16,3 +20,67 @@ export function createMockLogger(): Logger {
child: vi.fn(() => createMockLogger()),
} as unknown as Logger;
}

// ─── Config ───────────────────────────────────────────────────────────────────

export function createMockConfig(dequeueIntervalMs = 0): ConfigType {
return { get: vi.fn().mockReturnValue(dequeueIntervalMs) } as unknown as ConfigType;
}

// ─── QueueClient ─────────────────────────────────────────────────────────────

export function createMockQueueClient(): QueueClient {
return {
dequeue: vi.fn().mockResolvedValue(null),
ack: vi.fn().mockResolvedValue(undefined),
reject: vi.fn().mockResolvedValue(undefined),
} as unknown as QueueClient;
}

// ─── Strategy ────────────────────────────────────────────────────────────────

export function buildMockStrategy(overrides: Partial<ITaskStrategy> = {}): ITaskStrategy {
return {
validate: vi.fn().mockReturnValue({}),
execute: vi.fn().mockResolvedValue(undefined),
...overrides,
};
}

// ─── StrategyFactory ─────────────────────────────────────────────────────────

export function createMockStrategyFactory(): StrategyFactory {
return { resolveWithContext: vi.fn() } as unknown as StrategyFactory;
}

// ─── ErrorHandler ────────────────────────────────────────────────────────────

export function createMockErrorHandler(defaultDecision: ErrorDecision = { shouldRetry: false, reason: 'test' }): ErrorHandler {
return {
handleError: vi.fn().mockReturnValue(defaultDecision),
} as unknown as ErrorHandler;
}

// ─── TaskPoller factory ───────────────────────────────────────────────────────

/**
* Creates a TaskPoller with all typed mock dependencies.
* All casting from mock types to the real SDK types is contained here.
*/
export function createTaskPoller({
logger = createMockLogger(),
config = createMockConfig(),
queueClient = createMockQueueClient(),
strategyFactory = createMockStrategyFactory(),
errorHandler = createMockErrorHandler(),
pollingPairs,
}: {
logger?: Logger;
config?: ConfigType;
queueClient?: QueueClient;
strategyFactory?: StrategyFactory;
errorHandler?: ErrorHandler;
pollingPairs: PollingPairConfig[];
}): TaskPoller {
return new TaskPoller(logger, config, queueClient, strategyFactory, errorHandler, pollingPairs);
}
Loading
Loading