Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/service-core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# @powersync/service-core

## 0.8.8

### Patch Changes

- 21de621: Add probe endpoints which can be used for system health checks.

## 0.8.7

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/service-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"publishConfig": {
"access": "public"
},
"version": "0.8.7",
"version": "0.8.8",
"main": "dist/index.js",
"license": "FSL-1.1-Apache-2.0",
"type": "module",
Expand Down
3 changes: 2 additions & 1 deletion packages/service-core/src/routes/configure-fastify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import * as system from '../system/system-index.js';

import { ADMIN_ROUTES } from './endpoints/admin.js';
import { CHECKPOINT_ROUTES } from './endpoints/checkpointing.js';
import { PROBES_ROUTES } from './endpoints/probes.js';
import { SYNC_RULES_ROUTES } from './endpoints/sync-rules.js';
import { SYNC_STREAM_ROUTES } from './endpoints/sync-stream.js';
import { createRequestQueueHook, CreateRequestQueueParams } from './hooks.js';
Expand Down Expand Up @@ -35,7 +36,7 @@ export type FastifyServerConfig = {

export const DEFAULT_ROUTE_OPTIONS = {
api: {
routes: [...ADMIN_ROUTES, ...CHECKPOINT_ROUTES, ...SYNC_RULES_ROUTES],
routes: [...ADMIN_ROUTES, ...CHECKPOINT_ROUTES, ...SYNC_RULES_ROUTES, ...PROBES_ROUTES],
queueOptions: {
concurrency: 10,
max_queue_depth: 20
Expand Down
58 changes: 58 additions & 0 deletions packages/service-core/src/routes/endpoints/probes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { container, router } from "@powersync/lib-services-framework";
import { routeDefinition } from "../router.js";

export enum ProbeRoutes {
STARTUP = '/probes/startup',
LIVENESS = '/probes/liveness',
READINESS = '/probes/readiness'
}

export const startupCheck = routeDefinition({
path: ProbeRoutes.STARTUP,
method: router.HTTPMethod.GET,
handler: async () => {
const state = container.probes.state();

return new router.RouterResponse({
status: state.started ? 200 : 400,
data: {
...state,
}
});
}
});

export const livenessCheck = routeDefinition({
path: ProbeRoutes.LIVENESS,
method: router.HTTPMethod.GET,
handler: async () => {
const state = container.probes.state();

const timeDifference = Date.now() - state.touched_at.getTime()
const status = timeDifference < 10000 ? 200 : 400;

return new router.RouterResponse({
status,
data: {
...state,
}
});
}
});

export const readinessCheck = routeDefinition({
path: ProbeRoutes.READINESS,
method: router.HTTPMethod.GET,
handler: async () => {
const state = container.probes.state();

return new router.RouterResponse({
status: state.ready ? 200 : 400,
data: {
...state,
}
});
}
});

export const PROBES_ROUTES = [startupCheck, livenessCheck, readinessCheck];
6 changes: 3 additions & 3 deletions packages/service-core/src/routes/router.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { router } from '@powersync/lib-services-framework';
import * as auth from '../auth/auth-index.js';
import { CorePowerSyncSystem } from '../system/CorePowerSyncSystem.js';
import type { JwtPayload } from '../auth/auth-index.js';
import type { CorePowerSyncSystem } from '../system/CorePowerSyncSystem.js';

/**
* Common context for routes
Expand All @@ -9,7 +9,7 @@ export type Context = {
user_id?: string;
system: CorePowerSyncSystem;

token_payload?: auth.JwtPayload;
token_payload?: JwtPayload;
token_errors?: string[];
/**
* Only on websocket endpoints.
Expand Down
231 changes: 231 additions & 0 deletions packages/service-core/test/src/routes/probes.integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import Fastify, { FastifyInstance } from 'fastify';
import { container } from '@powersync/lib-services-framework';
import * as auth from '../../../src/routes/auth.js';
import * as system from '../../../src/system/system-index.js';
import { configureFastifyServer } from '../../../src/index.js';
import { ProbeRoutes } from '../../../src/routes/endpoints/probes.js';

vi.mock("@powersync/lib-services-framework", async () => {
const actual = await vi.importActual("@powersync/lib-services-framework") as any;
return {
...actual,
container: {
...actual.container,
probes: {
state: vi.fn()
}
},
}
})

describe('Probe Routes Integration', () => {
let app: FastifyInstance;
let mockSystem: system.CorePowerSyncSystem;

beforeEach(async () => {
app = Fastify();
mockSystem = {} as system.CorePowerSyncSystem;
await configureFastifyServer(app, { system: mockSystem });
await app.ready();
});

afterEach(async () => {
await app.close();
});

describe('Startup Probe', () => {
it('returns 200 when system is started', async () => {
const mockState = {
started: true,
ready: true,
touched_at: new Date()
};
vi.spyOn(auth, 'authUser').mockResolvedValue({ authorized: true });
vi.mocked(container.probes.state).mockReturnValue(mockState);

const response = await app.inject({
method: 'GET',
url: ProbeRoutes.STARTUP,
});

expect(response.statusCode).toBe(200);
expect(JSON.parse(response.payload)).toEqual({
...mockState,
touched_at: mockState.touched_at.toISOString()
});
});

it('returns 400 when system is not started', async () => {
const mockState = {
started: false,
ready: false,
touched_at: new Date()
};

vi.mocked(container.probes.state).mockReturnValue(mockState);

const response = await app.inject({
method: 'GET',
url: ProbeRoutes.STARTUP,
});

expect(response.statusCode).toBe(400);
expect(JSON.parse(response.payload)).toEqual({
...mockState,
touched_at: mockState.touched_at.toISOString()
});
});
});

describe('Liveness Probe', () => {
it('returns 200 when system was touched recently', async () => {
const mockState = {
started: true,
ready: true,
touched_at: new Date()
};

vi.mocked(container.probes.state).mockReturnValue(mockState);

const response = await app.inject({
method: 'GET',
url: ProbeRoutes.LIVENESS,
});

expect(response.statusCode).toBe(200);
expect(JSON.parse(response.payload)).toEqual({
...mockState,
touched_at: mockState.touched_at.toISOString()
});
});

it('returns 400 when system has not been touched recently', async () => {
const mockState = {
started: true,
ready: true,
touched_at: new Date(Date.now() - 15000) // 15 seconds ago
};

vi.mocked(container.probes.state).mockReturnValue(mockState);

const response = await app.inject({
method: 'GET',
url: ProbeRoutes.LIVENESS,
});

expect(response.statusCode).toBe(400);
expect(JSON.parse(response.payload)).toEqual({
...mockState,
touched_at: mockState.touched_at.toISOString()
});
});
});

describe('Readiness Probe', () => {
it('returns 200 when system is ready', async () => {
const mockState = {
started: true,
ready: true,
touched_at: new Date()
};

vi.mocked(container.probes.state).mockReturnValue(mockState);

const response = await app.inject({
method: 'GET',
url: ProbeRoutes.READINESS,
});

expect(response.statusCode).toBe(200);
expect(JSON.parse(response.payload)).toEqual({
...mockState,
touched_at: mockState.touched_at.toISOString()
});
});

it('returns 400 when system is not ready', async () => {
const mockState = {
started: true,
ready: false,
touched_at: new Date()
};

vi.mocked(container.probes.state).mockReturnValue(mockState);

const response = await app.inject({
method: 'GET',
url: ProbeRoutes.READINESS,
});

expect(response.statusCode).toBe(400);
expect(JSON.parse(response.payload)).toEqual({
...mockState,
touched_at: mockState.touched_at.toISOString()
});
});
});

describe('Request Queue Behavior', () => {
it('handles concurrent requests within limits', async () => {
const mockState = { started: true, ready: true, touched_at: new Date() };
vi.mocked(container.probes.state).mockReturnValue(mockState);

// Create array of 15 concurrent requests (default concurrency is 10)
const requests = Array(15).fill(null).map(() =>
app.inject({
method: 'GET',
url: ProbeRoutes.STARTUP,
})
);

const responses = await Promise.all(requests);

// All requests should complete successfully
responses.forEach(response => {
expect(response.statusCode).toBe(200);
expect(JSON.parse(response.payload)).toEqual({
...mockState,
touched_at: mockState.touched_at.toISOString()
});
});
});

it('respects max queue depth', async () => {
const mockState = { started: true, ready: true, touched_at: new Date() };
vi.mocked(container.probes.state).mockReturnValue(mockState);

// Create array of 35 concurrent requests (default max_queue_depth is 20)
const requests = Array(35).fill(null).map(() =>
app.inject({
method: 'GET',
url: ProbeRoutes.STARTUP,
})
);

const responses = await Promise.all(requests);

// Some requests should succeed and some should fail with 429
const successCount = responses.filter(r => r.statusCode === 200).length;
const queueFullCount = responses.filter(r => r.statusCode === 429).length;

expect(successCount).toBeGreaterThan(0);
expect(queueFullCount).toBeGreaterThan(0);
expect(successCount + queueFullCount).toBe(35);
});
});

describe('Content Types', () => {
it('returns correct content type headers', async () => {
const mockState = { started: true, ready: true, touched_at: new Date() };
vi.mocked(container.probes.state).mockReturnValue(mockState);

const response = await app.inject({
method: 'GET',
url: ProbeRoutes.STARTUP,
});

expect(response.headers['content-type']).toMatch(/application\/json/);
});
});
});
Loading
Loading