Skip to content

Commit e087213

Browse files
gcmsgclaude
andcommitted
feat: push server notifications to agents via signaling WebSocket
Add PushNotification method to signaling Hub for delivering notification messages to connected agents. Wire the notification emitter to push notifications through signaling in addition to the dashboard WebSocket, enabling OpenClaw integration where agents forward notifications to OpenClaw conversations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8dbb9ef commit e087213

File tree

2 files changed

+41
-2
lines changed

2 files changed

+41
-2
lines changed

cmd/peerclawd/main.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"encoding/json"
56
"flag"
67
"log/slog"
78
"os"
@@ -227,6 +228,9 @@ func main() {
227228
logger.Info("user ACL service initialized")
228229
}
229230

231+
// Forward-declare sigHub so the notification emitter closure can reference it.
232+
var sigHub *signaling.Hub
233+
230234
// Initialize notification service.
231235
var notificationSvc *notification.Service
232236
var notifHub *notification.DashboardHub
@@ -242,9 +246,15 @@ func main() {
242246
// Set emitter: WebSocket push + optional email for critical events.
243247
emailSender := userauth.NewEmailSender(cfg.SMTP, logger)
244248
notificationSvc.SetEmitter(func(n *notification.Notification) {
245-
// Push via WebSocket.
249+
// Push via WebSocket (dashboard).
246250
notifHub.Push(n)
247251

252+
// Push via signaling to agent (for OpenClaw integration).
253+
if sigHub != nil && n.AgentID != "" {
254+
payload, _ := json.Marshal(n)
255+
_ = sigHub.PushNotification(context.Background(), n.AgentID, payload)
256+
}
257+
248258
// Send email for critical events if enabled.
249259
if cfg.Notification.EmailEnabled && n.Severity == notification.SeverityCritical {
250260
if cfg.Notification.EmailOnOffline || n.Type != notification.TypeAgentOffline {
@@ -281,7 +291,6 @@ func main() {
281291
}
282292

283293
// Initialize signaling hub.
284-
var sigHub *signaling.Hub
285294
if cfg.Signaling.Enabled {
286295
var turnCfg *signaling.TURNConfig
287296
if len(cfg.Signaling.TURN.URLs) > 0 {

internal/signaling/ws.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,36 @@ func (h *Hub) Forward(ctx context.Context, msg signaling.SignalMessage) {
359359
h.DeliverLocal(ctx, msg)
360360
}
361361

362+
// PushNotification sends a notification to a connected agent via WebSocket signaling.
363+
func (h *Hub) PushNotification(ctx context.Context, agentID string, payload json.RawMessage) error {
364+
h.mu.RLock()
365+
conn, ok := h.conns[agentID]
366+
h.mu.RUnlock()
367+
368+
if !ok {
369+
return fmt.Errorf("agent %s not connected", agentID)
370+
}
371+
372+
msg := signaling.SignalMessage{
373+
Type: signaling.MessageTypeNotification,
374+
From: "server",
375+
To: agentID,
376+
Payload: payload,
377+
}
378+
379+
data, err := json.Marshal(msg)
380+
if err != nil {
381+
return fmt.Errorf("marshal notification message: %w", err)
382+
}
383+
384+
if err := conn.Write(ctx, websocket.MessageText, data); err != nil {
385+
return fmt.Errorf("deliver notification: %w", err)
386+
}
387+
388+
h.logger.Debug("pushed notification via signaling", "agent_id", agentID)
389+
return nil
390+
}
391+
362392
// DeliverEnvelope sends a bridge_message to a connected agent via WebSocket.
363393
func (h *Hub) DeliverEnvelope(ctx context.Context, agentID string, envPayload json.RawMessage) error {
364394
h.mu.RLock()

0 commit comments

Comments
 (0)