Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions development/backend/api/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,19 @@ interface Response {

**Note**: TypeScript interfaces and JavaScript variables can use camelCase internally, but all external API fields MUST use snake_case.

## Server-Sent Events (SSE)

For real-time updates, use the `/stream` suffix pattern:

- **REST endpoint:** `/api/{resource}/{action}`
- **SSE endpoint:** `/api/{resource}/{action}/stream`

Example:
- `GET /api/users/me/mcp/client-activity` - REST API with pagination
- `GET /api/users/me/mcp/client-activity/stream` - SSE real-time stream

See [Server-Sent Events (SSE)](/development/backend/api/sse) for complete SSE documentation.

## Adding Documentation to Routes

The DeployStack Backend uses **reusable JSON Schema constants** for both validation and documentation generation. This approach provides a single source of truth for API schemas.
Expand Down
238 changes: 238 additions & 0 deletions development/backend/api/sse.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
---
title: Server-Sent Events (SSE)
description: Implementing real-time server-to-client communication using Server-Sent Events in DeployStack Backend.
---

## Overview

DeployStack Backend includes `@fastify/sse` for Server-Sent Events support. SSE provides a simple, unidirectional communication channel from server to client over HTTP - ideal for live updates, notifications, and streaming data.

The plugin is globally registered with a 30-second heartbeat interval to keep connections alive.

## Naming Convention Standard

All SSE endpoints **MUST** follow this URL pattern:

- **REST endpoint:** `/api/{resource}/{action}`
- **SSE endpoint:** `/api/{resource}/{action}/stream`

### URL Pattern Examples

```bash
# Client Activity
GET /api/users/me/mcp/client-activity # REST API (polling)
GET /api/users/me/mcp/client-activity/stream # SSE stream

# Notifications
GET /api/teams/:teamId/notifications # REST API (polling)
GET /api/teams/:teamId/notifications/stream # SSE stream

# Metrics
GET /api/satellites/:satelliteId/metrics # REST API (polling)
GET /api/satellites/:satelliteId/metrics/stream # SSE stream
```

### Paired Endpoints

Every SSE endpoint should have a corresponding REST endpoint:

1. **Same Query Parameters**: Both endpoints accept identical query parameters
2. **Same Data Structure**: Both return the same data format
3. **Consistent Behavior**: Both apply the same filters and limits
4. **Fallback Support**: REST endpoint serves as fallback for clients without SSE support

### Why `/stream`?

- **Industry Standard**: Used by GitHub, Stripe, and Twitter APIs
- **RESTful**: Treats streaming as a sub-resource
- **Technology Agnostic**: Works for SSE, WebSockets, or any streaming protocol
- **Clear Intent**: Immediately indicates real-time streaming capability

## Enabling SSE on a Route

Add the `sse: true` option to any route definition:

```typescript
server.get('/events', { sse: true }, async (request, reply) => {
// SSE methods available on reply.sse
})
```

For route-specific configuration:

```typescript
server.get('/events', {
sse: {
heartbeat: false, // Disable heartbeat for this route
serializer: (data) => JSON.stringify(data) // Custom serializer
}
}, handler)
```

## Sending Messages

### Single Message

```typescript
reply.sse.send({ data: 'Hello world' })
```

### Full SSE Format

```typescript
reply.sse.send({
id: '123',
event: 'user_update',
data: { userId: 'abc', status: 'online' },
retry: 5000 // Client retry interval in ms
})
```

### Streaming with Async Generator

```typescript
async function* generateUpdates() {
for (let i = 0; i < 10; i++) {
yield { data: { count: i } }
await new Promise(r => setTimeout(r, 1000))
}
}

reply.sse.send(generateUpdates())
```

## Connection Management

### Keep Connection Open

By default, the connection closes after the handler completes. To keep it open:

```typescript
reply.sse.keepAlive()
```

### Handle Disconnection

```typescript
reply.sse.onClose(() => {
// Cleanup logic when client disconnects
server.log.info('Client disconnected')
})
```

### Manual Close

```typescript
reply.sse.close()
```

## Client Reconnection

Handle reconnecting clients using the `Last-Event-ID` header:

```typescript
reply.sse.replay(async (lastEventId) => {
// Fetch and send missed events since lastEventId
const missedEvents = await getMissedEvents(lastEventId)
for (const event of missedEvents) {
reply.sse.send(event)
}
})
```

Access the last event ID directly:

```typescript
const lastId = reply.sse.lastEventId
```

## Connection State

```typescript
if (reply.sse.isConnected) {
reply.sse.send({ data: 'still connected' })
}
```

## Complete Route Example

```typescript
import { type FastifyInstance } from 'fastify'
import { requirePermission } from '../../../middleware/roleMiddleware'

export default async function sseRoute(server: FastifyInstance) {
server.get('/notifications/stream', {
sse: true,
preValidation: requirePermission('notifications.read'),
schema: {
tags: ['Notifications'],
summary: 'Stream notifications',
description: 'Real-time notification stream via SSE',
security: [{ cookieAuth: [] }]
}
}, async (request, reply) => {
const userId = request.user!.id

// Handle client reconnection
reply.sse.replay(async (lastEventId) => {
const missed = await notificationService.getMissedNotifications(userId, lastEventId)
for (const notification of missed) {
reply.sse.send({ id: notification.id, event: 'notification', data: notification })
}
})

// Keep connection open
reply.sse.keepAlive()

// Subscribe to new notifications
const unsubscribe = notificationService.subscribe(userId, (notification) => {
if (reply.sse.isConnected) {
reply.sse.send({ id: notification.id, event: 'notification', data: notification })
}
})

// Cleanup on disconnect
reply.sse.onClose(() => {
unsubscribe()
server.log.debug({ userId }, 'SSE connection closed')
})
})
}
```

## Frontend Client

```javascript
const eventSource = new EventSource('/api/notifications/stream', {
withCredentials: true // Include cookies for authentication
})

eventSource.addEventListener('notification', (event) => {
const data = JSON.parse(event.data)
console.log('New notification:', data)
})

eventSource.onerror = () => {
// Browser automatically reconnects
console.log('Connection lost, reconnecting...')
}
```

## TypeScript Types

Import types from the package:

```typescript
import type { SSEMessage } from '@fastify/sse'

const message: SSEMessage = {
id: '123',
event: 'update',
data: { status: 'active' }
}
```

## Related Documentation

- [API Documentation Generation](/development/backend/api) - General API development patterns
- [API Security](/development/backend/api/security) - Authorization middleware for protected SSE endpoints
70 changes: 46 additions & 24 deletions development/backend/cron.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,29 @@ Create a new file in `src/cron/jobs/`:
```typescript
// src/cron/jobs/dailyCleanup.ts
import type { CronJob } from '../cronManager';
import type { JobQueueService } from '../../services/jobQueueService';

export function createDailyCleanupJob(jobQueueService: JobQueueService): CronJob {
export function createDailyCleanupJob(): CronJob {
return {
name: 'daily-cleanup',
schedule: '0 2 * * *', // Every day at 2 AM

task: async () => {
await jobQueueService.createJob('cleanup_old_data', {
daysToKeep: 30
});
}
jobType: 'cleanup_old_data',
payload: {
daysToKeep: 30,
},
};
}
```

The `CronJob` interface has these fields:

| Field | Required | Description |
|-------|----------|-------------|
| `name` | Yes | Unique identifier for logging and tracking |
| `schedule` | Yes | Cron expression (see syntax below) |
| `jobType` | Yes | Job type that matches a registered worker |
| `payload` | No | Static object or function returning payload data |
| `maxAttempts` | No | Override default retry attempts (default: 3) |

### Step 2: Create the Worker

Create a worker to process the job in `src/workers/`:
Expand Down Expand Up @@ -136,14 +143,16 @@ export function initializeCronJobs(
jobQueueService: JobQueueService,
logger: FastifyBaseLogger
): CronManager {
const cronManager = new CronManager(logger);
const cronManager = new CronManager(logger, jobQueueService);

cronManager.register(createDailyCleanupJob(jobQueueService));
cronManager.register(createDailyCleanupJob());

return cronManager;
}
```

The `CronManager` handles job creation automatically when the schedule fires. You don't need to call `createJob()` yourself - just define the `jobType` and `payload` in your cron job definition.

## Cron Expression Syntax

The system uses standard cron syntax with 5 or 6 fields:
Expand Down Expand Up @@ -194,24 +203,37 @@ Here's a complete example showing how to create a cron job that sends a daily em
```typescript
// src/cron/jobs/dailyDigest.ts
import type { CronJob } from '../cronManager';
import type { JobQueueService } from '../../services/jobQueueService';

export function createDailyDigestJob(jobQueueService: JobQueueService): CronJob {
export function createDailyDigestJob(): CronJob {
return {
name: 'daily-digest-email',
schedule: '0 8 * * *', // Every day at 8 AM

task: async () => {
// Create job to send digest email
await jobQueueService.createJob('send_email', {
to: '[email protected]',
subject: 'Daily Activity Digest',
template: 'daily_digest',
variables: {
date: new Date().toISOString().split('T')[0]
}
});
}
jobType: 'send_email',
payload: {
to: '[email protected]',
subject: 'Daily Activity Digest',
template: 'daily_digest',
},
};
}
```

For dynamic payload values computed at runtime, use a function:

```typescript
export function createDailyDigestJob(): CronJob {
return {
name: 'daily-digest-email',
schedule: '0 8 * * *',
jobType: 'send_email',
payload: () => ({
to: '[email protected]',
subject: 'Daily Activity Digest',
template: 'daily_digest',
variables: {
date: new Date().toISOString().split('T')[0],
},
}),
};
}
```
Expand Down
Loading
Loading