Skip to content

Commit 507e689

Browse files
committed
feat: implement real-time event push with SSE and delegation management
1 parent 99215d6 commit 507e689

File tree

10 files changed

+773
-6
lines changed

10 files changed

+773
-6
lines changed

docs/reply/20260309-p2p-event-subscription-proxy.md

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,22 @@
55
> **日期**: 2026-03-09
66
> **关联文档**: `docs/issues/clawnet-p2p-event-subscription-proxy.md`
77
> **实施文档**: `docs/implementation/subscription-delegation.md`
8-
> **涉及包**: `@claw-network/protocol`, `@claw-network/node`, `@claw-network/sdk`
8+
> **涉及包**: `@claw-network/protocol`, `@claw-network/node`, `@claw-network/sdk`(均已发布 **v0.6.1**
99
1010
---
1111

12-
## 状态:方案 B 已实现
12+
## 状态:方案 B 已实现并发布
1313

1414
采纳需求文档推荐的 **方案 B(ClawNet 协议层事件订阅代理)**,已完成全链路实现:协议类型 → SQLite 存储 → 服务层(自动转发 + 反压控制) → REST API → WebSocket 代理订阅端点 → SDK 方法。
1515

1616
编译通过,lint 零错误,全部 304 个测试通过(含新增 20 个 delegation 测试)。
1717

18+
**npm 已发布**`@claw-network/core@0.6.1``@claw-network/protocol@0.6.1``@claw-network/sdk@0.6.1``@claw-network/node@0.6.1`
19+
20+
```bash
21+
npm install @claw-network/sdk@0.6.1
22+
```
23+
1824
---
1925

2026
## 实现概览
@@ -387,6 +393,11 @@ ws.on('close', () => {
387393
| `packages/node/src/api/ws-messaging.ts` | 新增 WS 端点 | `/subscribe-delegated` 含回放 + 心跳 |
388394
| `packages/sdk/src/messaging.ts` | 新增 SDK 方法 | 3 个类型安全的 REST 客户端方法 |
389395
| `packages/node/test/delegation.test.ts` | 新增测试 | 20 个测试用例覆盖存储、API、去重、回放 |
396+
| `CHANGELOG.md` | 更新 | v0.6.1 变更日志 |
397+
| `docs/api/openapi.yaml` | 更新 | 4 个 REST 端点 + `DelegationRecord` schema |
398+
| `docs/API_REFERENCE.md` | 更新 | Messaging 章节 + `DELEGATION_LIMIT` 错误码 |
399+
| `docs/API_ROUTE_CATALOG.md` | 更新 | 路由总表 #86#89 |
400+
| `docs/SDK_GUIDE.md` | 更新 | Messaging API 章节(delegation CRUD + WS 说明) |
390401

391402
---
392403

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import type { RuntimeContext } from '../types.js';
2+
import { Router } from '../router.js';
3+
import { ok, created } from '../response.js';
4+
import { handleError } from '../route-utils.js';
5+
6+
/**
7+
* Event routes:
8+
* GET / — Local SSE event stream
9+
* POST /subscribe — Target: create delegation (called by gateway via API Proxy)
10+
* POST /unsubscribe — Target: revoke delegation
11+
*/
12+
export function eventRoutes(ctx: RuntimeContext): Router {
13+
const router = new Router();
14+
15+
// Local SSE endpoint — Webapp directly connected to this node
16+
router.get('/', ({ res }) => {
17+
if (!ctx.eventPushService) {
18+
res.writeHead(503, { 'Content-Type': 'application/json' });
19+
res.end(JSON.stringify({ error: 'Event push service unavailable' }));
20+
return;
21+
}
22+
// SSE connection: addLocalClient handles headers, heartbeat, and cleanup.
23+
// Do NOT call res.end() — SSE is a long-lived connection.
24+
ctx.eventPushService.addLocalClient(res);
25+
});
26+
27+
// Target role: gateway asks us to create a delegation
28+
router.post('/subscribe', async ({ res, body }) => {
29+
if (!ctx.eventPushService) {
30+
res.writeHead(503, { 'Content-Type': 'application/json' });
31+
res.end(JSON.stringify({ error: 'Event push service unavailable' }));
32+
return;
33+
}
34+
35+
const payload = body as Record<string, unknown> | undefined;
36+
const gatewayDid = payload?.gatewayDid as string;
37+
if (!gatewayDid || typeof gatewayDid !== 'string') {
38+
res.writeHead(400, { 'Content-Type': 'application/json' });
39+
res.end(JSON.stringify({ error: 'Missing gatewayDid' }));
40+
return;
41+
}
42+
43+
try {
44+
const result = await ctx.eventPushService.createDelegation(gatewayDid);
45+
created(res, result, { self: '/api/v1/events/subscribe' });
46+
} catch (err) {
47+
handleError(res, err);
48+
}
49+
});
50+
51+
// Target role: gateway asks us to revoke a delegation
52+
router.post('/unsubscribe', async ({ res, body }) => {
53+
if (!ctx.eventPushService) {
54+
res.writeHead(503, { 'Content-Type': 'application/json' });
55+
res.end(JSON.stringify({ error: 'Event push service unavailable' }));
56+
return;
57+
}
58+
59+
const payload = body as Record<string, unknown> | undefined;
60+
const delegationId = payload?.delegationId as string;
61+
if (!delegationId || typeof delegationId !== 'string') {
62+
res.writeHead(400, { 'Content-Type': 'application/json' });
63+
res.end(JSON.stringify({ error: 'Missing delegationId' }));
64+
return;
65+
}
66+
67+
try {
68+
await ctx.eventPushService.revokeDelegation(delegationId);
69+
res.writeHead(204);
70+
res.end();
71+
} catch (err) {
72+
handleError(res, err);
73+
}
74+
});
75+
76+
return router;
77+
}

packages/node/src/api/routes/relay.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,28 @@ export async function handleRelayRequest(
138138
return true;
139139
}
140140

141+
// ── SSE relay: GET /:targetDid/api/v1/events ──────────
142+
// This is a long-lived SSE connection — NOT a normal API proxy call.
143+
if (remaining === '/api/v1/events' || remaining === '/api/v1/events/') {
144+
if (req.method !== 'GET') {
145+
writeJson(res, 405, { error: 'Method not allowed' });
146+
return true;
147+
}
148+
if (!ctx.eventPushService) {
149+
writeJson(res, 503, { error: 'Event push service unavailable' });
150+
return true;
151+
}
152+
try {
153+
await ctx.eventPushService.addGatewayClient(res, decodedDid);
154+
// Do NOT call res.end() — SSE is a long-lived connection
155+
} catch (err) {
156+
if (!res.headersSent) {
157+
writeJson(res, 502, { error: `Event subscription failed: ${(err as Error).message}` });
158+
}
159+
}
160+
return true;
161+
}
162+
141163
// Read request body
142164
let body: string | undefined;
143165
if (req.method !== 'GET' && req.method !== 'HEAD' && req.method !== 'DELETE') {

packages/node/src/api/server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { profileRoutes } from './routes/profile.js';
2020
import { sessionRoutes } from './routes/session.js';
2121
import { walletRoutes } from './routes/wallets.js';
2222
import { handleRelayRequest } from './routes/relay.js';
23+
import { eventRoutes } from './routes/events.js';
2324
import type { RuntimeContext } from './types.js';
2425

2526
function buildRouter(ctx: RuntimeContext): Router {
@@ -38,6 +39,7 @@ function buildRouter(ctx: RuntimeContext): Router {
3839
router.mount('/api/v1/session', sessionRoutes(ctx));
3940
router.mount('/api/v1/clawnet', clawnetRoutes(ctx));
4041
router.mount('/api/v1/profile', profileRoutes(ctx));
42+
router.mount('/api/v1/events', eventRoutes(ctx));
4143

4244
return router;
4345
}
@@ -61,6 +63,8 @@ const AUTH_WHITELIST: Array<{ method?: string; path: string }> = [
6163
// (matched via startsWith in isAuthExempt since :did varies)
6264
// Relay routes — auth is enforced on the target node side
6365
{ path: '/relay' },
66+
// Local SSE events — auth enforced separately (session token in query)
67+
{ method: 'GET', path: '/api/v1/events' },
6468
];
6569

6670
function isAuthExempt(method: string, pathname: string): boolean {

packages/node/src/api/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import type { OwnerPermissionService } from '../services/owner-permission-servic
1313
import type { SelfProfileStore } from '../storage/profile-store.js';
1414
import type { PeerProfileRepository } from '../storage/peer-profile-repository.js';
1515
import type { ApiProxyService } from '../services/api-proxy-service.js';
16+
import type { EventPushService } from '../services/event-push-service.js';
1617

1718
export interface ApiServerConfig {
1819
host: string;
@@ -38,4 +39,5 @@ export interface RuntimeContext {
3839
peerProfileRepository: PeerProfileRepository;
3940
configuredPassphrase?: string;
4041
apiProxyService?: ApiProxyService;
42+
eventPushService?: EventPushService;
4143
}

packages/node/src/app.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { GroupIndexer } from './indexer/group-indexer.js';
1111
import { AttachmentService } from './services/attachment-service.js';
1212
import { ClawNetTransportService } from './services/clawnet-transport-service.js';
1313
import { ApiProxyService } from './services/api-proxy-service.js';
14+
import { EventPushService } from './services/event-push-service.js';
1415
import { ContractProvider } from './services/contract-provider.js';
1516
import { GroupService } from './services/group-service.js';
1617
import { IdentityAdapterService } from './services/identity-adapter-service.js';
@@ -51,6 +52,7 @@ export class TelagentNode {
5152
private attachmentService: AttachmentService | null = null;
5253
private clawnetTransportService: ClawNetTransportService | null = null;
5354
private apiProxyService: ApiProxyService | null = null;
55+
private eventPushService: EventPushService | null = null;
5456
private monitoringService: NodeMonitoringService | null = null;
5557
private ownerPermissionService: OwnerPermissionService | null = null;
5658
private indexer: GroupIndexer | null = null;
@@ -253,6 +255,12 @@ export class TelagentNode {
253255
);
254256
}
255257

258+
// Event Push Service (real-time SSE for Webapp)
259+
this.eventPushService = new EventPushService(
260+
this.clawnetGateway,
261+
this.apiProxyService ?? undefined,
262+
);
263+
256264
this.monitoringService = new NodeMonitoringService({
257265
thresholds: {
258266
errorRateWarnRatio: this.config.monitoring.errorRateWarnRatio,
@@ -290,6 +298,7 @@ export class TelagentNode {
290298
peerProfileRepository: this.peerProfileRepository,
291299
configuredPassphrase: passphrase ?? undefined,
292300
apiProxyService: this.apiProxyService ?? undefined,
301+
eventPushService: this.eventPushService ?? undefined,
293302
};
294303

295304
this.apiServer = new ApiServer(runtime);
@@ -303,7 +312,17 @@ export class TelagentNode {
303312
logger.info('[telagent] [startup] starting API server...');
304313
await this.apiServer.start();
305314
this.clawnetTransportService.startListening({
306-
onEnvelope: (raw, sourceDid) => this.messageService!.ingestFederatedEnvelope(raw, sourceDid),
315+
onEnvelope: async (raw, sourceDid) => {
316+
await this.messageService!.ingestFederatedEnvelope(raw, sourceDid);
317+
// Push real-time event to local SSE clients
318+
this.eventPushService?.emitLocal({
319+
type: 'new-envelope',
320+
sourceDid,
321+
envelopeId: typeof raw.envelopeId === 'string' ? raw.envelopeId : undefined,
322+
conversationId: typeof raw.conversationId === 'string' ? raw.conversationId : undefined,
323+
atMs: Date.now(),
324+
});
325+
},
307326
onAttachment: async (info, _sourceDid) => {
308327
// A peer relayed an attachment to us via ClawNet P2P.
309328
// Download from our local ClawNet node and save under the same objectKey
@@ -398,6 +417,7 @@ export class TelagentNode {
398417

399418
this.stopMailboxCleaner();
400419
this.apiProxyService?.dispose();
420+
this.eventPushService?.dispose();
401421
this.clawnetTransportService?.stopListening();
402422
await this.apiServer?.stop();
403423
await this.indexer?.stop();

0 commit comments

Comments
 (0)