Skip to content

Commit 19421c9

Browse files
Implement Go-based real-time service for task run updates
- Add PostgreSQL logical replication consumer using pgx - Implement in-memory state management with indexes for run_id, env_id, tags - Create SSE endpoint /v1/runs/stream with filtering support - Add snapshot and checkpointing system with zstd compression - Integrate Redis-based concurrency limiting for 400k connections - Support event types: initial, delta, keepalive - Include test client HTML for local testing - Target p95 latency ≤ 300ms from WAL to client receive - Designed for vertical scaling with warm restart capability Co-Authored-By: Eric Allam <[email protected]>
1 parent ff157e5 commit 19421c9

File tree

11 files changed

+1462
-0
lines changed

11 files changed

+1462
-0
lines changed

apps/realtime-service/.env.example

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
DATABASE_URL=postgres://localhost/trigger_dev
2+
PORT=8080
3+
REPLICATION_SLOT=trigger_realtime_slot
4+
PUBLICATION_NAME=trigger_realtime_pub
5+
REDIS_URL=redis://localhost:6379
6+
CONCURRENCY_LIMIT=100000

apps/realtime-service/Dockerfile

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
FROM golang:1.21-alpine AS builder
2+
3+
WORKDIR /app
4+
COPY go.mod go.sum ./
5+
RUN go mod download
6+
7+
COPY . .
8+
RUN CGO_ENABLED=0 GOOS=linux go build -o realtime-service .
9+
10+
FROM alpine:latest
11+
RUN apk --no-cache add ca-certificates tzdata
12+
WORKDIR /root/
13+
14+
COPY --from=builder /app/realtime-service .
15+
16+
EXPOSE 8080
17+
18+
CMD ["./realtime-service"]

apps/realtime-service/README.md

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# Trigger.dev Real-Time Service
2+
3+
A high-performance Go service that provides real-time streaming of task run updates via Server-Sent Events (SSE) using PostgreSQL logical replication.
4+
5+
## Features
6+
7+
- **Low Latency**: p95 latency ≤ 300ms from WAL commit to client receive
8+
- **Scalable**: Supports 400k+ concurrent SSE connections
9+
- **Efficient**: Single PostgreSQL replication slot with REPLICA IDENTITY FULL
10+
- **Flexible Filtering**: Subscribe by run_id, env_id, tags, or time windows
11+
- **Resilient**: Automatic reconnection with exponential backoff
12+
13+
## Architecture
14+
15+
- **Single Process**: Vertical scaling approach with in-memory state
16+
- **Logical Replication**: Consumes PostgreSQL WAL via pgoutput format
17+
- **SSE Streaming**: HTTP/2 Server-Sent Events for real-time updates
18+
- **Memory Indexes**: Fast lookups by run_id, env_id, and tags
19+
20+
## Configuration
21+
22+
Environment variables:
23+
24+
- `DATABASE_URL`: PostgreSQL connection string
25+
- `PORT`: HTTP server port (default: 8080)
26+
- `REPLICATION_SLOT`: Logical replication slot name
27+
- `PUBLICATION_NAME`: PostgreSQL publication name
28+
29+
## API Endpoints
30+
31+
### Stream Task Runs
32+
33+
```
34+
GET /v1/runs/stream?filter=<json>
35+
```
36+
37+
Filter examples:
38+
```json
39+
{
40+
"run_id": "123e4567-e89b-12d3-a456-426614174000",
41+
"env_id": "123e4567-e89b-12d3-a456-426614174001",
42+
"tags": ["tag1", "tag2"],
43+
"created_at": "2025-06-01T00:00:00Z"
44+
}
45+
```
46+
47+
### Health Check
48+
49+
```
50+
GET /health
51+
```
52+
53+
## Event Types
54+
55+
- `initial`: Full current state sent once per run on new stream
56+
- `delta`: Partial updates with changed fields
57+
- `keepalive`: Sent every 15 seconds to maintain connection
58+
59+
## Client Protocol
60+
61+
- **Headers**: `Accept: text/event-stream`, `Last-Event-Id` for replay
62+
- **Reconnection**: Exponential backoff with jitter
63+
- **Back-pressure**: Connections dropped if write buffer > 64KB
64+
65+
## Performance Targets
66+
67+
- **Latency**: p95 ≤ 300ms from WAL to client
68+
- **Capacity**: 400k concurrent connections
69+
- **Memory**: ≤ 3KB per connection + 200B per run
70+
- **Cost**: ≤ $1000/month infrastructure
71+
72+
## Deployment
73+
74+
```bash
75+
# Build
76+
go build -o trigger-realtime-service .
77+
78+
# Run
79+
./trigger-realtime-service
80+
81+
# Docker
82+
docker build -t trigger-realtime-service .
83+
docker run -p 8080:8080 trigger-realtime-service
84+
```
85+
86+
## Database Setup
87+
88+
The service automatically creates the required PostgreSQL publication and replication slot:
89+
90+
```sql
91+
-- Publication for task_run table
92+
CREATE PUBLICATION trigger_realtime_pub FOR TABLE task_run
93+
WITH (publish = 'insert,update,delete');
94+
95+
-- Set replica identity to include full row data
96+
ALTER TABLE task_run REPLICA IDENTITY FULL;
97+
98+
-- Replication slot (created automatically)
99+
SELECT pg_create_logical_replication_slot('trigger_realtime_slot', 'pgoutput');
100+
```
101+
102+
## Monitoring
103+
104+
- Health endpoint provides service status and warmup state
105+
- Logs include replication lag and connection metrics
106+
- Built-in keepalive prevents connection timeouts
107+
108+
## Integration
109+
110+
This service is designed to integrate with the existing Trigger.dev platform:
111+
112+
- Replaces Electric SQL for real-time updates
113+
- Compatible with existing SDK subscription patterns
114+
- Maintains the same client-side API surface
115+
- Provides better performance and lower operational overhead

apps/realtime-service/go.mod

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
module github.com/triggerdotdev/trigger.dev/apps/realtime-service
2+
3+
go 1.21
4+
5+
require (
6+
github.com/google/uuid v1.5.0
7+
github.com/jackc/pgx/v5 v5.5.1
8+
github.com/klauspost/compress v1.17.4
9+
github.com/redis/go-redis/v9 v9.3.0
10+
)
11+
12+
require (
13+
github.com/cespare/xxhash/v2 v2.2.0 // indirect
14+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
15+
github.com/jackc/pgpassfile v1.0.0 // indirect
16+
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
17+
github.com/jackc/puddle/v2 v2.2.1 // indirect
18+
golang.org/x/crypto v0.17.0 // indirect
19+
golang.org/x/sync v0.1.0 // indirect
20+
golang.org/x/text v0.14.0 // indirect
21+
)

apps/realtime-service/go.sum

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
2+
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
3+
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
4+
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
5+
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
6+
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
7+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
8+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
9+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
10+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
11+
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
12+
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
13+
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
14+
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
15+
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
16+
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
17+
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
18+
github.com/jackc/pgx/v5 v5.5.1 h1:5I9etrGkLrN+2XPCsi6XLlV5DITbSL/xBZdmAxFcXPI=
19+
github.com/jackc/pgx/v5 v5.5.1/go.mod h1:Ig06C2Vu0t5qXC60W8sqIthScaEnFvojjj9dSljmHRA=
20+
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
21+
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
22+
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
23+
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
24+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
25+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
26+
github.com/redis/go-redis/v9 v9.3.0 h1:RiVDjmig62jIWp7Kk4XVLs0hzV6pI3PyTnnL0cnn0u0=
27+
github.com/redis/go-redis/v9 v9.3.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
28+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
29+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
30+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
31+
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
32+
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
33+
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
34+
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
35+
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
36+
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
37+
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
38+
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
39+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
40+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
41+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
42+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 commit comments

Comments
 (0)